In [88]:
import pyspark
from pyspark.sql import SparkSession

# Attach to the standalone cluster master. getOrCreate() returns an already
# active session in this kernel if there is one, otherwise builds a new one.
spark = (
    SparkSession.builder
    .master("spark://localhost:7077")
    .appName("test")
    .getOrCreate()
)

In [89]:
# Load every monthly green-taxi parquet partition (one subdirectory level deep).
df_green = spark.read.format('parquet').load('data/raw/green/*/*')

In [90]:
# Inspect the raw green schema: timestamps carry the `lpep_` prefix, and several
# integer-like fields (RatecodeID, passenger_count, payment_type) are stored as doubles.
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [41]:
# Load every monthly yellow-taxi parquet partition (one subdirectory level deep).
df_yellow = spark.read.format('parquet').load('data/raw/yellow/*/*')

In [42]:
# Peek at the raw yellow rows (timestamps still carry the `tpep_` prefix).
df_yellow.show(n=20, truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.6

In [8]:
# Columns both services share before renaming; the prefixed timestamps are absent.
set(df_green.columns).intersection(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [15]:
# Strip the green-specific `lpep_` prefix so both services share timestamp names.
# Note: this rebinds df_green, so anything derived from it later (e.g. the
# `green` temp view) sees only `pickup_datetime` / `dropoff_datetime`.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [16]:
# Strip the yellow-specific `tpep_` prefix so both services share timestamp names.
# Note: this rebinds df_yellow, so anything derived from it later (e.g. the
# `yellow` temp view) sees only `pickup_datetime` / `dropoff_datetime`.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [17]:
# Confirm the yellow rename took effect.
df_yellow.show(20, True)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1|2022-01-01 00:35:40|2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.65|         0

In [18]:
# After renaming, pickup_datetime / dropoff_datetime join the shared set.
set(df_green.columns).intersection(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [19]:
# Columns present in both services, kept in df_green's column order so the two
# projections below line up positionally for the union.
yellowCols = set(df_yellow.columns)
commonCols = [name for name in df_green.columns if name in yellowCols]

In [20]:
from pyspark.sql import functions as F

In [21]:
# Project each service onto the shared columns (identical order for both)
# and tag every row with the service it came from.
yellowSelect = df_yellow.select(*commonCols).withColumn('service_type', F.lit('yellow'))
greenSelect = df_green.select(*commonCols).withColumn('service_type', F.lit('green'))

In [22]:
# UNION ALL semantics (no dedup), matched by position; safe because both sides
# were selected with the same commonCols order.
dfTripsData = greenSelect.union(yellowSelect)

In [23]:
# Row count per service (DataFrame API).
(
    dfTripsData
    .groupBy('service_type')
    .count()
    .show()
)

+------------+--------+
|service_type|   count|
+------------+--------+
|       green|  501368|
|      yellow|22991977|
+------------+--------+



In [24]:
# Expose the combined trips to Spark SQL as `tripsData`.
dfTripsData.createOrReplaceTempView(name='tripsData')

In [25]:
# Same per-service counts as the DataFrame API cell, via Spark SQL.
spark.sql(
    sqlQuery="""SELECT service_type, count(service_type) FROM tripsData group by service_type;"""
).show()

+------------+-------------------+
|service_type|count(service_type)|
+------------+-------------------+
|       green|             501368|
|      yellow|           22991977|
+------------+-------------------+



In [72]:
# Register the per-service frames for the revenue SQL. These are the renamed
# frames, so their timestamp column is `pickup_datetime`.
df_yellow.createOrReplaceTempView(name='yellow')
df_green.createOrReplaceTempView(name='green')

In [73]:
def hourly_zone_revenue(view: str):
    """Return hourly revenue and trip counts per pickup zone for one service.

    `view` is a temp-view name registered earlier ('yellow' or 'green'). Both
    views are built from frames whose tpep_/lpep_ prefixes were renamed to the
    shared `pickup_datetime`, so that is the column queried; the prefixed names
    no longer exist and would fail on a fresh Restart & Run All.

    Output columns: hour (pickup time truncated to the hour), zone
    (PULocationID), amount (sum of total_amount), number_records (row count).
    """
    # `view` is only ever one of our own literal view names, never user input.
    return spark.sql(f"""
SELECT date_trunc('hour', pickup_datetime) as hour, PULocationID as zone, sum(total_amount) as amount, count(1) as number_records
from {view}
where pickup_datetime >= '2022-01-01 00:00:00'
group by hour, zone""")


dfYellowRev = hourly_zone_revenue('yellow')
dfGreenRev = hourly_zone_revenue('green')

In [74]:
# Prefix the metric columns with the service name so they survive the join side by side.
df_greenT = (
    dfGreenRev
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_records')
)
df_yellowT = (
    dfYellowRev
    .withColumnRenamed('amount', 'yellow_amount')
    .withColumnRenamed('number_records', 'yellow_records')
)


In [75]:
# Full outer join keeps (hour, zone) pairs seen by only one service; the other
# service's metric columns are NULL for those rows.
dfYellowGreen = df_yellowT.join(other=df_greenT, on=['hour', 'zone'], how='outer')

In [76]:
# Preview the combined hourly revenue per zone.
dfYellowGreen.show(n=20)

+-------------------+----+------------------+--------------+------------------+-------------+
|               hour|zone|     yellow_amount|yellow_records|      green_amount|green_records|
+-------------------+----+------------------+--------------+------------------+-------------+
|2022-01-01 00:00:00|  41|204.58999999999997|            12|109.98999999999998|            9|
|2022-01-01 00:00:00|  43|1520.5099999999989|            97|             12.05|            1|
|2022-01-01 00:00:00|  74|126.99999999999999|             9|113.25999999999999|            7|
|2022-01-01 00:00:00|  80| 85.05000000000001|             3|             18.81|            1|
|2022-01-01 00:00:00| 130|              NULL|          NULL|              28.3|            1|
|2022-01-01 00:00:00| 144| 765.3699999999999|            46|              NULL|         NULL|
|2022-01-01 00:00:00| 211| 558.6800000000001|            33|              NULL|         NULL|
|2022-01-01 00:00:00| 233| 972.0499999999995|            58|

In [77]:
# Persist the combined report; overwrite keeps the cell re-runnable.
dfYellowGreen.write.mode('overwrite').parquet('data/report/revenue/total')

In [79]:
# Zone lookup table. No schema is inferred, so every column (including
# LocationID) is read as a string.
dfZones = spark.read.csv('data/taxi+_zone_lookup.csv', header=True)

In [81]:
# Persist the zone lookup as parquet. Without an explicit mode the default
# ('errorifexists') makes this cell fail on every re-run once the path exists,
# so overwrite it like the revenue report.
dfZones.write.parquet("data/zones/", mode="overwrite")

In [83]:
# Inner join: attach Borough / Zone / service_zone to each revenue row by pickup id.
# LocationID is a string (CSV read without a schema), so the comparison with
# the numeric `zone` relies on Spark's implicit cast.
dfResult = dfYellowGreen.join(
    other=dfZones,
    on=dfYellowGreen['zone'] == dfZones['LocationID'],
)

In [84]:
# Preview revenue rows enriched with zone metadata.
dfResult.show(n=20, truncate=True)

+-------------------+----+------------------+--------------+------------------+-------------+----------+---------+--------------------+------------+
|               hour|zone|     yellow_amount|yellow_records|      green_amount|green_records|LocationID|  Borough|                Zone|service_zone|
+-------------------+----+------------------+--------------+------------------+-------------+----------+---------+--------------------+------------+
|2022-01-01 00:00:00|  41|204.58999999999997|            12|109.98999999999998|            9|        41|Manhattan|      Central Harlem|   Boro Zone|
|2022-01-01 00:00:00|  43|1520.5099999999989|            97|             12.05|            1|        43|Manhattan|        Central Park| Yellow Zone|
|2022-01-01 00:00:00|  74|126.99999999999999|             9|113.25999999999999|            7|        74|Manhattan|   East Harlem North|   Boro Zone|
|2022-01-01 00:00:00|  80| 85.05000000000001|             3|             18.81|            1|        80| B

In [87]:
# Persist revenue with zone names, without the two redundant id columns.
# - `zone` is dropped by column reference: dropping by the *string* 'zone'
#   also matches the lookup's `Zone` name column under Spark's default
#   case-insensitive resolution (spark.sql.caseSensitive=false), which
#   silently discarded the zone names.
# - mode='overwrite' keeps the cell re-runnable (the default save mode errors
#   when the path already exists).
(
    dfResult
    .drop(dfYellowGreen['zone'])
    .drop('LocationID')
    .write.parquet('tmp/revenueZones', mode='overwrite')
)