In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("NYC Taxi Analysis") \
    .getOrCreate()


print("Spark session created!")


Spark session created!


In [3]:
from google.colab import files
uploaded = files.upload()


Saving yellow_tripdata_2023-01.parquet to yellow_tripdata_2023-01.parquet


In [6]:

df = spark.read.parquet("yellow_tripdata_2023-01.parquet")
df.show(5)
df.printSchema()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.


 Query 1. - Add a column named as ""Revenue"" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'

In [7]:
from pyspark.sql.functions import col, round


df_with_revenue = df.withColumn(
    "Revenue",
    round(
        col("fare_amount") +
        col("extra") +
        col("mta_tax") +
        col("improvement_surcharge") +
        col("tip_amount") +
        col("tolls_amount"),
        2
    )
)

df_with_revenue.select(
    "fare_amount", "extra", "mta_tax", "improvement_surcharge",
    "tip_amount", "tolls_amount", "Revenue"
).show(10)


+-----------+-----+-------+---------------------+----------+------------+-------+
|fare_amount|extra|mta_tax|improvement_surcharge|tip_amount|tolls_amount|Revenue|
+-----------+-----+-------+---------------------+----------+------------+-------+
|        9.3|  1.0|    0.5|                  1.0|       0.0|         0.0|   11.8|
|        7.9|  1.0|    0.5|                  1.0|       4.0|         0.0|   14.4|
|       14.9|  1.0|    0.5|                  1.0|      15.0|         0.0|   32.4|
|       12.1| 7.25|    0.5|                  1.0|       0.0|         0.0|  20.85|
|       11.4|  1.0|    0.5|                  1.0|      3.28|         0.0|  17.18|
|       12.8|  1.0|    0.5|                  1.0|      10.0|         0.0|   25.3|
|       12.1|  1.0|    0.5|                  1.0|      3.42|         0.0|  18.02|
|       45.7|  1.0|    0.5|                  1.0|     10.74|         3.0|  61.94|
|       17.7|  1.0|    0.5|                  1.0|      5.68|         0.0|  25.88|
|       14.9|  1

Query 2. - Increasing count of total passengers in New York City by area

In [8]:
from pyspark.sql.functions import sum as _sum

area_passenger_count = df_with_revenue.groupBy("PULocationID") \
    .agg(_sum("passenger_count").alias("Total_Passengers")) \
    .orderBy("Total_Passengers", ascending=False)

area_passenger_count.show(10)


+------------+----------------+
|PULocationID|Total_Passengers|
+------------+----------------+
|         132|        228407.0|
|         237|        192476.0|
|         161|        181236.0|
|         236|        180238.0|
|         186|        143349.0|
|         230|        142150.0|
|         162|        137405.0|
|         142|        134096.0|
|         138|        119617.0|
|         239|        115799.0|
+------------+----------------+
only showing top 10 rows



Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

In [9]:
from pyspark.sql.functions import avg, sum as _sum


vendor_stats = df_with_revenue.groupBy("VendorID") \
    .agg(
        avg("fare_amount").alias("Average_Fare"),
        _sum("total_amount").alias("Total_Earnings")
    ) \
    .orderBy("Total_Earnings", ascending=False)

vendor_stats.show()


+--------+------------------+-------------------+
|VendorID|      Average_Fare|     Total_Earnings|
+--------+------------------+-------------------+
|       2|  18.7133974919165|6.155633746991244E7|
|       1|17.429674993078745|2.130885475000763E7|
+--------+------------------+-------------------+



Query 4. - Moving Count of payments made by each payment mode

In [10]:
from pyspark.sql.functions import window

payment_window_df = df_with_revenue.groupBy(
    window("tpep_pickup_datetime", "1 hour"),
    "payment_type"
).count().orderBy("window")

payment_window_df.show(10, truncate=False)


+------------------------------------------+------------+-----+
|window                                    |payment_type|count|
+------------------------------------------+------------+-----+
|{2008-12-31 23:00:00, 2009-01-01 00:00:00}|2           |2    |
|{2022-10-24 17:00:00, 2022-10-24 18:00:00}|2           |1    |
|{2022-10-24 20:00:00, 2022-10-24 21:00:00}|1           |1    |
|{2022-10-24 21:00:00, 2022-10-24 22:00:00}|1           |1    |
|{2022-10-24 23:00:00, 2022-10-25 00:00:00}|1           |1    |
|{2022-10-25 00:00:00, 2022-10-25 01:00:00}|2           |2    |
|{2022-10-25 03:00:00, 2022-10-25 04:00:00}|1           |1    |
|{2022-10-25 07:00:00, 2022-10-25 08:00:00}|1           |1    |
|{2022-10-25 09:00:00, 2022-10-25 10:00:00}|2           |1    |
|{2022-10-25 11:00:00, 2022-10-25 12:00:00}|1           |1    |
+------------------------------------------+------------+-----+
only showing top 10 rows



Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

In [11]:
from pyspark.sql.functions import to_date

filtered_df = df_with_revenue.filter(
    to_date("tpep_pickup_datetime") == "2023-01-15"
)

top_vendors = filtered_df.groupBy("VendorID").agg(
    _sum("total_amount").alias("Total_Earnings"),
    _sum("passenger_count").alias("Total_Passengers"),
    _sum("trip_distance").alias("Total_Trip_Distance")
).orderBy("Total_Earnings", ascending=False)

top_vendors.show(2)


+--------+------------------+----------------+-------------------+
|VendorID|    Total_Earnings|Total_Passengers|Total_Trip_Distance|
+--------+------------------+----------------+-------------------+
|       2|1982371.8499999905|        106011.0|   452629.900000001|
|       1| 625710.4699999961|         31557.0|  80744.90000000077|
+--------+------------------+----------------+-------------------+



Query 6. - Most no of passenger between a route of two location.

In [12]:
from pyspark.sql.functions import concat_ws

df_with_routes = df_with_revenue.withColumn(
    "Route",
    concat_ws(" → ", df_with_revenue["PULocationID"], df_with_revenue["DOLocationID"])
)


top_routes = df_with_routes.groupBy("Route").agg(
    _sum("passenger_count").alias("Total_Passengers")
).orderBy("Total_Passengers", ascending=False)

top_routes.show(10, truncate=False)


+---------+----------------+
|Route    |Total_Passengers|
+---------+----------------+
|237 → 236|29549.0         |
|236 → 237|24978.0         |
|264 → 264|22053.0         |
|236 → 236|19682.0         |
|237 → 237|19091.0         |
|161 → 237|12148.0         |
|237 → 161|11890.0         |
|142 → 239|11200.0         |
|161 → 236|11063.0         |
|239 → 238|10935.0         |
+---------+----------------+
only showing top 10 rows



Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.

In [13]:
from pyspark.sql.functions import max as _max, col
from pyspark.sql.functions import unix_timestamp

max_time_row = df_with_revenue.select(_max("tpep_pickup_datetime").alias("max_time")).collect()[0]
max_time = max_time_row["max_time"]

from datetime import timedelta
lower_bound = max_time - timedelta(seconds=10)

recent_trips = df_with_revenue.filter(
    (col("tpep_pickup_datetime") > lower_bound) & (col("tpep_pickup_datetime") <= max_time)
)

top_recent_locations = recent_trips.groupBy("PULocationID") \
    .agg(_sum("passenger_count").alias("Total_Passengers")) \
    .orderBy("Total_Passengers", ascending=False)


top_recent_locations.show(10)


+------------+----------------+
|PULocationID|Total_Passengers|
+------------+----------------+
|         162|             1.0|
+------------+----------------+

