In [0]:
from pyspark.sql.functions import col,year,quarter,month,weekday,dayofmonth,to_date,to_timestamp,hour,avg
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from pyspark.sql import SparkSession

In [0]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("nyc_taxi_trip_analysis")
    .getOrCreate()
)

In [0]:
# Explicit schema for the taxi trip CSV, listed in file column order.
# Every column is nullable. The two datetime columns are read as plain strings
# here and parsed into timestamps in a later cell.
taxi_schema = (
    StructType()
    .add("VendorID", StringType(), True)
    .add("lpep_pickup_datetime", StringType(), True)
    .add("lpep_dropoff_datetime", StringType(), True)
    .add("store_and_fwd_flag", StringType(), True)
    .add("RatecodeID", StringType(), True)
    .add("PULocationID", IntegerType(), True)
    .add("DOLocationID", IntegerType(), True)
    .add("passenger_count", IntegerType(), True)
    .add("trip_distance", DoubleType(), True)
    .add("fare_amount", DoubleType(), True)
    .add("extra", DoubleType(), True)
    .add("mta_tax", DoubleType(), True)
    .add("tip_amount", DoubleType(), True)
    .add("tolls_amount", DoubleType(), True)
    .add("improvement_surcharge", DoubleType(), True)
    .add("total_amount", DoubleType(), True)
    .add("payment_type", StringType(), True)
    .add("trip_type", IntegerType(), True)
    .add("congestion_surcharge", DoubleType(), True)
)


In [0]:
# Load the trip CSV from DBFS using the explicit schema; the first line of the
# file is treated as a header row.
df = spark.read.csv(
    "dbfs:/FileStore/tables/taxi_tripdata.csv",
    schema=taxi_schema,
    header=True,
)

In [0]:
# Preview up to 20 rows of the raw load to sanity-check the schema and values.
display(df.limit(20))

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
1,01-07-2021 00:30,01-07-2021 00:35,N,1,74,168,1,1.2,6.0,0.5,0.5,0.0,0.0,0.3,7.3,2,1,0.0
2,01-07-2021 00:25,01-07-2021 01:01,N,1,116,265,2,13.69,42.0,0.5,0.5,0.0,0.0,0.3,43.3,2,1,0.0
2,01-07-2021 00:05,01-07-2021 00:12,N,1,97,33,1,0.95,6.5,0.5,0.5,2.34,0.0,0.3,10.14,1,1,0.0
2,01-07-2021 00:41,01-07-2021 00:47,N,1,74,42,1,1.24,6.5,0.5,0.5,0.0,0.0,0.3,7.8,2,1,0.0
2,01-07-2021 00:51,01-07-2021 00:58,N,1,42,244,1,1.1,7.0,0.5,0.5,0.0,0.0,0.3,8.3,2,1,0.0
1,01-07-2021 00:05,01-07-2021 00:11,N,1,24,239,1,1.9,8.0,3.25,0.5,3.0,0.0,0.3,15.05,1,1,2.75
2,01-07-2021 00:57,01-07-2021 01:27,N,1,75,243,1,0.0,17.5,0.5,0.5,0.0,0.0,0.3,18.8,2,1,0.0
2,01-07-2021 00:27,01-07-2021 00:32,N,1,82,82,1,0.66,5.0,0.5,0.5,0.0,0.0,0.3,6.3,2,1,0.0
2,01-07-2021 00:29,01-07-2021 00:34,N,1,74,42,1,1.72,7.0,0.5,0.5,2.08,0.0,0.3,10.38,1,1,0.0
2,01-07-2021 00:41,01-07-2021 00:49,N,1,41,42,1,1.37,7.5,0.5,0.5,0.0,0.0,0.3,8.8,2,1,0.0


In [0]:
# Parse the pickup/drop-off strings (day-month-year, e.g. "01-07-2021 00:30")
# into proper timestamp columns, keeping the original column names.
df_new = (
    df
    .withColumn(
        "lpep_pickup_datetime",
        to_timestamp(col("lpep_pickup_datetime"), "dd-MM-yyyy HH:mm"),
    )
    .withColumn(
        "lpep_dropoff_datetime",
        to_timestamp(col("lpep_dropoff_datetime"), "dd-MM-yyyy HH:mm"),
    )
)

In [0]:
# Give the datetime and store-and-forward columns shorter, friendlier names.
df_new = (
    df_new
    .withColumnRenamed("lpep_pickup_datetime", "Pickup_dateTime")
    .withColumnRenamed("lpep_dropoff_datetime", "Drop_dateTime")
    .withColumnRenamed("store_and_fwd_flag", "store_and_forward")
)

In [0]:
# Derive calendar features from the pickup timestamp: hour of day, month
# number and calendar date.
df_new = (
    df_new
    .withColumn("PickUp_hour", hour(col("Pickup_dateTime")))
    .withColumn("Month", month(col("Pickup_dateTime")))
    .withColumn("Date", to_date(col("Pickup_dateTime")))
)

# Preview up to 100 rows of the transformed frame.
display(df_new.limit(100))

VendorID,Pickup_dateTime,Drop_dateTime,store_and_forward,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,PickUp_hour,Month,Date
1,2021-07-01T00:30:00Z,2021-07-01T00:35:00Z,N,1,74,168,1,1.2,6.0,0.5,0.5,0.0,0.0,0.3,7.3,2,1,0.0,0,7,2021-07-01
2,2021-07-01T00:25:00Z,2021-07-01T01:01:00Z,N,1,116,265,2,13.69,42.0,0.5,0.5,0.0,0.0,0.3,43.3,2,1,0.0,0,7,2021-07-01
2,2021-07-01T00:05:00Z,2021-07-01T00:12:00Z,N,1,97,33,1,0.95,6.5,0.5,0.5,2.34,0.0,0.3,10.14,1,1,0.0,0,7,2021-07-01
2,2021-07-01T00:41:00Z,2021-07-01T00:47:00Z,N,1,74,42,1,1.24,6.5,0.5,0.5,0.0,0.0,0.3,7.8,2,1,0.0,0,7,2021-07-01
2,2021-07-01T00:51:00Z,2021-07-01T00:58:00Z,N,1,42,244,1,1.1,7.0,0.5,0.5,0.0,0.0,0.3,8.3,2,1,0.0,0,7,2021-07-01
1,2021-07-01T00:05:00Z,2021-07-01T00:11:00Z,N,1,24,239,1,1.9,8.0,3.25,0.5,3.0,0.0,0.3,15.05,1,1,2.75,0,7,2021-07-01
2,2021-07-01T00:57:00Z,2021-07-01T01:27:00Z,N,1,75,243,1,0.0,17.5,0.5,0.5,0.0,0.0,0.3,18.8,2,1,0.0,0,7,2021-07-01
2,2021-07-01T00:27:00Z,2021-07-01T00:32:00Z,N,1,82,82,1,0.66,5.0,0.5,0.5,0.0,0.0,0.3,6.3,2,1,0.0,0,7,2021-07-01
2,2021-07-01T00:29:00Z,2021-07-01T00:34:00Z,N,1,74,42,1,1.72,7.0,0.5,0.5,2.08,0.0,0.3,10.38,1,1,0.0,0,7,2021-07-01
2,2021-07-01T00:41:00Z,2021-07-01T00:49:00Z,N,1,41,42,1,1.37,7.5,0.5,0.5,0.0,0.0,0.3,8.8,2,1,0.0,0,7,2021-07-01


In [0]:
# Busiest pickup locations: the 10 PULocationID values with the most trips.
display(
    df_new.groupBy("PULocationID")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

PULocationID,count
74,8770
75,7713
41,4761
42,3229
95,2486
166,2391
244,2278
7,1819
97,1754
82,1544


In [0]:
# Busiest drop-off locations: the 10 DOLocationID values with the most trips.
display(
    df_new.groupBy("DOLocationID")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

DOLocationID,count
74,3666
75,3122
42,2904
41,2527
236,1700
238,1546
166,1443
244,1283
263,1231
61,1173


In [0]:
# Peak travel hours: the 10 pickup hours (0-23) with the most trips.
# Column names are spelled exactly as they exist ("PickUp_hour" on df_new,
# "count" from the aggregation) so the query does not depend on Spark's
# default case-insensitive column resolution (spark.sql.caseSensitive=false).
display(
    df_new.groupBy("PickUp_hour")
    .count()
    .orderBy(col("count").desc())
    .limit(10)
)

pickup_hour,count
10,6096
11,6092
9,5798
12,5766
15,5744
14,5626
13,5534
16,5280
18,5183
17,5166


In [0]:
# Average fare across all trips.
# Computed on df_new, like every other analysis cell, rather than on the raw
# df; fare_amount is not modified by the transformation cells, so the value
# is the same.
display(df_new.agg(avg("fare_amount").alias("avg_fare")))

avg_fare
20.388304596670345


In [0]:
# Show the DataFrame repr (column names and types) to confirm the transformed schema.
df_new

DataFrame[VendorID: string, Pickup_dateTime: timestamp, Drop_dateTime: timestamp, store_and_forward: string, RatecodeID: string, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, payment_type: string, trip_type: int, congestion_surcharge: double, PickUp_hour: int, Month: int, Date: date]

In [0]:
# Trip count per payment type, most common first.
# Payment type codes:
#   1 = Credit card
#   2 = Cash
#   3 = No charge
#   4 = Dispute
#   5 = Unknown
#   6 = Voided trip
#
# Rows with a missing payment_type are removed explicitly with isNotNull()
# before aggregating. Comparing against the string "Null" only dropped those
# rows as a side effect of SQL three-valued logic (NULL != 'Null' evaluates to
# NULL), and it applied a df_new column reference to the aggregated frame.
display(
    df_new.where(col("payment_type").isNotNull())
    .groupBy("payment_type")
    .count()
    .orderBy(col("count").desc())
)

payment_type,count
1,29990
2,20831
3,307
4,44
5,1


In [0]:
# Which payment method has the highest average tip?
# Tip amount: this field is automatically populated for credit card tips.
# Cash tips are not included.
#
# Rows with a missing payment_type are removed explicitly with isNotNull()
# before aggregating, instead of comparing against the string "Null" (which
# only dropped NULL rows through SQL three-valued logic) on the aggregated frame.
display(
    df_new.where(col("payment_type").isNotNull())
    .groupBy("payment_type")
    .agg(avg("tip_amount").alias("avg_tip"))
    .orderBy(col("avg_tip").desc())
)

payment_type,avg_tip
1,2.354012004001369
5,0.0
4,0.0
2,0.0
3,-0.0349837133550488


In [0]:
# Average tolls and average total amount across all trips.
display(
    df_new.agg(
        avg("tolls_amount").alias("avg_tolls"),
        avg("total_amount").alias("avg_total"),
    )
)

avg_tolls,avg_total
0.6245289218673238,24.204836362337524


In [0]:
# Average of the per-trip fare/distance ratio.
# The data contains trips with trip_distance == 0. Those rows are excluded
# explicitly so the division never hits a zero divisor: with ANSI mode off
# Spark returns NULL for x / 0 (which AVG skips), but with ANSI mode on it
# raises a divide-by-zero error. Filtering first gives the same value in
# legacy mode and keeps the cell working under ANSI mode.
display(
    df_new.where(col("trip_distance") != 0)
    .selectExpr("AVG(fare_amount/trip_distance) as avg_fare_perMile")
)

avg_fare_perMile
10.780987848331591


In [0]:
# Most profitable short-distance trips: the 10 highest fares among trips whose
# distance is strictly between 0 and 2.
display(
    df_new.where((col("trip_distance") > 0) & (col("trip_distance") < 2))
    .select("trip_distance", "fare_amount")
    .orderBy("fare_amount", ascending=False)
    .limit(10)
)

trip_distance,fare_amount
0.37,160.0
1.85,140.0
0.03,140.0
0.37,113.0
1.27,91.0
0.22,82.95
0.06,80.0
0.1,76.0
0.01,75.93
0.35,74.01
