In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles
import os
import pandas as pd

In [2]:
"""----------------------------------------------------------------------------
CREATE SPARK CONTEXT
CREATE SQL CONTEXT
----------------------------------------------------------------------------"""
sc = SparkContext.getOrCreate()
#sc =SparkContext()
sqlContext = SQLContext(sc)



In [3]:
#data_dir="/work/irlin355_1/gratienj/ParallelProgrammingCourse/BigDataHadoopSpark/TPs/data"
#file = os.path.join(data_dir,"iris.csv")
panda_df = pd.read_csv("iris.csv")
print(panda_df)



     sepal_length  sepal_width  petal_length  petal_width    variety
0             5.1          3.5           1.4          0.2     Setosa
1             4.9          3.0           1.4          0.2     Setosa
2             4.7          3.2           1.3          0.2     Setosa
3             4.6          3.1           1.5          0.2     Setosa
4             5.0          3.6           1.4          0.2     Setosa
..            ...          ...           ...          ...        ...
145           6.7          3.0           5.2          2.3  Virginica
146           6.3          2.5           5.0          1.9  Virginica
147           6.5          3.0           5.2          2.0  Virginica
148           6.2          3.4           5.4          2.3  Virginica
149           5.9          3.0           5.1          1.8  Virginica

[150 rows x 5 columns]


In [4]:
iris_df=sqlContext.createDataFrame(panda_df)
iris_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [5]:
#Add a numeric indexer for the label/target column
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="variety", outputCol="ind_variety")
si_model = stringIndexer.fit(iris_df)

irisNormDf = si_model.transform(iris_df)
irisNormDf.printSchema()
irisNormDf.select("variety","ind_variety").distinct().collect()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)
 |-- ind_variety: double (nullable = false)



[Row(variety='Setosa', ind_variety=0.0),
 Row(variety='Versicolor', ind_variety=1.0),
 Row(variety='Virginica', ind_variety=2.0)]

In [6]:
"""--------------------------------------------------------------------------
Perform Data Analytics
-------------------------------------------------------------------------"""

#See standard parameters
irisNormDf.describe().show()


+-------+------------------+------------------+------------------+------------------+---------+------------------+
|summary|      sepal_length|       sepal_width|      petal_length|       petal_width|  variety|       ind_variety|
+-------+------------------+------------------+------------------+------------------+---------+------------------+
|  count|               150|               150|               150|               150|      150|               150|
|   mean| 5.843333333333334|3.0573333333333332|3.7580000000000005|1.1993333333333331|     null|               1.0|
| stddev|0.8280661279778632|0.4358662849366984| 1.765298233259466|0.7622376689603464|     null|0.8192319205190405|
|    min|               4.3|               2.0|               1.0|               0.1|   Setosa|               0.0|
|    max|               7.9|               4.4|               6.9|               2.5|Virginica|               2.0|
+-------+------------------+------------------+------------------+--------------

In [7]:
"""--------------------------------------------------------------------------
Prepare data for ML
-------------------------------------------------------------------------"""

#Transform to a Data Frame for input to Machine Learing
#Drop columns that are not required (low correlation)

from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row) :
    lp = ( row["variety"], row["ind_variety"], \
                Vectors.dense([row["sepal_length"],\
                        row["sepal_width"], \
                        row["petal_length"], \
                        row["petal_width"]]))
    return lp


irisLp = irisNormDf.rdd.map(transformToLabeledPoint)
irisLpDf = sqlContext.createDataFrame(irisLp,["species","label", "features"])
irisLpDf.select("species","label","features").show(50)
irisLpDf.cache()

+-------+-----+-----------------+
|species|label|         features|
+-------+-----+-----------------+
| Setosa|  0.0|[5.1,3.5,1.4,0.2]|
| Setosa|  0.0|[4.9,3.0,1.4,0.2]|
| Setosa|  0.0|[4.7,3.2,1.3,0.2]|
| Setosa|  0.0|[4.6,3.1,1.5,0.2]|
| Setosa|  0.0|[5.0,3.6,1.4,0.2]|
| Setosa|  0.0|[5.4,3.9,1.7,0.4]|
| Setosa|  0.0|[4.6,3.4,1.4,0.3]|
| Setosa|  0.0|[5.0,3.4,1.5,0.2]|
| Setosa|  0.0|[4.4,2.9,1.4,0.2]|
| Setosa|  0.0|[4.9,3.1,1.5,0.1]|
| Setosa|  0.0|[5.4,3.7,1.5,0.2]|
| Setosa|  0.0|[4.8,3.4,1.6,0.2]|
| Setosa|  0.0|[4.8,3.0,1.4,0.1]|
| Setosa|  0.0|[4.3,3.0,1.1,0.1]|
| Setosa|  0.0|[5.8,4.0,1.2,0.2]|
| Setosa|  0.0|[5.7,4.4,1.5,0.4]|
| Setosa|  0.0|[5.4,3.9,1.3,0.4]|
| Setosa|  0.0|[5.1,3.5,1.4,0.3]|
| Setosa|  0.0|[5.7,3.8,1.7,0.3]|
| Setosa|  0.0|[5.1,3.8,1.5,0.3]|
| Setosa|  0.0|[5.4,3.4,1.7,0.2]|
| Setosa|  0.0|[5.1,3.7,1.5,0.4]|
| Setosa|  0.0|[4.6,3.6,1.0,0.2]|
| Setosa|  0.0|[5.1,3.3,1.7,0.5]|
| Setosa|  0.0|[4.8,3.4,1.9,0.2]|
| Setosa|  0.0|[5.0,3.0,1.6,0.2]|
| Setosa|  0.0

DataFrame[species: string, label: double, features: vector]

In [8]:
from pyspark.ml.feature import VectorIndexer
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(irisLpDf)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(irisLpDf)

In [9]:
"""--------------------------------------------------------------------------
Perform Machine Learning
-------------------------------------------------------------------------"""

#Split into training and testing data
(trainingData, testData) = irisLpDf.randomSplit([0.9, 0.1])
# trainingData.count()
# testData.count()
# testData.collect()

In [10]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

In [11]:
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [12]:
from pyspark.ml import Pipeline
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [13]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [14]:
# Make predictions.
predictions = model.transform(testData)
predictions.select("prediction","species","label").collect()

[Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0)]

In [15]:
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)


+--------------+-----+-----------------+
|predictedLabel|label|         features|
+--------------+-----+-----------------+
|           0.0|  0.0|[4.6,3.1,1.5,0.2]|
|           0.0|  0.0|[4.8,3.0,1.4,0.1]|
|           0.0|  0.0|[4.9,3.0,1.4,0.2]|
|           0.0|  0.0|[5.8,4.0,1.2,0.2]|
|           0.0|  0.0|[4.6,3.6,1.0,0.2]|
+--------------+-----+-----------------+
only showing top 5 rows



In [16]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

Test Error = 0.0555556
RandomForestClassificationModel: uid=RandomForestClassifier_2c6e41fc91d8, numTrees=10, numClasses=3, numFeatures=4


In [17]:
#Predict on the test data
predictions.select("prediction","species","label").collect()
#predictions = rfModel.transform(testData)
#predictions.select("prediction","species","label").collect()

[Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=0.0, species='Setosa', label=0.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Versicolor', label=1.0),
 Row(prediction=1.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0),
 Row(prediction=2.0, species='Virginica', label=2.0)]

In [18]:
#Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="label",metricName="accuracy")
evaluator.evaluate(predictions)  

0.9444444444444444

In [19]:
#Draw a confusion matrix
predictions.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|    8|
|  1.0|       1.0|    3|
|  2.0|       2.0|    6|
|  2.0|       1.0|    1|
+-----+----------+-----+

