# SQL in Spark

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master(
    "local[*]"
).appName(
    "test"
).getOrCreate()

24/09/25 20:31:51 WARN Utils: Your hostname, DESKTOP-CSJ1S7Q resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/09/25 20:31:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/09/25 20:31:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/25 20:31:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/25 20:31:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
df_green = spark.read.parquet("data/raw/green/*/*/")
df_yellow = spark.read.parquet("data/raw/yellow/*/*/")

df_yellow.show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2020-01-01 01:28:15|  2020-01-01 01:33:03|            1.0|          1.2|       1.0|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.4

                                                                                

In [4]:
# rename columns
df_green = df_green.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

df_yellow = df_yellow.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

In [5]:
common_columns = [column for column in df_green.columns if column in df_yellow.columns]
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [6]:
from pyspark.sql import functions as F

In [7]:
df_green_sel = df_green.select(common_columns).withColumn("service_type", F.lit("green"))
df_yellow_sel = df_yellow.select(common_columns).withColumn("service_type", F.lit("yellow"))

In [8]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)
df_trips_data.groupBy("service_type").count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2802931|
|      yellow|55553400|
+------------+--------+



                                                                                

In [9]:
df_trips_data.createOrReplaceTempView("trips_data")

In [10]:
spark.sql("""
SELECT * FROM trips_data LIMIT 1
""").show()

                                                                                

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2019-12-18 16:52:30|2019-12-18 16:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.5|      0.01|         0.0|       

In [11]:
spark.sql("""
SELECT 
    service_type,
    count(1)
FROM trips_data
GROUP BY
    service_type
""").show()



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2802931|
|      yellow|55553400|
+------------+--------+



                                                                                

In [12]:
query = """
select 
    -- Revenue grouping 
    PULocationID as revenue_zone,
    date_trunc("month", "pickup_datetime") as revenue_month,
    service_type, 

    -- Revenue calculation 
    sum(fare_amount) as revenue_monthly_fare,
    sum(extra) as revenue_monthly_extra,
    sum(mta_tax) as revenue_monthly_mta_tax,
    sum(tip_amount) as revenue_monthly_tip_amount,
    sum(tolls_amount) as revenue_monthly_tolls_amount,
    sum(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    sum(total_amount) as revenue_monthly_total_amount,
    sum(congestion_surcharge) as revenue_monthly_congestion_surcharge,

    -- Additional calculations
    avg(passenger_count) as avg_monthly_passenger_count,
    avg(trip_distance) as avg_monthly_trip_distance

    from trips_data
    group by 1,2,3
"""

df_result = spark.sql(query)

In [13]:
df_result.show(5)



+------------+-------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|revenue_zone|revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|
+------------+-------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|         124|         null|   

                                                                                

In [14]:
df_result.coalesce(1).write.parquet("data/report/revenue/", mode="overwrite")

                                                                                