## SETUP

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# start or reuse spark session
spark = (
    SparkSession.builder
        .appName("Walrus_Modeling")
        .getOrCreate()
)

# path to joined parquet folder
joined_path = "/home/wlevine/Walrus/Processed/hourly_panel"

# load the final joined dataframe
final_df = spark.read.parquet(joined_path)

print("=== Joined Data Schema ===")
final_df.printSchema()

print("Row count:", final_df.count())

final_df.show(10, truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/24 15:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/11/24 15:36:34 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.


=== Joined Data Schema ===
root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday_num: integer (nullable = true)
 |-- is_weekend: integer (nullable = true)
 |-- tod_bucket: string (nullable = true)
 |-- grid_key: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- station_complex: string (nullable = true)
 |-- flow_sum: long (nullable = true)
 |-- ridership_sum: long (nullable = true)
 |-- transfers_sum: long (nullable = true)
 |-- borough_clean: string (nullable = true)
 |-- complaints_total: long (nullable = true)
 |-- complaints_noise: long (nullable = true)
 |-- complaints_heat: long (nullable = true)
 |-- complaints_other: long (nullable = true)
 |-- borough_final: string (nullable = true)



                                                                                

Row count: 83449
+----------+----+-----------+----------+----------+----------------+-------------+-----------------------------------+--------+-------------+-------------+-------------+----------------+----------------+---------------+----------------+-------------+
|date      |hour|weekday_num|is_weekend|tod_bucket|grid_key        |borough      |station_complex                    |flow_sum|ridership_sum|transfers_sum|borough_clean|complaints_total|complaints_noise|complaints_heat|complaints_other|borough_final|
+----------+----+-----------+----------+----------+----------------+-------------+-----------------------------------+--------+-------------+-------------+-------------+----------------+----------------+---------------+----------------+-------------+
|2024-02-10|3   |7          |1         |Off-Peak  |40.6932_-73.99  |Brooklyn     |Court St (R)/Borough Hall (2,3,4,5)|12      |12           |0            |NULL         |0               |0               |0              |0          

## Model 1 - GBT

In [2]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# -----------------------------
# prepare modeling dataframe
# -----------------------------
model_df = (
    final_df
        .filter(F.col("flow_sum").isNotNull())
        .filter(F.col("borough_final").isNotNull())
        .fillna({
            "complaints_total": 0,
            "complaints_noise": 0,
            "complaints_heat": 0,
            "complaints_other": 0
        })
)

print("Modeling row count:", model_df.count())

model_df.select("flow_sum").summary().show()

Modeling row count: 83449


[Stage 8:>                                                        (0 + 50) / 50]

+-------+-----------------+
|summary|         flow_sum|
+-------+-----------------+
|  count|            83449|
|   mean|274.6747115004374|
| stddev| 401.913912972355|
|    min|                1|
|    25%|               29|
|    50%|              122|
|    75%|              341|
|    max|             4925|
+-------+-----------------+



                                                                                

In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# label
label_col = "flow_sum"

# numeric features (time + complaint load)
num_cols = [
    "hour",
    "weekday_num",
    "is_weekend",
    "complaints_total",
    "complaints_noise",
    "complaints_heat",
    "complaints_other"
]

# categorical features (keep it clean, not too many levels)
cat_cols = [
    "tod_bucket",      # AM peak / PM peak / Off-Peak
    "borough_final"    # borough after cleanup
]

# index + one-hot for categoricals
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=f"{c}_idx",
        handleInvalid="keep"
    )
    for c in cat_cols
]

encoders = [
    OneHotEncoder(
        inputCols=[f"{c}_idx"],
        outputCols=[f"{c}_oh"]
    )
    for c in cat_cols
]

# all feature columns for assembler
feature_inputs = num_cols + [f"{c}_oh" for c in cat_cols]

assembler = VectorAssembler(
    inputCols=feature_inputs,
    outputCol="features"
)

# gradient-boosted tree regressor
gbt = GBTRegressor(
    labelCol=label_col,
    featuresCol="features",
    maxDepth=6,
    maxIter=50,
    stepSize=0.1,
    seed=42
)

# pipeline: index -> encode -> assemble -> model
stages = indexers + encoders + [assembler, gbt]
pipeline = Pipeline(stages=stages)

# train / test split
train_df, test_df = model_df.randomSplit([0.8, 0.2], seed=42)

print("Train rows:", train_df.count())
print("Test rows:", test_df.count())

# fit model
gbt_model_pipeline = pipeline.fit(train_df)

# predictions on test
predictions = gbt_model_pipeline.transform(test_df)

# basic regression metrics
evaluator_rmse = RegressionEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="r2"
)

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse:.2f}")
print(f"R^2 : {r2:.3f}")

# quick peek at predictions vs actual
predictions.select("date", "hour", "borough_final",
                   "flow_sum", "prediction") \
           .orderBy(F.col("date"), F.col("hour")) \
           .show(20, truncate=False)

Train rows: 66862
Test rows: 16587




RMSE: 323.80
R^2 : 0.370
+----------+----+-------------+--------+------------------+
|date      |hour|borough_final|flow_sum|prediction        |
+----------+----+-------------+--------+------------------+
|2024-02-05|0   |Brooklyn     |14      |17.224005714177572|
|2024-02-05|0   |Brooklyn     |13      |17.224005714177572|
|2024-02-05|0   |Brooklyn     |38      |17.224005714177572|
|2024-02-05|0   |Brooklyn     |22      |17.224005714177572|
|2024-02-05|0   |Brooklyn     |2       |17.224005714177572|
|2024-02-05|0   |Queens       |10      |26.76469445939464 |
|2024-02-05|0   |Brooklyn     |6       |17.224005714177572|
|2024-02-05|0   |Manhattan    |74      |83.7949178889751  |
|2024-02-05|0   |Manhattan    |21      |83.7949178889751  |
|2024-02-05|0   |Queens       |67      |26.76469445939464 |
|2024-02-05|0   |Brooklyn     |37      |17.224005714177572|
|2024-02-05|0   |Manhattan    |3       |83.7949178889751  |
|2024-02-05|0   |Queens       |41      |26.76469445939464 |
|2024-02-05|0  

##  Model 2 - Random Forest

In [4]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# -----------------------------------
# 1. prep modeling dataframe
# -----------------------------------

# keep rows with real flow + borough info, fill missing complaint counts with 0
model_df = (
    final_df
        .filter(F.col("flow_sum").isNotNull())
        .filter(F.col("borough_final").isNotNull())
        .fillna({
            "complaints_total": 0,
            "complaints_noise": 0,
            "complaints_heat": 0,
            "complaints_other": 0
        })
)

print("Modeling row count:", model_df.count())
model_df.select("flow_sum").summary().show()

# -----------------------------------
# 2. categorical encoders
# -----------------------------------
# going to treat borough + time-of-day bucket as categorical
cat_cols = ["borough_final", "tod_bucket"]
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=f"{c}_idx",
        handleInvalid="keep"
    )
    for c in cat_cols
]

encoders = [
    OneHotEncoder(
        inputCol=f"{c}_idx",
        outputCol=f"{c}_oh"
    )
    for c in cat_cols
]

# -----------------------------------
# 3. assemble features
# -----------------------------------
# basic time signals + complaints + encoded cats
feature_cols = [
    "hour",
    "weekday_num",
    "is_weekend",
    "complaints_total",
    "complaints_noise",
    "complaints_heat",
    "complaints_other",
    "borough_final_oh",
    "tod_bucket_oh"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# -----------------------------------
# 4. random forest model
# -----------------------------------

rf = RandomForestRegressor(
    labelCol="flow_sum",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# full pipeline: index -> encode -> assemble -> model
stages = []
stages.extend(indexers)
stages.extend(encoders)
stages.append(assembler)
stages.append(rf)

pipeline = Pipeline(stages=stages)

# -----------------------------------
# 5. train / test split
# -----------------------------------

train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=42)

print("Train rows:", train_df.count())
print("Test rows :", test_df.count())

# fit model
rf_model = pipeline.fit(train_df)

# -----------------------------------
# 6. predictions + metrics
# -----------------------------------

preds = rf_model.transform(test_df)

preds.select("date", "hour", "borough_final", "flow_sum", "prediction").show(10, truncate=False)

evaluator_rmse = RegressionEvaluator(
    labelCol="flow_sum",
    predictionCol="prediction",
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol="flow_sum",
    predictionCol="prediction",
    metricName="r2"
)

rmse = evaluator_rmse.evaluate(preds)
r2   = evaluator_r2.evaluate(preds)

print(f"Random Forest RMSE: {rmse:.2f}")
print(f"Random Forest R^2 : {r2:.3f}")

# if you ever want to inspect tree details:
rf_stage = rf_model.stages[-1]
print(rf_stage)  # quick summary of the forest

Modeling row count: 83449
+-------+-----------------+
|summary|         flow_sum|
+-------+-----------------+
|  count|            83449|
|   mean|274.6747115004374|
| stddev| 401.913912972355|
|    min|                1|
|    25%|               29|
|    50%|              122|
|    75%|              341|
|    max|             4925|
+-------+-----------------+

Train rows: 58493
Test rows : 24956


25/11/24 15:38:03 WARN DAGScheduler: Broadcasting large task binary with size 1007.3 KiB
25/11/24 15:38:04 WARN DAGScheduler: Broadcasting large task binary with size 1594.9 KiB
25/11/24 15:38:04 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/11/24 15:38:05 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
25/11/24 15:38:06 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
                                                                                

+----------+----+-------------+--------+------------------+
|date      |hour|borough_final|flow_sum|prediction        |
+----------+----+-------------+--------+------------------+
|2024-02-05|0   |Brooklyn     |10      |27.28531455158018 |
|2024-02-05|0   |Queens       |12      |50.09402739218851 |
|2024-02-05|0   |Queens       |3       |50.09402739218851 |
|2024-02-05|0   |Manhattan    |67      |101.4258571997568 |
|2024-02-05|1   |Brooklyn     |4       |23.21512298988411 |
|2024-02-05|1   |Manhattan    |20      |61.695911991255564|
|2024-02-05|1   |Bronx        |7       |26.761445340599344|
|2024-02-05|2   |Queens       |58      |44.66154789806345 |
|2024-02-05|2   |Bronx        |1       |26.79365197653512 |
|2024-02-05|3   |Brooklyn     |8       |23.56623483225975 |
+----------+----+-------------+--------+------------------+
only showing top 10 rows

Random Forest RMSE: 325.36
Random Forest R^2 : 0.359
RandomForestRegressionModel: uid=RandomForestRegressor_a87c6b4feaa6, numTrees=100

## Humor Score Builder

In [5]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ============================
# 1. NORMALIZE FLOW + COMPLAINTS
# ============================

# Avoid division by zero by using safe normalizations
flow_stats = final_df.agg(F.min("flow_sum").alias("min_f"),
                          F.max("flow_sum").alias("max_f")).collect()[0]

min_f = flow_stats["min_f"]
max_f = flow_stats["max_f"]

complaint_stats = final_df.agg(F.min("complaints_total").alias("min_c"),
                               F.max("complaints_total").alias("max_c")).collect()[0]

min_c = complaint_stats["min_c"]
max_c = complaint_stats["max_c"]

# add normalized columns
df_norm = (
    final_df
        .withColumn(
            "flow_norm",
            (F.col("flow_sum") - F.lit(min_f)) / (F.lit(max_f - min_f))
        )
        .withColumn(
            "complaint_norm",
            (F.col("complaints_total") - F.lit(min_c)) / (F.lit(max_c - min_c))
        )
)

# ============================
# 2. TIME CONTEXT FACTOR
# ============================

df_time = (
    df_norm
        .withColumn(
            "time_factor",
            F.when(F.col("tod_bucket") == "AM Peak", 1.0)
             .when(F.col("tod_bucket") == "PM Peak", 0.9)
             .otherwise(0.4)
        )
)

# ============================
# 3. BUILD HUMOR OPPORTUNITY SCORE
# ============================

df_humor = (
    df_time
        .withColumn(
            "humor_score",
            0.50 * F.col("flow_norm") +
            0.35 * F.col("complaint_norm") +
            0.15 * F.col("time_factor")
        )
)

# ============================
# 4. SAVE THE RESULTS FOR VISUALIZATION LATER
# ============================

df_humor.write.mode("overwrite").parquet("/home/wlevine/Walrus/Processed/humor_score_panel")

print("Humor Opportunity Score computed successfully.")

25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/11/24 15:48:11 WARN MemoryManager: Total allocation exceeds 95.

Humor Opportunity Score computed successfully.


25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 15.51% for 49 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 15.83% for 48 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 16.17% for 47 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 16.52% for 46 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 16.89% for 45 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 17.27% for 44 writers
25/11/24 15:48:12 WARN MemoryManager: Total allocation exceeds 9