# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) the Spark session that the rest of the notebook works with
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one tuple per row, in (ID, Feature, Target) order
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# The estimator reads its inputs from one vector column, so pack the
# scalar 'Feature' column into a vector column named 'Features'
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled 'Features' column
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted parameters, coefficients first and intercept second
for label, value in (
    ('Coefficients', model.coefficients),
    ('Intercept', model.intercept),
):
    print(f'{label}: {value}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Tiny labelled dataset: each row is (ID, dense feature vector, class label)
columns = ['ID', 'Features', 'Label']
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0),
]
df = spark.createDataFrame(data, schema=columns)

# 'Features' already holds vectors, so the DataFrame can be fitted directly
# without a VectorAssembler step
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Show the fitted coefficients and intercept
for label, value in (
    ('Coefficients', model.coefficients),
    ('Intercept', model.intercept),
):
    print(f'{label}: {value}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
# Vectors is used below; import it here so this cell does not depend on the
# logistic regression cell having been executed first
from pyspark.ml.linalg import Vectors

# Example dataset: each row is (ID, dense 2-D point).
# Uses the `spark` session created in the first code cell.
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0])),
]

columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model.
# A fixed seed makes centroid initialisation, and therefore the reported
# centers, repeatable from run to run.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([3., 3.]), array([12.5, 12.5])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col, mean
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialise the SparkSession (getOrCreate reuses a session if one is running)
spark = SparkSession.builder.appName("SupermarketSales").getOrCreate()

# Load the dataset from the uploaded file
file_path = "/content/Supermarket Sales Cleaned.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows to inspect the data
data.show(5)

# Make sure the feature columns are double BEFORE imputing.
# fillna casts the replacement value to the column's current type, so filling
# a column that was inferred as integer (the sample output shows Quantity as
# whole numbers) with a fractional mean would silently truncate it.
numeric_cols = ["Total", "Quantity", "Unit price"]
for name in numeric_cols:
    data = data.withColumn(name, col(name).cast("double"))

# Handle missing values by filling nulls with the column mean.
# All three means are computed in a single pass over the data.
mean_total, mean_quantity, mean_unit_price = data.select(
    *[mean(col(name)) for name in numeric_cols]
).first()

data = data.fillna({'Total': mean_total, 'Quantity': mean_quantity, 'Unit price': mean_unit_price})

# Encode the categorical 'Payment' column as a numeric 'label' column
indexer = StringIndexer(inputCol="Payment", outputCol="label")
data = indexer.fit(data).transform(data)

# Combine the numeric columns into a single feature vector column
assembler = VectorAssembler(
    inputCols=numeric_cols,  # Numeric columns of the dataset
    outputCol="features"
)

# Transform the data to create the 'features' column
data = assembler.transform(data)

# Show the processed data
data.select("features", "label").show(5)

# Split the data into training and test sets
trainingData, testData = data.randomSplit([0.8, 0.2], seed=42)

# Initialise the Random Forest model.
# The seed keeps bootstrapping and feature sub-sampling repeatable, in line
# with the seeded train/test split above.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, seed=42)

# Parameter grid for cross-validation: 4 * 4 * 3 = 48 combinations
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 50, 100, 200])  # Number of trees
             .addGrid(rf.maxDepth, [5, 10, 15, 20])    # Maximum tree depth
             .addGrid(rf.minInstancesPerNode, [1, 2, 4])  # Minimum instances per node
             .build())

# Cross-validator, selecting the parameter combination by accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,  # 3-fold cross-validation
                          seed=42)     # Repeatable fold assignment

# Train the model with cross-validation
cvModel = crossval.fit(trainingData)

# Best model found by the search
bestModel = cvModel.bestModel
print(f"Parameter Terbaik: {bestModel.extractParamMap()}")

# Score the held-out test set with the best model
predictions = bestModel.transform(testData)

# Evaluate accuracy
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi Setelah Tuning: {accuracy}")

# Evaluate F1-score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print(f"F1-Score Model: {f1_score}")

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-------------------+-----------+------+-----------------------+------------+------+---+-----+----+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|      Date|               Time|    Payment|  cogs|gross margin percentage|gross income|Rating|Day|Month|Year|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+----------+-------------------+-----------+------+-----------------------+------------+------+---+-----+----+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715|2019-01-05|2024-12-04 13:08:00|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|  5|    1|2019|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22|2019-03-08|2024