In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

25/11/23 13:59:07 WARN Utils: Your hostname, OSSalwa resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/23 13:59:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 13:59:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

25/11/23 13:59:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
25/11/23 13:59:33 WARN Instrumentation: [8131e6af] regParam is zero, which might cause numerical instability and overfitting.
25/11/23 13:59:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/11/23 13:59:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/11/23 13:59:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
[Stage 1:>                                                          (0 + 2) / 2]

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


                                                                                

In [3]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

IllegalArgumentException: requirement failed: Column Features must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<double>.

In [4]:
# (perbaikan) Practice: Logistic Regression

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Example dataset
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0)
]

columns = ['ID', 'F1', 'F2', 'Label']
df = spark.createDataFrame(data, columns)

# Assemble features
assembler = VectorAssembler(
    inputCols=['F1', 'F2'],
    outputCol='Features'
)
df_transformed = assembler.transform(df)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df_transformed)

# Display coefficients and summary
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

                                                                                

Coefficients: [-12.262057891021882,4.087352253767593]
Intercept: 11.568912688492174


In [8]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

data = [
    (1, 1.0, 1.0),
    (2, 5.0, 5.0),
    (3, 10.0, 10.0),
    (4, 15.0, 15.0)
]

columns = ['ID', 'x', 'y']
df = spark.createDataFrame(data, columns)

assembler = VectorAssembler(
    inputCols=['x', 'y'],
    outputCol='Features'
)

df2 = assembler.transform(df)

kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df2)

centers = model.clusterCenters()
print("Cluster Centers:", centers)

                                                                                

Cluster Centers: [array([3., 3.]), array([12.5, 12.5])]


In [9]:
# HOMEWORK

In [17]:
from pyspark.sql import SparkSession

# Inisialisasi Spark
spark = SparkSession.builder.appName("Week14_MLlib").getOrCreate()

# Load dataset
df = spark.read.csv("file:///home/salwa/Downloads/titanic_clean.csv", header=True, inferSchema=True)

print("5 baris pertama dataset:")
df.show(5, truncate=False)

print("\nSchema dataset:")
df.printSchema()

25/11/23 14:48:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


5 baris pertama dataset:
+-----------+--------+---------------------------------------------------+-------------------+-----+-----+----------------+-------------------+--------+----------+----------+--------+--------+
|PassengerId|Survived|Name                                               |Age                |SibSp|Parch|Ticket          |Fare               |Sex_male|Embarked_Q|Embarked_S|Pclass_2|Pclass_3|
+-----------+--------+---------------------------------------------------+-------------------+-----+-----+----------------+-------------------+--------+----------+----------+--------+--------+
|1          |0       |Braund, Mr. Owen Harris                            |0.37500000000000006|1    |0    |A/5 21171       |0.11046036834342966|true    |false     |true      |false   |true    |
|2          |1       |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|0.6826923076923077 |1    |0    |PC 17599        |1.0                |false   |false     |false     |false   |false   |
|3        

In [18]:
# hapus kolom yang tdk diperlukan

df = df.drop("Name", "Ticket", "PassengerId")

In [19]:
# ubah kolom survived menjadi label
df = df.withColumnRenamed("Survived", "label")

In [20]:
# tentukan kolom fitur

feature_cols = [
    "Age", "SibSp", "Parch", "Fare",
    "Sex_male", "Embarked_Q", "Embarked_S",
    "Pclass_2", "Pclass_3"
]

In [22]:
# VectorAssembler

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

df2 = assembler.transform(df).select("features", "label")

print("Data setelah VectorAssembler:")
df2.show(5, truncate=False)

Data setelah VectorAssembler:
+---------------------------------------------------------------------+-----+
|features                                                             |label|
+---------------------------------------------------------------------+-----+
|[0.37500000000000006,1.0,0.0,0.11046036834342966,1.0,0.0,1.0,0.0,1.0]|0    |
|(9,[0,1,3],[0.6826923076923077,1.0,1.0])                             |1    |
|(9,[0,3,6,8],[0.4519230769230769,0.12074460953402483,1.0,1.0])       |1    |
|(9,[0,1,3,6],[0.625,1.0,0.8090269736601539,1.0])                     |1    |
|[0.625,0.0,0.0,0.12264909864339432,1.0,0.0,1.0,0.0,1.0]              |0    |
+---------------------------------------------------------------------+-----+
only showing top 5 rows



In [23]:
# Split Train-Test

train, test = df2.randomSplit([0.8, 0.2], seed=42)

In [24]:
# Build a classification model using Spark MLlib and evaluate its performance.
# Explore hyperparameter tuning using cross-validation.

In [26]:
# MODEL 1: Random Forest Classifier

# train random forest
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50
)

rf_model = rf.fit(train)
rf_pred = rf_model.transform(test)

                                                                                

In [28]:
# evaluasi random forest

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label")
rf_auc = evaluator.evaluate(rf_pred)
print(f"AUC Random Forest: {rf_auc:.4f}")

AUC Random Forest: 0.8933


In [33]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

rf_paramGrid = (ParamGridBuilder()
                .addGrid(rf.numTrees, [30, 50, 80])
                .addGrid(rf.maxDepth, [3, 5, 8])
                .build())

rf_cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=rf_paramGrid,
    evaluator=evaluator,
    numFolds=3
)

print("Cross Validation Random Forest berjalan...")
rf_cv_model = rf_cv.fit(train)

rf_cv_pred = rf_cv_model.transform(test)
rf_cv_auc = evaluator.evaluate(rf_cv_pred)

print(f"AUC Random Forest (after CV): {rf_cv_auc:.4f}")

Cross Validation Random Forest berjalan...
AUC Random Forest (after CV): 0.8726


In [38]:
# best parameter random forest

best_rf = rf_cv_model.bestModel

print("\nBest Parameters (Random Forest)")
print("numTrees :", best_rf.getNumTrees)
print("maxDepth :", best_rf.getMaxDepth())


Best Parameters (Random Forest)
numTrees : 30
maxDepth : 8


In [40]:
# MODEL 2: Logistic Regression

# Train Logistic Regression
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=50
)

lr_model = lr.fit(train)
lr_pred = lr_model.transform(test)

In [41]:
# Evaluasi Logistic Regression

lr_auc = evaluator.evaluate(lr_pred)
print(f"AUC Logistic Regression: {lr_auc:.4f}")

AUC Logistic Regression: 0.8870


In [42]:
# Hyperparameter Tuning Logistic Regression

lr_paramGrid = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.01, 0.1, 0.5])
                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                .build())

lr_cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=lr_paramGrid,
    evaluator=evaluator,
    numFolds=3
)

print("Cross Validation Logistic Regression berjalan...")
lr_cv_model = lr_cv.fit(train)

lr_cv_pred = lr_cv_model.transform(test)
lr_cv_auc = evaluator.evaluate(lr_cv_pred)

print(f"AUC Logistic Regression (after CV): {lr_cv_auc:.4f}")

Cross Validation Logistic Regression berjalan...
AUC Logistic Regression (after CV): 0.8916


In [46]:
# Best Parameters Logistic Regression

best_lr = lr_cv_model.bestModel

print("\nBest Parameters (Logistic Regression):")
print("regParam :", best_lr._java_obj.getRegParam())
print("elasticNetParam :", best_lr._java_obj.getElasticNetParam())


Best Parameters (Logistic Regression):
regParam : 0.01
elasticNetParam : 0.5
