In [1]:
import os
import sys
import git
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import GBTClassifier

### SparkContext and SparkSession

In [2]:
# create entry points to spark
# Stop a SparkContext left over from a previous run of this notebook so that
# a fresh one can be created afterwards. On a fresh kernel `sc` does not exist
# yet; that NameError is the only failure expected here and is safe to ignore.
# Any other error raised by sc.stop() is allowed to surface.
try:
    sc.stop()
except NameError:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
# Entry points used by the rest of the notebook: the SparkContext and a
# SparkSession built on top of it.
sc = SparkContext()
spark = SparkSession(sc)

23/11/24 12:06:25 WARN Utils: Your hostname, pops resolves to a loopback address: 127.0.0.1; using 192.168.178.20 instead (on interface wlp3s0)
23/11/24 12:06:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/24 12:06:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load the dataset

In [4]:
# Build the dataset path from the git working-tree directory, so the CSV
# location does not depend on a hardcoded absolute path.
data_path = "data/iris.csv"
base_path = git.Repo(path='.', search_parent_directories=True).working_tree_dir
path = os.path.join(base_path, data_path)

In [5]:
# Load the CSV with the header and inferSchema reader options enabled.
iris = spark.read.options(header=True, inferSchema=True).csv(path)

                                                                                

In [6]:
# Peek at the first five rows to confirm the load worked.
iris.show(n=5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [7]:
# Print the summary table produced by describe() for the loaded frame.
iris.describe().show()

23/11/24 12:06:35 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 3:>                                                          (0 + 1) / 1]

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



                                                                                

### Build the feature vector and target column from the raw columns

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

In [9]:
def _to_feature_row(record):
    """Pack every field except the last into a dense vector; keep the last field as `species`."""
    *measurements, species_name = record
    return Row(features=Vectors.dense(measurements), species=species_name)


# Reshape each record into a (features, species) pair and convert back to a DataFrame.
iris2 = iris.rdd.map(_to_feature_row).toDF()
iris2.show(n=5)

                                                                                

+-----------------+-------+
|         features|species|
+-----------------+-------+
|[5.1,3.5,1.4,0.2]| setosa|
|[4.9,3.0,1.4,0.2]| setosa|
|[4.7,3.2,1.3,0.2]| setosa|
|[4.6,3.1,1.5,0.2]| setosa|
|[5.0,3.6,1.4,0.2]| setosa|
+-----------------+-------+
only showing top 5 rows



                                                                                

### Encode labels as numbers

In [10]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [11]:
# Single-stage pipeline: index the string `species` column into a numeric
# `label` column.
stringindexer = StringIndexer().setInputCol('species').setOutputCol('label')
stages = [stringindexer]
pipeline = Pipeline().setStages(stages)

In [12]:
# Fit the pipeline on iris2, then apply the fitted pipeline to the same
# frame to append the `label` column.
fitted_pipeline = pipeline.fit(iris2)
iris_df = fitted_pipeline.transform(iris2)
iris_df.show(n=5)

                                                                                

+-----------------+-------+-----+
|         features|species|label|
+-----------------+-------+-----+
|[5.1,3.5,1.4,0.2]| setosa|  0.0|
|[4.9,3.0,1.4,0.2]| setosa|  0.0|
|[4.7,3.2,1.3,0.2]| setosa|  0.0|
|[4.6,3.1,1.5,0.2]| setosa|  0.0|
|[5.0,3.6,1.4,0.2]| setosa|  0.0|
+-----------------+-------+-----+
only showing top 5 rows



### Split the data into train and test sets

In [13]:
# Seeded 80/20 random split into training and test sets.
train, test = iris_df.randomSplit(weights=[0.8, 0.2], seed=1234)

### Model

In [14]:
from pyspark.ml.classification import NaiveBayes
# Naive Bayes estimator reading the `features` vector and the indexed `label`.
naivebayes = NaiveBayes(labelCol="label", featuresCol="features")

### Cross-validation

In [15]:
from pyspark.ml.tuning import ParamGridBuilder
# Candidate values for the `smoothing` parameter tried during cross-validation.
param_grid = (
    ParamGridBuilder()
    .addGrid(naivebayes.smoothing, [0, 1, 2, 4, 8])
    .build()
)

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Evaluator shared by the CrossValidator (model selection) and the metrics
# cell at the end of the notebook. No arguments are given, so the library
# defaults for the label/prediction columns and the metric apply
# (NOTE(review): the default metric is presumably f1 -- confirm for the
# installed PySpark version).
evaluator = MulticlassClassificationEvaluator()

In [17]:
from pyspark.ml.tuning import CrossValidator
# Cross-validate the Naive Bayes estimator over the smoothing grid, scoring
# each candidate with the shared evaluator. The seed is fixed (same value as
# the train/test split) so the fold assignment, and therefore the selected
# smoothing value, is reproducible across kernel restarts.
crossvalidator = CrossValidator(
    estimator=naivebayes,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    seed=1234,
)

### Train

In [18]:
# Run the cross-validated search on the training split. The result is the
# fitted cross-validation model (the name says "mode", but later cells use it
# as the model: they call .transform() and read .bestModel from it).
crossvalidation_mode = crossvalidator.fit(train)

                                                                                

In [19]:
# Score the training split with the fitted cross-validation model and preview it.
pred_train = crossvalidation_mode.transform(dataset=train)
pred_train.show(n=5)

+-----------------+-------+-----+--------------------+--------------------+----------+
|         features|species|label|       rawPrediction|         probability|prediction|
+-----------------+-------+-----+--------------------+--------------------+----------+
|[4.3,3.0,1.1,0.1]| setosa|  0.0|[-9.9381810763959...|[0.72155142569232...|       0.0|
|[4.4,3.0,1.3,0.2]| setosa|  0.0|[-10.766392344741...|[0.66358934301063...|       0.0|
|[4.4,3.2,1.3,0.2]| setosa|  0.0|[-10.982676322366...|[0.68983253053946...|       0.0|
|[4.6,3.1,1.5,0.2]| setosa|  0.0|[-11.401874021560...|[0.65240632674455...|       0.0|
|[4.6,3.2,1.4,0.2]| setosa|  0.0|[-11.317522307805...|[0.68223200525475...|       0.0|
+-----------------+-------+-----+--------------------+--------------------+----------+
only showing top 5 rows



### Test

In [20]:
# Score the held-out test split with the fitted cross-validation model and preview it.
pred_test = crossvalidation_mode.transform(dataset=test)
pred_test.show(n=5)

+-----------------+-------+-----+--------------------+--------------------+----------+
|         features|species|label|       rawPrediction|         probability|prediction|
+-----------------+-------+-----+--------------------+--------------------+----------+
|[4.4,2.9,1.4,0.2]| setosa|  0.0|[-10.850744058495...|[0.63292618670780...|       0.0|
|[4.5,2.3,1.3,0.3]| setosa|  0.0|[-10.452622286261...|[0.53348339985681...|       0.0|
|[4.9,3.1,1.5,0.1]| setosa|  0.0|[-11.243354724093...|[0.69672006026418...|       0.0|
|[5.0,3.0,1.6,0.2]| setosa|  0.0|[-11.770930301058...|[0.63957922203686...|       0.0|
|[5.0,3.2,1.2,0.2]| setosa|  0.0|[-11.217239468413...|[0.72925864659080...|       0.0|
+-----------------+-------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [21]:
# Read the selected smoothing value through the model's public getter rather
# than reaching into the private `_java_obj` wrapper.
print("The parameter smoothing has best value:",
      crossvalidation_mode.bestModel.getSmoothing())

The parameter smoothing has best value: 0.0


### Metrics

In [22]:
# Compute each metric with a per-call param override instead of
# setMetricName(): `evaluator` is the same object that was handed to the
# CrossValidator, so mutating its metricName here would silently change the
# model-selection metric if the cross-validation cell were re-run afterwards.
print(
    'training data (f1):', evaluator.evaluate(pred_train, {evaluator.metricName: 'f1'}), "\n",
    'training data (weightedPrecision): ', evaluator.evaluate(pred_train, {evaluator.metricName: 'weightedPrecision'}),"\n",
    'training data (weightedRecall): ', evaluator.evaluate(pred_train, {evaluator.metricName: 'weightedRecall'}),"\n",
    'training data (accuracy): ', evaluator.evaluate(pred_train, {evaluator.metricName: 'accuracy'})
    )

training data (f1): 0.9465233881163084 
 training data (weightedPrecision):  0.953982300884956 
 training data (weightedRecall):  0.9469026548672566 
 training data (accuracy):  0.9469026548672567
