# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Obtain the Spark session for this notebook (an already-running one is reused).
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: every row is (ID, Feature, Target) with Target = Feature + 15.
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# Pack the single predictor into the vector column that the regressor reads.
assembler = VectorAssembler(
    inputCols=['Feature'],
    outputCol='Features',
)
df_transformed = assembler.transform(df)

# Fit ordinary linear regression of 'Target' on the assembled 'Features'.
lr = LinearRegression(
    featuresCol='Features',
    labelCol='Target',
)
model = lr.fit(df_transformed)

# Report the fitted slope(s) and intercept, one per line.
print(
    f'Coefficients: {model.coefficients}',
    f'Intercept: {model.intercept}',
    sep='\n',
)


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import functions as F

# Toy dataset: every row is (ID, [two feature values], 0/1 label).
columns = ['ID', 'Features', 'Label']
data = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0),
]
df = spark.createDataFrame(data, columns)

# Split the 'Features' array into one scalar column per element so that
# VectorAssembler can consume them.
df = (
    df.withColumn('Feature1', F.col('Features')[0])
      .withColumn('Feature2', F.col('Features')[1])
)

# Combine 'Feature1' and 'Feature2' into the single vector column 'FeaturesVec'.
assembler = VectorAssembler(
    inputCols=['Feature1', 'Feature2'],
    outputCol='FeaturesVec',
)
df_vector = assembler.transform(df)

# Fit logistic regression of 'Label' on the assembled vector.
lr = LogisticRegression(
    featuresCol='FeaturesVec',
    labelCol='Label',
)
model = lr.fit(df_vector)

# Report the fitted coefficients and intercept, one per line.
print(
    f'Coefficients: {model.coefficients}',
    f'Intercept: {model.intercept}',
    sep='\n',
)


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
# `F` is used below; import it here so the cell runs on its own instead of
# depending on an earlier cell having imported it.
from pyspark.sql import functions as F

# Example dataset: every row is (ID, [x, y]).
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Convert the 'Features' array into individual scalar columns
df = df.withColumn('Feature1', F.col('Features')[0]) \
       .withColumn('Feature2', F.col('Features')[1])

# Use VectorAssembler to combine 'Feature1' and 'Feature2' into a single vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='FeaturesVec')
df_vector = assembler.transform(df)

# Train KMeans clustering model with two clusters
kmeans = KMeans(featuresCol='FeaturesVec', k=2)
model = kmeans.fit(df_vector)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


**Machine Learning : Linear Regression**

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("LinearRegressionWithStrings").getOrCreate()

# Step 2: Load Dataset
# The first CSV row supplies the column names and Spark infers column types.
file_path = "/content/youtube.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Step 3: Explore Dataset
df.printSchema()
print("Available Columns: ", df.columns)

# Step 4: Choose the target column
# Validate it right away so a wrong name fails before the StringIndexer
# fits below, each of which runs a Spark job over the data.
label_column = 'views'  # Replace with the correct target column if needed
if label_column not in df.columns:
    raise ValueError(f"Kolom target '{label_column}' tidak ditemukan dalam dataset. Kolom yang tersedia: {df.columns}")

# Step 5: Convert non-numeric columns
string_columns = [
    'video_id', 'trending_date', 'title', 'channel_title',
    'publish_date', 'time_frame', 'published_day_of_week',
    'publish_country', 'tags'
]

# Map every string column to a numeric index column named '<column>_Index'.
# The loop variable is `col_name` (not `col`) so it cannot shadow
# pyspark.sql.functions.col, which other cells of this notebook import.
for col_name in string_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_Index")
    df = indexer.fit(df).transform(df)

# Features are every column except the target and the raw string columns,
# i.e. the original non-string columns plus the new '*_Index' columns.
# NOTE(review): this also picks up 'index', which looks like a row
# identifier rather than a predictor -- confirm whether it should be excluded.
feature_columns = [
    name for name in df.columns
    if name != label_column and name not in string_columns
]

# Combine the features into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='Features')
df = assembler.transform(df)

# Step 6: Split Dataset (80% train / 20% test, fixed seed for repeatability)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Step 7: Train Linear Regression Model
lr = LinearRegression(featuresCol='Features', labelCol=label_column)
model = lr.fit(train_df)

# Step 8: Evaluate Model on the held-out test split
test_results = model.evaluate(test_df)

print(f"RMSE: {test_results.rootMeanSquaredError}")
print(f"R2: {test_results.r2}")
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")


root
 |-- index: integer (nullable = true)
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- time_frame: string (nullable = true)
 |-- published_day_of_week: string (nullable = true)
 |-- publish_country: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)

Available Columns:  ['index', 'video_id', 'trending_date', 'title', 'channel_title', 'category_id', 'publish_date', 'time_frame', 'published_day_of_week', 'publish_country', 'tags', 'views', 'likes', 'disli

**Machine Learning : Logistic Regression**

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("LogisticRegressionWithBooleans").getOrCreate()

# Step 2: Load Dataset
# The first CSV row supplies the column names and Spark infers column types.
file_path = "/content/youtube.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Step 3: Explore Dataset
df.printSchema()
print("Available Columns: ", df.columns)

# Step 4: Convert non-numeric columns
string_columns = [
    'video_id', 'trending_date', 'title', 'channel_title',
    'publish_date', 'time_frame', 'published_day_of_week',
    'publish_country', 'tags'
]

# Map every string column to a numeric index column named '<column>_Index'
for col_name in string_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_Index")
    df = indexer.fit(df).transform(df)

# Step 5: Convert the boolean column
# Cast ratings_disabled from boolean to integer so it can serve as the label
df = df.withColumn('ratings_disabled_Int', col('ratings_disabled').cast('integer'))

# Step 6: Choose target and features
label_column = 'ratings_disabled_Int'  # The converted boolean column

# Numeric features plus the indexed string columns, assembled into one vector
feature_columns = ['views', 'likes', 'dislikes', 'comment_count']  # Make sure these columns exist in the dataset
indexed_columns = [f"{col_name}_Index" for col_name in string_columns]
all_features = feature_columns + indexed_columns

assembler = VectorAssembler(inputCols=all_features, outputCol='FeaturesVec')
df_vector = assembler.transform(df)

# Step 7: Split Dataset (80% train / 20% test, fixed seed for repeatability)
train_df, test_df = df_vector.randomSplit([0.8, 0.2], seed=42)

# Step 8: Train Logistic Regression Model
lr = LogisticRegression(featuresCol='FeaturesVec', labelCol=label_column, maxIter=10)
model = lr.fit(train_df)

# Step 9: Evaluate Model on the held-out test split
# model.summary describes the training data only, so its metrics are
# optimistic; model.evaluate() computes the same metrics on rows the model
# was not fitted on. Training metrics remain available via model.summary.
test_summary = model.evaluate(test_df)
print(f"Accuracy: {test_summary.accuracy}")
print(f"F1 Score: {test_summary.fMeasureByLabel()}")
print(f"Precision: {test_summary.precisionByLabel}")
print(f"Recall: {test_summary.recallByLabel}")

# Step 10: Make Predictions and show a sample of them next to the true label
predictions = model.transform(test_df)
predictions.select('video_id', 'FeaturesVec', 'prediction', label_column).show(10)


root
 |-- index: integer (nullable = true)
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- time_frame: string (nullable = true)
 |-- published_day_of_week: string (nullable = true)
 |-- publish_country: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)

Available Columns:  ['index', 'video_id', 'trending_date', 'title', 'channel_title', 'category_id', 'publish_date', 'time_frame', 'published_day_of_week', 'publish_country', 'tags', 'views', 'likes', 'disli

**KMeans Clustering**

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("KMeansWithStrings").getOrCreate()

# Step 2: Load Dataset
# The first CSV row supplies the column names and Spark infers column types.
file_path = "/content/youtube.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Step 3: Explore Dataset
df.printSchema()
print("Available Columns: ", df.columns)

# Step 4: Convert non-numeric columns
string_columns = [
    'video_id', 'trending_date', 'title', 'channel_title',
    'publish_date', 'time_frame', 'published_day_of_week',
    'publish_country', 'tags'
]

# Map every string column to a numeric index column named '<column>_Index'.
# The loop variable is `col_name` (not `col`) so it does not overwrite the
# `col` function that the logistic-regression cell imports from
# pyspark.sql.functions.
for col_name in string_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_Index")
    df = indexer.fit(df).transform(df)

# Step 5: Choose features
# Numeric columns used for clustering, plus the indexed string columns.
feature_columns = ['views', 'likes', 'dislikes', 'comment_count']  # Make sure these columns exist in the dataset
indexed_columns = [f"{col_name}_Index" for col_name in string_columns]
all_features = feature_columns + indexed_columns

# Combine the features into a single vector column
# NOTE(review): the features are assembled unscaled; if their ranges differ
# a lot, the largest-valued columns will likely dominate the clustering --
# consider standardizing them first.
assembler = VectorAssembler(inputCols=all_features, outputCol='FeaturesVec')
df_vector = assembler.transform(df)

# Step 6: Train KMeans Model
kmeans = KMeans(featuresCol='FeaturesVec', k=3, seed=42)  # k=3 gives 3 clusters; adjust as needed
model = kmeans.fit(df_vector)

# Step 7: Inspect the fitted cluster centers
centers = model.clusterCenters()
print(f"Cluster Centers: {centers}")

# Step 8: Assign Clusters to Data and show a sample of the assignments
df_clustered = model.transform(df_vector)
df_clustered.select('video_id', 'FeaturesVec', 'prediction').show(10)


root
 |-- index: integer (nullable = true)
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- time_frame: string (nullable = true)
 |-- published_day_of_week: string (nullable = true)
 |-- publish_country: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)

Available Columns:  ['index', 'video_id', 'trending_date', 'title', 'channel_title', 'category_id', 'publish_date', 'time_frame', 'published_day_of_week', 'publish_country', 'tags', 'views', 'likes', 'disli

**Hyperparameter tuning using cross-validation.**

In [9]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# This cell reuses `lr`, `label_column`, `train_df` and `test_df` from the
# logistic-regression cell, so that cell must have been run first.

# Step 1: Define the evaluator for binary classification
# The metric is spelled out explicitly: it is the area under the ROC curve
# (the evaluator's default), not accuracy.
evaluator = BinaryClassificationEvaluator(labelCol=label_column, metricName="areaUnderROC")

# Step 2: Build the parameter grid for hyperparameter tuning
# 3 x 3 x 3 = 27 parameter combinations are tried.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5])  # Regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # ElasticNet mixing parameter
             .addGrid(lr.maxIter, [10, 20, 30])  # Maximum number of iterations
             .build())

# Step 3: Set up the CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # 3-fold cross-validation

# Step 4: Train the model with cross-validation
cvModel = crossval.fit(train_df)

# Step 5: Evaluate the selected model on the held-out test split
cvPredictions = cvModel.transform(test_df)
print(f"Area under ROC of the model: {evaluator.evaluate(cvPredictions)}")

# Step 6: Show the parameters of the best model
bestModel = cvModel.bestModel
print(f"Best Model Parameters: {bestModel.extractParamMap()}")


Accuracy of the model: 0.7545396773459662
Best Model Parameters: {Param(parent='LogisticRegression_a90051a2aaf5', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_a90051a2aaf5', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_a90051a2aaf5', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_a90051a2aaf5', name='featuresCol', doc='features column name.'): 'FeaturesVec', Param(parent='LogisticRegression_a90051a2aaf5', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_a90051a2aaf5', name='labelCol', doc='label column name.'): 'ratings_disabled_Int', Param(parent='LogisticRegre