In [23]:
#import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
# Create a spark session/application
spark = SparkSession.builder.appName('iris_dataset').getOrCreate()

In [25]:
# Load the dataset
df = spark.read.csv('../../data/iris.csv', inferSchema=True, header=True)

In [26]:
display(df.printSchema())
display(df.show(5))

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



None

+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         6.0|        3.0|         4.8|        1.8|virginica|
|         6.9|        3.1|         5.4|        2.1|virginica|
|         6.7|        3.1|         5.6|        2.4|virginica|
|         6.9|        3.1|         5.1|        2.3|virginica|
|         5.8|        2.7|         5.1|        1.9|virginica|
+------------+-----------+------------+-----------+---------+
only showing top 5 rows



None

In [27]:
#encode the species column
indexer = StringIndexer(inputCol='species', outputCol='label')
df = indexer.fit(df).transform(df)

In [28]:
df.show(5)

+------------+-----------+------------+-----------+---------+-----+
|sepal_length|sepal_width|petal_length|petal_width|  species|label|
+------------+-----------+------------+-----------+---------+-----+
|         6.0|        3.0|         4.8|        1.8|virginica|  2.0|
|         6.9|        3.1|         5.4|        2.1|virginica|  2.0|
|         6.7|        3.1|         5.6|        2.4|virginica|  2.0|
|         6.9|        3.1|         5.1|        2.3|virginica|  2.0|
|         5.8|        2.7|         5.1|        1.9|virginica|  2.0|
+------------+-----------+------------+-----------+---------+-----+
only showing top 5 rows



In [29]:
# Create a vector assembler for the feature columns
assembler = VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')
df = assembler.transform(df)

In [31]:
df.show(5)

+------------+-----------+------------+-----------+---------+-----+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|  species|label|         features|
+------------+-----------+------------+-----------+---------+-----+-----------------+
|         6.0|        3.0|         4.8|        1.8|virginica|  2.0|[6.0,3.0,4.8,1.8]|
|         6.9|        3.1|         5.4|        2.1|virginica|  2.0|[6.9,3.1,5.4,2.1]|
|         6.7|        3.1|         5.6|        2.4|virginica|  2.0|[6.7,3.1,5.6,2.4]|
|         6.9|        3.1|         5.1|        2.3|virginica|  2.0|[6.9,3.1,5.1,2.3]|
|         5.8|        2.7|         5.1|        1.9|virginica|  2.0|[5.8,2.7,5.1,1.9]|
+------------+-----------+------------+-----------+---------+-----+-----------------+
only showing top 5 rows



In [32]:
# Split the data into train and test sets
train, test = df.randomSplit([0.7, 0.3])

In [34]:
# Create a logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Fit the model
lr_model = lr.fit(train)

# Make predictions
predictions = lr_model.transform(test)

In [35]:
predictions.show(5)

+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|label|         features|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+-----+-----------------+--------------------+--------------------+----------+
|         6.0|        3.0|         4.8|        1.8| virginica|  2.0|[6.0,3.0,4.8,1.8]|[-606.96116122220...|[0.0,1.0,1.875347...|       1.0|
|         6.2|        3.4|         5.4|        2.3| virginica|  2.0|[6.2,3.4,5.4,2.3]|[-801.13941719412...|[0.0,3.7897443622...|       2.0|
|         6.3|        2.5|         5.0|        1.9| virginica|  2.0|[6.3,2.5,5.0,1.9]|[-1019.3040539894...|[0.0,7.2281409627...|       2.0|
|         6.9|        3.1|         5.4|        2.1| virginica|  2.0|[6.9,3.1,5.4,2.1]|[-1042.0309683354...|[0.0,6.2948059475...|       2.0|
|         6.2|      

In [36]:
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

In [37]:
# Print the accuracy
print('Accuracy of the model is: ', accuracy)

Accuracy of the model is:  0.8947368421052632


In [40]:
# Classification report
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert the predictions and labels to RDD
predictionAndLabels = predictions.select(['prediction', 'label']).rdd

# Instantiate the metrics object
metrics = MulticlassMetrics(predictionAndLabels)

# Print the confusion matrix
print(metrics.confusionMatrix().toArray())

[[11.  1.  0.]
 [ 0. 14.  0.]
 [ 0.  3.  9.]]


In [47]:
# Create Precision, Recall and F1 score for each class
labels = [0.0, 1.0, 2.0]
for label in labels:
    print(f'Precision of class {label} is {metrics.precision(label)}')
    print(f'Recall of class {label} is {metrics.recall(label)}')
    print(f'F1 score of class {label} is {metrics.fMeasure(label)}')

Precision of class 0.0 is 1.0
Recall of class 0.0 is 0.9166666666666666
F1 score of class 0.0 is 0.9565217391304348
Precision of class 1.0 is 0.7777777777777778
Recall of class 1.0 is 1.0
F1 score of class 1.0 is 0.8750000000000001
Precision of class 2.0 is 1.0
Recall of class 2.0 is 0.75
F1 score of class 2.0 is 0.8571428571428571
