# Dask

Parallel processing for NumPy arrays and pandas DataFrames

http://dask.pydata.org/en/latest/

Interesting example and use-case:

http://matthewrocklin.com/blog/work/2018/02/09/credit-models-with-dask

## Creating Dask Cluster

In [2]:
import dask
import dask.distributed
import dask.dataframe
import pandas as pd
from glob import glob

In [3]:
client = dask.distributed.Client() #starts Dask client

In [4]:
client

Client: Scheduler tcp://127.0.0.1:63954, Dashboard http://127.0.0.1:8787/status
Cluster: Workers 4, Cores 4, Memory 12.73 GB


In principle, Dask also runs without a distributed client, but it then appears to be less efficient and is harder to monitor.
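As a minimal sketch of running Dask without a distributed client: `compute()` accepts a `scheduler` argument that selects one of the built-in local schedulers. The function `square` and the variable names are made up for this illustration.

```python
import dask


@dask.delayed
def square(x):
    # Hypothetical toy task; any pure function works the same way.
    return x ** 2


# Build a lazy task graph; nothing is computed yet.
total = dask.delayed(sum)([square(i) for i in range(4)])

# "threads" runs tasks in a local thread pool,
# "synchronous" runs everything in the calling thread (handy for debugging).
result_threads = total.compute(scheduler="threads")
result_sync = total.compute(scheduler="synchronous")
print(result_threads, result_sync)
```

Neither of these local schedulers provides the dashboard that comes with the distributed client.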

## Download Sample Data

__Warning: this is a lot of data, make sure to have a fast and unlimited internet connection!__

In [19]:
import os
import urllib.request

In [22]:
for month in range(12):
    filename = f'green_tripdata_2018-{month+1:02d}.csv'
    url = f'https://s3.amazonaws.com/nyc-tlc/trip+data/{filename}'
    csvfile = f'../example_files/{filename}'
    if not os.path.exists(csvfile):
        print('downloading sample data: ', filename)
        print(url, csvfile)
        urllib.request.urlretrieve(url, csvfile)

downloading sample data:  green_tripdata_2018-01.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-01.csv ../example_files/green_tripdata_2018-01.csv
downloading sample data:  green_tripdata_2018-02.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-02.csv ../example_files/green_tripdata_2018-02.csv
downloading sample data:  green_tripdata_2018-03.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-03.csv ../example_files/green_tripdata_2018-03.csv
downloading sample data:  green_tripdata_2018-04.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-04.csv ../example_files/green_tripdata_2018-04.csv
downloading sample data:  green_tripdata_2018-05.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-05.csv ../example_files/green_tripdata_2018-05.csv
downloading sample data:  green_tripdata_2018-06.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2018-06.csv ../example_files/green_tripdata_2018-06.cs

## Dask DataFrame

The following example imports a large amount of data from multiple CSV files into a Dask DataFrame, performs some analysis and aggregation, and returns a pandas DataFrame.

In [4]:
file_pattern = '../example_files/green_*.csv'
# Dask CSV reader; accepts wildcards (e.g. *), but cannot read zip files
ddf = dask.dataframe.read_csv(file_pattern, delimiter=',', decimal='.',
                             parse_dates=[1, 2], # columns to be parsed as dates
                             # manual specification of data types where inference does not work
                             dtype={'trip_type': 'float64'}, 
                             ) 
ddf.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type
0,2,2018-01-01 00:18:50,2018-01-01 00:24:39,N,1,236,236,5,0.7,6.0,0.5,0.5,0.0,0.0,,0.3,7.3,2,1.0
1,2,2018-01-01 00:30:26,2018-01-01 00:46:42,N,1,43,42,5,3.5,14.5,0.5,0.5,0.0,0.0,,0.3,15.8,2,1.0
2,2,2018-01-01 00:07:25,2018-01-01 00:19:45,N,1,74,152,1,2.14,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,2,1.0
3,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,-3.0,-0.5,-0.5,0.0,0.0,,-0.3,-4.3,3,1.0
4,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,3.0,0.5,0.5,0.0,0.0,,0.3,4.3,2,1.0


In [5]:
ddf['date'] = ddf.lpep_pickup_datetime.dt.date
ddf['tip_fraction'] = ddf.tip_amount / ddf.total_amount
ddf.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,date,tip_fraction
0,2,2018-01-01 00:18:50,2018-01-01 00:24:39,N,1,236,236,5,0.7,6.0,...,0.5,0.0,0.0,,0.3,7.3,2,1.0,2018-01-01,0.0
1,2,2018-01-01 00:30:26,2018-01-01 00:46:42,N,1,43,42,5,3.5,14.5,...,0.5,0.0,0.0,,0.3,15.8,2,1.0,2018-01-01,0.0
2,2,2018-01-01 00:07:25,2018-01-01 00:19:45,N,1,74,152,1,2.14,10.0,...,0.5,0.0,0.0,,0.3,11.3,2,1.0,2018-01-01,0.0
3,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,-3.0,...,-0.5,0.0,0.0,,-0.3,-4.3,3,1.0,2018-01-01,-0.0
4,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,3.0,...,0.5,0.0,0.0,,0.3,4.3,2,1.0,2018-01-01,0.0


In [6]:
agg = ddf.groupby(['date']).agg({'passenger_count':'sum', 'trip_distance':'sum',
                              'tip_fraction':'mean'}) # groupby analogous to the pandas syntax
agg_df = agg.compute() # runs the actual computation and returns a pandas DataFrame
agg_df.head()

date,passenger_count,trip_distance,tip_fraction
2008-12-31,114,199.52,0.024494
2009-01-01,197,719.99,0.023664
2010-09-23,267,930.68,0.034685
2017-12-31,53,100.24,0.047561
2018-01-01,32499,68925.36,0.063173


The execution progress and the efficiency of the parallelization can be monitored in the web dashboard of the Dask client:

http://localhost:8787/status

### Importing zipped Files

In contrast to pandas, Dask DataFrames cannot be created directly from zipped CSV files. The following code snippet uses pandas to import the files on the Dask workers via Dask Delayed.

Note that the example below reads plain CSV files, but it works unchanged for zipped files.

In [37]:
file_pattern = '../example_files/green_*.csv' # works also fine for *.zip files
files=glob(file_pattern)

dfs=[dask.delayed(pd.read_csv)(filename, delimiter=',', decimal='.', parse_dates=[1, 2])
     for filename in files]
ddf=dask.dataframe.from_delayed(dfs)
ddf.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type
0,2,2018-01-01 00:18:50,2018-01-01 00:24:39,N,1,236,236,5,0.7,6.0,0.5,0.5,0.0,0.0,,0.3,7.3,2,1.0
1,2,2018-01-01 00:30:26,2018-01-01 00:46:42,N,1,43,42,5,3.5,14.5,0.5,0.5,0.0,0.0,,0.3,15.8,2,1.0
2,2,2018-01-01 00:07:25,2018-01-01 00:19:45,N,1,74,152,1,2.14,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,2,1.0
3,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,-3.0,-0.5,-0.5,0.0,0.0,,-0.3,-4.3,3,1.0
4,2,2018-01-01 00:32:40,2018-01-01 00:33:41,N,1,255,255,1,0.03,3.0,0.5,0.5,0.0,0.0,,0.3,4.3,2,1.0


Note that the import of a single zipped csv file is not distributed, thus the memory of each worker must be sufficient to contain the Pandas DataFrame for each input file.

## Dask Delayed


Dask Delayed is used to submit functions with defined inputs and outputs to the workers.

Note that compute() only needs to be called on the final result(s); Dask resolves the dependencies automatically.

If a class rather than a single function is to be submitted, a helper function can be used. If the helper function is defined inside a class, it should be static (i.e. it must not refer to self).
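The static-helper variant could look like the following minimal sketch. The class `Scaler` and its methods are hypothetical and exist only for this illustration; the synchronous scheduler is used so that the snippet runs without a cluster.

```python
import dask


class Scaler:
    """Hypothetical class whose work should be run as Dask tasks."""

    def __init__(self, factor):
        self.factor = factor

    def apply(self, value):
        return self.factor * value

    @staticmethod
    def run(factor, value):
        # Static helper: builds the object inside the task, so only plain
        # arguments are passed to the task, not an instance.
        return Scaler(factor).apply(value)


# Five independent tasks plus one task that depends on all of them.
scaled = [dask.delayed(Scaler.run)(3, v) for v in range(5)]
scaled_total = dask.delayed(sum)(scaled)

# compute() is called on the final result only; the five upstream tasks
# are executed automatically as its dependencies.
scaled_result = scaled_total.compute(scheduler="synchronous")
print(scaled_result)
```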

In [48]:
import numpy as np
import time

In [46]:
class A: #class which does the calculation, to be parallelized
    def __init__(self, p1,x1):
        self.p1=p1
        self.x1=x1
    def doCalc(self,p2,x2):
        return np.dot(self.p1+self.x1,p2+x2)

def calcHelper(p1,p2,x1,x2):
    a=A(p1,x1)
    return a.doCalc(p2,x2)
    
def mainProg():
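    p1=np.arange(10000000)
    p2=p1**2
    # note: the integer dot products below exceed the int64 range and wrap around,
    # so the printed result only demonstrates the workflow
    p1s=client.scatter(p1) # send large data needed by all tasks to the workers ahead of time
    p2s=client.scatter(p2)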
    result=[]
    for x1 in range(100):
        x2=x1**2
        result.append(dask.delayed(calcHelper)(p1s,p2s,x1,x2))
    result_tot=dask.delayed(np.sum)(result)
    return result_tot.compute()

In [56]:
print(f'started at {time.ctime()}')
print('result:', mainProg())
print(f'ended at {time.ctime()}')

started at Thu Jul 25 07:33:58 2019
result: 2201258113697798912
ended at Thu Jul 25 07:34:07 2019


In the example above, a call of the form xs = client.scatter(x) is used to distribute large data efficiently to the workers ahead of the computation.

If there are issues with multiprocessing, the client can be restricted to multithreading (which is usually slower) using:

client=dask.distributed.Client(processes=False, threads_per_worker=4)
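Both points can be combined in a small sketch: a thread-only client that scatters an array once and reuses it in several tasks. The function `weighted_sum`, the array size and the extra arguments `n_workers=1` and `dashboard_address=None` (which turns the dashboard off for this throwaway client) are choices made for this illustration.

```python
import numpy as np
from dask.distributed import Client


def weighted_sum(data, weight):
    # Hypothetical task that needs the shared array plus a small parameter.
    return float(data.sum()) * weight


# Thread-only client running inside the current process.
with Client(processes=False, n_workers=1, threads_per_worker=2,
            dashboard_address=None) as thread_client:
    big = np.arange(1000)
    # Send the array to the workers once and keep a future pointing to it.
    big_future = thread_client.scatter(big, broadcast=True)
    # Every task receives the future instead of a fresh copy of the data.
    weighted_futures = [thread_client.submit(weighted_sum, big_future, w)
                        for w in range(3)]
    weighted_results = thread_client.gather(weighted_futures)

print(weighted_results)
```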

## Dask Futures

Dask Futures are similar to Dask Delayed, but computation already starts when the future is submitted (Dask Delayed is lazy, computation starts only when *compute()* is called).

In [5]:
futures = []
for i in range(4):
    futures.append(client.submit(lambda x: x**2, i))
res = client.submit(sum, futures)

Futures can be used as input for other calculations.

In [6]:
res

In [7]:
res.result()

14
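A slightly extended sketch of the same idea uses `client.map` to submit one future per input and `client.gather` to collect several results at once. The in-process client and the names `square`, `local_client`, `squares` and `gathered` are assumptions made for this illustration.

```python
from dask.distributed import Client


def square(x):
    return x ** 2


# Small in-process client so that the snippet is self-contained.
with Client(processes=False, n_workers=1, threads_per_worker=2,
            dashboard_address=None) as local_client:
    # map() submits one future per input; execution starts immediately.
    squares = local_client.map(square, range(4))
    # Futures can be passed directly as inputs to further tasks.
    total = local_client.submit(sum, squares)
    # gather() blocks until all futures are done and returns their values.
    gathered = local_client.gather(squares)
    total_value = total.result()

print(gathered, total_value)
```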

## Cleanup

In [38]:
client.close()