## Spark ML Model Development

This project aims to create model using SparkML, a library that run on top of Apache Spark. SparkML is suitable for creating a model that learn from very big dataset (> millions dataset)

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, NaiveBayes, MultilayerPerceptronClassifier,DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
import pandas as pd


spark = SparkSession.builder \
    .appName("SparkML Model Development") \
    .master("local") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

### Let's Doing Data Preprocessing

Let's import the dataset first, removing null values, and casting the data type into correct data type

In [2]:
# Read the dataset
data = spark.read.options(header='True', inferSchema=True, delimiter=',').csv("D:\Self_project\sparkml_classification\data\dataset_2millions.csv")



In [3]:
data.show()

+--------+---------+---+-----+----+-----+---------+----------+---------+-------+
| pointid|grid_code|Red|Green|Blue|Label| norm_red|norm_green|norm_blue|Label_1|
+--------+---------+---+-----+----+-----+---------+----------+---------+-------+
|19300985|        1|0.0| 43.0|20.0|    1|0,0941176| 0,1686275|0,0784314|      1|
| 5665222|        0|0.0| 69.0|37.0|    0|0,1764706| 0,2705882|0,1450980|      0|
|34365418|        0|0.0| 47.0|25.0|    0|0,1176471| 0,1843137|0,0980392|      0|
|13371923|        0|0.0|116.0|89.0|    0|0,4470588| 0,4549020|0,3490196|      0|
|10244697|        0|0.0| 57.0|40.0|    0|0,1568628| 0,2235294|0,1568628|      0|
|79029168|        0|0.0| 49.0|21.0|    0|0,0941176| 0,1921569|0,0823529|      0|
|20882365|        0|0.0| 51.0|26.0|    0|0,1607843| 0,2000000|0,1019608|      0|
| 2966066|        0|0.0| 94.0|61.0|    0|0,2196078| 0,3686275|0,2392157|      0|
|29171445|        1|0.0| 44.0|21.0|    1|0,0980392| 0,1725490|0,0823529|      1|
|44246354|        0|0.0| 53.

In [4]:
# Select only meaningful columns
data = data.select("norm_red", "norm_green", "norm_blue", "Label")

data.show()

+---------+----------+---------+-----+
| norm_red|norm_green|norm_blue|Label|
+---------+----------+---------+-----+
|0,0941176| 0,1686275|0,0784314|    1|
|0,1764706| 0,2705882|0,1450980|    0|
|0,1176471| 0,1843137|0,0980392|    0|
|0,4470588| 0,4549020|0,3490196|    0|
|0,1568628| 0,2235294|0,1568628|    0|
|0,0941176| 0,1921569|0,0823529|    0|
|0,1607843| 0,2000000|0,1019608|    0|
|0,2196078| 0,3686275|0,2392157|    0|
|0,0980392| 0,1725490|0,0823529|    1|
|0,1333333| 0,2078431|0,1137255|    0|
|0,1254902| 0,2117647|0,1294118|    1|
|0,1215686| 0,2000000|0,1215686|    1|
|0,1215686| 0,1921569|0,1254902|    1|
|0,2039216| 0,2666667|0,1254902|    0|
|0,1450980| 0,2549020|0,1215686|    0|
|0,5098040| 0,5490196|0,3411765|    0|
|0,3529412| 0,3137255|0,1960784|    0|
|0,3019608| 0,3647059|0,3294118|    1|
|0,0862745| 0,1568628|0,0784314|    1|
|0,3215686| 0,3529412|0,2823530|    0|
+---------+----------+---------+-----+
only showing top 20 rows



In [5]:
# Check the null value
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]
   ).show()

+--------+----------+---------+-----+
|norm_red|norm_green|norm_blue|Label|
+--------+----------+---------+-----+
|       0|         0|        0|    0|
+--------+----------+---------+-----+



In [6]:
# Check the data type
data.dtypes

[('norm_red', 'string'),
 ('norm_green', 'string'),
 ('norm_blue', 'string'),
 ('Label', 'int')]

In [7]:
# There is issue with our datatype column, it is supposed to be a double but turns out a string
# We need to convert it into double
# Before convert it into double, replace the "," sign into "." because double type only can read the "." sign


data = data.withColumn("norm_red", regexp_replace('norm_red', ',', '.'))
data = data.withColumn("norm_red", data.norm_red.cast(DoubleType()))

data = data.withColumn("norm_green", regexp_replace('norm_green', ',', '.'))
data = data.withColumn("norm_green", data.norm_red.cast(DoubleType()))

data = data.withColumn("norm_blue", regexp_replace('norm_blue', ',', '.'))
data = data.withColumn("norm_blue", data.norm_red.cast(DoubleType()))

data.dtypes

[('norm_red', 'double'),
 ('norm_green', 'double'),
 ('norm_blue', 'double'),
 ('Label', 'int')]

In [8]:
data.show()

+---------+----------+---------+-----+
| norm_red|norm_green|norm_blue|Label|
+---------+----------+---------+-----+
|0.0941176| 0.0941176|0.0941176|    1|
|0.1764706| 0.1764706|0.1764706|    0|
|0.1176471| 0.1176471|0.1176471|    0|
|0.4470588| 0.4470588|0.4470588|    0|
|0.1568628| 0.1568628|0.1568628|    0|
|0.0941176| 0.0941176|0.0941176|    0|
|0.1607843| 0.1607843|0.1607843|    0|
|0.2196078| 0.2196078|0.2196078|    0|
|0.0980392| 0.0980392|0.0980392|    1|
|0.1333333| 0.1333333|0.1333333|    0|
|0.1254902| 0.1254902|0.1254902|    1|
|0.1215686| 0.1215686|0.1215686|    1|
|0.1215686| 0.1215686|0.1215686|    1|
|0.2039216| 0.2039216|0.2039216|    0|
| 0.145098|  0.145098| 0.145098|    0|
| 0.509804|  0.509804| 0.509804|    0|
|0.3529412| 0.3529412|0.3529412|    0|
|0.3019608| 0.3019608|0.3019608|    1|
|0.0862745| 0.0862745|0.0862745|    1|
|0.3215686| 0.3215686|0.3215686|    0|
+---------+----------+---------+-----+
only showing top 20 rows



#### Prepare the Dataset Before Model Training

Vectorize the dataset is key important thing before we train our model in Spark ML. It involves converting data into a format that machine learning algorithms can efficiently process. 

In [9]:
feature_columns = ['norm_red', 'norm_green', 'norm_blue']  # Replace with your actual feature columns
label = 'Label'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data = data.select("features", "label")

In [10]:
data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0941176,0.0941...|    1|
|[0.1764706,0.1764...|    0|
|[0.1176471,0.1176...|    0|
|[0.4470588,0.4470...|    0|
|[0.1568628,0.1568...|    0|
|[0.0941176,0.0941...|    0|
|[0.1607843,0.1607...|    0|
|[0.2196078,0.2196...|    0|
|[0.0980392,0.0980...|    1|
|[0.1333333,0.1333...|    0|
|[0.1254902,0.1254...|    1|
|[0.1215686,0.1215...|    1|
|[0.1215686,0.1215...|    1|
|[0.2039216,0.2039...|    0|
|[0.145098,0.14509...|    0|
|[0.509804,0.50980...|    0|
|[0.3529412,0.3529...|    0|
|[0.3019608,0.3019...|    1|
|[0.0862745,0.0862...|    1|
|[0.3215686,0.3215...|    0|
+--------------------+-----+
only showing top 20 rows



In [38]:
# Define models
models = [
    LogisticRegression(featuresCol='features', labelCol='label'),
    RandomForestClassifier(featuresCol='features', labelCol='label'),
    GBTClassifier(featuresCol='features', labelCol='label'),
    DecisionTreeClassifier(featuresCol='features', labelCol='label'),
    NaiveBayes(featuresCol='features', labelCol='label'),
    MultilayerPerceptronClassifier(featuresCol='features', labelCol='label', layers=[len(feature_columns), 2, 2]),  # Example layers
    LinearSVC(featuresCol='features', labelCol='label')
]

# Define evaluation metrics
metrics = ['accuracy', 'recall', 'precision', 'f1', 'iou']

# Define the evaluator function for each metric
def evaluate_metrics(predictions, metric):
    if metric == 'accuracy':
        evaluator = BinaryClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
        return evaluator.evaluate(predictions)
    elif metric == 'recall':
        evaluator = BinaryClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
        return evaluator.evaluate(predictions)
    elif metric == 'precision':
        evaluator = BinaryClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
        return evaluator.evaluate(predictions)
    elif metric == 'f1':
        evaluator = BinaryClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
        return evaluator.evaluate(predictions)
    elif metric == 'iou':
        # Assuming predictions contain rawPrediction column
        evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        return evaluator.evaluate(predictions)
    else:
        return None

# Define param grid (optional, for hyperparameter tuning)
paramGrid = ParamGridBuilder().build()

# Train and evaluate models
results = []
for model in models:
    print(f"Evaluating model: {model}")
    model_results = []
    for i in range(5):  # 5-fold cross-validation
        # Train the model
        crossval = CrossValidator(estimator=model,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=BinaryClassificationEvaluator(),
                                  numFolds=2)
        cvModel = crossval.fit(data)
        
        # Make predictions
        predictions = cvModel.transform(data)
        
        # Evaluate metrics
        metrics_values = [evaluate_metrics(predictions, metric) for metric in metrics]
        model_results.append(metrics_values)
    
    # Compute average metrics across folds
    avg_metrics = [sum(metric) / len(metric) for metric in zip(*model_results)]
    results.append(avg_metrics)

# Convert results to DataFrame
results_df = pd.DataFrame(results, columns=metrics)

# Write results to CSV file
results_df.to_csv("metrics_results.csv", index=False)

Evaluating model: LogisticRegression_83c8a4cc17d8


TypeError: __init__() got an unexpected keyword argument 'predictionCol'

In [13]:
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, 
                                       GBTClassifier, DecisionTreeClassifier, 
                                       NaiveBayes, MultilayerPerceptronClassifier, LinearSVC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col
import pandas as pd

# Define models
models = [
    LogisticRegression(featuresCol='features', labelCol='label'),
    RandomForestClassifier(featuresCol='features', labelCol='label'),
    GBTClassifier(featuresCol='features', labelCol='label'),
    DecisionTreeClassifier(featuresCol='features', labelCol='label'),
    NaiveBayes(featuresCol='features', labelCol='label'),
    MultilayerPerceptronClassifier(featuresCol='features', labelCol='label', layers=[len(feature_columns), 2, 2]),  # Example layers
    LinearSVC(featuresCol='features', labelCol='label')
]

# Define evaluation metrics
metrics = ['accuracy', 'recall', 'precision', 'f1', 'areaUnderROC', 'iou']

# Define the evaluator function for each metric
def evaluate_metrics(predictions, metric):
    if metric == 'accuracy':
        total_count = predictions.count()
        if total_count == 0:
            return 0.0
        correct_predictions = predictions.filter(predictions.label == predictions.prediction).count()
        accuracy = correct_predictions / float(total_count)
        return accuracy
    elif metric == 'recall':
        tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
        fn = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()
        if tp + fn == 0:
            return 0.0
        recall = tp / float(tp + fn)
        return recall
    elif metric == 'precision':
        tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
        fp = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
        if tp + fp == 0:
            return 0.0
        precision = tp / float(tp + fp)
        return precision
    elif metric == 'f1':
        tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
        fp = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
        fn = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()
        if tp + fp == 0 or tp + fn == 0:
            return 0.0
        precision = tp / float(tp + fp)
        recall = tp / float(tp + fn)
        if precision + recall == 0:
            return 0.0
        f1 = 2 * (precision * recall) / (precision + recall)
        return f1
    elif metric == 'areaUnderROC':
        evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        return evaluator.evaluate(predictions)
    elif metric == 'iou':
        tp = predictions.filter((predictions.label == 1) & (predictions.prediction == 1)).count()
        fp = predictions.filter((predictions.label == 0) & (predictions.prediction == 1)).count()
        fn = predictions.filter((predictions.label == 1) & (predictions.prediction == 0)).count()
        if tp + fp + fn == 0:
            return 0.0
        iou = tp / float(tp + fp + fn)
        return iou
    else:
        return None

# Define param grid (optional, for hyperparameter tuning)
paramGrid = ParamGridBuilder().build()

# Train and evaluate models
results = []
for model in models:
    print(f"Evaluating model: {model}")
    model_results = []
    for i in range(10):  # 5-fold cross-validation
        # Train the model
        crossval = CrossValidator(estimator=model,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                                  numFolds=10)
        cvModel = crossval.fit(data)
        
        # Make predictions
        predictions = cvModel.transform(data)
        
        # Evaluate metrics
        metrics_values = {metric: evaluate_metrics(predictions, metric) for metric in metrics}
        model_results.append(metrics_values)
    
    results.append(model_results)

# Convert results to DataFrame
results_flattened = []
for model_index, model_results in enumerate(results):
    for fold_index, fold_metrics in enumerate(model_results):
        fold_metrics['model'] = model_index
        fold_metrics['fold'] = fold_index
        results_flattened.append(fold_metrics)

results_df = pd.DataFrame(results_flattened)

# Write results to CSV file
results_df.to_csv("metrics_results2.csv", index=False)


Evaluating model: LogisticRegression_9246039ec13d
