# Scalable Data Science with Dask

Talk given at the [PyLadies Berlin meetup](https://www.meetup.com/PyLadies-Berlin/events/276969041/?_xtd=gqFyqDE4NDM4MTYxoXCmaXBob25l&from=ref) on 6th April 2021.

Dataset: [NYC Yellow Taxi Trips [2019]](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

## pandas

Read the data for January 2019.

In [None]:
# !wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{01..12}.csv

In [1]:
%%time

import pandas as pd

df = pd.read_csv("yellow_tripdata_2019-01.csv")
df

CPU times: user 8.82 s, sys: 2.01 s, total: 10.8 s
Wall time: 10.8 s


,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14.0,0.5,0.5,1.00,0.0,0.3,16.30,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.00,1,N,236,236,1,4.5,0.5,0.5,0.00,0.0,0.3,5.80,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.00,2,N,193,193,2,52.0,0.0,0.5,0.00,0.0,0.3,55.55,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7667787,2,2019-01-31 23:57:36,2019-02-01 00:18:39,1,4.79,1,N,263,4,1,18.0,0.5,0.5,3.86,0.0,0.3,23.16,0.0
7667788,2,2019-01-31 23:32:03,2019-01-31 23:33:11,1,0.00,1,N,193,193,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667789,2,2019-01-31 23:36:36,2019-01-31 23:36:40,1,0.00,1,N,264,264,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667790,2,2019-01-31 23:14:53,2019-01-31 23:15:20,1,0.00,1,N,264,7,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0


Calculate the mean of `total_amount`.

In [2]:
%%time

df.total_amount.mean()

CPU times: user 11.6 ms, sys: 7.37 ms, total: 19 ms
Wall time: 18 ms


15.68222215991253

## Dask

Start a local cluster with four workers.

In [3]:
from dask.distributed import Client

In [10]:
client = Client(n_workers=4)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Dashboard: http://127.0.0.1:8787/status
Workers: 4
Total threads: 8
Total memory: 16.00 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:52625
Workers: 4
Dashboard: http://127.0.0.1:8787/status
Total threads: 8
Started: Just now
Total memory: 16.00 GiB

Comm: tcp://127.0.0.1:52645
Total threads: 2
Dashboard: http://127.0.0.1:52647/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:52631
Local directory: /Users/pavithra/Developer/Confs/scalable-data-science-with-dask/pyladies-berlin/dask-worker-space/worker-9unnc2bq

Comm: tcp://127.0.0.1:52643
Total threads: 2
Dashboard: http://127.0.0.1:52644/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:52630
Local directory: /Users/pavithra/Developer/Confs/scalable-data-science-with-dask/pyladies-berlin/dask-worker-space/worker-y862en7q

Comm: tcp://127.0.0.1:52638
Total threads: 2
Dashboard: http://127.0.0.1:52640/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:52629
Local directory: /Users/pavithra/Developer/Confs/scalable-data-science-with-dask/pyladies-berlin/dask-worker-space/worker-xrn7g7ye

Comm: tcp://127.0.0.1:52637
Total threads: 2
Dashboard: http://127.0.0.1:52639/status
Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:52628
Local directory: /Users/pavithra/Developer/Confs/scalable-data-science-with-dask/pyladies-berlin/dask-worker-space/worker-jx6igha5


Read the data for the entire year 2019.

In [11]:
%%time

import dask.dataframe as dd

df = dd.read_csv("yellow_tripdata_2019-*.csv",
                 dtype={'RatecodeID': 'float64',
                        'VendorID': 'float64',
                        'passenger_count': 'float64',
                        'payment_type': 'float64'
                       })
df

CPU times: user 15.1 ms, sys: 8.42 ms, total: 23.5 ms
Wall time: 21.4 ms


,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
npartitions=22,,,,,,,,,,,,,,,,,,
,float64,object,object,float64,float64,float64,object,int64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Dask DataFrames are lazily evaluated, so the output above shows only column names and dtypes. Call `head()` to load and view the first few rows.

In [12]:
df.head()

,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2019-01-01 00:46:40,2019-01-01 00:53:20,1.0,1.5,1.0,N,151,239,1.0,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1.0,2019-01-01 00:59:47,2019-01-01 01:18:59,1.0,2.6,1.0,N,239,246,1.0,14.0,0.5,0.5,1.0,0.0,0.3,16.3,
2,2.0,2018-12-21 13:48:30,2018-12-21 13:52:40,3.0,0.0,1.0,N,236,236,1.0,4.5,0.5,0.5,0.0,0.0,0.3,5.8,
3,2.0,2018-11-28 15:52:25,2018-11-28 15:55:45,5.0,0.0,1.0,N,193,193,2.0,3.5,0.5,0.5,0.0,0.0,0.3,7.55,
4,2.0,2018-11-28 15:56:57,2018-11-28 15:58:33,5.0,0.0,2.0,N,193,193,2.0,52.0,0.0,0.5,0.0,0.0,0.3,55.55,
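
The lazy behaviour described above can be pictured with a toy sketch in plain Python. This is only an illustration of the idea, not how Dask is implemented: the `Lazy` class and the `load` and `mean` helpers are hypothetical names, and the five values are the `total_amount` entries from the rows shown by `head()`.

```python
class Lazy:
    """A deferred computation: stores a function and its inputs, runs nothing yet."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any lazy inputs first, then call the stored function.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


calls = []  # records which steps have actually executed


def load(values):
    calls.append("load")
    return list(values)


def mean(values):
    calls.append("mean")
    return sum(values) / len(values)


# Building the pipeline only records what to do.
fares = Lazy(load, [9.95, 16.30, 5.80, 7.55, 55.55])
average = Lazy(mean, fares)
ran_before_compute = list(calls)  # still empty: nothing has run

# Work happens only when the result is requested.
result = average.compute()
print(ran_before_compute, calls, round(result, 2))
```

Until `compute()` is called, `average` is just a description of the work, which is why building it is nearly instantaneous.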


Calculate the mean of `total_amount`.

In [13]:
%%time

df.total_amount.mean()

CPU times: user 6.26 ms, sys: 867 µs, total: 7.12 ms
Wall time: 7.1 ms


dd.Scalar<series-..., dtype=float64>

Again, this is lazy evaluation: the cell returned a placeholder, not a number. Call `compute()` to run the calculation and get the result.

In [None]:
%%time

df.total_amount.mean().compute()
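
One way to picture what a mean over many partitions involves: each partition contributes a partial sum and a row count, and the partial results are combined at the end. The sketch below assumes two made-up partitions holding the five `total_amount` values shown earlier, and uses a thread pool as a stand-in for workers. `partial_stats` and `combine` are hypothetical helpers, and Dask's own implementation differs in detail.

```python
from concurrent.futures import ThreadPoolExecutor

# Two unequal "partitions" of total_amount values (illustrative split).
partitions = [
    [9.95, 16.30, 5.80],
    [7.55, 55.55],
]


def partial_stats(chunk):
    """Per-partition work: return (sum, count) for one chunk."""
    return sum(chunk), len(chunk)


def combine(stats):
    """Final step: merge the partial sums and counts into one mean."""
    total = sum(s for s, _ in stats)
    count = sum(n for _, n in stats)
    return total / count


# Each partition is processed independently, here on a small thread pool.
with ThreadPoolExecutor(max_workers=2) as pool:
    stats = list(pool.map(partial_stats, partitions))

overall_mean = combine(stats)

# Averaging the per-partition means would be wrong for unequal partitions.
mean_of_means = sum(s / n for s, n in stats) / len(stats)

print(round(overall_mean, 2), round(mean_of_means, 2))
```

Carrying the count alongside the sum is what keeps the result correct when partitions have different sizes.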

Close the cluster.

In [15]:
client.close()

## Coiled

Create a Dask cluster on Coiled.

In [16]:
import coiled

cluster = coiled.Cluster(n_workers=10)

client = Client(cluster)
client

Found software environment build
Created FW rules: coiled-dask-pavithr13-67876-firewall
Created scheduler VM: coiled-dask-pavithr13-67876-scheduler (type: t3.medium, ip: ['3.238.4.136'])


Refreshing temporary credentials failed during mandatory refresh period.
Traceback (most recent call last):
  File "/Users/pavithra/opt/anaconda3/envs/scale-with-dask/lib/python3.9/site-packages/aiobotocore/credentials.py", line 291, in _protected_refresh
    metadata = await self._refresh_using()
  File "/Users/pavithra/opt/anaconda3/envs/scale-with-dask/lib/python3.9/site-packages/aiobotocore/credentials.py", line 345, in fetch_credentials
    return await self._get_cached_credentials()
  File "/Users/pavithra/opt/anaconda3/envs/scale-with-dask/lib/python3.9/site-packages/aiobotocore/credentials.py", line 355, in _get_cached_credentials
    response = await self._get_credentials()
  File "/Users/pavithra/opt/anaconda3/envs/scale-with-dask/lib/python3.9/site-packages/aiobotocore/credentials.py", line 382, in _get_credentials
    return await sts.assume_role(**kwargs)
  File "/Users/pavithra/opt/anaconda3/envs/scale-with-dask/lib/python3.9/site-packages/aiobotocore/client.py", line 155


+-------------+-----------+-----------+---------+
| Package     | client    | scheduler | workers |
+-------------+-----------+-----------+---------+
| blosc       | None      | 1.10.2    | None    |
| dask        | 2021.11.1 | 2021.10.0 | None    |
| distributed | 2021.11.1 | 2021.10.0 | None    |
| lz4         | None      | 3.1.3     | None    |
| toolz       | 0.11.2    | 0.11.1    | None    |
+-------------+-----------+-----------+---------+


Connection method: Cluster object
Cluster type: coiled.Cluster
Dashboard: http://3.238.4.136:8787

Dashboard: http://3.238.4.136:8787
Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tls://10.4.3.130:8786
Workers: 0
Dashboard: http://10.4.3.130:8787/status
Total threads: 0
Started: Just now
Total memory: 0 B


Read the data for January 2019 from Amazon S3 and compute the mean of `total_amount`.

In [17]:
%%time

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
        "store_and_fwd_flag": "category",
        "PULocationID": "UInt16",
        "DOLocationID": "UInt16",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()

df.total_amount.mean().compute()

CPU times: user 536 ms, sys: 94.7 ms, total: 631 ms
Wall time: 24.3 s


15.682222159912529
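
The `blocksize` argument in the cell above sets how many bytes of the CSV go into each partition, so the partition count for one file is roughly the file size divided by the block size, rounded up. A minimal sketch of that arithmetic follows. `estimate_partitions` is a hypothetical helper, not a Dask function, and the 100 MiB file size is a made-up figure rather than the size of the taxi file.

```python
import math

MIB = 1024 ** 2  # bytes in one mebibyte


def estimate_partitions(file_size_bytes, blocksize_bytes=16 * MIB):
    """Rough partition count for one file split into fixed-size byte blocks."""
    return max(1, math.ceil(file_size_bytes / blocksize_bytes))


example_size = 100 * MIB  # hypothetical file size, for illustration only
print(estimate_partitions(example_size))  # 100 / 16 = 6.25, rounded up to 7
```

Smaller blocks give more, smaller partitions, which spreads work across more workers at the cost of more scheduling overhead.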