In [None]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
import matplotlib.pyplot as plt

# Locations of the cleaned parquet datasets on GCS.
# TODO: "xxxxxx" is a placeholder bucket name — set the real bucket (or load it from config/env) before running.
bucket = "xxxxxx"
path = "gs://" + bucket + "/landing/cleaned"
traffic_data_path, collisions_data_path = (f"{path}/{name}" for name in ("traffic_speeds", "collisions"))

# NOTE(review): `spark` is not created in this notebook; presumably the kernel/cluster provides a SparkSession.
# The next cell re-reads both datasets, so these frames are only an initial load.
sdf1_clean, sdf2_clean = (spark.read.parquet(location) for location in (traffic_data_path, collisions_data_path))

In [None]:
def _standardise_keys(sdf, date_col):
    """Return `sdf` with the join keys normalised.

    - DATE: parsed from the first 10 characters of `date_col` using "MM/dd/yyyy".
    - BOROUGH: trimmed and upper-cased so both sources agree on spelling/case.
    - Rows missing either BOROUGH or DATE are dropped.
    """
    return (
        sdf
        .withColumn("DATE", to_date(col(date_col).substr(1, 10), "MM/dd/yyyy"))
        .withColumn("BOROUGH", upper(trim(col("BOROUGH"))))
        .dropna(subset=["BOROUGH", "DATE"])
    )


# Start from a fresh parquet read so re-running this cell never re-applies the transforms
# to frames that were already transformed.
sdf1_clean = _standardise_keys(spark.read.parquet(traffic_data_path), "DATA_AS_OF")
sdf2_clean = _standardise_keys(spark.read.parquet(collisions_data_path), "CRASH DATE")

# One row per (BOROUGH, DATE): mean traffic speed / travel time, and number of collision records.
traffic_agg = sdf1_clean.groupBy("BOROUGH", "DATE").agg(
    avg("SPEED").alias("AVG_SPEED"),
    avg("TRAVEL_TIME").alias("AVG_TRAVEL_TIME"),
)
collision_agg = sdf2_clean.groupBy("BOROUGH", "DATE").agg(count("*").alias("COLLISIONS"))

# Inner join: only borough-days present in BOTH sources survive. A borough-day with traffic data
# but no collision records is dropped rather than given COLLISIONS = 0.
joined_data = traffic_agg.join(collision_agg, on=["BOROUGH", "DATE"], how="inner")


In [None]:
# Calendar features derived from DATE. Spark's dayofweek() numbers days 1 (Sunday) .. 7 (Saturday),
# so {1, 7} marks the weekend; WEEKEND is stored as an int 1/0 flag.
joined_data = (
    joined_data
    .withColumn("MONTH", month(col("DATE")))
    .withColumn("DAY_OF_WEEK", dayofweek(col("DATE")))
    .withColumn("WEEKEND", col("DAY_OF_WEEK").isin(1, 7).cast("int"))
)


In [None]:
# Sanity check before building the ML pipeline: the joined frame should carry BOROUGH, DATE,
# AVG_SPEED, AVG_TRAVEL_TIME, COLLISIONS and the derived MONTH, DAY_OF_WEEK, WEEKEND columns.
joined_data.printSchema()

In [None]:
# Categorical inputs: each one is string-indexed, then one-hot encoded.
# MONTH / DAY_OF_WEEK / WEEKEND are ints but are deliberately treated as categories here.
categorical_cols = ["BOROUGH", "MONTH", "DAY_OF_WEEK", "WEEKEND"]
indexed_cols = [f"{name}_Indexed" for name in categorical_cols]
encoded_cols = [f"{name}_Encoded" for name in categorical_cols]
# Continuous inputs fed to the assembler unchanged.
numeric_cols = ["AVG_SPEED", "AVG_TRAVEL_TIME"]

# handleInvalid="keep": values not seen while fitting get an extra bucket instead of raising an error.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols, handleInvalid="keep")
# dropLast=True drops one category slot per feature to avoid a perfectly collinear dummy set.
encoder = OneHotEncoder(
    inputCols=indexed_cols,
    outputCols=encoded_cols,
    dropLast=True,
    handleInvalid="keep",
)
# Final feature vector = one-hot blocks followed by the numeric columns.
assembler = VectorAssembler(inputCols=encoded_cols + numeric_cols, outputCol="features")

In [None]:
# Reproducible 70/30 train/test split of the borough-day rows.
train_data, test_data = joined_data.randomSplit([0.7, 0.3], seed=42)

# Linear regression on the assembled features, predicting the daily COLLISIONS count.
linear_reg = LinearRegression(labelCol="COLLISIONS")
stages = [indexer, encoder, assembler, linear_reg]
pipeline = Pipeline(stages=stages)

# Metric is chosen per call later; CrossValidator uses this evaluator's default metric.
evaluator = RegressionEvaluator(labelCol="COLLISIONS")

In [None]:
# Hyper-parameter search: 2 x 5 x 5 = 50 combinations, each scored with 3-fold cross-validation
# on the training split using `evaluator`'s default metric.
paramGrid = (
    ParamGridBuilder()
    .addGrid(linear_reg.fitIntercept, [True, False])
    .addGrid(linear_reg.regParam, [0.001, 0.01, 0.1, 1, 10])
    # 0 = pure L2 (ridge), 1 = pure L1 (lasso), values in between mix the two.
    .addGrid(linear_reg.elasticNetParam, [0, 0.25, 0.5, 0.75, 1])
    .build()
)

# Pin the fold-assignment seed (same value as the train/test randomSplit). Without it, the folds
# depend on PySpark's default seed, and the selected model may differ between runs.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)
cvModel = cv.fit(train_data)


In [None]:
# Pipeline fitted with the best-scoring parameter combination found by cross-validation.
bestModel = cvModel.bestModel
# Score the held-out test split with it (lazy until an action runs).
predictions = bestModel.transform(test_data)

# One mean cross-validated score per grid combination, in ParamGridBuilder order.
grid_scores = cvModel.avgMetrics
print(f"Average metrics for each grid combination: {grid_scores}")


In [None]:
# Last pipeline stage is the fitted linear-regression model.
best_lr_model = bestModel.stages[-1]
for label, value in (
    ("Coefficients", best_lr_model.coefficients),
    ("regParam", best_lr_model.getRegParam()),
    ("elasticNetParam", best_lr_model.getElasticNetParam()),
):
    print(f"  {label}: {value}")

In [None]:
# Side-by-side view of inputs, prediction and actual label for the first rows of the test split.
preview_cols = [
    "BOROUGH", "MONTH", "DAY_OF_WEEK", "WEEKEND",
    "AVG_SPEED", "AVG_TRAVEL_TIME",
    "prediction", "COLLISIONS", "features",
]
predictions.select(*preview_cols).show(truncate=False)


In [None]:
# Evaluate the test-split predictions under each metric by overriding metricName per call.
test_scores = {
    metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("rmse", "r2")
}
rmse = test_scores["rmse"]
r2 = test_scores["r2"]
print(f"RMSE: {rmse} R-squared: {r2}")


In [None]:
# Collect the scored test split to the driver as a pandas DataFrame for plotting.
# The plotting cells below read `predictions_pd`; this cell used to bind only `predictions_df`,
# so those cells failed with NameError on a fresh kernel. Bind both names.
predictions_pd = predictions.select(
    "BOROUGH", "MONTH", "DAY_OF_WEEK", "WEEKEND",
    "AVG_SPEED", "AVG_TRAVEL_TIME", "COLLISIONS", "prediction", "features",
).toPandas()
predictions_df = predictions_pd  # backward-compatible alias for the original name


In [None]:
# Mean daily collisions per borough. NOTE(review): predictions_pd presumably holds only the scored
# test split, so these averages cover that split rather than the full joined data.
avg_collisions_borough = predictions_pd.groupby("BOROUGH")["COLLISIONS"].mean()

fig, ax = plt.subplots()
avg_collisions_borough.plot(kind="bar", color="blue", ax=ax)
ax.set_xlabel("Borough")
ax.set_ylabel("Average Collisions")
ax.set_title("Average Collisions by Borough")
fig.savefig("avg_collisions_by_borough.png")
plt.show()

In [None]:
actual = predictions_pd["COLLISIONS"]
predicted = predictions_pd["prediction"]
# Red y = x reference line: points on it are perfect predictions.
diag_end = actual.max()

fig, ax = plt.subplots()
ax.scatter(actual, predicted, alpha=0.5)
ax.plot([0, diag_end], [0, diag_end], color="red", linewidth=2)
ax.set_xlabel("Actual Collisions")
ax.set_ylabel("Predicted Collisions")
ax.set_title("Scatter Plot of Actual vs. Predicted Collisions")
fig.savefig("actual_vs_predicted_collisions.png")
plt.show()

In [None]:
# How the model's predicted collision count varies with the borough-day mean speed.
fig, ax = plt.subplots()
ax.scatter(predictions_pd["AVG_SPEED"], predictions_pd["prediction"], alpha=0.5)
ax.set_xlabel("Average Speed")
ax.set_ylabel("Predicted Collisions")
ax.set_title("Scatter Plot of Average Speed vs Predicted Collisions")
fig.savefig("avg_speed_vs_predicted_collisions.png")
plt.show()

In [None]:
# How the model's predicted collision count varies with the borough-day mean travel time.
# NOTE(review): the axis label assumes TRAVEL_TIME is in minutes — confirm units against the source dataset.
# The x-axis is clipped to [0, 600], so any points beyond 600 are not shown.
fig, ax = plt.subplots()
ax.scatter(predictions_pd["AVG_TRAVEL_TIME"], predictions_pd["prediction"], alpha=0.5)
ax.set_xlabel("Average Travel Time(Minutes)")
ax.set_ylabel("Predicted Collisions")
ax.set_title("Scatter Plot of Average Travel Time vs Predicted Collisions")
ax.set_xlim(0, 600)
fig.savefig("avg_travel_time_vs_predicted_collisions.png")
plt.show()
