In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session using all available cores ("local[*]");
# getOrCreate() hands back the already-active session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/03 11:25:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/03 11:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/03 11:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/03 11:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/03 11:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/05/03 11:25:17 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [3]:
# Load every green-taxi parquet file two directory levels below pq/green
# (presumably year/month partitions — TODO confirm layout).
df_green = spark.read.format('parquet').load('../datasets/data/pq/green/*/*')

                                                                                

In [4]:
# Make the green data queryable from Spark SQL as `green`.
df_green.createOrReplaceTempView(name='green')

In [5]:
# Hourly revenue (sum of total_amount) and trip count per pickup zone for green
# taxis, restricted to pickups from 2020-01-01 onwards.
# GROUP BY names the grouping expressions instead of ordinals (1, 2): ordinals
# depend on spark.sql.groupByOrdinal and silently change meaning if the SELECT
# list is ever reordered.
df_green_revenue = spark.sql("""
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', lpep_pickup_datetime),
    PULocationID
""")

In [6]:
# Persist the green hourly revenue report as 20 parquet part files,
# replacing whatever a previous run left at the target path.
(
    df_green_revenue
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('../datasets/data/report/revenue/green')
)

                                                                                

In [7]:
# Load every yellow-taxi parquet file two directory levels below pq/yellow
# (presumably year/month partitions — TODO confirm layout).
df_yellow = spark.read.format('parquet').load('../datasets/data/pq/yellow/*/*')

In [8]:
# Make the yellow data queryable from Spark SQL as `yellow`.
df_yellow.createOrReplaceTempView(name='yellow')

In [9]:
# Hourly revenue (sum of total_amount) and trip count per pickup zone for yellow
# taxis, restricted to pickups from 2020-01-01 onwards.
# GROUP BY names the grouping expressions instead of ordinals (1, 2): ordinals
# depend on spark.sql.groupByOrdinal and silently change meaning if the SELECT
# list is ever reordered.
df_yellow_revenue = spark.sql("""
SELECT
    date_trunc('hour', tpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', tpep_pickup_datetime),
    PULocationID
""")

In [10]:
# Persist the yellow hourly revenue report as 20 parquet part files,
# replacing whatever a previous run left at the target path.
(
    df_yellow_revenue
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('../datasets/data/report/revenue/yellow')
)

                                                                                

In [11]:
def rename_revenue_columns(df, prefix):
    """Prefix the revenue metric columns so two revenue reports can sit side by side.

    Renames ``amount`` -> ``{prefix}_amount`` and
    ``number_records`` -> ``{prefix}_number_records``; every other column
    (including the join keys ``hour`` and ``zone``) is left unchanged.

    Args:
        df: Spark DataFrame containing ``amount`` and ``number_records`` columns.
        prefix: Label prepended to the metric names, e.g. ``'green'``.

    Returns:
        A new DataFrame with the renamed columns.
    """
    return (
        df
        .withColumnRenamed('amount', f'{prefix}_amount')
        .withColumnRenamed('number_records', f'{prefix}_number_records')
    )


df_green_revenue_temp = rename_revenue_columns(df_green_revenue, 'green')
df_yellow_revenue_temp = rename_revenue_columns(df_yellow_revenue, 'yellow')

In [12]:
# Full outer join on (hour, zone): keep every pair that appears in either the
# green or the yellow report; missing side's metrics come through as NULL.
df_join = df_green_revenue_temp.join(
    other=df_yellow_revenue_temp,
    on=['hour', 'zone'],
    how='outer',
)

In [13]:
# Persist the combined green + yellow revenue per (hour, zone).
# mode='overwrite' matches the other report writes and keeps the cell
# re-runnable: the default save mode ('errorifexists') fails as soon as the
# output directory already exists from a previous run.
df_join.write.parquet('../datasets/data/report/revenue/total', mode='overwrite')

                                                                                

In [14]:
df_join  # bare expression: the repr lists the joined schema (hour, zone, green_* and yellow_* metrics)

DataFrame[hour: timestamp, zone: int, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]

In [15]:
# Taxi zone lookup table (LocationID -> Borough, Zone, service_zone).
df_zones = spark.read.format('parquet').load('../datasets/zones/')

In [16]:
# Attach zone metadata to every revenue row. No `how` given, so this is an
# inner join: revenue rows whose zone has no LocationID match are dropped.
df_result = df_join.join(
    other=df_zones,
    on=df_join.zone == df_zones.LocationID,
)

In [17]:
# Write revenue enriched with zone metadata, without the two join-key columns.
# The keys are dropped by Column reference rather than by name: Spark resolves
# column names case-insensitively by default (spark.sql.caseSensitive=false),
# so drop('zone') also matches the zones table's `Zone` column and would strip
# the zone name from the output. Dropping df_join.zone removes only the key.
# mode='overwrite' keeps the cell re-runnable (default mode errors if the path exists).
df_result \
    .drop(df_join.zone) \
    .drop(df_zones.LocationID) \
    .write.parquet('../datasets/tmp/revenue-zones', mode='overwrite')

                                                                                

In [18]:
# Preview the first 20 rows; long cell values are truncated for display.
df_result.show(n=20, truncate=True)

                                                                                

+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+--------------------+------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|LocationID|  Borough|                Zone|service_zone|
+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+--------------------+------------+
|2020-01-01 00:00:00|   3|              NULL|                NULL|              25.0|                    1|         3|    Bronx|Allerton/Pelham G...|   Boro Zone|
|2020-01-01 00:00:00|   4|              NULL|                NULL|1004.3000000000002|                   57|         4|Manhattan|       Alphabet City| Yellow Zone|
|2020-01-01 00:00:00|   7| 769.7299999999996|                  45| 455.1700000000001|                   38|         7|   Queens|             Astoria|   Boro Zone|
|2020-01-01 00:00:00| 