# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [2]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Membuat SparkSession
spark = SparkSession.builder.appName("LogisticRegressionModel").getOrCreate()

# Dataset contoh yang berisi ID, Fitur, dan Label
data = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0)
]
column = ['ID', 'Features', 'Label']

# Mengubah data menjadi DataFrame
df = spark.createDataFrame(data, column)

# Membagi array fitur menjadi dua kolom terpisah (Feature1 dan Feature2)
df = df.withColumn('Feature1', df['Features'][0]) \
       .withColumn('Feature2', df['Features'][1])

# Menggunakan VectorAssembler untuk menggabungkan Feature1 dan Feature2 menjadi satu kolom vektor
vector_assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='FeatureVector')
df_transformed = vector_assembler.transform(df)

# Menampilkan DataFrame setelah fitur digabungkan ke dalam vektor
df_transformed.select('ID', 'Feature1', 'Feature2', 'FeatureVector', 'Label').show()

# Membuat model regresi logistik dan melatihnya dengan data yang sudah diproses
logistic_regression = LogisticRegression(featuresCol='FeatureVector', labelCol='Label')
lr_model = logistic_regression.fit(df_transformed)

# Menampilkan koefisien dan intercept dari model yang sudah dilatih
print(f'Model Coefficients: {lr_model.coefficients}')
print(f'Model Intercept: {lr_model.intercept}')


+---+--------+--------+-------------+-----+
| ID|Feature1|Feature2|FeatureVector|Label|
+---+--------+--------+-------------+-----+
|  1|     2.0|     3.0|    [2.0,3.0]|    0|
|  2|     1.0|     5.0|    [1.0,5.0]|    1|
|  3|     2.5|     4.5|    [2.5,4.5]|    1|
|  4|     3.0|     6.0|    [3.0,6.0]|    0|
+---+--------+--------+-------------+-----+

Model Coefficients: [-12.262057929180484,4.087352266486688]
Model Intercept: 11.56891272665312


In [6]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("KMeansClusteringExample").getOrCreate()

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Split the 'Features' array into two separate columns (Feature1, Feature2)
df = df.withColumn('Feature1', df['Features'][0]) \
       .withColumn('Feature2', df['Features'][1])

# Use VectorAssembler to combine Feature1 and Feature2 into a single vector column
vector_assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='FeatureVector')
df_transformed = vector_assembler.transform(df)

# Show the transformed dataframe with the vector column
df_transformed.select('ID', 'Feature1', 'Feature2', 'FeatureVector').show()

# Train KMeans clustering model
kmeans = KMeans(featuresCol='FeatureVector', k=2)
model = kmeans.fit(df_transformed)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


+---+--------+--------+-------------+
| ID|Feature1|Feature2|FeatureVector|
+---+--------+--------+-------------+
|  1|     1.0|     1.0|    [1.0,1.0]|
|  2|     5.0|     5.0|    [5.0,5.0]|
|  3|    10.0|    10.0|  [10.0,10.0]|
|  4|    15.0|    15.0|  [15.0,15.0]|
+---+--------+--------+-------------+

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


# **IDENTIFIKASI DATA**

In [None]:
# Identifying a

import pandas as pd
import numpy as np

# Load the data
df = pd.read_csv('heart_2022.csv')
print('Dataset before cleaning:\n', df.head())

# Load a dataset of your choice and identify missing values.
print("\nMissing values in column:")
print(df.isnull().sum())


# Display basic information about the dataset
print("=== Dataset Overview ===")
print(f"Number of rows: {df.shape[0]}")
print(f"Number of columns: {df.shape[1]}\n")

print("=== Data Types and Non-Null Counts ===")
print(df.info())

Dataset before cleaning:
      State     Sex GeneralHealth  PhysicalHealthDays  MentalHealthDays  \
0  Alabama  Female     Very good                 0.0               0.0   
1  Alabama  Female     Excellent                 0.0               0.0   
2  Alabama  Female     Very good                 2.0               3.0   
3  Alabama  Female     Excellent                 0.0               0.0   
4  Alabama  Female          Fair                 2.0               0.0   

                                     LastCheckupTime PhysicalActivities  \
0  Within past year (anytime less than 12 months ...                 No   
1                                                NaN                 No   
2  Within past year (anytime less than 12 months ...                Yes   
3  Within past year (anytime less than 12 months ...                Yes   
4  Within past year (anytime less than 12 months ...                Yes   

   SleepHours RemovedTeeth HadHeartAttack  ... HeightInMeters  \
0         8.0

# **HANDLING MISSING VALUE**

In [None]:
# Handling missing numerical data
df['PhysicalHealthDays'].fillna(df['PhysicalHealthDays'].mean(), inplace=True)
df['MentalHealthDays'].fillna(df['MentalHealthDays'].median(), inplace=True)
df['SleepHours'].fillna(df['SleepHours'].mean(), inplace=True)
df['HeightInMeters'].fillna(df['HeightInMeters'].median(), inplace=True)
df['WeightInKilograms'].fillna(df['WeightInKilograms'].mean(), inplace=True)
df['BMI'].fillna(df['BMI'].median(), inplace=True)

# Handling missing categorical data
df['GeneralHealth'].fillna('Unknown', inplace=True)
df['LastCheckupTime'].fillna('Unknown', inplace=True)


df.dropna(inplace=True)

print("Dataset after cleaning:\n", df)

# cek missing values
print("\nMissing values in column:")
print(df.isnull().sum())

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['PhysicalHealthDays'].fillna(df['PhysicalHealthDays'].mean(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['MentalHealthDays'].fillna(df['MentalHealthDays'].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work 

Dataset after cleaning:
                 State     Sex GeneralHealth  PhysicalHealthDays  \
342           Alabama  Female     Very good                 4.0   
343           Alabama    Male     Very good                 0.0   
345           Alabama    Male     Very good                 0.0   
346           Alabama  Female          Fair                 5.0   
347           Alabama  Female          Good                 3.0   
...               ...     ...           ...                 ...   
418589  West Virginia  Female     Very good                 2.0   
418590  West Virginia  Female          Good                 0.0   
418593  West Virginia  Female          Fair                25.0   
418595  West Virginia    Male     Very good                 0.0   
418599  West Virginia  Female     Very good                 0.0   

        MentalHealthDays                                    LastCheckupTime  \
342                  0.0  Within past year (anytime less than 12 months ...   
343         

# **TRANSFORMASI DATA**

In [None]:
# Daftar kolom Yes/No yang perlu dikonversi menjadi 0 dan 1
binary_columns = [
    'PhysicalActivites', 'HadHeartAttack', 'HadAngina',
    'HadStroke', 'HadAsthma', 'HadSkinCancer', 'HadCOPD', 'HadDepressiveDisorder',
    'HadKidneyDisease', 'HadArthritis', 'DeafOrHardOfHearing',
    'DifficultyConcentrating', 'DifficultyWalking', 'DifficultyDressingBathing',
    'DifficultyErrands', 'AlcoholDrinkers', 'HIVTesting', 'FluVaxLast12',
    'PneumoVaxEver', 'HighRiskLastYear', 'ChestScan', 'PhysicalActivities', 'BlindOrVisionDifficulty'
]

# Mengonversi kolom Yes/No menjadi 1 dan 0
def standardize_binary(column):
    return column.replace({"Yes": 1, "No": 0})

for col in binary_columns:
    if col in df.columns:
        df[col] = standardize_binary(df[col])

# Daftar mapping langsung untuk konversi kolom lainnya
mappings = {
    'Sex': {'Male': 0, 'Female': 1},
    'GeneralHealth': {'Excellent': 1, 'Very good': 2, 'Good': 3, 'Fair': 4, 'Poor': 5},
    'LastCheckupTime': {
        "Within past year (anytime less than 12 months ago)": 1,
        "Within past 2 years (1 year but less than 2 years ago)": 2,
        "Within past 5 years (2 years but less than 5 years ago)": 3,
        "5 or more years ago": 4
    },
    'RemovedTeeth': {
        "1 to 5": 1,
        "6 or more, but not all": 2,
        "All": 3,
        "None of them": 4
    },
    'HadDiabetes': {
        "Yes": 1,
        "Yes, but only during pregnancy (female)": 1,
        "No": 0,
        "No, pre-diabetes or borderline diabetes": 0
    },
    'SmokerStatus': {
        "Current smoker - now smokes every day": 1,
        "Current smoker - now smokes some days": 1,
        "Former smoker": 1,
        "Never smoked": 0
    },
    'ECigaretteUsage': {
        "Never used e-cigarettes in my entire life": 1,
        "Use them every day": 1,
        "Use them some days": 1,
        "Not at all (right now)": 0
    },
    'RaceEthnicityCategory': {
        "White only, Non-Hispanic": 1,
        "Black only, Non-Hispanic": 2,
        "Other race only, Non-Hispanic": 3,
        "Multiracial, Non-Hispanic": 4,
        "Hispanic": 5
    },
    'AgeCategory': {
        "Age 18 to 24": 1, "Age 25 to 29": 2, "Age 30 to 34": 3, "Age 35 to 39": 4,
        "Age 40 to 44": 5, "Age 45 to 49": 6, "Age 50 to 54": 7, "Age 55 to 59": 8,
        "Age 60 to 64": 9, "Age 65 to 69": 10, "Age 70 to 74": 11, "Age 75 to 79": 12,
        "Age 80 or older": 13
    },
    'TetanusLast10Tdap': {
        "Yes, received Tdap": 1,
        "Yes, received tetanus shot, but not Tdap": 1,
        "Yes, received tetanus shot but not sure what type": 1,
        "No, did not receive any tetanus shot in the past 10 years": 0
    },
    'CovidPos': {
        "Yes": 1,
        "No": 0,
        "Tested positive using home test without a health professional": 2
    }
}

# Mengonversi kolom berdasarkan mapping
for col, mapping in mappings.items():
    if col in df.columns:
        df[col] = df[col].map(mapping)

# Membersihkan nilai SleepHours (mengganti nilai >24 dengan NaN)
df['SleepHours'] = df['SleepHours'].apply(lambda x: np.nan if x > 24 else x)

# Menghapus kolom yang tidak diperlukan
columns_to_drop = ['State']
df = df.drop(columns=[col for col in columns_to_drop if col in df.columns])



print("Dataset After Cleaning :", df.head())
# Menampilkan display tabel
from IPython.display import display
display(df.head(10))

# Menyimpan dataset yang telah dibersihkan
df.to_csv('heart_2022_cleanedfx.csv', index=False)


  return column.replace({"Yes": 1, "No": 0})


Dataset After Cleaning :      Sex  GeneralHealth  PhysicalHealthDays  MentalHealthDays  \
342    1            2.0                 4.0               0.0   
343    0            2.0                 0.0               0.0   
345    0            2.0                 0.0               0.0   
346    1            4.0                 5.0               0.0   
347    1            3.0                 3.0              15.0   

     LastCheckupTime  PhysicalActivities  SleepHours  RemovedTeeth  \
342              1.0                   1         9.0             4   
343              1.0                   1         6.0             4   
345              1.0                   0         8.0             2   
346              1.0                   1         9.0             4   
347              1.0                   1         5.0             1   

     HadHeartAttack  HadAngina  ...  HeightInMeters  WeightInKilograms    BMI  \
342               0          0  ...            1.60              71.67  27.99   
3

Unnamed: 0,Sex,GeneralHealth,PhysicalHealthDays,MentalHealthDays,LastCheckupTime,PhysicalActivities,SleepHours,RemovedTeeth,HadHeartAttack,HadAngina,...,HeightInMeters,WeightInKilograms,BMI,AlcoholDrinkers,HIVTesting,FluVaxLast12,PneumoVaxEver,TetanusLast10Tdap,HighRiskLastYear,CovidPos
342,1,2.0,4.0,0.0,1.0,1,9.0,4,0,0,...,1.6,71.67,27.99,0,0,1,1,1,0,0
343,0,2.0,0.0,0.0,1.0,1,6.0,4,0,0,...,1.78,95.25,30.13,0,0,1,1,1,0,0
345,0,2.0,0.0,0.0,1.0,0,8.0,2,0,0,...,1.85,108.86,31.66,1,0,0,1,0,0,1
346,1,4.0,5.0,0.0,1.0,1,9.0,4,0,0,...,1.7,90.72,31.32,0,0,1,1,0,0,1
347,1,3.0,3.0,15.0,1.0,1,5.0,1,0,0,...,1.55,79.38,33.07,0,0,1,1,0,0,0
349,0,3.0,0.0,0.0,1.0,1,7.0,4,0,0,...,1.85,120.2,34.96,1,1,1,0,1,0,0
350,1,3.0,3.0,0.0,1.0,1,8.0,2,0,0,...,1.63,88.0,33.3,0,0,1,1,0,0,0
352,0,4.0,5.0,0.0,1.0,1,8.0,1,1,1,...,1.75,74.84,24.37,0,1,1,1,0,0,1
354,0,3.0,2.0,0.0,4.0,0,6.0,4,0,0,...,1.7,78.02,26.94,0,0,0,0,0,0,1
357,1,2.0,0.0,0.0,1.0,1,7.0,4,0,0,...,1.68,63.5,22.6,0,0,1,1,0,0,0


# **MENCARI FEATURES PENTING**

In [None]:
# Import library yang diperlukan
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col

# 1. Membuat sesi Spark
spark = SparkSession.builder.appName("HeartAttackFeatureImportance").getOrCreate()

# 2. Membaca dataset
data_path = "heart_2022_cleanedfx.csv"  # Ganti dengan path file Anda
df = spark.read.csv(data_path, header=True, inferSchema=True)

# 3. Memeriksa nilai null pada dataset
print("Jumlah Nilai Null per Kolom:")
df.select([col(c).isNull().alias(c) for c in df.columns]).show()

# 4. Menghapus baris dengan nilai null pada kolom fitur dan target
df = df.dropna(subset=[col for col in df.columns if col != 'HadHeartAttack'])  # Kecuali kolom target

# 5. Menggunakan StringIndexer untuk mengonversi kolom kategorikal menjadi numerik
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_indexed") for col in df.columns if df.schema[col].dataType == "StringType"]

# 6. Mengonversi kolom target menjadi label numerik
indexer_target = StringIndexer(inputCol="HadHeartAttack", outputCol="label")

# 7. Menggabungkan semua kolom fitur ke dalam VectorAssembler
feature_columns = [col for col in df.columns if col not in ["HadHeartAttack", "label"]]  # Mengabaikan kolom target
final_feature_columns = [f"{col}_indexed" if df.schema[col].dataType == "StringType" else col for col in feature_columns]
assembler = VectorAssembler(inputCols=final_feature_columns, outputCol="features")

# 8. Pipeline untuk preprocessing
pipeline = Pipeline(stages=indexers + [indexer_target, assembler])

# 9. Menjalankan pipeline untuk preprocessing
df_transformed = pipeline.fit(df).transform(df)

# Cek apakah kolom 'features' sudah ada setelah preprocessing
df_transformed.select("features", "label").show(5)

# 10. Latih RandomForestClassifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100, seed=42)

# Pastikan model dilatih pada dataset yang telah diproses
model = rf.fit(df_transformed)

# 11. Mendapatkan fitur penting
feature_importances = model.featureImportances
importance_dict = {feature: score for feature, score in zip(final_feature_columns, feature_importances)}

# 12. Menampilkan fitur paling penting
sorted_importances = sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)

print("Top 10 Fitur yang Mempengaruhi HadHeartAttack:")
for feature, importance in sorted_importances[:10]:
    print(f"{feature}: {importance}")

# 13. Menutup sesi Spark
spark.stop()


Jumlah Nilai Null per Kolom:
+-----+-------------+------------------+----------------+---------------+------------------+----------+------------+--------------+---------+---------+---------+-------------+-------+---------------------+----------------+------------+-----------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------+------------+---------------+---------+---------------------+-----------+--------------+-----------------+-----+---------------+----------+------------+-------------+-----------------+----------------+--------+
|  Sex|GeneralHealth|PhysicalHealthDays|MentalHealthDays|LastCheckupTime|PhysicalActivities|SleepHours|RemovedTeeth|HadHeartAttack|HadAngina|HadStroke|HadAsthma|HadSkinCancer|HadCOPD|HadDepressiveDisorder|HadKidneyDisease|HadArthritis|HadDiabetes|DeafOrHardOfHearing|BlindOrVisionDifficulty|DifficultyConcentrating|DifficultyWalking|DifficultyDressingBathing|DifficultyErrands|Smoker

# **MACHINE LEARNING**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Membuat sesi Spark
spark = SparkSession.builder.appName("HeartDiseaseClassification").getOrCreate()

# Membaca dataset
data_path = "heart_2022_cleanedfx.csv"  # Ganti dengan path file Anda
df = spark.read.csv(data_path, header=True, inferSchema=True)

df = df.dropna()  # Drop rows with missing values


# Pilih fitur yang relevan dan target
features = [
    'HadAngina', 'HadStroke', 'GeneralHealth', 'SmokerStatus',
    'AlcoholDrinkers', 'AgeCategory', 'PhysicalHealthDays',
    'HadDiabetes', 'ChestScan', 'HadCOPD', 'DifficultyWalking', 'Sex'
]
target = 'HadHeartAttack'

# Menggabungkan semua fitur menjadi satu kolom 'features' menggunakan VectorAssembler
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Mengubah target menjadi numerik jika belum (asumsi target sudah numerik)
df = df.withColumnRenamed(target, "label")

# Membagi data menjadi training dan testing set
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Cek ukuran data training dan testing
print(f"Ukuran X_train: {train_data.count()} rows, {len(train_data.columns)} columns")
print(f"Ukuran X_test: {test_data.count()} rows, {len(test_data.columns)} columns")

# Inisialisasi model RandomForest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100, seed=42)

# Membuat pipeline dengan assembler dan classifier
pipeline = Pipeline(stages=[assembler, rf])

# Melatih model menggunakan data training
model = pipeline.fit(train_data)

# Gunakan model yang telah dilatih untuk melakukan prediksi pada data testing
predictions = model.transform(test_data)

# Menampilkan beberapa prediksi
predictions.select("features", "label", "prediction").show(5)


Ukuran X_train: 200387 rows, 39 columns
Ukuran X_test: 50186 rows, 39 columns
+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(12,[2,3,5,8],[1....|    0|       0.0|
|(12,[2,3,4,5],[1....|    0|       0.0|
|(12,[2,4,5],[1.0,...|    0|       0.0|
|(12,[2,5],[1.0,9.0])|    0|       0.0|
|(12,[2,3,5,8],[1....|    0|       0.0|
+--------------------+-----+----------+
only showing top 5 rows



In [None]:
# Melakukan prediksi pada test data
predictions = model.transform(test_data)

# Evaluasi model menggunakan akurasi
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi Model: {accuracy:.4f}")
print(f"Akurasi (Persen): {accuracy * 100:.2f}%")

Akurasi Model: 0.9476
Akurasi (Persen): 94.76%


In [None]:
# Menampilkan beberapa hasil prediksi
print("Hasil Prediksi:")
predictions.select('HadAngina', 'HadStroke', 'GeneralHealth', 'SmokerStatus',
    'AlcoholDrinkers', 'AgeCategory', 'PhysicalHealthDays',
    'HadDiabetes', 'ChestScan', 'HadCOPD', 'DifficultyWalking', 'Sex', "label", "prediction", "probability").show(10, truncate=False)

# Menghentikan sesi Spark
spark.stop()


Hasil Prediksi:
+---------+---------+-------------+------------+---------------+-----------+------------------+-----------+---------+-------+-----------------+---+-----+----------+-----------------------------------------+
|HadAngina|HadStroke|GeneralHealth|SmokerStatus|AlcoholDrinkers|AgeCategory|PhysicalHealthDays|HadDiabetes|ChestScan|HadCOPD|DifficultyWalking|Sex|label|prediction|probability                              |
+---------+---------+-------------+------------+---------------+-----------+------------------+-----------+---------+-------+-----------------+---+-----+----------+-----------------------------------------+
|0        |0        |1.0          |1           |0              |3          |0.0               |0          |1        |0      |0                |0  |0    |0.0       |[0.9663172440281808,0.03368275597181919] |
|0        |0        |1.0          |1           |1              |8          |0.0               |0          |0        |0      |0                |0  |0    |0.0

# **HYPERPARAMETER**

In [1]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Membuat sesi Spark
spark = SparkSession.builder.appName("HeartDiseaseClassification").getOrCreate()

# Membaca dataset
data_path = "heart_2022_cleanedfx.csv"  # Ganti dengan path file Anda
df = spark.read.csv(data_path, header=True, inferSchema=True)

df = df.na.drop()

# Pilih fitur yang relevan dan target
features = [
    'HadAngina', 'HadStroke', 'GeneralHealth', 'SmokerStatus',
    'AlcoholDrinkers', 'AgeCategory', 'PhysicalHealthDays',
    'HadDiabetes', 'ChestScan', 'HadCOPD', 'DifficultyWalking', 'Sex'
]
target = 'HadHeartAttack'

# Menggabungkan semua fitur menjadi satu kolom 'features' menggunakan VectorAssembler
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Mengubah target menjadi numerik jika belum (asumsi target sudah numerik)
df = df.withColumnRenamed(target, "label")

# Membagi data menjadi training dan testing set
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Menyiapkan VectorAssembler
train_data_transformed = assembler.transform(train_data)
test_data_transformed = assembler.transform(test_data)

# Inisialisasi DecisionTree
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Menyiapkan evaluator untuk multiclass classification
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Membuat param grid untuk hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10, 15])   # Maximum depth of the tree
             .addGrid(dt.maxBins, [32, 64, 128])  # Number of bins
             .addGrid(dt.minInstancesPerNode, [1, 2, 5])  # Minimum instances per node
             .build())

# Menyiapkan CrossValidator untuk tuning hyperparameters
crossval = CrossValidator(estimator=dt,
                          evaluator=evaluator,
                          estimatorParamMaps=paramGrid,
                          numFolds=3)  # 3-fold cross-validation

# Melatih model dengan cross-validation
cvModel = crossval.fit(train_data_transformed)

# Menggunakan model yang terlatih untuk melakukan prediksi pada data testing
predictions = cvModel.transform(test_data_transformed)

# Menampilkan beberapa prediksi
predictions.select("features", "label", "prediction").show(5)

# Evaluasi model
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy of the Decision Tree model with best hyperparameters: {accuracy}")


+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(12,[2,4,5],[1.0,...|    0|       0.0|
|(12,[2,4,5],[1.0,...|    0|       0.0|
|(12,[0,2,3,4,5,8]...|    1|       0.0|
|(12,[2,5],[1.0,6.0])|    0|       0.0|
|(12,[2,3,5,8],[1....|    0|       0.0|
+--------------------+-----+----------+
only showing top 5 rows

Accuracy of the Decision Tree model with best hyperparameters: 0.9487753047757521
