# Introduction to ML Pipelines

This notebook provides a brief introduction to ML Pipelines, using the Iris dataset and the Naive Bayes classifier.

#### Iris Dataset Review

The dataset contains 3 species of iris (Setosa, Versicolor and Virginica) with 50 instances of each. In this example, we are going to try to predict the species of iris from its features.

Feature Information:
1. Sepal Length in cm 
2. Sepal Width in cm 
3. Petal Length in cm 
4. Petal Width in cm 

Target/Label:
  - Species
    - Setosa
    - Versicolor
    - Virginica

#### Load Data and Data Preprocessing


In this notebook, we use the iris dataset hosted on DBFS. You can read more about the free public datasets hosted on DBFS [here](../../03 Data Sources/6 Databricks Public Datasets/1 DBFS Hosted Datasets.html).

In [4]:
# Filepath for the iris dataset in DBFS
# List the file, just to check that it exists
display(dbutils.fs.ls("/databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv"))

path,name,size
dbfs:/databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv,iris.csv,4821


Read in the dataset using the spark-csv package. Note that the iris dataset has dots in its column names by default, such as "Sepal.Length" and "Sepal.Width". Dots in column names are awkward in Spark because the dot is the notation for accessing nested fields. To get around this, we rename the columns when we read the data in with SQL.


In [7]:
%sql
-- Declare our own schema so the dotted column names are replaced; header "true" skips the file's original header row
DROP TABLE IF EXISTS iris;
CREATE TABLE iris (rowNum int, SepalLength double, SepalWidth double, PetalLength double, PetalWidth double, Species string)
USING com.databricks.spark.csv
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv", header "true")
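The same renaming can also be done in code before the data reaches Spark. The plain-Python sketch below is illustrative only: the helper `clean_header` is hypothetical, the two-line sample assumes the Rdatasets layout (an unnamed first column holding the R row index), and renaming that empty header to `rowNum` simply mirrors the schema above.

```python
import csv
import io


def clean_header(names):
    """Drop dots from column names, e.g. 'Sepal.Length' -> 'SepalLength'.

    Hypothetical helper: an empty name (the unnamed R row-index column)
    becomes 'rowNum' to match the schema used in this notebook.
    """
    return [name.replace(".", "") if name else "rowNum" for name in names]


# Two-line excerpt in the assumed layout of the Rdatasets iris.csv file
sample = (
    '"","Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species"\n'
    '"1",5.1,3.5,1.4,0.2,"setosa"\n'
)
reader = csv.reader(io.StringIO(sample))
header = clean_header(next(reader))
first_row = dict(zip(header, next(reader)))
print(header)
print(first_row["SepalLength"])  # '5.1' (the csv module yields strings)
```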

In [8]:
%sql SELECT * FROM iris

rowNum,SepalLength,SepalWidth,PetalLength,PetalWidth,Species
1,5.1,3.5,1.4,0.2,setosa
2,4.9,3.0,1.4,0.2,setosa
3,4.7,3.2,1.3,0.2,setosa
4,4.6,3.1,1.5,0.2,setosa
5,5.0,3.6,1.4,0.2,setosa
6,5.4,3.9,1.7,0.4,setosa
7,4.6,3.4,1.4,0.3,setosa
8,5.0,3.4,1.5,0.2,setosa
9,4.4,2.9,1.4,0.2,setosa
10,4.9,3.1,1.5,0.1,setosa


Since we do not need the first column of row indexes, we select only the columns we need into a new DataFrame.

In [10]:
irisdf = sqlContext.sql("SELECT SepalLength, SepalWidth, PetalLength, PetalWidth, Species FROM iris")

In [11]:
display(irisdf)

SepalLength,SepalWidth,PetalLength,PetalWidth,Species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa
5.0,3.4,1.5,0.2,setosa
4.4,2.9,1.4,0.2,setosa
4.9,3.1,1.5,0.1,setosa


Since we have already removed the row index column and separated the header from the rest of the dataset, the next preprocessing step is to convert our label into numerical categories. This is easily done with `StringIndexer()`. We won't transform the dataset just yet, as we will pass the `StringIndexer()` into our ML Pipeline later.

In [13]:
from pyspark.ml.feature import StringIndexer
# Convert target into numerical categories
labelIndexer = StringIndexer(inputCol="Species", outputCol="label")
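Conceptually, `StringIndexer` learns a string-to-index mapping ordered by label frequency, with the most frequent label getting index 0.0. The sketch below reproduces that default ordering in plain Python on a toy list; the helper name is hypothetical, and breaking ties alphabetically is an assumption that matches recent Spark releases rather than a guarantee for every version.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return a {string: index} mapping, most frequent string first.

    Ties are broken alphabetically (an assumption matching recent Spark).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


species = ["setosa", "virginica", "setosa", "versicolor", "virginica", "setosa"]
mapping = fit_string_indexer(species)
labels = [mapping[s] for s in species]
print(mapping)  # {'setosa': 0.0, 'virginica': 1.0, 'versicolor': 2.0}
print(labels)   # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```

Because the pipeline fits the indexer on `trainingData`, the index assigned to each species depends on the class counts in the training split, not on the full dataset's 50/50/50 balance.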

#### Explore Data


We can easily obtain some quick visualizations to better understand the data with the `display()` command.

In [15]:
irisdf.printSchema()

In [16]:
# Click the chart icon on the bottom left to select chart type and set plot options
display(irisdf.select("SepalLength", "Species"))

SepalLength,Species
5.1,setosa
4.9,setosa
4.7,setosa
4.6,setosa
5.0,setosa
5.4,setosa
4.6,setosa
5.0,setosa
4.4,setosa
4.9,setosa


The above plot suggests that setosas have shorter sepal lengths, whereas the sepal lengths of versicolors and virginicas have similar ranges with the latter having slightly longer sepal lengths.

In this example, we will be demonstrating the use of the ML Pipeline API.

To proceed, we first randomly split the dataset into a training set (70%) and a test set (30%). The training set is used to build our models, and the test set is used to evaluate them. We cache both datasets because we use them multiple times in this notebook.

In [19]:
# Split the dataset randomly into training and test sets; set the seed for reproducibility
(trainingData, testData) = irisdf.randomSplit([0.7, 0.3], seed=200)

trainingData.cache()
testData.cache()

print(trainingData.count())
print(testData.count())
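`randomSplit` assigns rows to splits at random rather than cutting the data at exact positions, so the two counts printed above are only approximately 70% and 30% of the 150 rows. The plain-Python sketch below (hypothetical helper `random_split`) illustrates that idea with per-row sampling; Spark's actual implementation samples within each partition and differs in detail.

```python
import random


def random_split(rows, weights, seed):
    """Randomly assign each row to one split, with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative boundaries, e.g. [0.7, 1.0]
    rng = random.Random(seed)  # seeded, so the split is reproducible
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, bound in enumerate(bounds):
            if x < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits


train, test = random_split(list(range(150)), [0.7, 0.3], seed=200)
print(len(train), len(test))  # close to, but not necessarily exactly, 105 and 45
```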

Next, we will use the `VectorAssembler()` to merge our feature columns into a single vector column, which we will be passing into our Naive Bayes model. Again, we will not transform the dataset just yet as we will be passing the VectorAssembler into our ML Pipeline.

In [21]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"], outputCol="features")
print(vecAssembler)
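`VectorAssembler` keeps its input columns and appends one vector column built from them, in the order given by `inputCols`. A plain-Python sketch over rows stored as dictionaries (the `assemble` helper is hypothetical, and a Python list stands in for Spark's vector type) looks like this:

```python
def assemble(rows, input_cols, output_col="features"):
    """Append a list-valued column built from input_cols, keeping the original columns."""
    assembled = []
    for row in rows:
        new_row = dict(row)  # copy, so the input rows are left unchanged
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled


rows = [{"SepalLength": 5.1, "SepalWidth": 3.5, "PetalLength": 1.4, "PetalWidth": 0.2, "Species": "setosa"}]
cols = ["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"]
out = assemble(rows, cols)
print(out[0]["features"])  # [5.1, 3.5, 1.4, 0.2]
```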


#### Create a Multiclass Naive Bayes Classifier


We will see how well Naive Bayes can predict the species of iris from its 4 features: Sepal Length, Sepal Width, Petal Length, and Petal Width. This is a multiclass problem, as we have 3 different species of iris in our dataset. Keep in mind that the Naive Bayes algorithm assumes the features are conditionally independent given the class, and the multinomial model used here requires non-negative feature values.
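The multinomial model scores each class by its log prior plus the feature values weighted by per-class log probabilities, and `smoothing=1.0` adds one pseudo-count to every feature total (Laplace smoothing). This plain-Python sketch follows that textbook formulation; the function names are hypothetical, smoothing the class priors as well is an assumption about Spark's exact formulas, and the toy vectors are made up for illustration.

```python
import math


def train_multinomial_nb(X, y, smoothing=1.0):
    """Fit smoothed class log-priors and per-class feature log-probabilities."""
    classes = sorted(set(y))
    n_features = len(X[0])
    counts = {c: 0 for c in classes}
    sums = {c: [0.0] * n_features for c in classes}
    for x, c in zip(X, y):
        if any(v < 0 for v in x):
            raise ValueError("multinomial Naive Bayes needs non-negative features")
        counts[c] += 1
        for j, v in enumerate(x):
            sums[c][j] += v
    n = len(y)
    log_prior = {
        c: math.log((counts[c] + smoothing) / (n + smoothing * len(classes)))
        for c in classes
    }
    log_theta = {}
    for c in classes:
        denom = sum(sums[c]) + smoothing * n_features  # Laplace-smoothed total
        log_theta[c] = [math.log((s + smoothing) / denom) for s in sums[c]]
    return log_prior, log_theta


def predict_multinomial_nb(nb_model, x):
    """Return the class maximizing log prior + sum_j x_j * log theta_cj."""
    log_prior, log_theta = nb_model
    scores = {
        c: log_prior[c] + sum(v * t for v, t in zip(x, log_theta[c]))
        for c in log_prior
    }
    return max(scores, key=scores.get)


# Toy, made-up feature vectors: class 0 is heavy on feature 0, class 1 on feature 1
X = [[5.0, 1.0], [4.0, 0.0], [0.0, 5.0], [1.0, 4.0]]
y = [0.0, 0.0, 1.0, 1.0]
nb_sketch = train_multinomial_nb(X, y, smoothing=1.0)
print(predict_multinomial_nb(nb_sketch, [3.0, 0.0]))  # 0.0
print(predict_multinomial_nb(nb_sketch, [0.0, 3.0]))  # 1.0
```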

In [23]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline

# Define a multinomial Naive Bayes estimator (it is trained when the pipeline is fit)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Chain labelIndexer, vecAssembler and nb in a pipeline
pipeline = Pipeline(stages=[labelIndexer, vecAssembler, nb])

# Run the stages in order and train the model
model = pipeline.fit(trainingData)
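A `Pipeline` runs its stages in order: each estimator is fit on the data produced by the earlier stages, and the resulting `PipelineModel` applies every fitted stage in sequence at prediction time. The toy classes below (`MiniPipeline`, `LabelIndexer`, `MajorityClassifier` and the rest are all hypothetical, and the rows are made-up values) sketch that control flow in plain Python; they are not the Spark classes.

```python
from collections import Counter


class FunctionTransformer:
    """A stage that only transforms: applies fn to a copy of every row."""

    def __init__(self, fn):
        self.fn = fn

    def transform(self, rows):
        return [self.fn(dict(row)) for row in rows]


class LabelIndexer:
    """Toy estimator: learns Species -> label (most frequent first)."""

    def fit(self, rows):
        counts = Counter(row["Species"] for row in rows)
        order = sorted(counts, key=lambda s: (-counts[s], s))
        mapping = {s: float(i) for i, s in enumerate(order)}
        return FunctionTransformer(lambda row: {**row, "label": mapping[row["Species"]]})


class MajorityClassifier:
    """Toy estimator: learns the most frequent label and always predicts it."""

    def fit(self, rows):
        majority = Counter(row["label"] for row in rows).most_common(1)[0][0]
        return FunctionTransformer(lambda row: {**row, "prediction": majority})


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:  # apply every fitted stage in order
            rows = stage.transform(rows)
        return rows


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fit on the output of the earlier stages;
            # plain transformers are used as they are.
            fitted_stage = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(fitted_stage)
            if i < len(self.stages) - 1:  # the last stage's output is not needed
                rows = fitted_stage.transform(rows)
        return MiniPipelineModel(fitted)


# Toy rows with made-up values
train_rows = [
    {"Species": "setosa", "PetalLength": 1.4},
    {"Species": "setosa", "PetalLength": 1.3},
    {"Species": "virginica", "PetalLength": 6.0},
]
assembler = FunctionTransformer(lambda row: {**row, "features": [row["PetalLength"]]})
toy_model = MiniPipeline([LabelIndexer(), assembler, MajorityClassifier()]).fit(train_rows)
result = toy_model.transform([{"Species": "virginica", "PetalLength": 5.5}])
print(result[0]["label"], result[0]["prediction"])  # 1.0 0.0
```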


In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# Swap the Naive Bayes stage for a random forest; the cells below evaluate this model.
# Other pyspark.ml classifiers can be dropped in the same way, for example:
#   DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5)
#   LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# (pyspark.ml's GBTClassifier is not an option here: it supports only binary labels.)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=15)

# Chain labelIndexer, vecAssembler and rf in a pipeline
pipeline = Pipeline(stages=[labelIndexer, vecAssembler, rf])

# Fit the model
model = pipeline.fit(trainingData)


We can now make predictions from our model and view results.

In [26]:
# Make predictions on testData so we can measure the accuracy of our model on new data
predictions = model.transform(testData)

# Display what results we can view
predictions.printSchema()

In [27]:
# Select results to view
display(predictions.select("label", "prediction", "probability"))

label,prediction,probability
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"
0.0,0.0,"[1.0,0.0,0.0]"


#### Model Evaluation

To evaluate our model, we use the `MulticlassClassificationEvaluator`. Note that the F1 score (`"f1"`) is its default metric; here we set `weightedPrecision` explicitly. Other evaluation metrics are listed in the API documentation.

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedPrecision")

weightedPrecision = evaluator.evaluate(predictions)
print("Model weighted precision:", weightedPrecision)

The evaluator supports several metrics. In Spark 2.0 and later these include `f1`, `weightedPrecision`, `weightedRecall`, and `accuracy`; Spark 1.x offered `precision` and `recall` in place of `accuracy`.

`evaluator.setMetricName("insert_metric_here")` can be used to change the metric used to evaluate models.

In [31]:
evaluator.explainParam("metricName")
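The weighted metrics average the per-class scores, weighting each class by its share of the true labels. The plain-Python sketch below (hypothetical helper `multiclass_metrics`, toy pairs) computes them from (prediction, label) pairs; reporting 0.0 precision for a class that is never predicted is an assumed convention.

```python
from collections import Counter


def multiclass_metrics(pairs):
    """Accuracy plus label-weighted precision, recall and F1 from (prediction, label) pairs."""
    n = len(pairs)
    true_counts = Counter(label for _, label in pairs)
    pred_counts = Counter(pred for pred, _ in pairs)
    true_pos = Counter(label for pred, label in pairs if pred == label)
    result = {"accuracy": sum(true_pos.values()) / n,
              "weightedPrecision": 0.0, "weightedRecall": 0.0, "f1": 0.0}
    for label, count in true_counts.items():
        weight = count / n  # this class's share of the true labels
        precision = true_pos[label] / pred_counts[label] if pred_counts[label] else 0.0
        recall = true_pos[label] / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        result["weightedPrecision"] += weight * precision
        result["weightedRecall"] += weight * recall
        result["f1"] += weight * f1
    return result


pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 2.0), (2.0, 2.0)]
scores = multiclass_metrics(pairs)
print(scores)
```

One consequence of this weighting: weighted recall always equals accuracy, since each class's recall is multiplied by that class's share of the labels.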

We can also generate a confusion matrix to examine the predictions in more detail. `MulticlassMetrics` from `pyspark.mllib` works only with RDDs, so we convert our DataFrame of (prediction, label) pairs into an RDD.

`confusionMatrix()` returns a `DenseMatrix` whose rows are the actual classes and whose columns are the predicted classes, both ordered by ascending class label. The diagonal from top left to bottom right counts the observations that were predicted correctly.

In the confusion matrix printed by the cell below, we observe that all Setosas (class 0) and Versicolors (class 1) have been classified correctly, but 10 Virginicas (class 2) have been misclassified as Versicolors.

In [33]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Create (prediction, label) pairs as an RDD of floats
predictionAndLabel = predictions.select("prediction", "label").rdd.map(lambda row: (float(row[0]), float(row[1])))

# Generate the confusion matrix and print it as a NumPy array
metrics = MulticlassMetrics(predictionAndLabel)
print(metrics.confusionMatrix().toArray())
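The row and column layout described above can be reproduced with a small plain-Python sketch (hypothetical helper `confusion_matrix`, toy pairs). It includes every class that appears as either a prediction or a label, which may differ from how Spark chooses its label set.

```python
def confusion_matrix(pairs):
    """Rows = actual class, columns = predicted class, both in ascending label order."""
    labels = sorted({value for pair in pairs for value in pair})
    position = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for predicted, actual in pairs:
        matrix[position[actual]][position[predicted]] += 1
    return labels, matrix


pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 2.0), (2.0, 2.0)]
class_labels, cm = confusion_matrix(pairs)
for row in cm:
    print(row)
# [2, 0, 0]
# [0, 1, 0]
# [0, 1, 1]   <- one class-2 observation predicted as class 1
correct = sum(cm[i][i] for i in range(len(cm)))  # diagonal = correct predictions
print(correct)  # 4
```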
