In [3]:
import pyspark 
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session with 2 worker threads.
# NOTE(review): the saved output of this cell is "Java gateway process exited
# before sending its port number", which usually means PySpark could not launch
# a JVM (Java missing or JAVA_HOME unset) — confirm the Java setup before re-running.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('test')
    .getOrCreate()
)

RuntimeError: Java gateway process exited before sending its port number

In [2]:
# Show the working-directory layout (data/green, data/yellow, ...) before loading the parquet files.
!tree

Folder PATH listing
Volume serial number is FCDB-F4E8
D:.
+---.ipynb_checkpoints
+---data
¦   +---green
¦   +---yellow
+---fhvhv
¦   +---2021
¦       +---01
+---spark-warehouse


In [3]:
# Load every parquet file of each taxi service (green, then yellow) via glob paths.
df_green, df_yellow = (
    spark.read.parquet('data/green/*.parquet'),
    spark.read.parquet('data/yellow/*.parquet'),
)

In [4]:
# Inspect the green-taxi schema; its timestamp columns are named lpep_* (yellow uses tpep_*).
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
# Inspect the yellow-taxi schema to compare with green: column order differs and
# some types differ too (e.g. VendorID is double here, long in green).
df_yellow.printSchema()

root
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [6]:
# Compare column counts; green carries two columns yellow lacks (ehail_fee, trip_type).
print(f'Number of columns: \nYellow_tripdata: {len(df_yellow.columns)}\nGreen_tripdata: {len(df_green.columns)}')

Number of columns: 
Yellow_tripdata: 18
Green_tripdata: 20


In [7]:
# Compare row counts; each .count() is a Spark action that scans its dataset.
print(f'Number of rows: \nYellow_tripdata: {df_yellow.count()}\nGreen_tripdata: {df_green.count()}')

Number of rows: 
Yellow_tripdata: 4666625
Green_tripdata: 224917


In [8]:
# Number of column names shared by both datasets before the timestamp columns are renamed.
len(set(df_yellow.columns).intersection(df_green.columns))

16

In [9]:
# Give yellow's tpep_* timestamp columns service-neutral names so they match green's renamed columns.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [10]:
# Give green's lpep_* timestamp columns service-neutral names so they match yellow's renamed columns.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [11]:
# Shared column names after the rename, listed in the yellow DataFrame's column order.
sorted(
    set(df_yellow.columns).intersection(df_green.columns),
    key=df_yellow.columns.index,
)

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [12]:
# Number of shared column names now that the pickup/dropoff columns are renamed.
len({*df_yellow.columns} & {*df_green.columns})

18

In [13]:
# Display the yellow column names as a set.
{*df_yellow.columns}

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [14]:
# Columns present in both datasets, kept in yellow's column order so the green
# and yellow selects below produce the same column layout.
common_columns = sorted(
    set(df_yellow.columns).intersection(df_green.columns),
    key=df_yellow.columns.index,
)

In [15]:
from pyspark.sql import functions as F

In [16]:
# Project green onto the shared columns and tag every row with its service type.
df_green_req = (
    df_green
    .select(common_columns)
    .withColumn('service_type', F.lit('green'))
)

In [17]:
# Peek at a single row of the projected green data.
df_green_req.show(n=1)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|service_type|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|       2|2021-03-01 01:05:42|2021-03-01 01:14:03|            1.0|         1.56|       1.0|                 N|          83|         129|         1.0|        7.5|  0.5|    0.5|       0.0|       

In [18]:
# Project yellow onto the shared columns and tag every row with its service type.
df_yellow_req = (
    df_yellow
    .select(common_columns)
    .withColumn('service_type', F.lit('yellow'))
)

In [19]:
# Stack green and yellow trips into one DataFrame. unionByName matches columns by
# name rather than by position, so a future reordering of either select cannot
# silently shift values into the wrong columns. Column types still differ between
# the sources (e.g. VendorID is long for green, double for yellow), and Spark
# widens them to a common type in the result.
df_trip_data = df_green_req.unionByName(df_yellow_req)

In [20]:
# Bring the last row of the union back to the driver; it comes from the yellow half.
df_trip_data.tail(num=1)

[Row(VendorID=None, pickup_datetime=datetime.datetime(2021, 1, 25, 9, 38), dropoff_datetime=datetime.datetime(2021, 1, 25, 9, 50), passenger_count=None, trip_distance=4.93, RatecodeID=None, store_and_fwd_flag=None, PULocationID=248, DOLocationID=168, payment_type=None, fare_amount=20.76, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=24.31, congestion_surcharge=0.0, service_type='yellow')]

In [21]:
# Rows per service type; these should match the per-dataset row counts printed earlier.
df_trip_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green| 224917|
|      yellow|4666625|
+------------+-------+



## Spark SQL

In [22]:
# Register the combined trips as a temporary view so Spark SQL can query it as trip_data.
df_trip_data.createOrReplaceTempView(name='trip_data')

In [23]:
# Sanity-check the view by showing a few rows through Spark SQL (show's defaults spelled out).
spark.sql('select * from trip_data limit 10').show(n=20, truncate=True)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|service_type|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------+
|     2.0|2021-03-01 01:05:42|2021-03-01 01:14:03|            1.0|         1.56|       1.0|                 N|          83|         129|         1.0|        7.5|  0.5|    0.5|       0.0|       

In [24]:
# Per-service row counts, expressed in SQL instead of the DataFrame groupBy above.
service_count_query = """
SELECT 
    service_type,
    COUNT(1)

FROM 
    trip_data

Group by
    service_type

"""
spark.sql(service_count_query).show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|  224917|
|      yellow| 4666625|
+------------+--------+



In [25]:
# Preview the monthly revenue aggregation: one row per pickup zone, month
# (date_trunc to month) and service type, with summed charges and average
# passenger count / trip distance. GROUP BY 1, 2, 3 refers to the first three
# select-list expressions.
monthly_revenue_preview_sql = """

SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trip_data
GROUP BY
    1, 2, 3



"""
spark.sql(monthly_revenue_preview_sql).show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         126|2021-

In [26]:
# Build the monthly revenue report as a DataFrame (df_result) for writing to parquet.
# NOTE(review): this SQL text duplicates the preview cell above; consider defining
# the query once and reusing it so the two cannot drift apart.
monthly_revenue_sql = """

SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trip_data
GROUP BY
    1, 2, 3



"""
df_result = spark.sql(monthly_revenue_sql)

In [27]:
# Persist the monthly revenue report. mode='overwrite' makes this cell safe to
# re-run: the default save mode ('error' / 'errorifexists') raises once
# data/monthly_revenue already exists, which breaks Restart & Run All.
df_result.write.parquet('data/monthly_revenue', mode='overwrite')

In [None]:
# Write the report as a single parquet part file (coalesced to one partition),
# replacing any earlier output at the same path.
df_result.coalesce(1).write.mode('overwrite').parquet('data/monthly_revenue')

In [28]:
# Re-list the directory to confirm data/monthly_revenue was created by the parquet write.
!tree

Folder PATH listing
Volume serial number is FCDB-F4E8
D:.
+---.ipynb_checkpoints
+---data
¦   +---green
¦   +---monthly_revenue
¦   +---yellow
+---fhvhv
¦   +---2021
¦       +---01
+---spark-warehouse


In [29]:
import pandas as pd

In [30]:
# Load the Spark output into pandas by reading the whole output directory rather
# than one hard-coded part file, for two reasons:
#   - part-file names embed a random UUID that changes on every write, so a fixed
#     name breaks after any re-run;
#   - a multi-partition write produces several part files, so reading just one
#     silently drops rows.
# The pyarrow engine skips Spark's side files (names starting with '_' or '.',
# e.g. _SUCCESS and .crc files).
df_out = pd.read_parquet('data/monthly_revenue')

In [31]:
# Preview the first rows of the pandas copy of the report.
# NOTE(review): revenue_month displays as e.g. 2021-02-28 23:00:00 instead of a
# month start, which suggests a mismatch between the Spark session time zone and
# UTC — confirm spark.sql.session.timeZone before relying on the month buckets.
df_out.head(n=5)

Unnamed: 0,revenue_zone,revenue_month,service_type,revenue_monthly_fare,revenue_monthly_extra,revenue_monthly_mta_tax,revenue_monthly_tip_amount,revenue_monthly_tolls_amount,revenue_monthly_improvement_surcharge,revenue_monthly_total_amount,revenue_monthly_congestion_surcharge,avg_montly_passenger_count,avg_montly_trip_distance
0,126,2021-02-28 23:00:00,green,4114.8,299.1,26.5,58.82,79.56,54.6,4641.63,8.25,1.375,4.796284
1,259,2020-12-31 23:00:00,green,4699.51,207.85,3.5,216.0,280.47,46.5,5453.83,0.0,1.0,8.066839
2,4,2021-02-28 23:00:00,green,663.77,57.75,0.5,0.0,12.24,6.3,740.56,0.0,1.0,7.920476
3,159,2021-01-31 23:00:00,green,8235.91,609.24,76.5,334.36,293.76,119.4,9796.42,55.0,1.211765,154.014579
4,201,2020-12-31 23:00:00,green,2323.21,58.35,2.0,92.95,177.24,15.9,2669.65,0.0,1.0,14.093019
