In [12]:
# Hands-on 
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start Spark
spark = SparkSession.builder.appName("MLlib LinearRegression").getOrCreate()

# Data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Vector Assembler
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Output
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")


25/12/12 21:37:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/12/12 21:37:44 WARN Instrumentation: [24249be8] regParam is zero, which might cause numerical instability and overfitting.

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


                                                                                

In [13]:
# Hands-on
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Start Spark
spark = SparkSession.builder.appName("MLlib LogisticRegression").getOrCreate()

# Data
data = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0)
]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Split features
df = df.withColumn("Feature1", df["Features"][0]) \
       .withColumn("Feature2", df["Features"][1])

# Vector assembler
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='FeatureVector')
df_transformed = assembler.transform(df)

# Model
lr = LogisticRegression(featuresCol='FeatureVector', labelCol='Label')
model = lr.fit(df_transformed)

# Output
print(f"Model Coefficients: {model.coefficients}")
print(f"Model Intercept: {model.intercept}")


25/12/12 21:39:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Model Coefficients: [-12.262057915782703,4.087352262022548]
Model Intercept: 11.568912713246394


In [14]:
# Hands-on
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Inisialisasi Spark Session
spark = SparkSession.builder.appName("KMeans Clustering Example").getOrCreate()

data = [
    (1, [1.0, 1.0]),
    (2, [5.0, 5.0]),
    (3, [10.0, 10.0]),
    (4, [15.0, 15.0])
]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)
df = df.withColumn("Feature1", col("Features")[0]) \
       .withColumn("Feature2", col("Features")[1])

# VectorAssembler
assembler = VectorAssembler(
    inputCols=["Feature1", "Feature2"],
    outputCol="FeatureVector"
)
df_transformed = assembler.transform(df)
# Latih model KMeans
kmeans = KMeans(featuresCol="FeatureVector", k=2)
model = kmeans.fit(df_transformed)
centers = model.clusterCenters()
print(f"Cluster Centers: {centers}")

25/12/12 21:40:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 118:>                                                        (0 + 8) / 8]

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


                                                                                

In [15]:
# Homework 
# Load dataset Delhi AQI
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DelhiAQI-MLlib") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

# Load dataset (gunakan absolute path)
df = spark.read.csv("file:///home/erlyn/delhi_aqi.csv", header=True, inferSchema=True)

print("5 Data Teratas")
df.show(5)

print("Skema Dataset")
df.printSchema()

print("Jumlah Baris:", df.count())

25/12/12 21:42:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


5 Data Teratas
+-------------------+-------+-----+------+-----+-----+------+------+-----+
|               date|     co|   no|   no2|   o3|  so2| pm2_5|  pm10|  nh3|
+-------------------+-------+-----+------+-----+-----+------+------+-----+
|2020-11-25 01:00:00|2616.88| 2.18|  70.6|13.59|38.62|364.61|411.73|28.63|
|2020-11-25 02:00:00|3631.59|23.25| 89.11| 0.33|54.36|420.96|486.21|41.04|
|2020-11-25 03:00:00|4539.49|52.75|100.08| 1.11|68.67|463.68|541.95|49.14|
|2020-11-25 04:00:00|4539.49|50.96|111.04| 6.44| 78.2|454.81| 534.0|48.13|
|2020-11-25 05:00:00|4379.27|42.92| 117.9|17.17|87.74|448.14|529.19|46.61|
+-------------------+-------+-----+------+-----+-----+------+------+-----+
only showing top 5 rows

Skema Dataset
root
 |-- date: timestamp (nullable = true)
 |-- co: double (nullable = true)
 |-- no: double (nullable = true)
 |-- no2: double (nullable = true)
 |-- o3: double (nullable = true)
 |-- so2: double (nullable = true)
 |-- pm2_5: double (nullable = true)
 |-- pm10: double 

In [18]:
# Identifikasi data
from pyspark.sql.functions import col, sum

print("Jumlah Missing Value per Kolom:")
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

Jumlah Missing Value per Kolom:
+----+---+---+---+---+---+-----+----+---+
|date| co| no|no2| o3|so2|pm2_5|pm10|nh3|
+----+---+---+---+---+---+-----+----+---+
|   0|  0|  0|  0|  0|  0|    0|   0|  0|
+----+---+---+---+---+---+-----+----+---+



In [19]:
# Isi missing value dengan mean tiap kolom
from pyspark.sql.functions import col, avg

# Mengisi missing value dengan mean dari setiap kolom numerik
impute_values = df.select([avg(c).alias(c) for c in df.columns if c != "date"]).collect()[0].asDict()

df_clean = df.fillna(impute_values)

df_clean.show(5)

+-------------------+-------+-----+------+-----+-----+------+------+-----+
|               date|     co|   no|   no2|   o3|  so2| pm2_5|  pm10|  nh3|
+-------------------+-------+-----+------+-----+-----+------+------+-----+
|2020-11-25 01:00:00|2616.88| 2.18|  70.6|13.59|38.62|364.61|411.73|28.63|
|2020-11-25 02:00:00|3631.59|23.25| 89.11| 0.33|54.36|420.96|486.21|41.04|
|2020-11-25 03:00:00|4539.49|52.75|100.08| 1.11|68.67|463.68|541.95|49.14|
|2020-11-25 04:00:00|4539.49|50.96|111.04| 6.44| 78.2|454.81| 534.0|48.13|
|2020-11-25 05:00:00|4379.27|42.92| 117.9|17.17|87.74|448.14|529.19|46.61|
+-------------------+-------+-----+------+-----+-----+------+------+-----+
only showing top 5 rows



In [20]:
# Feature Engineering (VectorAssembler)
from pyspark.sql.functions import when

# buat label klasifikasi sederhana
df_ml = df_clean.withColumn("label", when(col("pm2_5") >= 300, 1).otherwise(0))

# daftar fitur
feature_cols = ["co", "no", "no2", "o3", "so2", "pm2_5", "pm10", "nh3"]

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

data = assembler.transform(df_ml)
data.select("label", "features").show(5, truncate=False)

+-----+-----------------------------------------------------+
|label|features                                             |
+-----+-----------------------------------------------------+
|1    |[2616.88,2.18,70.6,13.59,38.62,364.61,411.73,28.63]  |
|1    |[3631.59,23.25,89.11,0.33,54.36,420.96,486.21,41.04] |
|1    |[4539.49,52.75,100.08,1.11,68.67,463.68,541.95,49.14]|
|1    |[4539.49,50.96,111.04,6.44,78.2,454.81,534.0,48.13]  |
|1    |[4379.27,42.92,117.9,17.17,87.74,448.14,529.19,46.61]|
+-----+-----------------------------------------------------+
only showing top 5 rows



In [21]:
# Feature importance (Random Forest)
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(data)

print("Feature Importance:")
for col, score in zip(feature_cols, rf_model.featureImportances):
    print(f"{col}: {score}")

Feature Importance:
co: 0.0725842148854877
no: 0.030263975401343346
no2: 0.000497750652195045
o3: 0.014346907361286054
so2: 0.0005795508033445968
pm2_5: 0.4187598526263726
pm10: 0.4585820690720733
nh3: 0.004385679197897377


In [22]:
# Klasifikasi
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# split data
train, test = data.randomSplit([0.8, 0.2], seed=42)

# model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train)

# prediksi
pred = lr_model.transform(test)
pred.select("label", "prediction", "probability").show(10, truncate=False)

# evaluasi model
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(pred)
print("AUC Score:", auc)

+-----+----------+----------------------------+
|label|prediction|probability                 |
+-----+----------+----------------------------+
|1    |1.0       |[0.0,1.0]                   |
|1    |1.0       |[9.308662562765728E-221,1.0]|
|0    |0.0       |[1.0,0.0]                   |
|1    |1.0       |[0.0,1.0]                   |
|1    |1.0       |[1.3762034489540506E-63,1.0]|
|0    |0.0       |[1.0,0.0]                   |
|0    |0.0       |[1.0,0.0]                   |
|0    |0.0       |[1.0,0.0]                   |
|0    |0.0       |[1.0,0.0]                   |
|0    |0.0       |[1.0,0.0]                   |
+-----+----------+----------------------------+
only showing top 10 rows

AUC Score: 0.9999988719645977


In [24]:
# Cross validasi dan hyperparame
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(featuresCol="features", labelCol="label")

# Grid hyperparameter
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

evaluator = BinaryClassificationEvaluator(labelCol="label")

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# Latih model
cv_model = cv.fit(train)
best_model = cv_model.bestModel

print("Best Model Hyperparameters")
print("regParam :", best_model._java_obj.getRegParam())
print("elasticNetParam :", best_model._java_obj.getElasticNetParam())

# Evaluasi model terbaik
pred_cv = best_model.transform(test)
auc_cv = evaluator.evaluate(pred_cv)

print("AUC dengan Cross Validation:", auc_cv)

                                                                                

Best Model Hyperparameters
regParam : 0.1
elasticNetParam : 1.0
AUC dengan Cross Validation: 0.9999996239881992
