# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense([2.0, 3.0]), 0),
        (2, Vectors.dense([1.0, 5.0]), 1),
        (3, Vectors.dense([2.5, 4.5]), 1),
        (4, Vectors.dense([3.0, 6.0]), 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors # Import Vectors

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])),
        (2, Vectors.dense([5.0, 5.0])),
        (3, Vectors.dense([10.0, 10.0])),
        (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('weather_pred').getOrCreate()

In [3]:
df1 = spark.read.csv('weather_prediction_dataset.csv', header=True, inferSchema=True)
df2 = spark.read.csv('weather_prediction_bbq_labels.csv',header=True, inferSchema=True)

In [4]:
df1.columns[80:91]

['MAASTRICHT_cloud_cover',
 'MAASTRICHT_wind_speed',
 'MAASTRICHT_wind_gust',
 'MAASTRICHT_humidity',
 'MAASTRICHT_pressure',
 'MAASTRICHT_global_radiation',
 'MAASTRICHT_precipitation',
 'MAASTRICHT_sunshine',
 'MAASTRICHT_temp_mean',
 'MAASTRICHT_temp_min',
 'MAASTRICHT_temp_max']

In [5]:
dfM = df1.select(['DATE','MAASTRICHT_cloud_cover',
 'MAASTRICHT_wind_speed',
 'MAASTRICHT_wind_gust',
 'MAASTRICHT_humidity',
 'MAASTRICHT_pressure',
 'MAASTRICHT_global_radiation',
 'MAASTRICHT_precipitation',
 'MAASTRICHT_sunshine',
 'MAASTRICHT_temp_mean',
 'MAASTRICHT_temp_min',
 'MAASTRICHT_temp_max'])

In [6]:
dfM2 = df2.select(['DATE','MAASTRICHT_BBQ_weather'])
dfM2.show()

+--------+----------------------+
|    DATE|MAASTRICHT_BBQ_weather|
+--------+----------------------+
|20000101|                 false|
|20000102|                 false|
|20000103|                 false|
|20000104|                 false|
|20000105|                 false|
|20000106|                 false|
|20000107|                 false|
|20000108|                 false|
|20000109|                 false|
|20000110|                 false|
|20000111|                 false|
|20000112|                 false|
|20000113|                 false|
|20000114|                 false|
|20000115|                 false|
|20000116|                 false|
|20000117|                 false|
|20000118|                 false|
|20000119|                 false|
|20000120|                 false|
+--------+----------------------+
only showing top 20 rows



In [7]:
import pyspark.sql.functions as F
from functools import reduce

#convert the Boolean True/Fase to 0 or 1
cols = ["MAASTRICHT_BBQ_weather"]
dfM2 = reduce(lambda df, c: dfM2.withColumn(c, F.when(df[c] == 'false', 0).otherwise(1)), cols, dfM2)

In [8]:
#Join the labels (True, False) from the second newly created dataframe to the first one
dfM2= dfM2.join(dfM, on=["DATE"])
dfM2.printSchema()

root
 |-- DATE: integer (nullable = true)
 |-- MAASTRICHT_BBQ_weather: integer (nullable = false)
 |-- MAASTRICHT_cloud_cover: integer (nullable = true)
 |-- MAASTRICHT_wind_speed: double (nullable = true)
 |-- MAASTRICHT_wind_gust: double (nullable = true)
 |-- MAASTRICHT_humidity: double (nullable = true)
 |-- MAASTRICHT_pressure: double (nullable = true)
 |-- MAASTRICHT_global_radiation: double (nullable = true)
 |-- MAASTRICHT_precipitation: double (nullable = true)
 |-- MAASTRICHT_sunshine: double (nullable = true)
 |-- MAASTRICHT_temp_mean: double (nullable = true)
 |-- MAASTRICHT_temp_min: double (nullable = true)
 |-- MAASTRICHT_temp_max: double (nullable = true)



In [9]:
#Convert data to Spark ML format
from pyspark.ml.feature import VectorAssembler

In [10]:
#create an instance of the Assembler which takes in a series of columns to be used as features
#and returns a condensed vector
assembler = VectorAssembler(inputCols = ['DATE','MAASTRICHT_cloud_cover',
 'MAASTRICHT_wind_speed',
 'MAASTRICHT_wind_gust',
 'MAASTRICHT_humidity',
 'MAASTRICHT_pressure',
 'MAASTRICHT_global_radiation',
 'MAASTRICHT_precipitation',
 'MAASTRICHT_sunshine',
 'MAASTRICHT_temp_mean',
 'MAASTRICHT_temp_min',
 'MAASTRICHT_temp_max',
 'MAASTRICHT_cloud_cover',
 'MAASTRICHT_wind_speed',
 'MAASTRICHT_wind_gust',
 'MAASTRICHT_humidity',
 'MAASTRICHT_pressure',
 'MAASTRICHT_global_radiation',
 'MAASTRICHT_precipitation',
 'MAASTRICHT_sunshine',
 'MAASTRICHT_temp_mean',
 'MAASTRICHT_temp_min',
 'MAASTRICHT_temp_max'], outputCol = 'features')

In [11]:
#Call the 'transform' method on the dataframe returns a new dataframe with the newly created 'features' column

data = assembler.transform(dfM2)

In [12]:
final_data = data.select(['features','MAASTRICHT_BBQ_weather'])
final_data.show()

+--------------------+----------------------+
|            features|MAASTRICHT_BBQ_weather|
+--------------------+----------------------+
|[2.0000101E7,8.0,...|                     0|
|[2.0000102E7,7.0,...|                     0|
|[2.0000103E7,7.0,...|                     0|
|[2.0000104E7,8.0,...|                     0|
|[2.0000105E7,4.0,...|                     0|
|[2.0000106E7,6.0,...|                     0|
|[2.0000107E7,6.0,...|                     0|
|[2.0000108E7,7.0,...|                     0|
|[2.0000109E7,6.0,...|                     0|
|[2.000011E7,7.0,1...|                     0|
|[2.0000111E7,3.0,...|                     0|
|[2.0000112E7,5.0,...|                     0|
|[2.0000113E7,8.0,...|                     0|
|[2.0000114E7,8.0,...|                     0|
|[2.0000115E7,8.0,...|                     0|
|[2.0000116E7,8.0,...|                     0|
|[2.0000117E7,8.0,...|                     0|
|[2.0000118E7,8.0,...|                     0|
|[2.0000119E7,7.0,...|            

#Machine Learning

In [13]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [14]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [15]:
dtc = DecisionTreeClassifier(labelCol='MAASTRICHT_BBQ_weather',featuresCol='features')
rfc = RandomForestClassifier(labelCol='MAASTRICHT_BBQ_weather',featuresCol='features')
gbt = GBTClassifier(labelCol='MAASTRICHT_BBQ_weather',featuresCol='features')

In [16]:
# Train the models
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [17]:
#Call 'transform' on the test_data
#these will create 3 new dataframes for each model
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [18]:
#let's look at one of the newly created dataframes
rfc_predictions.show()

+--------------------+----------------------+--------------------+--------------------+----------+
|            features|MAASTRICHT_BBQ_weather|       rawPrediction|         probability|prediction|
+--------------------+----------------------+--------------------+--------------------+----------+
|[2.0000101E7,8.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.0000114E7,8.0,...|                     0|[19.9928479038271...|[0.99964239519135...|       0.0|
|[2.0000116E7,8.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.0000117E7,8.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.0000122E7,7.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.0000204E7,8.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.0000206E7,7.0,...|                     0|[19.9992581602373...|[0.99996290801186...|       0.0|
|[2.000020

# Accuracy Evaluation

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [20]:
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="MAASTRICHT_BBQ_weather", predictionCol="prediction", metricName="accuracy")

In [21]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [22]:
print('-'*80)
print('A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*80)
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*80)
print('A ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

--------------------------------------------------------------------------------
A single decision tree had an accuracy of: 99.39%
--------------------------------------------------------------------------------
A random forest ensemble had an accuracy of: 99.47%
--------------------------------------------------------------------------------
A ensemble using GBT had an accuracy of: 99.39%


# hyperparameter tuning cross-validation

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# --- Hyperparameter Tuning untuk RandomForestClassifier ---

# Definisikan parameter grid
paramGrid_rfc = ParamGridBuilder() \
    .addGrid(rfc.numTrees, [10, 20, 30]) \
    .addGrid(rfc.maxDepth, [5, 10, 15]) \
    .build()

# Buat objek CrossValidator
crossval_rfc = CrossValidator(estimator=rfc,
                              estimatorParamMaps=paramGrid_rfc,
                              evaluator=acc_evaluator,
                              numFolds=3)

# Latih model
cvModel_rfc = crossval_rfc.fit(train_data)



# --- Hyperparameter Tuning untuk DecisionTreeClassifier ---

# Definisikan parameter grid
paramGrid_dtc = ParamGridBuilder() \
    .addGrid(dtc.maxDepth, [5, 10, 15]) \
    .addGrid(dtc.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Buat objek CrossValidator
crossval_dtc = CrossValidator(estimator=dtc,
                              estimatorParamMaps=paramGrid_dtc,
                              evaluator=acc_evaluator,
                              numFolds=3)

# Latih model
cvModel_dtc = crossval_dtc.fit(train_data)



# --- Hyperparameter Tuning untuk GBTClassifier ---

# Definisikan parameter grid
paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 15]) \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .build()

# Buat objek CrossValidator
crossval_gbt = CrossValidator(estimator=gbt,
                              estimatorParamMaps=paramGrid_gbt,
                              evaluator=acc_evaluator,
                              numFolds=3)

# Latih model
cvModel_gbt = crossval_gbt.fit(train_data)