In [1]:
# This task consists of analysing the car data using Spark ML library. This data set is composed
# of 1727 rows and 6 different attributes which are buying price, price of maintenance, number
# of doors, capacity in terms of persons to carry, the relative size of luggage boot and the
# estimated safety value of each car.

# Build and evaluate any 2 Machine Learning algorithm using
# Apache Spark’s ML library on this dataset. The decision tree will classify the type of the car -
# 1) unacceptable, 2) acceptable, 3) good or 4) very good.


In [1]:
! pip install pyspark



In [2]:
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
import shutil
from pyspark.sql import SparkSession
from pyspark.mllib.util import MLUtils

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [4]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Car Classification") \
    .getOrCreate()


23/07/31 08:52:47 WARN Utils: Your hostname, Srijas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.99 instead (on interface en0)
23/07/31 08:52:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 08:52:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/31 08:53:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [5]:

# The csv files added to a files array
files = ["car_evaluation_0.csv","car_evaluation_1.csv", "car_evaluation_2.csv", "car_evaluation_3.csv"] 

data = []
# for each .csv file in the files array, read the .csv files with header being True signifying 
# the first line is the header
#  spark.read.csv will load the data into a DataFrame object using Apache Spark
for file_path in files:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    data.append(df) #append dataframe df into data array 

# Combine all the DataFrames into a single DataFrame

# Initialize combined_df with the first DataFrame which is the "car_evaluation_0" content, from data dataframe
combined_df = data[0]

# Loop through the remaining dataFrames starting from "car_evaluation_1.csv" to "car_evaluation_3.csv" 
# and concatenate them to combined_df
# union concatenates with combined_df and select will make sure the other dataframes have the same columns as first dataframe i.e
# "car_evaluation_0" content

for df in data[1:]:
    combined_df = combined_df.union(df.select(*combined_df.columns))

# Count the number of rows and columns in the combined dataframe to confirm if all files were combined, should 
# return 1727 rows and 7 columns
# 432 rows from "car_evaluation_0.csv", 432 rows from "car_evaluation_1.csv",
# 432 rows from "car_evaluation_2.csv",431 rows from "car_evaluation_3.csv".

rowCount = combined_df.count()
colCount = len(combined_df.columns)


print("Number of rows in the combined dataframe:", rowCount)
print(f"Number of columns in the combined dataframe: {colCount}")

combined_df.show()



Number of rows in the combined dataframe: 1727
Number of columns in the combined dataframe: 7
+--------+---------+-------+---------+-----------+------+--------+
|buyPrice|maintCost|noDoors|noPersons|bootLuggage|safety|decision|
+--------+---------+-------+---------+-----------+------+--------+
|   vhigh|    vhigh|      2|        2|      small|   med|   unacc|
|   vhigh|    vhigh|      2|        2|      small|  high|   unacc|
|   vhigh|    vhigh|      2|        2|        med|   low|   unacc|
|   vhigh|    vhigh|      2|        2|        med|   med|   unacc|
|   vhigh|    vhigh|      2|        2|        med|  high|   unacc|
|   vhigh|    vhigh|      2|        2|        big|   low|   unacc|
|   vhigh|    vhigh|      2|        2|        big|   med|   unacc|
|   vhigh|    vhigh|      2|        2|        big|  high|   unacc|
|   vhigh|    vhigh|      2|        4|      small|   low|   unacc|
|   vhigh|    vhigh|      2|        4|      small|   med|   unacc|
|   vhigh|    vhigh|      2|       

In [19]:
from pyspark.ml import Pipeline

# The data needs to be converted into numbers to feed into the machine learning model and as single features 
# Then we use these features in our ML models


selected_data =[]

# Perform StringIndexing on categorical columns
# replace each category with a number since machine learning models need to be fed with numerical data 
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(combined_df) for col in ["buyPrice", "maintCost", "noDoors", "noPersons", "bootLuggage","safety","decision"]]

# A Pipeline is used to chain multiple Transformers and Estimators together to specify our machine learning workflow
pipeline = Pipeline(stages=indexers) #pipeline to chain the transformers and estimators 



# Next we call the fit() method to initiate the learning process and transform to add the indexed columns
indexed_data = pipeline.fit(combined_df).transform(combined_df)

# The returned indexer_model is an object of type Transformer.A Transformer is an abstraction that includes feature transformers and learned models. It
# implements a method transform(), which converts one DataFrame into another, generally by
# appending one or more columns.

indexed_data.select("buyPrice_index", "maintCost_index", "noDoors_index", "noPersons_index", "bootLuggage_index","safety_index","decision_index").show(n=30)

# Perform OneHotEncoding on indexed columns
encoder = OneHotEncoder(inputCols=["buyPrice_index", "maintCost_index", "noDoors_index", "noPersons_index", "bootLuggage_index","safety_index"],
                        outputCols=["buying_en", "maint_en", "doors_en", "persons_en", "lug_en", "safety_en"])

# After fitting the estimator and getting our transformer, it is time to use it on our data by
# calling transform().
encoded_data = encoder.fit(indexed_data).transform(indexed_data)

# show the first 30 rows after one hot encoding the dataframe
encoded_data.select("buying_en", "maint_en", "doors_en", "persons_en", "lug_en", "safety_en").show(n=30)


# We learned previously that Spark ML expects data to be represented in two columns:
# a features vector and a label column.

# To create a features array use VectorAssembler and add the encoded columns to add as combined features column
assembler = VectorAssembler(inputCols=["buying_en", "maint_en", "doors_en", "persons_en", "lug_en", "safety_en"],
                            outputCol="features")

# assembler used to create the features column
selected_data = assembler.transform(encoded_data)

# show the features and decision columns since they'll be used as input for the machine learning models next
selected_data.select("features", "decision_index").show(n=10)





+--------------+---------------+-------------+---------------+-----------------+------------+--------------+
|buyPrice_index|maintCost_index|noDoors_index|noPersons_index|bootLuggage_index|safety_index|decision_index|
+--------------+---------------+-------------+---------------+-----------------+------------+--------------+
|           3.0|            3.0|          3.0|            2.0|              2.0|         1.0|           0.0|
|           3.0|            3.0|          3.0|            2.0|              2.0|         0.0|           0.0|
|           3.0|            3.0|          3.0|            2.0|              1.0|         2.0|           0.0|
|           3.0|            3.0|          3.0|            2.0|              1.0|         1.0|           0.0|
|           3.0|            3.0|          3.0|            2.0|              1.0|         0.0|           0.0|
|           3.0|            3.0|          3.0|            2.0|              0.0|         2.0|           0.0|
|           3.0|   

In [20]:
# load spark modules for implimenting Decision tree and random forests classifiers 
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [21]:
# Split the data into training and testing sets (80% for training, 20% for testing)
(train, test) = selected_data.randomSplit([0.8, 0.2], seed=123)
# (train, test) = selected_data.randomSplit([0.8, 0.2], seed=42)
# (train, test) = selected_data.randomSplit([0.7, 0.3], seed=42)

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))


Training Dataset Count: 1399
Test Dataset Count: 328


In [22]:
# Model 1: Decision Tree
# dt = DecisionTreeClassifier(labelCol="decision_index", featuresCol="features")

dt = DecisionTreeClassifier(
    labelCol="decision_index",
    featuresCol="features",
    maxDepth=10,
    minInstancesPerNode=2,
    impurity="gini",
)


# dt = DecisionTreeClassifier(labelCol="decision_index", featuresCol="features",maxDepth = 3)
decision_tree_evaluator = MulticlassClassificationEvaluator(labelCol="decision_index", predictionCol="prediction", metricName="accuracy")

# Set up a parameter grid for tuning the decision tree model
paramGrid = ParamGridBuilder().build()

         
# Build a CrossValidator to find the best model using the parameter grid
cv_dt = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=decision_tree_evaluator, numFolds=5)

# Train the models
dt_model = cv_dt.fit(train)

# Predicting Values
predictions = dt_model.transform(test)

# predictions is a DataFrame that contains: the original columns, the features column and 
# predictions column generated by the model
predictions.select(['buyPrice', 'maintCost', 'noDoors', 'noPersons', 'bootLuggage','safety','decision','decision_index','prediction', 'probability']).show()

y_true = predictions.select(['decision_index']).collect()
y_pred = predictions.select(['prediction']).collect()

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))

# Accuracy
dt_accuracy = decision_tree_evaluator.evaluate(predictions)
# Evaluate the models on the testing data
print("Decision tree accuracy: {:.2f}%".format(dt_accuracy * 100))
print("Decision tree accuracy test Error: {:.2f}%".format((1.0 - dt_accuracy) * 100))


+--------+---------+-------+---------+-----------+------+--------+--------------+----------+-----------------+
|buyPrice|maintCost|noDoors|noPersons|bootLuggage|safety|decision|decision_index|prediction|      probability|
+--------+---------+-------+---------+-----------+------+--------+--------------+----------+-----------------+
|   vhigh|     high|      2|        2|        big|   low|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        2|        med|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|        big|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|        med|  high|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|      small|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|     more|        med|   low|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|

In [28]:
# Model 2: Random Forest
rf = RandomForestClassifier(
    labelCol="decision_index",
    featuresCol="features",
    numTrees=1,
    maxDepth=30,
    minInstancesPerNode=3,
    featureSubsetStrategy="auto",
    seed=42
)

rf_evaluator = MulticlassClassificationEvaluator(labelCol="decision_index", predictionCol="prediction", metricName="accuracy")

# Set up a parameter grid for tuning the Random Forest model 
paramGrid = ParamGridBuilder().build()

# Build a CrossValidator to find the best model using the parameter grid
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=rf_evaluator, numFolds=5)

# Train the models
rf_model = cv_rf.fit(train)

rf_accuracy = rf_evaluator.evaluate(rf_model.transform(test))

# Predicting Values
predictions = rf_model.transform(test)
# predictions is a DataFrame that contains: the original columns, the features column and 
# predictions column generated by the model

predictions.select(['buyPrice', 'maintCost', 'noDoors', 'noPersons', 'bootLuggage','safety','decision','decision_index','prediction', 'probability']).show()
# compare the preidcted value and the actual label for what was supposed to be predicted i.e the decision that was indexed
predictions.select(['decision_index','prediction']).show()


y_true = predictions.select(['decision_index']).collect()
y_pred = predictions.select(['prediction']).collect()
from sklearn.metrics import classification_report, confusion_matrix

print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))

# Evaluate the models on the testing data
print("Random forests accuracy: {:.2f}%".format(rf_accuracy * 100))
print("Random forests test Error: {:.2f}%".format((1.0 - rf_accuracy) * 100))


+--------+---------+-------+---------+-----------+------+--------+--------------+----------+-----------------+
|buyPrice|maintCost|noDoors|noPersons|bootLuggage|safety|decision|decision_index|prediction|      probability|
+--------+---------+-------+---------+-----------+------+--------+--------------+----------+-----------------+
|   vhigh|     high|      2|        2|        big|   low|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        2|        med|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|        big|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|        med|  high|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|        4|      small|   med|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|   vhigh|     high|      2|     more|        med|   low|   unacc|           0.0|       0.0|[1.0,0.0,0.0,0.0]|
|

In [26]:
# Get the original labels from the StringIndexerModel from the beginning
# Create a DataFrame to map original labels with their indexed labels
prediction_labels = [(original_label, index_label) for original_label, index_label in enumerate(original_labels)]
mappings = spark.createDataFrame(prediction_labels, ["decision_index", "decision"])

# show the indexed labels mapped to the decision of car type  
mappings.show()



+--------------+--------+
|decision_index|decision|
+--------------+--------+
|             0|   unacc|
|             1|     acc|
|             2|    good|
|             3|   vgood|
+--------------+--------+

