# SQL with Spark

In this notebook, we will learn how to use SQL with Spark. We will work with the NYC `Green Taxi` dataset, this time covering three months (January to March 2021) instead of a single month. Spark loads the data, and SQL queries it.

Let's import the libraries we need first:

In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types

Download the data:

In [2]:
for month in range(1,4):
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-0{month}.parquet'
    os.system(f'wget -P ./data/2021 {url}')
    

--2023-07-07 14:17:14--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 143.204.101.20, 143.204.101.175, 143.204.101.63, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|143.204.101.20|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1333519 (1,3M) [binary/octet-stream]
Saving to: ‘./data/2021/green_tripdata_2021-01.parquet.1’

     0K .......... .......... .......... .......... ..........  3% 2,28M 1s
    50K .......... .......... .......... .......... ..........  7% 4,44M 0s
   100K .......... .......... .......... .......... .......... 11% 3,62M 0s
   150K .......... .......... .......... .......... .......... 15% 8,83M 0s
   200K .......... .......... .......... .......... .......... 19% 6,60M 0s
   250K .......... .......... .......... .......... .......... 23% 4,99M 0s
   300K .......... .......... .......... ..........
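
The log above shows the file being saved as `green_tripdata_2021-01.parquet.1`, which is what `wget` does when a file with the target name already exists. Note also that the `2021-0{month}` pattern in the URL only works for months 1 to 9. The following is a minimal sketch of an alternative that zero-pads the month and skips files that are already present. It uses only the Python standard library; the helper names `build_url` and `download_if_missing` are hypothetical and not part of the original notebook.

```python
import os
import urllib.request

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def build_url(year: int, month: int) -> str:
    """Return the download URL for one month of green taxi data."""
    # {month:02d} zero-pads the month, so 3 becomes "03" and 11 stays "11"
    return f"{BASE_URL}/green_tripdata_{year}-{month:02d}.parquet"


def download_if_missing(url: str, dest_dir: str) -> str:
    """Download url into dest_dir unless the file already exists there."""
    os.makedirs(dest_dir, exist_ok=True)
    dest = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
    if not os.path.exists(dest):
        urllib.request.urlretrieve(url, dest)
    return dest


# Usage (performs network requests, so it is left commented out here):
# for month in range(1, 4):
#     download_if_missing(build_url(2021, month), "./data/2021")
```

Because existing files are skipped, re-running the cell does not create `.1` duplicates.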

Create a Spark Session:

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Green Taxi 2021') \
    .getOrCreate()

23/07/07 14:17:41 WARN Utils: Your hostname, Johanns-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.127 instead (on interface en0)
23/07/07 14:17:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/07 14:17:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Load the data, convert the `double` type columns to `float` and partition it:

In [4]:

for month in range(1,4):
    print(f'processing data for 2021/{month}')
    input_path = f'./data/2021/green_tripdata_2021-0{month}.parquet'
    output_path = f'./data/2021/partitioned/green_tripdata_2021-0{month}.parquet'
    
    df = spark.read \
        .parquet(input_path)
    
    # convert the double to float
    for col in df.columns:
        if df.schema[col].dataType == types.DoubleType():
            df = df.withColumn(col, F.col(col).cast('float'))
    
    
    df.repartition(4)\
        .write \
        .parquet(output_path)
        

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
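
Casting `double` to `float` stores each value in 4 bytes instead of 8, but a 32-bit float cannot represent most decimal values as closely as a 64-bit one. This is probably why aggregates later in the notebook show values such as `503.46000480651855`. The sketch below emulates the cast with Python's standard `struct` module rather than Spark, so it only illustrates the rounding effect; `to_float32` is a hypothetical helper name.

```python
import struct


def to_float32(value: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float, then widen it back."""
    return struct.unpack("f", struct.pack("f", value))[0]


fare = 19.34
fare32 = to_float32(fare)

print(fare32)              # close to 19.34, but not exactly equal to it
print(abs(fare32 - fare))  # rounding error introduced by the narrower type
```

For a report that sums many fares, these small errors add up, so keeping `double` (or using a decimal type) may be preferable when exact monetary totals matter.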


Now we can import the whole dataset into a Spark DataFrame:

In [5]:
df = spark.read \
    .parquet('./data/2021/partitioned/*')

In [6]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: float (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- payment_type: float (nullable = true)
 |-- trip_type: float (nullable = true)
 |-- congestion_surcharge: float (nullable = true)



How many rows do we have?

In [7]:
df.count()

224917

About 225,000 rows. Let's take a look at the first 5 rows.

In [8]:
df.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-03-17 04:37:00|  2021-03-17 04:47:00|              null|      null|         135|           9|           null|         2.37|      19.34| 2.75|    0.

Now let's rename a few columns:

In [9]:
df = df \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')\
    .withColumnRenamed('PULocationID', 'pickup_location_id')\
    .withColumnRenamed('DOLocationID', 'dropoff_location_id')

To be able to use SQL with Spark, we need to register the DataFrame as a temporary view. `registerTempTable` is deprecated, so we use `createOrReplaceTempView` instead:

In [11]:
df.createOrReplaceTempView('taxi_trips')

And now we can query the data using SQL:

In [12]:
spark.sql("SELECT * FROM taxi_trips LIMIT 5").show()

+--------+-------------------+-------------------+------------------+----------+------------------+-------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|pickup_location_id|dropoff_location_id|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+-------------------+-------------------+------------------+----------+------------------+-------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2|2021-03-17 04:37:00|2021-03-17 04:47:00|              null|      null|               135|                  9|           nul

For example, to create a **Revenue per Month Report** with SQL, we could write:

In [17]:
df_result_monthly = spark.sql("""
SELECT 
    -- Revenue grouping 
    pickup_location_id AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    taxi_trips
GROUP BY
    1, 2
""")

Similarly, to create a **Revenue per Day Report**, we only need to change the truncation unit and the column aliases:

In [30]:
df_result = spark.sql("""
SELECT 
    -- Revenue grouping 
    pickup_location_id AS revenue_zone,
    date_trunc('day', pickup_datetime) AS revenue_day,  -- Change to 'day' for revenue per day

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_daily_fare,
    SUM(extra) AS revenue_daily_extra,
    SUM(mta_tax) AS revenue_daily_mta_tax,
    SUM(tip_amount) AS revenue_daily_tip_amount,
    SUM(tolls_amount) AS revenue_daily_tolls_amount,
    SUM(improvement_surcharge) AS revenue_daily_improvement_surcharge,
    SUM(total_amount) AS revenue_daily_total_amount,
    SUM(congestion_surcharge) AS revenue_daily_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_daily_passenger_count,
    AVG(trip_distance) AS avg_daily_trip_distance
FROM
    taxi_trips
GROUP BY
    1, 2
""")
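
The monthly and daily queries differ only in the `date_trunc` unit and the alias prefix, so the query text can be generated instead of copied. The following is a sketch of that idea under the assumption that only `'day'` and `'month'` are needed; `build_revenue_query` is a hypothetical helper, not part of the original notebook or of Spark.

```python
def build_revenue_query(unit: str, table: str = "taxi_trips") -> str:
    """Build the revenue report query for a truncation unit ('day' or 'month')."""
    prefixes = {"day": "daily", "month": "monthly"}
    if unit not in prefixes:
        # Only whitelisted units are interpolated into the SQL text
        raise ValueError(f"unsupported unit: {unit!r}")
    p = prefixes[unit]

    # (source column, alias suffix) pairs, matching the aliases used above
    sum_columns = [
        ("fare_amount", "fare"),
        ("extra", "extra"),
        ("mta_tax", "mta_tax"),
        ("tip_amount", "tip_amount"),
        ("tolls_amount", "tolls_amount"),
        ("improvement_surcharge", "improvement_surcharge"),
        ("total_amount", "total_amount"),
        ("congestion_surcharge", "congestion_surcharge"),
    ]
    sums = ",\n    ".join(
        f"SUM({col}) AS revenue_{p}_{suffix}" for col, suffix in sum_columns
    )

    # `table` is assumed to be a trusted view name, not user input
    return f"""
SELECT
    pickup_location_id AS revenue_zone,
    date_trunc('{unit}', pickup_datetime) AS revenue_{unit},
    {sums},
    AVG(passenger_count) AS avg_{p}_passenger_count,
    AVG(trip_distance) AS avg_{p}_trip_distance
FROM
    {table}
GROUP BY
    1, 2
"""


# Usage (requires the Spark session and the taxi_trips view from this notebook):
# df_result = spark.sql(build_revenue_query("day"))
# df_result_monthly = spark.sql(build_revenue_query("month"))
```

`GROUP BY 1, 2` refers to the first two columns of the `SELECT` list by position, which is why the zone and the truncated date must stay in those two slots.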


In [31]:
df_result.show(5)

+------------+-------------------+------------------+-------------------+---------------------+------------------------+--------------------------+-----------------------------------+--------------------------+----------------------------------+-------------------------+-----------------------+
|revenue_zone|        revenue_day|revenue_daily_fare|revenue_daily_extra|revenue_daily_mta_tax|revenue_daily_tip_amount|revenue_daily_tolls_amount|revenue_daily_improvement_surcharge|revenue_daily_total_amount|revenue_daily_congestion_surcharge|avg_daily_passenger_count|avg_daily_trip_distance|
+------------+-------------------+------------------+-------------------+---------------------+------------------------+--------------------------+-----------------------------------+--------------------------+----------------------------------+-------------------------+-----------------------+
|          69|2021-03-10 00:00:00|503.46000480651855|               40.5|                  2.0|      1.279999971

In [28]:
df_result_monthly.show(5)

+------------+-------------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         162|2021-03-01 00:00:00|  1032.8099975585938|   

You can open the Spark [UI](http://localhost:4040/SQL/) to inspect the SQL queries and their execution plans.

We can now export the report to a new Parquet file with coalesced partitions. `coalesce(n)` reduces the number of partitions to `n` by merging existing partitions without a full shuffle, so `coalesce(1)` writes the result as a single part file:

In [32]:
output = './data/report/green_taxi_2021.parquet'

df_result.coalesce(1) \
    .write.parquet(output, mode='overwrite')
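
Spark treats the path passed to `write.parquet` as a directory and writes one `part-*` file per partition into it, plus a `_SUCCESS` marker. To check the effect of `coalesce(1)`, the part files can be listed with the standard library. This is a small sketch; `list_part_files` is a hypothetical helper name.

```python
from pathlib import Path


def list_part_files(output_dir: str) -> list[str]:
    """Return the sorted names of the Parquet part files inside output_dir."""
    return sorted(p.name for p in Path(output_dir).glob("part-*.parquet"))


# Usage (after the write above has finished):
# print(list_part_files('./data/report/green_taxi_2021.parquet'))
```

With `coalesce(1)` the list should contain a single entry; without it, there would be one entry per partition of `df_result`.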

                                                                                