In [0]:
%sql
CREATE CATALOG IF NOT EXISTS snebolsin_nyc_catalog
MANAGED LOCATION 's3://ssnebolsinbucket/';

In [0]:
%sql
USE CATALOG snebolsin_nyc_catalog;
GRANT ALL PRIVILEGES ON CATALOG snebolsin_nyc_catalog TO `deniskulemza1@gmail.com`

In [0]:
%sql
SELECT current_catalog(), current_schema();

current_catalog(),current_schema()
snebolsin_nyc_catalog,default


In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS trips_schema;

In [0]:
%sql
USE SCHEMA trips_schema

In [0]:
# create a table to check if everything is working as expected
%sql
CREATE TABLE IF NOT EXISTS snebolsin_nyc_catalog.trips_schema.raw_trips_test (
  VendorID BIGINT,
  tpep_pickup_datetime TIMESTAMP,
  tpep_dropoff_datetime TIMESTAMP,
  passenger_count DOUBLE,
  trip_distance DOUBLE,
  RatecodeID DOUBLE,
  store_and_fwd_flag STRING,
  PULocationID BIGINT,
  DOLocationID BIGINT,
  payment_type BIGINT,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE,
  congestion_surcharge DOUBLE,
  airport_fee DOUBLE,
  taxi_type STRING
)
USING DELTA
LOCATION 's3://ssnebolsinbucket/unity_catalog/raw_trips_test'

In [0]:
# describe the table
%sql
describe extended snebolsin_nyc_catalog.trips_schema.raw_trips;

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [0]:
from pyspark.sql.functions import col, sum, avg, count, max, min, broadcast, unix_timestamp, hour, dayofweek, format_string, when, dayofweek
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import lit
from datetime import date

In [0]:
spark = SparkSession.builder.appName("homework7_sn").getOrCreate()

In [0]:
S3_BUCKET_NAME = "robot-dreams-source-data"
TAXI_ZONE_LOOKUP_PATH = f"s3a://{S3_BUCKET_NAME}/home-work-1/nyc_taxi/taxi_zone_lookup.csv"

In [0]:
%sql
SHOW EXTERNAL LOCATIONS;

name,url,comment
isilantiev-external-location,s3://isilantiev-emr-studio/,
mnt_robot-dreams-delta,s3://robot-dreams-delta-tables/,
mnt_robot-dreams-source-mount,s3://robot-dreams-source-data/,
snebolsin_s3_external_loc,s3://ssnebolsinbucket/,


In [0]:
# define function to list all files in a folder
def list_all_files(path):
    all_files = []
    items = dbutils.fs.ls(path)
    
    for item in items:
        if item.isDir():
            # Recursively list subfolder
            all_files += list_all_files(item.path)
        else:
            # It's a file
            all_files.append(item.path)
    
    return all_files

In [0]:
# define green and yellow files lists
green_files = list_all_files(f"s3a://{S3_BUCKET_NAME}/home-work-1/nyc_taxi/green/")
yellow_files = list_all_files(f"s3a://{S3_BUCKET_NAME}/home-work-1/nyc_taxi/yellow/")

In [0]:
# make green taxi df
green_taxi_union_list = []

# prepare list of columns to cast
columns_to_cast = {
    "passenger_count": "bigint",
    "payment_type": "bigint",
    "improvement_surcharge": "double",
    "congestion_surcharge": "double",
    "PULocationID": "bigint"
}

# loop over the files to cast each file and collect them into a list
for file in green_files:
    df = spark.read.parquet(file)
    for col_name, target_type in columns_to_cast.items():
        df = df.withColumn(col_name, col(col_name).cast(target_type))

    green_taxi_union_list.append(df)
    
# combine all dfs from the list    
green_taxi_final = green_taxi_union_list[0]

for df in green_taxi_union_list[1:]:
    green_taxi_final = green_taxi_final.unionByName(df)

# add taxi type
green_taxi_final = green_taxi_final.withColumn("taxi_type", lit("green"))

print(f"Total rows count green taxi file: {green_taxi_final.count()}")

green_taxi_final.printSchema()

IOStream.flush timed out


Total rows count green taxi file: 82630053
root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- taxi_type: string (nullable = false)



In [0]:
# make yellow taxi df
yellow_taxi_union_list = []

# prepare columns to cast
columns_to_cast = {
    "PULocationID": "bigint"
}

# prepare columns to add
missing_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'ehail_fee', 'trip_type']

# loop over the files to cast each file and collect them into a list
for file in yellow_files:
    df = spark.read.parquet(file)
    for col_name in missing_columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    for col_name, target_type in columns_to_cast.items():
        df = df.withColumn(col_name, col(col_name).cast(target_type))

    yellow_taxi_union_list.append(df)
    
### Align all colums    

# Get all column names
all_columns = set()
for df in yellow_taxi_union_list:
    all_columns.update(df.columns)
all_columns = sorted(all_columns)

# Align all DataFrames to have same columns
def align_columns(df, all_cols):
    for col_name in all_cols:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df.select(*all_cols)  # Reorder columns

aligned_dfs = [align_columns(df, all_columns) for df in yellow_taxi_union_list]

# Union all dfs into one
yellow_taxi_final = yellow_taxi_union_list[0]

for df in yellow_taxi_union_list[1:]:
    yellow_taxi_final = yellow_taxi_final.unionByName(df)
    
yellow_taxi_final = yellow_taxi_final.withColumn("taxi_type", lit("yellow"))
    
print(f"Total rows count yellow taxi file: {yellow_taxi_final.count()}")

yellow_taxi_final.printSchema()

Total rows count yellow taxi file: 745067811
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- lpep_pickup_datetime: void (nullable = true)
 |-- lpep_dropoff_datetime: void (nu

In [0]:
# Add missing columns to Green taxi file
columns_to_add = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "airport_fee"]
for col_name in columns_to_add:
        green_taxi_final = green_taxi_final.withColumn(col_name, lit(None))

In [0]:
# Combine Yellow and Green taxi files
raw_trips_df = yellow_taxi_final.unionByName(green_taxi_final)

In [0]:
# show first row
raw_trips_df.limit(1).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+---------------------+---------+---------+---------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|lpep_pickup_datetime|lpep_dropoff_datetime|ehail_fee|trip_type|taxi_type|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+---------------------

In [0]:
# show schema for raw_trips_df
raw_trips_df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- ehail_f

In [0]:
# make transformations for the united file
raw_trips_df = raw_trips_df.withColumn("pickup_unix", unix_timestamp("tpep_pickup_datetime"))  # prepare proper format for calculation
raw_trips_df = raw_trips_df.withColumn("dropoff_unix", unix_timestamp("tpep_dropoff_datetime")) # prepare proper format for calculation
raw_trips_df = raw_trips_df.withColumn("duration_min", (col("dropoff_unix").cast("long") - col("pickup_unix").cast("long")) / 60)  ## calc duration minutes

In [0]:
# filter raw trips df
raw_trips_df_filtered = raw_trips_df.filter((raw_trips_df.trip_distance > 0.1) & (raw_trips_df.fare_amount > 2) & (raw_trips_df.duration_min > 1))

In [0]:
# count of rows
raw_trips_df_filtered.count()

734844554

In [0]:
# prepare pickup hour and day of week
raw_trips_df = raw_trips_df.withColumn("pickup_hour", hour('tpep_pickup_datetime'))
raw_trips_df = raw_trips_df.withColumn("pickup_day_of_week", dayofweek('tpep_pickup_datetime'))

In [0]:
# Join the file with Zone Lookup with Broadcast Hash Join
# load csv zone lookup file
taxi_zone_lookup_df = spark.read.csv(TAXI_ZONE_LOOKUP_PATH, header=True, inferSchema=True)

In [0]:
# set a low autoBroadcastJoinThreshold, 1MB for small tables to broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1048576")

In [0]:
# select only required fields from zone lookup file
zone_lookup_df_selected = taxi_zone_lookup_df.select("LocationID", "Zone")

In [0]:
# join with zone lookup based on Pick Up location to take pick up zone
raw_trips_df_with_PUzone = raw_trips_df.join(
    broadcast(zone_lookup_df_selected),
    raw_trips_df.PULocationID == zone_lookup_df_selected.LocationID,
    "left_outer"
).drop('LocationID').withColumnRenamed("Zone", "pickup_zone")

In [0]:
# join with zone lookup based on Drop Off location to take drop off zone
raw_trips_df_with_allZone = raw_trips_df_with_PUzone.join(
    broadcast(zone_lookup_df_selected),
    raw_trips_df_with_PUzone.DOLocationID == zone_lookup_df_selected.LocationID,
    "left_outer"
).drop('LocationID').withColumnRenamed("Zone", "dropoff_zone")

In [0]:
# final file with zones schema
raw_trips_df_with_allZone.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- ehail_f

In [0]:
# save raw trips to s3 and catalog
raw_trips_df_with_allZone.write.format("delta") \
  .mode("overwrite") \
  .option("path", "s3://ssnebolsinbucket/unity_catalog/raw_trips") \
  .saveAsTable("snebolsin_nyc_catalog.trips_schema.raw_trips")

In [0]:
# Створення та агрегація фінального датафрейму zone_summary

  * pickup_zone — зона посадки
  * total_trips — загальна кількість поїздок
  * avg_trip_distance — середня відстань поїздки
  * avg_total_amount — середній тариф поїздки
  * avg_tip_amount — середня сума чайових
  * yellow_share — частка жовтих поїздок у зоні
  * green_share — частка зелених поїздок у зоні
  * max_trip_distance — максимальна відстань поїздки
  * min_tip_amount — мінімальна сума чайових

In [0]:
zone_summary = raw_trips_df_with_allZone.groupBy("pickup_zone").agg(  
    count("*").alias("total_trips"),
    avg("trip_distance").alias("avg_trip_distance"),
    avg("total_amount").alias("avg_total_amount"),
    avg("tip_amount").alias("avg_tip_amount"),
    max("trip_distance").alias("max_trip_distance"),
    min("tip_amount").alias("min_tip_amount"),
    sum(when(col("taxi_type") == "yellow", 1).otherwise(0)).alias("yellow_trips"),
    sum(when(col("taxi_type") == "green", 1).otherwise(0)).alias("green_trips")
    )

In [0]:
# calculating yellos/green trip shares
zone_summary = zone_summary \
        .withColumn("yellow_share", (col("yellow_trips") / col("total_trips")) * 100) \
        .withColumn("green_share", (col("green_trips") / col("total_trips")) * 100) \
        .drop('yellow_trips','green_trips')

In [0]:
# show first 5 rows
zone_summary.limit(5).show()

+----------------+-----------+------------------+------------------+------------------+-----------------+--------------+------------------+-----------------+
|     pickup_zone|total_trips| avg_trip_distance|  avg_total_amount|    avg_tip_amount|max_trip_distance|min_tip_amount|      yellow_share|      green_share|
+----------------+-----------+------------------+------------------+------------------+-----------------+--------------+------------------+-----------------+
|       Homecrest|      58322|19.859932786941442| 24.37981550701268|0.7829127944857855|        235036.33|           0.0|25.273481705016977|74.72651829498302|
|          Corona|     213991|  9.46704384763845|18.434467991644468|   0.8782040833493|        250984.47|          -1.0|16.090863634451917| 83.9091363655481|
|Bensonhurst West|      55171| 36.43936869007268|28.532142067390428|1.1026501241594313|        350696.98|          -4.0|29.484693045259284|70.51530695474072|
|     Westerleigh|       1640| 6.990115853658536|39.

In [0]:
# saving df to S3 bucket
zone_summary.write.format("delta") \
  .mode("overwrite") \
  .option("path", "s3://ssnebolsinbucket/unity_catalog/zone_summary") \
  .saveAsTable("snebolsin_nyc_catalog.trips_schema.zone_summary")

In [0]:
# Агрегація по днях тижня та зонах
Підрахувати кількість поїздок за певний день тижня (наприклад, понеділок — Sunday, etc.).
Розрахувати частку поїздок з тарифами більше $30 за кожною зоною (high_fare_share).

In [0]:
# making the calculations - total trips and trips with total amount higher 30$, filtering this to valid days of week
zone_days_summary = raw_trips_df_with_allZone.groupBy("pickup_zone", "pickup_day_of_week").agg(  
    count("*").alias("total_trips"),
    sum(when(col("total_amount") > 30, 1).otherwise(0)).alias("high_price_trip")
    ).filter(col("pickup_day_of_week").between(1, 7))

In [0]:
# naming the days of week
zone_days_summary = zone_days_summary.withColumn(
    "pickup_day_of_week",
     when(col("pickup_day_of_week") == 1, "Sunday")
    .when(col("pickup_day_of_week") == 2, "Monday")
    .when(col("pickup_day_of_week") == 3, "Tuesday")
    .when(col("pickup_day_of_week") == 4, "Wednesday")
    .when(col("pickup_day_of_week") == 5, "Thursday")
    .when(col("pickup_day_of_week") == 6, "Friday")
    .when(col("pickup_day_of_week") == 7, "Saturday")
    .otherwise("NA")
)

In [0]:
# caclulating high fare share
zone_days_summary = zone_days_summary.withColumn("high_fare_share", col("high_price_trip") / col("total_trips") * 100).drop("total_trips","high_price_trip")

In [0]:
# show top 10 rows
zone_days_summary.limit(10).show()

+--------------------+------------------+------------------+
|         pickup_zone|pickup_day_of_week|   high_fare_share|
+--------------------+------------------+------------------+
|         Old Astoria|          Saturday| 9.284985029768391|
|  Van Cortlandt Park|            Sunday| 32.73381294964029|
|         Old Astoria|         Wednesday|  9.09421670882567|
|Washington Height...|            Friday|21.785595475715237|
|            Canarsie|            Friday|48.213543830494395|
|Marine Park/Floyd...|            Friday| 51.29682997118156|
|       East Flushing|            Monday| 33.89021479713604|
|                SoHo|            Sunday| 5.993063364451522|
|            Kips Bay|            Sunday|  5.00790292661877|
|            Elmhurst|            Monday|18.666803897351446|
+--------------------+------------------+------------------+



In [0]:
# saving df to S3 bucket
zone_days_summary.write.format("delta") \
  .mode("overwrite") \
  .option("path", "s3://ssnebolsinbucket/unity_catalog/zone_days_summary") \
  .saveAsTable("snebolsin_nyc_catalog.trips_schema.zone_days_summary")