In [81]:
spark.version

u'3.0.3'

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local[*]") \
  .appName("taxi_data") \
  .getOrCreate()



In [31]:
import pandas as pd
df_yellow_pd = pd.read_csv("data/raw/yellow/2020/yellow_tripdata_2020_1.csv.gz", nrows=1000)
df_yellow_pd.dtypes

# loads Spark schema from Panda schema as an example to edit:
spark.createDataFrame(df_yellow_pd).schema

StructType(List(StructField(VendorID,LongType,true),StructField(tpep_pickup_datetime,StringType,true),StructField(tpep_dropoff_datetime,StringType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(RatecodeID,LongType,true),StructField(store_and_fwd_flag,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(payment_type,LongType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(congestion_surcharge,DoubleType,true)))

In [32]:
import pandas as pd
df_green_pd = pd.read_csv("data/raw/green/2020/green_tripdata_2020_1.csv.gz", nrows=1000)
df_green_pd.dtypes

# loads Spark schema from Panda schema as an example to edit:
spark.createDataFrame(df_green_pd).schema

StructType(List(StructField(VendorID,LongType,true),StructField(lpep_pickup_datetime,StringType,true),StructField(lpep_dropoff_datetime,StringType,true),StructField(store_and_fwd_flag,StringType,true),StructField(RatecodeID,LongType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(ehail_fee,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(payment_type,LongType,true),StructField(trip_type,LongType,true),StructField(congestion_surcharge,DoubleType,true)))

In [33]:
from pyspark.sql import types

# create Spark schema:

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])


In [25]:
# create parquet files:

for year in (2020, 2021):
    for month in range(1,13):
        print("Executing year: {}, month: {}".format(year,month))
#         print("data/raw/yellow/{year}/*{year}_{month}.*".format(year=year, month=month))
        df_yellow = spark.read \
          .option("header", "true") \
          .schema(yellow_schema) \
          .csv("data/raw/yellow/{year}/*{year}_{month}.*".format(year=year,month=month))

        df_yellow \
            .repartition(4) \
            .write \
            .parquet("data/pq/yellow/{}/{}".format(year,month))

Executing yearn: 2020, month: 1
Executing yearn: 2020, month: 2
Executing yearn: 2020, month: 3
Executing yearn: 2020, month: 4
Executing yearn: 2020, month: 5
Executing yearn: 2020, month: 6
Executing yearn: 2020, month: 7
Executing yearn: 2020, month: 8
Executing yearn: 2020, month: 9
Executing yearn: 2020, month: 10
Executing yearn: 2020, month: 11
Executing yearn: 2020, month: 12
Executing yearn: 2021, month: 1
Executing yearn: 2021, month: 2
Executing yearn: 2021, month: 3
Executing yearn: 2021, month: 4
Executing yearn: 2021, month: 5
Executing yearn: 2021, month: 6
Executing yearn: 2021, month: 7
Executing yearn: 2021, month: 8


AnalysisException: Path does not exist: file:/home/cristiandugacicu/projects/personal/de-zoomcamp/course5/notebooks/data/raw/yellow/2021/*2021_8.*;

In [36]:
for year in (2020, 2021):
    for month in range(1,13):
        print("Executing year: {}, month: {}".format(year,month))
#         print("data/raw/green/{year}/*{year}_{month}.*".format(year=year, month=month))
        df_green = spark.read \
          .option("header", "true") \
          .schema(yellow_schema) \
          .csv("data/raw/green/{year}/*{year}_{month}.*".format(year=year,month=month))

        df_green \
            .repartition(4) \
            .write \
            .parquet("data/pq/green/{}/{}".format(year,month))

Executing year: 2020, month: 1
Executing year: 2020, month: 2
Executing year: 2020, month: 3
Executing year: 2020, month: 4
Executing year: 2020, month: 5
Executing year: 2020, month: 6
Executing year: 2020, month: 7
Executing year: 2020, month: 8
Executing year: 2020, month: 9
Executing year: 2020, month: 10
Executing year: 2020, month: 11
Executing year: 2020, month: 12
Executing year: 2021, month: 1
Executing year: 2021, month: 2
Executing year: 2021, month: 3
Executing year: 2021, month: 4
Executing year: 2021, month: 5
Executing year: 2021, month: 6
Executing year: 2021, month: 7
Executing year: 2021, month: 8


AnalysisException: Path does not exist: file:/home/cristiandugacicu/projects/personal/de-zoomcamp/course5/notebooks/data/raw/green/2021/*2021_8.*;

In [44]:
df_yellow_pq = spark.read.parquet("data/pq/yellow/*/*/*") \
      .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
      .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

df_green_pq = spark.read.parquet("data/pq/green/*/*/*") \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

df_yellow_pq.show()
df_yellow_pq.printSchema()

display("Yellow columns:", df_yellow_pq.columns)
display("Green columns:", df_green_pq.columns)

+--------+-------------------+-------------------+---------------+-------------+------------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+-------------------+-------------------+---------------+-------------+------------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1|2020-01-06 06:40:20|2020-01-06 06:42:06|              1|          0.6|                 1|      null|         164|         170|           1|        4.0|  2.5|    0.5|       1.8|         0.0|                  0.3|         9.

'Yellow columns:'

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

'Green columns:'

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [45]:
from pyspark.sql import functions as F

# set(df_yellow_pq.columns) && set(df_green_pq.columns)
common_collumns = []

for col in df_green_pq.columns:
    if col in df_yellow_pq.columns:
        common_collumns.append(col)
        
display(common_collumns) 


df_yellow_sel = df_yellow_pq.select(common_collumns) \
  .withColumn("service_type", F.lit("yellow")) 
    
df_green_sel = df_green_pq.select(common_collumns) \
  .withColumn("service_type", F.lit("green"))

#union
df_trips_data = df_green_sel.unionAll(df_yellow_sel)


['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [46]:
df_trips_data.groupBy("service_type").count().show()

# expect
# green 2304517
# yellow 39649199


# make dataFrame a tabe
df_trips_data.registerTempTable("trips_data")

spark \
  .sql("""
    SELECT service_type, count(1)
    FROM trips_data 
    GROUP BY service_type;
    """) \
  .show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [51]:
df_result = spark.sql("""
SELECT 
    -- Reveneue groupings:
    
    PULocationID AS revenue_zone,
    
    -- data for each month:
    date_trunc('month', pickup_datetime) AS revenue_month, 
    
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")


In [52]:
df_result \
    .coalesce(1) \  #redice number of partitions to 1
    .write.parquet('data/report/revenue/', mode='overwrite')

In [72]:
# GROUP
df_yellow_pq2 = spark.read.parquet("data/pq/yellow/*/*") \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")


# make dataFrame a tabe
df_yellow_pq2.registerTempTable("yellow_data2")

df_yellow_revenue = spark.sql("""
SELECT
  date_trunc('hour', pickup_datetime) AS hour,
  PULocationID AS zone,
  
  SUM(total_amount) AS amount,
  COUNT(1) AS number_records
FROM
  yellow_data2
WHERE
  pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
  1, 2
ORDER BY
  1, 2
""")#.show()

df_yellow_revenue.repartition(20).write.parquet('data/report/revenue/yellow', mode='overwrite')

In [73]:
# GROUP
df_green_pq2 = spark.read.parquet("data/pq/green/*/*") \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")


# make dataFrame a tabe
df_green_pq2.registerTempTable("green_data2")

df_green_revenue = spark.sql("""
SELECT
  date_trunc('hour', pickup_datetime) AS hour,
  PULocationID AS zone,
  
  SUM(total_amount) AS amount,
  COUNT(1) AS number_records
FROM
  green_data2
WHERE
  pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
  1, 2
""") #.show()

df_green_revenue.repartition(20).write.parquet('data/report/revenue/green', mode='overwrite')

In [78]:
#JOIN
df_yellow_revenue_temp = df_yellow_revenue \
  .withColumnRenamed('amount', 'yellow_amount') \
  .withColumnRenamed('number_records', 'yellow_number_records') \

df_green_revenue_temp = df_green_revenue \
  .withColumnRenamed('amount', 'green_amount') \
  .withColumnRenamed('number_records', 'green_number_records') \


df_join = df_green_revenue_temp.join(df_yellow_revenue_temp, on=['hour', 'zone'], how='outer')
# df_join.show()

df_join.write.parquet('data/report/revenue/total')

In [None]:
# JOIN small file
df_zones = spark.read.parquet('zones/')
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)
df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones')