## Final Project Feature Engineering - Khadija

[Documentation Link to Feature Engineering]('https://docs.google.com/document/d/1YhXFN8HNfoEbczQpwnaPD7yfcGdORKHkKuJm3s_S7Lg/edit?usp=sharing')

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, lit, coalesce, to_timestamp, trim, lower, to_date, min, max, sha2, concat_ws, unix_timestamp, from_unixtime, floor, hour, dayofweek, month, date_format
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType
from math import pi

In [2]:
spark = SparkSession.builder \
    .appName("Final Project EDA") \
    .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar") \
    .getOrCreate()

In [3]:
raw_weather_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_1_final_project/parquet_versions/weather_parquet/")
raw_events_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_1_final_project/parquet_versions/events_parquet/")
raw_taxi_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_1_final_project/parquet_versions/taxi_parquet/")
raw_zones_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_1_final_project/parquet_versions/zones_parquet/")

                                                                                

### Events Data Cleaning

**Removing Duplicates Data**

In [4]:
cols_count = len(raw_events_df.columns)
rows_count = raw_events_df.count()
print(f'Event Dataframe Dimension is: {(rows_count, cols_count)}')
# 12,645,998 rows before removing duplicates



Event Dataframe Dimension is: (12645998, 12)


                                                                                

In [5]:
raw_events_df = raw_events_df.drop_duplicates()

In [6]:
cols_count = len(raw_events_df.columns)
rows_count = raw_events_df.count()
print(f'Event Dataframe Dimension is: {(rows_count, cols_count)}')



Event Dataframe Dimension is: (737500, 12)


                                                                                

**Checking for missing values**

In [7]:
def count_missing(df):
    return df.select([
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)for c in df.columns
    ])

In [8]:
count_missing(raw_events_df).show()



+--------+----------+---------------+-------------+------------+----------+-------------+--------------+-----------------+-------------------+---------------+---------------+
|Event_ID|Event_Name|Start_Date_Time|End_Date_Time|Event_Agency|Event_Type|Event_Borough|Event_Location|Event_Street_Side|Street_Closure_Type|Community_Board|Police_Precinct|
+--------+----------+---------------+-------------+------------+----------+-------------+--------------+-----------------+-------------------+---------------+---------------+
|       0|        93|              0|            0|           0|         0|            0|             0|           704799|                  0|              0|              0|
+--------+----------+---------------+-------------+------------+----------+-------------+--------------+-----------------+-------------------+---------------+---------------+



                                                                                

Analysis Notes:
- Events_street_side has a lot of missing data - get the percentage of the missing data and consider dropping the columns
- A few missing rows in the event_name column

In [9]:
def missing_summary(df):
    total_rows = df.count()
    
    null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    for col_name, null_count in null_counts.items():
        pct = (null_count / total_rows) * 100
        print(f"{col_name}: {null_count} missing ({pct:.2f}%)")

In [10]:
missing_summary(raw_events_df)



Event_ID: 0 missing (0.00%)
Event_Name: 93 missing (0.01%)
Start_Date_Time: 0 missing (0.00%)
End_Date_Time: 0 missing (0.00%)
Event_Agency: 0 missing (0.00%)
Event_Type: 0 missing (0.00%)
Event_Borough: 0 missing (0.00%)
Event_Location: 0 missing (0.00%)
Event_Street_Side: 704799 missing (95.57%)
Street_Closure_Type: 0 missing (0.00%)
Community_Board: 0 missing (0.00%)
Police_Precinct: 0 missing (0.00%)


                                                                                

Based on the Event Data Information - the open data portal defines the columns with missing data as follows:
- Event Street Side: This is the side of the street that the event will be held on if the location is a street (e.g North, East, West)

- Event Name: Less than 1% missing events name; but event name may not be necessary towards the model as such we can leave the missing cases or impute it with "Other"

Furthermore, looking at the events data frame, the only relevent columns seems to be event name, start date/time, end date/time, event type, borough, location

In [11]:
relevant_cols = [
    'Event_ID',
    'Event_Name',
    'Start_Date_Time',
    'End_Date_Time',
    'Event_Type',
    'Event_Borough',
    'Event_Location'
]

events_clean_df = raw_events_df.select(
        col('Event_ID').alias('event_id'),
        col('Event_Name').alias('event_name'),
        col('Start_Date_Time').alias('start_date_time'),
        col('End_Date_Time').alias('end_date_time'),
        col('Event_Type').alias('event_type'),
        col('Event_Borough').alias('event_borough'),
        col('Event_Location').alias('event_location')
)

# Impute missing Event_Name with "Other"
events_clean_df = events_clean_df.withColumn(
    'event_name',
    coalesce(col('event_name'), lit('Other'))
)

# Converting date/time columns to proper timestamp format of '01/23/2022 08:00:00 AM'
events_clean_df = events_clean_df.withColumn(
    'start_date_time',
    to_timestamp(col('start_date_time'), 'MM/dd/yyyy hh:mm:ss a')
)

events_clean_df = events_clean_df.withColumn(
    'end_date_time',
    to_timestamp(col('end_date_time'), 'MM/dd/yyyy hh:mm:ss a')
)

# Trim whitespace and lower case all rows
events_clean_df = events_clean_df.withColumn('event_name', lower(trim(col('event_name'))))
events_clean_df = events_clean_df.withColumn('event_type', lower(trim(col('event_type'))))
events_clean_df = events_clean_df.withColumn('event_borough', lower(trim(col('event_borough'))))
events_clean_df = events_clean_df.withColumn('event_location', lower(trim(col('event_location'))))
events_clean_df = events_clean_df.withColumn("event_date",to_date(col("start_date_time")))

**Categorizing events by type**

In [12]:
# Categorize events type
events_clean_df = events_clean_df.withColumn(
    "event_category",
    when(col("event_type").isin(
        "Sport - Adult", "Sport - Youth", "Athletic", "Athletic Race / Tour", "Stickball"
    ), "sports")
    .when(col("event_type").isin(
        "Filming/Photography", "Theater Load in and Load Outs", "Production Event", "Open Culture"
    ), "cultural_entertainment")
    .when(col("event_type").isin(
        "Block Party", "Street Event", "Street Festival", "Single Block Festival", "Plaza Event", "Plaza Partner Event",
        "Sidewalk Sale", "Open Street Event", "Open Street Partner Event", "BID Multi-Block", "Bike the Block", "Farmers Market", "Marathon"
    ), "street_festival")
    .when(col("event_type").isin(
        "Rally", "Stationary Demonstration", "Press Conference", "Religious Event", "Health Fair"
    ), "civic_political")
    .otherwise("miscellaneous")
)

In [13]:
events_clean_df.show(5)



+--------+--------------------+-------------------+-------------------+-------------+-------------+--------------------+----------+--------------+
|event_id|          event_name|    start_date_time|      end_date_time|   event_type|event_borough|      event_location|event_date|event_category|
+--------+--------------------+-------------------+-------------------+-------------+-------------+--------------------+----------+--------------+
|  689883|   softball - adults|2023-05-26 09:00:00|2023-05-26 22:00:00|sport - adult|staten island|gen. douglas maca...|2023-05-26| miscellaneous|
|  687425|baseball - 13 and...|2023-06-06 15:00:00|2023-06-06 21:00:00|sport - youth|    manhattan|randall's island ...|2023-06-06| miscellaneous|
|  681427|soccer - non regu...|2023-05-30 18:00:00|2023-05-30 20:00:00|sport - youth|     brooklyn|city line park (i...|2023-05-30| miscellaneous|
|  684248|  soccer -regulation|2023-05-28 18:00:00|2023-05-28 20:00:00|sport - adult|     brooklyn|calvert vaux park..

                                                                                

**Confirming that the dates are between Jan 2022 till Dec 2024**

In [14]:
events_clean_df.select(
    min(col("start_date_time")).alias("min_start_date"),
    max(col("start_date_time")).alias("max_start_date"),
    min(col("end_date_time")).alias("min_end_date"),
    max(col("end_date_time")).alias("max_end_date")
).show()



+-------------------+-------------------+-------------------+-------------------+
|     min_start_date|     max_start_date|       min_end_date|       max_end_date|
+-------------------+-------------------+-------------------+-------------------+
|2022-01-01 00:00:00|2024-12-31 22:00:00|2022-01-01 09:30:00|2026-12-31 23:00:00|
+-------------------+-------------------+-------------------+-------------------+



                                                                                

In [15]:
events_clean_df = events_clean_df.filter(F.col("end_date_time") <  "2025-01-01")

**Removing cases where the event start time is after the end time**

In [16]:
num_start_after_end = events_clean_df.filter(events_clean_df.start_date_time >= events_clean_df.end_date_time).count()
print(f"Number of cases where start time is greater than end time: {num_start_after_end}")



Number of cases where start time is greater than end time: 973


                                                                                

In [17]:
# Current Row counts
cols_count = len(events_clean_df.columns)
rows_count = events_clean_df.count()
print(f'Current Cleaned Events Dataframe Dimension is: {(rows_count, cols_count)}')



Current Cleaned Events Dataframe Dimension is: (737446, 9)


                                                                                

In [18]:
events_clean_df = events_clean_df.filter(events_clean_df.start_date_time < events_clean_df.end_date_time)

Given that there are 973 cases where the start time is after the end time, those cases would be removed

In [19]:
events_clean_df = events_clean_df.filter(
    (F.to_timestamp(F.col("start_date_time")).isNotNull()) &
    (F.to_timestamp(F.col("end_date_time")).isNotNull()) &
    (F.to_timestamp(F.col("end_date_time")) >= F.col("start_date_time"))
)

In [20]:
# Confirm that the rows were removed
cols_count = len(events_clean_df.columns)
rows_count = events_clean_df.count()
print(f'Current Cleaned Events Dataframe Dimension is: {(rows_count, cols_count)}')



Current Cleaned Events Dataframe Dimension is: (736473, 9)


                                                                                

**Deriving Venue Name**

In [21]:
events_clean_df.columns

['event_id',
 'event_name',
 'start_date_time',
 'end_date_time',
 'event_type',
 'event_borough',
 'event_location',
 'event_date',
 'event_category']

In [24]:
# events_clean_df = events_clean_df.withColumn("venue_name", 
#                                             F.split(F.col("Event_Location"), ":").getItem(0))

In [26]:
# venue_names = events_clean_df.groupBy(["venue_name", "event_borough"]).agg(count("event_id").alias("num_events"))
# venue_names_pdf = venue_names.toPandas()

                                                                                

In [24]:
# venue_names_pdf

In [29]:
# venue_names_pdf = venue_names_pdf[venue_names_pdf['num_events'] >= 20]
# venue_names_pdf.to_csv("/home/dataproc/venue_names.csv", index=False)

**Half Hour Buckets of Events**

In [26]:
events_halfhour = (
    events_clean_df
    .withColumn(
        "adjusted_end",
        F.when(
            F.col("end_date_time") - F.expr("INTERVAL 30 minutes") >= F.col("start_date_time"),
            F.col("end_date_time") - F.expr("INTERVAL 30 minutes")
        ).otherwise(F.col("start_date_time"))
    )
    .withColumn(
        "half_hour_array",
        F.sequence(
            F.date_trunc("minute", F.col("start_date_time")),
            F.date_trunc("minute", F.col("adjusted_end")),
            F.expr("INTERVAL 30 minutes")
        )
    )
    .withColumn("half_hour", F.explode("half_hour_array"))
    .drop("half_hour_array", "adjusted_end")
)

In [27]:
events_halfhour_pdf = events_halfhour.limit(100).toPandas()
events_halfhour_pdf.head(5)

                                                                                

Unnamed: 0,event_id,event_name,start_date_time,end_date_time,event_type,event_borough,event_location,event_date,event_category,half_hour
0,618434,soccer -regulation,2022-04-11 08:00:00,2022-04-11 11:00:00,sport - adult,manhattan,riverside park: 107th/108th st-soccer-01 e,2022-04-11,miscellaneous,2022-04-11 08:00:00
1,618434,soccer -regulation,2022-04-11 08:00:00,2022-04-11 11:00:00,sport - adult,manhattan,riverside park: 107th/108th st-soccer-01 e,2022-04-11,miscellaneous,2022-04-11 08:30:00
2,618434,soccer -regulation,2022-04-11 08:00:00,2022-04-11 11:00:00,sport - adult,manhattan,riverside park: 107th/108th st-soccer-01 e,2022-04-11,miscellaneous,2022-04-11 09:00:00
3,618434,soccer -regulation,2022-04-11 08:00:00,2022-04-11 11:00:00,sport - adult,manhattan,riverside park: 107th/108th st-soccer-01 e,2022-04-11,miscellaneous,2022-04-11 09:30:00
4,618434,soccer -regulation,2022-04-11 08:00:00,2022-04-11 11:00:00,sport - adult,manhattan,riverside park: 107th/108th st-soccer-01 e,2022-04-11,miscellaneous,2022-04-11 10:00:00


**Event-level timing + importance features**

In [28]:
from pyspark.sql import Window
df = events_halfhour

# Duration & timing relative to event
events_enriched = (
    df
    .withColumn(
        "event_duration_min",
        (F.unix_timestamp("end_date_time") - F.unix_timestamp("start_date_time")) / 60.0
    )
    .withColumn(
        "minutes_since_start",
        (F.unix_timestamp("half_hour") - F.unix_timestamp("start_date_time")) / 60.0
    )
    .withColumn(
        "minutes_to_end",
        (F.unix_timestamp("end_date_time") - F.unix_timestamp("half_hour")) / 60.0
    )
    # Windows around start / end (0–30 mins from start or end)
    .withColumn(
        "is_start_window",
        F.when(F.col("minutes_since_start").between(0, 30), 1).otherwise(0)
    )
    .withColumn(
        "is_end_window",
        F.when(F.col("minutes_to_end").between(0, 30), 1).otherwise(0)
    )
    # Crude importance score by event_type (adjust weights as you like)
    .withColumn(
        "event_importance_score",
        F.when(F.col("event_type").rlike("concert|parade|festival"), 3.0)
         .when(F.col("event_type").rlike("sport"), 2.0)
         .otherwise(1.0)
    )
)

**Base metrics per borough × half_hour**

In [29]:
base_agg = (
    events_enriched
    .groupBy("event_borough", "half_hour")
    .agg(
        F.countDistinct("event_id").alias("total_events"),
        F.avg("event_duration_min").alias("avg_event_duration_min"),
        F.sum("event_importance_score").alias("total_event_importance"),
        F.max("is_start_window").alias("event_start_flag"),
        F.max("is_end_window").alias("event_end_flag")
    )
)

In [30]:
base_agg.show(5)



+-------------+-------------------+------------+----------------------+----------------------+----------------+--------------+
|event_borough|          half_hour|total_events|avg_event_duration_min|total_event_importance|event_start_flag|event_end_flag|
+-------------+-------------------+------------+----------------------+----------------------+----------------+--------------+
|    manhattan|2022-02-11 23:00:00|          47|     57728.53448275862|                  58.0|               0|             0|
|    manhattan|2022-06-22 18:00:00|         222|    42827.864341085275|                 414.0|               1|             1|
|        bronx|2024-01-29 18:30:00|           7|               13747.5|                  14.0|               0|             1|
|    manhattan|2022-07-27 12:30:00|         227|    42259.874524714825|                 409.0|               1|             1|
|     brooklyn|2022-03-23 10:30:00|         141|     84149.46078431372|                 264.0|               1|

                                                                                

**Pivot event_type to counts per type**

In [31]:
type_agg = (
    events_enriched
    .groupBy("event_borough", "half_hour")
    .pivot("event_category")
    .agg(F.countDistinct("event_id"))
)

# Optional: fill nulls with 0 for the pivoted columns
type_agg = type_agg.fillna(0)

                                                                                

**Combine base metrics + type counts**

In [32]:
events_features = (
    base_agg
    .join(type_agg, ["event_borough", "half_hour"], "left")
)

**Add time-of-day / calendar features**

In [33]:
events_features = (
    events_features
    .withColumn("hour_of_day", F.hour("half_hour"))
    .withColumn("day_of_week", F.dayofweek("half_hour"))  # 1=Sun, 7=Sat
    .withColumn("is_weekend", F.col("day_of_week").isin(1, 7).cast("int"))
)
#Note - consider adding an is_holiday flag as well

**Final Events Data**

In [34]:
events_features_pdf = events_features.limit(5).toPandas()
events_features_pdf

                                                                                

Unnamed: 0,event_borough,half_hour,total_events,avg_event_duration_min,total_event_importance,event_start_flag,event_end_flag,miscellaneous,hour_of_day,day_of_week,is_weekend
0,bronx,2024-06-25 13:00:00,84,3866.811765,151.0,1,0,84,13,3,0
1,brooklyn,2022-02-06 09:30:00,68,11572.816901,99.0,1,1,68,9,1,1
2,brooklyn,2022-03-23 10:30:00,141,84149.460784,264.0,1,0,141,10,4,0
3,queens,2022-08-15 08:30:00,79,128359.166667,140.0,1,0,79,8,2,0
4,manhattan,2022-07-07 05:30:00,13,244849.025,40.0,0,0,13,5,5,0


### Taxi Data Cleaning

In [35]:
taxi_dedup = raw_taxi_df.dropDuplicates()

In [36]:
numeric_double_cols = [
    "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge", "airport_fee"
]

numeric_int_cols = [
    "VendorID", "passenger_count", "RatecodeID",
    "PULocationID", "DOLocationID", "payment_type"
]
taxi_dedup = taxi_dedup.withColumn(
    "pickup_ts_utc", F.to_timestamp("tpep_pickup_datetime")
).withColumn(
    "dropoff_ts_utc", F.to_timestamp("tpep_dropoff_datetime")
)

taxi_dedup = taxi_dedup.withColumn(
    "pickup_ts_local", F.from_utc_timestamp("pickup_ts_utc", "America/New_York")
).withColumn(
    "dropoff_ts_local", F.from_utc_timestamp("dropoff_ts_utc", "America/New_York")
)

for col in numeric_double_cols:
    taxi_dedup = taxi_dedup.withColumn(col, F.col(col).cast("double"))

for col in numeric_int_cols:
    taxi_dedup = taxi_dedup.withColumn(col, F.col(col).cast("int"))

In [37]:
clean = taxi_dedup.filter(
    (F.col("tpep_pickup_datetime") >= "2022-01-01") &
    (F.col("tpep_pickup_datetime") <  "2025-01-01") &
    (F.col("tpep_dropoff_datetime") >= "2022-01-01") &
    (F.col("tpep_dropoff_datetime") <  "2025-01-01") &
    (F.col("tpep_dropoff_datetime") >= F.col("tpep_pickup_datetime")) &

    (F.col("trip_distance") > 0) &
    (F.col("fare_amount") >= 0) &
    (F.col("extra") >= 0) &
    (F.col("mta_tax") >= 0) &
    (F.col("tip_amount") >= 0) &
    (F.col("tolls_amount") >= 0) &
    (F.col("improvement_surcharge") >= 0) &
    (F.col("total_amount") >= 0) &
    (F.col("congestion_surcharge") >= 0) &
    (F.col("PULocationID").between(1, 263)) &
    (F.col("DOLocationID").between(1, 263)) &
    (
        F.abs(
            F.col("total_amount") -
            (
                F.col("fare_amount") +
                F.col("extra") +
                F.col("mta_tax") +
                F.col("tip_amount") +
                F.col("tolls_amount") +
                F.col("improvement_surcharge") +
                F.col("congestion_surcharge") +
                F.col("airport_fee")
            )
        ) < 1e-4 
    )
)
print("Cleaned rows:", clean.count())



Cleaned rows: 81050572


                                                                                

In [38]:
clean = clean.withColumn("pickup_hour", F.date_trunc("hour", "pickup_ts_local"))
clean = clean.withColumn("pickup_date", F.to_date("pickup_ts_local"))

for col in numeric_double_cols:
    clean = clean.withColumn(col, F.col(col).cast("double"))

for col in numeric_int_cols:
    clean = clean.withColumn(col, F.col(col).cast("int"))

In [39]:
clean_withspeed = clean.withColumn(
    "trip_time_hours",
    (F.col("dropoff_ts_local").cast("long") -
     F.col("pickup_ts_local").cast("long")) / 3600.0
).withColumn(
    "speed_mph",
    F.col("trip_distance") / F.col("trip_time_hours")
)

In [40]:
clean_withspeed = clean_withspeed.filter(F.col("speed_mph") <= 200)
clean_withspeed = clean_withspeed.filter(F.col("trip_distance") <= 150)
clean_withspeed = clean_withspeed.filter(F.col("tip_amount") <= F.col("fare_amount") * 0.5)
clean_withspeed = clean_withspeed.filter(F.col("tolls_amount") <= 60)
clean_withspeed = clean_withspeed.filter(F.col("payment_type").isin(1, 2)) \
             .filter(F.col("RatecodeID") == 1)
clean_withspeed = clean_withspeed.withColumn(
    "fare_min",
    4.5 + 3.5 * F.col("trip_distance")
).withColumn(
    "fare_max",
    (10.5 + 3.5 * F.col("trip_distance")) * 1.4
).filter(
    (F.col("fare_amount") >= F.col("fare_min")) &
    (F.col("fare_amount") <= F.col("fare_max"))
)

print("Cleaned rows:", clean_withspeed.count())



Cleaned rows: 43803815


                                                                                

In [41]:
# taxi_pdf = clean_withspeed.limit(5).toPandas()
# taxi_pdf

### Weather Data

In [42]:
weather_dedup = raw_weather_df.drop_duplicates()

In [43]:
cols_to_drop = [
    "TEMP_ATTRIBUTES",
    "DEWP_ATTRIBUTES",
    "SLP_ATTRIBUTES",
    "STP_ATTRIBUTES",
    "VISIB_ATTRIBUTES",
    "WDSP_ATTRIBUTES",
    "MAX_ATTRIBUTES",
    "MIN_ATTRIBUTES",
    "PRCP_ATTRIBUTES",
    "STP"
]
weather_clean = weather_dedup.drop(*cols_to_drop)

weather_clean = weather_clean.withColumn("DATE", F.to_date("DATE", "yyyy-MM-dd"))

rename_map = {
    "STATION": "station_id",
    "DATE": "timestamp_utc",
    "LATITUDE": "lat",
    "LONGITUDE": "lon",
    "ELEVATION": "elevation_m",
    "NAME": "station_name",
    "TEMP": "temp_avg",
    "DEWP": "dew_point",
    "VISIB": "visibility_mi",
    "WDSP": "wind_speed_avg",
    "MXSPD": "wind_speed_max",
    "GUST": "wind_gust",
    "MAX": "temp_max",
    "MIN": "temp_min",
    "PRCP": "precip_in",
    "SNDP": "snow_depth_in",
    "FRSHTT": "weather_flags",
    "SLP": "pressure_sea_level"
}

weather_new = weather_clean
for old, new in rename_map.items():
    weather_new = weather_new.withColumnRenamed(old, new)

weather_new.printSchema()

root
 |-- station_id: long (nullable = true)
 |-- timestamp_utc: date (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- elevation_m: double (nullable = true)
 |-- station_name: string (nullable = true)
 |-- temp_avg: double (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- pressure_sea_level: double (nullable = true)
 |-- visibility_mi: double (nullable = true)
 |-- wind_speed_avg: double (nullable = true)
 |-- wind_speed_max: double (nullable = true)
 |-- wind_gust: double (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- precip_in: double (nullable = true)
 |-- snow_depth_in: double (nullable = true)
 |-- weather_flags: integer (nullable = true)



In [44]:
sentinels = {
    "snow_depth_in": 999.9,
    "precip_in": 99.99,
    "visibility_mi": 99.0,
    "wind_speed_avg": 999.0,
    "wind_speed_max": 999.0,
    "wind_gust": 999.9,
    "temp_avg": 9999.9,
    "dew_point": 9999.9,
    "pressure_sea_level": 9999.9,
}

weather_new = weather_new
for col, bad_val in sentinels.items():
    weather_new = weather_new.withColumn(
        col,
        F.when(F.col(col) == bad_val, None).otherwise(F.col(col))
    )

In [45]:
weather_desc = weather_new.describe( "dew_point",
    "visibility_mi", "wind_speed_avg", "wind_speed_max", "wind_gust",
    "precip_in", "snow_depth_in",
    "pressure_sea_level"
).toPandas()
weather_desc

Unnamed: 0,summary,dew_point,visibility_mi,wind_speed_avg,wind_speed_max,wind_gust,precip_in,snow_depth_in,pressure_sea_level
0,count,4384.0,4384.0,4384.0,4384.0,3048.0,4384.0,127.0,4382.0
1,mean,42.88713503649637,9.269548357664236,7.78138686131387,14.1016651459854,23.884416010498697,0.1359557481751824,2.4007874015748034,1016.566362391602
2,stddev,17.6818390443666,1.471865955900764,3.7189981320943657,5.407753322279354,6.512064231723263,0.3555216145225023,2.041610633361004,7.546474161512586
3,min,-10.8,0.7,0.5,2.9,14.0,0.0,1.2,989.4
4,max,76.0,10.0,23.7,36.9,56.9,4.9,11.0,1046.0


In [46]:
borough_station_map = [
    ("Manhattan", "72505394728"),  # Central Park
    ("Bronx", "72505394728"),
    ("Brooklyn", "72505394728"),
    ("Queens", "72503014732"),     # LaGuardia
    ("Staten Island", "72502014734"),  # Newark
    ("EWR", "72502014734"),        # Newark Airport
    ("JFK", "74486094789"),        # JFK Airport
]

mapping_df = spark.createDataFrame(borough_station_map, ["Borough", "station_id"])

In [47]:
weather_stations = weather_new.select(
    F.col("station_id").alias("weather_station_id"),
    "station_name"
).distinct()

mapping_full = mapping_df.join(
    weather_stations,
    mapping_df.station_id == weather_stations.weather_station_id,
    "left"
).select(
    "Borough",
    "station_id",
    "station_name"
)

### Join Taxi and Weather Data with zones respectively

In [48]:

zones_clean = raw_zones_df.select(
    F.col("LocationID").alias("PULocationID"),
    "Borough"
)

zone_station_map = zones_clean.join(
    mapping_full.select("Borough", "station_id", "station_name"),
    on="Borough",
    how="left"
)

In [49]:
taxi_with_station = clean_withspeed.join(
    zone_station_map,
    on="PULocationID",
    how="left"
)


In [50]:
weather_daily = weather_new.withColumnRenamed(
    "timestamp_utc", "weather_date"
)

In [51]:
# taxi_with_station.limit(10).toPandas().to_csv("/home/jupyter/taxi_sample.csv", index=False)

### Taxi Features

**Trip Level Feature Engineering**

In [52]:

# Hour, day-of-week, weekend, month (based on local pickup timestamp)
taxi_features = (
    taxi_with_station
    .withColumn("pickup_hour_of_day", F.hour("pickup_ts_local"))
    .withColumn("day_of_week", F.dayofweek("pickup_ts_local"))  # 1=Sunday, 7=Saturday
    .withColumn("is_weekend", F.col("day_of_week").isin(1, 7).cast("int"))
    .withColumn("month", F.month("pickup_ts_local"))
)


taxi_features = (
    taxi_features
    .withColumn(
        "hour_angle",
        2 * F.lit(pi) * (F.col("pickup_hour_of_day") / F.lit(24.0))
    )
    .withColumn("hour_sin", F.sin("hour_angle"))
    .withColumn("hour_cos", F.cos("hour_angle"))
    .drop("hour_angle")  # helper column
)


# Extreme speed flag
taxi_features = taxi_features.withColumn(
    "is_extreme_speed",
    (F.col("speed_mph") > 70).cast("int")
)

# Fare per mile (proxy for surge / pricing anomalies)
taxi_features = taxi_features.withColumn(
    "fare_per_mile",
    F.col("total_amount") / (F.col("trip_distance") + F.lit(1e-3))
)

# Compute 95th percentile threshold for fare_per_mile (action, not a transformation)
high_ppm_threshold = taxi_features.approxQuantile("fare_per_mile", [0.95], 0.01)[0]

taxi_features = taxi_features.withColumn(
    "is_high_ppm",
    (F.col("fare_per_mile") > F.lit(high_ppm_threshold)).cast("int")
)

# Proximity to "station" (if station_id indicates some transit anchor)
taxi_features = taxi_features.withColumn(
    "near_station_pickup",
    F.col("station_id").isNotNull().cast("int")
)

# Tip rate
taxi_features = taxi_features.withColumn(
    "tip_rate",
    F.col("tip_amount") / (F.col("fare_amount") + F.lit(1e-3))
)

# Total fees
taxi_features = taxi_features.withColumn(
    "total_fees",
    F.col("airport_fee") + F.col("congestion_surcharge") + F.col("extra")
)

                                                                                

**Create 30-minute buckets aligned with events**

In [53]:
taxi_features = taxi_features.withColumn(
    "pickup_half_hour",
    F.when(
        F.minute("pickup_ts_local") < 30,
        F.date_trunc("hour", "pickup_ts_local")
    ).otherwise(
        F.expr("date_trunc('hour', pickup_ts_local) + INTERVAL 30 MINUTES")
    )
)

**Aggregate to Borough × half-hour level**

In [54]:
taxi_half_hour = (
    taxi_features
    .groupBy("Borough", "pickup_half_hour")
    .agg(
        # Volume
        F.count("*").alias("trip_count"),
        F.sum("passenger_count").alias("total_passengers"),
        F.avg("passenger_count").alias("avg_passenger_count"),

        # Distance & speed
        F.avg("trip_distance").alias("avg_trip_distance"),
        F.avg("speed_mph").alias("avg_speed_mph"),

        # Fares
        F.avg("fare_amount").alias("avg_fare_amount"),
        F.avg("total_amount").alias("avg_total_amount"),
        F.avg("tip_rate").alias("avg_tip_rate"),
        F.avg("fare_per_mile").alias("avg_fare_per_mile"),
        F.sum("total_fees").alias("sum_total_fees"),

        # Pricing & quality flags
        F.sum("is_high_ppm").alias("num_high_ppm_trips"),
        F.sum("is_extreme_speed").alias("num_extreme_speed_trips"),
        F.sum("near_station_pickup").alias("num_near_station_pickups"),

        # Optional: keep cyclical summaries (averages)
        F.avg("hour_sin").alias("avg_hour_sin"),
        F.avg("hour_cos").alias("avg_hour_cos"),
    )
)

# Derive time features at the bucket level (for modeling)
taxi_half_hour = (
    taxi_half_hour
    .withColumn("day_of_week", F.dayofweek("pickup_half_hour"))
    .withColumn("is_weekend", F.col("day_of_week").isin(1, 7).cast("int"))
    .withColumn("month", F.month("pickup_half_hour"))
    .withColumn("hour_of_day", F.hour("pickup_half_hour"))
)

# Rename to match with events table
taxi_half_hour = taxi_half_hour.withColumnRenamed("Borough", "borough")
taxi_half_hour = taxi_half_hour.withColumnRenamed("pickup_half_hour", "half_hour")

In [55]:
final_taxi = taxi_half_hour
final_taxi_pdf = final_taxi.limit(5).toPandas()
final_taxi_pdf

25/11/17 01:17:33 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,borough,half_hour,trip_count,total_passengers,avg_passenger_count,avg_trip_distance,avg_speed_mph,avg_fare_amount,avg_total_amount,avg_tip_rate,...,sum_total_fees,num_high_ppm_trips,num_extreme_speed_trips,num_near_station_pickups,avg_hour_sin,avg_hour_cos,day_of_week,is_weekend,month,hour_of_day
0,Manhattan,2022-12-19 09:00:00,1669,2487,1.490114,2.44435,8.739558,16.776872,24.068969,0.187934,...,4335.75,78,0,1669,0.707107,-0.707107,2,0,12,9
1,Manhattan,2023-06-29 04:30:00,1109,1510,1.361587,2.368693,9.34646,15.893508,23.222011,0.203242,...,2833.5,42,0,1109,0.866025,0.5,5,0,6,4
2,Manhattan,2024-06-02 13:30:00,1560,2238,1.434615,2.419167,8.533289,16.964423,24.206019,0.18629,...,3959.5,51,0,1560,-0.258819,-0.965926,1,1,6,13
3,Queens,2023-11-01 13:30:00,126,176,1.396825,11.025714,17.392578,49.544444,72.715714,0.190028,...,1133.5,0,0,126,-0.258819,-0.965926,4,0,11,13
4,Queens,2024-03-31 20:00:00,101,130,1.287129,10.530792,28.1754,43.024752,63.052871,0.196488,...,767.5,0,0,101,-0.866025,0.5,1,1,3,20


### Weather Features

In [56]:
# weather_daily.limit(10).toPandas().to_csv("/home/jupyter/weather_sample.csv", index=False)

In [57]:
weather_daily = weather_daily.join(
                mapping_df,
                (weather_daily.station_id == mapping_df.station_id),
                "left"
)

weather_daily = weather_daily.drop(weather_daily.columns[-1])  
weather_daily = weather_daily.withColumnRenamed("Borough", "borough")


In [58]:
# ---- Base casting & temporal features ----
weather_features = (
    weather_daily
    # ensure date type
    .withColumn("weather_date", F.to_date("weather_date"))
    # temporal features
    .withColumn("day_of_week", F.dayofweek("weather_date"))  # 1 = Sun, 7 = Sat
    .withColumn("is_weekend", F.when(F.col("day_of_week").isin(1, 7), 1).otherwise(0))
    .withColumn("month", F.month("weather_date"))
    .withColumn(
        "season",
        F.when(F.col("month").isin(12, 1, 2), "winter")
         .when(F.col("month").isin(3, 4, 5), "spring")
         .when(F.col("month").isin(6, 7, 8), "summer")
         .otherwise("fall")
    )
)

# ---- Weather condition flags ----
weather_features = (
    weather_features
    .withColumn("is_precip", F.when(F.col("precip_in") > 0, 1).otherwise(0))
    .withColumn("is_heavy_rain", F.when(F.col("precip_in") >= 0.3, 1).otherwise(0))
    .withColumn("is_snow", F.when(F.col("snow_depth_in") > 0, 1).otherwise(0))
    .withColumn("is_low_visibility", F.when(F.col("visibility_mi") < 5, 1).otherwise(0))
    .withColumn("is_windy", F.when(F.col("wind_speed_max") >= 20, 1).otherwise(0))
    .withColumn("is_storm_gust", F.when(F.col("wind_gust") >= 35, 1).otherwise(0))
)

# ---- Derived numeric severity features ----
weather_features = (
    weather_features
    .withColumn("temp_range", F.col("temp_max") - F.col("temp_min"))
    .withColumn("wind_ratio", F.col("wind_speed_max") / F.col("wind_speed_avg"))
    .withColumn("wind_severity", F.col("wind_gust") - F.col("wind_speed_avg"))
    .withColumn("is_cold_snap", F.when(F.col("temp_min") < 32, 1).otherwise(0))
    .withColumn("is_heat_wave", F.when(F.col("temp_max") > 85, 1).otherwise(0))
)

In [59]:
weather_features.columns

['weather_date',
 'lat',
 'lon',
 'elevation_m',
 'station_name',
 'temp_avg',
 'dew_point',
 'pressure_sea_level',
 'visibility_mi',
 'wind_speed_avg',
 'wind_speed_max',
 'wind_gust',
 'temp_max',
 'temp_min',
 'precip_in',
 'snow_depth_in',
 'weather_flags',
 'borough',
 'day_of_week',
 'is_weekend',
 'month',
 'season',
 'is_precip',
 'is_heavy_rain',
 'is_snow',
 'is_low_visibility',
 'is_windy',
 'is_storm_gust',
 'temp_range',
 'wind_ratio',
 'wind_severity',
 'is_cold_snap',
 'is_heat_wave']

### Save Featured Tables to GCS

Note: I recommend having separate tables rather than 1 big table, otherwise it becomes had to manage

In addition, i am considering aggregating at the venue name level as opposed to borough name. Joining at this stage may lead to re-work

In [60]:
bucket = "msca-bdp-student-gcs/Group_1_final_project"
prefix = "curated"               

weather_path = f"gs://{bucket}/{prefix}/weather_curated"
events_path = f"gs://{bucket}/{prefix}/events_curated"
taxi_path = f"gs://{bucket}/{prefix}/taxi_curated"

In [61]:
weather_features.write \
    .mode("overwrite") \
    .parquet(weather_path)

                                                                                

In [62]:
events_features.write \
    .mode("overwrite") \
    .parquet(events_path)

                                                                                

In [63]:
final_taxi.write \
    .mode("overwrite") \
    .parquet(taxi_path)

                                                                                