In [17]:
import pyspark
from pyspark.sql import SparkSession

# Create the notebook's Spark session, or reuse one that already exists:
# master "local[*]", application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [18]:
# Read every parquet file matched by the two-level glob under data/pq/green
# into a single DataFrame.
df_green = spark.read.parquet('data/pq/green/*/*')

In [19]:
# Expose df_green to Spark SQL under the name `green` so the query below can
# select from it. createOrReplaceTempView replaces the deprecated
# registerTempTable and is safe to re-run (it overwrites an existing view).
df_green.createOrReplaceTempView('green')

In [20]:
# Aggregate the `green` table into one row per (pickup hour, PULocationID):
#   hour          - lpep_pickup_datetime truncated to the hour
#   zone          - PULocationID
#   total_amount  - SUM(total_amount) over the group
#   count_records - number of rows in the group
# Only rows with a pickup time in [2020-01-01, 2022-01-01) are included.
# `GROUP BY 1,2` refers to the first two SELECT expressions by position.
df_green_revenue = spark.sql("""
SELECT 
    -- Revenue grouping 
    date_trunc("hour", lpep_pickup_datetime) AS hour,
    PULocationID AS zone,


    SUM(total_amount) AS total_amount,
    COUNT(1) AS count_records
FROM 
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
    AND
    lpep_pickup_datetime < '2022-01-01 00:00:00'
GROUP BY 
    1,2
;
""")

In [21]:
# Print a preview of the aggregated green revenue rows to the cell output.
df_green_revenue.show()



+-------------------+----+------------------+-------------+
|               hour|zone|      total_amount|count_records|
+-------------------+----+------------------+-------------+
|2020-01-31 11:00:00| 220|            168.08|            6|
|2020-01-04 16:00:00| 196|210.44000000000005|           10|
|2020-01-04 20:00:00|  25|369.57000000000005|           23|
|2020-01-29 14:00:00| 203|             43.26|            2|
|2020-01-02 13:00:00|  74|1037.6299999999994|           71|
|2020-01-23 22:00:00|  35|             36.11|            2|
|2020-01-26 12:00:00|  35|            257.03|            8|
|2020-01-22 10:00:00|  74|1179.0599999999988|           83|
|2020-01-10 22:00:00| 216|            181.89|            5|
|2020-01-24 10:00:00| 260|            149.63|           10|
|2020-01-24 05:00:00| 133|             19.78|            1|
|2020-01-03 13:00:00| 250|              27.6|            2|
|2020-01-16 01:00:00|   7|             110.1|            9|
|2020-01-02 09:00:00|  66|             2

                                                                                

In [22]:
# Repartition the green hourly revenue into 20 partitions and write it as
# parquet, replacing any previous output at the same path.
(
    df_green_revenue
    .repartition(20)
    .write
    .parquet('data/report/revenue/green', mode='overwrite')
)

                                                                                

In [23]:
# Read every parquet file matched by the two-level glob under data/pq/yellow
# and expose the result to Spark SQL under the name `yellow`.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run (it overwrites an existing view of the same name).
df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow.createOrReplaceTempView('yellow')

In [24]:
# Aggregate the `yellow` table into one row per (pickup hour, PULocationID):
#   hour          - tpep_pickup_datetime truncated to the hour
#   zone          - PULocationID
#   total_amount  - SUM(total_amount) over the group
#   count_records - number of rows in the group
# Only rows with a pickup time in [2020-01-01, 2022-01-01) are included.
# `GROUP BY 1,2` refers to the first two SELECT expressions by position.
# The output columns match the green revenue query so the two can be joined.
df_yellow_revenue = spark.sql("""
SELECT 
    -- Revenue grouping 
    date_trunc("hour", tpep_pickup_datetime) AS hour,
    PULocationID AS zone,


    SUM(total_amount) AS total_amount,
    COUNT(1) AS count_records
FROM 
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
    AND
    tpep_pickup_datetime < '2022-01-01 00:00:00'
GROUP BY 
    1,2
;
""")

In [25]:
# Repartition the yellow hourly revenue into 20 partitions and write it as
# parquet, replacing any previous output at the same path.
(
    df_yellow_revenue
    .repartition(20)
    .write
    .parquet('data/report/revenue/yellow', mode='overwrite')
)

                                                                                

In [26]:
# Print a preview of the aggregated yellow revenue rows to the cell output.
df_yellow_revenue.show()



+-------------------+----+------------------+-------------+
|               hour|zone|      total_amount|count_records|
+-------------------+----+------------------+-------------+
|2020-01-29 18:00:00| 229| 5427.470000000005|          304|
|2020-01-24 15:00:00| 264| 2249.289999999999|           93|
|2020-01-21 18:00:00| 162|14090.389999999996|          816|
|2020-01-18 00:00:00| 144| 2951.300000000001|          174|
|2020-01-17 03:00:00| 132|2031.0099999999995|           46|
|2020-01-23 19:00:00| 224|327.28000000000003|           16|
|2020-01-06 08:00:00|  43| 2765.499999999998|          169|
|2020-01-04 11:00:00| 140| 1662.959999999999|          112|
|2020-01-04 23:00:00|  87| 744.2199999999999|           36|
|2020-01-26 17:00:00| 107| 4251.760000000004|          274|
|2020-01-31 13:00:00| 140| 4380.170000000002|          277|
|2020-01-31 21:00:00|  50|1784.4399999999994|          104|
|2020-01-08 08:00:00| 249| 2433.119999999999|          165|
|2020-01-17 08:00:00|  13|2948.720000000

                                                                                

In [27]:
# Rebind both revenue names to DataFrames read back from the parquet reports
# written to data/report/revenue/, so the steps below work from the files on
# disk instead of the SQL query results.
df_green_revenue = spark.read.parquet('data/report/revenue/green')
df_yellow_revenue = spark.read.parquet('data/report/revenue/yellow')

In [28]:
# Prefix the metric columns with the taxi colour so the green and yellow
# frames can be joined on (hour, zone) without clashing column names.
df_green_revenue_tmp = (
    df_green_revenue
    .withColumnRenamed('total_amount', 'green_total_amount')
    .withColumnRenamed('count_records', 'green_count_records')
)

df_yellow_revenue_tmp = (
    df_yellow_revenue
    .withColumnRenamed('total_amount', 'yellow_total_amount')
    .withColumnRenamed('count_records', 'yellow_count_records')
)

In [29]:
# Full outer join on (hour, zone): a key present on only one side is kept,
# with nulls in the other side's metric columns.
df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')

In [30]:
# Print a preview of the combined green/yellow revenue rows.
df_join.show()



+-------------------+----+------------------+-------------------+-------------------+--------------------+
|               hour|zone|green_total_amount|green_count_records|yellow_total_amount|yellow_count_records|
+-------------------+----+------------------+-------------------+-------------------+--------------------+
|2020-01-01 00:00:00|   3|              null|               null|               25.0|                   1|
|2020-01-01 00:00:00|   4|              null|               null|             1004.3|                  57|
|2020-01-01 00:00:00|   7| 769.7299999999996|                 45|  455.1700000000002|                  38|
|2020-01-01 00:00:00|  12|              null|               null| 106.99999999999999|                   6|
|2020-01-01 00:00:00|  37|            175.67|                  6| 161.60999999999999|                   7|
|2020-01-01 00:00:00|  40|            168.98|                  8|  89.97000000000001|                   5|
|2020-01-01 00:00:00|  45|           

                                                                                

In [31]:
# Write the combined report as parquet, replacing any previous output at
# data/report/revenue/total.
df_join.write.parquet('data/report/revenue/total', mode='overwrite')

                                                                                

In [34]:
# Rebind df_join to the combined report read back from parquet on disk.
df_join = spark.read.parquet('data/report/revenue/total')

In [35]:
# Print a preview of the reloaded combined report.
df_join.show()

+-------------------+----+------------------+-------------------+-------------------+--------------------+
|               hour|zone|green_total_amount|green_count_records|yellow_total_amount|yellow_count_records|
+-------------------+----+------------------+-------------------+-------------------+--------------------+
|2020-01-01 00:00:00|  14|              null|               null|                8.8|                   1|
|2020-01-01 00:00:00|  15|              null|               null|              34.09|                   1|
|2020-01-01 00:00:00|  17|195.02999999999997|                  9| 220.20999999999998|                   8|
|2020-01-01 00:00:00|  25|             531.0|                 26|             324.35|                  16|
|2020-01-01 00:00:00|  32| 68.94999999999999|                  2|               18.0|                   1|
|2020-01-01 00:00:00|  43|            107.52|                  6|  6539.510000000013|                 390|
|2020-01-01 00:00:00|  49|266.7600000

In [36]:
# Load the zone lookup table from the zones/ parquet directory; it is joined
# to the revenue report on its LocationID column further down.
df_zones = spark.read.parquet('zones/')

In [37]:
# Print a preview of the zone lookup rows.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [38]:
# Bare expression: the notebook displays the DataFrame's repr (column names
# and types) without running a Spark job.
df_join

DataFrame[hour: timestamp, zone: int, green_total_amount: double, green_count_records: bigint, yellow_total_amount: double, yellow_count_records: bigint]

In [39]:
# Attach the zone lookup columns to each revenue row by matching the report's
# `zone` id against the lookup's LocationID. No `how` is passed, so Spark's
# default join type applies (inner, per the PySpark API docs).
# The result carries both `zone` (from df_join) and `Zone` (from df_zones).
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)

In [41]:
# Preview the joined result without LocationID, which equals the `zone` join
# key on every matched row.
df_result.drop('LocationID').show()

+-------------------+----+------------------+-------------------+-------------------+--------------------+---------+--------------------+------------+
|               hour|zone|green_total_amount|green_count_records|yellow_total_amount|yellow_count_records|  Borough|                Zone|service_zone|
+-------------------+----+------------------+-------------------+-------------------+--------------------+---------+--------------------+------------+
|2020-01-01 00:00:00|  14|              null|               null|                8.8|                   1| Brooklyn|           Bay Ridge|   Boro Zone|
|2020-01-01 00:00:00|  15|              null|               null|              34.09|                   1|   Queens|Bay Terrace/Fort ...|   Boro Zone|
|2020-01-01 00:00:00|  17|195.02999999999997|                  9| 220.20999999999998|                   8| Brooklyn|             Bedford|   Boro Zone|
|2020-01-01 00:00:00|  25|             531.0|                 26|             324.35|         

In [43]:
# Write the zone-enriched revenue report as parquet.
# mode='overwrite' matches every other write in this notebook; without it the
# writer's default mode raises an error once tmp/revenue-zones already exists,
# so the cell could not be re-run.
# NOTE(review): df_result has both `zone` (location id) and `Zone` (zone name).
# Spark resolves column names case-insensitively by default, so drop('zone')
# may remove both of them; confirm the written schema is the intended one.
df_result.drop('LocationID','zone').write.parquet('tmp/revenue-zones', mode='overwrite')

                                                                                