# Pipelines

SparkML uses pipeline concept with Transformers (eg. VectorAssembler) and Estimators (eg. LinearRegression, LogisticRegression)

In [2]:
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
    .appName("5A-ML") \
    .master("yarn") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/27 13:58:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load merged PSI-Weather data (from HDFS)

merged_path = "hdfs:///data/psi_weather_merged.parquet"
df_merged = spark.read.parquet(merged_path)
df_merged.show(5)
df_merged.printSchema()


25/07/27 13:58:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+--------------------+-------------------+--------------------+------------+------------+
|north|south|east|west|central|  psi_date|psi_hour|     name|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|solarradiation|solarenergy|uvindex|severerisk|          conditions|               icon|            stations|weather_date|weather_hour|
+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+--------------------+-------------------+--------------------+------------+---------

In [5]:
# create or transform relevant columns
# a. PSI label: avg of (north, south, east, west, central) -> psi_avg
# b. Classification label (1 if psi_avg >= 10, otherwise 0)
# c. Numeric weather features: temp, humidity, windspeed, precip

from pyspark.sql import functions as F

df_features = df_merged.withColumn(
    "psi_avg",
    (F.col("north") + F.col("south") + F.col("east") + F.col("west") + F.col("central"))/5.0
)

# Classification label: unhealthy if psi_avg >= 101
df_features = df_features.withColumn(
    "label_class",
    F.when(F.col("psi_avg") >= 101, 1).otherwise(0)
)


# 3.	Create or transform relevant columns:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["temp", "humidity", "windspeed", "precip"],  # adjust as needed
    outputCol="features"
)
df_final = assembler.transform(df_features)
# Now df_final has columns: psi_avg, label_class, features



# Linear Regression (Predict PSI)

In [6]:
# split into training/test

train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=42)

In [7]:
# train linear regression model

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="psi_avg")
lr_model = lr.fit(train_df)


25/07/27 14:02:03 WARN Instrumentation: [739dccd6] regParam is zero, which might cause numerical instability and overfitting.
25/07/27 14:02:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/07/27 14:02:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [8]:
# predict on test set

predictions_lr = lr_model.transform(test_df)
predictions_lr.select("psi_avg", "features", "prediction").show(5)


+-------+--------------------+------------------+
|psi_avg|            features|        prediction|
+-------+--------------------+------------------+
|   15.8|[24.6,90.74,6.7,0...|37.303399321754796|
|   15.2|[24.0,94.07,3.8,0.0]| 36.86814759898148|
|   15.2|[23.4,97.42,4.2,0...|36.629502657292875|
|   18.4|[24.0,93.88,6.4,0...| 36.90886236654545|
|   19.0|[29.6,63.54,8.6,0.0]|  39.7418847383947|
+-------+--------------------+------------------+
only showing top 5 rows



In [9]:
# evaluate with RegressionEvaluator

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="psi_avg", 
    predictionCol="prediction", 
    metricName="rmse"  # or "mae", "r2"
)
rmse = evaluator.evaluate(predictions_lr)
print("Linear Regression RMSE:", rmse)


Linear Regression RMSE: 10.381760785172588


# K-Means Clustering (Group Similar Conditions)

In [10]:
# import k-means algo

from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol="features", k=3, seed=42)
kmeans_model = kmeans.fit(df_final)


                                                                                

In [11]:
# get cluster predictions

df_clusters = kmeans_model.transform(df_final)
df_clusters.show(5)


+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+--------------------+-------------------+--------------------+------------+------------+-------+-----------+--------------------+----------+
|north|south|east|west|central|  psi_date|psi_hour|     name|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|solarradiation|solarenergy|uvindex|severerisk|          conditions|               icon|            stations|weather_date|weather_hour|psi_avg|label_class|            features|prediction|
+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------

In [12]:
# evaluate using ClusteringEvaluator

from pyspark.ml.evaluation import ClusteringEvaluator

evaluator_cluster = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
silhouette = evaluator_cluster.evaluate(df_clusters)
print("K-Means Silhouette score:", silhouette)


K-Means Silhouette score: 0.602169149636199


In [13]:
# Analyze cluster centers

centers = kmeans_model.clusterCenters()
for i, center in enumerate(centers):
    print(f"Cluster {i} center:", center)



Cluster 0 center: [26.03534483 89.20528271  5.65360041  0.49088628]
Cluster 1 center: [3.10574622e+01 6.23968774e+01 1.31865884e+01 1.74682773e-02]
Cluster 2 center: [2.81359948e+01 7.75427202e+01 9.17843866e+00 7.63361888e-02]


# Logistic Regression (Healthy vs Unhealthy)

In [14]:
# split into train/test

train_cls, test_cls = df_final.randomSplit([0.8, 0.2], seed=99)

In [15]:
# train LR 

from pyspark.ml.classification import LogisticRegression

lr_cls = LogisticRegression(
    featuresCol="features", 
    labelCol="label_class", 
    probabilityCol="probability"
)
lr_cls_model = lr_cls.fit(train_cls)



25/07/27 14:04:35 WARN Instrumentation: [35df831c] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.


In [16]:
# predict

predictions_cls = lr_cls_model.transform(test_cls)
predictions_cls.select("label_class", "probability", "prediction").show(5)

+-----------+-----------+----------+
|label_class|probability|prediction|
+-----------+-----------+----------+
|          0|  [1.0,0.0]|       0.0|
|          0|  [1.0,0.0]|       0.0|
|          0|  [1.0,0.0]|       0.0|
|          0|  [1.0,0.0]|       0.0|
|          0|  [1.0,0.0]|       0.0|
+-----------+-----------+----------+
only showing top 5 rows



In [17]:
# evaluate with MultiClassificationEvaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_cls = MulticlassClassificationEvaluator(
    labelCol="label_class", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator_cls.evaluate(predictions_cls)
print("Logistic Regression Accuracy:", accuracy)


Logistic Regression Accuracy: 1.0


In [None]:
# pick 2 highest probability (if multi-class)
# for multi-class, examine probability vector, 
# pick 2 top classes by probability, do further analusis or "optimization"ArithmeticError

# Example for binary classification: probability = [prob_of_class_0, prob_of_class_1]
# If it was multi-class, probability could be length > 2, we pick top 2 indices.

from pyspark.sql import functions as F

# For binary, we might see if the model is uncertain by threshold
df_top2 = predictions_cls.withColumn(
    "predicted_class",
    F.when(F.col("probability")[1] > 0.5, 1).otherwise(0)
)
# "Optimization" might mean focusing on borderline cases or adjusting the decision threshold.
df_top2.show(5)



AnalysisException: [INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "probability". Need a complex type [STRUCT, ARRAY, MAP] but got "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".

In [21]:
df2 = predictions_cls.withColumn("prob_class1", col("probability").values[1])
df2 = df2.withColumn("predicted", F.when(col("prob_class1") > 0.5, 1).otherwise(0))


AnalysisException: [INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "probability". Need a complex type [STRUCT, ARRAY, MAP] but got "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".

In [22]:
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import DoubleType

# UDF to extract the probability for class 1 from the vector
def extract_prob_class1(v):
    return float(v.values[1]) if v is not None else None

extract_prob1_udf = udf(extract_prob_class1, DoubleType())

df2 = predictions_cls \
    .withColumn("prob_class1", extract_prob1_udf(col("probability"))) \
    .withColumn("predicted", when(col("prob_class1") > 0.5, 1).otherwise(0))

df2.show(5)


[Stage 70:>                                                         (0 + 1) / 1]

+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+----------+----+---------+--------+---------+-------+----------------+----------+----------+--------------+-----------+-------+----------+--------------------+-------------------+--------------------+------------+------------+-------+-----------+--------------------+--------------------+-----------+----------+-----------+---------+
|north|south|east|west|central|  psi_date|psi_hour|     name|temp|feelslike| dew|humidity|precip|precipprob|preciptype|snow|snowdepth|windgust|windspeed|winddir|sealevelpressure|cloudcover|visibility|solarradiation|solarenergy|uvindex|severerisk|          conditions|               icon|            stations|weather_date|weather_hour|psi_avg|label_class|            features|       rawPrediction|probability|prediction|prob_class1|predicted|
+-----+-----+----+----+-------+----------+--------+---------+----+---------+----+--------+------+----------+--------

                                                                                

# Comparing & Plotting Results

In [23]:
# create pandas df for metrics

import pandas as pd

results = {
    "Model": ["LinearReg", "KMeans", "LogReg", "DeepLearning"],
    "Score": [rmse_linreg, silhouette_kmeans, accuracy_logreg, accuracy_dl]
}
pdf_results = pd.DataFrame(results)
print(pdf_results)


NameError: name 'rmse_linreg' is not defined