# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
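Spark MLlib is Spark's scalable machine learning library and integrates seamlessly with the rest of the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering. The examples below use its DataFrame-based API (`pyspark.ml`), whose estimators expect all input features packed into a single vector column. That is why every example starts with a `VectorAssembler` step.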

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Start a Spark session, or reuse the one already running in this kernel
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset in which Target == Feature + 15 for every row, so the fit
# should recover a slope of ~1 and an intercept of ~15
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# MLlib estimators read their predictors from a single vector column
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a least-squares line of Target on the assembled feature vector
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the learned parameters
for label, value in (('Coefficients', model.coefficients), ('Intercept', model.intercept)):
    print(f'{label}: {value}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Initialize the SparkSession (getOrCreate() reuses the session already
# running in this kernel, if there is one)
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Example dataset: (ID, [x1, x2], 0/1 label). Spark stores the Python list
# in each row as an array column.
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Unpack each element of the 'Features' array into its own scalar column
# (Feature1, Feature2, ...). These are the individual feature values, not an
# average. The step is needed because VectorAssembler accepts numeric, boolean
# and vector columns, but not array columns.
n_features = len(data[0][1])
feature_cols = [f'Feature{i + 1}' for i in range(n_features)]
df = df.select('*', *[df['Features'][i].alias(name) for i, name in enumerate(feature_cols)])

# Combine the scalar feature columns into the single vector column that
# MLlib estimators consume
assembler = VectorAssembler(inputCols=feature_cols, outputCol='FeaturesVec')
df_transformed = assembler.transform(df)

# Train the logistic regression model
lr = LogisticRegression(featuresCol='FeaturesVec', labelCol='Label')
model = lr.fit(df_transformed)

# Show the learned coefficients and intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Reuse the active SparkSession if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("KMeansExample")
    .getOrCreate()
)

# Toy dataset: four 2-D points that all lie on the diagonal x == y
columns = ['ID', 'Features']
data = [
    (1, [1.0, 1.0]),
    (2, [5.0, 5.0]),
    (3, [10.0, 10.0]),
    (4, [15.0, 15.0]),
]
df = spark.createDataFrame(data, columns)

# Pull each element of the 'Features' array out into its own scalar column
df = df.withColumn('Feature1', df['Features'].getItem(0))
df = df.withColumn('Feature2', df['Features'].getItem(1))

# Pack the scalar columns into the single vector column KMeans reads from
assembler = VectorAssembler(outputCol='FeaturesVec', inputCols=['Feature1', 'Feature2'])
df_transformed = assembler.transform(df)

# Fit k-means with two clusters; the fixed seed makes the initialization repeatable
kmeans = KMeans(k=2, seed=1, featuresCol='FeaturesVec')
model = kmeans.fit(df_transformed)

# Display the learned cluster centers
centers = model.clusterCenters()
print('Cluster Centers: ' + str(centers))


Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.ml.feature import VectorAssembler
# Imported here so the cell does not depend on an earlier cell having run
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# Initialize the SparkSession (getOrCreate() reuses the session already
# running in this kernel, if there is one)
spark = SparkSession.builder.appName("LinearRegressionHomework").getOrCreate()

# Load the preprocessed Titanic CSV: the header row supplies column names and
# inferSchema lets Spark detect numeric column types
df = spark.read.csv('Titanic-Dataset-Normal.csv', header=True, inferSchema=True)

# Predictor columns (the target column 'Survived' is deliberately excluded).
# NOTE(review): 'Sex', 'Cabin' and 'Embarked' are assumed to be numerically
# encoded in this CSV already. VectorAssembler rejects string columns, and with
# its default handleInvalid='error' it raises on null/NaN values, so confirm
# that these columns contain no missing values.
feature_columns = ['Age', 'Sex', 'Cabin', 'Embarked', 'Pclass']

# Use VectorAssembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='Features')
df_assembled = assembler.transform(df)

# Train a linear regression model on the 'Survived' label.
# NOTE(review): 'Survived' appears to be a binary 0/1 label, and the homework
# asks for a classification model with an evaluation step. LogisticRegression
# (pyspark.ml.classification) scored with BinaryClassificationEvaluator is
# likely the intended estimator. TODO confirm before switching.
lr = LinearRegression(featuresCol='Features', labelCol='Survived')
model = lr.fit(df_assembled)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


+--------------------+--------+------+----+---+-------------------+-----+-----+------+--------------------+-----+--------+
|         PassengerId|Survived|Pclass|Name|Sex|                Age|SibSp|Parch|Ticket|                Fare|Cabin|Embarked|
+--------------------+--------+------+----+---+-------------------+-----+-----+------+--------------------+-----+--------+
|                 0.0|     0.0|   1.0| 108|  1| 0.2711736617240512|0.125|  0.0|   523|0.014151057562208049|  147|       2|
|0.001123595505617...|     1.0|   0.0| 190|  0| 0.4722292033174164|0.125|  0.0|   596| 0.13913573538264068|   81|       0|
|0.002247191011235955|     1.0|   1.0| 353|  0|0.32143754712239253|  0.0|  0.0|   669|0.015468569817999833|  147|       2|
|0.003370786516853...|     1.0|   0.0| 272|  0| 0.4345312892686604|0.125|  0.0|    49| 0.10364429745562033|   55|       2|
|0.004494382022471...|     0.0|   1.0|  15|  1| 0.4345312892686604|  0.0|  0.0|   472|0.015712553569072387|  147|       2|
+---------------