# Read and Parse Raw Data

In [3]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import datediff, lag, udf, avg, min, max, col, unix_timestamp 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from datetime import datetime
import fiona
from shapely.geometry import Point, Polygon, shape
import pandas as pd

rdd_green = sc.textFile('s3n://msan694-group-data/green*.csv')
rdd_yellow = sc.textFile('s3n://msan694-group-data/yellow_tripdata_2016-01.csv')

In [4]:
rdd_green = rdd_green.filter(lambda l: (not l.startswith('VendorID')) & (not l == ''))
rdd_yellow = rdd_yellow.filter(lambda l: (not l.startswith('VendorID')) & (not l == ''))

In [5]:
rdd_split_green = rdd_green.map(lambda l: l.strip().split(","))
rdd_split_yellow = rdd_yellow.map(lambda l : l.strip().split(","))

# Define Schema and Create Dataframe

In [6]:
def toIntSafe(inval):
    try:
        return int(inval)
    except ValueError:
        return None

def toTimeSafe(inval):
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None

def toFloatSafe(inval):
    try:
        return float(inval)
    except ValueError:
        return None

def convertType_green(r):
    return Row(
        toTimeSafe(r[1]),
        toTimeSafe(r[2]),
        toFloatSafe(r[5]),
        toFloatSafe(r[6]),
        toFloatSafe(r[7]),
        toFloatSafe(r[8]),
        toFloatSafe(r[10]),
        toFloatSafe(r[11]),
        #within_it(r[5], r[6])
        )

def convertType_yellow(r):
    return Row(
        toTimeSafe(r[1]),
        toTimeSafe(r[2]),
        toFloatSafe(r[5]),
        toFloatSafe(r[6]),
        toFloatSafe(r[9]),
        toFloatSafe(r[10]),
        toFloatSafe(r[4]),
        toFloatSafe(r[12]),
        #within_it(r[5], r[6])
    )

"""sf = fiona.open("Neighborhoods/nyc.shp")
districts = []
for district in sf:
    districts.append((district['geometry']['coordinates'], district['properties']['ntaname'], district['geometry']['type']))
districts_b = sc.broadcast(districts)"""

"""def within_it(x, y):
    dot = shape(Point(float(x), float(y)))
    for district in districts_b.value:
        included = False
        if district[2] == 'MultiPolygon':
            for poly in district[0]:
                if shape(Polygon(poly[0])).contains(dot):
                    return district[1]
        else:
            if shape(Polygon(district[0][0])).contains(dot):
                return district[1]
    return "Other"""

sch = StructType([
        StructField("pickupDateTime", TimestampType(), True),
        StructField("dropoffDateTime", TimestampType(), True),
        StructField("pickupLng", FloatType(), True),
        StructField("pickupLat", FloatType(), True),
        StructField("dropoffLng", FloatType(), True),
        StructField("dropoffLat", FloatType(), True),
        StructField("distance", FloatType(), True),
        StructField("fare", FloatType(), True),
        #StructField("District", StringType(), True)
    ])

rdd_newType_green = rdd_split_green.map(convertType_green)
rdd_newType_yellow = rdd_split_yellow.map(convertType_yellow)

df_green = sqlContext.createDataFrame(rdd_newType_green, sch)
df_yellow = sqlContext.createDataFrame(rdd_newType_yellow, sch)

#df_filtered = df.select('pickupDateTime', 'dropoffDateTime', 'pickupLng', 'pickupLat', 'dropoffLng', 'dropoffLat', 'distance', 'fare')
df_filtered_green = df_green.where("pickupLng > -74.3")\
                            .where("dropoffLng > -74.3")\
                            .where("pickupLat > 40.5")\
                            .where("dropoffLat > 40.5")\
                            .where("pickupLng < -73.7")\
                            .where("dropoffLng < -73.7")\
                            .where("pickupLat < 41.0")\
                            .where("dropoffLat < 41.0")\
                            .where("fare > 0.0")\
                            .where("distance > 0.0")

df_filtered_yellow = df_yellow.where("pickupLng > -74.3")\
                              .where("dropoffLng > -74.3")\
                              .where("pickupLat > 40.5")\
                              .where("dropoffLat > 40.5")\
                              .where("pickupLng < -73.7")\
                              .where("dropoffLng < -73.7")\
                              .where("pickupLat < 41.0")\
                              .where("dropoffLat < 41.0")\
                              .where("fare > 0.0")\
                              .where("distance > 0.0")
                            
                            
timeFmt = "%Y-%m-%d %H:%M:%S"
timeDiff = (unix_timestamp('dropoffDateTime', format=timeFmt) - unix_timestamp('pickupDateTime', format=timeFmt))
df_filtered_green = df_filtered_green.withColumn('duration', timeDiff)
df_filtered_green = df_filtered_green.where("duration < 18000").where("duration > 0").cache()

df_filtered_yellow = df_filtered_yellow.withColumn('duration', timeDiff)
df_filtered_yellow = df_filtered_yellow.where("duration < 18000").where("duration > 0").cache()

In [54]:
df_filtered_yellow.show(5)

+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|      pickupDateTime|     dropoffDateTime| pickupLng|pickupLat|dropoffLng|dropoffLat|distance|fare|duration|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|2016-01-01 00:00:...|2016-01-01 00:18:...| -73.98012| 40.74305| -73.91349|  40.76314|    5.52|19.0|    1110|
|2016-01-01 00:00:...|2016-01-01 00:26:...| -73.99406| 40.71999| -73.96636|  40.78987|    7.45|26.0|    1605|
|2016-01-01 00:00:...|2016-01-01 00:11:...| -73.97942|40.744614|-73.992035| 40.753944|     1.2| 9.0|     714|
|2016-01-01 00:00:...|2016-01-01 00:11:...| -73.94715|40.791046| -73.92077| 40.865578|     6.0|18.0|     672|
|2016-01-01 00:00:...|2016-01-01 00:11:...|-73.998344|40.723896| -73.99585|   40.6884|    3.21|11.5|     666|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
only showi

In [55]:
df_filtered_green.show(5)

+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|      pickupDateTime|     dropoffDateTime| pickupLng|pickupLat|dropoffLng|dropoffLat|distance|fare|duration|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|2016-01-01 00:29:...|2016-01-01 00:39:...| -73.92864| 40.68061| -73.92428| 40.698044|    1.46| 8.0|     612|
|2016-01-01 00:19:...|2016-01-01 00:39:...|-73.952675|40.723175| -73.92392|  40.76138|    3.56|15.5|    1179|
|2016-01-01 00:19:...|2016-01-01 00:39:...| -73.97161|40.676105| -74.01316| 40.646072|    3.79|16.5|    1215|
|2016-01-01 00:22:...|2016-01-01 00:38:...|  -73.9895| 40.66958| -74.00065| 40.689034|    3.01|13.5|     980|
|2016-01-01 00:24:...|2016-01-01 00:39:...| -73.96473|40.682854| -73.94072| 40.663013|    2.55|12.0|     921|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
only showi

In [7]:
df_filtered = df_filtered_green.unionAll(df_filtered_yellow)

In [57]:
df_filtered.show(5)

+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|      pickupDateTime|     dropoffDateTime| pickupLng|pickupLat|dropoffLng|dropoffLat|distance|fare|duration|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
|2016-01-01 00:29:...|2016-01-01 00:39:...| -73.92864| 40.68061| -73.92428| 40.698044|    1.46| 8.0|     612|
|2016-01-01 00:19:...|2016-01-01 00:39:...|-73.952675|40.723175| -73.92392|  40.76138|    3.56|15.5|    1179|
|2016-01-01 00:19:...|2016-01-01 00:39:...| -73.97161|40.676105| -74.01316| 40.646072|    3.79|16.5|    1215|
|2016-01-01 00:22:...|2016-01-01 00:38:...|  -73.9895| 40.66958| -74.00065| 40.689034|    3.01|13.5|     980|
|2016-01-01 00:24:...|2016-01-01 00:39:...| -73.96473|40.682854| -73.94072| 40.663013|    2.55|12.0|     921|
+--------------------+--------------------+----------+---------+----------+----------+--------+----+--------+
only showi

In [61]:
df_filtered.count()

19433963

# KMEANS

In [8]:
va = VectorAssembler(outputCol = 'features', inputCols = ['pickupLng', 'pickupLat'])
kmeans = KMeans(k=10)

from pyspark.ml import Pipeline

pipeline = Pipeline(stages = (va, kmeans))
model = pipeline.fit(df_filtered)

df_filtered = model.transform(df_filtered)

In [9]:
centers = model.stages[1].clusterCenters()
center_df = pd.DataFrame(centers, columns=['lng', 'lat'])
center_df['group'] = center_df.index

# Add JFK and LGA

In [10]:
KennedyLng = -73.784214
KennedyLat = 40.645582
KennedyR = 0.01

def Kennedy(x1, x2):
    if ((x1 - KennedyLng)**2.0 + (x2 - KennedyLat)**2.0)**0.5 < KennedyR :
        return 1
    return 0

Kennedy_air = udf(lambda x, y: Kennedy(x, y))
df_filtered = df_filtered.withColumn("Kennedy", Kennedy_air('dropoffLng', 'dropoffLat'))

LGALng1 = -73.872238
LGALat1 = 40.773140
LGALng2 = -73.864355
LGALat2 = 40.769043
LGA_radius1 = 0.002
LGA_radius2 = 0.0025

def LGA(x1, x2):
    if ((x1-LGALng1)**2.0 + (x2 - LGALat1)**2.0)**0.5 < LGA_radius1 or ((x1-LGALng2)**2.0 + (x2 - LGALat2)**2.0)**0.5 < LGA_radius2:
        return 1
    return 0

LGA_air = udf(lambda x, y : LGA(x, y))
df_filtered = df_filtered.withColumn("LaGuardia", LGA_air('dropoffLng', 'dropoffLat'))

# Add Rush Hour

In [11]:
from pyspark.sql.functions import *

df_filtered = df_filtered.withColumn('Hour', hour('pickupDateTime'))

def rush_hour(hour):
    if hour >= 8 and hour <= 19:
        return 1
    return 0

is_rush = udf(lambda x : rush_hour(x))
df_filtered = df_filtered.withColumn('rushHour', is_rush('Hour'))
df_filtered = df_filtered.drop('Hour')

In [12]:
df_toJFK = df_filtered.where(df_filtered['Kennedy'] == 1)
df_toLGA = df_filtered.where(df_filtered['LaGuardia'] == 1)

In [13]:
pd_jfk = df_toJFK.groupBy('prediction', 'rushHour').agg({'duration':'avg', 'prediction':'count'}).toPandas()
pd_jfk.columns = ['group', 'rushHour', 'duration', 'ct']
pd_jfk

Unnamed: 0,group,rushHour,duration,ct
0,6,1,3046.196242,9580
1,5,1,2894.127533,1678
2,3,1,1379.552362,7028
3,8,0,1522.378715,2490
4,1,0,477.400234,2561
5,7,0,1785.872245,4446
6,9,1,2836.585942,9518
7,4,0,1879.852475,7131
8,7,1,2501.20487,8049
9,6,0,2052.806361,3553


In [14]:
pd_lga = df_toLGA.groupBy('prediction', 'rushHour').agg({'duration':'avg', 'prediction':'count'}).toPandas()
pd_lga.columns = ['group', 'rushHour', 'duration', 'ct']
pd_lga

Unnamed: 0,group,rushHour,duration,ct
0,6,1,2041.255252,10139
1,5,1,1646.144456,2949
2,3,1,783.446914,6480
3,8,0,544.171903,7353
4,1,0,1365.040682,762
5,7,0,1011.451106,5921
6,9,1,1438.030407,14273
7,4,0,1307.992623,8947
8,7,1,1319.755636,11532
9,6,0,1396.295126,3324


In [15]:
pd_jfk_r = pd_jfk[pd_jfk.rushHour == '1']
pd_jfk_nr = pd_jfk[pd_jfk.rushHour == '0']
df_jfk_r = center_df.set_index('group').join(pd_jfk_r.set_index('group')).reset_index()
df_jfk_nr = center_df.set_index('group').join(pd_jfk_nr.set_index('group')).reset_index()
pd_lga_r = pd_lga[pd_lga.rushHour == '1']
pd_lga_nr = pd_lga[pd_lga.rushHour == '0']
df_lga_r = center_df.set_index('group').join(pd_lga_r.set_index('group')).reset_index()
df_lga_nr = center_df.set_index('group').join(pd_lga_nr.set_index('group')).reset_index()

In [16]:
for df in [df_jfk_r, df_jfk_nr, df_lga_r, df_lga_nr]:
    df['durt'] = ["%.1f mins" %(x/60) for x in df['duration'].tolist()]
    df['percent'] = [1.0 * x / df.ct.sum() for x in df['ct'].tolist()]

# Plot

In [17]:
import pandas as pd
import seaborn as sns
from bokeh.io import output_notebook, show, output_file
from bokeh.plotting import figure
from bokeh.models import GMapPlot, GMapOptions, ColumnDataSource, Circle, DataRange1d, PanTool, WheelZoomTool, BoxSelectTool, HoverTool
from bokeh.models import LinearColorMapper
from bokeh.palettes import RdYlBu

In [18]:
centers = model.stages[1].clusterCenters()
center_df = pd.DataFrame(centers, columns=['lng', 'lat'])
center_df['group'] = center_df.index

def plot_map(df, fname):
    map_options = GMapOptions(lat=40.75, lng=-73.9, map_type='roadmap', zoom=11)
    API_KEY = 'AIzaSyC-4SnwvK3u2CR-zh-4zl7J_msCmDfq_Sg'
    color_mapper = LinearColorMapper(palette=RdYlBu[10])

    plot = GMapPlot(x_range=DataRange1d(),
                    y_range=DataRange1d(),
                    map_options=map_options,
                    api_key=API_KEY,
                    plot_width=1000,
                    plot_height=1000,)

    source = ColumnDataSource(data=dict(
                                    lat = df['lat'], 
                                    lon = df['lng'],
                                    grp = df['group'],
                                    dur = df['duration'],
                                    durt = df['durt'],
                                    pct = df['percent'],
                                    size = df['percent'] * 300
        ))
    
    circle = Circle(x='lon', y='lat', size='size',
                    fill_color={'field': 'dur', 'transform': color_mapper}, 
                    fill_alpha=0.6, line_color=None)
    
    plot.add_glyph(source, circle, name='circle')
    hover = HoverTool(names=['circle'],
                      tooltips=[
                        ("Group", "@grp"),
                        ("Longitude", "@lon"),
                        ("Latitude", "@lat"),
                        ("Avg Trip Duration", "@durt"),
                        ("Percent of Trips", "@pct{0.0%}"),
                ])
    plot.add_tools(PanTool(), WheelZoomTool(), BoxSelectTool(), hover)
    output_file(fname)
    return plot

In [None]:
p = plot_map(df_jfk_r, 'jfk_r.html')
show(p)

INFO:bokeh.core.state:Session output file 'jfk_r.html' already exists, will be overwritten.
