# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [6]:
# Download the Iris dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data -O iris.csv
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Download the Iris dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data -O iris.csv

# Initialize Spark Session
spark = SparkSession.builder.appName('Iris Classification').getOrCreate()

# Load dataset (Iris dataset)
data = spark.read.csv('iris.csv', header=False, inferSchema=True)

# Rename columns for clarity
data = data.withColumnRenamed('_c0', 'sepal_length') \
           .withColumnRenamed('_c1', 'sepal_width') \
           .withColumnRenamed('_c2', 'petal_length') \
           .withColumnRenamed('_c3', 'petal_width') \
           .withColumnRenamed('_c4', 'species')

# Convert species to numeric using StringIndexer
indexer = StringIndexer(inputCol='species', outputCol='label')
data_indexed = indexer.fit(data).transform(data)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')
df_transformed = assembler.transform(data_indexed)

# Select only the relevant columns
df_transformed = df_transformed.select('features', 'label')

# Split the data into training and test sets
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=1234)

# Train a logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

# Create a CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy'),
                          numFolds=3)  # Use 3+ folds in practice

# Fit the model using cross-validation
cvModel = crossval.fit(train_data)

# Make predictions on the test data
predictions = cvModel.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f'Accuracy: {accuracy}')

# Stop the Spark session
spark.stop()

--2024-12-10 11:39:30--  https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘iris.csv’

iris.csv                [ <=>                ]   4.44K  --.-KB/s    in 0s      

2024-12-10 11:39:30 (22.1 MB/s) - ‘iris.csv’ saved [4551]

--2024-12-10 11:39:30--  https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘iris.csv’

iris.csv                [ <=>                ]   4.44K  --.-KB/s    in 0s      

2024-12-10 11:39:30 (77.0 MB/s) - ‘iris.csv’ saved [4551]

Accuracy: 0.945945945945

In [8]:
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('KMeans Example').getOrCreate()

# Example dataset with separate features
data = [(1, 1.0, 1.0), (2, 5.0, 5.0), (3, 10.0, 10.0), (4, 15.0, 15.0)]
columns = ['ID', 'Feature1', 'Feature2']
df = spark.createDataFrame(data, columns)

# Prepare data for KMeans
# Use VectorAssembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='features')
df_transformed = assembler.transform(df)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='features', k=2)
model = kmeans.fit(df_transformed)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

# Stop the Spark session
spark.stop()

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [12]:
# -*- coding: utf-8 -*-
import urllib.request

# Download the Iris dataset
url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
file_path = 'iris.data'
urllib.request.urlretrieve(url, file_path)

# Now, let's proceed with Spark
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize Spark Session
spark = SparkSession.builder.appName('Iris Classification').getOrCreate()

# Load dataset (Iris dataset) from local file
data = spark.read.csv(file_path, header=False, inferSchema=True)

# Rename columns for clarity
data = data.withColumnRenamed('_c0', 'sepal_length') \
           .withColumnRenamed('_c1', 'sepal_width') \
           .withColumnRenamed('_c2', 'petal_length') \
           .withColumnRenamed('_c3', 'petal_width') \
           .withColumnRenamed('_c4', 'species')

# Convert species to numeric using StringIndexer
indexer = StringIndexer(inputCol='species', outputCol='label')
data_indexed = indexer.fit(data).transform(data)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')
df_transformed = assembler.transform(data_indexed)

# Select only the relevant columns
df_transformed = df_transformed.select('features', 'label')

# Split the data into training and test sets
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=1234)

# Train a logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

# Create a CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy'),
                          numFolds=3)  # Use 3+ folds in practice

# Fit the model using cross-validation
cvModel = crossval.fit(train_data)

# Make predictions on the test data
predictions = cvModel.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f'Accuracy: {accuracy}')

# Stop the Spark session
spark.stop()

Accuracy: 0.9459459459459459
