In [3]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash
from pyspark.statcounter import StatCounter
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, lit, desc, max
from functools import reduce


import pandas as pd
import numpy as np

import os
import json

from datetime import datetime
from operator import itemgetter
from shapely.geometry import shape, Point

from matplotlib import pyplot as plt

spark = SparkSession.builder.appName("Taxi")\
        .config("spark.driver.memory", "6g")\
        .config("spark.driver.cores", "6")\
        .getOrCreate()
sc=spark.sparkContext

In [4]:
from  pprint import pprint
def title(s):
    pprint("---- %s -----" %s)    
    
def see(s, v):
    pprint("---- %s -----" %s)
    pprint(v)

# 1. Sample the Data
Save the sample to a file, because the sampling itself is time-consuming

In [231]:
taxiRawAll = sc.textFile("../data/sample.csv")
header = sc.parallelize(taxiRawAll.take(1))
taxiRaw = taxiRawAll.sample(withReplacement=False, fraction=0.0001 ,seed=17)
taxiRaw.coalesce(1) #Makes 1 file as an output, since it reduced the # of partitions to 1

CoalescedRDD[1114] at coalesce at NativeMethodAccessorImpl.java:0

# Run below only once!

In [None]:
header.union(taxiRaw).saveAsTextFile("../data/ch08-geospatial/trip_data_sample.csv")

# 2. Read the sample Data
Save the sample to a file, because the sampling itself is time-consuming

In [5]:
#read the sample
taxiRaw = sc.textFile("../data/sample.csv")

In [6]:
sampledf = spark.read.csv("../data/sample.csv", header=True)
sampledf.toPandas()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.50,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.10,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,.70,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.10,-73.97625,40.748528,-74.002586,40.747868
5,20D9ECB2CA0767CF7A01564DF2844A3E,598CCE5B9C1918568DEE71F43CF26CD2,CMT,1,N,2013-01-07 15:27:48,2013-01-07 15:38:37,1,648,1.70,-73.966743,40.764252,-73.983322,40.743763
6,496644932DF3932605C22C7926FF0FE0,513189AD756FF14FE670D10B92FAF04C,CMT,1,N,2013-01-08 11:01:15,2013-01-08 11:08:14,1,418,.80,-73.995804,40.743977,-74.007416,40.744343
7,0B57B9633A2FECD3D3B1944AFC7471CF,CCD4367B417ED6634D986F573A552A62,CMT,1,N,2013-01-07 12:39:18,2013-01-07 13:10:56,3,1898,10.70,-73.989937,40.756775,-73.86525,40.77063
8,2C0E91FF20A856C891483ED63589F982,1DA2F6543A62B8ED934771661A9D2FA0,CMT,1,N,2013-01-07 18:15:47,2013-01-07 18:20:47,1,299,.80,-73.980072,40.743137,-73.982712,40.735336
9,2D4B95E2FA7B2E85118EC5CA4570FA58,CD2F522EEE1FF5F5A8D8B679E23576B3,CMT,1,N,2013-01-07 15:33:28,2013-01-07 15:49:26,2,957,2.50,-73.977936,40.786983,-73.952919,40.80637


# Setting some UDFs will be used later in various sections

In [7]:
hour_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%H'))
weekDay_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').isoweekday())
weekDayName_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%A'))
weekNum_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%V'))
monthDay_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%m-%d'))
month_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%m'))
year_month_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m'))
year_UDF = udf(lambda pickup_datetime: datetime.strptime(pickup_datetime, '%Y-%m-%d %H:%M:%S').strftime('%Y'))

# Statistic (1) - Peak Hour during Working Days and WeekEnds

In [8]:
# Peak Hour
peakHourdf = sampledf \
    .withColumn("pickup_hour", hour_UDF(sampledf['pickup_datetime'])) \
    .withColumn("week_day", weekDay_UDF(sampledf['pickup_datetime'])) \
    .withColumn("count",lit(1))['pickup_hour', 'week_day' ,'count']

# Peak hour during working days
title("Peak hour during working days")
peakHourWDdf = peakHourdf.where("week_day < 5 ") \
    .groupBy('pickup_hour').sum('count') \
    .withColumnRenamed('sum(count)',"trips_count") \
    .orderBy(desc('trips_count'))
peakHourWDdf.show(10)         

# Peak hour during WeekEnds
title("Peak hour during WeekEnds")
peakHourWEdf = peakHourdf.where("week_day > 4 ") \
    .groupBy('pickup_hour').sum('count') \
    .withColumnRenamed('sum(count)',"trips_count") \
    .orderBy(desc('trips_count'))
peakHourWEdf.show(10)       

'---- Peak hour during working days -----'
+-----------+-----------+
|pickup_hour|trips_count|
+-----------+-----------+
|         18|         22|
|         13|         20|
|         10|         19|
|         17|         15|
|         14|         15|
|         12|         14|
|         16|         12|
|         15|         12|
|         22|         12|
|         08|         12|
+-----------+-----------+
only showing top 10 rows

'---- Peak hour during WeekEnds -----'
+-----------+-----------+
|pickup_hour|trips_count|
+-----------+-----------+
|         12|      11967|
|         13|      11538|
|         14|      11126|
|         11|      10996|
|         15|      10548|
|         16|       9257|
|         10|       7584|
|         17|       7294|
|         09|       3476|
|         18|       3176|
+-----------+-----------+
only showing top 10 rows



# Statistic(2) - Rate ~ Pickup Hour
    Step 1 - Tuple for Hour <-> Count
        Total sum trips

In [9]:
sumAllTrips = peakHourdf.agg({"count":"sum"}).withColumnRenamed("sum(count)","total_count")

title("sumAllTrips")
sumAllTrips.show()

# Year <-> Trip count
TPH = peakHourdf \
    .groupBy('pickup_hour').sum('count') \
    .withColumnRenamed('sum(count)',"count") \
    .orderBy('pickup_hour') \
    .rdd.collectAsMap()
#see("TPH", TPH)

'---- sumAllTrips -----'
+-----------+
|total_count|
+-----------+
|      99999|
+-----------+



Step 2 - Build a table with desired features

In [10]:
ph_UDF = udf(lambda pickup_hour: TPH[pickup_hour])
#rateCount_UDF = udf(lambda rate_sum: rate_s)
rateHourdf = sampledf \
    .withColumn("pickup_hour", hour_UDF(sampledf['pickup_datetime'])) \
    .withColumn("rate_float",sampledf['rate_code'].cast("float")) \
    .groupBy('pickup_hour').sum('rate_float') \
    .withColumnRenamed('sum(rate_float)',"rate_float") 
    
pickRatedf = rateHourdf \
    .withColumn("rate_avg", (rateHourdf['rate_float']/ph_UDF(rateHourdf['pickup_hour']))) \
    ['pickup_hour','rate_avg'] \
    .orderBy(desc('pickup_hour'))
    
title("Pickup Hour ~ Average Rate for this hour")
pickRatedf.toPandas()

#datetime.strptime('2013-01-08 09:50:05', '%Y-%m-%d %H:%M:%S').strftime('%H')

'---- Pickup Hour ~ Average Rate for this hour -----'


Unnamed: 0,pickup_hour,rate_avg
0,23,1.285714
1,22,1.0
2,21,1.0
3,20,1.031963
4,19,1.028598
5,18,1.026579
6,17,1.042961
7,16,1.051786
8,15,1.059943
9,14,1.042366


# Statistic(3) - Taxi Average Monthly Mileage cross all years

In [11]:
avgMlgdf = sampledf \
    .withColumn("trip_distance_float",sampledf['trip_distance'].cast("float")) \
    .withColumn("month", month_UDF(sampledf['pickup_datetime'])) \
    ['month', 'trip_distance_float'] \
    .groupBy('month') \
    .mean('trip_distance_float') \
    .withColumnRenamed('avg(trip_distance_float)', "average_distance")
title("Taxi Average Monthly Mileage cross all years")        
avgMlgdf.toPandas()

'---- Taxi Average Monthly Mileage cross all years -----'


Unnamed: 0,month,average_distance
0,1,2.985983


# Statistic(4) - Average Daily no. of Passengers during each month cross all years

In [12]:
avgPsgdf = sampledf \
    .withColumn("passenger_count_float",sampledf['passenger_count'].cast("float")) \
    .withColumn("month", month_UDF(sampledf['pickup_datetime'])) \
    ['month', 'passenger_count_float'] \
    .groupBy('month') \
    .mean('passenger_count_float') \
    .withColumnRenamed('avg(passenger_count_float)', "average_passengers")
    
avgPsgdf.toPandas()

Unnamed: 0,month,average_passengers
0,1,2.163002


# Statistic(5) - Drivers to Taxis ratio

In [13]:
title("Drivers to Taxis Ratio")
DTratio = (sampledf.select('hack_license').distinct().count())/(sampledf.select('medallion').distinct().count())
print(DTratio)

'---- Drivers to Taxis Ratio -----'
1.5502793296089385


# Statistic(6) - Trips per day

In [14]:
tripsdf = sampledf \
    .withColumn("week_day", weekDayName_UDF(sampledf['pickup_datetime'])) \
    .withColumn("count", lit(1)) \
    ['week_day', 'count'] \
    .groupBy('week_day').agg({'count':'sum'}) \
    .withColumnRenamed('sum(count)', "trips_count") \
    .orderBy(desc('trips_count'))

# useless unless we filter by YEAR!
totalDay_count = tripsdf.withColumn("dummy_index", lit(1)) \
    .groupBy('dummy_index').sum('trips_count').rdd.collectAsMap()

percentage_UDF = udf(lambda trips_count: str(round(trips_count/totalDay_count[1]*100,3))+"%")
tripsdf = tripsdf \
    .withColumn("trips_%_rounded", percentage_UDF('trips_count')) \
    ['week_day', 'trips_count', 'trips_%_rounded']

tripsdf.toPandas()

Unnamed: 0,week_day,trips_count,trips_%_rounded
0,Sunday,99539,99.54%
1,Saturday,133,0.133%
2,Friday,97,0.097%
3,Tuesday,87,0.087%
4,Monday,70,0.07%
5,Thursday,42,0.042%
6,Wednesday,31,0.031%


# Statistic(7) - Average Percentage of Taxis Occupied at agiven Hour on a Day

In [83]:
workingTaxidf = sampledf \
    .withColumn("hour", (hour_UDF(sampledf['pickup_datetime'])).cast("int")) \
    .withColumn("count", lit(1)) \
    ['medallion', 'hour', 'count'] \
    .groupBy('medallion') \
    .pivot('hour') \
    .agg({'count':'mean'}).fillna(0) #this should gives 1 for the taxi worked in this hour and zero otherwise

    
Taxis_total = sampledf.select('medallion').distinct().count()

# print(Taxis_total)
# print(cars_onroad)
#(workingTaxidf.toPandas())['0'].sum()

TPHP = {}
df_cnames = iter(workingTaxidf.schema.names) 
for cname in df_cnames:
    if cname == "medallion":
        continue
    TPHP[cname] = str(round(((workingTaxidf.toPandas())[cname].sum()*100/Taxis_total),2)) + "%"

#title("Average Percentage of Taxis Occupied at agiven Hour")
pd.DataFrame([TPHP], columns=TPHP.keys())

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,14,15,16,17,18,19,20,21,22,23
0,0.96%,1.89%,22.84%,28.48%,24.18%,3.99%,12.85%,22.52%,20.86%,34.45%,...,68.9%,68.13%,60.37%,58.52%,36.84%,15.92%,3.4%,0.45%,0.37%,0.31%


# TESTING! ---- Please ignore all below section

In [14]:
datetime.strptime('2017-11-20 09:50:05', '%Y-%m-%d %H:%M:%S').weekday()
ddd = sampledf.withColumn("hoba",sampledf['rate_code'].cast("int"))
ddd.printSchema()


root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- hoba: integer (nullable = true)

