# Step 1: Setting Up the Environment

In [2]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     -------------------------------------- 0.0/317.0 MB 435.7 kB/s eta 0:12:08
     ---------------------------------------- 0.1/317.0 MB 1.2 MB/s eta 0:04:25
     ---------------------------------------- 0.6/317.0 MB 4.5 MB/s eta 0:01:10
     --------------------------------------- 2.0/317.0 MB 11.5 MB/s eta 0:00:28
     --------------------------------------- 3.5/317.0 MB 17.3 MB/s eta 0:00:19
      -------------------------------------- 4.5/317.0 MB 19.3 MB/s eta 0:00:17
      -------------------------------------- 6.3/317.0 MB 22.4 MB/s eta 0:00:14
     - ------------------------------------- 8.4/317.0 MB 26.8 MB/s eta 0:00:12
     - ------------------------------------- 9.8/317.0 MB 27.2 MB/s eta 0:00:12
     - ------------------------------------ 11.7/317.0 MB 43.7 MB/s e

In [3]:
from pyspark.sql import SparkSession

# Start (or reuse, if the kernel already has one) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)


# Step 2: Load the Iris Dataset

In [33]:
from pyspark.sql.types import StructType, StructField, DoubleType, FloatType, StringType

# Location of the Iris CSV, relative to the notebook's working directory.
IRIS_CSV_PATH = "iris.csv"

# Define the schema explicitly so column names and types do not depend on inference.
# DoubleType matches the double-precision vectors Spark ML builds for features; with FloatType,
# a value like 5.1 is stored as float32 and widens to 5.099999904632568 in the feature vector.
schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("class", StringType(), True),
])

# Load the Iris dataset; header=True skips the file's header row (column names come from `schema`).
iris_df = spark.read.csv(IRIS_CSV_PATH, schema=schema, header=True)

# Show the first few rows to ensure data is loaded correctly
iris_df.show(5)


+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width| class|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|setosa|
|         4.9|        3.0|         1.4|        0.2|setosa|
|         4.7|        3.2|         1.3|        0.2|setosa|
|         4.6|        3.1|         1.5|        0.2|setosa|
|         5.0|        3.6|         1.4|        0.2|setosa|
+------------+-----------+------------+-----------+------+
only showing top 5 rows



# Step 3: Preprocess the Data

In [34]:
# Index the class column:

from pyspark.ml.feature import StringIndexer

# Map each species string in "class" to a numeric "label" column for the classifier.
indexer = StringIndexer(inputCol="class", outputCol="label")

# Keep the fitted model instead of discarding it: it records the label -> species mapping,
# which is needed to turn numeric predictions back into class names (e.g. with IndexToString).
indexer_model = indexer.fit(iris_df)
indexed_df = indexer_model.transform(iris_df)


In [35]:
# Combine the four numeric measurements into a single "features" vector column:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features",
    handleInvalid="skip",  # filter out rows with null/NaN measurements instead of raising
)
final_df = assembler.transform(indexed_df)


# Step 4: Split the Data

In [36]:
# 70% train / 30% test; the fixed seed makes the split repeatable for the same input data.
training_df, testing_df = final_df.randomSplit(weights=[0.7, 0.3], seed=42)


# Step 5: Select and Train a Classification Algorithm

In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree trained on the assembled "features" vector against the indexed "label".
dt_classifier = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Single-stage pipeline; label indexing and feature assembly are applied outside it.
pipeline = Pipeline(stages=[dt_classifier])


# Step 6: Tune Hyperparameters using Cross-Validation

In [38]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Seed for cross-validation fold assignment. If left unset, PySpark's default seed is derived
# from Python's hash() of the class name, which is randomized per process, so folds (and possibly
# the selected hyperparameters) would change between kernel restarts.
CV_SEED = 42

# Define the parameter grid: 3 maxDepth values x 3 minInstancesPerNode values = 9 candidates.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt_classifier.maxDepth, [3, 5, 7])
    .addGrid(dt_classifier.minInstancesPerNode, [1, 2, 4])
    .build()
)

# Define the evaluator; accuracy drives model selection during cross-validation.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Set up 5-fold cross-validation with a fixed seed so fold assignment is reproducible.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=CV_SEED)


In [39]:
# Fit one model per (grid point, fold) pair, then refit the best setting on all of training_df.
cv_model = crossval.fit(training_df)

# Report which hyperparameters cross-validation selected and their mean validation accuracy
# (accuracy is larger-is-better, so the best average is the maximum of avgMetrics).
best_tree = cv_model.bestModel.stages[0]
print(f"Best maxDepth: {best_tree.getMaxDepth()}, "
      f"minInstancesPerNode: {best_tree.getMinInstancesPerNode()}, "
      f"mean CV accuracy: {max(cv_model.avgMetrics):.4f}")


# Step 7: Evaluate the Model

In [40]:
# Score the held-out test rows with the best pipeline found by cross-validation:

predictions = cv_model.bestModel.transform(testing_df)


In [41]:
# Evaluate performance on the test set, starting with accuracy from the tuning evaluator:

accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

# Additional metrics: one evaluator per metric name, all reading the same label/prediction columns.
# ("f1" is Spark's support-weighted F-measure, matching the weighted precision/recall alongside it.)
precision_evaluator, recall_evaluator, f1_evaluator = (
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    for metric in ("weightedPrecision", "weightedRecall", "f1")
)

precision, recall, f1_score = (
    ev.evaluate(predictions) for ev in (precision_evaluator, recall_evaluator, f1_evaluator)
)

for caption, value in (("Precision", precision), ("Recall", recall), ("F1 Score", f1_score)):
    print(f"{caption}: {value}")


Test Accuracy: 0.9782608695652174
Precision: 0.9804347826086957
Recall: 0.9782608695652174
F1 Score: 0.978458139351377


# Step 8: Comparative Analysis

In [42]:
# Compare predicted labels with actual labels:

# A few individual rows as a quick sanity check (these tend to be dominated by one label).
predictions.select("label", "prediction", "probability").show(10)

# Confusion matrix over the whole test set: rows are true labels, columns are predicted labels,
# cells are row counts (0 where a label/prediction pair never occurs).
(
    predictions
    .groupBy("label")
    .pivot("prediction")
    .count()
    .na.fill(0)
    .orderBy("label")
    .show()
)


+-----+----------+-------------+
|label|prediction|  probability|
+-----+----------+-------------+
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  0.0|       0.0|[1.0,0.0,0.0]|
|  1.0|       1.0|[0.0,1.0,0.0]|
+-----+----------+-------------+
only showing top 10 rows

