## MTA Service Announcement Preprocessing

In [19]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import to_date, hour
from pyspark.sql.types import DateType, IntegerType
import pandas as pd
import datetime


In [2]:
spark = SparkSession.builder \
    .appName("ReadCSV") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 13:07:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/25 13:07:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/08/25 13:07:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema explicitly
schema = StructType([
    StructField("Alert ID", IntegerType(), True),
    StructField("Event ID", IntegerType(), True),
    StructField("Update Number", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Agency", StringType(), True),
    StructField("Status Label", StringType(), True),
    StructField("Affected", StringType(), True),
])

# Read the CSV file with the defined schema
df = spark.read.schema(schema).option("header", "true").csv("../data/landing/mta.csv")


In [4]:
df.printSchema()

root
 |-- Alert ID: integer (nullable = true)
 |-- Event ID: integer (nullable = true)
 |-- Update Number: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Status Label: string (nullable = true)
 |-- Affected: string (nullable = true)



In [20]:
df.show()

+--------+--------+-------------+--------------------+-----------+--------------------+--------------------+
|Alert ID|Event ID|Update Number|                Date|     Agency|        Status Label|            Affected|
+--------+--------+-------------+--------------------+-----------+--------------------+--------------------+
|  337474|  163057|            0|07/30/2024 11:56:...|NYCT Subway|              delays|                   D|
|  337473|  163056|            0|07/30/2024 11:46:...|NYCT Subway|              delays|                   6|
|  337469|  163055|            0|07/30/2024 11:42:...|   NYCT Bus|      part-suspended|                M14D|
|  337471|  163055|            1|07/30/2024 11:44:...|   NYCT Bus|      part-suspended|                M14D|
|  337468|  163054|            0|07/30/2024 11:42:...|   NYCT Bus|              detour|                M14A|
|  337470|  163054|            1|07/30/2024 11:43:...|   NYCT Bus|              detour|                M14A|
|  337467|  163053|

In [21]:
df.count()

334431

In [23]:
# Filter based on the minimum possible values for numerical features
df = df.where((F.col('Update Number') >= 0) &
              (F.col('Alert ID') >= 0) &
              (F.col('Event ID') >= 0))

df.count()

334431

In [6]:
df_converted = df.drop('Affected').drop('Event ID')
df_converted.show()

+--------+-------------+--------------------+-----------+--------------------+
|Alert ID|Update Number|                Date|     Agency|        Status Label|
+--------+-------------+--------------------+-----------+--------------------+
|  337474|            0|07/30/2024 11:56:...|NYCT Subway|              delays|
|  337473|            0|07/30/2024 11:46:...|NYCT Subway|              delays|
|  337469|            0|07/30/2024 11:42:...|   NYCT Bus|      part-suspended|
|  337471|            1|07/30/2024 11:44:...|   NYCT Bus|      part-suspended|
|  337468|            0|07/30/2024 11:42:...|   NYCT Bus|              detour|
|  337470|            1|07/30/2024 11:43:...|   NYCT Bus|              detour|
|  337467|            0|07/30/2024 11:32:...|       LIRR|         some-delays|
|  337464|            0|07/30/2024 11:24:...|       LIRR|         some-delays|
|  337466|            1|07/30/2024 11:29:...|       LIRR|         some-delays|
|  337463|            0|07/30/2024 11:18:...|NYCT Su

In [7]:
# Define the date format used in the CSV
date_format = "MM/dd/yyyy hh:mm:ss a"

# Convert the Date column to timestamp
df_converted = df_converted.withColumn("Date", to_timestamp(col("Date"), date_format))

# Now want to make sure to include necessary dates
# Define the date range
start_date = '2023-07-01'
end_date = '2023-12-31'

# Filter the DataFrame
df_converted = df_converted.filter((col("Date") >= start_date) & (col("Date") <= end_date)).orderBy("Date")

df_converted.show(truncate=False)
df_converted.printSchema()



+--------+-------------+-------------------+-----------+------------------------------+
|Alert ID|Update Number|Date               |Agency     |Status Label                  |
+--------+-------------+-------------------+-----------+------------------------------+
|233465  |1            |2023-07-01 00:01:00|NYCT Bus   |buses-detoured                |
|233466  |2            |2023-07-01 00:05:00|LIRR       |delays                        |
|233467  |0            |2023-07-01 00:10:00|LIRR       |some-delays                   |
|233468  |1            |2023-07-01 00:12:00|NYCT Subway|delays                        |
|233469  |0            |2023-07-01 00:17:00|NYCT Subway|express-to-local | some-delays|
|233470  |0            |2023-07-01 00:21:00|LIRR       |some-delays                   |
|233471  |0            |2023-07-01 00:23:00|LIRR       |some-delays                   |
|233472  |3            |2023-07-01 00:26:00|LIRR       |some-delays                   |
|233473  |1            |2023-07-

                                                                                

In [8]:
# Need to remove any rows which involve service announcements affecting the Long Island Rail Road (LIRR) and Metro-North Rail Road (MNR) service as it is not contained in our defined geographical area
df_converted = df_converted.filter(~df_converted["Agency"].contains("LIRR"))
df_converted = df_converted.filter(~df_converted["Agency"].contains("MNR"))
df_converted = df_converted.filter(~df_converted["Agency"].contains("BT"))

# Extract the date from the 'Date' column
df_converted = df_converted.withColumn('date_only', to_date(df_converted['Date']))

# Extract the hour from the 'Date' column
df_converted = df_converted.withColumn('hour', hour(df_converted['Date']))


df_converted.show(truncate=False)


+--------+-------------+-------------------+-----------+------------------------------+----------+----+
|Alert ID|Update Number|Date               |Agency     |Status Label                  |date_only |hour|
+--------+-------------+-------------------+-----------+------------------------------+----------+----+
|233465  |1            |2023-07-01 00:01:00|NYCT Bus   |buses-detoured                |2023-07-01|0   |
|233468  |1            |2023-07-01 00:12:00|NYCT Subway|delays                        |2023-07-01|0   |
|233469  |0            |2023-07-01 00:17:00|NYCT Subway|express-to-local | some-delays|2023-07-01|0   |
|233473  |1            |2023-07-01 00:29:00|NYCT Subway|delays                        |2023-07-01|0   |
|233474  |0            |2023-07-01 00:32:00|NYCT Subway|some-delays | local-to-express|2023-07-01|0   |
|233477  |0            |2023-07-01 00:55:00|NYCT Subway|delays                        |2023-07-01|0   |
|233479  |1            |2023-07-01 01:02:00|NYCT Subway|stations

In [9]:
from pyspark.sql import functions as F

# Group by 'date_only', 'hour', and 'Agency' and count the occurrences
grouped_df = df_converted.groupBy("date_only", "hour", "Agency").count()

# Pivot the 'Agency' column to create separate columns for each agency
pivot_df = grouped_df.groupBy("date_only", "hour") \
    .pivot("Agency", ["NYCT Bus", "NYCT Subway"]) \
    .agg(F.first("count"))

# Rename the columns to 'bus_count', 'subway_count', and 'bt_count'
pivot_df = pivot_df.withColumnRenamed("NYCT Bus", "bus_count") \
                   .withColumnRenamed("NYCT Subway", "subway_count")

# Fill null values with 0, in case some combinations don't have counts
pivot_df = pivot_df.fillna(0)

# rename dat_only to date for consistency
pivot_df = pivot_df.withColumnRenamed("date_only", "date")

# Show the results
pivot_df.show()



+----------+----+---------+------------+
|      date|hour|bus_count|subway_count|
+----------+----+---------+------------+
|2023-12-28|   9|        0|           7|
|2023-12-30|  23|        1|           5|
|2023-10-10|  10|        8|           5|
|2023-08-04|   0|        5|           6|
|2023-07-25|   9|        4|           2|
|2023-11-01|   8|        1|           3|
|2023-09-16|   8|        1|           0|
|2023-10-21|   9|        2|           0|
|2023-07-08|   4|        0|           9|
|2023-07-05|  18|        2|           6|
|2023-07-09|   2|        0|           4|
|2023-08-21|   7|        0|           6|
|2023-09-29|   5|        3|           2|
|2023-09-21|  12|        5|          12|
|2023-07-14|  11|        9|          12|
|2023-12-07|  17|        2|          13|
|2023-09-19|  14|        1|           4|
|2023-07-18|   8|        1|           5|
|2023-10-05|  17|        0|           5|
|2023-11-17|  15|        0|           8|
+----------+----+---------+------------+
only showing top

In [10]:
# checking to see if all dates are included
(30+30+31+31+31+31)*24

4416

In [11]:
pivot_df.count()

4281

In [12]:
# not all dates are included, now need to include them all
all_dates = pd.date_range(start=start_date, end=end_date).date

# Generate all hours
all_hours = list(range(0, 24))

# Create DataFrame with all date and hour combinations
all_combinations = [(date, hour) for date in all_dates for hour in all_hours]
all_combinations_df = spark.createDataFrame(all_combinations, ["date", "hour"])

# Ensure 'date' is of DateType and 'hour' is of IntegerType
all_combinations_df = all_combinations_df.withColumn("date", col("date").cast(DateType()))
all_combinations_df = all_combinations_df.withColumn("hour", col("hour").cast(IntegerType()))

In [14]:
# Perform a full outer join with the original DataFrame
full_df = all_combinations_df.join(pivot_df, on=["date", "hour"], how="left")

# Fill missing counts with 0
full_df = full_df.fillna({"bus_count": 0, "subway_count": 0})

# Sort the DataFrame by 'date' and 'hour'
full_df = full_df.orderBy(["date", "hour"])

# Show the resulting DataFrame
full_df.show()

+----------+----+---------+------------+
|      date|hour|bus_count|subway_count|
+----------+----+---------+------------+
|2023-07-01|   0|        1|           5|
|2023-07-01|   1|        1|           7|
|2023-07-01|   2|        2|           2|
|2023-07-01|   3|        0|           1|
|2023-07-01|   4|        0|          11|
|2023-07-01|   5|        0|           7|
|2023-07-01|   6|        0|           3|
|2023-07-01|   7|        0|           6|
|2023-07-01|   8|        4|           4|
|2023-07-01|   9|        1|           3|
|2023-07-01|  10|        0|           6|
|2023-07-01|  11|        0|           5|
|2023-07-01|  12|        3|           4|
|2023-07-01|  13|        0|           3|
|2023-07-01|  14|        0|           4|
|2023-07-01|  15|        2|           6|
|2023-07-01|  16|        0|           1|
|2023-07-01|  17|        0|           2|
|2023-07-01|  18|        1|           2|
|2023-07-01|  19|        1|           4|
+----------+----+---------+------------+
only showing top

                                                                                

24/08/25 13:07:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [15]:
# Checking to see if all dates and hours are now included
#  Get the number of columns in the DataFrame
num_rows = full_df.count()

# Print the number of rows
print(f"Number of rows: {num_rows}")

Number of rows: 4416


In [17]:
full_df.write.mode("overwrite").option("header", "true").csv("../data/curated/hourly_service_counts.csv")

