In [143]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date

spark = SparkSession.builder \
    .appName("ProcessingTask") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.repositories", "https://packages.confluent.io/maven/") \
    .config("spark.jars.packages", ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
        "org.apache.spark:spark-avro_2.12:3.5.0"
    ])) \
    .config("spark.hadoop.fs.s3a.access.key", "telcoaz") \
    .config("spark.hadoop.fs.s3a.secret.key", "Telco12345") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

df = spark.read.parquet("s3a://spark/yellow_tripdata_2024-01.parquet")
# pd_df = df.limit(10).toPandas()

In [None]:
# working with selects
df.selectExpr("tolls_amount").show(5)

+------------+
|tolls_amount|
+------------+
|         0.0|
|         0.0|
|         0.0|
|         0.0|
|         0.0|
+------------+
only showing top 5 rows



In [None]:
#working with selects
df.select("tolls_amount").show(5)

+------------+
|tolls_amount|
+------------+
|         0.0|
|         0.0|
|         0.0|
|         0.0|
|         0.0|
+------------+
only showing top 5 rows



In [None]:
# extracting only date part
from pyspark.sql.functions import to_date
new_df = df.withColumn("new_column", to_date("tpep_pickup_datetime"))

In [84]:
new_df.select("tpep_pickup_datetime", "new_column").show(truncate=False)

+--------------------+----------+
|tpep_pickup_datetime|new_column|
+--------------------+----------+
|2024-01-01 00:57:55 |2024-01-01|
|2024-01-01 00:03:00 |2024-01-01|
|2024-01-01 00:17:06 |2024-01-01|
|2024-01-01 00:36:38 |2024-01-01|
|2024-01-01 00:46:51 |2024-01-01|
|2024-01-01 00:54:08 |2024-01-01|
|2024-01-01 00:49:44 |2024-01-01|
|2024-01-01 00:30:40 |2024-01-01|
|2024-01-01 00:26:01 |2024-01-01|
|2024-01-01 00:28:08 |2024-01-01|
|2024-01-01 00:35:22 |2024-01-01|
|2024-01-01 00:25:00 |2024-01-01|
|2024-01-01 00:35:16 |2024-01-01|
|2024-01-01 00:43:27 |2024-01-01|
|2024-01-01 00:51:53 |2024-01-01|
|2024-01-01 00:50:09 |2024-01-01|
|2024-01-01 00:41:06 |2024-01-01|
|2024-01-01 00:52:09 |2024-01-01|
|2024-01-01 00:56:38 |2024-01-01|
|2024-01-01 00:32:34 |2024-01-01|
+--------------------+----------+
only showing top 20 rows



In [24]:
# To extract pickup time from tpep_pickup_datetime column
df2 = df.selectExpr(
    "tpep_pickup_datetime",
    "to_date(tpep_pickup_datetime) as new_column"
)
df2.show()

+--------------------+----------+
|tpep_pickup_datetime|new_column|
+--------------------+----------+
| 2024-01-01 00:57:55|2024-01-01|
| 2024-01-01 00:03:00|2024-01-01|
| 2024-01-01 00:17:06|2024-01-01|
| 2024-01-01 00:36:38|2024-01-01|
| 2024-01-01 00:46:51|2024-01-01|
| 2024-01-01 00:54:08|2024-01-01|
| 2024-01-01 00:49:44|2024-01-01|
| 2024-01-01 00:30:40|2024-01-01|
| 2024-01-01 00:26:01|2024-01-01|
| 2024-01-01 00:28:08|2024-01-01|
| 2024-01-01 00:35:22|2024-01-01|
| 2024-01-01 00:25:00|2024-01-01|
| 2024-01-01 00:35:16|2024-01-01|
| 2024-01-01 00:43:27|2024-01-01|
| 2024-01-01 00:51:53|2024-01-01|
| 2024-01-01 00:50:09|2024-01-01|
| 2024-01-01 00:41:06|2024-01-01|
| 2024-01-01 00:52:09|2024-01-01|
| 2024-01-01 00:56:38|2024-01-01|
| 2024-01-01 00:32:34|2024-01-01|
+--------------------+----------+
only showing top 20 rows



In [None]:
# Example: Get total passenger count by year with spark.sql 
df.createOrReplaceTempView("trips")
result = spark.sql("""
    SELECT year(tpep_pickup_datetime) AS year, SUM(passenger_count) AS total_passenger_count
    FROM trips
    GROUP BY year
    ORDER BY year
""")

result.show()


In [25]:
from pyspark.sql.functions import year, sum
#group by year and find total_passenger_count by pickup year

df2 = df.withColumn("pickup_year", year("tpep_pickup_datetime"))

# 2. Group by year and sum passenger_count
result = df2.groupBy("pickup_year") \
            .agg(sum("passenger_count").alias("total_passenger_count"))

result.show()



+-----------+---------------------+
|pickup_year|total_passenger_count|
+-----------+---------------------+
|       2023|                   19|
|       2009|                    4|
|       2024|              3782723|
|       2002|                    2|
+-----------+---------------------+



                                                                                

In [27]:
from pyspark.sql.functions import col, avg

# Calculate tip percentage (tip_amount / fare_amount)
df_tip_pct = df.withColumn("tip_percentage", col("tip_amount") / col("fare_amount"))

# Average tip percentage across all trips
df_tip_pct.agg(avg("tip_percentage").alias("average_tip_percentage")).show()

+----------------------+
|average_tip_percentage|
+----------------------+
|   0.22809923154005327|
+----------------------+



In [42]:
# Count payment_types
df.groupBy("payment_type").count().orderBy("count", ascending=False).show()

+------------+-------+
|payment_type|  count|
+------------+-------+
|           1|2319046|
|           2| 439191|
|           0| 140162|
|           4|  46628|
|           3|  19597|
+------------+-------+



In [43]:
from pyspark.sql.functions import hour

# calculate pickup_hour and count which has most 
df.withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
  .groupBy("pickup_hour") \
  .count() \
  .orderBy("pickup_hour") \
  .show()



+-----------+------+
|pickup_hour| count|
+-----------+------+
|          0| 79094|
|          1| 53627|
|          2| 37517|
|          3| 24811|
|          4| 16742|
|          5| 18764|
|          6| 41429|
|          7| 83719|
|          8|117209|
|          9|128970|
|         10|138778|
|         11|150542|
|         12|164559|
|         13|169903|
|         14|182898|
|         15|189359|
|         16|190201|
|         17|206257|
|         18|212788|
|         19|184032|
+-----------+------+
only showing top 20 rows



                                                                                

In [44]:
from pyspark.sql.functions import dayofweek

# dayofweek: 1=Sunday, 2=Monday, ..., 7=Saturday
df.withColumn("pickup_dayofweek", dayofweek("tpep_pickup_datetime")) \
  .groupBy("pickup_dayofweek") \
  .count() \
  .orderBy("pickup_dayofweek") \
  .show()


+----------------+------+
|pickup_dayofweek| count|
+----------------+------+
|               1|339312|
|               2|408277|
|               3|463664|
|               4|495032|
|               5|428593|
|               6|408588|
|               7|421158|
+----------------+------+



                                                                                

In [56]:
from pyspark.sql.functions import unix_timestamp,col
#to find trip duration in munutes
df_duration = df.withColumn(
    "trip_duration_min",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0
)
df_duration.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_duration_min").show(50)

+--------------------+---------------------+-------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|  trip_duration_min|
+--------------------+---------------------+-------------------+
| 2024-01-01 00:57:55|  2024-01-01 01:17:43|               19.8|
| 2024-01-01 00:03:00|  2024-01-01 00:09:36|                6.6|
| 2024-01-01 00:17:06|  2024-01-01 00:35:01| 17.916666666666668|
| 2024-01-01 00:36:38|  2024-01-01 00:44:56|                8.3|
| 2024-01-01 00:46:51|  2024-01-01 00:52:57|                6.1|
| 2024-01-01 00:54:08|  2024-01-01 01:26:31|  32.38333333333333|
| 2024-01-01 00:49:44|  2024-01-01 01:15:47|              26.05|
| 2024-01-01 00:30:40|  2024-01-01 00:58:40|               28.0|
| 2024-01-01 00:26:01|  2024-01-01 00:54:12| 28.183333333333334|
| 2024-01-01 00:28:08|  2024-01-01 00:29:16| 1.1333333333333333|
| 2024-01-01 00:35:22|  2024-01-01 00:41:41|  6.316666666666666|
| 2024-01-01 00:25:00|  2024-01-01 00:34:03|               9.05|
| 2024-01-01 00:35:16|  2

In [57]:
from pyspark.sql.functions import (unix_timestamp, col)

#to find average speed in mph
df_speed = df.withColumn(
    "trip_duration_hr",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 3600.0
).withColumn(
    "avg_speed_mph",
    col("trip_distance") / col("trip_duration_hr")
)

df_speed.select("trip_distance", "trip_duration_hr", "avg_speed_mph").show(50)

+-------------+--------------------+------------------+
|trip_distance|    trip_duration_hr|     avg_speed_mph|
+-------------+--------------------+------------------+
|         1.72|                0.33| 5.212121212121212|
|          1.8|                0.11|16.363636363636363|
|          4.7|  0.2986111111111111|15.739534883720932|
|          1.4| 0.13833333333333334|10.120481927710843|
|          0.8| 0.10166666666666667| 7.868852459016393|
|          4.7|  0.5397222222222222| 8.708183221821926|
|        10.82| 0.43416666666666665| 24.92130518234165|
|          3.0|  0.4666666666666667| 6.428571428571429|
|         5.44|  0.4697222222222222|11.581312832643407|
|         0.04| 0.01888888888888889|2.1176470588235294|
|         0.75| 0.10527777777777778|  7.12401055408971|
|          1.2| 0.15083333333333335| 7.955801104972375|
|          8.2|                0.61|13.442622950819672|
|          0.4| 0.06222222222222222| 6.428571428571429|
|          0.8| 0.06388888888888888|12.521739130

In [58]:
from pyspark.sql.functions import sum as spark_sum

# In order to find total_passengers per day, month, year
# Per Day
df.withColumn("pickup_date", df.tpep_pickup_datetime.cast("date")) \
  .groupBy("pickup_date") \
  .agg(spark_sum("passenger_count").alias("total_passengers")) \
  .orderBy("pickup_date") \
  .show()

# Per Month
df.withColumn("pickup_month", month("tpep_pickup_datetime")) \
  .groupBy("pickup_month") \
  .agg(spark_sum("passenger_count").alias("total_passengers")) \
  .orderBy("pickup_month") \
  .show()

# Per Year
df.withColumn("pickup_year", year("tpep_pickup_datetime")) \
  .groupBy("pickup_year") \
  .agg(spark_sum("passenger_count").alias("total_passengers")) \
  .orderBy("pickup_year") \
  .show()

                                                                                

+-----------+----------------+
|pickup_date|total_passengers|
+-----------+----------------+
| 2002-12-31|               2|
| 2009-01-01|               4|
| 2023-12-31|              19|
| 2024-01-01|          109339|
| 2024-01-02|          104215|
| 2024-01-03|          111171|
| 2024-01-04|          136003|
| 2024-01-05|          137311|
| 2024-01-06|          133978|
| 2024-01-07|           92647|
| 2024-01-08|          100966|
| 2024-01-09|          110213|
| 2024-01-10|          116838|
| 2024-01-11|          130953|
| 2024-01-12|          134404|
| 2024-01-13|          144345|
| 2024-01-14|          128141|
| 2024-01-15|          101132|
| 2024-01-16|          111709|
| 2024-01-17|          131284|
+-----------+----------------+
only showing top 20 rows



                                                                                

+------------+----------------+
|pickup_month|total_passengers|
+------------+----------------+
|           1|         3782720|
|           2|               7|
|          12|              21|
+------------+----------------+





+-----------+----------------+
|pickup_year|total_passengers|
+-----------+----------------+
|       2002|               2|
|       2009|               4|
|       2023|              19|
|       2024|         3782723|
+-----------+----------------+



                                                                                

In [49]:
# to find which vender has highest rides
df.groupBy("VendorID").count().orderBy("VendorID").show()



+--------+-------+
|VendorID|  count|
+--------+-------+
|       1| 729732|
|       2|2234632|
|       6|    260|
+--------+-------+



                                                                                

In [50]:
from pyspark.sql.functions import avg

# to find average_fare_per_trip
df.agg(avg("fare_amount").alias("avg_fare_per_trip")).show()

+------------------+
| avg_fare_per_trip|
+------------------+
|18.175061916792536|
+------------------+



In [51]:
from pyspark.sql.functions import col, avg

# Calculate tip percentage (tip_amount / fare_amount)
df_tip_pct = df.withColumn("tip_percentage", col("tip_amount") / col("fare_amount"))

# Average tip percentage across all trips
df_tip_pct.agg(avg("tip_percentage").alias("average_tip_percentage")).show()

+----------------------+
|average_tip_percentage|
+----------------------+
|   0.22809923154005327|
+----------------------+



                                                                                

In [52]:
# find passenger count per solo or group 
df.groupBy("passenger_count") \
  .count() \
  .orderBy("passenger_count") \
  .show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|           NULL| 140162|
|              0|  31465|
|              1|2188739|
|              2| 405103|
|              3|  91262|
|              4|  51974|
|              5|  33506|
|              6|  22353|
|              7|      8|
|              8|     51|
|              9|      1|
+---------------+-------+



In [53]:
from pyspark.sql.functions import when, count

df_behavior = df.withColumn(
    "ride_type",
    when(df.passenger_count == 1, "solo").otherwise("group")
)

df_behavior.groupBy("ride_type").count().show()

+---------+-------+
|ride_type|  count|
+---------+-------+
|    group| 775885|
|     solo|2188739|
+---------+-------+



                                                                                

Cache and Persist

cache()
Shortcut for persist() with the default storage level: MEMORY_ONLY.

Keeps your DataFrame/RDD in RAM only.

If data doesn’t fit in memory, Spark will recompute partitions that don’t fit when needed.

persist()
More flexible: lets you choose how and where your data is stored.

You can use:

MEMORY_ONLY (just like cache)

MEMORY_AND_DISK (if RAM is full, spill to disk)

DISK_ONLY, MEMORY_ONLY_SER, etc.

In [79]:
df = spark.read.parquet("s3a://spark/yellow_tripdata_2024-01.parquet")

# Expensive operation: group by and sum
start = time.time()
df.groupBy("payment_type").sum("fare_amount").show()
print("Without cache:", time.time() - start)

# With cache
df_cached = df.cache()
df_cached.count()  # Triggers caching
start = time.time()
df_cached.groupBy("payment_type").sum("fare_amount").show()
print("With cache:", time.time() - start)

+------------+-------------------+
|payment_type|   sum(fare_amount)|
+------------+-------------------+
|           1|4.303553892000613E7|
|           3| 132330.08999999988|
|           2|  7846602.789999888|
|           4|  62243.18999999998|
|           0| 2805509.7700004894|
+------------+-------------------+

Without cache: 0.2492225170135498


25/07/07 13:14:33 WARN CacheManager: Asked to cache already cached data.


+------------+-------------------+
|payment_type|   sum(fare_amount)|
+------------+-------------------+
|           1|4.303553892000613E7|
|           3| 132330.08999999988|
|           2|  7846602.789999888|
|           4|  62243.18999999998|
|           0| 2805509.7700004894|
+------------+-------------------+

With cache: 0.45485997200012207


PartitionBy and write to S3

In [139]:
df_repart = df.withColumn("pickup_date", to_date("tpep_pickup_datetime"))

In [140]:
df_repart.select("pickup_date").show()



+-----------+
|pickup_date|
+-----------+
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
| 2024-01-01|
+-----------+
only showing top 20 rows



                                                                                

In [None]:
# Write partitioned data to s3
df_repart.write \
    .partitionBy("pickup_date") \
    .mode("overwrite") \
    .parquet("s3a://spark/")

                                                                                

In [157]:
# reading only one partition 
partition_path = "s3a://spark/pickup_date=2024-01-06"
df_one_partition = spark.read.parquet(partition_path)
df_one_partition.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-06 19:30:22|  2024-01-06 19:40:17|              1|         1.48|         1|                 N|         114|         231|           1|       11.4|  0.0|    0.5|      2.7

Spark UDF vs Pandas UDF

In [115]:
# Spark UDF example
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def trip_type(distance):
    return "short" if distance < 3 else "long"

trip_type_udf = udf(trip_type, StringType())

df_with_type = df.withColumn("trip_type", trip_type_udf(df["trip_distance"]))
df_with_type.select("trip_distance", "trip_type").show(10)




+-------------+---------+
|trip_distance|trip_type|
+-------------+---------+
|         1.72|    short|
|          1.8|    short|
|          4.7|     long|
|          1.4|    short|
|          0.8|    short|
|          4.7|     long|
|        10.82|     long|
|          3.0|     long|
|         5.44|     long|
|         0.04|    short|
+-------------+---------+
only showing top 10 rows



                                                                                

In [None]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-20.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (42.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.3/42.3 MB[0m [31m43.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-20.0.0
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
#pandas udf example
# add pip install pyarrow on workers as well......
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def trip_type_pandas_udf(distance_series):
    return distance_series.apply(lambda d: "short" if d < 3 else "long")

df_with_type = df.withColumn("trip_type", trip_type_pandas_udf(df["trip_distance"]))
df_with_type.select("trip_distance", "trip_type").show(10)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2

Why Pandas UDFs are Faster than Regular UDFs
1. Execution Model
Regular (Scalar) UDFs:
Process one row at a time.
Spark sends each row from the JVM to Python, applies your function, and sends the result back.

This is slow because there are millions of rows and lots of cross-language (JVM–Python) communication!

Pandas UDFs (Vectorized UDFs):
Process a whole batch of rows at once using Pandas Series (vectorized, column-wise).
Spark sends big batches (as Arrow tables) to Python, applies your function to the entire batch, and sends all results back at once.

This is much faster because it uses efficient in-memory Arrow serialization and reduces communication overhead!

coalesce and repartition

In [None]:
#repartition 
df_repart = df.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
df_repart = df_repart.repartition(15)
print("Partitions after repartition:", df_repart.rdd.getNumPartitions()) 

df_repart.write \
    .partitionBy("pickup_date") \
    .mode("overwrite") \
    .parquet("s3a://spark/")

In [None]:
# repartition by column
df_by_date = df_repart.repartition(10, "pickup_date")
print("Number of partitions:", df_by_date.rdd.getNumPartitions())

df_by_date.write \
    .partitionBy("pickup_date") \
    .mode("overwrite") \
    .parquet("s3a://spark/")

In [None]:
#coalesce
df_coalesce = df.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
df_coalesce = df_coalesce.coalesce(2)
print("Num partitions:", df_coalesce.rdd.getNumPartitions()) 

df_coalesce.write \
    .partitionBy("pickup_date") \
    .mode("overwrite") \
    .parquet("s3a://spark/")
	

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Start Spark session
spark = SparkSession.builder.appName("Flatten Nested JSON").getOrCreate()

# Read the JSON file with schema
schema_str = """
id BIGINT,
name STRING,
address STRUCT<city:STRING, zipcode:STRING>,
orders ARRAY<STRUCT<order_id:STRING, items:ARRAY<STRING>, total:DOUBLE>>,
metadata STRUCT<preferences:STRUCT<newsletter:BOOLEAN, language:STRING>, last_login:STRING>
"""

df = spark.read \
    .schema(schema_str) \
    .option("multiLine", "true") \
    .json("input.json")  # <-- Replace with your JSON file path

# Step 1: Explode the `orders` array
df_exploded_orders = df.select(
    "id",
    "name",
    col("address.city").alias("city"),
    col("address.zipcode").alias("zipcode"),
    explode("orders").alias("order"),
    col("metadata.preferences.newsletter").alias("newsletter"),
    col("metadata.preferences.language").alias("language"),
    col("metadata.last_login").alias("last_login")
)

# Step 2: Explode the `items` array inside each order
df_final = df_exploded_orders.select(
    "id",
    "name",
    "city",
    "zipcode",
    col("order.order_id").alias("order_id"),
    explode("order.items").alias("item"),  # Explode each item
    col("order.total").alias("total"),
    "newsletter",
    "language",
    "last_login"
)

# Show the final flattened DataFrame
df_final.show(truncate=False)
