In [1]:
# ============================================================
# 1. Set up Spark Session
# ============================================================

from pyspark.sql import SparkSession

# Session settings, applied to the builder one by one in the order listed:
# 4g for executor and driver memory, plus the Arrow flag for PySpark.
_session_conf = {
    "spark.executor.memory": "4g",
    "spark.driver.memory": "4g",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}

_builder = SparkSession.builder.appName("Seattle911")
for _conf_key, _conf_value in _session_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)

# getOrCreate() hands back an existing session when there is one,
# otherwise it starts a new session with the settings above.
spark = _builder.getOrCreate()

# Keep the notebook output readable: log at WARN level.
spark.sparkContext.setLogLevel("WARN")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/23 03:25:19 WARN Utils: Your hostname, Phongs-MacBook-Pro-23.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.46 instead (on interface en0)
25/11/23 03:25:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 03:25:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/23 03:25:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/23 03:25:20 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# ============================================================
# 2. Data Import
# ============================================================

import os
from pathlib import Path

# Project root is two directory levels above the current working directory.
PROJECT_ROOT = Path("../../").resolve()

# Processed call-data snapshot, stored as parquet under data/processed.
data_path = PROJECT_ROOT.joinpath(
    "data", "processed", "calldata_20251019_processed.parquet"
)

# The reader is given a plain string path, so convert the Path object first.
df = spark.read.parquet(os.fspath(data_path))

# Sanity check on what was loaded: row count and number of columns.
print("Loaded {:,} rows with {} columns".format(df.count(), len(df.columns)))

Loaded 848,167 rows with 22 columns


In [3]:
# ============================================================
# 3. Data Filtering and Feature Setup
# ============================================================
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from xgboost.spark import SparkXGBRegressor

# Name of the target column in the source data.
label_col = "First SPD Call Sign Response Time (s)"

# Columns handled as categoricals (string-indexed and one-hot encoded later).
cat_feats = ["Dispatch Neighborhood", "Dispatch Sector"]

# Every candidate input column, in source order.
raw_feats = [
    "Priority",
    "Dispatch Neighborhood",
    "Dispatch Sector",
    "Count Of Officers",
    "is_rush_hour",
    "is_nighttime",
    "priority_x_officers",
    "priority_x_hour",
    "event_ts",
    "year",
    "month",
    "day",
    "day_of_week",
    "hour",
    "TEMP",
    "PRCP",
    "COCO",
    "weather_severity",
    "is_raining",
    "is_freezing",
]

# Numeric model inputs: the raw columns, in the same order, minus the
# categoricals and minus `event_ts` and `COCO`, which are not fed to the model.
num_feats = [
    name
    for name in raw_feats
    if name not in {*cat_feats, "event_ts", "COCO"}
]

def to_d(c):
    """Return a column expression for `c` cast to double, with nulls as 0.0.

    Args:
        c: name of the column to convert.

    Returns:
        A Spark column expression. Any null in the cast result, whether it was
        already null or became null through the cast, is replaced by 0.0.
    """
    as_double = F.col(c).cast("double")
    return F.coalesce(as_double, F.lit(0.0))

In [4]:
# ============================================================
# 4. Pipeline: Index → OneHot → Assemble
# ============================================================
# One StringIndexer per categorical column, writing to "<column>_idx".
# handleInvalid="keep" is set so invalid values are kept instead of raising.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_idx", handleInvalid="keep")
    for name in cat_feats
]

# One OneHotEncoder per indexed column, writing to "<column>_vec".
encoders = [
    OneHotEncoder(inputCols=[name + "_idx"], outputCols=[name + "_vec"])
    for name in cat_feats
]

# Feature vector layout: encoded categoricals first, numeric columns after.
assembled_feats = [*(name + "_vec" for name in cat_feats), *num_feats]
assembler = VectorAssembler(outputCol="features", inputCols=assembled_feats)

# Copy the target into a "label" column and discard rows where it is missing.
df_labeled = df.withColumn("label", F.col(label_col)).na.drop(subset=["label"])

# 80/20 split with a fixed seed; only the training part is cached.
train, test = df_labeled.randomSplit(weights=[0.8, 0.2], seed=42)
train = train.cache()
print("Train size: {:,}, Test size: {:,}".format(train.count(), test.count()))

[Stage 8:>                                                        (0 + 11) / 11]

Train size: 678,581, Test size: 169,586


                                                                                

In [5]:
# ============================================================
# 4. Linear Regression Baseline Model
# ============================================================

from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Baseline estimator: regParam and elasticNetParam are both 0.0, at most
# 50 iterations, and feature standardization switched on.
lr = LinearRegression(
    maxIter=50,
    regParam=0.0,
    elasticNetParam=0.0,
    standardization=True,
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

# Stage order: index the categoricals, one-hot encode them, assemble the
# feature vector, then fit the regression.
lr_pipeline = Pipeline(stages=[*indexers, *encoders, assembler, lr])

In [6]:
# ============================================================
# 5. Fit LR Model
# ============================================================

# Fit all pipeline stages (indexers, encoders, assembler, regression) on the
# training split.
lr_model = lr_pipeline.fit(train)

# Score the held-out split. The predictions are cached because they are
# evaluated once per metric further down; without the cache every metric would
# recompute the whole pipeline transform on the (uncached) test split.
lr_pred = lr_model.transform(test).cache()

25/11/23 03:25:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/11/23 03:25:29 WARN Instrumentation: [a6b152bc] regParam is zero, which might cause numerical instability and overfitting.
25/11/23 03:25:30 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/23 03:25:30 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/11/23 03:25:30 WARN Instrumentation: [a6b152bc] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [None]:
# ============================================================
# 5. Evaluate LR Model
# ============================================================

# A single evaluator is reused for all metrics; only its metricName changes
# between calls. Each evaluate() call is its own pass over `lr_pred`.
lr_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Same four metrics, in the same order, bound to the same names as before.
lr_mae, lr_rmse, lr_mse, lr_r2 = (
    lr_evaluator.setMetricName(metric).evaluate(lr_pred)
    for metric in ("mae", "rmse", "mse", "r2")
)

print("\n Linear Regression Results:")
print(f"MAE:   {lr_mae:,.2f}")
print(f"RMSE:  {lr_rmse:,.2f}")
print(f"MSE:   {lr_mse:,.2f}")
# The label previously contained a mis-encoded superscript two ("RÂ²").
print(f"R²:    {lr_r2:,.4f}")

                                                                                


📊 Linear Regression Results:
MAE:   1,003.98
RMSE:  1,375.95
MSE:   1,893,248.21
R²:    0.1460


In [8]:
# ============================================================
# 6. Inspect LR Stage
# ============================================================

from pyspark.ml.regression import LinearRegressionModel

# Pick the first fitted stage that is the linear-regression model itself;
# next() raises StopIteration if the pipeline contains no such stage.
lr_stage = next(
    filter(lambda stage: isinstance(stage, LinearRegressionModel), lr_model.stages)
)

# Report the intercept and how many coefficients the model holds.
print(f"\nIntercept: {lr_stage.intercept}")
print(f"Number of coefficients: {len(lr_stage.coefficients)}")


Intercept: -98899.0930877213
Number of coefficients: 94


In [9]:
# Shut down the Spark session created in the first cell. Cells that use
# `spark` cannot be re-run after this without creating the session again.
spark.stop()