# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Start a Spark session (getOrCreate returns an already-running one if present).
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Tiny toy dataset in which Target = Feature + 15 for every row.
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# MLlib estimators consume a single vector column, so pack the scalar
# 'Feature' column into a one-element vector column named 'Features'.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a linear regression with default hyperparameters.
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the learned slope(s) and bias.
for caption, value in (('Coefficients', model.coefficients), ('Intercept', model.intercept)):
    print(f'{caption}: {value}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Toy binary-classification data. The features are built directly as dense
# Vectors, so no VectorAssembler step is needed before fitting.
columns = ['ID', 'Features', 'Label']
raw_rows = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0),
]
data = [(row_id, Vectors.dense(values), target) for row_id, values, target in raw_rows]
df = spark.createDataFrame(data, columns)

# Fit a logistic regression with default hyperparameters.
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Print the learned weights and bias (no training summary is printed here).
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Four 2-D points lying on the diagonal y = x.
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Fit k-means with two clusters. The explicit seed makes the random
# initialisation (and therefore the order of the returned centers)
# reproducible: PySpark's default seed is derived from hash() of the class
# name, and Python randomises str hashing per process unless PYTHONHASHSEED
# is fixed. 1234 matches the seed used for randomSplit later in the notebook.
kmeans = KMeans(featuresCol='Features', k=2, seed=1234)
model = kmeans.fit(df)

# Show cluster centers (a list of numpy arrays, one per cluster).
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.


In [None]:
import pandas as pd
import numpy as np

# Load the raw tweet dataset with pandas' pure-Python CSV parser
# (NOTE(review): chosen over the default C engine; the reason is not shown here).
df = pd.read_csv('unbalanced_twitter_btc_small.csv', engine='python')

# Count missing values per column; the last expression is displayed by Jupyter.
print("Jumlah nilai yang hilang di setiap kolom:\n")
missing_per_column = df.isna().sum()
missing_per_column

Jumlah nilai yang hilang di setiap kolom:



Unnamed: 0,0
user_followers,0
user_verified,3705
date,3705
text,3705
hard_cleaned_text,4511
soft_cleaned_text,4511
vader_sentiment,5317
afinn_sentiment,5317
sentiment,5317


In [None]:
print(f"Jumlah data sebelum pembersihan: {df.shape}")

# Drop rows without the target first, so every imputation statistic below is
# computed only from rows that stay in the dataset. .copy() gives an
# independent frame, so the column assignments below are unambiguous writes.
df = df.dropna(subset=['sentiment']).copy()

# Handle the remaining missing values column by column.
df['user_verified'] = df['user_verified'].fillna(False)
# Unparseable dates become NaT instead of raising.
df['date'] = pd.to_datetime(df['date'], errors='coerce')
for text_col in ('text', 'hard_cleaned_text', 'soft_cleaned_text'):
    df[text_col] = df[text_col].fillna('')
for score_col in ('vader_sentiment', 'afinn_sentiment'):
    df[score_col] = df[score_col].fillna(df[score_col].mean())

# Type conversion (not normalisation): force follower counts to numeric.
# Plain column assignment replaces the column and gives it a numeric dtype;
# `df.loc[:, col] = ...` writes in place and can keep the old object dtype.
df['user_followers'] = pd.to_numeric(df['user_followers'], errors='coerce')

# Renumber rows 0..n-1 and report the shape after cleaning.
df.reset_index(drop=True, inplace=True)
print(f"Jumlah data setelah pembersihan: {df.shape}")

# Count missing values per column (displayed by Jupyter as the last expression).
print("Jumlah nilai yang hilang di setiap kolom:")
df.isnull().sum()

Jumlah data sebelum pembersihan: (163440, 9)
Jumlah data setelah pembersihan: (158123, 9)
Jumlah nilai yang hilang di setiap kolom:


Unnamed: 0,0
user_followers,0
user_verified,0
date,0
text,0
hard_cleaned_text,0
soft_cleaned_text,0
vader_sentiment,0
afinn_sentiment,0
sentiment,0


- Build a classification model using Spark MLlib and evaluate its performance.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Get the Spark session (getOrCreate returns an already-running one if present)
# and load the cleaned pandas DataFrame into Spark.
spark = SparkSession.builder.appName('SentimentAnalysis').getOrCreate()
sdf = spark.createDataFrame(df)

# Bucket the continuous `sentiment` score into three categories:
#   sentiment >  0.05 -> "positif"
#   sentiment < -0.05 -> "negatif"
#   otherwise         -> "netral"
score = col("sentiment")
category_expr = (
    when(score > 0.05, "positif")
    .when(score < -0.05, "negatif")
    .otherwise("netral")
)
sdf = sdf.withColumn("sentiment_category", category_expr)

# Print the first 5 rows, untruncated, to check the categorisation.
print("Hasil pengkategorian sentimen:")
preview_cols = ["soft_cleaned_text", "sentiment", "sentiment_category"]
sdf.select(*preview_cols).show(5, False)

Hasil pengkategorian sentimen:
+-------------------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|soft_cleaned_text                                                                                                                                |sentiment         |sentiment_category|
+-------------------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|Which bitcoin books should I think about reading next?                                                                                           |-0.023076923076923|netral            |
|I appreciate the message, but not a fan of the religious references. Religion has nothing to do with Bitcoin.                                    |0.3372676923076922|positif           |
|Ethereum price update: . ETH $1664.02 

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, StopWordsRemover, Tokenizer

# Number of hash buckets for the bag-of-words features. Kept modest to bound
# RandomForest training cost; raise it if hash collisions hurt accuracy.
NUM_TEXT_FEATURES = 1 << 10

# Remove columns added by an earlier run of this cell so it can be re-executed;
# DataFrame.drop() is a no-op for column names that do not exist.
sdf = sdf.drop('tokens', 'filtered_tokens', 'features', 'label')

# Build the feature vector from the tweet text, NOT from `sentiment`.
# The label below is a fixed threshold on `sentiment`, so using `sentiment`
# itself as the feature is target leakage: the forest simply relearns the
# +/-0.05 thresholds and the accuracy says nothing about predicting sentiment.
# All three stages are plain transformers (nothing is fitted), so applying
# them before the train/test split does not leak test-set statistics.
tokenizer = Tokenizer(inputCol='soft_cleaned_text', outputCol='tokens')
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
hashing_tf = HashingTF(inputCol='filtered_tokens', outputCol='features', numFeatures=NUM_TEXT_FEATURES)
sdf = hashing_tf.transform(remover.transform(tokenizer.transform(sdf)))

# Map 'sentiment_category' to a numeric label:
# 'positif' -> 0, 'negatif' -> 1, anything else ('netral') -> 2
sdf = sdf.withColumn(
    "label",
    when(col("sentiment_category") == "positif", 0)
    .when(col("sentiment_category") == "negatif", 1)
    .otherwise(2)
)

In [None]:
# Split the data 80/20 into train and test sets; the fixed seed keeps it reproducible.
train_data, test_data = sdf.randomSplit([0.8, 0.2], seed=1234)

# Both splits are reused several times (the counts below, model fitting,
# evaluation and cross-validation). Cache them so Spark does not recompute
# the whole lineage, including createDataFrame from pandas, on every action.
# The count() calls below also materialise the cache.
train_data = train_data.cache()
test_data = test_data.cache()

print(f"Jumlah data Train: {train_data.count()}")
print(f"Jumlah data Test: {test_data.count()}")

Jumlah data Train: 126645
Jumlah data Test: 31478


In [None]:
# Random Forest classification model. The explicit seed makes bootstrapping
# and feature subsampling reproducible across sessions.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=100, seed=1234)

# Train the model.
rf_model = rf.fit(train_data)

# Evaluate on the held-out split. `evaluator` (accuracy) is also reused by the
# cross-validation cell, so it keeps its original configuration.
predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

# The dataset is explicitly unbalanced, so accuracy alone can be dominated by
# the majority class; also report weighted F1 ('f1' in MLlib).
f1_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1_weighted = f1_evaluator.evaluate(predictions)

print(f'Akurasi Model: {accuracy}')
print(f'F1 Model (weighted): {f1_weighted}')

Akurasi Model: 0.9797954126691658


- Explore hyperparameter tuning using cross-validation.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 2 x 2 grid over tree depth and forest size; grid values override the
# numTrees set on `rf` when it was constructed.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.numTrees, [50, 100])
    .build()
)

# 3-fold CV scored with the same accuracy evaluator as the single model.
# The explicit seed fixes the random fold assignment across sessions.
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=1234)
cv_model = crossval.fit(train_data)

# Show the average CV metric of every grid point (avgMetrics follows the
# order of paramGrid), so the chosen configuration can be inspected.
for param_map, avg_metric in zip(paramGrid, cv_model.avgMetrics):
    setting = ", ".join(f"{param.name}={value}" for param, value in param_map.items())
    print(f"CV {evaluator.getMetricName()} [{setting}]: {avg_metric:.4f}")

# Evaluate the best model (cv_model.transform uses bestModel) on the test split.
cv_predictions = cv_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f'Akurasi dengan Cross-Validation: {cv_accuracy}')

Akurasi dengan Cross-Validation: 0.9797954126691658
