In [205]:
from math import sqrt
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from heapq import nsmallest
import statistics

In [206]:
# Build the Iris schema (four nullable float features followed by a nullable string label)
# and read the headerless CSV into a DataFrame registered as the "iris" temp view.

spark = SparkSession.builder.appName("iris").getOrCreate()

feature_fields = [
    StructField(feature_name, FloatType(), True)
    for feature_name in ('sepal_length', 'sepal_width', 'petal_length', 'petal_width')
]
label_field = StructField('class', StringType(), True)
schema = StructType(feature_fields + [label_field])

# The file has no header row, so column names come entirely from the schema above.
iris_reader = spark.read.format("csv").schema(schema).option("header", False)
iris_df = iris_reader.load("C:/Users/tyler/Documents/SparkAssignments/FinalProject/iris/iris.data")
iris_df.createOrReplaceTempView("iris")

In [207]:
# Calculating mean and standard deviations of the features.
# All eight statistics are requested in ONE aggregation so Spark scans the data once
# instead of launching a separate job per statistic.

feature_stats = iris_df.agg(
    mean('sepal_length'),
    mean('sepal_width'),
    mean('petal_length'),
    mean('petal_width'),
    stddev('sepal_length'),
    stddev('sepal_width'),
    stddev('petal_length'),
    stddev('petal_width'),
).first()

# The result Row is positional: the first four entries are the means, the last four the
# standard deviations, in the same column order as requested above.
mean_sep_len, mean_sep_wid, mean_pet_len, mean_pet_wid = feature_stats[0:4]
stddev_sep_len, stddev_sep_wid, stddev_pet_len, stddev_pet_wid = feature_stats[4:8]

In [208]:
# Z-score normalisation of a single row; meant to be mapped over every row of the dataframe

def normalize(row):
    """Return `row` with its four features standardised and its label untouched.

    Each of the first four positions is shifted by the matching module-level mean
    and divided by the matching module-level standard deviation. Position 4 is
    passed through unchanged.

    Args:
        row: an indexable record with four numeric values followed by one more value.

    Returns:
        A 5-tuple: the four standardised features followed by `row[4]`.
    """
    z_sepal_length = (row[0] - mean_sep_len) / stddev_sep_len
    z_sepal_width = (row[1] - mean_sep_wid) / stddev_sep_wid
    z_petal_length = (row[2] - mean_pet_len) / stddev_pet_len
    z_petal_width = (row[3] - mean_pet_wid) / stddev_pet_wid
    label = row[4]
    return (z_sepal_length, z_sepal_width, z_petal_length, z_petal_width, label)

In [209]:
# Apply normalize to every row of the loaded Iris DataFrame.
# The data lives in `iris_df`; no variable named `df` is defined in this notebook,
# so referencing it fails with NameError on a fresh kernel.
normalized_rdd = iris_df.rdd.map(normalize)

In [210]:
# Turn the RDD of normalised tuples back into a DataFrame, reusing the schema built for the raw data
normalized_df = spark.createDataFrame(data=normalized_rdd, schema=schema)

In [211]:
# Preview the normalised data (first 20 rows, long cell values truncated)
normalized_df.show(n=20, truncate=True)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      class|
+------------+-----------+------------+-----------+-----------+
|   -0.897674|  1.0286113|   -1.336794| -1.3085928|Iris-setosa|
|  -1.1392003|-0.12454039|   -1.336794| -1.3085928|Iris-setosa|
|  -1.3807273| 0.33672038|  -1.3934699| -1.3085928|Iris-setosa|
|  -1.5014905|0.106089726|  -1.2801182| -1.3085928|Iris-setosa|
|  -1.0184371|  1.2592413|   -1.336794| -1.3085928|Iris-setosa|
|  -0.5353839|  1.9511328|  -1.1667665| -1.0465249|Iris-setosa|
|  -1.5014905| 0.79798114|   -1.336794| -1.1775588|Iris-setosa|
|  -1.0184371| 0.79798114|  -1.2801182| -1.3085928|Iris-setosa|
|   -1.743017| -0.3551705|   -1.336794| -1.3085928|Iris-setosa|
|  -1.1392003|0.106089726|  -1.2801182| -1.4396268|Iris-setosa|
|  -0.5353839|  1.4898721|  -1.2801182| -1.3085928|Iris-setosa|
|  -1.2599635| 0.79798114|  -1.2234423| -1.3085928|Iris-setosa|
|  -1.2599635|-0.12454039|   -1.336794| 

In [212]:
# Splitting data into training, validation, and testing sets (60% / 20% / 20%).
# The seed is fixed so the same split is requested on every run.

split_weights = [.6, .2, .2]
train_df, val_df, test_df = normalized_df.randomSplit(split_weights, seed=100)

In [213]:
# Extracting class labels from dataframes.
# Each collected Row holds only the "class" column, so it unpacks into a single value.

train_labels = [label for (label,) in train_df.select("class").collect()]
val_labels = [label for (label,) in val_df.select("class").collect()]
test_labels = [label for (label,) in test_df.select("class").collect()]

In [214]:
# Removing class labels from original dataframes: keep only the first four columns
# (the features), leaving the trailing label column out.

train_df = train_df.select(*train_df.columns[:4])
test_df = test_df.select(*test_df.columns[:4])
val_df = val_df.select(*val_df.columns[:4])

In [215]:
# Collect the training rows to the driver once, so they are not re-collected later.
# Despite its name, train_rdd is a plain Python list of Row objects, not an RDD.

train_rdd = train_df.collect()

In [216]:
# Euclidean distance from one row (validation or test) to every row of the training set

def euclidean_dist(row):
    """Measure how far `row` is from each collected training row.

    Only positions 0-3 of `row` and of each training row are compared.

    Args:
        row: an indexable record with numeric values in positions 0-3.

    Returns:
        A dict mapping each training row's position in the module-level
        `train_rdd` list to its Euclidean distance from `row`.
    """
    def distance_to(train_row):
        squared_diffs = [(row[axis] - train_row[axis]) ** 2 for axis in range(4)]
        return sqrt(sum(squared_diffs))

    return {position: distance_to(train_row) for position, train_row in enumerate(train_rdd)}

In [220]:
# Maps the euclidean distance function across every row in the val or test set, keeps the k nearest,
# looks up their class labels and assigns the most common one

def predict(df, k):
    """Classify every row of `df` by majority vote among its k nearest training rows.

    Distances come from `euclidean_dist`, whose dict keys are used to index the
    module-level `train_labels` list.

    Ties between labels are resolved in favour of the label whose first voter is
    closest to the row. This matches what `statistics.mode` returns on Python 3.8+,
    but does not raise StatisticsError on older interpreters, where a tied vote
    (likely with even k) made `statistics.mode` fail.

    Args:
        df: object exposing `.rdd.map(fn).collect()` (a Spark DataFrame of feature rows).
        k: number of nearest neighbours that vote; must be at least 1.

    Returns:
        A dict mapping each row's position in `df` to its predicted class label.

    Raises:
        ValueError: if `k` is smaller than 1, or if there are no training rows to vote.
    """
    if k < 1:
        raise ValueError("k must be at least 1")

    distances = df.rdd.map(euclidean_dist).collect()

    predictions = {}
    for index, dist_by_train_index in enumerate(distances):
        # nsmallest yields training indices ordered from nearest to farthest.
        nearest_indices = nsmallest(k, dist_by_train_index, key=dist_by_train_index.get)

        # Count votes per label; dict insertion order records which label was seen nearest first.
        votes = {}
        for train_index in nearest_indices:
            label = train_labels[train_index]
            votes[label] = votes.get(label, 0) + 1

        # max() returns the first maximal key, so a tie goes to the nearest label.
        predictions[index] = max(votes, key=votes.get)

    return predictions

In [218]:
# Sweep even values of k from 2 to 48 and report validation accuracy for each.
# NOTE(review): prediction_accuracy is not defined in any cell visible here — confirm its
# defining cell exists and runs before this one, otherwise Restart & Run All fails.
for k in range(2, 50, 2):
    accuracy_pct = round(prediction_accuracy(val_df, val_labels, k) * 100, 3)
    print(f"k={k} accuracy: {accuracy_pct}%")

k=2 accuracy: 100.0%
k=4 accuracy: 100.0%
k=6 accuracy: 100.0%
k=8 accuracy: 100.0%
k=10 accuracy: 100.0%
k=12 accuracy: 100.0%
k=14 accuracy: 100.0%
k=16 accuracy: 100.0%
k=18 accuracy: 100.0%
k=20 accuracy: 93.1%
k=22 accuracy: 96.6%
k=24 accuracy: 96.6%
k=26 accuracy: 96.6%
k=28 accuracy: 96.6%
k=30 accuracy: 96.6%
k=32 accuracy: 93.1%
k=34 accuracy: 93.1%
k=36 accuracy: 96.6%
k=38 accuracy: 93.1%
k=40 accuracy: 93.1%
k=42 accuracy: 93.1%
k=44 accuracy: 93.1%
k=46 accuracy: 93.1%
k=48 accuracy: 86.2%


Validation accuracy is 100% for every k from 2 through 18 and first drops at k=20 (93.1%), so we will use k=10 for the final evaluation on the test set.

In [225]:
# Final evaluation on the held-out test set with the chosen k.
# NOTE(review): prediction_accuracy is not defined in any cell visible here — confirm it is defined earlier.
test_accuracy = prediction_accuracy(test_df, test_labels, 10)
print(f"Test prediction accuracy with k=10: {test_accuracy * 100}%")

Test prediction accuracy with k=10: 93.5%
