In [20]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session on all available cores, with the web UI enabled for
# inspecting jobs. getOrCreate() hands back an already-running session if one
# exists; in that case only runtime SQL configurations are applied.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark_groupby_join")
    .config("spark.ui.enabled", "true")
    .getOrCreate()
)

24/03/08 14:13:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Load every green-taxi parquet file under the directory (glob pattern).
df_green = spark.read.parquet(
    "/home/labber/dtc-de/WEEK5-PYSPARK/data/green/*"
)

                                                                                

In [3]:
# Register the DataFrame as a session-scoped temporary view named `green` so
# it can be queried through spark.sql. createOrReplaceTempView replaces the
# deprecated registerTempTable and likewise overwrites an existing view of
# the same name, so re-running this cell is safe.
df_green.createOrReplaceTempView('green')



In [11]:
# Hourly revenue and trip count per pickup zone for green taxis.
# The GROUP BY repeats the SELECT expressions rather than positional ordinals
# (`GROUP BY 1, 2`), so reordering the SELECT list cannot silently change
# the grouping keys.
# NOTE(review): only a lower bound is applied to the pickup time; any rows
# with pickup timestamps beyond the intended period are still aggregated.
df_green_revenue = spark.sql("""
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', lpep_pickup_datetime),
    PULocationID
""")

In [12]:
# Preview the first five aggregated rows (triggers a Spark job).
df_green_revenue.show(n=5)



+-------------------+----+------+--------------+
|               hour|zone|amount|number_records|
+-------------------+----+------+--------------+
|2020-01-01 02:00:00| 169|119.18|             4|
|2020-01-01 03:00:00| 193|   5.3|             1|
|2020-01-01 07:00:00|  25|  12.8|             1|
|2020-01-01 11:00:00| 196| 58.04|             4|
|2020-01-01 23:00:00| 241|   9.3|             1|
+-------------------+----+------+--------------+
only showing top 5 rows



                                                                                

In [13]:
# Persist the green hourly aggregates, replacing any previous output.
(
    df_green_revenue.write
    .mode("overwrite")
    .parquet("/home/labber/dtc-de/WEEK5-PYSPARK/data/green_revenue")
)

24/03/08 13:27:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [15]:
# Load every yellow-taxi parquet file under the directory (glob pattern).
df_yellow = spark.read.parquet("/home/labber/dtc-de/WEEK5-PYSPARK/data/yellow/*")
# Expose it as the temporary view `yellow` for spark.sql. This replaces the
# deprecated registerTempTable and also overwrites an existing view of that name.
df_yellow.createOrReplaceTempView('yellow')




In [16]:
# Hourly revenue and trip count per pickup zone for yellow taxis; same shape
# as the green aggregation so the two can be joined on (hour, zone).
# The GROUP BY repeats the SELECT expressions rather than positional ordinals
# (`GROUP BY 1, 2`), so reordering the SELECT list cannot silently change
# the grouping keys.
# NOTE(review): only a lower bound is applied to the pickup time; any rows
# with pickup timestamps beyond the intended period are still aggregated.
df_yellow_revenue = spark.sql("""
SELECT
    date_trunc('hour', tpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', tpep_pickup_datetime),
    PULocationID
""")

In [17]:
# Preview the first five aggregated rows (triggers a Spark job).
df_yellow_revenue.show(n=5)



+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-01 01:00:00|  93|              11.8|             1|
|2020-01-01 02:00:00| 169|296.90000000000003|             3|
|2020-01-01 03:00:00| 193|             32.22|             5|
|2020-01-01 07:00:00|  25|            128.55|             3|
|2020-01-01 13:00:00| 157|              55.3|             1|
+-------------------+----+------------------+--------------+
only showing top 5 rows



                                                                                

In [18]:
# Persist the yellow hourly aggregates, replacing any previous output.
(
    df_yellow_revenue.write
    .mode("overwrite")
    .parquet("/home/labber/dtc-de/WEEK5-PYSPARK/data/yellow_revenue")
)

24/03/08 13:44:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [24]:
# Prefix the aggregate columns with the service name so the two frames can be
# joined on (hour, zone) without clashing `amount` / `number_records` columns.
df_green_revenue_temp = (
    df_green_revenue
    .withColumnRenamed("amount", "green_amount")
    .withColumnRenamed("number_records", "green_number_records")
)

df_yellow_revenue_temp = (
    df_yellow_revenue
    .withColumnRenamed("amount", "yellow_amount")
    .withColumnRenamed("number_records", "yellow_number_records")
)

In [25]:
# Full outer join: keep (hour, zone) pairs served by only one of the two
# services; the missing side's metrics come out as NULL. Joining on a list
# of column names yields a single `hour` and `zone` column in the result.
df_join = df_green_revenue_temp.join(
    df_yellow_revenue_temp,
    on=["hour", "zone"],
    how="outer",
)

In [26]:
# Preview the first five joined rows (triggers a Spark job).
df_join.show(n=5)



+-------------------+----+------------+--------------------+------------------+---------------------+
|               hour|zone|green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  45|        NULL|                NULL| 732.4799999999997|                   42|
|2020-01-01 00:00:00|  60|      160.04|                   6|57.620000000000005|                    2|
|2020-01-01 00:00:00|  63|        51.9|                   2|              70.8|                    1|
|2020-01-01 00:00:00|  87|        NULL|                NULL|           2456.67|                  112|
|2020-01-01 00:00:00|  89|        11.3|                   1|             48.16|                    2|
+-------------------+----+------------+--------------------+------------------+---------------------+
only showing top 5 rows



                                                                                

In [28]:
# Persist the combined green/yellow revenue, replacing any previous output.
(
    df_join.write
    .mode("overwrite")
    .parquet("/home/labber/dtc-de/WEEK5-PYSPARK/data/join_revenue")
)

24/03/08 14:22:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [31]:
# Zone lookup table (LocationID, Borough, Zone, service_zone).
# NOTE(review): unlike the other inputs this path is not under data/ — confirm intended.
df_zones = spark.read.parquet(
    "/home/labber/dtc-de/WEEK5-PYSPARK/zones/*"
)
df_zones

                                                                                

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

In [32]:
# Attach zone metadata (Borough, Zone, service_zone) to the joined revenue.
# Bind the DataFrame first and write it separately: DataFrameWriter.parquet
# returns None, so chaining the write into this assignment left df_result
# as None instead of the joined data.
# This is an inner join (the default), so rows whose `zone` has no matching
# LocationID are dropped. The key columns are removed afterwards. Keeping
# `zone` next to the zone table's `Zone` would presumably clash under Spark's
# default case-insensitive column resolution when writing.
# NOTE(review): LocationID is read as a string while `zone` comes from
# PULocationID; the comparison relies on Spark's implicit cast — confirm.
df_result = (
    df_join
    .join(df_zones, df_join.zone == df_zones.LocationID)
    .drop(df_zones.LocationID)
    .drop(df_join.zone)
)

df_result.write.parquet("/home/labber/dtc-de/WEEK5-PYSPARK/data/join_revenue_zones", mode="overwrite")

24/03/08 16:08:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                