In [None]:
# Load the raw yellow and green trip tables from the metastore (all columns).
yellow_taxi_df = spark.read.table("yellow_tripdata_latest")
green_taxi_df = spark.read.table("green_tripdata_2024_10")

In [None]:
from pyspark.sql.functions import col


# Columns kept from both feeds once their timestamp names are harmonised.
shared_columns = ["pickup_datetime", "dropoff_datetime", "PULocationID", "trip_distance", "total_amount"]

# Yellow trips carry a "tpep_" prefix on their timestamps, green trips "lpep_";
# strip the prefix so both frames expose the same column names, then project.
yellow_taxi_df = (
    yellow_taxi_df
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .select(*shared_columns)
)

green_taxi_df = (
    green_taxi_df
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    .select(*shared_columns)
)

# Stack the two feeds, matching columns by name rather than by position.
combined_taxi_df = yellow_taxi_df.unionByName(green_taxi_df)

# NOTE: caching of the combined frame is currently disabled:
# combined_taxi_df = combined_taxi_df.cache()

display(combined_taxi_df.limit(5))


pickup_datetime,dropoff_datetime,PULocationID,trip_distance,total_amount
2024-12-01T00:12:27.000Z,2024-12-01T00:31:12.000Z,138,9.76,51.97
2024-11-30T23:56:04.000Z,2024-12-01T00:28:15.000Z,158,7.62,50.76
2024-12-01T00:50:35.000Z,2024-12-01T01:24:46.000Z,132,20.07,82.69
2024-12-01T00:18:16.000Z,2024-12-01T00:33:16.000Z,142,2.34,24.72
2024-12-01T00:56:13.000Z,2024-12-01T01:18:25.000Z,107,5.05,36.8


In [None]:
from pyspark.sql.functions import unix_timestamp, when


# Trip duration in minutes; left null when either timestamp is missing.
both_timestamps_present = col("pickup_datetime").isNotNull() & col("dropoff_datetime").isNotNull()
elapsed_minutes = (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60

combined_taxi_df = combined_taxi_df.withColumn(
    "trip_duration",
    when(both_timestamps_present, elapsed_minutes).otherwise(None),
)

# Average speed as distance per hour (assumes trip_distance is in miles — TODO confirm).
# Rows whose duration is null or not positive get 0 instead of a division result.
duration_hours = col("trip_duration") / 60

combined_taxi_df = combined_taxi_df.withColumn(
    "trip_speed_mph",
    when(col("trip_duration") > 0, col("trip_distance") / duration_hours).otherwise(0),
)

display(
    combined_taxi_df
    .select("trip_distance", "trip_duration", "trip_speed_mph")
    .limit(5)
)


trip_distance,trip_duration,trip_speed_mph
3.0,17.7,10.16949152542373
2.2,13.083333333333334,10.089171974522294
2.7,9.1,17.802197802197803
3.1,10.85,17.142857142857146
0.0,0.2833333333333333,0.0


In [None]:
from pyspark.sql.functions import approx_percentile


# Flag trips whose total_amount lies above the 99th percentile.
#
# approxQuantile's relative error is expressed in rank space: with p = 0.99 and
# an error of 0.01 the returned value may sit anywhere between the 98th and the
# 100th percentile, i.e. it can be the column maximum, in which case
# `total_amount > threshold` matches no row at all. A tighter error keeps the
# threshold strictly inside the distribution (roughly the 98.9th-99.1st percentile).
quantile_relative_error = 0.001

amount_quantiles = combined_taxi_df.approxQuantile(
    "total_amount", [0.99], quantile_relative_error
)

# An empty result (no usable total_amount values) would otherwise surface as a
# bare IndexError on the [0] lookup; fail with an explicit message instead.
if not amount_quantiles:
    raise ValueError(
        "Cannot compute the 99th percentile of total_amount: no non-null values found"
    )

percentile_threshold = amount_quantiles[0]

# 1 = above the threshold, 0 = everything else (including null amounts,
# because a null comparison falls through to otherwise()).
combined_taxi_df = combined_taxi_df.withColumn(
    "is_outlier",
    when(col("total_amount") > percentile_threshold, 1).otherwise(0),
)

display(combined_taxi_df.filter(col("is_outlier") == 1).limit(5))


pickup_datetime,dropoff_datetime,PULocationID,trip_distance,total_amount,trip_duration,trip_speed_mph,is_outlier


In [None]:

# Make the cell safe to re-run: remove zone columns added by a previous
# execution before joining again. DataFrame.drop ignores names that are not in
# the schema, so no membership check is needed — and dropping each name
# independently also covers the case where only one of the two columns exists,
# which the previous `and` guard skipped (leaving duplicate columns behind).
combined_taxi_df = combined_taxi_df.drop("Borough", "Pickup_Zone")

# Zone lookup table; LocationID is cast to int before being compared with
# PULocationID in the join condition.
zones_df = spark.read.table("taxi_zones")
zones_df = zones_df.withColumn("LocationID", col("LocationID").cast("int"))

# Left join keeps every trip even when its pickup location has no zone entry
# (Borough / Pickup_Zone are null for those rows).
# NOTE(review): assumes LocationID is unique in taxi_zones; duplicates there
# would multiply trip rows — confirm.
combined_taxi_df = (
    combined_taxi_df
    .join(zones_df, combined_taxi_df["PULocationID"] == zones_df["LocationID"], "left")
    .select(
        combined_taxi_df["*"],
        zones_df["Borough"],
        zones_df["Zone"].alias("Pickup_Zone"),
    )
)
display(combined_taxi_df.limit(5))



pickup_datetime,dropoff_datetime,PULocationID,trip_distance,total_amount,trip_duration,trip_speed_mph,is_outlier,Borough,Pickup_Zone
2024-10-01T00:30:44.000Z,2024-10-01T00:48:26.000Z,162,3.0,24.9,17.7,10.16949152542373,0,Manhattan,Midtown East
2024-10-01T00:12:20.000Z,2024-10-01T00:25:25.000Z,48,2.2,23.0,13.083333333333334,10.089171974522294,0,Manhattan,Clinton East
2024-10-01T00:04:46.000Z,2024-10-01T00:13:52.000Z,142,2.7,22.2,9.1,17.802197802197803,0,Manhattan,Lincoln Square East
2024-10-01T00:12:10.000Z,2024-10-01T00:23:01.000Z,233,3.1,21.2,10.85,17.142857142857146,0,Manhattan,UN/Turtle Bay South
2024-10-01T00:30:22.000Z,2024-10-01T00:30:39.000Z,262,0.0,8.0,0.2833333333333333,0.0,0,Manhattan,Yorkville East


In [None]:
from pyspark.sql.functions import date_format, count


# Grouping keys: the pickup timestamp formatted down to the hour and to the day.
pickup_hour = date_format("pickup_datetime", "yyyy-MM-dd HH").alias("hour")
pickup_day = date_format("pickup_datetime", "yyyy-MM-dd").alias("day")

# Number of trips per pickup hour and per pickup day.
hourly_trips = combined_taxi_df.groupBy(pickup_hour).agg(count("*").alias("trip_count"))
daily_trips = combined_taxi_df.groupBy(pickup_day).agg(count("*").alias("trip_count"))

# Preview only: no ordering is applied, so the ten rows shown are arbitrary.
display(hourly_trips.limit(10))
display(daily_trips.limit(10))


hour,trip_count
2024-10-17 20,8505
2024-10-08 18,9555
2024-10-28 15,7186
2024-10-09 03,287
2024-10-29 08,6277
2024-10-02 04,328
2024-10-03 20,6593
2024-10-11 02,1006
2024-10-16 10,6189
2024-10-17 07,5067


day,trip_count
2024-10-17,138413
2024-10-03,110629
2024-10-08,123158
2024-10-19,137476
2024-10-16,136995
2024-10-09,131766
2024-10-26,150328
2024-11-01,139463
2024-10-15,130751
2024-10-23,130133


In [None]:
# Bare expression: the cell output is the DataFrame's repr (column names and types).
combined_taxi_df

DataFrame[pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, trip_distance: double, total_amount: double, trip_duration: double, trip_speed_mph: double, is_outlier: int, Borough: string, Pickup_Zone: string]

In [None]:
# Aliased so the Python builtin `sum` is not shadowed in the notebook namespace.
from pyspark.sql.functions import sum as spark_sum

# Ten pickup zones with the highest summed total_amount.
# The aggregate is aliased directly instead of renaming Spark's auto-generated
# "sum(total_amount)" column: withColumnRenamed silently does nothing when the
# name does not match, which would only show up later as a missing
# "total_revenue" column in orderBy.
top_revenue_locations = (
    combined_taxi_df
    .groupBy("Pickup_Zone")
    .agg(spark_sum("total_amount").alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
    .limit(10)
)

display(top_revenue_locations)


Pickup_Zone,total_revenue
JFK Airport,39671142.17001904
LaGuardia Airport,22929463.760001924
Midtown Center,12880405.62999951
Upper East Side South,11800999.759998431
Times Sq/Theatre District,10923190.659999494
Upper East Side North,10307896.579999143
Penn Station/Madison Sq West,9485968.739999833
Midtown East,9396751.249999624
Lincoln Square East,8335983.909999904
Midtown North,8291605.059999749


In [None]:
from pyspark.sql.functions import dayofweek
# Monthly trip counts split into weekday vs. weekend pickups.
# dayofweek() numbers days 1 (Sunday) through 7 (Saturday), so 1 and 7 are the weekend.
# Rows with a null pickup_datetime fall through to otherwise() and are labelled "Weekday".
# NOTE(review): the saved output contains a few rows in 2002-12, 2008-12 and
# 2009-01 — likely bad pickup timestamps; confirm and filter them if so.
trip_patterns = (
    combined_taxi_df
    .withColumn("day_of_week", dayofweek("pickup_datetime"))
    .withColumn(
        "is_weekend",
        when(col("day_of_week").isin(1, 7), "Weekend").otherwise("Weekday"),
    )
    .groupBy("is_weekend", date_format("pickup_datetime", "yyyy-MM").alias("month"))
    .agg(count("*").alias("total_trips"))
    # Sort on both keys: ordering by month alone leaves the Weekday/Weekend
    # rows within a month in arbitrary order from run to run.
    .orderBy("month", "is_weekend")
)

display(trip_patterns)

is_weekend,month,total_trips
Weekday,2002-12,1
Weekday,2008-12,4
Weekday,2009-01,4
Weekday,2024-09,16
Weekend,2024-09,1
Weekend,2024-10,1002383
Weekday,2024-10,2887532
Weekday,2024-11,2557512
Weekend,2024-11,1141074
Weekend,2024-12,1062760


In [None]:
# Print the schema tree (column name, type, nullability) of combined_taxi_df.
combined_taxi_df.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- trip_speed_mph: double (nullable = true)
 |-- is_outlier: integer (nullable = false)
 |-- Borough: string (nullable = true)
 |-- Pickup_Zone: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Pickup_Zone: string (nullable = true)

