In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
df_green = spark.read \
.option("header","true") \
.csv('data/raw/green/2021/01/')

In [4]:
import pandas as pd

In [6]:
df_green_pd = pd.read_csv('data/raw/green/2021/01/green_tripdata_2021_01.csv.gz', nrows=1000)

In [8]:
spark.createDataFrame(df_green_pd).schema

StructType(List(StructField(VendorID,LongType,true),StructField(lpep_pickup_datetime,StringType,true),StructField(lpep_dropoff_datetime,StringType,true),StructField(store_and_fwd_flag,StringType,true),StructField(RatecodeID,LongType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(ehail_fee,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(payment_type,LongType,true),StructField(trip_type,LongType,true),StructField(congestion_surcharge,DoubleType,true)))

In [9]:
from pyspark.sql import types

In [10]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])


In [11]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1
processing data for 2020/2
processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [12]:
year = 2021 

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9


AnalysisException: 'Path does not exist: file:/home/rungsunan/code/DEZ/homework4/data/raw/green/2021/09;'

In [13]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1
processing data for 2020/2
processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [14]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/yellow/{year}/{month:02d}/'
    output_path = f'data/pq/yellow/{year}/{month:02d}/'

    df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_yellow \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9


AnalysisException: 'Path does not exist: file:/home/rungsunan/code/DEZ/homework4/data/raw/yellow/2021/09;'

In [20]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-28 16:21:30|  2020-01-28 16:29:10|                 N|         1|          75|         263|              2|         0.81|        6.0|  1.0|    0.

In [21]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [22]:
df_green=spark.read.parquet('data/pq/green/*/*')


In [24]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [27]:
df_green = df_green\
.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')\
.withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [28]:
df_yellow = df_yellow\
.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')\
.withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [30]:
common_columns = []
yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)

In [31]:
from pyspark.sql import functions as F

In [32]:
df_green_sel = df_green\
.select(common_columns)\
.withColumn('service_type', F.lit('green'))

In [33]:
df_yellow_sel = df_yellow\
.select(common_columns)\
.withColumn('service_type', F.lit('yellow'))

In [34]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [35]:
df_trips_data.groupBy('service_type').count().show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [40]:
df_trips_data.registerTempTable('trips_data')

In [43]:
spark.sql("""
SELECT * FROM trips_data LIMIT 10
""").show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2020-01-28 16:21:30|2020-01-28 16:29:10|                 N|         1|          75|         263|              2|         0.81|        6.0|  1.0|    0.5|       0.0|         0.0|       

In [44]:
spark.sql("""
SELECT service_type,count(1) FROM trips_data 
GROUP BY 
service_type
""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [45]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [46]:
df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')

In [48]:
df_result.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|          85|2020-

In [49]:
spark.version


'2.4.0'

In [50]:
df_fhvhv = spark.read \
.option("header","true") \
.csv('data/raw/fhvhv/2021/02/')

In [51]:
df_fhvhv_pd = pd.read_csv('data/raw/fhvhv/2021/02/fhvhv_tripdata_2021_02.csv.gz', nrows=1000)

In [52]:
spark.createDataFrame(df_fhvhv_pd).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [57]:
fhvhv_schema = types.StructType([
    types.StructField("hvfhs_license_num",types.StringType(),True),
    types.StructField("dispatching_base_num",types.StringType(),True),
    types.StructField("pickup_datetime",types.TimestampType(),True),
    types.StructField("dropoff_datetime",types.TimestampType(),True),
    types.StructField("PULocationID",types.IntegerType(),True),
    types.StructField("DOLocationID",types.IntegerType(),True),
    types.StructField("SR_Flag",types.DoubleType(),True)
])


In [56]:
from pyspark.sql import types

In [71]:
df_fhvhv = spark.read \
.option("header","true") \
.schema(fhvhv_schema) \
.csv('data/raw/fhvhv/2021/02/')

In [62]:
year = 2021

for month in range(2, 3):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/fhvhv/{year}/{month:02d}/'
    output_path = f'data/pq/fhvhv/{year}/{month:02d}/'

    df_fhvhv = spark.read \
        .option("header", "true") \
        .schema(fhvhv_schema) \
        .csv(input_path)

    df_fhvhv \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/2


In [89]:
df_fhvhv=spark.read.parquet('data/pq/fhvhv/2021/02')

In [90]:
df_fhvhv.columns


['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [83]:
df_fhvhv.withColumn("pickup_date",col("pickup_datetime").cast(DateType))

TypeError: 'str' object is not callable

In [75]:
df_fhvhv.groupBy('pickup_datetime').count().show()

+-------------------+-----+
|    pickup_datetime|count|
+-------------------+-----+
|2021-02-16 23:31:05|    4|
|2021-02-06 13:01:53|   10|
|2021-02-13 13:12:59|    8|
|2021-02-11 07:55:05|    7|
|2021-02-16 09:04:21|    8|
|2021-02-05 13:19:03|    8|
|2021-02-15 09:37:45|    9|
|2021-02-05 17:47:14|   12|
|2021-02-03 16:55:41|    6|
|2021-02-16 16:52:15|    2|
|2021-02-22 16:41:25|   10|
|2021-02-27 00:37:18|    6|
|2021-02-01 00:00:35|    2|
|2021-02-01 00:09:18|    3|
|2021-02-01 00:29:57|    1|
|2021-02-01 00:42:45|    2|
|2021-02-01 00:43:47|    1|
|2021-02-01 00:51:48|    2|
|2021-02-01 00:52:51|    2|
|2021-02-01 01:13:06|    1|
+-------------------+-----+
only showing top 20 rows



In [94]:
df_fhvhv.registerTempTable('fhvhv_data')

In [100]:
spark.sql("""
SELECT 
 count(1),date_trunc('day', pickup_datetime) as HOUR FROM fhvhv_data
 group by date_trunc('day', pickup_datetime)
""").show()

+--------+-------------------+
|count(1)|               HOUR|
+--------+-------------------+
|  398445|2021-02-08 00:00:00|
|  392696|2021-02-23 00:00:00|
|  460661|2021-02-14 00:00:00|
|  469162|2021-02-12 00:00:00|
|  362596|2021-02-16 00:00:00|
|  367170|2021-02-15 00:00:00|
|  509331|2021-02-13 00:00:00|
|  422116|2021-02-28 00:00:00|
|  500049|2021-02-26 00:00:00|
|  344533|2021-02-07 00:00:00|
|  459887|2021-02-19 00:00:00|
|  428288|2021-02-11 00:00:00|
|  412556|2021-02-24 00:00:00|
|  436556|2021-02-25 00:00:00|
|  509383|2021-02-27 00:00:00|
|  399763|2021-02-21 00:00:00|
|  302785|2021-02-02 00:00:00|
|  398476|2021-02-09 00:00:00|
|  497072|2021-02-20 00:00:00|
|  470555|2021-02-06 00:00:00|
+--------+-------------------+
only showing top 20 rows



In [109]:
spark.sql("""
SELECT date_trunc('day', pickup_datetime),
int(to_timestamp(dropoff_datetime))
- int(to_timestamp(pickup_datetime)) as diff
  FROM fhvhv_data order by diff desc
 
""").show()

+--------------------------------+-----+
|date_trunc(day, pickup_datetime)| diff|
+--------------------------------+-----+
|             2021-02-11 00:00:00|75540|
|             2021-02-17 00:00:00|57221|
|             2021-02-20 00:00:00|44039|
|             2021-02-03 00:00:00|40653|
|             2021-02-19 00:00:00|37577|
|             2021-02-25 00:00:00|35010|
|             2021-02-20 00:00:00|34806|
|             2021-02-18 00:00:00|34612|
|             2021-02-18 00:00:00|34555|
|             2021-02-10 00:00:00|34169|
|             2021-02-10 00:00:00|32476|
|             2021-02-25 00:00:00|32439|
|             2021-02-21 00:00:00|32223|
|             2021-02-09 00:00:00|32087|
|             2021-02-06 00:00:00|31447|
|             2021-02-02 00:00:00|30913|
|             2021-02-10 00:00:00|30856|
|             2021-02-09 00:00:00|30732|
|             2021-02-21 00:00:00|30660|
|             2021-02-05 00:00:00|30511|
+--------------------------------+-----+
only showing top

In [112]:
spark.sql("""
SELECT dispatching_base_num, count(1) as count
  FROM fhvhv_data 
  group by dispatching_base_num order by count desc
 
""").show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



In [121]:
spark.sql("""
select  concat(PULocationID,DOLocationID), count(1) as count
  FROM fhvhv_data 
  group by concat(PULocationID,DOLocationID)
  order by count desc
 
""").show()


+------------------------------------------------------------------+-----+
|concat(CAST(PULocationID AS STRING), CAST(DOLocationID AS STRING))|count|
+------------------------------------------------------------------+-----+
|                                                              7676|45041|
|                                                              2626|37330|
|                                                              3939|28026|
|                                                              6161|25992|
|                                                              1414|18199|
|                                                            129129|14688|
|                                                                77|14688|
|                                                              4242|14502|
|                                                              3737|14424|
|                                                              8989|13976|
|                        

In [120]:
spark.sql("""
select "PULocationID","DOLocationID", count(*) as trip_count
	from fhvhv_data  
	where "DOLocationID" != 265 and "DOLocationID" != 264
	group by ("PULocationID","DOLocationID") 
	order by  trip_count desc
""").show()

+------------+------------+----------+
|PULocationID|DOLocationID|trip_count|
+------------+------------+----------+
+------------+------------+----------+

