In [85]:
# Importing libraries

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql.functions import min,max
import re
import csv

# Creating spark session

conf = SparkConf().setAppName("SparkCodingChallenge").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
sqlC = SQLContext(sc)

df = sqlC.read.csv("../data/iris.csv")

# rename column names

newcolnames = ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"]

for c,n in zip(df.columns,newcolnames):
    df=df.withColumnRenamed(c,n)

df = df.select(df.sepal_length.cast('double'),df.sepal_width.cast('double'),df.petal_length.cast('double'),df.petal_width.cast('double'),'class')

from pyspark.ml.feature import VectorAssembler

feature_cols = df.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df)

from pyspark.ml.feature import StringIndexer

df = df.select(['features','class'])
label_indexer = StringIndexer(inputCol='class', outputCol='label').fit(df)
df = label_indexer.transform(df)

df2 = df.select(['features','label'])

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.01)
lrModel = lr.fit(df2)

In [86]:
spark = pyspark.sql.SparkSession.builder.appName('Iris').getOrCreate()

In [87]:
pred_data = spark.createDataFrame(
[(5.1,3.5,1.4,0.2),
(6.2,3.4,5.4,2.3)],
["sepal_length", "sepal_width", "petal_length", "petal_width"])

In [88]:
pred_data.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)



In [89]:
feature_cols2 = pred_data.columns
assembler2 = VectorAssembler(inputCols=feature_cols2, outputCol='features')
pred_df = assembler2.transform(pred_data)

In [90]:
pred_df.show(10)

+------------+-----------+------------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|         features|
+------------+-----------+------------+-----------+-----------------+
|         5.1|        3.5|         1.4|        0.2|[5.1,3.5,1.4,0.2]|
|         6.2|        3.4|         5.4|        2.3|[6.2,3.4,5.4,2.3]|
+------------+-----------+------------+-----------+-----------------+



In [91]:
pred_df2 = pred_df.drop('sepal_length','sepal_width','petal_length','petal_width')

In [92]:
pred_df2.show(10)

+-----------------+
|         features|
+-----------------+
|[5.1,3.5,1.4,0.2]|
|[6.2,3.4,5.4,2.3]|
+-----------------+



In [93]:
pred_df2 = lrModel.transform(pred_df2)

In [94]:
pred_df2.show()

+-----------------+--------------------+--------------------+----------+
|         features|       rawPrediction|         probability|prediction|
+-----------------+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]|[5.42426204068578...|[0.98060989835204...|       0.0|
|[6.2,3.4,5.4,2.3]|[-4.7743046300876...|[2.49038896949521...|       2.0|
+-----------------+--------------------+--------------------+----------+



In [95]:
from pyspark.ml.feature import IndexToString

idx_to_string = IndexToString(inputCol="label", outputCol="labelValue")

class_dict = idx_to_string.transform(df).drop("features","class").distinct()

In [96]:
class_dict.show()

+-----+---------------+
|label|     labelValue|
+-----+---------------+
|  0.0|    Iris-setosa|
|  1.0|Iris-versicolor|
|  2.0| Iris-virginica|
+-----+---------------+



In [97]:
class_dict = class_dict.selectExpr("label as prediction","labelValue as predictionValue")

In [98]:
class_dict.show()

+----------+---------------+
|prediction|predictionValue|
+----------+---------------+
|       0.0|    Iris-setosa|
|       1.0|Iris-versicolor|
|       2.0| Iris-virginica|
+----------+---------------+



In [99]:
from pyspark.sql.functions import *

pred_df2 = pred_df2.join(class_dict, pred_df2.prediction==class_dict.prediction)

In [100]:
pred_df2.show()

+-----------------+--------------------+--------------------+----------+----------+---------------+
|         features|       rawPrediction|         probability|prediction|prediction|predictionValue|
+-----------------+--------------------+--------------------+----------+----------+---------------+
|[5.1,3.5,1.4,0.2]|[5.42426204068578...|[0.98060989835204...|       0.0|       0.0|    Iris-setosa|
|[6.2,3.4,5.4,2.3]|[-4.7743046300876...|[2.49038896949521...|       2.0|       2.0| Iris-virginica|
+-----------------+--------------------+--------------------+----------+----------+---------------+



In [101]:
predictionValues = pred_df2.select("predictionValue").collect()

In [102]:
with open('../out/out_3_2.txt', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["class"])
    for val in predictionValues:
        writer.writerow([val.predictionValue])