<a href="https://colab.research.google.com/github/varshachawan/SparkMLlib/blob/master/DecisionTree_WineData_SparkMLLib_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Check the Spark Context
Spark context sets up internal services and establishes a connection to a Spark execution environment

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirror.olnevhost.net/pub/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [2]:
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.5-bin-hadoop2.7"
!echo $JAVA_HOME
import findspark
findspark.init()

/usr/lib/jvm/java-8-openjdk-amd64


In [0]:
from pyspark.sql import SparkSession


In [0]:

spark = SparkSession.builder.appName("Tee Model").getOrCreate()

In [0]:
sc = spark.sparkContext

<b>Dataset location: </b>https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data

In [0]:
rawData = sc.textFile('./wine.data') 

#### The raw data is of type MapPartitionsRDD
MapPartitionsRDD is the result of the following transformations:
* map
* flatMap
* filter
* glom

MapPartitionsRDD is an RDD that applies the provided function f to every partition of the parent RDD

In [8]:
rawData

./wine.data MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

#### View contents of the rawData RDD

In [9]:
rawData.take(10)

['1,14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065',
 '1,13.2,1.78,2.14,11.2,100,2.65,2.76,.26,1.28,4.38,1.05,3.4,1050',
 '1,13.16,2.36,2.67,18.6,101,2.8,3.24,.3,2.81,5.68,1.03,3.17,1185',
 '1,14.37,1.95,2.5,16.8,113,3.85,3.49,.24,2.18,7.8,.86,3.45,1480',
 '1,13.24,2.59,2.87,21,118,2.8,2.69,.39,1.82,4.32,1.04,2.93,735',
 '1,14.2,1.76,2.45,15.2,112,3.27,3.39,.34,1.97,6.75,1.05,2.85,1450',
 '1,14.39,1.87,2.45,14.6,96,2.5,2.52,.3,1.98,5.25,1.02,3.58,1290',
 '1,14.06,2.15,2.61,17.6,121,2.6,2.51,.31,1.25,5.05,1.06,3.58,1295',
 '1,14.83,1.64,2.17,14,97,2.8,2.98,.29,1.98,5.2,1.08,2.85,1045',
 '1,13.86,1.35,2.27,16,98,2.98,3.15,.22,1.85,7.22,1.01,3.55,1045']

#### Function to transform each row in the RDD to a LabeledPoint
* MLlib classifiers and regressors require data sets in a format of rows of type LabeledPoint
* It's in the format (&lt;label&gt;, [&lt;array_of_features&gt;])

In [0]:
from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[0], values[1:])

#### Transform our raw data into an RDD of LabeledPoints

In [12]:
parsedData = rawData.map(parsePoint)
parsedData

PythonRDD[3] at RDD at PythonRDD.scala:53

In [0]:
parsedData.take(10)

[LabeledPoint(1.0, [14.23,1.71,2.43,15.6,127.0,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065.0]),
 LabeledPoint(1.0, [13.2,1.78,2.14,11.2,100.0,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050.0]),
 LabeledPoint(1.0, [13.16,2.36,2.67,18.6,101.0,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185.0]),
 LabeledPoint(1.0, [14.37,1.95,2.5,16.8,113.0,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480.0]),
 LabeledPoint(1.0, [13.24,2.59,2.87,21.0,118.0,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735.0]),
 LabeledPoint(1.0, [14.2,1.76,2.45,15.2,112.0,3.27,3.39,0.34,1.97,6.75,1.05,2.85,1450.0]),
 LabeledPoint(1.0, [14.39,1.87,2.45,14.6,96.0,2.5,2.52,0.3,1.98,5.25,1.02,3.58,1290.0]),
 LabeledPoint(1.0, [14.06,2.15,2.61,17.6,121.0,2.6,2.51,0.31,1.25,5.05,1.06,3.58,1295.0]),
 LabeledPoint(1.0, [14.83,1.64,2.17,14.0,97.0,2.8,2.98,0.29,1.98,5.2,1.08,2.85,1045.0]),
 LabeledPoint(1.0, [13.86,1.35,2.27,16.0,98.0,2.98,3.15,0.22,1.85,7.22,1.01,3.55,1045.0])]

#### Split the RDD into training and test data sets

In [0]:
(trainingData, testData) = parsedData.randomSplit([0.8, 0.2])

In [0]:
trainingData

PythonRDD[6] at RDD at PythonRDD.scala:48

In [24]:
trainingData.map(lambda x: x.label).distinct().collect()

[2.0, 1.0, 3.0]

### Create a Decision Tree model
* <b>numClasses: </b>The number of labels. Since the labels in our dataset are 1,2 or 3 (rather than 0, 1, 2), we specify 4 rather than 3. Otherwise, it complains when it encounters a label of 3
* <b>categoricalFeaturesInfo: </b>Specifies which features are categorical. None of the features in our dataset are
* <b>impurity: </b>Can be <i>gini</i> or <i>entropy</i>
* <b>maxDepth: </b>Maximum depth of the decision tree
* <b>maxBins: </b>Number of bins used when discretizing continuous features. Increasing maxBins allows the algorithm to consider more split candidates and make fine-grained split decisions - at the cost of computation

In [0]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

model = DecisionTree.trainClassifier(trainingData, 
                                     numClasses=4, 
                                     categoricalFeaturesInfo={},
                                     impurity='gini', 
                                     maxDepth=3, 
                                     maxBins=32)

#### Use our model to make predictions with our test data

In [43]:
predictions = model.predict(testData.map(lambda x: x.features))
predictions.take(5)

[1.0, 1.0, 1.0, 1.0, 1.0]

#### Pair up the actual and predicted values into a tuple

In [44]:
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
labelsAndPredictions.take(5)

[(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)]

#### Compare the actual and predicted values to get the accuracy of our model

In [45]:
testAcc = labelsAndPredictions.filter(
    lambda lp: lp[0] == lp[1]).count() / float(testData.count())
print('Test Accuracy = ' + str(testAcc))

Test Accuracy = 0.9473684210526315


#### Use MulticlassMetrics instead for model evaluation
* MulticlassMetrics takes rows of (prediction, label) tuples as input
* The model can be evaluated on multiple measures such as fMeasure, precision, recall

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(labelsAndPredictions)

In [47]:
metrics.accuracy

0.9473684210526315

In [48]:
metrics.recall()

0.9473684210526315

In [49]:
metrics.fMeasure()

0.9473684210526315

#### Measure precision when making a specific prediction
Check accuracy when the predicted value is 2.0

In [51]:
metrics.precision(1.0)

0.9285714285714286

#### Plot a confusion matrix
* MulticlassMetrics also provides a confusion matrix

In [52]:
metrics.confusionMatrix()

DenseMatrix(3, 3, [13.0, 1.0, 0.0, 0.0, 13.0, 0.0, 0.0, 1.0, 10.0], 0)

#### The confusion matrix is easier to read when converted to an array

In [0]:
metrics.confusionMatrix().toArray()

array([[15.,  0.,  0.],
       [ 2., 18.,  0.],
       [ 0.,  2., 12.]])

#### View the Decision Tree model
It is merely a collection of if-else statements

In [0]:
print(model.toDebugString())

DecisionTreeModel classifier of depth 3 with 15 nodes
  If (feature 12 <= 765.0)
   If (feature 9 <= 4.85)
    If (feature 6 <= 0.835)
     Predict: 3.0
    Else (feature 6 > 0.835)
     Predict: 2.0
   Else (feature 9 > 4.85)
    If (feature 5 <= 2.815)
     Predict: 3.0
    Else (feature 5 > 2.815)
     Predict: 2.0
  Else (feature 12 > 765.0)
   If (feature 6 <= 2.1550000000000002)
    If (feature 1 <= 1.62)
     Predict: 2.0
    Else (feature 1 > 1.62)
     Predict: 3.0
   Else (feature 6 > 2.1550000000000002)
    If (feature 4 <= 133.0)
     Predict: 1.0
    Else (feature 4 > 133.0)
     Predict: 2.0



### Spark can also handle data sets in LIBSVM format
The data is in this format: <br />
&lt;label&gt; &lt;index1&gt;:&lt;value1&gt; &lt;index2&gt;:&lt;value2&gt; ... <br /><br />

The MLUtils class is required to load SVM data

In [0]:
from pyspark.mllib.util import MLUtils

<b>LibSVM dataset location: </b>https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/wine.scale

In [0]:
libsvmData = MLUtils.loadLibSVMFile(sc, './wine.scale')

In [0]:
libsvmData

PythonRDD[73] at RDD at PythonRDD.scala:48

In [58]:
libsvmData.take(5)

[LabeledPoint(1.0, (13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.68421,-0.616601,0.144385,-0.484536,0.23913,0.255172,0.147679,-0.433962,0.18612,-0.255973,-0.089431,0.941392,0.122682])),
 LabeledPoint(1.0, (13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.142105,-0.588933,-0.165775,-0.938144,-0.347826,0.151724,0.0210971,-0.509434,-0.451104,-0.47099,-0.0731708,0.56044,0.101284])),
 LabeledPoint(1.0, (13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.121053,-0.359684,0.40107,-0.175258,-0.326087,0.255172,0.223629,-0.358491,0.514196,-0.249147,-0.105691,0.391941,0.293866])),
 LabeledPoint(1.0, (13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.757895,-0.521739,0.219251,-0.360825,-0.0652174,0.97931,0.329114,-0.584906,0.116719,0.112628,-0.382114,0.59707,0.714693])),
 LabeledPoint(1.0, (13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.163158,-0.268775,0.614973,0.0721649,0.0434783,0.255172,-0.00843878,-0.018868,-0.11041,-0.481229,-0.089431,0.216117,-0.348074]))]

In [0]:
(trainingData, testData) = libsvmData.randomSplit([0.8, 0.2])

#### The model has the same parameters as the one created previously

In [0]:
libsvmModel = DecisionTree.trainClassifier(trainingData, 
                                           numClasses=4, 
                                           categoricalFeaturesInfo={},
                                           impurity='gini', 
                                           maxDepth=6, 
                                           maxBins=32)

In [0]:
predictions = libsvmModel.predict(testData.map(lambda x: x.features))

In [0]:
metrics = MulticlassMetrics(labelsAndPredictions)

In [63]:
metrics.accuracy

0.9473684210526315

In [0]:
metrics.confusionMatrix().toArray()

array([[19.,  0.,  0.],
       [ 2., 29.,  0.],
       [ 0.,  3., 15.]])

In [0]:
print(model.toDebugString())

DecisionTreeModel classifier of depth 3 with 11 nodes
  If (feature 6 <= 1.29)
   If (feature 9 <= 3.125)
    Predict: 2.0
   Else (feature 9 > 3.125)
    Predict: 3.0
  Else (feature 6 > 1.29)
   If (feature 12 <= 765.0)
    If (feature 9 <= 6.165)
     Predict: 2.0
    Else (feature 9 > 6.165)
     Predict: 3.0
   Else (feature 12 > 765.0)
    If (feature 9 <= 3.46)
     Predict: 2.0
    Else (feature 9 > 3.46)
     Predict: 1.0

