# Resnu Mukti Ismail Hanif : Advanced Machine Learning using Spark MLlib

In [None]:
# Menghubungkan Google Colab dengan Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [2]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [4]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Example dataset
data = [(1, Vectors.dense([2.0, 3.0]), 0),
        (2, Vectors.dense([1.0, 5.0]), 1),
        (3, Vectors.dense([2.5, 4.5]), 1),
        (4, Vectors.dense([3.0, 6.0]), 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Inisialisasi SparkSession
spark = SparkSession.builder.appName("Logistic Regression Classification").getOrCreate()

# Load data yang sudah di data cleaning sebelumnya
file_path = "/content/drive/MyDrive/Pertemuan14/Titanic.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)
data.show()

# Preprocessing data
# 1. Encode label kolom (jika label berupa string)
label_column = "Survived"
features_columns = ["Sex", "Age", "Fare", "Pclass", "SibSp", ]

# Ubah label menjadi numeric
label_indexer = StringIndexer(inputCol=label_column, outputCol="indexedLabel")
data = label_indexer.fit(data).transform(data)

# 2. Gabungkan fitur menjadi satu vektor
assembler = VectorAssembler(inputCols=features_columns, outputCol="features")
data = assembler.transform(data)

# Pisahkan data menjadi training dan testing
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Buat model Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="indexedLabel")

# Latih model
lr_model = lr.fit(train_data)

# Prediksi pada data testing
predictions = lr_model.transform(test_data)

# Evaluasi model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

# Tampilkan beberapa hasil prediksi
predictions.select("indexedLabel", "prediction", "probability").show()

+-----------+--------+------+--------------------+---+-------------------+-----+-----+----------------+--------------------+-------+--------+
|PassengerId|Survived|Pclass|                Name|Sex|                Age|SibSp|Parch|          Ticket|                Fare|  Cabin|Embarked|
+-----------+--------+------+--------------------+---+-------------------+-----+-----+----------------+--------------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  0| 0.2711736617240512|    1|    0|       A/5 21171|0.014151057562208049|Unknown|       S|
|          3|       1|     3|Heikkinen, Miss. ...|  1|0.32143754712239253|    0|    0|STON/O2. 3101282|0.015468569817999833|Unknown|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|  1| 0.4345312892686604|    1|    0|          113803| 0.10364429745562033|   C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  0| 0.4345312892686604|    0|    0|          373450|0.015712553569072387|Unknown|       S|
|     

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

# Inisialisasi SparkSession
spark = SparkSession.builder.appName("Logistic Regression with Cross-Validation").getOrCreate()

# Kolom label dan fiturs
label_column = "Survived"
features_columns = ["Sex", "Age", "Fare", "Pclass", "SibSp", "Parch"]

# Ubah label menjadi nilai numerik
label_inddndexer = StringIndexer(inputCol=label_column, outputCol="ind")
data = label_inddndexer.fit(data).transform(data)

# Ubah kolom kategori menjadi numerik (misalnya, Sex)
for column in ["Sex"]:  # Tambahkan kolom kategori lainnya jika ada
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}_indd")
    data = indexer.fit(data).transform(data)
    features_columns.remove(column)
    features_columns.append(f"{column}_indd")

# Gabungkan fiturs menjadi vektor tunggal
assembler = VectorAssembler(inputCols=features_columns, outputCol="fiturs")
data = assembler.transform(data)

# Pisahkan data menjadi training dan testing
train_data, test_data = data.randomSplit([0.9, 0.1], seed=42)

# Logistic Regression model
lr = LogisticRegression(featuresCol="fiturs", labelCol="ind")

# Cross-validation dengan Hyperparameter Tuning
# Buat grid untuk hyperparameter tuning
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 1.0])  # Regularization parameter
              .addGrid(lr.maxIter, [10, 50, 100])  # Jumlah iterasi
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # ElasticNet parameter
              .build())

# Evaluator untuk evaluasi model
evaluator = MulticlassClassificationEvaluator(labelCol="ind", predictionCol="prediction", metricName="accuracy")

# CrossValidator untuk cross-validation
crossval = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator,numFolds=5)  # 5-fold cross-validation

# Latih model dengan Cross-Validation
cv_model = crossval.fit(train_data)

# Ambil model terbaik setelah cross-validation
best_model = cv_model.bestModel

# Tampilkan koefisien dan intercept dari model terbaik
print(f"Best Model Coefficients: {best_model.coefficients}")
print(f"Best Model Intercept: {best_model.intercept}")

# Prediksi pada data testing
predictions = best_model.transform(test_data)

# Evaluasi model
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Accuracy: {accuracy}")

# Tampilkan beberapa hasil prediksi
predictions.select("ind", "prediction", "probability").show()

Best Model Coefficients: [-3.2860317252661,15.559149248896164,-0.714117308210292,-0.4816661084891112,-0.1660201681281415,2.326073330404171]
Best Model Intercept: 1.0698041996903624
Best Model Accuracy: 0.8166666666666667
+---+----------+--------------------+
|ind|prediction|         probability|
+---+----------+--------------------+
|0.0|       0.0|[0.89175285019756...|
|1.0|       1.0|[0.13723326069264...|
|0.0|       0.0|[0.84362606576265...|
|0.0|       0.0|[0.85120535148271...|
|1.0|       0.0|[0.74181363719666...|
|0.0|       1.0|[0.49712307284592...|
|0.0|       0.0|[0.93711664799273...|
|0.0|       0.0|[0.89826154883747...|
|0.0|       0.0|[0.83682104230538...|
|0.0|       0.0|[0.95767601130740...|
|0.0|       1.0|[0.33958141672134...|
|0.0|       0.0|[0.90566911411434...|
|1.0|       1.0|[0.30036579868203...|
|1.0|       0.0|[0.69141866002956...|
|0.0|       0.0|[0.77200755602657...|
|1.0|       0.0|[0.55142399793961...|
|0.0|       1.0|[0.47797904941623...|
|1.0|       1.0|[0.