<p float="center">
  <img src="images/Coiled-Logo_Horizontal_Black-CMYK.png" alt="Coiled logo" width="350" hspace="10"/>
</p>

<p float="center">
  <img src="images/dask_horizontal_no_pad.svg" alt="Dask logo" width="400" hspace="10" />
</p>

# Data Science At Scale

In this notebook, we'll 

* Perform a basic analytics workflow on the NYC taxi dataset using Pandas;
* Scale up this workflow to a dataset that doesn't fit in RAM using Dask;
* (Optional) Scale out this workflow to leverage a cluster in the cloud using Coiled.

The workflow is intentionally boring so that we can see the power of scalable data science immediately: we'll load some data and perform some basic analytics.

In the notebooks that follow, we'll jump into more interesting examples, including machine learning.


Before scaling up, let's look at a common workflow in Pandas.

## 1. Pandas: Convert CSV to Parquet and Engineer a Feature

<img src="images/pandas-logo.svg" alt="pandas logo" style="width: 500px;"/>

In the following, we'll 

* use Pandas to load in part of the NYC taxi dataset from a CSV and
* compute the average tip as a function of the number of passengers.

If you're following along in Binder, you won't be able to execute the code but you can read it.

### Download the data from Amazon

In [3]:
!wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{01..12}.csv

--2021-06-28 10:59:57--  https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.36.230
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.36.230|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 687088084 (655M) [text/csv]
Saving to: ‘yellow_tripdata_2019-01.csv’


2021-06-28 11:06:18 (1,73 MB/s) - ‘yellow_tripdata_2019-01.csv’ saved [687088084/687088084]

--2021-06-28 11:06:18--  https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-02.csv
Reusing existing connection to s3.amazonaws.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 649882828 (620M) [text/csv]
Saving to: ‘yellow_tripdata_2019-02.csv’


2021-06-28 11:11:14 (2,10 MB/s) - ‘yellow_tripdata_2019-02.csv’ saved [649882828/649882828]

--2021-06-28 11:11:14--  https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-03.csv
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.36.230|:443... con

Note: downloading these files will take at least several minutes.

In [3]:
# Check out head of 1st file
!head yellow_tripdata_2019-01.csv

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,0,0.3,7.55,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
2,2018-11-28 16:25:49,2018-11-28 16:28:26,5,.00,1,N,193,193,2,3.5,0.5,0.5,0,5.76,0.3,13.31,
2,2018-11-28 16:29:37,2018-11-28 16:33:43,5,.00,2,N,193,193,2,52,0,0.5,0,0,0.3,55.55,
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.30,1,N,163,229,1,6.5,0.5,0.5,1.25,0,0.3,9.05,
1,2019-01-01 00:

### Investigate data locally with Pandas


In [13]:
%%time
# Import pandas and read in beginning of 1st file
import pandas as pd
df = pd.read_csv(
    "../taxi_data/yellow_tripdata_2019-01.csv", 
    nrows=10000,
)
df

CPU times: user 32.6 s, sys: 11.6 s, total: 44.2 s
Wall time: 59.1 s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14.0,0.5,0.5,1.00,0.0,0.3,16.30,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.00,1,N,236,236,1,4.5,0.5,0.5,0.00,0.0,0.3,5.80,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.00,1,N,193,193,2,3.5,0.5,0.5,0.00,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.00,2,N,193,193,2,52.0,0.0,0.5,0.00,0.0,0.3,55.55,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7667787,2,2019-01-31 23:57:36,2019-02-01 00:18:39,1,4.79,1,N,263,4,1,18.0,0.5,0.5,3.86,0.0,0.3,23.16,0.0
7667788,2,2019-01-31 23:32:03,2019-01-31 23:33:11,1,0.00,1,N,193,193,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667789,2,2019-01-31 23:36:36,2019-01-31 23:36:40,1,0.00,1,N,264,264,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0
7667790,2,2019-01-31 23:14:53,2019-01-31 23:15:20,1,0.00,1,N,264,7,1,0.0,0.0,0.0,0.00,0.0,0.0,0.00,0.0


### Basic analytics

In [12]:
%%time

# Compute average tip as a function of the number of passengers
df.groupby("passenger_count").tip_amount.mean()

CPU times: user 11.8 ms, sys: 9 ms, total: 20.8 ms
Wall time: 69 ms


passenger_count
0    1.719027
1    1.667853
2    1.829683
3    1.709951
4    1.688510
5    1.840508
6    1.836183
Name: tip_amount, dtype: float64

**Recap:** We have

* used Pandas to load in part of the NYC taxi dataset from a CSV and
* computed the average tip as a function of the number of passengers.

### Operate on many files in a for loop?

We could do this, but it's unpleasant:

```python
import os
from glob import glob

for filename in glob(os.path.expanduser("~/data/nyctaxi/yellow_tripdata_2019-*.csv")):
    df = pd.read_csv(filename)
    ...
    df.to_parquet(...)
```
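
One reason the loop is unpleasant: the mean over all files is not the mean of the per-file means, so we have to carry sums and counts ourselves and combine them at the end. Here is a minimal sketch, assuming two tiny in-memory CSVs stand in for the monthly files (the data and the name `fake_files` are made up for illustration):

```python
import io

import pandas as pd

# Hypothetical stand-ins for two monthly CSV files (made-up numbers).
fake_files = [
    io.StringIO("passenger_count,tip_amount\n1,1.0\n1,3.0\n2,2.0\n"),
    io.StringIO("passenger_count,tip_amount\n1,5.0\n2,4.0\n2,6.0\n"),
]

partials = []
for f in fake_files:
    chunk = pd.read_csv(f)
    # Keep per-file sums and counts; per-file means cannot be averaged directly.
    partials.append(chunk.groupby("passenger_count").tip_amount.agg(["sum", "count"]))

# Combine the partial results, then divide once at the end.
totals = pd.concat(partials).groupby(level=0).sum()
mean_tip = totals["sum"] / totals["count"]
print(mean_tip)
```

This bookkeeping grows quickly with every extra aggregation, which is part of what makes the hand-rolled loop hard to maintain.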

## 2. Use Dask locally to process the full dataset

<img src="images/dask_horizontal_no_pad.svg" alt="Dask logo" style="width: 500px;"/>

The full NYC taxi dataset won't even fit in the RAM of my laptop. Do I need a large or external cluster yet? No. First, I can take advantage of all the cores on my laptop in parallel. This is what we call *scaling up* our computation (out-of-core computing). Later we'll see how to *scale out* computation across a cluster.

One way of doing this is with [Dask](https://dask.org/). As we're about to see, part of the value of Dask lies in its API being as close as possible to the PyData APIs we know and love, in this case, Pandas.

In [the words of Matthew Rocklin](https://coiled.io/blog/history-dask/), core developer and co-maintainer of Dask and CEO of Coiled, there was a social goal of Dask:
> Invent nothing. We wanted to be as familiar as possible to what users already knew in the PyData stack

Let's do it!

The plan:

* use Dask to load in **all** of the NYC taxi dataset from 10+ CSVs (8+ GBs) and
* compute the average tip as a function of the number of passengers.

We'll also dive into the basics of Dask and distributed compute (but we'll execute some code first and dive into this part while it runs!).

In [1]:
# Import Dask parts, spin up a local cluster, and instantiate a Client
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

Client
Connection method: Cluster object
Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster
Status: running
Using processes: True
Dashboard: http://127.0.0.1:8787/status
Workers: 4
Total threads: 4
Total memory: 4.00 GiB

Scheduler
Comm: tcp://127.0.0.1:51144
Workers: 4
Dashboard: http://127.0.0.1:8787/status
Total threads: 4
Started: Just now
Total memory: 4.00 GiB

Worker
Comm: tcp://127.0.0.1:51154
Total threads: 1
Dashboard: http://127.0.0.1:51158/status
Memory: 1.00 GiB
Nanny: tcp://127.0.0.1:51146
Local directory: /Users/richard/Desktop/data-science-at-scale/dask-worker-space/worker-c9ype741

Worker
Comm: tcp://127.0.0.1:51155
Total threads: 1
Dashboard: http://127.0.0.1:51157/status
Memory: 1.00 GiB
Nanny: tcp://127.0.0.1:51148
Local directory: /Users/richard/Desktop/data-science-at-scale/dask-worker-space/worker-zmac1qai

Worker
Comm: tcp://127.0.0.1:51153
Total threads: 1
Dashboard: http://127.0.0.1:51159/status
Memory: 1.00 GiB
Nanny: tcp://127.0.0.1:51149
Local directory: /Users/richard/Desktop/data-science-at-scale/dask-worker-space/worker-0xxxfqoh

Worker
Comm: tcp://127.0.0.1:51152
Total threads: 1
Dashboard: http://127.0.0.1:51156/status
Memory: 1.00 GiB
Nanny: tcp://127.0.0.1:51147
Local directory: /Users/richard/Desktop/data-science-at-scale/dask-worker-space/worker-qb8l3ulo


In [3]:
%%time

import dask.dataframe as dd

# Import the full dataset (note the Dask API!)
df = dd.read_csv(
    "yellow_tripdata_2019-*.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        "RatecodeID": "float64",
        "VendorID": "float64",
        "passenger_count": "float64",
        "payment_type": "float64",
    },
)
df

CPU times: user 59.1 ms, sys: 29.9 ms, total: 89.1 ms
Wall time: 164 ms


Dask DataFrame Structure: npartitions=127

| Column | dtype |
|---|---|
| VendorID | float64 |
| tpep_pickup_datetime | datetime64[ns] |
| tpep_dropoff_datetime | datetime64[ns] |
| passenger_count | float64 |
| trip_distance | float64 |
| RatecodeID | float64 |
| store_and_fwd_flag | object |
| PULocationID | int64 |
| DOLocationID | int64 |
| payment_type | float64 |
| fare_amount | float64 |
| extra | float64 |
| mta_tax | float64 |
| tip_amount | float64 |
| tolls_amount | float64 |
| improvement_surcharge | float64 |
| total_amount | float64 |
| congestion_surcharge | float64 |


In [4]:
%%time

# Prepare to compute the average tip 
# as a function of the number of passengers
mean_amount = df.groupby("passenger_count").tip_amount.mean()

CPU times: user 29.9 ms, sys: 5.15 ms, total: 35 ms
Wall time: 49.6 ms


In [5]:
%%time

# compute the average tip as a function of the number of passengers
mean_amount.compute()

CPU times: user 2min 4s, sys: 14.8 s, total: 2min 19s
Wall time: 8min 21s


passenger_count
0.0    2.122789
1.0    2.206790
2.0    2.214306
3.0    2.137775
4.0    2.023804
5.0    2.235441
6.0    2.221105
7.0    6.675962
8.0    7.111625
9.0    7.377822
Name: tip_amount, dtype: float64

### 2a Notes on what is happening in Dask and Python

The above code will take some time to run so let's take this opportunity to see what is going on with Dask, Python, and the distributed computation.

#### Components of Dask

Dask contains 3 main components and we have already seen two of them above:
* High-level collections in the form of Dask DataFrames (these set up the steps of your computation);
* Schedulers (these actually execute the computation, in this case, on a single machine).

Let's get a sense for what these are.

<img src="images/dask-components.svg" width="400px">
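
To make the split between setting up the steps and executing them concrete, here is a small sketch using `dask.delayed`, which is not used elsewhere in this notebook. The functions `load` and `total` are made up for illustration. Building the graph does no work; calling `.compute()` hands it to a scheduler (here the local threaded one, chosen explicitly).

```python
import dask

calls = []  # Records which functions have actually run.

@dask.delayed
def load(n):
    calls.append(f"load({n})")
    return list(range(n))

@dask.delayed
def total(parts):
    calls.append("total")
    return sum(sum(p) for p in parts)

# Setting up the steps: nothing has executed yet.
graph = total([load(3), load(4)])
ran_before_compute = list(calls)

# Executing: the threaded scheduler runs the tasks in dependency order.
result = graph.compute(scheduler="threads")
print(ran_before_compute, result)
```

The groupby in the cells above behaves the same way: `mean_amount` only describes the work, and `mean_amount.compute()` triggers it.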

#### Dask DataFrames

What exactly is this Dask DataFrame? A schematic is worth a thousand words:

<img src="images/dask-dataframe.svg" width="400px">

Essentially, the Dask DataFrame is a large, virtual dataframe divided along the index into multiple Pandas DataFrames.

#### Dask Schedulers, Workers, and beyond

<img src="images/dask-cluster.svg" width="400px">

Work (Python code) is performed on a cluster, which consists of:
* a _scheduler_, which manages the work and sends tasks to the workers, and
* _workers_, which compute the tasks.

The _client_ is "the user-facing entry point for cluster users." What this means is that the client lives wherever you are writing your Python code and the client talks to the scheduler, passing it the tasks.
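
As a rough analogy only (this uses Python's standard-library `concurrent.futures`, not Dask), the same division of labour can be sketched like this: the code that calls `submit` plays the client, the executor plays the scheduler, and its pool threads play the workers. The function `sum_and_count` and the chunk values are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def sum_and_count(tips):
    # A task: one small unit of Python work on one chunk of data.
    return sum(tips), len(tips)

# Made-up chunks of tip amounts, standing in for partitions of a dataset.
chunks = [[1.65, 1.0, 0.0], [0.0, 0.0], [1.25]]

# The executor queues tasks and hands them to its pool of worker threads.
with ThreadPoolExecutor(max_workers=2) as executor:
    # The "client" side: submit tasks and get futures back immediately.
    futures = [executor.submit(sum_and_count, chunk) for chunk in chunks]
    # Block until each task finishes and collect the partial results.
    partials = [f.result() for f in futures]

# Combine the partial results into one overall mean.
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
print(total / count)
```

Dask's `Client.submit` follows a similar futures pattern, with the important difference that the scheduler and workers can live in other processes or on other machines.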

**More Resources**
- [Dask tutorial](https://tutorial.dask.org)
- Dask Summit videos (TBD), such as the first 10min. of [this talk]()
- See how Dask is used to solve real-world problems, such as XX and the fight against Covid-19 (TBD)

**Recap:** We have

* used Dask to load in **all** of the NYC taxi dataset from 10+ CSVs (8+ GBs), 
* computed the average tip as a function of the number of passengers, and 
* dived into the basics of Dask and distributed computing.

## 3. Optional: Work directly from the cloud with Coiled 

<br>
<img src="images/Coiled-Logo_Horizontal_Black-CMYK.png" alt="Coiled logo" style="width: 500px;"/>
<br>

Here I'll spin up a cluster on Coiled to show you just how easy it can be. Note that to do so, I've:
1. signed into the [Coiled Free tier](https://coiled.io/product), 
2. pip installed `coiled`, and 
3. authenticated. 

You can do the same!

The plan:

* use Coiled to load in **all** of the NYC taxi dataset from 10+ CSVs (8+ GBs) on an AWS cluster, 
* massage the data, 
* engineer a feature, 
* compute the average tip as a function of the number of passengers, and 
* save to [Parquet](https://en.wikipedia.org/wiki/Apache_Parquet) (far more efficient than CSV, but not human-readable).
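
The "massage" and "engineer a feature" steps could look something like the sketch below. It runs on a few rows in plain Pandas, and the feature (the tip as a fraction of the total, in a hypothetical column named `tip_fraction`) is an assumption chosen for illustration, not a step taken from the cells that follow.

```python
import pandas as pd

# A few rows in the shape of the taxi data (values from the CSV head shown earlier).
trips = pd.DataFrame({
    "passenger_count": [1, 1, 3, 5],
    "tip_amount": [1.65, 1.0, 0.0, 0.0],
    "total_amount": [9.95, 16.3, 5.8, 7.55],
})

# Massage: drop rows where the ratio would be undefined.
trips = trips[trips.total_amount > 0]

# Engineer a feature: the tip as a fraction of the total amount.
trips = trips.assign(tip_fraction=trips.tip_amount / trips.total_amount)

print(trips.groupby("passenger_count").tip_fraction.mean())
```

The same column expressions work on a Dask DataFrame. Both Pandas and Dask DataFrames also provide a `to_parquet` method, which needs a Parquet engine such as `pyarrow` installed.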

In [6]:
import coiled
from dask.distributed import Client

Setting up a Coiled cluster takes just four steps:
1. Create a software environment (saved to your account for later use).
2. Create a cluster configuration (saved to your account for later use).
3. Spin up the cluster.
4. Connect a Dask client to the cluster.

In [15]:
%%time

# Create a Software Environment
coiled.create_software_environment(
    name="my-software-env",
    conda="binder/environment.yml",
)

Updating software environment...
Creating new software environment
Creating new ecr build
STEP 1: FROM coiled/default:sha-6b4e896
STEP 2: COPY environment.yml environment.yml
--> 76321a813e2
STEP 3: RUN conda env update -n coiled -f environment.yml     && rm environment.yml     && conda clean --all -y     && echo "conda activate coiled" >> ~/.bashrc
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

Downloading and Extracting Packages
openjpeg-2.4.0       | 444 KB    | ########## | 100% 
xorg-libxrender-0.9. | 32 KB     | ########## | 100% 
nbformat-5.1.3       | 47 KB     | ########## | 100% 
_libgcc_mutex-0.1    | 3 KB      | ########## | 100% 
pyqt5-sip-4.19.18    | 310 KB    | ########## | 100% 
xorg-libice-1.0.10   | 58 KB     | ########## | 100% 
click-8.0.1          | 146 KB    | ########## | 100% 
pexpect-4.8.0        | 47 KB     | ########## | 100% 
mysql-common-8.0.25  | 1.6 MB    | ########## | 100% 
zeromq-4.3.4         

yaml-0.2.5           | 82 KB     | ########## | 100% 
libtiff-4.3.0        | 668 KB    | ########## | 100% 
re2-2021.06.01       | 218 KB    | ########## | 100% 
pysocks-1.7.1        | 27 KB     | ########## | 100% 
pyqt-impl-5.12.3     | 5.9 MB    | ########## | 100% 
libllvm10-10.0.1     | 26.4 MB   | ########## | 100% 
anyio-3.2.1          | 139 KB    | ########## | 100% 
sortedcontainers-2.4 | 26 KB     | ########## | 100% 
babel-2.9.1          | 6.2 MB    | ########## | 100% 
dask-glm-0.2.0       | 14 KB     | ########## | 100% 
coiled-0.0.39.1      | 53 KB     | ########## | 100% 
expat-2.4.1          | 182 KB    | ########## | 100% 
gettext-0.19.8.1     | 3.6 MB    | ########## | 100% 
ld_impl_linux-64-2.3 | 618 KB    | ########## | 100% 
attrs-21.2.0         | 44 KB     | ########## | 100% 
gts-0.7.6            | 411 KB    | ########## | 100% 
_openmp_mutex-4.5    | 22 KB     | ########## | 100% 
sqlite-3.36.0        | 1.4 MB    | ########## | 100% 
cloudpickle-1.6.0    | 22 KB

Removed libtool-2.4.6-h58526e2_1007.tar.bz2
Removed libthrift-0.14.2-he6d91bd_1.tar.bz2
Removed tblib-1.7.0-pyhd8ed1ab_0.tar.bz2
Removed libiconv-1.16-h516909a_0.tar.bz2
Removed libcurl-7.77.0-h2574ce0_0.tar.bz2
Removed aws-checksums-0.1.11-ha31a3da_7.tar.bz2
Removed xorg-libx11-1.7.2-h7f98852_0.tar.bz2
Removed zstd-1.5.0-ha95c52a_0.tar.bz2
Removed pytables-3.6.1-py38hc386592_3.tar.bz2
Removed s3transfer-0.3.7-pyhd8ed1ab_0.tar.bz2
Removed olefile-0.46-pyh9f0ad1d_1.tar.bz2
Removed font-ttf-ubuntu-0.83-hab24e00_0.tar.bz2
Removed aws-c-common-0.6.2-h7f98852_0.tar.bz2
Removed requests-2.25.1-pyhd3deb0d_0.tar.bz2
Removed nbconvert-6.1.0-py38h578d9bd_0.tar.bz2
Removed libssh2-1.9.0-ha56f1ee_6.tar.bz2
Removed websocket-client-0.57.0-py38h578d9bd_4.tar.bz2
Removed nomkl-1.0-h5ca1d4c_0.tar.bz2
Removed six-1.16.0-pyh6c4a22f_0.tar.bz2
Removed numexpr-2.7.3-py38h51da96c_0.tar.bz2
Removed packaging-20.9-pyh44b312d_0.tar.bz2
Removed boto3-1.17.49-pyhd8ed1ab_0.tar.bz2
Removed wheel-0.36.2-pyhd3deb0d_

In [16]:
%%time 

# Control the resources of your cluster by creating a new cluster configuration
coiled.create_cluster_configuration(
    name="my-cluster-config",
    worker_memory="16 GiB",
    worker_cpu=4,
    scheduler_memory="8 GiB",
    scheduler_cpu=2,
    software="my-software-env",
)


CPU times: user 309 ms, sys: 119 ms, total: 428 ms
Wall time: 1.76 s


In [17]:
# Spin up cluster, instantiate a Client
cluster = coiled.Cluster(n_workers=10, configuration="my-cluster-config")
client = Client(cluster)
client

DataNotFoundError: Unable to load data for: endpoints

In [12]:
import dask.dataframe as dd

# Read data into a Dask DataFrame
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv", 
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        "RatecodeID": "float64",
        "VendorID": "float64",
        "passenger_count": "float64",
        "payment_type": "float64",
    },
    storage_options={"anon": True},
)
df

Dask DataFrame Structure: npartitions=127

| Column | dtype |
|---|---|
| VendorID | float64 |
| tpep_pickup_datetime | datetime64[ns] |
| tpep_dropoff_datetime | datetime64[ns] |
| passenger_count | float64 |
| trip_distance | float64 |
| RatecodeID | float64 |
| store_and_fwd_flag | object |
| PULocationID | int64 |
| DOLocationID | int64 |
| payment_type | float64 |
| fare_amount | float64 |
| extra | float64 |
| mta_tax | float64 |
| tip_amount | float64 |
| tolls_amount | float64 |
| improvement_surcharge | float64 |
| total_amount | float64 |
| congestion_surcharge | float64 |


In [13]:
%%time

# Prepare to compute the average tip 
# as a function of the number of passengers
mean_amount = df.groupby("passenger_count").tip_amount.mean()

CPU times: user 16.8 ms, sys: 1.38 ms, total: 18.1 ms
Wall time: 17.2 ms


In [14]:
%%time

# Compute the average tip 
# as a function of the number of passengers
mean_amount.compute()

CPU times: user 1.89 s, sys: 281 ms, total: 2.17 s
Wall time: 27.5 s


passenger_count
0.0    2.122789
1.0    2.206790
2.0    2.214306
3.0    2.137775
4.0    2.023804
5.0    2.235441
6.0    2.221105
7.0    6.675962
8.0    7.111625
9.0    7.377822
Name: tip_amount, dtype: float64

And let's not forget our basic Dask hygiene (and financial-health) practices:

In [15]:
# Shut down the cluster
client.shutdown()

**Recap:** We have
* used Coiled to load in **all** of the NYC taxi dataset from 10+ CSVs (8+ GBs) on an AWS cluster,
* computed the average tip as a function of the number of passengers, and 
* learned a bunch about using Dask on cloud-based clusters!