## Big Data Processing using Distributed Dask on Fargate

This notebook demonstrates how to perform big data processing with distributed Dask on AWS Fargate. It uses the New York City taxi trip datasets to show how to process more than 100 million records: for example, calculating the total number of trips made each day, the mean number of passengers per trip for each day, and the maximum trip duration across all trips.

## Set up conda package dependencies

Install cloudpickle 1.6.0 so that the client's version matches the version on the Dask scheduler and workers.

In [None]:
!conda install cloudpickle=1.6.0 -c conda-forge  -y

## Connect to the Dask Fargate cluster

You need to provision this cluster first by following the instructions at https://github.com/rvvittal/aws-dask-sm-fargate.

In [1]:
from dask.distributed import Client

# Use this client for testing on the local machine (starts a local cluster)
#client = Client()

# Use this client for testing against a distributed cluster running locally
#client = Client('localhost:8786')

# Use this client for the distributed cluster running on Fargate
client = Client('Dask-Scheduler.local-dask:8786')


cloudpickle
+-----------------------+---------+
|                       | version |
+-----------------------+---------+
| client                | 1.3.0   |
| scheduler             | 1.6.0   |
| tcp://11.0.3.179:9000 | 1.6.0   |
| tcp://11.0.3.43:9000  | 1.6.0   |
+-----------------------+---------+

numpy
+-----------------------+---------+
|                       | version |
+-----------------------+---------+
| client                | 1.18.1  |
| scheduler             | 1.19.1  |
| tcp://11.0.3.179:9000 | 1.19.1  |
| tcp://11.0.3.43:9000  | 1.19.1  |
+-----------------------+---------+

toolz
+-----------------------+---------+
|                       | version |
+-----------------------+---------+
| client                | 0.10.0  |
| scheduler             | 0.11.1  |
| tcp://11.0.3.179:9000 | 0.11.1  |
| tcp://11.0.3.43:9000  | 0.11.1  |
+-----------------------+---------+
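The tables above list the packages whose versions differ between the client and the scheduler and workers; the client's cloudpickle 1.3.0, for instance, does not match the cluster's 1.6.0, which appears to be why the notebook installs cloudpickle 1.6.0. `dask.distributed`'s `Client.get_versions(check=True)` can run this comparison against a live cluster. As a cluster-free sketch of the same check, the hypothetical helper `find_mismatches` below works on version data transcribed from the tables, plus one made-up package that matches everywhere.

```python
def find_mismatches(versions):
    """Return the packages whose version is not identical at every location.

    `versions` maps package name -> {location: version string}.
    """
    mismatched = {}
    for package, by_location in versions.items():
        if len(set(by_location.values())) > 1:
            mismatched[package] = by_location
    return mismatched


# Version data transcribed from the tables above.
observed = {
    "cloudpickle": {"client": "1.3.0", "scheduler": "1.6.0",
                    "tcp://11.0.3.179:9000": "1.6.0", "tcp://11.0.3.43:9000": "1.6.0"},
    "numpy": {"client": "1.18.1", "scheduler": "1.19.1",
              "tcp://11.0.3.179:9000": "1.19.1", "tcp://11.0.3.43:9000": "1.19.1"},
    "toolz": {"client": "0.10.0", "scheduler": "0.11.1",
              "tcp://11.0.3.179:9000": "0.11.1", "tcp://11.0.3.43:9000": "0.11.1"},
    # Hypothetical package that matches everywhere, for contrast.
    "example-pkg": {"client": "1.0", "scheduler": "1.0"},
}

for package, by_location in find_mismatches(observed).items():
    print(package, "client:", by_location["client"], "scheduler:", by_location["scheduler"])
```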


## Scale out the number of Dask workers as needed for your workload

In [24]:
# Run this only when the cluster is running on Fargate, to scale out the Dask worker service.
!sudo aws ecs update-service --service Dask-Workers --desired-count 4 --cluster Fargate-Dask-Cluster

{
    "service": {
        "serviceArn": "arn:aws:ecs:us-west-2:716664005094:service/Dask-Workers",
        "serviceName": "Dask-Workers",
        "clusterArn": "arn:aws:ecs:us-west-2:716664005094:cluster/Fargate-Dask-Cluster",
        "loadBalancers": [],
        "serviceRegistries": [
            {
                "registryArn": "arn:aws:servicediscovery:us-west-2:716664005094:service/srv-ms5sjkuwukl7t6yq"
            }
        ],
        "status": "ACTIVE",
        "desiredCount": 4,
        "runningCount": 2,
        "pendingCount": 0,
        "launchType": "FARGATE",
        "platformVersion": "LATEST",
        "taskDefinition": "arn:aws:ecs:us-west-2:716664005094:task-definition/cloudformation-dask-workers-v4:1",
        "deploymentConfiguration": {
            "maximumPercent": 200,
            "minimumHealthyPercent": 100
        },
        "deployments": [
            {
                "id": "ecs-svc/7695731697923122642",
                "status": "PRI

## Restart the client after the scale-out operation

In [25]:
client.restart()

| Client | Cluster |
|---|---|
| Scheduler: tcp://Dask-Scheduler.local-dask:8786 | Workers: 4 |
| Dashboard: http://Dask-Scheduler.local-dask:8787/status | Cores: 16 |
| | Memory: 65.54 GB |


## Introduction to Dask DataFrame
A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames. For more details, review this page: https://docs.dask.org/en/latest/dataframe.html 
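To make the partitioned model concrete, the sketch below uses plain pandas to mimic what a Dask DataFrame does: the data is split into row-wise partitions along the index, the same pandas operation runs on each partition, and the partial results are combined. The column values and partition boundaries are made up for illustration; Dask does the splitting itself (for example, `dd.read_csv` creates partitions from byte blocks of the input files).

```python
import pandas as pd

# A tiny made-up stand-in for one large table.
full = pd.DataFrame({"trip_distance": [0.5, 2.7, 0.8, 10.2, 2.5, 0.7]})

# Split along the index into three row-wise partitions.
partitions = [full.iloc[0:2], full.iloc[2:4], full.iloc[4:6]]


def short_trips(part):
    # The same pandas code runs independently on every partition.
    return part[part["trip_distance"] < 1.0]


# Concatenating the per-partition results gives the same answer as the whole table.
result = pd.concat([short_trips(p) for p in partitions])
print(result)
```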



In [3]:
import s3fs  # S3 filesystem backend that Dask uses to read s3:// paths
import dask.dataframe as dd
import boto3


## Using Dask for EDA on the New York taxi trip datasets

In [65]:
df = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2018-*.csv',
    storage_options={'anon': True},  # anonymous access to the public bucket
    assume_missing=True,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
)
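`assume_missing=True` tells `dd.read_csv` to read integer-looking columns that have no explicit `dtype` as floats. Dask infers column types from a sample at the start of the data, so an integer column whose first missing value appears in a later block would otherwise fail at compute time. The pandas sketch below, on made-up CSV text, shows the underlying behavior: a single empty field turns an otherwise integer column into `float64`. This is also why integer-valued columns such as `VendorID` appear as `float64` in the dtypes listing further down.

```python
import io

import pandas as pd

# Made-up CSV: both columns hold whole numbers, but passenger_count has a gap.
csv_text = "VendorID,passenger_count\n1,1\n2,\n1,3\n"
sample = pd.read_csv(io.StringIO(csv_text))

# The complete column is parsed as int64; the missing value forces float64 (NaN).
print(sample.dtypes)
```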

## Calculate the trip duration in seconds

In [66]:
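# .dt.seconds is the seconds component (0-86399) of the timedelta; .dt.total_seconds() gives the full duration.
df['trip_dur_secs'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.seconds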
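`.dt.seconds` and `.dt.total_seconds()` agree for ordinary trips but differ for durations of a day or more and for negative durations, because `.dt.seconds` returns only the seconds component of a timedelta. The pandas sketch below (Dask's `.dt` accessor mirrors pandas) compares them on made-up timestamps: a normal trip, a trip lasting one day and five seconds, and a dropoff recorded one second before its pickup.

```python
import pandas as pd

trips = pd.DataFrame({
    "pickup": pd.to_datetime(
        ["2018-01-01 00:21:05", "2018-01-01 10:00:00", "2018-01-01 10:00:00"]),
    "dropoff": pd.to_datetime(
        ["2018-01-01 00:24:23", "2018-01-02 10:00:05", "2018-01-01 09:59:59"]),
})
duration = trips["dropoff"] - trips["pickup"]

# Seconds component only: wraps around for durations >= 1 day or < 0.
print(duration.dt.seconds.tolist())
# Full duration in seconds (float, negative when dropoff precedes pickup).
print(duration.dt.total_seconds().tolist())
```

The first print gives `[198, 5, 86399]` and the second `[198.0, 86405.0, -1.0]`.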

In [67]:
%%time
df.head()

CPU times: user 13.1 ms, sys: 0 ns, total: 13.1 ms
Wall time: 5.59 s


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | trip_dur_secs |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2018-01-01 00:21:05 | 2018-01-01 00:24:23 | 1.0 | 0.5 | 1.0 | N | 41.0 | 24.0 | 2.0 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | 198 |
| 1 | 1.0 | 2018-01-01 00:44:55 | 2018-01-01 01:03:05 | 1.0 | 2.7 | 1.0 | N | 239.0 | 140.0 | 2.0 | 14.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 15.3 | 1090 |
| 2 | 1.0 | 2018-01-01 00:08:26 | 2018-01-01 00:14:21 | 2.0 | 0.8 | 1.0 | N | 262.0 | 141.0 | 1.0 | 6.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 8.3 | 355 |
| 3 | 1.0 | 2018-01-01 00:20:22 | 2018-01-01 00:52:51 | 1.0 | 10.2 | 1.0 | N | 140.0 | 257.0 | 2.0 | 33.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 34.8 | 1949 |
| 4 | 1.0 | 2018-01-01 00:09:18 | 2018-01-01 00:27:06 | 2.0 | 2.5 | 1.0 | N | 246.0 | 239.0 | 1.0 | 12.5 | 0.5 | 0.5 | 2.75 | 0.0 | 0.3 | 16.55 | 1068 |


In [56]:
df.dtypes

VendorID                        float64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                    float64
DOLocationID                    float64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
trip_dur_secs                     int64
dtype: object

## Calculate the maximum trip duration across all trips

In [68]:
%%time
max_trip_duration = df.trip_dur_secs.max().compute()
print(max_trip_duration)

86399
CPU times: user 635 ms, sys: 1.25 ms, total: 637 ms
Wall time: 1min 5s
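`max()` is a reduction: Dask computes a maximum for each partition and then combines those partial results, a few at a time, until one value remains (Dask exposes the group size as a `split_every` parameter on reductions). A stdlib sketch of that tree-reduction pattern; the helper `tree_max` is hypothetical, and the durations are taken from the sample rows above and spread over made-up partitions, one of them empty.

```python
def tree_max(partitions, split_every=2):
    """Maximum over partitioned data: max per partition, then combine in groups."""
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    # Step 1: reduce each non-empty partition independently (on the workers, in Dask).
    partials = [max(part) for part in partitions if part]
    if not partials:
        raise ValueError("tree_max() of empty data")
    # Step 2: combine partial results `split_every` at a time until one value remains.
    while len(partials) > 1:
        partials = [max(partials[i:i + split_every])
                    for i in range(0, len(partials), split_every)]
    return partials[0]


partitions = [[198, 1090, 355], [1949, 1068], [144, 170, 212], []]
print(tree_max(partitions))
```

Only one number per partition has to move between machines, which is why the reduction over all rows returns a single small result.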


## Calculate the mean number of passengers per trip by pickup date

In [69]:
# Derive a date-only column for grouping.
# Note: this uses the dropoff timestamp; use df['tpep_pickup_datetime'] to group strictly by pickup date.
df['pickup_date'] = df['tpep_dropoff_datetime'].dt.date
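`.dt.date` produces Python `datetime.date` objects, so the new column has `object` dtype. An alternative is pandas' `.dt.normalize()` (or `.dt.floor('D')`), which truncates each timestamp to midnight while keeping a `datetime64` dtype; Dask's `.dt` accessor should mirror these methods. The pandas sketch below compares both on made-up timestamps; whether the alternative speeds up the groupby on the cluster has not been measured here.

```python
import pandas as pd

# Made-up timestamps spanning two calendar days.
pickup = pd.Series(pd.to_datetime(
    ["2018-01-01 00:21:05", "2018-01-01 23:59:59", "2018-01-02 00:00:01"]))

as_date = pickup.dt.date              # Python datetime.date objects -> object dtype
as_midnight = pickup.dt.normalize()   # datetime64 values truncated to 00:00:00

print(as_date.dtype, as_midnight.dtype)
print(as_midnight.value_counts().sort_index())
```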

In [70]:
%%time
df.head()

CPU times: user 13.7 ms, sys: 0 ns, total: 13.7 ms
Wall time: 4.8 s


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | trip_dur_secs | pickup_date |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2018-01-01 00:21:05 | 2018-01-01 00:24:23 | 1.0 | 0.5 | 1.0 | N | 41.0 | 24.0 | 2.0 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | 198 | 2018-01-01 |
| 1 | 1.0 | 2018-01-01 00:44:55 | 2018-01-01 01:03:05 | 1.0 | 2.7 | 1.0 | N | 239.0 | 140.0 | 2.0 | 14.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 15.3 | 1090 | 2018-01-01 |
| 2 | 1.0 | 2018-01-01 00:08:26 | 2018-01-01 00:14:21 | 2.0 | 0.8 | 1.0 | N | 262.0 | 141.0 | 1.0 | 6.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 8.3 | 355 | 2018-01-01 |
| 3 | 1.0 | 2018-01-01 00:20:22 | 2018-01-01 00:52:51 | 1.0 | 10.2 | 1.0 | N | 140.0 | 257.0 | 2.0 | 33.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 34.8 | 1949 | 2018-01-01 |
| 4 | 1.0 | 2018-01-01 00:09:18 | 2018-01-01 00:27:06 | 2.0 | 2.5 | 1.0 | N | 246.0 | 239.0 | 1.0 | 12.5 | 0.5 | 0.5 | 2.75 | 0.0 | 0.3 | 16.55 | 1068 | 2018-01-01 |


In [71]:
%%time
df_mean_psngr_pickup_date = df.groupby('pickup_date').passenger_count.mean().compute()

CPU times: user 686 ms, sys: 9.58 ms, total: 695 ms
Wall time: 1min 51s
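A per-date mean cannot be obtained by averaging each partition's per-date means, since partitions hold different numbers of rows for each date. Dask's groupby `mean` instead works with a sum and a count per group, combines them across partitions, and divides at the end. The pandas sketch below, on two made-up partitions, mimics that pattern and checks it against a direct `groupby().mean()`.

```python
import pandas as pd

# Two made-up partitions of the same logical table.
partitions = [
    pd.DataFrame({"pickup_date": ["2018-01-01", "2018-01-01", "2018-01-02"],
                  "passenger_count": [1.0, 2.0, 5.0]}),
    pd.DataFrame({"pickup_date": ["2018-01-01", "2018-01-02"],
                  "passenger_count": [3.0, None]}),
]

# Per partition: sum and count of non-null passenger_count for each date.
partials = [
    p.groupby("pickup_date")["passenger_count"].agg(["sum", "count"])
    for p in partitions
]

# Combine: add up the partial sums and counts, then divide once.
combined = pd.concat(partials).groupby(level=0).sum()
mean_by_date = combined["sum"] / combined["count"]

# Reference answer computed on the whole table at once.
direct = pd.concat(partitions).groupby("pickup_date")["passenger_count"].mean()
print(mean_by_date)
```

Averaging the per-partition means for 2018-01-01 would give (1.5 + 3.0) / 2 = 2.25 instead of the correct 2.0.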


## Calculate the total number of trips by pickup date

In [72]:
%%time
df_trips_by_pickup_date = df.groupby('pickup_date').store_and_fwd_flag.count().compute()

CPU times: user 667 ms, sys: 8.55 ms, total: 675 ms
Wall time: 1min 14s
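`groupby('pickup_date').store_and_fwd_flag.count()` counts the non-null `store_and_fwd_flag` values in each group, so it equals the number of trips only if that column has no missing values; `groupby(...).size()` counts rows regardless of nulls, and Dask's groupby objects provide both methods. The pandas sketch below, with made-up data, shows the difference.

```python
import pandas as pd

trips = pd.DataFrame({
    "pickup_date": ["2018-01-01", "2018-01-01", "2018-01-02"],
    "store_and_fwd_flag": ["N", None, "Y"],
})
grouped = trips.groupby("pickup_date")

non_null_flags = grouped["store_and_fwd_flag"].count()  # skips the missing flag
rows_per_date = grouped.size()                          # counts every row
print(pd.DataFrame({"count": non_null_flags, "size": rows_per_date}))
```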


In [73]:
len(df_trips_by_pickup_date)

455

In [74]:
df_trips_by_pickup_date.head()

pickup_date
2002-12-31     10
2003-01-01     19
2008-12-31    278
2009-01-01    516
2017-01-02      1
Name: store_and_fwd_flag, dtype: int64

In [75]:
# Count the total number of rows (this reads every CSV file)
len(df)

102804250

## Find trips with a duration under 250 seconds

In [77]:
df_short = df[df.trip_dur_secs < 250]

In [78]:
df_short.head()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | trip_dur_secs | pickup_date |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 2018-01-01 00:21:05 | 2018-01-01 00:24:23 | 1.0 | 0.5 | 1.0 | N | 41.0 | 24.0 | 2.0 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | 198 | 2018-01-01 |
| 5 | 1.0 | 2018-01-01 00:29:29 | 2018-01-01 00:32:48 | 3.0 | 0.5 | 1.0 | N | 143.0 | 143.0 | 2.0 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | 199 | 2018-01-01 |
| 7 | 1.0 | 2018-01-01 00:49:29 | 2018-01-01 00:51:53 | 1.0 | 0.7 | 1.0 | N | 239.0 | 238.0 | 1.0 | 4.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 6.3 | 144 | 2018-01-01 |
| 27 | 2.0 | 2018-01-01 00:48:40 | 2018-01-01 00:51:30 | 5.0 | 0.57 | 1.0 | N | 142.0 | 239.0 | 1.0 | 4.0 | 0.5 | 0.5 | 1.06 | 0.0 | 0.3 | 6.36 | 170 | 2018-01-01 |
| 34 | 2.0 | 2018-01-01 00:25:52 | 2018-01-01 00:29:24 | 1.0 | 0.49 | 1.0 | N | 238.0 | 238.0 | 1.0 | 4.5 | 0.5 | 0.5 | 1.45 | 0.0 | 0.3 | 7.25 | 212 | 2018-01-01 |


In [79]:
len(df_short)

11001092
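To put the filtered count in context, 11,001,092 short trips out of the 102,804,250 rows counted earlier is a share of about 10.7%. On the cluster, the same share could be computed in a single pass with `(df.trip_dur_secs < 250).mean().compute()`, because the mean of a boolean mask is the fraction of `True` values. The arithmetic, using the two counts reported above:

```python
short_trips = 11_001_092    # len(df_short) from the cell above
total_trips = 102_804_250   # len(df) from the earlier cell

share = short_trips / total_trips
print(f"{share:.2%} of trips lasted under 250 seconds")
```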

## Scale down Fargate resources when no longer needed

In [80]:
!sudo aws ecs update-service --service Dask-Workers --desired-count 1 --cluster Fargate-Dask-Cluster

{
    "service": {
        "serviceArn": "arn:aws:ecs:us-west-2:716664005094:service/Dask-Workers",
        "serviceName": "Dask-Workers",
        "clusterArn": "arn:aws:ecs:us-west-2:716664005094:cluster/Fargate-Dask-Cluster",
        "loadBalancers": [],
        "serviceRegistries": [
            {
                "registryArn": "arn:aws:servicediscovery:us-west-2:716664005094:service/srv-ms5sjkuwukl7t6yq"
            }
        ],
        "status": "ACTIVE",
        "desiredCount": 1,
        "runningCount": 4,
        "pendingCount": 0,
        "launchType": "FARGATE",
        "platformVersion": "LATEST",
        "taskDefinition": "arn:aws:ecs:us-west-2:716664005094:task-definition/cloudformation-dask-workers-v4:1",
        "deploymentConfiguration": {
            "maximumPercent": 200,
            "minimumHealthyPercent": 100
        },
        "deployments": [
            {
                "id": "ecs-svc/7695731697923122642",
                "status": "PRI