In [1]:
import numpy as np

# Data Loading

In [2]:
#import necessary libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorIndexer

In [3]:
# Both splits are comma-separated CSV files with a header row; column types are inferred.
csv_options = {"header": "true", "inferSchema": "true", "sep": ","}

# Training split, cached because it is reused by the fits further down.
trainData = spark.read.format("csv").options(**csv_options).load("/FileStore/tables/9fruit_csv/9fruitsTrainDF.csv")
trainData.cache()

# Test split, cached because every model is scored against it.
testData = spark.read.format("csv").options(**csv_options).load("/FileStore/tables/9fruit_csv/9fruitsTestDF.csv")
testData.cache()

In [4]:
# Every column except the target ("label") is used as a model input.
column2features = trainData.columns
del column2features[column2features.index("label")]

In [5]:
# Pack all predictor columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=column2features, outputCol="features")

# Apply the same assembler to both splits.
trainData_with_features, testData_with_features = (
    assembler.transform(split) for split in (trainData, testData)
)

In [6]:
# Keep only the rows whose assembled feature vector is present.
has_features = "features is not null"
trainData_with_features = trainData_with_features.where(has_features)
testData_with_features = testData_with_features.where(has_features)

In [7]:
# Index the label column into "indexedLabel" (this also attaches label metadata).
# The indexer is fitted on the training split only; handleInvalid="skip" is set
# for labels that were not seen during fitting.
label_indexer_stage = StringIndexer(inputCol="label", outputCol="indexedLabel", handleInvalid="skip")
labelIndexer = label_indexer_stage.fit(trainData_with_features)

In [8]:
# Index the feature vector into "indexedFeatures".
# Features with at most 20 distinct values are treated as categorical and indexed;
# features with more than 20 distinct values are left as continuous.
feature_indexer_stage = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=20,
    handleInvalid="skip",
)
featureIndexer = feature_indexer_stage.fit(trainData_with_features)

# Multi-layer Perceptron Classifier

In [9]:
#import necessary libraries
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString
from pyspark.ml import Pipeline

In [10]:
# Network shape: [inputs, hidden, hidden, outputs].
# The input width must equal the number of assembled feature columns and the
# output width must equal the number of indexed classes, so both ends are derived
# from the data instead of being hard-coded (previously 784 and 9).
layers = [len(column2features), 50, 16, len(labelIndexer.labels)]

# The classifier trains on the indexed label / indexed feature columns; the seed is fixed.
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    blockSize=128,
    seed=123,
    maxIter=200,
)

# Translate the numeric "prediction" back into the original label values.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Stages run in order: index label -> index features -> classify -> un-index prediction.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, trainer, labelConverter])

# Fit on the training split, then score the test split.
model = pipeline.fit(trainData_with_features)

predictions = model.transform(testData_with_features)

In [11]:
# Peek at a few scored test rows.
# NOTE: "prediction" is a class index in the labelIndexer's index space, so it is
# not necessarily equal to the raw "label" value of the same class.
display(predictions.select("label", "prediction").limit(5))

label,prediction
0,2.0
0,4.0
0,5.0
0,5.0
0,5.0


In [12]:
# Accuracy of the MLP on the test split.
# The classifier was trained on "indexedLabel", so "prediction" is a class index;
# it must be scored against "indexedLabel", not the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator.evaluate(predictions)

In [13]:
# Weighted precision of the MLP on the test split.
# The classifier was trained on "indexedLabel", so "prediction" is a class index;
# it must be scored against "indexedLabel", not the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
evaluator.evaluate(predictions)

In [14]:
# Weighted recall of the MLP on the test split.
# The classifier was trained on "indexedLabel", so "prediction" is a class index;
# it must be scored against "indexedLabel", not the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="weightedRecall",
)
evaluator.evaluate(predictions)

In [15]:
# F1 score of the MLP on the test split.
# The classifier was trained on "indexedLabel", so "prediction" is a class index;
# it must be scored against "indexedLabel", not the raw "label" column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

# Random Forest Classifier

In [16]:
#import necessary libraries
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [17]:
# Random forest classifier; its hyper-parameters are chosen by cross-validation below.
rf = RandomForestClassifier()

# Search grid: column names and the model seed are fixed for every candidate,
# crossed with two tree depths and two forest sizes (4 candidates in total).
grid_rf = (
    ParamGridBuilder()
    .baseOn({rf.labelCol: 'label'})
    .baseOn([rf.predictionCol, 'prediction'])
    .baseOn([rf.featuresCol, 'features'])
    .baseOn([rf.seed, 123])
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.numTrees, [10, 20])
    .build()
)

# Candidates are ranked by accuracy on the held-out folds.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Seed the fold assignment as well, so the cross-validation split (and hence the
# model selection) is reproducible across runs rather than only the forest itself.
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=grid_rf, evaluator=evaluator, seed=123)

# Pipeline: assemble the feature vector, then run the cross-validated search.
rfPipeline = Pipeline(stages=[assembler, cv_rf])

# Fit on the full training split; the test split is only used for scoring afterwards.
rfModel = rfPipeline.fit(trainData)

In [18]:
# Score the test split with the fitted random-forest pipeline and keep only
# the true label and the predicted label for evaluation.
rf_scored = rfModel.transform(testData)
resultsDF_rf = rf_scored.select("label", "prediction")
display(resultsDF_rf.limit(10))

label,prediction
0,0.0
0,1.0
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
0,0.0
0,7.0
0,0.0


In [19]:
# Accuracy of the random forest on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator.evaluate(resultsDF_rf)

In [20]:
# Weighted precision of the random forest on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
evaluator.evaluate(resultsDF_rf)

In [21]:
# Weighted recall of the random forest on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
evaluator.evaluate(resultsDF_rf)

In [22]:
# F1 score of the random forest on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(resultsDF_rf)

# Multinomial Logistic Regression

In [23]:
#import necessary libraries
from pyspark.ml.classification import LogisticRegression

In [24]:
# Multinomial logistic regression; its hyper-parameters are chosen by cross-validation below.
mlr = LogisticRegression(family = "multinomial")

# Search grid: column names are fixed for every candidate, crossed with two
# iteration limits and two regularisation strengths (4 candidates in total).
grid_mlr = (
    ParamGridBuilder()
    .baseOn({mlr.labelCol: 'label'})
    .baseOn([mlr.predictionCol, 'prediction'])
    .baseOn([mlr.featuresCol, 'features'])
    .addGrid(mlr.maxIter, [50, 100])
    .addGrid(mlr.regParam, [1.0, 2.0])
    .build()
)

# Candidates are ranked by accuracy on the held-out folds.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Seed the fold assignment so the cross-validation split (and hence the model
# selection) is reproducible across runs.
cv_mlr = CrossValidator(estimator=mlr, estimatorParamMaps=grid_mlr, evaluator=evaluator, seed=123)

# Pipeline: assemble the feature vector, then run the cross-validated search.
mlrPipeline = Pipeline(stages=[assembler, cv_mlr])

# Fit on the full training split; the test split is only used for scoring afterwards.
mlrModel = mlrPipeline.fit(trainData)

In [25]:
# Score the test split with the fitted logistic-regression pipeline and keep
# only the true label and the predicted label for evaluation.
mlr_scored = mlrModel.transform(testData)
resultsDF_mlr = mlr_scored.select("label", "prediction")
display(resultsDF_mlr.limit(10))

label,prediction
0,0.0
0,1.0
0,0.0
0,0.0
0,0.0
0,5.0
0,0.0
0,0.0
0,7.0
0,0.0


In [26]:
# Accuracy of the multinomial logistic regression on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator.evaluate(resultsDF_mlr)

In [27]:
# Weighted precision of the multinomial logistic regression on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
evaluator.evaluate(resultsDF_mlr)

In [28]:
# Weighted recall of the multinomial logistic regression on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
evaluator.evaluate(resultsDF_mlr)

In [29]:
# F1 score of the multinomial logistic regression on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(resultsDF_mlr)