![dask.png](dask.png)

In [1]:
import dask
import dask.dataframe as dd
from dask.distributed import LocalCluster, Client
import pandas as pd
import numpy as np
import dask.array as da
import time
import os

In [2]:
import os
# The jupyter notebook is launched from your $HOME directory.
# Change the working directory to the workshop directory
# which was created in your username directory under /scratch/vp91
os.chdir(os.path.expandvars("/scratch/vp91/$USER/"))

### Dask Collections

* **High-level collections**: Mimic NumPy, lists, and pandas but can operate in parallel on datasets that don’t fit into memory 
    * Array
    * DataFrame
    * Bag
    
* **Low-level collections**: Give finer control to build custom parallel and distributed computations
    * Delayed
    * Futures



# Dask DataFrames

![dataframe.png](dataframe.png)

* One Dask DataFrame consists of many in-memory pandas DataFrames, split along the index.
* One operation on a Dask DataFrame triggers many pandas operations on the constituent pandas DataFrames.
* These operations are mindful of potential parallelism and memory constraints.

In [26]:
path = os.getcwd()
path

'/scratch/vp91/jj8451'

In [12]:
!ls data/nycflights/*.csv

ls: cannot access 'data/nycflights/*.csv': No such file or directory


In [16]:
# Read all the CSV files into a single Dask DataFrame
ddf = dd.read_csv(
    os.path.join(path+"/Dask-Data-Analytics/Dask/data", "nycflights", "*.csv"), parse_dates={"Date": [0, 1, 2]}
)

* dask.dataframe.read_csv reads only a sample from the beginning of the file to infer the column dtypes
* These inferred dtypes are then enforced when reading all partitions
* Sometimes, the dtypes inferred from the sample are incorrect.
    * Here, the first n rows have no value for CRSElapsedTime (which pandas infers as a float), but later rows turn out to contain strings (object dtype).

* Good practice: specify dtypes directly using the dtype keyword.
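
The pitfall can be reproduced in plain pandas. This is only a sketch: the tiny CSV and the column name `code` are hypothetical, and `nrows=3` stands in for the sample that Dask reads.

```python
import io

import pandas as pd

# Hypothetical CSV: the "code" column is empty in the first rows
csv_text = "id,code\n1,\n2,\n3,\n4,N123\n"

# Inferring from a sample of the first 3 rows sees only missing values
sample = pd.read_csv(io.StringIO(csv_text), nrows=3)
sample_dtype = sample["code"].dtype

# Reading the whole file reveals that the column holds strings
full = pd.read_csv(io.StringIO(csv_text))
full_dtype = full["code"].dtype

# Stating the dtype up front removes the guesswork
explicit = pd.read_csv(io.StringIO(csv_text), dtype={"code": str})

print(sample_dtype, full_dtype, explicit["code"].iloc[3])
```

The sample suggests a float column while the full file does not, which is the kind of mismatch the dtype keyword avoids.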

In [17]:
ddf = dd.read_csv(
    os.path.join(path+"/Dask-Data-Analytics/Dask/data", "nycflights", "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
)


In [18]:
ddf

Dask DataFrame structure (npartitions=10):
Column,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
Dtype,datetime64[ns],int64,float64,int64,float64,int64,string,int64,string,float64,float64,float64,float64,float64,string,string,float64,float64,float64,bool,int64


### Lazy evaluation
* The representation of the DataFrame object contains no data
* Dask has done just enough to read the start of the first file and infer the column names and dtypes

* Dask **constructs** the logic of your computation (called the task graph) immediately
* It **evaluates** the graph only when necessary

In [19]:
ddf.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

* Functions like len, head, and tail also trigger an evaluation. For len, Dask will:
    * load the actual data (that is, load each file into a pandas DataFrame)
    * apply the corresponding function to each pandas DataFrame (also known as a partition)
    * combine the subtotals to give you the final grand total

In [20]:
len(ddf)

2611892

In [21]:
ddf.head()

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,False,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,False,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,False,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,False,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,False,0


### Operation on multiple files in Pandas

In [29]:
%%time
# Find the max value of the DepDelay column across all 10 files
files = os.listdir(os.path.join(path+"/Dask-Data-Analytics/Dask/data", 'nycflights'))
maxes = []

for file in files:
    df = pd.read_csv(os.path.join(path+"/Dask-Data-Analytics/Dask/data", 'nycflights', file))
    maxes.append(df.DepDelay.max())

final_max = max(maxes)
print(final_max)

1435.0
CPU times: user 3.06 s, sys: 125 ms, total: 3.18 s
Wall time: 3.18 s


### Operation on multiple files in Dask

In [30]:
# Find the max value of the DepDelay column across all 10 dataframes.
# This only creates the task graph; it does not execute the operation
result = ddf.DepDelay.max()

In [31]:
result.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [32]:
%%time
result.compute()

CPU times: user 11 s, sys: 1.18 s, total: 12.2 s
Wall time: 4.72 s


1435.0

### Exercise: Find the number of flights from each city

* We can also combine multiple compute steps into a single instruction
* This is usually more efficient
    * Task graphs for both results are merged when calling dask.compute
    * This allows shared operations to be done only once instead of twice
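
A small sketch with dask.delayed can make the sharing visible. The functions `load`, `total` and `largest` and the `calls` list are hypothetical helpers, and the synchronous scheduler is chosen only to keep the run deterministic.

```python
import dask

calls = []  # records every execution of the shared step


@dask.delayed
def load():
    calls.append("load")
    return [1, 2, 3, 4]


@dask.delayed
def total(values):
    return sum(values)


@dask.delayed
def largest(values):
    return max(values)


data = load()
t, m = total(data), largest(data)

# Separate computes: each call evaluates its own graph, so load runs twice
t.compute(scheduler="synchronous")
m.compute(scheduler="synchronous")
separate_calls = len(calls)

calls.clear()

# One combined compute: the graphs are merged, so load runs once
total_res, largest_res = dask.compute(t, m, scheduler="synchronous")
combined_calls = len(calls)

print(separate_calls, combined_calls, total_res, largest_res)
```

The shared `load` step is executed twice by the separate calls and once by the combined call, which is the saving that the timings in the following cells reflect.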

In [33]:
non_canceled = ddf[~ddf.Cancelled]
mean_delay = non_canceled.DepDelay.mean()
std_delay = non_canceled.DepDelay.std()

In [34]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

CPU times: user 22.4 s, sys: 2.06 s, total: 24.5 s
Wall time: 10.1 s


In [35]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

CPU times: user 11.2 s, sys: 1.42 s, total: 12.6 s
Wall time: 5.1 s


# Dask Arrays - parallelized NumPy

![arrays.png](arrays.png)

* Dask Array implements a subset of the NumPy ndarray interface using **blocked** algorithms
* A large array is cut into many small arrays
* Large computations are performed by combining many smaller computations
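
To illustrate what "combining many smaller computations" can mean, here is a sketch of a blocked mean in plain NumPy. The helper `blocked_mean` is hypothetical and is not how Dask is implemented internally; it only shows the idea of reducing each block and then combining the partial results.

```python
import numpy as np


def blocked_mean(arr, chunk):
    """Mean of a 1-D array computed block by block."""
    sums, counts = [], []
    for start in range(0, len(arr), chunk):
        block = arr[start:start + chunk]
        # Each block contributes a partial sum and a count
        sums.append(block.sum())
        counts.append(block.size)
    # Combine the partial results into the final answer
    return sum(sums) / sum(counts)


values = np.arange(10, dtype=float)
result = blocked_mean(values, chunk=4)
print(result, values.mean())
```

Only one block needs to be processed at a time, and the per-block steps are independent of each other, so they could run in parallel.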

In [36]:

# NumPy array
a_np = np.ones(10)
a_np

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

In [37]:
# This is how a blocked operation is done in NumPy. We divide the whole ndarray
# of size 10 into 2 slices, each of size 5

a_np_sum = a_np[:5].sum() + a_np[5:].sum()
a_np_sum

10.0

In [38]:
# Dask array

# In a Dask array we specify the slices using the keyword chunks.
# chunks defines the number of elements in each slice

a_da = da.ones(10, chunks=5)
a_da

Unnamed: 0,Array,Chunk
Bytes,80 B,40 B
Shape,"(10,)","(5,)"
Dask graph,2 chunks in 1 graph layer,2 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [39]:
a_da_sum = a_da.sum()
a_da_sum

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [40]:
a_da_sum.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [41]:
a_da_sum.compute()

10.0

* Dask can also choose a suitable chunk size by itself
* If your chunks are too small
    * the amount of actual work done by each task is tiny
    * the overhead of coordinating all these tasks makes the process very inefficient
* If your chunks are too big
    * you will likely run out of memory
    * data will have to be spilled to disk
    * this will degrade performance
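
The trade-off can be estimated with simple arithmetic before building any array. The helper `chunk_stats` below is a hypothetical sketch that assumes 8-byte float64 elements; it counts the chunks (roughly, the tasks per operation) and the size of a full chunk.

```python
import math


def chunk_stats(shape, chunks, itemsize=8):
    """Return (number of chunks, bytes in one full chunk)."""
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    chunk_bytes = math.prod(chunks) * itemsize
    return n_chunks, chunk_bytes


shape = (30_000, 30_000)
for chunks in [(100, 100), (3000, 3000), (30_000, 30_000)]:
    n, size = chunk_stats(shape, chunks)
    print(f"chunks={chunks}: {n} chunks of {size / 1e6:.2f} MB each")
```

For this shape, (100, 100) gives 90,000 chunks of 0.08 MB, (3000, 3000) gives 100 chunks of 72 MB, and a single (30_000, 30_000) chunk would need 7,200 MB in one piece.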

In [42]:
%%time

xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000)) # We specify the chunk size
yd = xd.mean(axis=0)
yd.compute()

CPU times: user 27.9 s, sys: 1.36 s, total: 29.2 s
Wall time: 937 ms


array([10.00005444,  9.99975643,  9.99987993, ...,  9.99991706,
        9.99945356, 10.00021097])

In [43]:
%%time

xd = da.random.normal(10, 0.1, size=(30_000, 30_000)) # Dask chooses the chunk size
yd = xd.mean(axis=0)
yd.compute()

CPU times: user 27.6 s, sys: 1.2 s, total: 28.8 s
Wall time: 1.07 s


array([10.00050605, 10.00034658,  9.99981788, ..., 10.00102603,
        9.99991709,  9.99988898])

In [44]:
xd.chunksize

(4096, 4096)

# Delayed decorator

* A block of code can contain operations that could run in parallel
* Normally in Python these operations happen sequentially
    * Or the user identifies the parallel sections and writes parallel code by hand
* The Dask **delayed** function decorates your functions so that they operate lazily
* Dask defers execution of the function, placing the function and its arguments into a task graph
* Dask then identifies opportunities for parallelism in the task graph
* The Dask schedulers exploit this parallelism, generally improving performance

In [45]:
@dask.delayed
def inc(x):
    time.sleep(1)
    return x + 1




In [46]:
@dask.delayed
def add(x, y):
    time.sleep(1)
    return x + y

In [47]:
# As the two increments are independent of each other, we can run them in parallel

x = inc(1)
y = inc(2)
z = add(x, y)

In [48]:
# Here z is a delayed object

z.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [49]:
z.compute()

5
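
The deferred execution can be checked directly. In this sketch the `executed` list and the functions `square` and `total` are hypothetical, added only to record when work actually runs; the threaded scheduler is requested explicitly so that the list lives in the same process as the tasks.

```python
import dask

executed = []  # filled in only when a task really runs


@dask.delayed
def square(x):
    executed.append(x)
    return x * x


@dask.delayed
def total(x, y):
    return x + y


# Building the graph calls none of the functions
graph = total(square(2), square(3))
ran_before_compute = list(executed)

# compute() walks the graph and runs the tasks
result = graph.compute(scheduler="threads")
ran_after_compute = sorted(executed)

print(ran_before_compute, result, ran_after_compute)  # [] 13 [2, 3]
```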

# Dask Futures

* We can submit individual functions for evaluation
* The call returns immediately, giving one or more futures
    * whose status begins as “pending”
    * and later becomes “finished”
* There is no **blocking** of the local Python session.

* Difference between futures and delayed
    * delayed is lazy (it just constructs a graph)
    * futures are eager
    * With futures, the computation starts as soon as the inputs are available and there is compute available
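
The eager behaviour can be sketched with Python's standard-library concurrent.futures, whose submit / result / done interface Dask futures resemble. This is an analogy under that assumption, not Dask's own scheduler, and `slow_inc` is a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_inc(x):
    time.sleep(0.2)
    return x + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() starts the work right away and returns without blocking
    future = pool.submit(slow_inc, 1)
    done_immediately = future.done()

    # result() blocks until the value is ready
    value = future.result()
    done_after = future.done()

print(done_immediately, value, done_after)
```

The future is not finished straight after submission, yet the session stays free to do other work until result() is called.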

In [50]:
client = Client(n_workers=4)

def inc(x):
    time.sleep(1)
    return x + 1


def double(x):
    time.sleep(2)
    return 2 * x


def add(x, y):
    time.sleep(1)
    return x + y

In [51]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

If we check the future after a few seconds, we can see that it is complete.

In [52]:
future

In [53]:

future.result()

2

## Compute vs. Persist

In [54]:
df = dask.datasets.timeseries()
df.npartitions

30

In [55]:
type(df)

dask.dataframe.core.DataFrame

In [56]:
computed_df = df.compute()
type(computed_df)

pandas.core.frame.DataFrame

In [57]:
df_persist = df.persist()
type(df_persist)

dask.dataframe.core.DataFrame

In [58]:
df_persist.npartitions

30
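
The cells above compare only the types. As a hedged sketch of the difference, using a tiny Dask array instead of the timeseries dataset (an assumption made to keep the example small): compute() returns a concrete in-memory result, while persist() returns a Dask collection of the same kind whose chunks have already been computed and are kept in memory.

```python
import dask.array as da
import numpy as np

x = da.ones(10, chunks=5) + 1

# compute(): the whole result is gathered into one NumPy array
computed = x.compute()

# persist(): still a Dask array with the same chunking,
# but its chunks now hold computed data instead of pending tasks
persisted = x.persist()

print(type(computed).__name__, type(persisted).__name__, persisted.npartitions)

# Later operations on the persisted array start from the stored chunks
persisted_sum = persisted.sum().compute()
```

With a distributed Client active, the persisted chunks stay on the workers, so persist suits intermediate results that are reused and may be too large to bring back as a single object.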