In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, hour, avg, col


In [3]:
spark = SparkSession.builder.appName("NYC Taxi Trip Analysis").getOrCreate()

In [10]:
df = spark.read.csv("/content/FILE.csv", header=True, inferSchema=True)

In [11]:
df = df.withColumn("pickup_datetime", to_timestamp("pickup_datetime", "dd-MM-yyyy HH:mm"))
df = df.withColumn("dropoff_datetime", to_timestamp("dropoff_datetime", "dd-MM-yyyy HH:mm"))


In [12]:
df = df.withColumn("pickup_hour", hour("pickup_datetime"))


In [None]:
# 1. Average Trip Duration per Hour

In [13]:
avg_duration_by_hour = df.groupBy("pickup_hour") \
    .agg(avg("trip_duration").alias("avg_trip_duration")) \
    .orderBy("pickup_hour")

avg_duration_by_hour.show()

+-----------+------------------+
|pickup_hour| avg_trip_duration|
+-----------+------------------+
|          0|  902.366786393408|
|          1|  868.113301942319|
|          2|  772.278376139188|
|          3| 811.1673984632272|
|          4|1100.7176470588236|
|          5| 765.0469992769342|
|          6| 698.1391391391392|
|          7| 803.3330645161291|
|          8| 944.8384030418251|
|          9| 942.5535122597747|
|         10| 952.2411603158256|
|         11| 942.5669562366825|
|         12| 971.8299694189602|
|         13|  986.724535554132|
|         14| 1031.148948948949|
|         15|1125.0550742574258|
|         16|1086.0443105281815|
|         17|1064.8161903380242|
|         18|1042.1610730034297|
|         19| 895.6161195492406|
+-----------+------------------+
only showing top 20 rows



In [None]:
# 2. Passenger Count Distribution

In [17]:
passenger_dist = df.groupBy("passenger_count") \
    .count() \
    .orderBy("passenger_count")
passenger_dist.show()

+---------------+-----+
|passenger_count|count|
+---------------+-----+
|              0|    1|
|              1|92171|
|              2|18813|
|              3| 5281|
|              4| 2544|
|              5| 6989|
|              6| 4243|
+---------------+-----+



In [None]:
# 3. Store-and-Forward Flag Distribution

In [18]:
df.groupBy("store_and_fwd_flag").count().show()

+------------------+------+
|store_and_fwd_flag| count|
+------------------+------+
|                 Y|   769|
|                 N|129273|
+------------------+------+



In [None]:
# 4. Repartitioning and Caching to Demonstrate Scalability

In [19]:
df_repartitioned = df.repartition(8, "pickup_hour")
df_repartitioned.cache()

DataFrame[id: string, vendor_id: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, store_and_fwd_flag: string, trip_duration: int, pickup_hour: int]

In [None]:
# 5. Top 5 Longest Trips by Duration

In [20]:
top_longest = df.orderBy(col("trip_duration").desc()).select(
    "id", "pickup_datetime", "dropoff_datetime", "trip_duration").limit(5)
top_longest.show()

+---------+-------------------+-------------------+-------------+
|       id|    pickup_datetime|   dropoff_datetime|trip_duration|
+---------+-------------------+-------------------+-------------+
|id0953667|2016-05-06 00:00:00|2016-05-07 00:00:00|        86390|
|id2837671|2016-06-30 16:37:00|2016-07-01 16:37:00|        86387|
|id3782820|2016-05-12 13:48:00|2016-05-13 13:47:00|        86378|
|id2307896|2016-05-14 04:48:00|2016-05-15 04:47:00|        86377|
|id2198959|2016-02-22 09:06:00|2016-02-23 09:05:00|        86358|
+---------+-------------------+-------------------+-------------+



In [None]:
#  6. Repartition by pickup hour

In [22]:
df_repartitioned.cache()

avg_duration_partitioned = df_repartitioned.groupBy("pickup_hour") \
    .agg(avg("trip_duration").alias("avg_duration"))

avg_duration_partitioned.show()

+-----------+------------------+
|pickup_hour|      avg_duration|
+-----------+------------------+
|         13|  986.724535554132|
|         12| 971.8299694189602|
|         14| 1031.148948948949|
|         18|1042.1610730034297|
|         17|1064.8161903380242|
|         23| 912.5974237730311|
|          9| 942.5535122597747|
|         16|1086.0443105281815|
|          6| 698.1391391391392|
|         10| 952.2411603158256|
|          5| 765.0469992769342|
|         11| 942.5669562366825|
|          7| 803.3330645161291|
|          3| 811.1673984632272|
|          1|  868.113301942319|
|         19| 895.6161195492406|
|         20| 907.3041037294479|
|         15|1125.0550742574258|
|         22| 883.1973018549747|
|          2|  772.278376139188|
+-----------+------------------+
only showing top 20 rows



In [24]:
avg_duration_partitioned.orderBy("avg_duration").show()

+-----------+------------------+
|pickup_hour|      avg_duration|
+-----------+------------------+
|          6| 698.1391391391392|
|          5| 765.0469992769342|
|          2|  772.278376139188|
|          7| 803.3330645161291|
|          3| 811.1673984632272|
|         21| 852.6107329842932|
|          1|  868.113301942319|
|         22| 883.1973018549747|
|         19| 895.6161195492406|
|          0|  902.366786393408|
|         20| 907.3041037294479|
|         23| 912.5974237730311|
|          9| 942.5535122597747|
|         11| 942.5669562366825|
|          8| 944.8384030418251|
|         10| 952.2411603158256|
|         12| 971.8299694189602|
|         13|  986.724535554132|
|         14| 1031.148948948949|
|         18|1042.1610730034297|
+-----------+------------------+
only showing top 20 rows



In [None]:
#  7. Compare Timings

In [25]:
import time

start = time.time()
df.groupBy("pickup_hour").agg(avg("trip_duration")).show()
print("Time without caching:", time.time() - start)

df.cache()

start = time.time()
df.groupBy("pickup_hour").agg(avg("trip_duration")).show()
print("Time with caching:", time.time() - start)

df_partitioned = df.repartition(12, "pickup_hour")
df_partitioned.cache()

start = time.time()
df_partitioned.groupBy("pickup_hour").agg(avg("trip_duration")).show()
print("Time with repartitioning and caching:", time.time() - start)


+-----------+------------------+
|pickup_hour|avg(trip_duration)|
+-----------+------------------+
|         12| 971.8299694189602|
|         22| 883.1973018549747|
|          1|  868.113301942319|
|         13|  986.724535554132|
|         16|1086.0443105281815|
|          6| 698.1391391391392|
|          3| 811.1673984632272|
|         20| 907.3041037294479|
|          5| 765.0469992769342|
|         19| 895.6161195492406|
|         15|1125.0550742574258|
|         17|1064.8161903380242|
|          9| 942.5535122597747|
|          4|1100.7176470588236|
|          8| 944.8384030418251|
|         23| 912.5974237730311|
|          7| 803.3330645161291|
|         10| 952.2411603158256|
|         21| 852.6107329842932|
|         11| 942.5669562366825|
+-----------+------------------+
only showing top 20 rows

Time without caching: 1.0353631973266602
+-----------+------------------+
|pickup_hour|avg(trip_duration)|
+-----------+------------------+
|         12| 971.8299694189602|
|        