# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, Vectors.dense([2.0, 3.0]), 0),(2, Vectors.dense([1.0, 5.0]), 1),(3, Vectors.dense([2.5, 4.5]), 1),(4, Vectors.dense([3.0, 6.0]), 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Example dataset with Vectors
data = [(1, Vectors.dense([2.0, 3.0]), 0),(2, Vectors.dense([1.0, 5.0]), 1),(3, Vectors.dense([2.5, 4.5]), 1),(4, Vectors.dense([3.0, 6.0]), 0)]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([2.75, 5.25]), array([1.5, 4. ])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
# Liberary yang diperlukan

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, year, month
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [14]:
# 1. Inisialisasi Spark Session
spark = SparkSession.builder.appName("MLlib_Classification").getOrCreate()

# 2. Memuat dataset CSV
df = spark.read.csv("stock_details_5_years.csv", header=True, inferSchema=True)

# Memeriksa kolom-kolom dalam dataset
print(df.columns)

# Tampilkan beberapa baris pertama untuk memverifikasi data
df.show(5)

# 3. Preprocessing data
df = df.dropna()
df = df.withColumn("label", when(col("Close") > 150, 1).otherwise(0))
df = df.withColumn("year", year(col("Date"))).withColumn("month", month(col("Date")))

# 4. Mengonversi 'Company' menjadi numerik menggunakan StringIndexer dan OneHotEncoder
indexer = StringIndexer(inputCol="Company", outputCol="companyIndex")
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCol="companyIndex", outputCol="companyVec")
df = encoder.fit(df).transform(df)

# 5. Menggabungkan fitur menjadi satu vektor
feature_columns = [col for col in df.columns if col not in ["Date", "Company", "label", "companyIndex", "Close"]]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df).select("features", "label")

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits', 'Company']
+-------------------+----------------+----------------+----------------+----------------+---------+---------+------------+-------+
|               Date|            Open|            High|             Low|           Close|   Volume|Dividends|Stock Splits|Company|
+-------------------+----------------+----------------+----------------+----------------+---------+---------+------------+-------+
|2018-11-29 05:00:00| 43.829760572993|43.8633538041636|42.6395935832266|43.0835075378418|167080000|      0.0|         0.0|   AAPL|
|2018-11-29 05:00:00|104.769074332185|105.519257086357|103.534594914971|104.636131286621| 28123200|      0.0|         0.0|   MSFT|
|2018-11-29 05:00:00|54.1764984130859|55.0074996948242|54.0999984741211|54.7290000915527| 31004000|      0.0|         0.0|  GOOGL|
|2018-11-29 05:00:00|83.7494964599609|84.4994964599609|82.6165008544922|83.6784973144531|132264000|      0.0|         0.0| 

In [15]:
# Membagi dataset menjadi data latih dan data uji
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Membangun model klasifikasi
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Melakukan prediksi pada data uji
predictions = lr_model.transform(test_data)

# Mengevaluasi performa model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi Model: {accuracy}")

Akurasi Model: 0.9967538920172242


In [16]:
# Menentukan grid parameter untuk tuning
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Membuat cross-validator
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3
)

# Melatih model dengan cross-validation
cv_model = crossval.fit(train_data)

# Mengevaluasi model yang dihasilkan
cv_predictions = cv_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Akurasi Model Setelah Cross-Validation: {cv_accuracy}")

Akurasi Model Setelah Cross-Validation: 0.9744783040741968
