# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
pip install pyspark



In [7]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Create the Spark session (or reuse one that is already running)
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy data: four (ID, Feature, Target) rows in which Target = Feature + 15
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# Pack the single 'Feature' column into the vector column the estimator reads
assembler = VectorAssembler(outputCol='Features', inputCols=['Feature'])
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled 'Features' vector
lr = LinearRegression(labelCol='Target', featuresCol='Features')
model = lr.fit(df_transformed)

# Report the fitted coefficient vector and the intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Four labelled samples: (ID, two-dimensional feature vector, binary label)
columns = ['ID', 'Features', 'Label']
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0),
]
df = spark.createDataFrame(data, schema=columns)

# Fit a logistic regression of 'Label' on the 'Features' vectors
lr = LogisticRegression(labelCol='Label', featuresCol='Features')
model = lr.fit(df)

# Report the fitted coefficient vector and the intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans


# Four unlabelled 2-D points lying on the diagonal: (ID, feature vector)
columns = ['ID', 'Features']
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0])),
]
df = spark.createDataFrame(data, schema=columns)

# Fit KMeans with two clusters on the 'Features' column
kmeans = KMeans(k=2, featuresCol='Features')
model = kmeans.fit(df)

# Print the centre of each fitted cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Read the file.
# NOTE: `data` holds the SparkSession here, not a DataFrame.
data = (
    SparkSession.builder
    .appName("Data ariel")
    .getOrCreate()
)
im_data = data.read.csv(
    "car_prices.csv",
    inferSchema=True,  # let Spark infer the column types
    header=True,       # the first line holds the column names
)
im_data.show(100)  # print up to the first 100 rows



+----+---------+-------------------+--------------------+-----------+------------+-----------------+-----+---------+--------+--------+--------+--------------------+-----+------------+--------------------+
|year|     make|              model|                trim|       body|transmission|              vin|state|condition|odometer|   color|interior|              seller|  mmr|sellingprice|            saledate|
+----+---------+-------------------+--------------------+-----------+------------+-----------------+-----+---------+--------+--------+--------+--------------------+-----+------------+--------------------+
|2015|      Kia|            Sorento|                  LX|        SUV|   automatic|5xyktca69fg566472|   ca|        5|   16639|   white|   black|kia motors americ...|20500|       21500|Tue Dec 16 2014 1...|
|2015|      Kia|            Sorento|                  LX|        SUV|   automatic|5xyktca69fg561319|   ca|        5|    9393|   white|   beige|kia motors americ...|20800|       215

In [None]:
from pyspark.sql.functions import col

# Discard every row that contains a null in any column, then preview the result
clean_data = im_data.na.drop()
clean_data.show()


+------------+-------------+-----------------+----------------+
|Umur (bulan)|Jenis Kelamin|Tinggi Badan (cm)|     Status Gizi|
+------------+-------------+-----------------+----------------+
|           0|    laki-laki| 44.5919732943438|         stunted|
|           0|    laki-laki| 56.7052033668847|          tinggi|
|           0|    laki-laki| 46.8633575967919|          normal|
|           0|    laki-laki| 47.5080256315438|          normal|
|           0|    laki-laki| 42.7434938911793|severely stunted|
|           0|    laki-laki|   .2577186391463|         stunted|
|           0|    laki-laki| 42.7017961514634|severely stunted|
|           0|    laki-laki| 45.2517789938352|         stunted|
|           0|    laki-laki|  57.201961037402|          tinggi|
|           0|    laki-laki| 51.3538576645907|          normal|
|           0|    laki-laki| 53.0499109874484|          normal|
|           0|    laki-laki| 43.5448720454205|severely stunted|
|           0|    laki-laki| 46.25263202

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when

# Numeric columns used as model inputs (adjust to match your dataset)
feature_columns = ['year', 'odometer', 'condition']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Assemble the feature vector from the null-free DataFrame built earlier.
# `data` is the SparkSession in this notebook, so it cannot be transformed;
# `clean_data` (im_data with null rows dropped) is the DataFrame to use.
data_transformed = assembler.transform(clean_data)

# Binary label derived from sellingprice: 0 = below 20000, 1 = otherwise
data_transformed = data_transformed.withColumn(
    'label', when(col('sellingprice') < 20000, 0).otherwise(1)
)

# Keep only the model columns; the null drop is a safeguard
data_final = data_transformed.select('features', 'label').na.drop()

# 80/20 train/test split with a fixed seed
train_data, test_data = data_final.randomSplit([0.8, 0.2], seed=42)

# Logistic regression limited to 10 iterations
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# Fit the model on the training split
lr_model = lr.fit(train_data)

# Score the held-out test split
predictions = lr_model.transform(test_data)

# Evaluate the predictions with BinaryClassificationEvaluator (default metric)
evaluator = BinaryClassificationEvaluator(labelCol='label')
auc = evaluator.evaluate(predictions)

print(f'AUC: {auc:.2f}')


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 1. Create (or reuse) the SparkSession
spark = SparkSession.builder.appName("CarClassification").getOrCreate()

# 2. Read the vehicle dataset
data = spark.read.csv("car_prices.csv", header=True, inferSchema=True)

# 3. Add the target column `selling_speed`.
#    Despite its name, the label is derived from price: 1 when sellingprice is
#    at or above the approximate median, 0 otherwise.
threshold = data.approxQuantile("sellingprice", [0.5], 0.01)[0]  # approximate median selling price
data = data.withColumn("selling_speed", when(col("sellingprice") >= threshold, 1).otherwise(0))

# 4. Drop rows with missing values in the columns the model depends on.
#    `sellingprice` is listed instead of `selling_speed`: the label is never
#    null (a null price falls through to the `otherwise(0)` branch), so rows
#    without a price must be removed via the source column or they would be
#    silently labelled 0.
data = data.dropna(subset=["condition", "odometer", "mmr", "sellingprice"])

# 5. Index the categorical columns.
#    handleInvalid="skip" drops rows whose category value is invalid (e.g. null).
transmission_indexer = StringIndexer(inputCol="transmission", outputCol="transmissionIndexed", handleInvalid="skip")
body_indexer = StringIndexer(inputCol="body", outputCol="bodyIndexed", handleInvalid="skip")
data = transmission_indexer.fit(data).transform(data)
data = body_indexer.fit(data).transform(data)

# 6. Assemble the features into a single vector column
feature_columns = ["condition", "odometer", "mmr", "transmissionIndexed", "bodyIndexed"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# 7. Split into training and test data (80/20, fixed seed)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

# 8. Create the Logistic Regression estimator
lr = LogisticRegression(featuresCol="features", labelCol="selling_speed")

# 9. Build the parameter grid for hyperparameter tuning (3 x 3 combinations)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# 10. Accuracy evaluator, used both for model selection and the final test score
evaluator = MulticlassClassificationEvaluator(labelCol="selling_speed", predictionCol="prediction", metricName="accuracy")

# 11. 5-fold cross-validation over the parameter grid
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# 12. Train: fits every grid combination on every fold and keeps the best model
cv_model = crossval.fit(train_data)

# 13. Predict on the test data
predictions = cv_model.transform(test_data)

# Show the prediction results
print("Hasil Prediksi:")
predictions.select("features", "selling_speed", "prediction").show()

# 14. Evaluate accuracy on the test data
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi Model dengan Validasi Silang: {accuracy}")

# 15. Show the best parameters.
#     Read them through the model's public getters instead of the private
#     `_java_obj` handle, which is an implementation detail of the JVM bridge.
best_model = cv_model.bestModel
print("Parameter Model Terbaik:")
print(f"regParam: {best_model.getRegParam()}")
print(f"elasticNetParam: {best_model.getElasticNetParam()}")

