### PySpark - Project 
#### Extract - Transform - Load (ETL) with Spark & Build Machine Learning Pipeline

In [1]:
# WINDOWS PYTHON PATH 
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable          
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable  

print("Python executable:", sys.executable)

Python executable: C:\Users\roni2\miniconda3\envs\pysparkenv\python.exe


In [2]:
from utils.io import read_csv, write_parquet
from utils.transform import (
    clean_age, fill_missing_embarked, extract_deck,
    create_family_size, create_is_alone,
    extract_title, simplify_title, add_timestamp
)

# 1. Baca data
df = read_csv("data/titanic.csv")

# 2. Full ETL Pipeline 
df_clean = (df
    .transform(clean_age)                  # ← pakai .transform() kfungsi return DataFrame
    .transform(fill_missing_embarked)
    .transform(extract_deck)
    .transform(create_family_size)
    .transform(create_is_alone)
    .transform(extract_title)
    .transform(simplify_title)
    .transform(add_timestamp)
)

# 3. Simpan hasil
write_parquet(df_clean, "output/titanic_cleaned", partitionBy="Pclass")

df_clean.show(10)

Success: Data disimpan ke output/titanic_cleaned
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+----+----------+-------+-------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Deck|FamilySize|IsAlone|  Title|        processed_at|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+----+----------+-------+-------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|null|         2|      0|    Mr.|2025-11-24 17:06:...|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   C|         2|      0|   Mrs.|2025-11-24 17:06:...|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|      

In [3]:
# ================== 1. Import Library ML ==================
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import os

print("Siap training model!")

Siap training model!


In [4]:
# ================== 2. Siapkan Data untuk ML ==================
# Pilih kolom yang akan jadi fitur
df_ml = df_clean.select(
    "Survived", "Pclass", "Sex", "Age", "SibSp", "Parch", "Fare",
    "Embarked", "FamilySize", "IsAlone", "Title", "Deck"
).na.drop()  # drop baris yang ada null (Aman karena Age & Deck sudah di-handle)

print(f"Data siap training: {df_ml.count()} baris (setelah drop NA)")

Data siap training: 185 baris (setelah drop NA)


In [5]:
# ================== 3. Pipeline Preprocessing + Model ==================

indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep")
    for col in ["Sex", "Embarked", "Title", "Deck"]
]

# OneHotEncoder
encoders = [
    OneHotEncoder(inputCol=col+"_idx", outputCol=col+"_vec")
    for col in ["Sex", "Embarked", "Title", "Deck"]
]

# Gabung semua fitur jadi 1 kolom "features"
assembler = VectorAssembler(
    inputCols=[
        "Pclass", "Age", "SibSp", "Parch", "Fare", "FamilySize", "IsAlone",
        "Sex_vec", "Embarked_vec", "Title_vec", "Deck_vec"
    ],
    outputCol="features"
)

# buat Model ML, silahkan gunakan salah satu, kali ini saya gunakan RF
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=200, maxDepth=10, seed=42)
# lr = LogisticRegression(labelCol="Survived", featuresCol="features")  # kalau mau coba yang lebih simpel

# 5. Full Pipeline ETL +ML
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

In [6]:
# ==================Bersihkan Deck & Title yang kosong ==================
from pyspark.sql.functions import when, col

df_ml = df_clean.select(
    "Survived", "Pclass", "Sex", "Age", "SibSp", "Parch", "Fare",
    "Embarked", "FamilySize", "IsAlone", "Title", "Deck"
).na.drop(subset=["Age", "Embarked"]) \
 .withColumn("Deck", when(col("Deck") == "", "Unknown").otherwise(col("Deck"))) \
 .withColumn("Title", when(col("Title") == "", "Unknown").otherwise(col("Title")))

print(f"Data siap ML: {df_ml.count()} baris")
df_ml.groupBy("Deck").count().show()
df_ml.groupBy("Title").count().show()

Data siap ML: 714 baris
+----+-----+
|Deck|count|
+----+-----+
|   G|    4|
|   F|   11|
|   T|    1|
|   C|   51|
|   B|   45|
|   D|   31|
|   A|   12|
|null|  529|
|   E|   30|
+----+-----+

+---------+-----+
|    Title|count|
+---------+-----+
|      Mr.|  398|
|    Miss.|  146|
|  Master.|   36|
|    Capt.|    1|
|Jonkheer.|    1|
|      Ms.|    1|
|   Major.|    2|
|    Mlle.|    2|
|  Unknown|    1|
|     Mrs.|  108|
|     Don.|    1|
|     Rev.|    6|
|      Dr.|    6|
|    Lady.|    1|
|     Mme.|    1|
|     Sir.|    1|
|     Col.|    2|
+---------+-----+



In [7]:
# ================== StringIndexer ==================
indexers = [
    StringIndexer(inputCol="Sex",       outputCol="Sex_idx",       handleInvalid="keep"),
    StringIndexer(inputCol="Embarked", outputCol="Embarked_idx", handleInvalid="keep"),
    StringIndexer(inputCol="Title",     outputCol="Title_idx",     handleInvalid="keep"),
    StringIndexer(inputCol="Deck",      outputCol="Deck_idx",      handleInvalid="keep")
]

encoders = [
    OneHotEncoder(inputCol="Sex_idx",       outputCol="Sex_vec"),
    OneHotEncoder(inputCol="Embarked_idx", outputCol="Embarked_vec"),
    OneHotEncoder(inputCol="Title_idx",     outputCol="Title_vec"),
    OneHotEncoder(inputCol="Deck_idx",      outputCol="Deck_vec")
]

In [8]:
assembler = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","FamilySize","IsAlone",
               "Sex_vec","Embarked_vec","Title_vec","Deck_vec"],
    outputCol="features"
)

rf = RandomForestClassifier(labelCol="Survived", featuresCol="features",
                            numTrees=200, maxDepth=10, seed=42)

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Split & Train
train, test = df_ml.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)   # SEKARANG PASTI JALAN!
print("Training SELESAI!")

Training SELESAI!


In [9]:
# 1. Simpan parameter
import json
import os
os.makedirs("model", exist_ok=True)

model_info = {
    "feature_cols": ["Pclass","Sex","Age","SibSp","Parch","Fare","Embarked","FamilySize","IsAlone","Title","Deck"],
    "rf_params": {
        "numTrees": 200,
        "maxDepth": 10,
        "seed": 42
    }
}

with open("model/titanic_config.json", "w") as f:
    json.dump(model_info, f)

print("Config model disimpan! ")

Config model disimpan! 


In [10]:
import json
from pyspark.ml import Pipeline

with open("model/titanic_config.json") as f:
    config = json.load(f)

# Buat pipeline
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") 
            for c in ["Sex","Embarked","Title","Deck"]]
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_vec") 
            for c in ["Sex","Embarked","Title","Deck"]]
assembler = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","FamilySize","IsAlone",
               "Sex_vec","Embarked_vec","Title_vec","Deck_vec"],
    outputCol="features")
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", **config["rf_params"])

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])
model = pipeline.fit(df_ml)   # cuma 2–3 detik!

print("Model berhasil direbuild!")

Model berhasil direbuild!


In [11]:
# Simpan config
import json, os
os.makedirs("model", exist_ok=True)

config = {
    "note": "Titanic PySpark Model - Rebuild cuma 2 detik!",
    "rf_params": {"numTrees": 200, "maxDepth": 10, "seed": 42}
}

with open("model/deploy_config.json", "w") as f:
    json.dump(config, f, indent=2)

print("Config model berhasil disimpan!")

Config model berhasil disimpan!


In [12]:
#load Model
import json
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import RandomForestClassifier

# 1. Load config
with open("model/deploy_config.json") as f:
    cfg = json.load(f)

# 2. Buat pipeline 
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") 
            for c in ["Sex","Embarked","Title","Deck"]]
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_vec") 
            for c in ["Sex","Embarked","Title","Deck"]]
assembler = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","FamilySize","IsAlone",
               "Sex_vec","Embarked_vec","Title_vec","Deck_vec"],
    outputCol="features")
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", **cfg["rf_params"])

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])
model = pipeline.fit(df_ml)

print("Model prediksi siap!")

Model prediksi siap!


In [13]:
from utils.spark_session import spark  

# buat data testing model (JACK & ROSE)
jack_rose = spark.createDataFrame([
    (3, "male",   23.0, 0, 0, 7.25,     "S", 1, 1, "Mr",    "Unknown"),  # Jack Dawson
    (1, "female", 19.0, 1, 0, 512.3292, "C", 2, 0, "Miss", "B")         # Rose DeWitt Bukater
], ["Pclass","Sex","Age","SibSp","Parch","Fare","Embarked","FamilySize","IsAlone","Title","Deck"])

# PREDIKSI!
hasil = model.transform(jack_rose)

# TAMPILKAN HASILNYA CANTIK
hasil.select("prediction", "probability").show(truncate=False)

+----------+-----------------------------------------+
|prediction|probability                              |
+----------+-----------------------------------------+
|0.0       |[0.7390814181250815,0.2609185818749184]  |
|1.0       |[0.022943965423918754,0.9770560345760811]|
+----------+-----------------------------------------+

