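# ML Pipeline using Spark MLlib

Train a decision-tree classifier on the `Transported` target with a Spark MLlib pipeline. The pipeline mean-imputes the numeric columns, one-hot encodes the categorical columns, assembles a single feature vector, and then fits the tree.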


In [0]:
from pyspark.sql.functions import col

# Load the training data. The header row supplies the column names, and
# inferSchema lets Spark guess each column's type from its values.
data_path = "/Volumes/levkiwi_lakehouse/ml_sandbox/data/train.csv"
train_df = spark.read.csv(data_path, header=True, inferSchema=True)

# Normalise a few column types before feature engineering:
#   - PassengerId is an identifier, so force it to a string.
#   - VIP, CryoSleep and the target Transported are cast to int
#     (1/0 when inferred as boolean; nulls stay null).
# Casts are applied in the order listed.
column_casts = {
    "PassengerId": "string",
    "VIP": "int",
    "CryoSleep": "int",
    "Transported": "int",
}
for column_name, target_type in column_casts.items():
    train_df = train_df.withColumn(column_name, col(column_name).cast(target_type))

display(train_df)

## Spark DataFrames & MLlib pipeline

In [0]:
# pyspark imports
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler

# Step A: feature and target column lists.
numerical_cols = ["Age", "RoomService", "FoodCourt", "ShoppingMall", "Spa", "VRDeck"]
categorical_cols = ["HomePlanet", "Destination", "VIP", "CryoSleep"]
label_col = "Transported"  # prediction target

# Step B: replace missing numeric values with the column mean. Each input
#         column gets a parallel "<name>_imputed" output column.
imputed_cols = [f"{name}_imputed" for name in numerical_cols]
imputer = Imputer(strategy="mean", inputCols=numerical_cols, outputCols=imputed_cols)

# Step C: for every categorical column, index it (StringIndexer) and then
#         one-hot encode the index (OneHotEncoder). The stages are collected
#         in pipeline order: imputer, then (indexer, encoder) per column.
#         handleInvalid="keep" sends unseen categories and nulls to an extra
#         bucket instead of raising.
stages = [imputer]
indexer_output_cols = []
ohe_output_cols = []

for cat_col in categorical_cols:
    indexed_name = f"{cat_col}_indexed"
    encoded_name = f"{cat_col}_ohe"

    indexer = StringIndexer(inputCol=cat_col, outputCol=indexed_name, handleInvalid="keep")
    # Single-column OneHotEncoder for clarity (Spark 3.0+ also accepts
    # inputCols/outputCols to encode several columns in one stage).
    encoder = OneHotEncoder(inputCol=indexed_name, outputCol=encoded_name)

    stages.extend([indexer, encoder])
    indexer_output_cols.append(indexed_name)
    ohe_output_cols.append(encoded_name)

# Step D: combine the imputed numeric columns and the one-hot categorical
#         vectors into a single "features" vector column.
assembler = VectorAssembler(inputCols=imputed_cols + ohe_output_cols, outputCol="features")
stages.append(assembler)

In [0]:
# Wrap the preprocessing stages in a Pipeline and fit it on the training
# data in one step. The fitted PipelineModel is kept as
# `preprocessing_pipeline`.
# NOTE: `stages` is the shared list built in the feature-definition cell. If
# any later cell extends it in place, re-running this cell fits those extra
# stages too.
preprocessing_pipeline = Pipeline(stages=stages).fit(train_df)

# Apply the fitted preprocessing. Alongside the original columns, the
# result carries:
#   - <colName>_imputed : mean-imputed numeric columns
#   - <colName>_indexed : StringIndexer output per categorical column
#   - <colName>_ohe     : one-hot encoded vector per categorical column
#   - features          : the single assembled feature vector
train_transformed = preprocessing_pipeline.transform(train_df)

# Preview a few rows without truncating the vector columns.
preview_cols = ["Age_imputed", "HomePlanet_ohe", "features"]
train_transformed.select(preview_cols).show(n=5, truncate=False)

## Decision Tree Classifier

We extend the preprocessing pipeline with a label indexer and a decision tree classifier to predict the `Transported` variable.

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Step E: StringIndexer for the label column "Transported" -> "label".
#         Transported was cast to int in the load cell (1/0 when inferred as
#         boolean), and StringIndexer indexes the string form of its values.
#         - stringOrderType="alphabetAsc" pins "0" -> 0.0 and "1" -> 1.0.
#           The default ("frequencyDesc") gives 0.0 to whichever value is
#           most frequent, which can silently invert the label's meaning.
#         - handleInvalid="error" (Spark's default) fails fast on null or
#           unseen labels. "keep" would add an extra bucket at index 2,
#           which the classifier would then treat as a third class.
label_indexer = StringIndexer(
    inputCol=label_col,
    outputCol="label",
    stringOrderType="alphabetAsc",
    handleInvalid="error",
)

# Hyperparameters for the DecisionTreeClassifier
hyperparams = {
    'impurity': 'entropy',          # Split-quality criterion (information gain)
    'maxDepth': 3,                  # Keep the tree shallow to limit overfitting
    'minInstancesPerNode': 10,      # Each child must keep >= 10 training rows; splits violating this are discarded
    'seed': 42                      # Fixed seed for reproducible results
}

# Step F: DecisionTreeClassifier
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    **hyperparams
)

# Build the full pipeline from a NEW list: preprocessing stages + label + model.
# Extending `stages` in place made this cell non-idempotent. Each re-run
# appended another label indexer and classifier, which fails on the duplicate
# "label" column. Re-running the preprocessing cell afterwards also silently
# included the classifier.
model_stages = stages + [label_indexer, dt]
model_pipeline = Pipeline(stages=model_stages)

# Fit on train_df; `model_pipeline` is rebound to the fitted PipelineModel.
model_pipeline = model_pipeline.fit(train_df)

# Score the same training data (in-sample predictions, not a held-out estimate).
train_transformed = model_pipeline.transform(train_df)

train_transformed.select(
    "label", "features", "prediction", "probability"
).show(10, truncate=False)