<a href="https://colab.research.google.com/github/visiont3lab/tecnologie_data_science/blob/master/book/docs/pyspark/Iris_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## PYSPARK

In [None]:
################ template to run PySpark on Colab #######################

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 2.4.5 is no longer on the live Apache mirrors; fetch it from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
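# One local session using all cores. getOrCreate() returns the existing session
# if there is one, so a second builder call would not create a new session.
spark = SparkSession.builder.master("local[*]").appName("basic").getOrCreate()
# This cell should run without errors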

In [None]:
import pyspark

In [None]:
################ end template PySpark on Colab ##########################

## IRIS

In [None]:
!wget "https://frenzy86.s3.eu-west-2.amazonaws.com/fav/iris.data"
df = spark.read.csv("iris.data", inferSchema=True).toDF("sep_len", "sep_wid", "pet_len", "pet_wid", "label")

--2020-06-17 22:19:35--  https://frenzy86.s3.eu-west-2.amazonaws.com/fav/iris.data
Resolving frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)... 52.95.149.70
Connecting to frenzy86.s3.eu-west-2.amazonaws.com (frenzy86.s3.eu-west-2.amazonaws.com)|52.95.149.70|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4551 (4.4K) [application/octet-stream]
Saving to: ‘iris.data’


2020-06-17 22:19:35 (103 MB/s) - ‘iris.data’ saved [4551/4551]



In [None]:
df.show(5)

+-------+-------+-------+-------+-----------+
|sep_len|sep_wid|pet_len|pet_wid|      label|
+-------+-------+-------+-------+-----------+
|    5.1|    3.5|    1.4|    0.2|Iris-setosa|
|    4.9|    3.0|    1.4|    0.2|Iris-setosa|
|    4.7|    3.2|    1.3|    0.2|Iris-setosa|
|    4.6|    3.1|    1.5|    0.2|Iris-setosa|
|    5.0|    3.6|    1.4|    0.2|Iris-setosa|
+-------+-------+-------+-------+-----------+
only showing top 5 rows



Spark ML estimators expect all features in a single vector column, so we first combine the four feature columns into one "features" column with VectorAssembler. This requires two more imports:

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
vector_assembler = VectorAssembler(
    inputCols=["sep_len", "sep_wid", "pet_len", "pet_wid"],
    outputCol="features")
df_temp = vector_assembler.transform(df)
df_temp.show(3)

+-------+-------+-------+-------+-----------+-----------------+
|sep_len|sep_wid|pet_len|pet_wid|      label|         features|
+-------+-------+-------+-------+-----------+-----------------+
|    5.1|    3.5|    1.4|    0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|    4.9|    3.0|    1.4|    0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|    4.7|    3.2|    1.3|    0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-------+-------+-------+-------+-----------+-----------------+
only showing top 3 rows
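Conceptually, VectorAssembler concatenates the listed input columns, in order, into one vector per row. The plain-Python sketch below illustrates only that idea on a list of dicts; the helper name assemble is hypothetical and this is not Spark's implementation (which also handles sparse output, nulls, and distributed data).

```python
from typing import Dict, List, Sequence


def assemble(rows: Sequence[Dict[str, float]], input_cols: List[str]) -> List[List[float]]:
    """Hypothetical helper: concatenate the values of `input_cols`,
    in the given order, into one dense feature list per row."""
    return [[float(row[col]) for col in input_cols] for row in rows]


rows = [
    {"sep_len": 5.1, "sep_wid": 3.5, "pet_len": 1.4, "pet_wid": 0.2, "label": "Iris-setosa"},
    {"sep_len": 4.9, "sep_wid": 3.0, "pet_len": 1.4, "pet_wid": 0.2, "label": "Iris-setosa"},
]
features = assemble(rows, ["sep_len", "sep_wid", "pet_len", "pet_wid"])
print(features)  # [[5.1, 3.5, 1.4, 0.2], [4.9, 3.0, 1.4, 0.2]]
```

The label column is simply not listed in input_cols, which is why it is left out of the assembled vector.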



The four original feature columns are now redundant, so let’s drop them:

In [None]:
df = df_temp.drop('sep_len', 'sep_wid', 'pet_len', 'pet_wid')
df.show(3)

+-----------+-----------------+
|      label|         features|
+-----------+-----------------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|
|Iris-setosa|[4.9,3.0,1.4,0.2]|
|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-----------+-----------------+
only showing top 3 rows



At this point, the dataframe holds all the data we need in the right form. Next we index the labels, i.e., convert the textual class names to numeric indices with StringIndexer, which writes them to a new labelIndex column:

In [None]:
from pyspark.ml.feature import StringIndexer
l_indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
df = l_indexer.fit(df).transform(df)

In [None]:
df.show(3)

+-----------+-----------------+----------+
|      label|         features|labelIndex|
+-----------+-----------------+----------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|       0.0|
|Iris-setosa|[4.9,3.0,1.4,0.2]|       0.0|
|Iris-setosa|[4.7,3.2,1.3,0.2]|       0.0|
+-----------+-----------------+----------+
only showing top 3 rows
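By default, StringIndexer orders labels by descending frequency, so the most frequent label gets index 0.0. The sketch below reproduces that idea in plain Python with a hypothetical helper string_index. Breaking ties alphabetically is an assumption of this sketch; Spark's tie handling may differ between versions. In Iris all three classes have the same count, so ties decide the order.

```python
from collections import Counter
from typing import Dict, List


def string_index(labels: List[str]) -> Dict[str, float]:
    """Hypothetical helper: most frequent label -> 0.0, next -> 1.0, ...
    Ties are broken alphabetically (an assumption of this sketch)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


labels = ["Iris-versicolor", "Iris-setosa", "Iris-virginica", "Iris-setosa"]
mapping = string_index(labels)
print(mapping)
# {'Iris-setosa': 0.0, 'Iris-versicolor': 1.0, 'Iris-virginica': 2.0}
print([mapping[lab] for lab in labels])  # [1.0, 0.0, 2.0, 0.0]
```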



After label indexing, we should divide our data into training and test sets (20% held out for testing):

In [None]:
# No seed is passed, so the split (and the accuracies below) differ between runs
(trainingData, testData) = df.randomSplit([0.8, 0.2])
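randomSplit normalizes the weights and assigns each row to a split independently at random, so an 80/20 split of 150 rows gives roughly, not exactly, 120/30 rows. Without a seed, the assignment changes on every run. The sketch below shows the idea in plain Python; random_split is a hypothetical helper, not Spark's code, which samples per partition.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Hypothetical helper: each row draws one uniform number and lands in
    the split whose cumulative (normalized) weight interval contains it."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:  # guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(150)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 120 / 30, but not guaranteed exactly
```

Passing a fixed seed, as the Naive Bayes section does with randomSplit([0.6, 0.4], 1234), makes the split reproducible.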


Now we can train several classifiers on the training set and evaluate them on the test set.

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
lr = LogisticRegression(labelCol="labelIndex", featuresCol="features")
model = lr.fit(trainingData)
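With three classes, Spark's LogisticRegression (default family "auto") fits a multinomial model: one weight vector and intercept per class, a softmax over the class scores, and the highest-probability class as the prediction. The sketch below shows only that prediction step in plain Python; the coefficients are made up for illustration and are not the values learned above.

```python
import math
from typing import List, Sequence, Tuple


def softmax(scores: Sequence[float]) -> List[float]:
    """Numerically stable softmax: subtract the max before exponentiating."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def predict(x: Sequence[float], coef: Sequence[Sequence[float]],
            intercept: Sequence[float]) -> Tuple[List[float], int]:
    """Score each class as w_k . x + b_k, return (probabilities, argmax index)."""
    scores = [sum(w * xi for w, xi in zip(w_k, x)) + b_k
              for w_k, b_k in zip(coef, intercept)]
    probs = softmax(scores)
    return probs, max(range(len(probs)), key=probs.__getitem__)


# Made-up 3-class coefficients for the 4 iris features (illustration only).
coef = [[0.0, 1.0, -1.0, -1.0],
        [0.0, 0.0, 0.5, 0.0],
        [0.0, -1.0, 0.5, 1.0]]
intercept = [0.0, 0.0, 0.0]
probs, pred = predict([5.1, 3.5, 1.4, 0.2], coef, intercept)
print(pred)  # 0
```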

In [None]:
predictions = model.transform(testData)

In [None]:
predictions.select("prediction", "labelIndex").show(5)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows



In [None]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="labelIndex", predictionCol="prediction",
    metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = ", accuracy)

Test Error = 0.09375 
Accuracy =  0.90625
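With metricName="accuracy", the evaluator reports the fraction of test rows whose prediction equals labelIndex, and the printed test error is simply 1 minus that. The plain-Python sketch below computes the same quantity; the helper accuracy is hypothetical. As a side note, 0.90625 equals 29/32, which is consistent with 29 correct predictions on a 32-row test set. The notebook does not print the test-set size, so that is an inference.

```python
from typing import Sequence


def accuracy(predictions: Sequence[float], labels: Sequence[float]) -> float:
    """Fraction of rows where the predicted index equals the true index."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(p == y for p, y in zip(predictions, labels))
    return correct / len(labels)


acc = accuracy([0.0, 1.0, 2.0, 1.0], [0.0, 1.0, 1.0, 1.0])
print(acc, 1.0 - acc)  # 0.75 0.25
print(29 / 32)         # 0.90625
```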


### Decision tree classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
dt = DecisionTreeClassifier(labelCol="labelIndex", featuresCol="features")
model = dt.fit(trainingData)

In [None]:
predictions = model.transform(testData)

In [None]:
predictions.select("prediction", "labelIndex").show(5)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows



In [None]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="labelIndex", predictionCol="prediction",
    metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy = ", accuracy)

Test Error = 0.0625 
Accuracy =  0.9375


In [None]:
print(model)


DecisionTreeClassificationModel (uid=DecisionTreeClassifier_af1102e3f5bc) of depth 5 with 13 nodes
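Spark's DecisionTreeClassifier uses Gini impurity by default to choose splits: at each node it picks the feature and threshold that minimize the size-weighted impurity of the two children. The plain-Python sketch below scores every midpoint of a single feature. Spark instead discretizes continuous features into at most maxBins (default 32) candidate thresholds, so treat this as an illustration of the criterion, not of Spark's search. The helper names and the two larger petal lengths are illustrative.

```python
from collections import Counter
from typing import Sequence, Tuple


def gini(labels: Sequence[float]) -> float:
    """Gini impurity: 1 - sum_k p_k^2 over the class proportions p_k."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def best_threshold(values: Sequence[float], labels: Sequence[float]) -> Tuple[float, float]:
    """Try each midpoint between sorted distinct values as the split `value <= t`;
    return (t, weighted child impurity) with the lowest impurity."""
    pairs = list(zip(values, labels))
    distinct = sorted(set(values))
    n = len(pairs)
    best = (float("nan"), float("inf"))
    for lo, hi in zip(distinct, distinct[1:]):
        t = (lo + hi) / 2
        left = [y for v, y in pairs if v <= t]
        right = [y for v, y in pairs if v > t]
        score = len(left) / n * gini(left) + len(right) / n * gini(right)
        if score < best[1]:
            best = (t, score)
    return best


# Setosa petal lengths from the rows shown above plus two larger illustrative values.
pet_len = [1.4, 1.4, 1.3, 4.7, 4.5]
labels = [0.0, 0.0, 0.0, 1.0, 1.0]
t, score = best_threshold(pet_len, labels)
print(t, score)  # threshold about 2.95, impurity 0.0 (a pure split)
```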


### Random forest classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier


In [None]:
rf = RandomForestClassifier(labelCol="labelIndex",
                            featuresCol="features", numTrees=10)
model = rf.fit(trainingData)

In [None]:
predictions = model.transform(testData)

In [None]:
predictions.select("prediction", "labelIndex").show(5)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows



In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",
                                              predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = ", accuracy)

Test Error = 0.0625
Accuracy =  0.9375


In [None]:
print(model)


RandomForestClassificationModel (uid=RandomForestClassifier_b6d202f54418) with 10 trees
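A random forest trains many trees (here numTrees=10) on resampled data and combines their outputs. As we understand Spark's RandomForestClassificationModel, each tree contributes its class-probability vector, the vectors are summed (a "soft" vote), and the class with the largest total is predicted. The plain-Python sketch below shows that aggregation step only; forest_predict is a hypothetical helper and the per-tree outputs are made up.

```python
from typing import Sequence


def forest_predict(tree_probs: Sequence[Sequence[float]]) -> int:
    """Sum the per-tree class-probability vectors and return the argmax class."""
    n_classes = len(tree_probs[0])
    totals = [0.0] * n_classes
    for probs in tree_probs:
        for k, p in enumerate(probs):
            totals[k] += p
    return max(range(n_classes), key=totals.__getitem__)


# Made-up outputs of three trees for one flower (3 classes).
trees = [[0.9, 0.1, 0.0],
         [0.2, 0.7, 0.1],
         [0.6, 0.3, 0.1]]
print(forest_predict(trees))  # 0
```

A soft vote can differ from a majority vote: with per-tree outputs [0.4, 0.6, 0.0], [0.4, 0.6, 0.0] and [1.0, 0.0, 0.0], two trees lean towards class 1, but the summed probabilities (1.8 vs 1.2) pick class 0.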


### Naive Bayes

The Naive Bayes classifier is one of the simplest multiclass classification algorithms. It assumes that every pair of features is independent given the class, and it can be trained very efficiently, in a single pass over the training data. We use the same dataframe prepared for the previous models, but with a different split between training and test data (60/40, with a fixed seed of 1234):

In [None]:
splits = df.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="labelIndex",
                featuresCol="features", smoothing=1.0,
                modelType="multinomial")
model = nb.fit(train)
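With modelType="multinomial", each feature value is treated like a count (so features must be non-negative, which holds for the Iris measurements). smoothing=1.0 adds 1 to every per-class feature total (Laplace smoothing). The plain-Python sketch below fits and applies such a model. The helper names and data rows are illustrative. Applying the same smoothing to the class priors is our assumption about Spark's formulation, not something this notebook shows.

```python
import math
from typing import Dict, List, Sequence, Tuple


def fit_multinomial_nb(
    X: Sequence[Sequence[float]], y: Sequence[int], alpha: float = 1.0
) -> Tuple[Dict[int, float], Dict[int, List[float]]]:
    """Return (log_prior, log_theta) per class with additive smoothing:
    log_theta[c][j] = log((sum_j in c + alpha) / (total in c + alpha * n_features))."""
    n_features = len(X[0])
    classes = sorted(set(y))
    counts = {c: 0 for c in classes}
    sums = {c: [0.0] * n_features for c in classes}
    for xi, c in zip(X, y):
        counts[c] += 1
        for j, v in enumerate(xi):
            sums[c][j] += v
    n = len(y)
    # Smoothed class priors (assumption: same alpha as for the features).
    log_prior = {c: math.log((counts[c] + alpha) / (n + alpha * len(classes)))
                 for c in classes}
    log_theta = {}
    for c in classes:
        total = sum(sums[c])
        log_theta[c] = [math.log((s + alpha) / (total + alpha * n_features))
                        for s in sums[c]]
    return log_prior, log_theta


def predict_proba(x: Sequence[float], log_prior: Dict[int, float],
                  log_theta: Dict[int, List[float]]) -> Dict[int, float]:
    """Posterior over classes, normalized from the joint log-likelihoods."""
    scores = {c: log_prior[c] + sum(v * lt for v, lt in zip(x, log_theta[c]))
              for c in log_prior}
    m = max(scores.values())
    exps = {c: math.exp(s - m) for c, s in scores.items()}
    z = sum(exps.values())
    return {c: e / z for c, e in exps.items()}


# Two iris-like rows per class (illustrative values).
X = [[5.1, 3.5, 1.4, 0.2], [4.9, 3.0, 1.4, 0.2],
     [7.0, 3.2, 4.7, 1.4], [6.4, 3.2, 4.5, 1.5]]
y = [0, 0, 1, 1]
log_prior, log_theta = fit_multinomial_nb(X, y, alpha=1.0)
proba = predict_proba([5.0, 3.6, 1.4, 0.2], log_prior, log_theta)
print(max(proba, key=proba.get))  # 0
```

Because all features are positive and of similar magnitude across classes, the posteriors tend to be far less extreme than those of the other models. This matches the moderate probabilities (roughly 0.6 to 0.8) in the output below.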

In [None]:
predictions = model.transform(test)
predictions.select("label", "labelIndex","probability", "prediction").show()

+---------------+----------+--------------------+----------+
|          label|labelIndex|         probability|prediction|
+---------------+----------+--------------------+----------+
|    Iris-setosa|       0.0|[0.72731658870743...|       0.0|
|    Iris-setosa|       0.0|[0.64191979313055...|       0.0|
|    Iris-setosa|       0.0|[0.67190087859470...|       0.0|
|    Iris-setosa|       0.0|[0.68619515575852...|       0.0|
|    Iris-setosa|       0.0|[0.79113303960571...|       0.0|
|    Iris-setosa|       0.0|[0.66216353297161...|       0.0|
|    Iris-setosa|       0.0|[0.65344785868152...|       0.0|
|    Iris-setosa|       0.0|[0.73019518028270...|       0.0|
|    Iris-setosa|       0.0|[0.59020997011114...|       0.0|
|    Iris-setosa|       0.0|[0.75329619797323...|       0.0|
|    Iris-setosa|       0.0|[0.71944966826326...|       0.0|
|    Iris-setosa|       0.0|[0.70041981497288...|       0.0|
|    Iris-setosa|       0.0|[0.75134779207872...|       0.0|
|    Iris-setosa|       

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",
                                              predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Testset accuracy = " + str(accuracy))

Testset accuracy = 0.8235294117647058
