In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Monkey D.Luffy') \
    .getOrCreate()

23/08/09 15:07:31 WARN Utils: Your hostname, ThienLes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.16.20.54 instead (on interface en0)
23/08/09 15:07:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/09 15:07:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# start standalone spark cluster
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName('Monkey D.Luffy') \
    .getOrCreate()

23/08/10 09:43:15 WARN Utils: Your hostname, ThienLes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.16.20.54 instead (on interface en0)
23/08/10 09:43:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/10 09:43:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
df_green = spark.read.parquet('../data/pq/green/*/*')
df_yellow = spark.read.parquet('../data/pq/yellow/*/*')

                                                                                

In [5]:
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [4]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [5]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)
print(common_colums)

['VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'payment_type', 'congestion_surcharge']


In [6]:
from pyspark.sql import functions as F

In [7]:
df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))

In [8]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [9]:
df_trips_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green|  570466|
|      yellow|15000700|
+------------+--------+



                                                                                

In [10]:
df_trips_data.createOrReplaceTempView('trips_data')

In [49]:
df_trips_data.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [50]:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|  570466|
|      yellow|15000700|
+------------+--------+



In [11]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [53]:
df_result.coalesce(1).write.parquet('../data/report/revenue/', mode='overwrite')

                                                                                

In [12]:
df_green.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [13]:
df_green.createOrReplaceTempView('green')

In [14]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [15]:
df_green_revenue \
    .repartition(20) \
    .write.parquet('../data/report/revenue/green', mode='overwrite')

23/08/09 15:09:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/08/09 15:09:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [16]:
df_yellow.createOrReplaceTempView('yellow')

In [17]:
df_yellow_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [18]:
df_yellow_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/yellow', mode='overwrite')

23/08/09 15:12:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/08/09 15:12:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [19]:
df_green_revenue_tmp = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_tmp = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records')

In [20]:
df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')

In [21]:
df_join.write.parquet('data/report/revenue/total', mode='overwrite')

23/08/09 15:16:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [22]:
df_join = spark.read.parquet('data/report/revenue/total')

In [23]:
df_join.head()

Row(hour=datetime.datetime(2021, 1, 1, 0, 0), zone=82, green_amount=11.8, green_number_records=1, yellow_amount=11.8, yellow_number_records=1)

In [24]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2023-08-09 15:21:57--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.134.184, 52.217.201.136, 52.217.120.56, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.134.184|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2023-08-09 15:21:59 (44.3 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [25]:
!head taxi+_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


In [27]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [28]:
df_zones.write.parquet('../data/zones')

In [29]:
df_zones = spark.read.parquet('../data/zones/')

In [30]:
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)

In [31]:
df_result.drop('LocationID', 'zone').write.parquet('../data/tmp/revenue-zones')

23/08/09 15:27:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


In [42]:
dff = df_green.select(common_colums)

In [43]:
dff.take(5)

[Row(VendorID=None, pickup_datetime=datetime.datetime(2021, 6, 12, 7, 15), dropoff_datetime=datetime.datetime(2021, 6, 12, 7, 24), store_and_fwd_flag=None, RatecodeID=None, PULocationID=183, DOLocationID=242, passenger_count=None, trip_distance=1.69, fare_amount=13.62, extra=2.75, mta_tax=0.0, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=16.67, payment_type=None, congestion_surcharge=None),
 Row(VendorID=1, pickup_datetime=datetime.datetime(2021, 6, 24, 10, 42, 16), dropoff_datetime=datetime.datetime(2021, 6, 24, 11, 15, 32), store_and_fwd_flag='N', RatecodeID=5, PULocationID=80, DOLocationID=113, passenger_count=2, trip_distance=4.8, fare_amount=30.0, extra=0.0, mta_tax=0.0, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=30.3, payment_type=2, congestion_surcharge=0.0),
 Row(VendorID=None, pickup_datetime=datetime.datetime(2021, 6, 23, 13, 58), dropoff_datetime=datetime.datetime(2021, 6, 23, 14, 43), store_and_fwd_flag=None, Ratec