In [82]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName('NYC Taxi traffic analysis')\
    .getOrCreate()

In [83]:
df = spark.read.format("parquet").load("gs://pyspark-tutorial-sameera-holy/data/NYC/*.parquet")

                                                                                

In [26]:
df.describe()

                                                                                

DataFrame[summary: string, VendorID: string, passenger_count: string, trip_distance: string, RatecodeID: string, store_and_fwd_flag: string, PULocationID: string, DOLocationID: string, payment_type: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, improvement_surcharge: string, total_amount: string, congestion_surcharge: string, airport_fee: string]

In [27]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [28]:
df.head()

                                                                                

Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2021, 10, 1, 0, 25, 56), tpep_dropoff_datetime=datetime.datetime(2021, 10, 1, 1, 11, 35), passenger_count=1.0, trip_distance=7.4, RatecodeID=1.0, store_and_fwd_flag='Y', PULocationID=140, DOLocationID=36, payment_type=1, fare_amount=33.0, extra=3.0, mta_tax=0.5, tip_amount=4.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=40.8, congestion_surcharge=2.5, airport_fee=0.0)

In [30]:
from pyspark.sql.functions import *

# Trip Analysis

## 1. Average distance and duration

In [31]:
df_enriched = df.withColumn( #df.limit(1000).withColumn
    "duration_in_mins",
    (
        unix_timestamp(df["tpep_dropoff_datetime"]) - unix_timestamp(df["tpep_pickup_datetime"])
    )/60
)

In [32]:
df_month = df_enriched.groupBy(
    month("tpep_pickup_datetime").alias("month")).agg(
    avg("duration_in_mins").alias("average_trip_time_in_minutes")
    ).orderBy("month", ascending=True)

In [33]:
df_month.toPandas()

                                                                                

Unnamed: 0,month,average_trip_time_in_minutes
0,1,13.961937
1,2,14.886453
2,3,14.73099
3,4,15.510014
4,5,15.906733
5,6,16.826478
6,7,16.624542
7,8,16.572784
8,9,17.500142
9,10,17.420199


In [36]:
df_day = df_enriched.groupBy(
    dayofmonth("tpep_pickup_datetime").alias("dayofmonth")).agg(
    avg("duration_in_mins").alias("average_trip_time_in_minutes")
    ).orderBy("dayofmonth", ascending=True)

df_day.toPandas()

                                                                                

Unnamed: 0,dayofmonth,average_trip_time_in_minutes
0,1,16.870032
1,2,16.638504
2,3,16.814806
3,4,16.634436
4,5,16.471852
5,6,16.303049
6,7,16.899675
7,8,16.748068
8,9,16.866152
9,10,16.966998


In [38]:
df_hour = df_enriched.groupBy(
    hour("tpep_pickup_datetime").alias("hour")).agg(
    avg("duration_in_mins").alias("average_trip_time_in_minutes")
    ).orderBy("hour", ascending=True)

df_hour.toPandas()

                                                                                

Unnamed: 0,hour,average_trip_time_in_minutes
0,0,15.730268
1,1,15.850163
2,2,14.827951
3,3,15.549171
4,4,16.769697
5,5,16.754209
6,6,16.470007
7,7,16.430668
8,8,15.924119
9,9,15.93716


## 2. find top 10 locations (pickup and drop-off)

In [46]:
pickup_counts = df_enriched.groupBy("PULocationID").count().withColumnRenamed("count", "pickup_count").orderBy("pickup_count", ascending=False).limit(10)
dropoff_counts = df_enriched.groupBy("DOLocationID").count().withColumnRenamed("count", "dropoff_count").orderBy("dropoff_count", ascending=False).limit(10)

In [48]:
pickup_counts.toPandas()

                                                                                

Unnamed: 0,PULocationID,pickup_count
0,237,1553554
1,236,1424614
2,161,1091329
3,132,1025063
4,186,1019650
5,142,989927
6,170,967766
7,162,954917
8,239,932473
9,141,909845


In [49]:
dropoff_counts.toPandas()

                                                                                

Unnamed: 0,DOLocationID,dropoff_count
0,236,1434919
1,237,1356518
2,161,1001077
3,170,920433
4,141,902052
5,239,886837
6,142,854324
7,48,782803
8,238,779046
9,162,772823


# Tip Analysis

## 1. Tip percentage by trip

In [57]:
pickup_tip_avg = df.groupBy("PULocationID").avg("tip_amount").withColumnRenamed("avg(tip_amount)", "avg_pickup_tip")
dropoff_tip_avg = df.groupBy("DOLocationID").avg("tip_amount").withColumnRenamed("avg(tip_amount)", "avg_dropoff_tip")

tip_analysis = pickup_tip_avg.join(dropoff_tip_avg, pickup_tip_avg.PULocationID == dropoff_tip_avg.DOLocationID, "outer") \
                              .select(
                                  pickup_tip_avg.PULocationID,
                                  pickup_tip_avg.avg_pickup_tip,
                                  dropoff_tip_avg.avg_dropoff_tip
                              )

tip_analysis.show()


[Stage 126:>                                                        (0 + 1) / 1]

+------------+-------------------+------------------+
|PULocationID|     avg_pickup_tip|   avg_dropoff_tip|
+------------+-------------------+------------------+
|           6| 3.4790666666666668| 4.499037328094302|
|           7| 1.3013844314847673| 2.656280360659553|
|          19| 0.8762969588550983|2.0907807215332577|
|          22|0.46592423710978603|2.4033159700165174|
|          25|  2.230209600283402| 4.315656239446144|
|          26| 0.6087384777455886| 2.063804650401179|
|          29| 0.9945027372262772| 2.359642616318274|
|          31|  4.218533568904593| 4.714030126849893|
|          32| 0.5163899697362732|1.3893791946308724|
|          34| 3.0828074866310153| 3.514888503468782|
|          39| 0.5917798427448177|1.1539258530523488|
|          43|  2.118766290166797|2.0653535694655165|
|          50|  2.093459859164062|2.1060518287285857|
|          54| 3.2075388967468177| 4.813885425442293|
|          57| 0.7426246719160104|0.8945221238938055|
|          65| 2.58494041607

                                                                                

Yes. The tip for some location is different than other locations

Find correlation between distance and tip

In [60]:
corr = df.select(corr("trip_distance", "tip_amount")).first()[0]
corr

                                                                                

0.001648755157615622

The correlation is near zero. Therefore, there is not much relation between distance and the tip

## 2. Tips by Time

In [68]:
df_year_tip = df.groupBy(
        year("tpep_pickup_datetime").alias("year")).agg(
        avg("tip_amount").alias("avg_tip_amnt_year")
    ).orderBy("year", ascending=True) \

df_year_tip.show()



+----+-------------------+
|year|  avg_tip_amnt_year|
+----+-------------------+
|2002|                0.0|
|2003|0.40099999999999997|
|2004|                1.5|
|2008| 0.6582558139534883|
|2009| 0.8521182266009851|
|2011|               0.59|
|2020| 1.4675000000000002|
|2021| 2.3411800209678297|
|2022|               2.56|
|2028|                0.0|
|2029|               2.16|
|2070|                0.0|
|2098|                0.0|
+----+-------------------+



                                                                                

No, it is difficult to conclude with the above year-tip information

In [70]:
df_day_tip = df.groupBy(
        dayofmonth("tpep_pickup_datetime").alias("day")).agg(
        avg("tip_amount").alias("avg_tip_amnt_day")
    ).orderBy("day", ascending=True) \

df_day_tip.toPandas()

                                                                                

Unnamed: 0,day,avg_tip_amnt_day
0,1,2.354352
1,2,2.346385
2,3,2.339511
3,4,2.313832
4,5,2.325328
5,6,2.323062
6,7,2.363647
7,8,2.35296
8,9,2.35784
9,10,2.373683


No, the tip does not change with day. It is almost the same every day of the month.

In [71]:
df_day_week_tip = df.groupBy(
        dayofweek("tpep_pickup_datetime").alias("weekday")).agg(
        avg("tip_amount").alias("avg_tip_amnt_weekday")
    ).orderBy("weekday", ascending=True) \

df_day_week_tip.show()



+-------+--------------------+
|weekday|avg_tip_amnt_weekday|
+-------+--------------------+
|      1|   2.482065861076579|
|      2|  2.3317220871051973|
|      3|   2.277526969828649|
|      4|  2.3011234998178165|
|      5|   2.366834601495962|
|      6|  2.3545071417344587|
|      7|   2.303158334446044|
+-------+--------------------+



                                                                                

No, the tip does not change with weekday. It is almost the same every day of the week.

## 3. Does the payment type affect tipping

In [80]:
tip_payment_type = df.groupBy("payment_type")\
                    .avg("tip_amount")\
                    .withColumnRenamed("avg(tip_amount)", "avg_tip_payment_type")\
                    .orderBy("avg_tip_payment_type", ascending=False)

tip_payment_type.show()



+------------+--------------------+
|payment_type|avg_tip_payment_type|
+------------+--------------------+
|           1|   3.075551030676598|
|           0|   2.170006816821669|
|           4|0.022958282745690745|
|           2|4.108590704647675E-4|
|           5|                 0.0|
|           3|-0.01167058060330...|
+------------+--------------------+



                                                                                

Yes, the payment type has an influence on tipping. The types 1 and 0 has higher tipping than others

# Fare Analysis

## 1. Calculate the avg fare by pull-drop location

In [89]:
pickup_fare_avg = df.groupBy("PULocationID").avg("fare_amount").withColumnRenamed("avg(fare_amount)", "avg_pickup_fare").orderBy("avg_pickup_fare", ascending=False)
dropoff_fare_avg = df.groupBy("DOLocationID").avg("fare_amount").withColumnRenamed("avg(fare_amount)", "avg_dropoff_fare").orderBy("avg_dropoff_fare", ascending=False)

pickup_fare_avg.limit(10).show()



+------------+-----------------+
|PULocationID|  avg_pickup_fare|
+------------+-----------------+
|          44|99.34643231114437|
|          84|90.55050847457628|
|         110|             84.5|
|         204|83.64895833333334|
|          99|82.19200000000001|
|           5|81.81747826086955|
|           1|78.47885572139306|
|         109|66.72931818181819|
|          27|66.16974358974358|
|           2|66.07461538461538|
+------------+-----------------+



                                                                                

## 2. Calculate the avg fare by passenger count. (does a correlation between passenger count and fare amount exists)

In [92]:
passengerCnt_fare = df.groupBy("passenger_count").avg("fare_amount").withColumnRenamed("avg(fare_amount)", "passengerCnt_fare").orderBy("passengerCnt_fare", ascending=False)

passengerCnt_fare.limit(10).show()



+---------------+------------------+
|passenger_count| passengerCnt_fare|
+---------------+------------------+
|            9.0|             61.35|
|            7.0| 52.91679487179488|
|            8.0| 49.14408163265306|
|           null|25.502716631913163|
|            4.0|14.284687986716282|
|            2.0|13.776399293941516|
|            3.0| 13.55566381846172|
|            6.0|12.751094437062962|
|            1.0|12.709557736576157|
|            5.0|12.666400646383602|
+---------------+------------------+



                                                                                

From the data, we can say that higher the passenger count is, higher the fare is. Therefore correlation exists

## 3. Correlate the fare amount and distance trip

In [106]:
dist_fare = df.groupBy("trip_distance").avg("fare_amount").withColumnRenamed("avg(fare_amount)", "dist_fare").orderBy("dist_fare", ascending=False)

dist_fare.show()



+-------------+---------+
|trip_distance|dist_fare|
+-------------+---------+
|       964.27|   2413.0|
|       821.54|   2056.0|
|       709.88|   1217.0|
|        427.7|   1128.5|
|       243.33|   1043.5|
|        633.8|   1025.0|
|       344.88|    864.5|
|        323.0|    823.0|
|        271.4|    808.5|
|       207.13|    800.0|
|       215.95|    800.0|
|       165.99|    790.0|
|       153.84|    749.5|
|        260.5|    722.0|
|        282.1|    716.0|
|        267.7|    708.0|
|       110.17|    701.0|
|        270.2|    688.5|
|       258.98|    653.0|
|       247.37|    620.5|
+-------------+---------+
only showing top 20 rows



                                                                                

From the data, there exists a positive correlation where the highest fare is for the trip with large distance.

# Demand Prediction

## 1. Feature engineering: Use the date and time of the pickups to create features for the model, such as hour of the day, day of the week, etc.

In [132]:
df_features = df.withColumn("pickup_datetime", df["tpep_pickup_datetime"].cast("timestamp"))

df_features = df_features.withColumn("pickup_hour", hour(df_features["pickup_datetime"]))
df_features = df_features.withColumn("pickup_day_of_month", dayofmonth(df_features["pickup_datetime"]))
df_features = df_features.withColumn("pickup_month", month(df_features["pickup_datetime"]))
df_features = df_features.withColumn("pickup_year", year(df_features["pickup_datetime"]))

df_features = df_features.drop(*["VendorID", "RatecodeID", "PULocationID", "DOLocationID", "pickup_datetime",
                                "tpep_pickup_datetime", "dropoff_datetime", "tpep_dropoff_datetime"])

df_features.limit(10).toPandas()

                                                                                

Unnamed: 0,passenger_count,trip_distance,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_hour,pickup_day_of_month,pickup_month,pickup_year
0,1.0,7.4,Y,1,33.0,3.0,0.5,4.0,0.0,0.3,40.8,2.5,0.0,0,1,10,2021
1,1.0,14.1,N,2,39.0,3.0,0.5,0.0,0.0,0.3,42.8,2.5,0.0,0,1,10,2021
2,2.0,5.64,N,1,20.0,0.5,0.5,4.76,0.0,0.3,28.56,2.5,0.0,0,1,10,2021
3,0.0,5.5,N,1,27.5,3.0,0.5,3.0,0.0,0.3,34.3,2.5,0.0,0,1,10,2021
4,1.0,3.7,N,2,17.5,3.0,0.5,0.0,0.0,0.3,21.3,2.5,0.0,0,1,10,2021
5,1.0,4.2,N,1,14.5,0.5,0.5,3.66,0.0,0.3,21.96,2.5,0.0,0,1,10,2021
6,1.0,6.97,N,2,21.5,0.5,0.5,0.0,0.0,0.3,24.05,0.0,1.25,0,1,10,2021
7,1.0,4.0,N,1,14.0,3.0,0.5,3.55,0.0,0.3,21.35,2.5,0.0,0,1,10,2021
8,1.0,1.9,N,1,8.0,3.0,0.5,2.36,0.0,0.3,14.16,2.5,0.0,0,1,10,2021
9,1.0,66.6,N,2,170.5,1.75,0.5,0.0,23.3,0.3,196.35,0.0,1.25,0,1,10,2021


## 2. Regression model: Use a regression model (such as linear regression) to predict the number of pickups in the next hour based on the features.

In [166]:
from pyspark.ml.feature import VectorAssembler,
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

For each hour data, get the number of pickups by aggregation(sum) of the variable passenger_count

In [133]:
aggregated_data = df_features.groupBy(
    "pickup_hour", "pickup_day_of_month", "pickup_month", "pickup_year"
).agg(
    sum("passenger_count").alias("number_of_pickups"),
    avg("trip_distance").alias("avg_trip_distance"),
    avg("fare_amount").alias("avg_fare_amount"),
    avg("total_amount").alias("avg_total_amount"),
).orderBy("pickup_hour", "pickup_day_of_month", "pickup_month", "pickup_year")

aggregated_data.limit(10).toPandas()

                                                                                

Unnamed: 0,pickup_hour,pickup_day_of_month,pickup_month,pickup_year,number_of_pickups,avg_trip_distance,avg_fare_amount,avg_total_amount
0,0,1,1,2003,2.0,3.8,13.75,17.55
1,0,1,1,2009,249.0,3.00224,12.996,16.8616
2,0,1,1,2021,1620.0,3.577394,13.44865,18.739329
3,0,1,1,2022,25.0,4.6425,15.5,22.5525
4,0,1,2,2011,2.0,1.95,10.5,14.98
5,0,1,2,2021,250.0,5.022325,18.043289,24.058947
6,0,1,3,2021,471.0,5.898835,20.288564,26.846829
7,0,1,4,2021,776.0,5.476673,18.39296,24.468284
8,0,1,5,2021,3618.0,3.428061,12.923136,19.090159
9,0,1,6,2021,1304.0,5.826816,19.390875,26.507845


In [139]:
features = aggregated_data.columns

Remove number_of_pickups as it is the target variable

In [142]:
features.remove("number_of_pickups")

In [143]:
features

['pickup_hour',
 'pickup_day_of_month',
 'pickup_month',
 'pickup_year',
 'avg_trip_distance',
 'avg_fare_amount',
 'avg_total_amount']

In [177]:
assembler = VectorAssembler(inputCols=features, outputCol="features")
data_assembled = assembler.transform(aggregated_data)

In [178]:
train_data, test_data = data_assembled.randomSplit([0.8, 0.2], seed=42)

Train the model with Linear Regression

In [181]:
lr = LinearRegression(featuresCol="features", labelCol="number_of_pickups",  regParam=0.2)

# Create a pipeline
pipeline = Pipeline(stages=[lr])

# Train the model
model = pipeline.fit(train_data)

                                                                                

The model has made a good prediction\
**Original Data:** number of pickups\
**Precited Data:** prediction

In [189]:
# Make predictions on the test data
predictions = model.transform(test_data)

predictions.select('pickup_hour',
 'pickup_day_of_month',
 'pickup_month',
 'pickup_year',"number_of_pickups", "prediction").limit(10).show()



+-----------+-------------------+------------+-----------+-----------------+-----------------+
|pickup_hour|pickup_day_of_month|pickup_month|pickup_year|number_of_pickups|       prediction|
+-----------+-------------------+------------+-----------+-----------------+-----------------+
|         12|                  1|           4|       2021|           6249.0|4581.407727193917|
|         12|                  1|          10|       2021|           7926.0|6535.869906769585|
|         12|                  1|          12|       2021|           9231.0| 7232.27509794342|
|         12|                  2|           1|       2021|           3374.0|3384.295514360114|
|         12|                  2|           2|       2021|           2404.0|4178.031729388924|
|         12|                  3|           5|       2021|           6125.0| 5005.23439416934|
|         12|                  3|           6|       2021|           7548.0|5152.899365512305|
|         12|                  3|          11|    

                                                                                

In [184]:
evaluator = RegressionEvaluator(
    labelCol="number_of_pickups", predictionCol="prediction", metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)



Root Mean Squared Error (RMSE): 2112.0360787538048


                                                                                