# Week 3: Spark MLlib Lab with Pipelines

In this notebook, we'll explore Apache Spark's MLlib library, using ML Pipelines to structure the workflow:

- Data Preprocessing & Feature Engineering using Pipeline stages
- Classification & Regression Models as Pipeline stages
- Model Evaluation and Hyperparameter Tuning with CrossValidator

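We'll be working with the Wine Quality (White) dataset: 11 physicochemical measurements per wine plus an integer quality score, which we use as the prediction target.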

**Dataset URL**: https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv

## Import Spark MLlib Libraries (with Pipeline imports)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a Spark session
spark = SparkSession.builder \
    .appName("Week3SparkSession") \
    .getOrCreate()

## Load Wine Quality Data

In [None]:
# URL of the dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"

# Download the file locally first (since Spark can't read directly from HTTP URLs)
import requests
import os

# Create a local file path
local_file_path = "winequality-white.csv"

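# Download the file; raise on an HTTP error instead of silently saving an error page
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(local_file_path, "wb") as file:
    file.write(response.content)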

print(f"File downloaded to {os.path.abspath(local_file_path)}")

# Import the dataset from the local file
rawDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ";") \
    .csv(local_file_path)

# Display the first few rows
rawDF.show(5)

# Basic statistics
rawDF.describe().show()

File downloaded to /content/winequality-white.csv
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|          7.2|            0.23|       0.32|      
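
The `sep` option matters because this file separates fields with semicolons rather than commas. As a rough illustration outside Spark, the sketch below parses a shortened, made-up sample with Python's standard `csv` module. The sample string and variable names are assumptions for illustration, not the contents of the real file.

```python
import csv
import io

# Hypothetical, shortened sample in a semicolon-separated layout
# (three columns only; not copied from the real file).
sample = "fixed acidity;volatile acidity;quality\n7;0.27;6\n"

# With the default comma delimiter, each line comes back as one field.
comma_rows = list(csv.reader(io.StringIO(sample)))

# With ';' as the delimiter, the header and values split into columns.
semi_rows = list(csv.reader(io.StringIO(sample), delimiter=";"))

print("Fields per row with ',':", len(comma_rows[0]))
print("Fields per row with ';':", len(semi_rows[0]))
print("Header:", semi_rows[0])
```

A single-column DataFrame after loading is usually a sign that the separator does not match the file.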

## Rename quality to label

In [None]:
# Rename the target column to 'label', the default label column name for MLlib estimators and evaluators
df = rawDF.withColumnRenamed("quality", "label")
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|label|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|    6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|    6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|    6|
|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|               186.0| 0

## Pipeline Approach: Creating and Using Classification Pipelines

In [None]:
# Split the data first
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)
print(f"Training set size: {train_data.count()}")
print(f"Test set size: {test_data.count()}")

# Define feature columns (all columns except 'label')
featureCols = [col for col in df.columns if col != "label"]

# Step 1: Create the feature assembler
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Step 2: Create different model instances
lr = LogisticRegression(featuresCol="features", labelCol="label")
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Step 3: Create pipeline instances for each model
lr_pipeline = Pipeline(stages=[assembler, lr])
dt_pipeline = Pipeline(stages=[assembler, dt])
rf_pipeline = Pipeline(stages=[assembler, rf])

# Step 4: Train the pipelines
print("Training Logistic Regression Pipeline...")
lr_model = lr_pipeline.fit(train_data)

print("Training Decision Tree Pipeline...")
dt_model = dt_pipeline.fit(train_data)

print("Training Random Forest Pipeline...")
rf_model = rf_pipeline.fit(train_data)

Training set size: 3916
Test set size: 982
Training Logistic Regression Pipeline...
Training Decision Tree Pipeline...
Training Random Forest Pipeline...
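
The split sizes above (3916 and 982 out of 4898 rows) are close to 80/20 but not exact. `randomSplit` assigns rows at random according to the weights, so the resulting sizes are only approximate. The sketch below is a simplified pure-Python model of that idea, not Spark's implementation; the function `random_split` and its internals are assumptions for illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using a single uniform draw per row."""
    total = sum(weights)
    # Cumulative upper bounds of each split's slice of [0, 1).
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits

# 4898 is the total row count reported above (3916 + 982).
train, test = random_split(range(4898), [0.8, 0.2], seed=123)
print(f"Train: {len(train)}, Test: {len(test)}, "
      f"train share: {len(train) / 4898:.4f}")
```

Every row lands in exactly one split, but the train share only hovers around 0.8. Fixing the seed makes the split repeatable.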


## Evaluating Pipeline Models

In [None]:
# Create an evaluator for classification models
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

# Get predictions from each model
lr_predictions = lr_model.transform(test_data)
dt_predictions = dt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)

# Calculate metrics for each model
models = ["Logistic Regression", "Decision Tree", "Random Forest"]
predictions = [lr_predictions, dt_predictions, rf_predictions]

for model_name, pred_df in zip(models, predictions):
    accuracy = evaluator.setMetricName("accuracy").evaluate(pred_df)
    f1 = evaluator.setMetricName("f1").evaluate(pred_df)
    precision = evaluator.setMetricName("weightedPrecision").evaluate(pred_df)
    recall = evaluator.setMetricName("weightedRecall").evaluate(pred_df)

    print(f"\n{model_name} Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print("-" * 40)


Logistic Regression Metrics:
Accuracy: 0.5071
F1 Score: 0.4741
Precision: 0.5016
Recall: 0.5071
----------------------------------------

Decision Tree Metrics:
Accuracy: 0.5061
F1 Score: 0.4744
Precision: 0.4990
Recall: 0.5061
----------------------------------------

Random Forest Metrics:
Accuracy: 0.5346
F1 Score: 0.4886
Precision: 0.5059
Recall: 0.5346
----------------------------------------
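
In the output above, Recall matches Accuracy for all three models. This follows from the definition: weighted recall averages per-class recall weighted by class frequency, which reduces to the overall fraction of correct predictions. The sketch below reproduces the weighted metrics in plain Python on made-up toy labels; the function name and the data are assumptions for illustration, not Spark's code.

```python
from collections import Counter

def weighted_metrics(labels, preds):
    """Accuracy plus support-weighted precision, recall and F1."""
    n = len(labels)
    support = Counter(labels)  # number of true instances per class
    accuracy = sum(l == p for l, p in zip(labels, preds)) / n

    w_precision = w_recall = w_f1 = 0.0
    for cls, count in support.items():
        tp = sum(1 for l, p in zip(labels, preds) if l == cls and p == cls)
        predicted = sum(1 for p in preds if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        weight = count / n  # class frequency among the true labels
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return accuracy, w_precision, w_recall, w_f1

# Toy quality scores, made up for illustration.
labels = [5, 5, 6, 6, 6, 7, 7, 6]
preds = [5, 6, 6, 6, 5, 6, 7, 6]

acc, wp, wr, wf1 = weighted_metrics(labels, preds)
print(f"Accuracy: {acc:.4f}")
print(f"Weighted precision: {wp:.4f}")
print(f"Weighted recall: {wr:.4f}")
print(f"Weighted F1: {wf1:.4f}")
```

Because weighted recall carries no information beyond accuracy, F1 and weighted precision are the more useful columns when comparing these models.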


## Pipeline Approach: Linear Regression

In [None]:
# Create a Linear Regression model
lr_reg = LinearRegression(featuresCol="features", labelCol="label")

# Create a regression pipeline
reg_pipeline = Pipeline(stages=[assembler, lr_reg])

# Train the regression pipeline
print("Training Linear Regression Pipeline...")
reg_model = reg_pipeline.fit(train_data)

# Make predictions
reg_predictions = reg_model.transform(test_data)

# Evaluate
reg_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rmse = reg_evaluator.setMetricName("rmse").evaluate(reg_predictions)
mae = reg_evaluator.setMetricName("mae").evaluate(reg_predictions)
r2 = reg_evaluator.setMetricName("r2").evaluate(reg_predictions)

print("\nLinear Regression Metrics:")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

Training Linear Regression Pipeline...

Linear Regression Metrics:
RMSE: 0.7550
MAE: 0.6026
R²: 0.3112
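
To read these numbers: an RMSE of about 0.76 means predictions are typically off by roughly three quarters of a quality point, and an R² of about 0.31 means the model explains roughly 31% of the variance in quality. The sketch below computes the same three metrics from their standard definitions on made-up toy values; the function name and the data are assumptions for illustration.

```python
import math

def regression_metrics(labels, preds):
    """Return (RMSE, MAE, R^2) from their standard definitions."""
    n = len(labels)
    errors = [p - y for y, p in zip(labels, preds)]

    ss_res = sum(e * e for e in errors)             # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n

    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return rmse, mae, r2

# Toy quality scores and predictions, made up for illustration.
labels = [5, 6, 6, 7]
preds = [5.5, 6.0, 6.5, 6.0]

rmse, mae, r2 = regression_metrics(labels, preds)
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")
```

RMSE penalizes large errors more heavily than MAE, so a gap between the two suggests a few predictions are far off.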


## Hyperparameter Tuning with Pipelines

In [None]:
# Create a Random Forest model with parameters to tune
rf_tuning = RandomForestClassifier(featuresCol="features", labelCol="label")

# Create a pipeline for tuning
pipeline = Pipeline(stages=[assembler, rf_tuning])

# Create parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf_tuning.numTrees, [10, 20, 30]) \
    .addGrid(rf_tuning.maxDepth, [5, 10]) \
    .build()

# Create evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

# Create the cross-validator
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Train with cross-validation
print("Training model with cross-validation...")
cv_model = crossval.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

# Get parameters of the best model
best_rf_model = best_model.stages[-1]  # The fitted random forest model is the last stage
print("\nBest RandomForest Parameters:")
# On the fitted model, getNumTrees is accessed as a property (no parentheses)
print(f"Number of Trees: {best_rf_model.getNumTrees}")
print(f"Max Depth: {best_rf_model.getMaxDepth()}")

# Evaluate on test set
cv_predictions = best_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"\nTest Accuracy of Best Model: {cv_accuracy:.4f}")

Training model with cross-validation...

Best RandomForest Parameters:
Number of Trees: 30
Max Depth: 10

Test Accuracy of Best Model: 0.6120
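
Cross-validation cost grows quickly with the grid. The grid above has 3 × 2 parameter combinations, each fitted once per fold, and `CrossValidator` then refits the best combination on the full training set. The short sketch below counts those fits in plain Python; the variable names are assumptions for illustration.

```python
from itertools import product

# Same values as the ParamGridBuilder and CrossValidator above.
num_trees_values = [10, 20, 30]
max_depth_values = [5, 10]
num_folds = 3

# Every combination of the two parameters.
grid = [
    {"numTrees": trees, "maxDepth": depth}
    for trees, depth in product(num_trees_values, max_depth_values)
]

cv_fits = len(grid) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1         # plus the final refit with the best combination

print(f"Parameter combinations: {len(grid)}")
print(f"Fits during cross-validation: {cv_fits}")
print(f"Total pipeline fits: {total_fits}")
```

The best values found (30 trees, depth 10) are the largest in the grid, so a wider grid may be worth trying, at the cost of more fits.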


## Feature Importance Analysis with Best Model

In [None]:
# Extract feature importances from the best Random Forest model
best_rf = best_model.stages[-1]
feature_importances = list(zip(featureCols, best_rf.featureImportances.toArray()))
sorted_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)

print("Feature Importances from Best Random Forest Model:")
for feature, importance in sorted_importances:
    print(f"{feature}: {importance:.4f}")

Feature Importances from Best Random Forest Model:
alcohol: 0.1566
volatile acidity: 0.1282
free sulfur dioxide: 0.0930
chlorides: 0.0837
pH: 0.0825
fixed acidity: 0.0822
density: 0.0800
total sulfur dioxide: 0.0779
sulphates: 0.0747
citric acid: 0.0747
residual sugar: 0.0666
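
Random forest importances in Spark are normalized to sum to 1, so a useful reference point is the uniform share of 1/11 that each feature would get if all were equally important. The sketch below uses the rounded values printed above to compare each feature against that baseline; the variable names are assumptions for illustration.

```python
# Importances copied from the printed output above (rounded to 4 decimals).
importances = {
    "alcohol": 0.1566,
    "volatile acidity": 0.1282,
    "free sulfur dioxide": 0.0930,
    "chlorides": 0.0837,
    "pH": 0.0825,
    "fixed acidity": 0.0822,
    "density": 0.0800,
    "total sulfur dioxide": 0.0779,
    "sulphates": 0.0747,
    "citric acid": 0.0747,
    "residual sugar": 0.0666,
}

total = sum(importances.values())
uniform_share = 1 / len(importances)

# Ratio of each importance to the uniform baseline.
relative = {name: value / uniform_share for name, value in importances.items()}

print(f"Sum of importances: {total:.4f}")
print(f"Uniform baseline: {uniform_share:.4f}")
top3 = sorted(relative.items(), key=lambda kv: kv[1], reverse=True)[:3]
for name, ratio in top3:
    print(f"{name}: {ratio:.2f}x baseline")
```

Only a few features sit clearly above the baseline, which suggests the signal is spread fairly evenly across the measurements.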


## Saving and Loading Pipeline Models

In [None]:
# Save the best model pipeline
pipeline_path = "wine_quality_pipeline_model"
best_model.write().overwrite().save(pipeline_path)

# Load the pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load(pipeline_path)

# Test the loaded model
loaded_predictions = loaded_model.transform(test_data.limit(5))
loaded_predictions.select("label", "prediction").show()

print("\nModel pipeline successfully saved and loaded!")

+-----+----------+
|label|prediction|
+-----+----------+
|    7|       7.0|
|    7|       7.0|
|    5|       6.0|
|    6|       7.0|
|    7|       6.0|
+-----+----------+


Model pipeline successfully saved and loaded!
