# 02 Dask Dataframe Operations

#### Objectives:
- Demonstrate similarity between Pandas and Dask Dataframes
- Familiarize user with operations on Dask Dataframes
- Read/Write to parquet
- Show a Dask Distributed Dataframe

Original Code: https://examples.dask.org/dataframes/01-data-access.html

#### Create single node instance to run Dask on

In [1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client

Client: Scheduler: inproc://10.0.108.198/514/1, Dashboard: http://10.0.108.198/514/1:8787/status
Cluster: Workers: 1, Cores: 4, Memory: 2.00 GB


#### Get the Dask Scheduler UI

In [2]:
import os
from IPython.display import HTML

# Environment variables expected inside a CDSW session.
engine_id = os.environ.get('CDSW_ENGINE_ID')
cdsw_domain = os.environ.get('CDSW_DOMAIN')

# Render a clickable link to the read-only Dask dashboard.
url = 'http://read-only-{}.{}'.format(engine_id, cdsw_domain)
HTML('<a target="_blank" rel="noopener noreferrer" href="{0}">{0}</a>'.format(url))
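The link above assumes both environment variables are set. Outside a CDSW session `os.environ.get` returns `None` for each, and the cell would render `http://read-only-None.None`. A minimal sketch of a guard follows; `read_only_url` is a hypothetical helper name introduced here for illustration, not part of CDSW or Dask.

```python
import os


def read_only_url(engine_id, domain):
    """Build the read-only dashboard URL, or return None if a piece is missing.

    Hypothetical helper: the URL pattern is copied from the cell above.
    """
    if not engine_id or not domain:
        return None
    return "http://read-only-{}.{}".format(engine_id, domain)


url = read_only_url(os.environ.get("CDSW_ENGINE_ID"), os.environ.get("CDSW_DOMAIN"))
print(url or "Not running inside CDSW; no dashboard link available")
```

Returning `None` instead of a malformed URL lets the caller decide whether to show a link at all.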

#### Import data into Dask Dataframe

In [3]:
import dask
df = dask.datasets.timeseries()
df

                   id    name        x        y
npartitions=30
2000-01-01      int64  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...


#### Write files to CSV

In [4]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name);



In [5]:
!ls data/*.csv | head

data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv


#### Import data into Pandas Dataframe

In [6]:
import pandas as pd

df = pd.read_csv('data/2000-01-01.csv')
df.head()

             timestamp    id    name         x         y
0  2000-01-01 00:00:00  1024  Victor -0.307923 -0.599237
1  2000-01-01 00:00:01   991   Sarah  0.503502  0.147275
2  2000-01-01 00:00:02  1024  Hannah -0.048601 -0.393801
3  2000-01-01 00:00:03  1022   Quinn  0.766913  0.004139
4  2000-01-01 00:00:04   941  George  -0.92355  0.140271


#### Import datasets into Dask dataframe

In [7]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df

                timestamp     id    name        x        y
npartitions=30
                   object  int64  object  float64  float64
                      ...    ...     ...      ...      ...
...                   ...    ...     ...      ...      ...
                      ...    ...     ...      ...      ...
                      ...    ...     ...      ...      ...


In [8]:
## Optionally persist data in memory for faster computation
## df = df.persist()

#### Notice that, like Spark, Dask evaluates lazily: the task graph (DAG) has been built but not yet executed

In [9]:
df.head()

             timestamp    id    name         x         y
0  2000-01-01 00:00:00  1024  Victor -0.307923 -0.599237
1  2000-01-01 00:00:01   991   Sarah  0.503502  0.147275
2  2000-01-01 00:00:02  1024  Hannah -0.048601 -0.393801
3  2000-01-01 00:00:03  1022   Quinn  0.766913  0.004139
4  2000-01-01 00:00:04   941  George  -0.92355  0.140271


#### Calling `head()` triggered execution, but only for the first partition, which is all it needs by default

#### Timing GroupBy Operation

In [10]:
%time df.groupby('name').x.mean().compute()

CPU times: user 3.44 s, sys: 573 ms, total: 4.01 s
Wall time: 2.25 s


name
Alice      -0.001415
Bob         0.001008
Charlie    -0.000772
Dan         0.001319
Edith       0.000955
Frank       0.001545
George     -0.002194
Hannah     -0.001024
Ingrid     -0.000205
Jerry      -0.001310
Kevin      -0.004674
Laura      -0.000262
Michael    -0.001328
Norbert    -0.001011
Oliver     -0.001494
Patricia   -0.001378
Quinn       0.001699
Ray        -0.001067
Sarah      -0.000311
Tim        -0.002808
Ursula     -0.001150
Victor      0.003596
Wendy      -0.003244
Xavier      0.000905
Yvonne      0.001219
Zelda       0.003080
Name: x, dtype: float64

#### Writing to parquet

In [11]:
df.to_parquet('data/2000-01.parquet')

#### Reading from parquet

In [12]:
df = dd.read_parquet('data/2000-01.parquet')
df.groupby('name').x.mean().compute()

name
Alice      -0.001415
Bob         0.001008
Charlie    -0.000772
Dan         0.001319
Edith       0.000955
Frank       0.001545
George     -0.002194
Hannah     -0.001024
Ingrid     -0.000205
Jerry      -0.001310
Kevin      -0.004674
Laura      -0.000262
Michael    -0.001328
Norbert    -0.001011
Oliver     -0.001494
Patricia   -0.001378
Quinn       0.001699
Ray        -0.001067
Sarah      -0.000311
Tim        -0.002808
Ursula     -0.001150
Victor      0.003596
Wendy      -0.003244
Xavier      0.000905
Yvonne      0.001219
Zelda       0.003080
Name: x, dtype: float64

#### For more on efficient data storage with Dask Dataframes: https://github.com/dask/dask-tutorial/blob/master/07_dataframe_storage.ipynb

#### Next we will create a Dask Distributed Dataframe with a Dask Multinode Cluster

In [13]:
import cdsw_dask_utils
import cdsw

# Run a Dask cluster with two workers and return an object containing
# a description of the cluster.
#
# Note that the scheduler will run in the current session, and the Dask
# dashboard will become available in the nine-dot menu at the upper
# right corner of the CDSW app.

new_cluster = cdsw_dask_utils.run_dask_cluster(
    n=2,           # number of workers
    cpu=1,
    memory=2,
    nvidia_gpu=0,
)

# Connect a Dask client to the scheduler address in the cluster
# description.
from dask.distributed import Client
client = Client(new_cluster["scheduler_address"])

Waiting for Dask scheduler to become ready...
Dask scheduler is ready
IDs ['7i5cqsmkyxtzfwz2', 'gviokp59xla1tewu']


In [14]:
df = dd.read_parquet('data/2000-01.parquet')

#### Similar to Spark, Dask uses a DAG to optimize computation. You can visualize the DAG.

In [18]:
# Uncomment to render the task graph (requires the optional graphviz dependency).
# df.x.max().visualize()

#### For more on the Dask Delayed API: https://docs.dask.org/en/latest/delayed.html
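As a small illustration of how Dask builds a DAG before running anything, here is a sketch using `dask.delayed`. The functions `inc` and `add` are hypothetical examples, not part of this tutorial's code.

```python
from dask import delayed


@delayed
def inc(x):
    return x + 1


@delayed
def add(a, b):
    return a + b


# Each call records a task instead of running it; together they form a DAG
# in which add depends on the two inc tasks.
total = add(inc(1), inc(2))

# Nothing has run yet. compute() executes the graph and returns the value.
result = total.compute()
print(result)
```

Calling `total.visualize()` would draw this graph, provided graphviz is installed.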

#### Stopping CDSW Workers

In [None]:
# Stop the CDSW workers.
# Parameter:
#   worker_id (int, optional): the ID numbers of the worker engines that must be stopped.
#   If an ID is not provided, all the worker engines on the cluster will be stopped.

cdsw.stop_workers()

#### Next we will use scikit-learn with Dask for machine learning