In [33]:
! pip install pyspark scikit-learn



In [34]:
## IMPORTING NECESSARY LIBRARIES
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, PCA, StringIndexer, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.datasets import load_iris
import pandas as pd



In [35]:
## CREATING SPARK SESSION
spark = SparkSession.builder.master("local").appName("PySparkClassification").getOrCreate()

In [36]:
## LOADING THE IRIS DATASET
iris = load_iris()


In [37]:
## SEPARATING INTO FEATURE AND TARGET VARIABLE
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target


In [38]:
## CONVERTING THE PANDAS DATAFRAME TO A PYSPARK DATAFRAME
df = spark.createDataFrame(iris_df)
df.show(5)

+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|
+-----------------+----------------+-----------------+----------------+------+
|              5.1|             3.5|              1.4|             0.2|     0|
|              4.9|             3.0|              1.4|             0.2|     0|
|              4.7|             3.2|              1.3|             0.2|     0|
|              4.6|             3.1|              1.5|             0.2|     0|
|              5.0|             3.6|              1.4|             0.2|     0|
+-----------------+----------------+-----------------+----------------+------+
only showing top 5 rows



In [39]:
## DROPPING THE MISSING ROWS
df = df.na.drop()

In [40]:
## CONVERTING THE TARGET COLUMN FROM CATEGORICAL TO NUMERIC(CONTINUOUS)
indexer = StringIndexer(inputCol="target", outputCol="indexed_target")

df = indexer.fit(df).transform(df)

In [41]:
## ASSEMBLING THE FEATURES TO A SINGLE VECTOR COLUMN
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
df = assembler.transform(df)
df.show(5)

+-----------------+----------------+-----------------+----------------+------+--------------+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|indexed_target|         features|
+-----------------+----------------+-----------------+----------------+------+--------------+-----------------+
|              5.1|             3.5|              1.4|             0.2|     0|           0.0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|     0|           0.0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|     0|           0.0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|     0|           0.0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|     0|           0.0|[5.0,3.6,1.4,0.2]|
+-----------------+----------------+-----------------+----------------+------+--------------+-----------

In [42]:
## APPLYING PCA TO REDUCE DIMENSION
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(df)
df_pca = pca_model.transform(df)
df_pca.show(5)


+-----------------+----------------+-----------------+----------------+------+--------------+-----------------+--------------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|target|indexed_target|         features|        pca_features|
+-----------------+----------------+-----------------+----------------+------+--------------+-----------------+--------------------+
|              5.1|             3.5|              1.4|             0.2|     0|           0.0|[5.1,3.5,1.4,0.2]|[-2.8182395066394...|
|              4.9|             3.0|              1.4|             0.2|     0|           0.0|[4.9,3.0,1.4,0.2]|[-2.7882234453146...|
|              4.7|             3.2|              1.3|             0.2|     0|           0.0|[4.7,3.2,1.3,0.2]|[-2.6133745635497...|
|              4.6|             3.1|              1.5|             0.2|     0|           0.0|[4.6,3.1,1.5,0.2]|[-2.7570222769675...|
|              5.0|             3.6|              1.4|             0.

In [43]:
## SPLITTING THE DATA INTO TRAIN AND TEST DATA
train_data, test_data = df_pca.randomSplit([0.8, 0.2], seed=1234)


In [44]:
## DEFINING AND TRAINING THE DECISION TREE CLASSIFIER MODEL AND PREDICTING ON TEST DATA
dt_classifier = DecisionTreeClassifier(labelCol="indexed_target", featuresCol="pca_features")

dt_model = dt_classifier.fit(train_data)

dt_predictions = dt_model.transform(test_data)

In [45]:
## DECISION TREE MODEL EVALUATION AND TEST ACCURACY
dt_evaluator = MulticlassClassificationEvaluator(labelCol="indexed_target", predictionCol="prediction", metricName="accuracy")

dt_accuracy = dt_evaluator.evaluate(dt_predictions)

print("Decision Tree Accuracy:", dt_accuracy)

Decision Tree Accuracy: 0.972972972972973


In [46]:
## DEFINING AND TRAINING THE RANDOM FOREST CLASSIFIER MODEL AND PREDICTING ON TEST DATA
rf_classifier = RandomForestClassifier(labelCol="indexed_target", featuresCol="pca_features")

rf_model = rf_classifier.fit(train_data)

rf_predictions = rf_model.transform(test_data)

In [47]:
## RANDOM FOREST CALSSIFIER MODEL EVALUATION AND TEST ACCURACY
rf_accuracy = dt_evaluator.evaluate(rf_predictions)

print("Random Forest Accuracy:", rf_accuracy)

Random Forest Accuracy: 0.972972972972973


In [48]:
## TERMINATING THE SPARK SESSION
spark.stop()