In [2]:
pip install geopandas

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install rtree

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install pygeos

Note: you may need to restart the kernel to use updated packages.


## Setup Cluster

In [1]:
from dask_yarn import YarnCluster
from dask.distributed import Client

  from distributed.utils import (
  from distributed.utils import (


In [2]:
# Create a cluster where each worker has 4 vCPU core and 8 GiB of memory:
# workers ~ processes, worker_vcores ~ threads
cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz",
                      worker_vcores = 4,
                      worker_memory = "8GiB"
                      )

# Scale cluster out to 4 such workers:
cluster.scale(4)

# Connect to the cluster (before proceeding, you should wait for workers to be registered by the dask scheduler, as below):
client = Client(cluster)

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://172.31.95.94:41815
distributed.scheduler - INFO -   dashboard at:                    :45149
distributed.scheduler - INFO - Receive client connection: Client-e5ee9112-562c-11ec-aba2-12ead9e9f74f
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.31.80.204:45347', name: dask.worker_3, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.31.80.204:45347
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.31.95.236:37719', name: dask.worker_1, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.31.95.236:37719
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register wo

In [3]:
client

0,1
Connection method: Cluster object,Cluster type: dask_yarn.YarnCluster
Dashboard: /proxy/45149/status,


In [None]:
cluster.shutdown()

# Clean Divvy Data

In [5]:
import dask.dataframe as dd
import matplotlib.pyplot as plt
import dask
import boto3
import pandas as pd
import numpy as np

import geopandas as gpd
import rtree
import pygeos
import dask_geopandas as dgpd

import datetime
import time

## Geopandas Shape Files

In [6]:
# read shape file from s3 bucket
zip_shp = gpd.read_file("s3://macs30123-dxu/divvy/Shapefile/Zipcodes.shp")
# drop unnecessary columns
zip_shp.drop(['objectid', 'shape_area', 'shape_len'], axis=1, inplace=True)

In [7]:
def assign_zip(df, lon_var, lat_var, zip_shp):
    '''
    This function returns a geopanda dataframe with zip code corresponding to selected lat,lon data
    '''

    # local Geodataframe
    local_geometry = gpd.points_from_xy(df[lon_var], df[lat_var])
    local_gdf = gpd.GeoDataFrame(data=df,
                                 geometry = local_geometry,
                                 crs = zip_shp.crs)
    gdf = gpd.sjoin(local_gdf, zip_shp, how="left", predicate='intersects')

    return gdf

## Clean Data

In [8]:
# read data from s3 bucket
# this data includes all data from April 2020, to October 2021
data = dd.read_csv("s3://macs30123-dxu/divvy/data/*-divvy-tripdata.zip", 
                   dtype={'start_station_id': 'object',
                          'end_station_id': 'object'})

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  "Setting ``blocksize=None``" % compression


In [9]:
# check number of rows in raw data
print(len(data.index))

8102341


In [10]:
t0 = time.time()

# select only electric bike
data = data[data['rideable_type'] == 'electric_bike']

# select only trips with start/end coordinates
data = data.dropna(subset=['start_lat', 'start_lng', 'end_lat', 'end_lng'], 
                   how='any')

# create relevant time variables based on start time
data['year'] = data['started_at'].apply(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').year,
                                        meta=('year', int))
data['month'] = data['started_at'].apply(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').month,
                                         meta=('month', int))
data['weekday'] = data['started_at'].apply(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').weekday(),
                                           meta=('weekday', int))
data['hour'] = data['started_at'].apply(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').hour,
                                        meta=('hour', int))

data_df = data.compute()

print("time_elapsed (engineer time var): ", time.time()-t0)

time_elapsed (engineer time var):  87.41385221481323


Due to the difficulty in working with Geopandas and Dask, we converted dataframe back into pandas before using geopandas package on it.

In [15]:
t1 = time.time()

# add zip code to starting location
data_start_zip = assign_zip(data_df, 'start_lng', 'start_lat', zip_shp)
data_start_zip.drop(['index_right', 'geometry'], axis=1, inplace=True)
data_start_zip.rename(columns={"zip": "start_zip"}, inplace=True)

# add zip code to ending location
data_zip = assign_zip(data_start_zip, 'end_lng', 'end_lat', zip_shp)
data_zip.drop(['index_right', 'geometry'], axis=1, inplace=True)
data_zip.rename(columns={"zip": "end_zip"}, inplace=True)
data_zip.reset_index(inplace = True)

print("time_elapsed (geopandas): ", time.time()-t1)

time_elapsed (geopandas):  53.747613191604614


In [16]:
# check number of rows in final data
print(len(data_zip.index))

2196135


In [17]:
# take a glimpse at the data
data_zip.head()

Unnamed: 0,index,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,year,month,weekday,hour,start_zip,end_zip
0,1677,FCC5AF1EE6C90556,electric_bike,2020-07-31 09:02:32,2020-07-31 09:08:02,Southport Ave & Waveland Ave,227,Broadway & Sheridan Rd,256,41.948101,-87.664004,41.952773,-87.650284,member,2020,7,4,9,60613,60613
1,1678,D250FDFE9547D37D,electric_bike,2020-07-31 10:33:04,2020-07-31 10:36:40,Broadway & Sheridan Rd,256,Halsted St & Roscoe St,299,41.952835,-87.65005,41.943737,-87.648974,member,2020,7,4,10,60613,60657
2,1679,518886FA68C576E7,electric_bike,2020-07-31 08:24:40,2020-07-31 08:31:08,Broadway & Sheridan Rd,256,Southport Ave & Waveland Ave,227,41.952801,-87.65004,41.948261,-87.664,member,2020,7,4,8,60613,60613
3,2309,C567E225898542DD,electric_bike,2020-07-31 13:14:47,2020-07-31 13:21:17,Calumet Ave & 18th St,338,Wabash Ave & 16th St,72,41.857687,-87.619427,41.860265,-87.625791,member,2020,7,4,13,60616,60605
4,2310,35B9C123ADCD865E,electric_bike,2020-07-31 07:05:09,2020-07-31 07:13:30,Ashland Ave & Chicago Ave,350,Ashland Ave & Augusta Blvd,30,41.895914,-87.66784,41.89958,-87.668511,member,2020,7,4,7,60622,60622


## Export Data as Parquet to s3

In [23]:
# export panda dataframe as parquet
parquet_name = 'divvy_electric.parquet'
divvy_par = data_zip.to_parquet(parquet_name)


This metadata specification does not yet make stability promises.  We do not yet recommend using this in a production setting unless you are able to rewrite your Parquet/Feather files.

  This is separate from the ipykernel package so we can avoid doing imports until


In [20]:
s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')

In [21]:
# check existing buckets
bucket_response = s3.list_buckets()
buckets = bucket_response['Buckets']
print(buckets)

# if not, create bucket
# bucket = s3.create_bucket(Bucket='macs30123-dxu')

[{'Name': 'aws-emr-resources-355474598182-us-east-1', 'CreationDate': datetime.datetime(2021, 11, 25, 21, 14, 26, tzinfo=tzlocal())}, {'Name': 'aws-logs-355474598182-us-east-1', 'CreationDate': datetime.datetime(2021, 11, 25, 21, 14, 26, tzinfo=tzlocal())}, {'Name': 'macs30123-dxu', 'CreationDate': datetime.datetime(2021, 11, 3, 3, 5, 2, tzinfo=tzlocal())}]


In [25]:
# upload to bucket
bucket_name = 'macs30123-dxu'

s3.upload_file(Filename = parquet_name, 
               Bucket = bucket_name,
               Key = 'divvy/' + parquet_name)