In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/28 21:06:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2024-01.parquet #jan 2024
!rm fhvhv_tripdata_2024-01.parquet
df = spark.read.parquet('fhvhv_tripdata_2024-01.parquet')
#df.show()
df.head(3)
df.schema
df.printSchema()
df.count()
df_partitioned = df.repartition(16)
df_partitioned.rdd.getNumPartitions()
df_partitioned.write.parquet("fhv_partitioned", mode='overwrite')
# read the partitioned data
df = spark.read.parquet('fhv_partitioned')
# lazy / transformation - select, filter, groupby, joins
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003')
# eager / actions - #show, take head, write
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003').show(3)

# ^just like sql, but pyspark is more flexible
df.printSchema()

udf - user defined functions

- huge list of functions
- can also define out own function
- and this is not what you would typically do in data warehouses
bcz there defining your own fucntions is cumbersome.
- with some complicates cases you end with ith a bunch of case statements in sql, making it difficult to test, unlike python code

In [None]:
from pyspark.sql import functions as F
F.to_date()
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

crazy_stuff('B02884')
# convert it to udf using udf()

from pyspark.sql import types
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()


In [None]:
df_yellow = spark.read.parquet('../data/raw/yellow/*/*')
df_yellow.printSchema()

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [None]:
from pyspark.sql.types import IntegerType

int_columns = ["passenger_count", "RatecodeID", "payment_type"]
for col in int_columns:
    df_yellow = df_yellow.withColumn(col, df_yellow[col].cast(IntegerType()))
df_yellow.select('payment_type').show(5)

In [6]:
import os
from pyspark.sql.types import IntegerType

TAXI_TYPE = "green"
YEAR = "2024"
INPUT_BASE = f"data/raw/{TAXI_TYPE}/{YEAR}"
OUTPUT_BASE = f"data/pq/{TAXI_TYPE}/{YEAR}"

# Columns to cast to IntegerType
int_columns = ["passenger_count", "RatecodeID", "payment_type"]

# Loop through each month
for month in range(1, 13):
    month_str = f"{month:02d}"
    input_path = os.path.join(INPUT_BASE, month_str, f"{TAXI_TYPE}_tripdata_{YEAR}_{month_str}.parquet")
    output_path = os.path.join(OUTPUT_BASE, month_str)

    # Read
    df = spark.read.parquet(input_path)

    # Cast columns
    for col in int_columns:
        if col in df.columns:
            df = df.withColumn(col, df[col].cast(IntegerType()))

    # Write
    df.write.mode("overwrite").parquet(output_path)

    print(f"Processed and saved: {output_path}")

Processed and saved: data/pq/green/2024/01
Processed and saved: data/pq/green/2024/02
Processed and saved: data/pq/green/2024/03
Processed and saved: data/pq/green/2024/04
Processed and saved: data/pq/green/2024/05
Processed and saved: data/pq/green/2024/06
Processed and saved: data/pq/green/2024/07
Processed and saved: data/pq/green/2024/08
Processed and saved: data/pq/green/2024/09
Processed and saved: data/pq/green/2024/10
Processed and saved: data/pq/green/2024/11
Processed and saved: data/pq/green/2024/12


In [None]:
df_green = spark.read.parquet('../data/raw/green/*/*')
df_green.printSchema()
df_green.count()


root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



660218

# Spark SQL

In [6]:
df_green = spark.read.parquet('../data_old/pq/green/*/*')
df_green.printSchema()
df_yellow = spark.read.parquet('../data_old/pq/yellow/*/*')
df_yellow.printSchema()
set(df_green.columns) & set(df_yellow.columns)
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
# reorder green df column to math yellow df
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)
from pyspark.sql import functions as F

df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))

df_trips_data = df_green_sel.unionAll(df_yellow_sel)

df_trips_data.groupBy('service_type').count().show()
# run sql
df_trips_data.registerTempTable('trips_data')

spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()



                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timest



+------------+--------+
|service_type|   count|
+------------+--------+
|       green|  660218|
|      yellow|41169720|
+------------+--------+





+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|  660218|
|      yellow|41169720|
+------------+--------+



                                                                                

In [14]:
spark.sql("""
SELECT 
    -- Revenue grouping 
    --PULocationID AS revenue_zone,
    date_format(pickup_datetime, 'MMM') AS month, 
    service_type, 
    
    count(*) total_trips,
    
     -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_monthly_passenger_count,
    AVG(trip_distance) AS avg_monthly_trip_distance
FROM
FROM
    trips_data
GROUP BY
    1, 2, 3
""").show()



+-------------+------------+-----------+---------------------------+-------------------------+
|revenue_month|service_type|total_trips|avg_monthly_passenger_count|avg_monthly_trip_distance|
+-------------+------------+-----------+---------------------------+-------------------------+
|          Apr|       green|      56473|         1.3049463154996788|       12.088635985338115|
|          May|       green|      61007|         1.3236911105472384|       14.607664366384167|
|          Mar|       green|      57451|         1.3093362719947972|       13.523978520826482|
|          Jan|       green|      56569|          1.309158294766151|       31.482167618306956|
|          Feb|       green|      53578|           1.30035538005923|       17.665699914143893|
|          Jun|       green|      54738|         1.3194231533526373|       13.989549124922332|
|          Dec|       green|      53987|         1.3050609545052494|       13.493061662992872|
|          Nov|       green|      52214|         1

                                                                                

In [9]:
df_result = spark.sql("""
SELECT 
    -- Revenue grouping 
    PULocationID AS revenue_zone,
    date_format(pickup_datetime, 'MMM-yyyy') AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_monthly_passenger_count,
    AVG(trip_distance) AS avg_monthly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

# df_result.show()
# df_result.coalesce(1).write.parquet('data/report/', mode='overwrite')


                                                                                

+------------+-------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|revenue_zone|revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|
+------------+-------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|           4|     May-2024|   

                                                                                

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("spark://de-vm.asia-south1-c.c.velvety-tangent-463717-h8.internal:7077") \
    .appName('test') \
    .getOrCreate()
# spark://de-vm.asia-south1-c.c.velvety-tangent-463717-h8.internal:7077

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/25 19:33:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_green = spark.read.parquet('data/pq/green/*/*')

25/06/25 19:35:59 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/25 19:36:14 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/25 19:36:29 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/06/25 19:36:44 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [None]:
!head taxi_zone_lookup.csv
df = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')
df = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

df.show()
df.write.parquet('zones')