In [89]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import (RandomForestClassifier,
                                       DecisionTreeClassifier,
                                       LogisticRegression,
                                       GBTClassifier,
                                       NaiveBayes,
                                       OneVsRest) #we need OneVsRest, because GBTClassifier only works for binary classification
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row

In [90]:
spark = SparkSession.builder.appName("LeafClassifier").getOrCreate()

In [91]:
schema = StructType([
    StructField("class", IntegerType(), True),
    StructField("specimen_id", IntegerType(), True)
] + [StructField(f"feature_{i+1}", DoubleType(), True) for i in range(14)])


In [92]:
df = spark.read.csv("leaf.csv", schema=schema, header=False).drop("specimen_id")

In [93]:

assembler = VectorAssembler(inputCols=[f"feature_{i+1}" for i in range(14)], outputCol="features")
df = assembler.transform(df).select("features", "class")




In [94]:
train, test = df.randomSplit([0.8, 0.2], seed=42)

In [95]:
evaluator = MulticlassClassificationEvaluator(labelCol="class", predictionCol="prediction", metricName="accuracy")
results = []

In [96]:
#model 1: Decision Tree
dt = DecisionTreeClassifier(labelCol="class", featuresCol="features")
paramGrid_dt = ParamGridBuilder().addGrid(dt.maxDepth, [3, 5, 10]).build()
cv_dt = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid_dt, evaluator=evaluator, numFolds=3)
dt_model = cv_dt.fit(train)
best_dt = dt_model.bestModel
acc_dt = evaluator.evaluate(dt_model.transform(test))
results.append(Row(Method="Decision Tree", Parameters=f"maxDepth={best_dt.getOrDefault('maxDepth')}", Accuracy=round(acc_dt, 4)))


In [97]:
acc_dt

0.6346153846153846

In [98]:
#model 2: Random Forest
rf = RandomForestClassifier(labelCol="class", featuresCol="features")
paramGrid_rf = ParamGridBuilder().addGrid(rf.numTrees, [10, 30]).build()
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=3)
rf_model = cv_rf.fit(train)
best_rf = rf_model.bestModel
acc_rf = evaluator.evaluate(rf_model.transform(test))
results.append(Row(Method="Random Forest", Parameters=f"numTrees={best_rf.getNumTrees}", Accuracy=round(acc_rf, 4)))


In [99]:
acc_rf

0.6730769230769231

In [100]:
#model 3: Logistic Regression
lr = LogisticRegression(labelCol="class", featuresCol="features", maxIter=100)
paramGrid_lr = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr, evaluator=evaluator, numFolds=3)
lr_model = cv_lr.fit(train)
best_lr = lr_model.bestModel

acc_lr = evaluator.evaluate(lr_model.transform(test))
results.append(Row(Method="Logistic Regression", Parameters=f"regParam={best_lr.getRegParam()}", Accuracy=round(acc_lr, 4)))



In [101]:
acc_lr

0.6346153846153846

In [102]:
#model 4: GBTClassifier via OneVsRest
gbt = GBTClassifier(labelCol="class", featuresCol="features", maxIter=20)
ovr = OneVsRest(classifier=gbt, labelCol="class", featuresCol="features")
paramGrid_gbt = ParamGridBuilder().build()
cv_ovr = CrossValidator(estimator=ovr, estimatorParamMaps=paramGrid_gbt, evaluator=evaluator, numFolds=3)
ovr_model = cv_ovr.fit(train)
best_ovr = ovr_model.bestModel
acc_ovr = evaluator.evaluate(ovr_model.transform(test))
results.append(Row(Method="GBT OneVsRest", Parameters="maxIter=20", Accuracy=round(acc_ovr, 4)))


In [103]:
acc_ovr

0.5

In [104]:
df

DataFrame[features: vector, class: int]

In [105]:
#model 4: Naive Bayes
nb = NaiveBayes(labelCol="class", featuresCol="features", modelType="multinomial")
paramGrid_nb = ParamGridBuilder().addGrid(nb.smoothing, [0.5, 1.0, 1.5]).build()
cv_nb = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid_nb, evaluator=evaluator, numFolds=3)
nb_model = cv_nb.fit(train)
acc_nb = evaluator.evaluate(nb_model.transform(test))
best_nb = nb_model.bestModel
nb_params = f"smoothing={best_nb.getSmoothing()}"
results.append(Row(Method="Naive Bayes", Parameters=nb_params, Accuracy=round(acc_nb, 4)))


In [106]:
acc_nb

0.019230769230769232

In [110]:
final_df = spark.createDataFrame(results)
final_df.show(truncate=False)

+-------------------+-------------+--------+
|Method             |Parameters   |Accuracy|
+-------------------+-------------+--------+
|Decision Tree      |maxDepth=10  |0.6346  |
|Random Forest      |numTrees=30  |0.6731  |
|Logistic Regression|regParam=0.01|0.6346  |
|GBT OneVsRest      |maxIter=20   |0.5     |
|Naive Bayes        |smoothing=1.5|0.0192  |
+-------------------+-------------+--------+

