In [1]:
import os

import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString
from pyspark.mllib.linalg import Vectors

In [2]:
# Reuse the active SparkSession when one exists, otherwise start a new one
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)
# Limit Spark's own logging to warnings and errors
spark.sparkContext.setLogLevel('WARN')
# Source CSV to train on, and the text file that will receive the predicted labels
infile, outfile = 'iris.csv', '../out/out_3_2.txt'

In [3]:
# Explicit schema for the CSV file: four float measurement columns followed by the
# string class label, every field declared with nullable=False
_schema = pyspark.sql.types.StructType(
    [
        pyspark.sql.types.StructField(column, pyspark.sql.types.FloatType(), False)
        for column in ("sepal_length", "sepal_width", "petal_length", "petal_width")
    ]
    + [pyspark.sql.types.StructField("class", pyspark.sql.types.StringType(), False)]
)

In [4]:
# Load the CSV file, applying the explicit schema held in `_schema`
# (column names and types are supplied by us, not inferred from the data)
trainDF = spark.read.load(infile, format="csv", schema=_schema)

In [5]:
# Stage 1: pack the four measurement columns into a single vector column named 'features'
vectorAssembler = VectorAssembler(
    inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    outputCol="features",
)
# Stage 2: index the string 'class' column into a numeric 'labels' column.
# It is fitted on the training frame right away so that its learned label list
# is available to the decoding stage further down.
labelIndexer = (
    StringIndexer(inputCol="class", outputCol="labels")
    .fit(trainDF)
)
# Stage 3: logistic regression over 'features' / 'labels', capped at 100 iterations
# (the value used in the code snippet given with the question)
lr = LogisticRegression(
    featuresCol='features',
    labelCol='labels',
    maxIter=100,
    fitIntercept=True,
)
# Stage 4: translate the numeric 'prediction' column back into the original class
# names, stored under 'predictedLabel'
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)
# Chain the four stages so training and prediction each take a single call
pipeline = Pipeline(
    stages=[
        vectorAssembler,
        labelIndexer,
        lr,
        labelConverter,
    ]
)

In [6]:
# Fit every pipeline stage on the training frame; the result is the fitted pipeline model
model = pipeline.fit(dataset=trainDF)

In [7]:
# Two unlabeled flower measurements to classify, using the same feature column
# names as the training data
predData = spark.createDataFrame(
    data=[
        (5.1, 3.5, 1.4, 0.2),
        (6.2, 3.4, 5.4, 2.3),
    ],
    schema=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
)

In [8]:
# Run the fitted pipeline over the requested rows to obtain their predictions
predictions = model.transform(dataset=predData)

In [9]:
# Display the input measurements next to the decoded class prediction
predictions.select(
    'sepal_length',
    'sepal_width',
    'petal_length',
    'petal_width',
    'predictedLabel',
).show()

+------------+-----------+------------+-----------+--------------+
|sepal_length|sepal_width|petal_length|petal_width|predictedLabel|
+------------+-----------+------------+-----------+--------------+
|         5.1|        3.5|         1.4|        0.2|   Iris-setosa|
|         6.2|        3.4|         5.4|        2.3|Iris-virginica|
+------------+-----------+------------+-----------+--------------+



In [10]:
# The exercise asks for the predictions in a single output file, one label per line.
# DataFrame.write.text(<path>) treats the path as a directory and creates
# part-0000... files inside it, so the rows are collected and written with plain
# Python file I/O instead. The same approach is used in the following exercises.

# Make sure the destination folder exists; otherwise open() fails with a
# "No such file or directory" error on a fresh checkout.
out_dir = os.path.dirname(outfile)
if out_dir and not os.path.isdir(out_dir):
    os.makedirs(out_dir)

with open(outfile, 'w') as f:
    # One predicted label per line, in the order the rows are returned by collect()
    f.writelines(
        str(pred[0]) + '\n'
        for pred in predictions.select('predictedLabel').collect()
    )