# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Example dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Assuming 'Features' column contains arrays like [2.0, 3.0]
# We need to extract the individual elements as separate columns first.

# Extract elements from 'Features' array into individual columns
from pyspark.sql.functions import col
df = df.withColumn('Feature1', col('Features').getItem(0))
df = df.withColumn('Feature2', col('Features').getItem(1))

# Create a VectorAssembler to convert the extracted features to a VectorUDT
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features_vec')
df = assembler.transform(df)

# Train logistic regression model, using the new 'Features_vec' column
lr = LogisticRegression(featuresCol='Features_vec', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

from pyspark.sql.functions import col
df = df.withColumn('Feature1', col('Features').getItem(0))

assembler = VectorAssembler(inputCols=['Feature1'], outputCol='Features_vec')
df = assembler.transform(df)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features_vec', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([5.33333333]), array([15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Load dataset
df = pd.read_csv('100000 Sales Records.csv')

# 1. Eksplorasi awal dataset
print("Info Dataset:")
print(df.info())

print("\nStatistik Deskriptif:")
print(df.describe())

print("\nMissing Values:")
print(df.isnull().sum())

# 2. Tangani missing values
numeric_cols = df.select_dtypes(include=np.number).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())  # Isi missing values numerik dengan mean
df["Total Cost"] = df["Total Cost"].fillna(df["Total Cost"].mean())
df["Total Profit"] = df["Total Profit"].fillna(df["Total Profit"].mean())

# 3. Hapus duplikasi
df.drop_duplicates(inplace=True)

# 4. Periksa nilai tidak masuk akal (contoh: Units Sold < 0)
print("\nData dengan Units Sold < 0:")
print(df[df['Units Sold'] < 0])

# Hapus data yang tidak masuk akal
df = df[df['Units Sold'] >= 0]

# 5. Standarisasi Kategori
df['Region'] = df['Region'].str.title()  # Ubah menjadi title case


Info Dataset:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 14 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   Region          100000 non-null  object 
 1   Country         100000 non-null  object 
 2   Item Type       100000 non-null  object 
 3   Sales Channel   100000 non-null  object 
 4   Order Priority  100000 non-null  object 
 5   Order Date      100000 non-null  object 
 6   Order ID        100000 non-null  int64  
 7   Ship Date       100000 non-null  object 
 8   Units Sold      100000 non-null  int64  
 9   Unit Price      100000 non-null  float64
 10  Unit Cost       100000 non-null  float64
 11  Total Revenue   100000 non-null  float64
 12  Total Cost      100000 non-null  float64
 13  Total Profit    100000 non-null  float64
dtypes: float64(5), int64(2), object(7)
memory usage: 10.7+ MB
None

Statistik Deskriptif:
           Order ID     Units Sold     Unit Price

In [None]:
from sklearn.preprocessing import MinMaxScaler
import numpy as np

if (df['Units Sold'] == 0).any():
    raise ValueError("Kolom 'Units Sold' memiliki nilai nol.")

# Feature Engineering sebelum normalisasi
df['Revenue per Unit'] = df['Total Revenue'] / df['Units Sold']

df['Log Total Revenue'] = np.log1p(df['Total Revenue'])  # log1p untuk menghindari log(0)

# Normalisasi kolom numerik (setelah fitur baru dibuat)
scaler = MinMaxScaler()
df[['Total Profit', 'Log Total Revenue', 'Revenue per Unit']] = scaler.fit_transform(df[['Total Profit', 'Log Total Revenue', 'Revenue per Unit']])

print("\nStatistik Setelah Transformasi Log:")
print(df['Log Total Revenue'].describe())
print("\nStatistik Deskriptif Setelah Normalisasi:")
print(df[['Total Profit', 'Total Revenue', 'Revenue per Unit']].describe())


Statistik Setelah Transformasi Log:
count    100000.000000
mean          0.811823
std           0.120497
min           0.000000
25%           0.750844
50%           0.832341
75%           0.898585
max           1.000000
Name: Log Total Revenue, dtype: float64

Statistik Deskriptif Setelah Normalisasi:
        Total Profit  Total Revenue  Revenue per Unit
count  100000.000000   1.000000e+05     100000.000000
mean        0.226656   1.336067e+06          0.390588
std         0.218324   1.471768e+06          0.329226
min         0.000000   1.866000e+01          0.000000
25%         0.055154   2.797533e+05          0.151683
50%         0.163141   7.898916e+05          0.298009
75%         0.326900   1.836490e+06          0.649331
max         1.000000   6.682700e+06          1.000000


In [None]:
!pip install pyspark



# **Logistic Regression**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, when, log1p

# Membuat sesi Spark
spark = SparkSession.builder.appName("MLExample").getOrCreate()

# Misalnya df adalah DataFrame Pandas yang sudah dinormalisasi sebelumnya
# Jika dataset dalam bentuk Pandas, perlu mengonversinya ke Spark DataFrame
spark_df = spark.createDataFrame(df)

# Membuat kolom target jika diperlukan (misalnya target = 1 jika profit > 0, else 0)
spark_df = spark_df.withColumn("target", when(col("Total Profit") > 0, 1).otherwise(0))

# --- Memastikan fitur yang sudah dihitung tersedia di Spark DataFrame ---
# Data seperti 'Revenue per Unit' dan 'Log Total Revenue' sudah ada setelah normalisasi
# Pastikan kolom-kolom yang dibutuhkan ada, seperti 'Total Profit', 'Log Total Revenue', 'Revenue per Unit'

# Tambahkan kolom 'Log Total Revenue' yang dihitung sebelumnya
spark_df = spark_df.withColumn("Log Total Revenue", log1p(col("Total Revenue")))

# Menampilkan skema kolom untuk memastikan kolom yang diperlukan ada
spark_df.printSchema()

# Menampilkan distribusi label target untuk memeriksa keseimbangan kelas
spark_df.groupBy("target").count().show()

# Membuat VectorAssembler untuk menggabungkan fitur
assembler = VectorAssembler(inputCols=["Total Profit", "Log Total Revenue", "Revenue per Unit"], outputCol="features", handleInvalid="skip")

# Transformasi data menjadi format yang bisa digunakan untuk model
assembled_data = assembler.transform(spark_df)

# Menampilkan hasil untuk memastikan
assembled_data.select("features").show(5)

# Membagi data menjadi training set dan testing set
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=1234)

# Membuat model Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="target")  # Pastikan 'target' adalah kolom label kamu

# Melatih model menggunakan training data
lr_model = lr.fit(train_data)

# Membuat prediksi pada testing set
predictions = lr_model.transform(test_data)

# Menghitung evaluasi model
evaluator = BinaryClassificationEvaluator(labelCol="target")
accuracy = evaluator.evaluate(predictions)

# Menampilkan akurasi model
print(f"Accuracy: {accuracy}")

# --- Cross-Validation untuk menghindari overfitting ---
# Membuat parameter grid untuk hyperparameter tuning
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0.0, 0.5]).build()

# CrossValidator untuk tuning hyperparameter
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Melatih model dengan cross-validation
cv_model = crossval.fit(train_data)

# Evaluasi hasil cross-validation
cv_predictions = cv_model.transform(test_data)
accuracy_cv = evaluator.evaluate(cv_predictions)

# Menampilkan akurasi setelah cross-validation
print(f"Accuracy after Cross Validation: {accuracy_cv}")

# --- Memeriksa korelasi antara fitur dan target ---
print("Korelasi antara Total Profit dan target:", spark_df.corr("Total Profit", "target"))
print("Korelasi antara Log Total Revenue dan target:", spark_df.corr("Log Total Revenue", "target"))
print("Korelasi antara Revenue per Unit dan target:", spark_df.corr("Revenue per Unit", "target"))


root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: long (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: long (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)
 |-- Revenue per Unit: double (nullable = true)
 |-- Log Total Revenue: double (nullable = true)
 |-- target: integer (nullable = false)

+------+-----+
|target|count|
+------+-----+
|     1|99999|
|     0|    1|
+------+-----+

+--------------------+
|            features|
+--------------------+
|[0.02961757793565...|
|[0.45509848943159...|
|[0.01383879145509...|
|[0.13141593916421...|
|[0.55837361900318...

# **Linear Regression**

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# Pastikan 'features' dan 'label' adalah nama kolom yang benar
# dan bahwa assembled_data sudah didefinisikan sebelumnya

# Membagi data menjadi training set dan testing set (jika belum dilakukan)
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=1234)

# Buat model Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="target") # Ganti 'label' dengan nama kolom target

# Latih model, gunakan train_data yang sudah didefinisikan
lrModel = lr.fit(train_data)

# Buat prediksi
predictions = lrModel.transform(test_data)

# Evaluasi model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="rmse") # Ganti 'label' dengan nama kolom target
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.000142738
