## Instanciating Spark

Import the PySpark library and the `SparkSession` class

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("spark_sql_groupby_join").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/23 09:02:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading raw data into a Spark DataFrame

In [3]:
df_green = spark.read.parquet("data/raw/green/*/*")

                                                                                

In [4]:
df_green.show()

                                                                                

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|       1.0|         264|         264|            5.0|          0.0|        3.5|  0.5|    0.

Now that we have loaded our sample data (i.e. green taxi data for 2020 and 2021) for this exercise, let's proceed with another `sql` query, but similar to the previous exercise, that involves a `groupby` statement.

## Registering a temporary table

In [5]:
# Firstly, we need to always create a temp table to be queried on - sql queries cannot be queried on a spark dataframe
df_green.registerTempTable("green")



## Group By

In [6]:
# Secondly, we write a query that breaks down the revenue as well as the number of trips by hour by zone
df_green_revenue = spark.sql(
    """
SELECT
    EXTRACT(HOUR FROM lpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) as revenue,
    COUNT(1) as number_records
FROM green
WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY 1,2
ORDER BY 1,2
"""
)

In [7]:
df_green_revenue.show()

[Stage 2:>                                                          (0 + 4) / 4]

+----+----+------------------+--------------+
|hour|zone|           revenue|number_records|
+----+----+------------------+--------------+
|   0|   3|            386.14|            11|
|   0|   4|             74.31|             2|
|   0|   5|            179.12|             3|
|   0|   7| 23819.25999999975|          1754|
|   0|   8|             10.79|             1|
|   0|   9|            187.71|             5|
|   0|  10|            805.69|            23|
|   0|  11|            378.27|            10|
|   0|  13|             61.55|             1|
|   0|  14|1721.9299999999998|            36|
|   0|  15|            289.06|             5|
|   0|  16|484.18000000000006|            15|
|   0|  17| 4357.509999999998|           220|
|   0|  18|2221.7199999999993|            72|
|   0|  19|            280.63|             8|
|   0|  20|1001.5400000000001|            54|
|   0|  21| 941.6099999999999|            23|
|   0|  22|1592.9499999999998|            58|
|   0|  23|176.10000000000002|    

                                                                                

In [8]:
# Lastly, lets write the output of our query into a parquet file
df_green_revenue.repartition(2).write.parquet('data/report/revenue_green', mode='overwrite')

                                                                                

In [9]:
# Repeat same steps for yellow taxi data

df_yellow = spark.read.parquet('data/raw/yellow/*/*')

In [10]:
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|            1.0|          1.2|       1.0|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.4

In [11]:
df_yellow.registerTempTable("yellow")

In [12]:
df_yellow_revenue = spark.sql(
    """
SELECT
    EXTRACT(HOUR FROM tpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) as revenue,
    COUNT(1) as number_records
FROM yellow
WHERE tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY 1,2
ORDER BY 1,2
""")

In [13]:
# Lastly, lets write the output of our query into a parquet file
df_yellow_revenue.repartition(2).write.parquet('data/report/revenue_yellow', mode='overwrite')

                                                                                

## Joining two large tables

In [14]:
df_green_revenue.tail(10)

                                                                                

[Row(hour=23, zone=256, revenue=11064.33000000002, number_records=542),
 Row(hour=23, zone=257, revenue=387.83000000000004, number_records=17),
 Row(hour=23, zone=258, revenue=591.5699999999999, number_records=19),
 Row(hour=23, zone=259, revenue=773.63, number_records=25),
 Row(hour=23, zone=260, revenue=16333.959999999955, number_records=1180),
 Row(hour=23, zone=261, revenue=43.0, number_records=1),
 Row(hour=23, zone=262, revenue=299.31, number_records=11),
 Row(hour=23, zone=263, revenue=758.98, number_records=38),
 Row(hour=23, zone=264, revenue=2582.669999999999, number_records=147),
 Row(hour=23, zone=265, revenue=3716.8499999999995, number_records=77)]

In [15]:
# Changing column names before joining with yellow table
df_green_revenue_tmp = df_green_revenue.withColumnsRenamed(
    {"revenue":"green_revenue", "number_records": "green_number_records"})

In [16]:
df_green_revenue_tmp.show()



+----+----+------------------+--------------------+
|hour|zone|     green_revenue|green_number_records|
+----+----+------------------+--------------------+
|   0|   3|            386.14|                  11|
|   0|   4|             74.31|                   2|
|   0|   5|            179.12|                   3|
|   0|   7| 23819.25999999975|                1754|
|   0|   8|             10.79|                   1|
|   0|   9|            187.71|                   5|
|   0|  10|            805.69|                  23|
|   0|  11|            378.27|                  10|
|   0|  13|             61.55|                   1|
|   0|  14|1721.9299999999998|                  36|
|   0|  15|            289.06|                   5|
|   0|  16|484.18000000000006|                  15|
|   0|  17| 4357.509999999998|                 220|
|   0|  18|2221.7199999999993|                  72|
|   0|  19|            280.63|                   8|
|   0|  20|1001.5400000000001|                  54|
|   0|  21| 

                                                                                

In [17]:
# Repeating same steps for yellow revenue table
df_yellow_revenue_tmp = df_yellow_revenue.withColumnsRenamed(
    {"revenue":"yellow_revenue", "number_records": "yellow_number_records"}
)

df_yellow_revenue_tmp.show()



+----+----+------------------+---------------------+
|hour|zone|    yellow_revenue|yellow_number_records|
+----+----+------------------+---------------------+
|   0|   1|1648.3899999999999|                   17|
|   0|   3|           1619.92|                   32|
|   0|   4|  95819.5299999997|                 5077|
|   0|   5|            386.54|                    5|
|   0|   6|            571.27|                    8|
|   0|   7| 32001.82000000005|                 1959|
|   0|   8|332.08000000000004|                   10|
|   0|   9|            668.61|                   15|
|   0|  10|19588.309999999994|                  371|
|   0|  11| 776.5699999999999|                   19|
|   0|  12|3124.8500000000004|                  121|
|   0|  13|58995.669999999955|                 2527|
|   0|  14|2967.6699999999996|                   88|
|   0|  15|            436.25|                   11|
|   0|  16|1016.6800000000001|                   29|
|   0|  17|13222.679999999997|                

                                                                                

In [18]:
df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=["hour","zone"], how='outer').show()



+----+----+------------------+--------------------+------------------+---------------------+
|hour|zone|     green_revenue|green_number_records|    yellow_revenue|yellow_number_records|
+----+----+------------------+--------------------+------------------+---------------------+
|   0|   1|              NULL|                NULL|1648.3899999999999|                   17|
|   0|   3|            386.14|                  11|           1619.92|                   32|
|   0|   4|             74.31|                   2|  95819.5299999997|                 5077|
|   0|   5|            179.12|                   3|            386.54|                    5|
|   0|   6|              NULL|                NULL|            571.27|                    8|
|   0|   7| 23819.25999999975|                1754| 32001.82000000005|                 1959|
|   0|   8|             10.79|                   1|332.08000000000004|                   10|
|   0|   9|            187.71|                   5|            668.61|

                                                                                

In [19]:
df_merged = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=["hour","zone"], how='outer')


In [20]:
# Viewing the data type 
df_merged

DataFrame[hour: int, zone: bigint, green_revenue: double, green_number_records: bigint, yellow_revenue: double, yellow_number_records: bigint]

In [21]:
df_merged.show()



+----+----+------------------+--------------------+------------------+---------------------+
|hour|zone|     green_revenue|green_number_records|    yellow_revenue|yellow_number_records|
+----+----+------------------+--------------------+------------------+---------------------+
|   0|   1|              NULL|                NULL|1648.3899999999999|                   17|
|   0|   3|            386.14|                  11|           1619.92|                   32|
|   0|   4|             74.31|                   2|  95819.5299999997|                 5077|
|   0|   5|            179.12|                   3|            386.54|                    5|
|   0|   6|              NULL|                NULL|            571.27|                    8|
|   0|   7| 23819.25999999975|                1754| 32001.82000000005|                 1959|
|   0|   8|             10.79|                   1|332.08000000000004|                   10|
|   0|   9|            187.71|                   5|            668.61|

                                                                                

In [22]:
# lets save this output
df_merged.write.parquet("data/report/revenue/total", mode='overwrite')

                                                                                

## Joining a large table and a small table

In [23]:
# Loading zone table as the small table for this exercise
df_zones = spark.read.parquet("zones/*")

In [24]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [25]:
# Joining both tables
df_result = df_merged.join(
    df_zones, df_merged.zone == df_zones.LocationID
    ).drop("LocationID","zone").withColumnsRenamed(
    {"Borough":"pickup_Borough", "service_zone": "pickup_service_zone"}
)

In [26]:
df_result.show()



+----+------------------+--------------------+------------------+---------------------+--------------+-------------------+
|hour|     green_revenue|green_number_records|    yellow_revenue|yellow_number_records|pickup_Borough|pickup_service_zone|
+----+------------------+--------------------+------------------+---------------------+--------------+-------------------+
|   0|              NULL|                NULL|1648.3899999999999|                   17|           EWR|                EWR|
|   0|            386.14|                  11|           1619.92|                   32|         Bronx|          Boro Zone|
|   0|             74.31|                   2|  95819.5299999997|                 5077|     Manhattan|        Yellow Zone|
|   0|            179.12|                   3|            386.54|                    5| Staten Island|          Boro Zone|
|   0|              NULL|                NULL|            571.27|                    8| Staten Island|          Boro Zone|
|   0| 23819.259

                                                                                

In [27]:
# Now to wriite it out as a .parquet file
df_result.write.parquet('tmp/revenue-zones', mode='overwrite')

                                                                                