Manan Pujara

Data source : https://www.kaggle.com/datasets/gauravpathak1789/yellow-tripdata-2020-01

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("week8app") \
    .master("local[*]") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/13 21:19:11 WARN Utils: Your hostname, Dexter, resolves to a loopback address: 127.0.1.1; using 10.20.67.112 instead (on interface wlo1)
25/07/13 21:19:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/13 21:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

## Load CSV into PySpark DataFrame

**I'm using local notebook + pyspark library having issue with databricks account (and also i m out of free credits and subscription for azure databricks)**

- so for databricks goto catalog and there you can choose workspace and add schema or in existing schema you can create volume then upload csv then copy path and mount it 
image for better understanding 

<center><img src="./image.png" height="500" width="1000"/></center>

In [7]:
from pyspark.sql.functions import *


In [None]:
# Load the CSV
df = spark.read.option("header", True).option("inferSchema", True)\
    .csv("./yellow_tripdata_2020-01 (1).csv")

df.printSchema()
df.show(5)



                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+---

## Clean/Prepare the Data

In [15]:
columns_to_cast = ['fare_amount', 'extra', 'mta_tax', 'tip_amount',
                'tolls_amount', 'improvement_surcharge', 'total_amount']

for col_name in columns_to_cast:
    df = df.withColumn(col_name, col(col_name).cast("double"))

df = df.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
       .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))

df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



## Flatten JSON fields 

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, col

# Defining Json Field for example 
schema = StructType([
    StructField("trip_id", IntegerType()),
    StructField("driver", StructType([
        StructField("name", StringType()),
        StructField("license", StringType())
    ])),
    StructField("fare", StructType([
        StructField("amount", DoubleType()),
        StructField("tip", DoubleType())
    ]))
])

# simulating json field for example here 
json_data = [
    ('{"trip_id":101,"driver":{"name":"Manan Pujara","license":"XYZ1234"},"fare":{"amount":25.0,"tip":3.0}}',)
]

df_raw = spark.createDataFrame(json_data, ["json_col"])


In [None]:
# Extracting nested fields 
df_flat = df_raw.withColumn("parsed", from_json(col("json_col"), schema))\
    .select(
        col("parsed.trip_id"),
        col("parsed.driver.name").alias("driver_name"),
        col("parsed.driver.license").alias("driver_license"),
        col("parsed.fare.amount").alias("fare_amount"),
        col("parsed.fare.tip").alias("fare_tip")
    )

df_flat.show(truncate=False)


+-------+------------+--------------+-----------+--------+
|trip_id|driver_name |driver_license|fare_amount|fare_tip|
+-------+------------+--------------+-----------+--------+
|101    |Manan Pujara|XYZ1234       |25.0       |3.0     |
+-------+------------+--------------+-----------+--------+



## Query 1 : Add Revenue Column

In [None]:
# making revenue column with addition of columns : fare_amount, extra, mta_tax, improvement_surcharge, tip_amount, tolls_amount, total_amount
df = df.withColumn("Revenue", 
    col("fare_amount") + col("extra") + col("mta_tax") + 
    col("improvement_surcharge") + col("tip_amount") + 
    col("tolls_amount") + col("total_amount")
)

df.select("fare_amount", "extra", "tip_amount", "total_amount", "Revenue").show(5)


+-----------+-----+----------+------------+-------+
|fare_amount|extra|tip_amount|total_amount|Revenue|
+-----------+-----+----------+------------+-------+
|        6.0|  3.0|      1.47|       11.27|  22.54|
|        7.0|  3.0|       1.5|        12.3|   24.6|
|        6.0|  3.0|       1.0|        10.8|   21.6|
|        5.5|  0.5|      1.36|        8.16|  16.32|
|        3.5|  0.5|       0.0|         4.8|    9.6|
+-----------+-----+----------+------------+-------+
only showing top 5 rows


## Query 2 : Passenger Count by Area

In [None]:
df.groupBy("PULocationID")\
  .agg(sum("passenger_count").alias("Total_Passengers"))\
  .orderBy(desc("Total_Passengers")).show(10)




+------------+----------------+
|PULocationID|Total_Passengers|
+------------+----------------+
|         237|          433243|
|         161|          425986|
|         236|          403347|
|         230|          360096|
|         162|          351011|
|         186|          338952|
|         132|          326402|
|          48|          297148|
|         142|          294502|
|         170|          289593|
+------------+----------------+
only showing top 10 rows


                                                                                

## Query 3 : Real-time Average Fare/Earning by Vendor

In [19]:
# finding average fare and total_amount using avg() aggregate function and group by with vendor ID
df.groupBy("VendorID")\
  .agg(avg("fare_amount").alias("Avg_Fare"),
       avg("total_amount").alias("Avg_Total_Earning"))\
  .orderBy("VendorID").show()


[Stage 11:>                                                       (0 + 12) / 12]

+--------+------------------+------------------+
|VendorID|          Avg_Fare| Avg_Total_Earning|
+--------+------------------+------------------+
|    NULL|31.996404547606964|37.217091425863046|
|       1|12.231264768274265|18.113429405184878|
|       2| 12.62490775307597|18.648347164036302|
+--------+------------------+------------------+



                                                                                

## Query 4 : Moving Count of Payments by Payment Mode


In [21]:
from pyspark.sql.window import Window

payment_window = Window.partitionBy("payment_type")\
                       .orderBy("tpep_pickup_datetime")\
                       .rowsBetween(-10, 0)

df.withColumn("Moving_Count", count("*").over(payment_window))\
  .select("payment_type", "tpep_pickup_datetime", "Moving_Count")\
  .orderBy("tpep_pickup_datetime").show(10)




+------------+--------------------+------------+
|payment_type|tpep_pickup_datetime|Moving_Count|
+------------+--------------------+------------+
|           2| 2003-01-01 00:07:17|           1|
|           1| 2008-12-31 23:02:40|           1|
|           1| 2008-12-31 23:02:50|           2|
|           1| 2008-12-31 23:03:44|           3|
|           1| 2008-12-31 23:03:48|           4|
|           2| 2008-12-31 23:06:13|           2|
|           2| 2008-12-31 23:17:15|           3|
|           2| 2008-12-31 23:24:11|           4|
|           1| 2008-12-31 23:34:13|           5|
|           2| 2008-12-31 23:35:00|           5|
+------------+--------------------+------------+
only showing top 10 rows


                                                                                

## Query 5 : Top 2 Gaining Vendors on a Specific Date

In [22]:
specific_date = "2020-01-15"

df.filter(to_date("tpep_pickup_datetime") == specific_date)\
  .groupBy("VendorID")\
  .agg(
      sum("total_amount").alias("Total_Revenue"),
      sum("passenger_count").alias("Total_Passengers"),
      sum("trip_distance").alias("Total_Distance")
  )\
  .orderBy(desc("Total_Revenue"))\
  .limit(2)\
  .show()




+--------+------------------+----------------+------------------+
|VendorID|     Total_Revenue|Total_Passengers|    Total_Distance|
+--------+------------------+----------------+------------------+
|       2| 2700441.549999132|          233339| 410271.7600000014|
|       1|1319816.5300006857|           82508|190960.49999999945|
+--------+------------------+----------------+------------------+



                                                                                

## Query 6 : Route With Most Passengers

In [23]:
df.groupBy("PULocationID", "DOLocationID")\
  .agg(sum("passenger_count").alias("Passenger_Count"))\
  .orderBy(desc("Passenger_Count"))\
  .limit(1)\
  .show()




+------------+------------+---------------+
|PULocationID|DOLocationID|Passenger_Count|
+------------+------------+---------------+
|         237|         236|          67885|
+------------+------------+---------------+



                                                                                

## Query 7 : Top Pickup Locations in Last 10 Seconds

In [24]:
latest_time = df.agg(max("tpep_pickup_datetime")).first()[0]
window_start = latest_time - expr("INTERVAL 10 seconds")

df.filter((col("tpep_pickup_datetime") > window_start) & (col("tpep_pickup_datetime") <= latest_time))\
  .groupBy("PULocationID")\
  .agg(sum("passenger_count").alias("Recent_Passengers"))\
  .orderBy(desc("Recent_Passengers"))\
  .show()


[Stage 26:>                                                       (0 + 12) / 12]

+------------+-----------------+
|PULocationID|Recent_Passengers|
+------------+-----------------+
|          90|                1|
+------------+-----------------+



                                                                                

## Save as External Parquet file

In [25]:
df.write.mode("overwrite").parquet("yellow_taxi_2020_01_parquet/")

df_parquet = spark.read.parquet("yellow_taxi_2020_01_parquet/")
df_parquet.show()


25/07/13 21:36:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/07/13 21:36:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/13 21:36:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/07/13 21:36:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/07/13 21:36:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/07/13 21:36:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/07/13 21:36:31 WARN MemoryManager: Total allocation exceeds 95.

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|           Revenue|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|              1|          1.2|         1|                 N|         238|         239|           1|        6.0|  

## Extra Query : What hour of the day has the most earnings?

In [26]:
df.withColumn("hour", hour("tpep_pickup_datetime"))\
  .groupBy("hour")\
  .agg(sum("total_amount").alias("Total_Earnings"))\
  .orderBy(desc("Total_Earnings"))\
  .show()




+----+------------------+
|hour|    Total_Earnings|
+----+------------------+
|  18| 8139717.280000629|
|  17| 7664537.440000109|
|  19| 7303387.380000077|
|  16| 7169013.939999765|
|  15| 7121991.509999728|
|  14| 6932949.289999597|
|  21| 6681671.489999872|
|  20| 6662722.759999678|
|  13| 6350768.439999287|
|  12| 6051112.649999132|
|  22| 6047689.889999723|
|  11| 5476375.819998952|
|   9| 5375311.279999266|
|   8| 5322249.309999476|
|  10|  5292959.88999899|
|  23|4540154.6599994255|
|   7| 4253480.069999366|
|   0| 3326239.589999602|
|   6|  2446269.25999986|
|   1|2241891.5899997917|
+----+------------------+
only showing top 20 rows


                                                                                