In [30]:
import math
import os
import dask.dataframe as dd
import numpy as np
import pandas as pd

from dask.distributed import Client

In [31]:
# Connect to the Dask scheduler. The address defaults to the `scheduler`
# host on port 8786 but can be overridden with the DASK_SCHEDULER_ADDRESS
# environment variable, so the notebook can target another cluster without
# being edited.
client = Client(os.environ.get('DASK_SCHEDULER_ADDRESS', 'tcp://scheduler:8786'))


+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc   | 1.9.1  | 1.7.0     | 1.7.0   |
+---------+--------+-----------+---------+


In [32]:
# Display the client's summary (scheduler address, dashboard link, workers).
client

0,1
Client  Scheduler: tcp://scheduler:8786  Dashboard: http://scheduler:8787/status,Cluster  Workers: 1  Cores: 4  Memory: 2.09 GB


In [33]:
# Point S3 access at the localstack endpoint. Because this is set
# unconditionally, the data-loading cell always takes its localstack branch;
# skip or remove this cell to read through the default AWS credential chain.
os.environ['LOCALSTACK_S3_ENDPOINT_URL'] = 'http://localstack:4572'

In [34]:
def roundup(x, base: int = 5):
    """Round `x` up to the nearest multiple of `base`.

    `x` is divided by `base` using float division, the quotient is taken up
    to the next whole number, and that count is scaled back by `base`.
    Values that are already a multiple of `base` are returned unchanged.
    """
    multiples = math.ceil(x / float(base))
    return int(multiples) * base

In [35]:
def round_series_up(s: dd.Series) -> dd.Series:
    """Apply `roundup` (with its default base) to every element of `s`.

    Returns a lazy dask Series with the same name as `s`.
    """
    # `roundup` returns Python ints, so the result is an integer column.
    # Declaring it as such (instead of float32) keeps the lazy metadata
    # consistent with what is actually produced at compute time.
    return s.apply(roundup, meta=(s.name, np.int64))

In [36]:
def transform_dask_dataframe(df: dd.DataFrame) -> dd.DataFrame:
    """Build the lazy aggregation of average fare by drive time and distance.

    Steps, in order:
      1. keep the pickup/dropoff timestamps, `trip_distance`, `total_amount`;
      2. cast both timestamps to `datetime64[ms]`;
      3. derive `drive_time` as the elapsed seconds floor-divided by 300,
         then round it up with `round_series_up`;
      4. round `trip_distance` up the same way;
      5. keep rows with `drive_time <= 120` and `trip_distance <= 50`;
      6. average the remaining column per (`drive_time`, `trip_distance`)
         pair and rename it to `avg_amount`.

    Returns a dask DataFrame indexed by (`drive_time`, `trip_distance`) with a
    single `avg_amount` column.
    """
    pickup = 'tpep_pickup_datetime'
    dropoff = 'tpep_dropoff_datetime'

    trips = df[[pickup, dropoff, 'trip_distance', 'total_amount']]
    trips = trips.astype({pickup: 'datetime64[ms]', dropoff: 'datetime64[ms]'})

    # NOTE(review): `.dt.seconds` is only the seconds component of the
    # timedelta, so durations of a day or more wrap around -- confirm that is
    # acceptable. Also confirm the intended bucket unit: the value is a count
    # of 300-second blocks that is then rounded up by `round_series_up`.
    trips = trips.assign(
        drive_time=(trips[dropoff] - trips[pickup]).dt.seconds // 300
    )
    trips = trips.assign(drive_time=round_series_up(trips.drive_time))
    trips = trips.assign(trip_distance=round_series_up(trips.trip_distance))

    in_range = trips.query('drive_time <= 120 & trip_distance <= 50')
    fares = (
        in_range
        .drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], axis=1)
        .round({'trip_distance': 0})
    )

    averages = fares.groupby(['drive_time', 'trip_distance']).mean()
    return averages.rename(columns={'total_amount': 'avg_amount'})

In [37]:
def compute_final_dataframe(df: dd.DataFrame) -> pd.DataFrame:
    """Run the dask graph and reshape the result into a fare grid.

    The computed frame's index levels are turned back into columns, then the
    data is pivoted so rows are `drive_time`, columns are `trip_distance`,
    and cells hold `avg_amount`. Combinations with no data are filled with 0.
    """
    averages = df.compute().reset_index()
    fare_grid = averages.pivot(
        index='drive_time',
        columns='trip_distance',
        values='avg_amount',
    )
    return fare_grid.fillna(0)

In [38]:


# Toggle localstack by choosing which S3 endpoint the reader talks to:
# when LOCALSTACK_S3_ENDPOINT_URL is set (and non-empty) the CSV is read
# through that endpoint, otherwise through the default AWS configuration.
taxi_csv_url = 's3://nyc-tlc/trip data/yellow_tripdata_2018-04.csv'
localstack_endpoint = os.environ.get('LOCALSTACK_S3_ENDPOINT_URL')

if localstack_endpoint:
    # 'foo' / 'bar' look like placeholder credentials for the local endpoint
    # -- TODO confirm they are not expected to be real keys.
    taxi_data = dd.read_csv(
        taxi_csv_url,
        storage_options={
            'anon': True,
            'use_ssl': False,
            'key': 'foo',
            'secret': 'bar',
            "client_kwargs": {
                "endpoint_url": localstack_endpoint,
                "region_name": "us-east-1",
            },
        },
    )
else:
    # This assumes you're using named profiles in the AWS CLI, with a default
    # profile that can access your S3 bucket, or an EC2 instance role or ECS
    # task role.
    taxi_data = dd.read_csv(taxi_csv_url)

In [39]:
# Peek at the first rows of the loaded frame to check the columns that the
# transform step selects (pickup/dropoff timestamps, trip_distance,
# total_amount).
taxi_data.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2018-04-01 00:22:20,2018-04-01 00:22:26,1,0.0,1,N,145,145,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8
1,1,2018-04-01 00:47:37,2018-04-01 01:08:42,1,6.7,1,N,152,90,2,22.5,0.5,0.5,0.0,0.0,0.3,23.8
2,1,2018-04-01 00:02:13,2018-04-01 00:17:52,2,4.1,1,N,239,158,1,15.5,0.5,0.5,3.35,0.0,0.3,20.15
3,1,2018-04-01 00:46:49,2018-04-01 00:52:05,1,0.7,1,N,90,249,1,5.5,0.5,0.5,1.35,0.0,0.3,8.15
4,1,2018-04-01 00:19:04,2018-04-01 00:19:09,1,0.0,1,N,145,145,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8


In [40]:
# Build the aggregation as a dask DataFrame (see the function's return type).
# NOTE(review): this rebinds `taxi_data`, so the cell is not idempotent --
# re-running it alone passes in the aggregated frame, which no longer has
# the raw columns the transform selects. Re-run the load cell first.
taxi_data = transform_dask_dataframe(taxi_data)

In [41]:
# Show the structure of the transformed dask frame (columns, dtypes,
# partitions).
taxi_data

Unnamed: 0_level_0,avg_amount
npartitions=1,Unnamed: 1_level_1
,float64
,...


In [42]:
# Execute the task graph and pivot the result into a pandas DataFrame.
# NOTE(review): this rebinds `taxi_data` from a dask frame to a pandas frame,
# so re-running this cell alone fails (a pandas DataFrame has no
# `.compute()`); re-run from the load cell instead.
taxi_data = compute_final_dataframe(taxi_data)

In [43]:
# Final table: rows are drive_time, columns are trip_distance, and cells are
# avg_amount (0 where a combination has no data).
taxi_data

trip_distance,0,5,10,15,20
drive_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,3.8,6.341538,0.0,0.0,0.0
5,0.0,12.560333,26.423636,32.8,0.0
10,0.0,0.0,32.716667,44.536667,70.27
