## Parallelize Pandas with Dask.dataframe


In [None]:
import dask
from dask.distributed import Client, progress
from dask import delayed
# `df` starts as None so the fallback cell further down (`if df is None:`) can
# tell whether any of the loading cells produced a DataFrame.
df = None
# Connect to the dask scheduler (assumed to listen on localhost:8786 -- TODO
# confirm for your cluster) and restart the cluster's workers so nothing from a
# previous session is left in their memory.
c = Client('tcp://localhost:8786')
c.restart()
# Last expression: displays the client/cluster summary as the cell output.
c

In [None]:
from azureml.core import Workspace, Run
import os
# Context of the Azure ML run this notebook executes in, and its workspace.
# `run` is reused at the end of the notebook to log and locate the output.
run = Run.get_context()
ws = run.experiment.workspace

## or load directly through blob file system
# using https://github.com/dask/adlfs -- still pretty beta,
# throws an error message, but seems to work
# Account credentials and container of the workspace's default datastore; the
# next cell uses them to build the abfs:// URL and the storage options.
ds = ws.get_default_datastore()
ACCOUNT_NAME = ds.account_name
ACCOUNT_KEY = ds.account_key
CONTAINER = ds.container_name
print(CONTAINER)

In [None]:
import dask.dataframe as dd
from fsspec.registry import known_implementations
# Register adlfs under the 'abfs' protocol so fsspec/dask can resolve abfs:// URLs.
known_implementations['abfs'] = {'class': 'adlfs.AzureBlobFileSystem'}
STORAGE_OPTIONS={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
# Lazily read the sample CSV from the datastore container, parsing the two
# timestamp columns. tripType is forced to float64 (NOTE(review): presumably
# because missing values break dask's dtype inference for it -- confirm).
# Because this cell always assigns `df`, the GCS fallback below only runs if
# this cell is skipped.
df = dd.read_csv(f'abfs://{CONTAINER}/nyctaxig/sample.csv', 
                 storage_options=STORAGE_OPTIONS,
                 parse_dates=['lpepPickupDatetime', 'lpepDropoffDatetime'], dtype={'tripType': 'float64'})

In [None]:
# Enable this code path instead of the cell above if you run into any issues
# with the AzureBlobFileSystem (https://github.com/dask/adlfs). It loads the
# data from the workspace blob storage mounted via blobfuse.
if False:
    import dask.dataframe as dd
    from dask import delayed
    from azureml.core import Workspace

    # Find a run of the 'dask' experiment that is still running -- it should be
    # the one hosting our dask cluster -- and read the data path it logged.
    # The loop variable is deliberately NOT called `run`: that name holds this
    # notebook's own run context, which the output cells log to later.
    ws = Workspace.from_config()
    cluster_run = None
    for candidate in ws.experiments['dask'].get_runs():
        if candidate.get_status() == "Running":
            cluster_run = candidate
            break

    if cluster_run is None:
        raise Exception('Cluster should be in state \'Running\'')

    data_path = cluster_run.get_metrics()['datastore'] + '/nyctaxig'

    def load_data(path):
        # Same timestamp parsing and tripType override as the abfs loading cell.
        return dd.read_csv(path,
                           parse_dates=['lpepPickupDatetime', 'lpepDropoffDatetime'],
                           dtype={'tripType': 'float64'})

    data_2015 = data_path + '/2015'
    data_2015_csv = data_2015 + '/*.csv'
    # Build the dask DataFrame inside a delayed task, i.e. on a worker.
    # NOTE(review): presumably so the *.csv glob is expanded where the blobfuse
    # mount exists rather than on the notebook host -- confirm.
    df = delayed(load_data)(data_2015_csv).compute()

In [None]:
# fall back to this path if neither of the above paths has been enabled
if df is None:
    ## or in this case straight from GOOGLE Storage
    import dask.dataframe as dd
    # Public yellow-taxi CSVs, read anonymously from Google Cloud Storage.
    # NOTE(review): these files presumably use the tpep_*/snake_case column
    # names, not the lpep*/camelCase names that `remap`, `must_haves` and
    # `query` below expect, so the cleaning cells will not match this data as
    # written -- adjust those mappings when using this path.
    df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv',
                     storage_options={'token': 'anon'}, 
                     parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])


In [None]:
# len() on a dask DataFrame triggers a computation over every partition,
# so this times a full pass over the input.
%time len(df)

In [None]:
# NOTE(review): `df.partitions` is dask's partition indexer (used as
# `df.partitions[i]`); if the intent is to show how many partitions there
# are, that is `df.npartitions` -- confirm.
df.partitions

In [None]:
# The same row count, computed as the sum of the per-partition lengths.
%time df.map_partitions(len).compute().sum()


Dask DataFrames
---------------

*  Coordinate many Pandas DataFrames across a cluster
*  Faithfully implement a subset of the Pandas API
*  Use Pandas under the hood (for speed and maturity)

In [None]:
# Display the lazy dask DataFrame's structure (no data is computed).
df

In [None]:
# Column dtypes as inferred/parsed by read_csv.
df.dtypes

In [None]:
# Raw column name -> cleaned column name, applied by `clean` via `rename`.
# Both entries match the `lpep*` timestamp columns parsed by the blob-storage
# loading cells. The yellow-taxi schema would need e.g.
# 'tpep_pickup_datetime' -> 'pickup_datetime',
# 'tpep_dropoff_datetime' -> 'dropoff_datetime' and 'RatecodeID' -> 'rate_code'.
remap = {
    'lpepPickupDatetime': 'pickupDatetime',
    'lpepDropoffDatetime': 'dropoffDatetime',
}

# Columns that survive cleaning, with the dtype each is meant to end up with.
# `clean` only checks membership in this mapping; the dtype values document intent.
_TS, _I32, _F32 = 'datetime64[ms]', 'int32', 'float32'
must_haves = {
    'vendorID': 'object',
    'pickupDatetime': _TS,
    'dropoffDatetime': _TS,
    'passengerCount': _I32,
    'tripDistance': _F32,
    'pickupLongitude': _F32,
    'pickupLatitude': _F32,
    'rateCodeID': _I32,
    'paymentType': _I32,
    'dropoffLongitude': _F32,
    'dropoffLatitude': _F32,
    'fareAmount': _F32,
    'tipAmount': _F32,
    'totalAmount': _F32,
}

# Row filter as (column, lower, upper) with both bounds exclusive. The
# coordinate ranges presumably describe a box around New York City.
_QUERY_BOUNDS = [
    ('fareAmount', 0, 500),
    ('passengerCount', 0, 6),
    ('pickupLongitude', -75, -73),
    ('dropoffLongitude', -75, -73),
    ('pickupLatitude', 40, 42),
    ('dropoffLatitude', 40, 42),
]
query_frags = [f'{col} > {low} and {col} < {high}' for col, low, high in _QUERY_BOUNDS]
query = ' and '.join(query_frags)

In [None]:
# Pull a tiny random sample (0.001 % of the rows) into pandas to eyeball the raw data.
df.sample(frac=0.00001).compute()

In [None]:
# Helper that cleans a DataFrame (a pandas partition or, as used below, a whole
# dask DataFrame): filter rows, rename, keep only expected columns, fix dtypes.
def clean(df_part, remap, must_haves, query):
    """Filter, rename and type-normalise a taxi-trip DataFrame.

    Parameters
    ----------
    df_part : DataFrame
        pandas or dask DataFrame with the raw columns. It is not modified;
        ``rename`` produces a new frame before any column is reassigned.
    remap : dict
        Raw column name -> new column name, applied with ``rename``.
    must_haves : dict
        Columns to keep after renaming. Only the keys are used; every other
        column is dropped.
    query : str
        Row filter passed to ``DataFrame.query``. It is applied *before* the
        rename, so it must refer to the raw column names.

    Returns
    -------
    DataFrame
        Only the ``must_haves`` columns, where:

        * ``pickupDatetime`` / ``dropoffDatetime`` stored as strings are cast
          to ``datetime64[ms]``; columns that already have a datetime64 dtype
          are left untouched (filling NaT with -1 would break them);
        * other string (object) columns get missing values replaced by
          ``'-1'`` and are cast to ``float32``;
        * integer columns become ``int32``, float columns ``float32``, and
          remaining missing values become ``-1``.
    """
    datetime_cols = ('pickupDatetime', 'dropoffDatetime')

    df_part = df_part.query(query)
    df_part = df_part.rename(columns=remap)
    # Drop every unexpected column in a single call.
    df_part = df_part.drop(columns=[col for col in df_part.columns if col not in must_haves])

    for col in df_part.columns:
        dtype = df_part[col].dtype

        if dtype == 'object' and col in datetime_cols:
            # Timestamps that were not parsed at read time.
            df_part[col] = df_part[col].astype('datetime64[ms]')
        elif str(dtype).startswith('datetime64'):
            # Already a datetime column: nothing to convert or fill.
            continue
        elif dtype == 'object':
            # String column: fill missing values, then parse as float.
            # (fillna lives on the Series; pandas has no `Series.str.fillna`.)
            df_part[col] = df_part[col].fillna('-1').astype('float32')
        else:
            # Numeric column: shrink to 32 bits to save memory.
            if 'int' in str(dtype):
                df_part[col] = df_part[col].astype('int32')
            elif 'float' in str(dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)

    return df_part

In [None]:
# Apply `clean` directly to the whole dask DataFrame (not via map_partitions);
# the query/rename/drop/astype calls it uses are all lazy dask operations.
taxi_df = clean(df, remap, must_haves, query)

In [None]:
import math
from math import  pi
from dask.array import cos, sin, arcsin, sqrt, floor
import numpy as np

def haversine_distance(pickupLatitude, pickupLongitude, dropoffLatitude, dropoffLongitude):
    """Return the great-circle distance in kilometres between two points.

    Coordinates are in degrees. They may be scalars or element-wise containers
    (e.g. the dask columns passed by ``add_features``); all arithmetic goes
    through the module-level ``sin``/``cos``/``arcsin``/``sqrt`` functions.
    Uses the haversine formula on a sphere of radius 6371 km.
    """
    to_radians = pi / 180
    lat_from = to_radians * pickupLatitude
    lon_from = to_radians * pickupLongitude
    lat_to = to_radians * dropoffLatitude
    lon_to = to_radians * dropoffLongitude

    # Squared half-chord length between the two points on the unit sphere.
    half_chord_sq = (sin((lat_to - lat_from) / 2) ** 2
                     + cos(lat_from) * cos(lat_to) * sin((lon_to - lon_from) / 2) ** 2)

    earth_radius_km = 6371
    central_angle = 2 * arcsin(sqrt(half_chord_sq))
    return central_angle * earth_radius_km

def day_of_the_week(day, month, year):
    """Return the Gregorian day of the week via Zeller's congruence.

    Numbering is 0=Saturday, 1=Sunday, 2=Monday, ..., 6=Friday. This differs
    from pandas' ``dt.weekday`` (0=Monday) used in ``add_features``.

    ``day``, ``month`` (1-12) and ``year`` may be ints or element-wise integer
    arrays/series. No Python-level branching is done on the values, so array
    inputs are evaluated element by element. Valid for any Gregorian year.
    """
    is_jan_feb = month < 3
    # Zeller counts January and February as months 13 and 14 of the previous year.
    m = month + 12 * is_jan_feb
    full_year = year - is_jan_feb
    century, year_of_century = full_year // 100, full_year % 100
    # Integer floor(13 * (m + 1) / 5) -- avoids float rounding of m * 2.6.
    month_term = (13 * (m + 1)) // 5
    return (day + month_term + year_of_century + year_of_century // 4
            + century // 4 - 2 * century) % 7
        
def add_features(df):
    """Add calendar, duration, grid-cell and distance feature columns to ``df``.

    ``df`` must use the cleaned column names: ``pickupDatetime`` and
    ``dropoffDatetime`` (datetime), plus ``pickupLatitude``, ``pickupLongitude``,
    ``dropoffLatitude`` and ``dropoffLongitude``. Columns are assigned onto
    ``df`` itself and the same object is returned.

    Added columns:

    * ``hour``, ``year``, ``month``, ``day`` (int32) of the pickup time;
    * ``day_of_week`` (int32), using pandas ``dt.weekday`` numbering
      (0=Monday .. 6=Sunday);
    * ``diff``: dropoff time minus pickup time;
    * ``<coordinate>_r`` (float32): each coordinate floored to a multiple of 0.01;
    * ``h_distance`` (float32): haversine distance in km from ``haversine_distance``;
    * ``is_weekend`` (int32): 1 for Saturday/Sunday pickups, 0 otherwise.
    """
    pickup = df['pickupDatetime'].dt
    df['hour'] = pickup.hour.astype('int32')
    df['year'] = pickup.year.astype('int32')
    df['month'] = pickup.month.astype('int32')
    df['day'] = pickup.day.astype('int32')
    df['day_of_week'] = pickup.weekday.astype('int32')

    df['diff'] = df['dropoffDatetime'] - df['pickupDatetime']

    # Snap coordinates onto a 0.01-degree grid (floor division rounds toward -inf).
    for col in ('pickupLatitude', 'pickupLongitude', 'dropoffLatitude', 'dropoffLongitude'):
        df[col + '_r'] = (df[col] // .01 * .01).astype('float32')

    df['h_distance'] = haversine_distance(df['pickupLatitude'],
                                          df['pickupLongitude'],
                                          df['dropoffLatitude'],
                                          df['dropoffLongitude']).astype('float32')

    # weekday numbers Saturday as 5 and Sunday as 6, so both count as weekend.
    df['is_weekend'] = (df['day_of_week'] >= 5).astype('int32')
    return df

In [None]:
# Add the derived feature columns and show the resulting schema.
taxi_df = add_features(taxi_df)
taxi_df.dtypes

In [None]:
# taxi_df is still lazy here, so this runs the whole read/clean/feature
# pipeline once just to count rows; the persist below runs it again.
%time len(taxi_df)

In [None]:
# Compute the pipeline once and keep the result in cluster memory so the
# aggregations below reuse it; progress() renders a progress bar.
taxi_df = taxi_df.persist()
progress(taxi_df)

In [None]:
# Total passenger count over all cleaned trips (fast once persisted).
%time taxi_df.passengerCount.sum().compute()

In [None]:
# Compute average trip distance grouped by passenger count
taxi_df.groupby('passengerCount').tripDistance.mean().compute()

### Tip Fraction, grouped by day-of-week and hour-of-day

In [None]:
# Keep trips with a positive tip and fare, and add the tip as a fraction of the fare.
df2 = taxi_df[(taxi_df.tipAmount > 0) & (taxi_df.fareAmount > 0)]
df2['tipFraction'] = df2.tipAmount / df2.fareAmount

In [None]:
# Mean tip fraction grouped by the pickup time's day of week (0=Monday) and by hour of day
dayofweek = df2.groupby(df2.pickupDatetime.dt.dayofweek).tipFraction.mean() 
hour = df2.groupby(df2.pickupDatetime.dt.hour).tipFraction.mean()

# Persist both aggregations in one call so dask computes them together and
# keeps the (small) results in cluster memory for the plots below.
dayofweek, hour = dask.persist(dayofweek, hour)
progress(dayofweek, hour)

### Plot results

Plotting requires `matplotlib` to be installed.

In [None]:
%matplotlib inline

In [None]:
# Mean tip fraction per pickup hour.
hour.compute().plot(figsize=(10, 6), title='Tip Fraction by Hour')

In [None]:
# Mean tip fraction per pickup day of week (0=Monday).
dayofweek.compute().plot(figsize=(10, 6), title='Tip Fraction by Day of Week')

In [None]:
# NOTE(review): `pd` is not used by this cell and `%matplotlib inline` was
# already run above; both lines are redundant.
import pandas as pd
%matplotlib inline
# Mean fare per passenger count, ordered by passenger count.
taxi_df.groupby('passengerCount').fareAmount.mean().compute().sort_index().plot(legend=True)

In [None]:
# Mean trip distance per passenger count.
taxi_df.groupby(taxi_df.passengerCount).tripDistance.mean().compute().plot(legend=True)

In [None]:
# Number of trips (rows with a non-null fareAmount) per payment type, with the
# numeric payment codes replaced by labels. Codes missing from the mapping
# (e.g. a -1 fill value) become NaN labels.
by_payment = taxi_df.groupby(taxi_df.paymentType).fareAmount.count().compute()
by_payment.index = by_payment.index.map({1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip'})

In [None]:
by_payment.plot(legend=True, kind='bar')


### Save the transformed dataset back to Blob Storage

In [None]:
import uuid
# Unique id for this output, logged on the run so the written dataset can be
# found again later.
# NOTE(review): uuid1 embeds the host's MAC address and a timestamp; uuid4 would
# avoid leaking those if that matters here.
output_uuid = uuid.uuid1().hex
run.log('output_uuid', output_uuid)

# Destination: <'datastore' metric of this run>/output/<uuid>.parquet.
# NOTE(review): assumes the current run has a 'datastore' metric holding the
# path of the mounted datastore -- confirm.
output_path = run.get_metrics()['datastore'] + '/output/' + output_uuid + '.parquet'

print('save parquet to ', output_path)

# dask writes a directory of per-partition parquet files at output_path.
taxi_df.to_parquet(output_path)

print('done')

In [None]:
import dask
import dask.dataframe as dd

# Lazily read the parquet output back to check that it round-trips.
df = dd.read_parquet(output_path)


In [None]:
# First rows of the re-read data.
df.head()