# Week 5 Batch

## First Look at Spark/PySpark

In [1]:
import pyspark

pyspark.__version__

'3.3.2'

In [2]:
import pandas as pd

pd.__version__

'2.2.3'

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [4]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

In [5]:
df = spark.read.option("header", "true").parquet("yellow_tripdata_2024-10.parquet")

In [6]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 07:30:44|  2024-10-01 07:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [28]:
df.head(5)

[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 7, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 7, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 7, 12, 20), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 7, 25, 25), passenger_count=1, trip_distance=2.2, RatecodeID=1, store_and_fwd_flag='N', PULocationID=48, DOLocationID=236, payment_type=1, fare_amount=14.2, extra=3.5, mta_tax=0.5, tip_amount=3.8, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=23.0, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 7, 4, 46), tpep_dropoff_datetime=datetime.datetime(2

In [29]:
for i in df.schema:
    print(i)

StructField('VendorID', IntegerType(), True)
StructField('tpep_pickup_datetime', TimestampType(), True)
StructField('tpep_dropoff_datetime', TimestampType(), True)
StructField('passenger_count', LongType(), True)
StructField('trip_distance', DoubleType(), True)
StructField('RatecodeID', LongType(), True)
StructField('store_and_fwd_flag', StringType(), True)
StructField('PULocationID', IntegerType(), True)
StructField('DOLocationID', IntegerType(), True)
StructField('payment_type', LongType(), True)
StructField('fare_amount', DoubleType(), True)
StructField('extra', DoubleType(), True)
StructField('mta_tax', DoubleType(), True)
StructField('tip_amount', DoubleType(), True)
StructField('tolls_amount', DoubleType(), True)
StructField('improvement_surcharge', DoubleType(), True)
StructField('total_amount', DoubleType(), True)
StructField('congestion_surcharge', DoubleType(), True)
StructField('Airport_fee', DoubleType(), True)


In [34]:
df = df.repartition(4)

In [36]:
df.write.parquet("yellow_tripdata/2024/10/", mode="overwrite")

## Spark DataFrame

In [7]:
df = spark.read.parquet("yellow_tripdata/2024/10/")
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-10-07 23:40:43|  2024-10-08 01:10:56|              1|         14.8|        99|                 N|         127|         225|           1|       47.5|  0.0|    0.5|       0.

In [8]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [36]:
df.withColumn("pickup_date", F.to_date(df["tpep_pickup_datetime"])).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+
|       1| 2024-10-07 23:40:43|  2024-10-08 01:10:56|              1|         14.8|        99|                 N|         127|         225|           1

In [9]:
df = df.withColumn(
    "trip_duration_hours",
    (
        F.unix_timestamp(df["tpep_dropoff_datetime"])
        - F.unix_timestamp(df["tpep_pickup_datetime"])
    )
    / 3600,
)

df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|trip_duration_hours|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|       1| 2024-10-07 23:40:43|  2024-10-08 01:10:56|              1|         14.8|        99|                 N|         127| 

In [10]:
df.createOrReplaceTempView("yellow_2024_10")

In [None]:
spark.sql("""
SELECT
    MAX(trip_duration_hours)
FROM
    yellow_2024_10
""").show()

+------------------------+
|max(trip_duration_hours)|
+------------------------+
|      162.61777777777777|
+------------------------+



In [13]:
zone_lookup = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")
zone_lookup.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [14]:
zone_lookup.createOrReplaceTempView("zones")

In [18]:
df_join = df.join(zone_lookup, df["PULocationID"] == zone_lookup["LocationID"], "inner")
df_join.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+----------+---------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|trip_duration_hours|LocationID|  Borough|                Zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+----------+-------

In [19]:
df_join.createOrReplaceTempView("yellow_2024_10_with_zones")

In [20]:
spark.sql("""
SELECT 
    Zone,
    COUNT(*) AS trip_count
FROM 
    yellow_2024_10_with_zones
GROUP BY Zone
ORDER BY trip_count

""").show()

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
|       Arden Heights|         2|
|       Rikers Island|         2|
|         Jamaica Bay|         3|
| Green-Wood Cemetery|         3|
|       West Brighton|         4|
|       Port Richmond|         4|
|Eltingville/Annad...|         4|
|   Rossville/Woodrow|         4|
|Charleston/Totten...|         4|
|        Crotona Park|         6|
|         Great Kills|         6|
|     Mariners Harbor|         7|
|Heartland Village...|         7|
|Saint George/New ...|         9|
|             Oakwood|         9|
|New Dorp/Midland ...|        10|
|       Broad Channel|        10|
|         Westerleigh|        12|
|     Pelham Bay Park|        12|
+--------------------+----------+
only showing top 20 rows



In [64]:
spark.sql("""
SELECT 
    COUNT(*) AS trip_count
FROM
    yellow_2024_10
WHERE
    DATE(tpep_pickup_datetime) = '2024-10-15'

""").show()

+----------+
|trip_count|
+----------+
|    117098|
+----------+



## Taxi Schema

In [7]:
df_green = spark.read.option("header", "true").csv("data/raw/green/2021/01/")
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-01-01 00:15:56|  2021-01-01 00:19:52|                 N|         1|          43|         151|              1|         1.01|        5.5|  0.5|    0.

In [8]:
df_green.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [9]:
df_green_pd = pd.read_csv(
    "data/raw/green/2021/01/green_tripdata_2021_01.csv.gz", nrows=1000
)
df_green_pd.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2021-01-01 00:15:56,2021-01-01 00:19:52,N,1,43,151,1,1.01,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2,1,0.0
1,2,2021-01-01 00:25:59,2021-01-01 00:34:44,N,1,166,239,1,2.53,10.0,0.5,0.5,2.81,0.0,,0.3,16.86,1,1,2.75
2,2,2021-01-01 00:45:57,2021-01-01 00:51:55,N,1,41,42,1,1.12,6.0,0.5,0.5,1.0,0.0,,0.3,8.3,1,1,0.0
3,2,2020-12-31 23:57:51,2021-01-01 00:04:56,N,1,168,75,1,1.99,8.0,0.5,0.5,0.0,0.0,,0.3,9.3,2,1,0.0
4,2,2021-01-01 00:16:36,2021-01-01 00:16:40,N,2,265,265,3,0.0,-52.0,0.0,-0.5,0.0,0.0,,-0.3,-52.8,3,1,0.0


In [21]:
spark.createDataFrame(df_green_pd).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [10]:
green_schema = types.StructType(
    [
        types.StructField("VendorID", types.IntegerType(), True),
        types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
        types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
        types.StructField("store_and_fwd_flag", types.StringType(), True),
        types.StructField("RatecodeID", types.IntegerType(), True),
        types.StructField("PULocationID", types.IntegerType(), True),
        types.StructField("DOLocationID", types.IntegerType(), True),
        types.StructField("passenger_count", types.IntegerType(), True),
        types.StructField("trip_distance", types.DoubleType(), True),
        types.StructField("fare_amount", types.DoubleType(), True),
        types.StructField("extra", types.DoubleType(), True),
        types.StructField("mta_tax", types.DoubleType(), True),
        types.StructField("tip_amount", types.DoubleType(), True),
        types.StructField("tolls_amount", types.DoubleType(), True),
        types.StructField("ehail_fee", types.DoubleType(), True),
        types.StructField("improvement_surcharge", types.DoubleType(), True),
        types.StructField("total_amount", types.DoubleType(), True),
        types.StructField("payment_type", types.IntegerType(), True),
        types.StructField("trip_type", types.IntegerType(), True),
        types.StructField("congestion_surcharge", types.DoubleType(), True),
    ]
)

In [None]:
## use customed schema
# df_green = (
#     spark.read.option("header", "true")
#     .schema(green_schema)
#     .csv("data/raw/green/2021/01/")
# )

## use infer schema
df_green = (
    spark.read.option("inferSchema", "true")
    .option("header", "true")
    .csv("data/raw/green/2021/01/")
)
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [25]:
df_yellow_pd = pd.read_csv(
    "data/raw/yellow/2021/01/yellow_tripdata_2021_01.csv.gz", nrows=1000
)
df_yellow_pd.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5


In [13]:
df_yellow = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("data/raw/yellow/2021/01/")
)
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
## repartion and write to parquet
# year = 2020
# for month in range(1, 13):
#     input_path = f"data/raw/yellow/{year}/{month:02d}/"
#     output_path = f"data/pq/yellow/{year}/{month:02d}/"
#     df_yellow = (
#         spark.read.option("header", "true").schema(yellow_schema).csv(input_path)
#     )
#     df_yellow.repartition(4).write.parquet(output_path)

## SQL with Spark

In [None]:
df_green = spark.read.parquet("data/pq/green/*/*")

In [None]:
df_green = df_green.withColumnRenamed(
    "lpep_pickup_datetime", "pickup_datetime"
).withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

In [None]:
df_yellow = spark.read.parquet("data/pq/yellow/*/*")

In [None]:
df_yellow = df_yellow.withColumnRenamed(
    "tpep_pickup_datetime", "pickup_datetime"
).withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

In [19]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [None]:
df_green_sel = df_green.select(common_colums).withColumn("service_type", F.lit("green"))

In [None]:
df_yellow_sel = df_yellow.select(common_colums).withColumn(
    "service_type", F.lit("yellow")
)

In [22]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [None]:
df_trips_data.groupBy("service_type").count().show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [24]:
df_trips_data.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [25]:
df_trips_data.registerTempTable("trips_data")



In [26]:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [27]:
df_result = spark.sql("""
SELECT 
    -- Revenue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_monthly_passenger_count,
    AVG(trip_distance) AS avg_monthly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [30]:
df_result.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|          33

In [28]:
df_result.coalesce(1).write.parquet("data/report/revenue/", mode="overwrite")

## Group by and Join Spark

In [None]:
df_green = spark.read.parquet("data/pq/green/*/*")

In [None]:
df_green.createOrReplaceTempView("green")

In [24]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [None]:
df_green_revenue.repartition(20).write.parquet(
    "data/report/revenue/green", mode="overwrite"
)

In [None]:
df_yellow = spark.read.parquet("data/pq/yellow/*/*")
df_yellow.createOrReplaceTempView("yellow")

In [28]:
df_yellow_revenue = spark.sql("""
SELECT 
    date_trunc('hour', tpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")

In [None]:
df_yellow_revenue.repartition(20).write.parquet(
    "data/report/revenue/yellow", mode="overwrite"
)

In [None]:
df_green_revenue = spark.read.parquet("data/report/revenue/green")
df_yellow_revenue = spark.read.parquet("data/report/revenue/yellow")

In [None]:
df_green_revenue_tmp = df_green_revenue.withColumnRenamed(
    "amount", "green_amount"
).withColumnRenamed("number_records", "green_number_records")

df_yellow_revenue_tmp = df_yellow_revenue.withColumnRenamed(
    "amount", "yellow_amount"
).withColumnRenamed("number_records", "yellow_number_records")

In [None]:
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp, on=["hour", "zone"], how="outer"
)

In [None]:
df_join.write.parquet("data/report/revenue/total", mode="overwrite")

In [None]:
df_join = spark.read.parquet("data/report/revenue/total")

In [35]:
df_join

DataFrame[hour: timestamp, zone: int, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]

In [36]:
df_result = df_join.join(zone_lookup, df_join.zone == zone_lookup.LocationID)

In [None]:
df_result.drop("LocationID", "zone").write.parquet("tmp/revenue-zones")

## Connecting Spark to GCS

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [4]:
gcs_connector_jar = "/b/Belajar/data engineering/homework-de-zoomcamp/week-5/lib/gcs-connector-hadoop3-2.2.5.jar"
credentials_location = "../../credentials/gcs_creds_latest.json"

# Konfigurasi Spark
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test")
    .set("spark.jars", gcs_connector_jar)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set(
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
        credentials_location,
    )
    .set(
        "spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    )
    .set(
        "spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    )
)

# Inisialisasi SparkContext
sc = SparkContext(conf=conf)

# Konfigurasi Hadoop
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Inisialisasi SparkSession
spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

# Contoh penggunaan
df = spark.read.parquet("gs://ny_taxi_project_bucket-1/pq/green/*/*")
df.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2020-01-03 19:00:01|  2020-01-03 19:05:48|                 N|         1|         244|         116|              1|          1.0|        6.0|  1.0|    0.

In [5]:
df.count()

2304517