Week 14


In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

24/12/22 09:13:56 WARN Utils: Your hostname, codespaces-13fb51 resolves to a loopback address: 127.0.0.1; using 10.0.2.104 instead (on interface eth0)
24/12/22 09:13:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/22 09:13:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/22 09:14:03 WARN Instrumentation: [67d954a8] regParam is zero, which might cause numerical instability and overfitting.
24/12/22 09:14:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/22 09:14:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/12/22 09:14:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


24/12/22 09:14:08 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Linear Regression

In [4]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, 2.0, 4.0, 0), (2, 1.0, 3.5, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# Use VectorAssembler to change features columns to a vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')

# Transform the data
df = assembler.transform(df)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-0.25656420289219967,-1.3097244239447348]
Intercept: 6.334558002662652


KMeans Clustering

In [7]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense([5.0, 5.0])), (2, Vectors.dense([7.5, 7.5])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([6.25, 6.25])]


**Homework**

Load Dataset

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Inisialisasi Spark Session
spark = SparkSession.builder.appName("Stock_Classification").getOrCreate()

# Muat file TXT (ganti path sesuai lokasi file)
data = spark.read.option("delimiter", ",").csv("/workspaces/Big-Data/Dataset/alfa-us.txt", header=True, inferSchema=True)

# Periksa data yang dimuat
data.show(5)

+----------+------+------+------+------+------+-------+
|      Date|  Open|  High|   Low| Close|Volume|OpenInt|
+----------+------+------+------+------+------+-------+
|2012-06-05|23.913|24.101|23.913|24.101|  1364|      0|
|2012-06-06|24.586|24.606|24.438|24.556|  1561|      0|
|2012-06-07|25.388|25.388|24.705|24.725|  3234|      0|
|2012-06-08|24.695|24.843|24.695|24.804|   808|      0|
|2012-06-11|25.289|25.289|24.467|24.467|  3921|      0|
+----------+------+------+------+------+------+-------+
only showing top 5 rows



Transformasi Data dan Vektorisasi Fitur

In [10]:
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.functions import lit

# Add a dummy Label column for demonstration
data = data.withColumn("Label", lit(1))

# Definisikan fitur yang akan digunakan dalam model
feature_columns = ["Open", "High", "Low", "Volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transformasikan data ke dalam format fitur vektor
assembled_data = assembler.transform(data).select("features", "Label")

# Periksa hasil transformasi
assembled_data.show(5)

+--------------------+-----+
|            features|Label|
+--------------------+-----+
|[23.913,24.101,23...|    1|
|[24.586,24.606,24...|    1|
|[25.388,25.388,24...|    1|
|[24.695,24.843,24...|    1|
|[25.289,25.289,24...|    1|
+--------------------+-----+
only showing top 5 rows



Bagi Data ke Dalam Set Pelatihan dan Pengujian

In [11]:
# Bagi data menjadi 80% pelatihan dan 20% pengujian
train_data, test_data = assembled_data.randomSplit([0.8, 0.2], seed=42)

Bangun Model Klasifikasi Menggunakan Logistic Regression

In [12]:
from pyspark.ml.classification import LogisticRegression

# Inisialisasi model Logistic Regression
lr = LogisticRegression(labelCol="Label", featuresCol="features")

# Latih model dengan data pelatihan
lr_model = lr.fit(train_data)

# Lakukan prediksi pada data pengujian
predictions = lr_model.transform(test_data)

# Tampilkan beberapa hasil prediksi
predictions.select("Label", "prediction").show(5)

24/12/22 09:29:10 WARN Instrumentation: [ecc0ee15] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.


+-----+----------+
|Label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 5 rows



Evaluasi Kinerja Model

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Inisialisasi evaluator dengan metrik AUC (Area Under ROC)
evaluator = BinaryClassificationEvaluator(labelCol="Label", metricName="areaUnderROC")

# Hitung dan tampilkan nilai AUC
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

Test AUC: 1.0


Penyetelan Hiperparameter Menggunakan Validasi Silang

In [14]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Buat grid parameter untuk diuji
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 0.5])       # Parameter regularisasi
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) # Parameter elastic net
              .build())

# Buat CrossValidator dengan 5 fold
cross_val = CrossValidator(estimator=lr,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=5)

# Latih model dengan validasi silang
cv_model = cross_val.fit(train_data)

# Dapatkan model terbaik dari validasi silang
best_model = cv_model.bestModel

# Evaluasi model terbaik pada data pengujian
best_predictions = best_model.transform(test_data)
best_auc = evaluator.evaluate(best_predictions)
print(f"Best Model Test AUC: {best_auc}")

24/12/22 09:30:08 WARN Instrumentation: [3c61e666] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:09 WARN Instrumentation: [79f3a5f7] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:10 WARN Instrumentation: [d2d0abc7] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:10 WARN Instrumentation: [8701d8b8] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:11 WARN Instrumentation: [5955637a] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:11 WARN Instrumentation: [3f9c38c9] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/22 09:30:11 WARN

Best Model Test AUC: 1.0
