<a href="https://colab.research.google.com/github/quicksilverTrx/mining_of_massive_datasets/blob/main/colab_notebooks/Decision_Trees_on_Spark_Colab_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CSE 547- Colab 7
## Decision Trees on Spark

Adapted From Stanford's CS246

### Setup

Let's setup Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 79kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 48.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=2dec837bde40654ed01b0bd110cb8a8c24450f415642cb4ebcf21e6aeeae4b94
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1
The 

Now we authenticate a Google Drive client to download the files we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
id='1aJrdYMVmmnUKYhLTlXtyB0FQ9gYJqCrs'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-train.txt')

id='1yLwxRaJIyrC03yxqbTKpedMmHEF86AAq'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-test.txt')

If you executed the cells above, you should be able to see the dataset we will use for this Colab under the "Files" tab on the left panel.

Next, we import some of the common libraries needed for our task.

In [4]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [5]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a **local runtime**).

In [6]:
spark

If you are running this Colab on the Google hosted runtime, the cell below will create a *ngrok* tunnel which will allow you to still check the Spark UI.

In [None]:
'''
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
    '''

### Data Loading

![MNIST](https://upload.wikimedia.org/wikipedia/commons/thumb/2/27/MnistExamples.png/220px-MnistExamples.png)

In this Colab, we will be using the famous [MNIST database](https://en.wikipedia.org/wiki/MNIST_database), a large collection of handwritten digits that is widely used for training and testing in the field of machine learning.

For your convenience, the dataset has already been converted to the popular LibSVM format, where each digit is represented as a sparse vector of grayscale pixel values.

In [7]:
training = spark.read.format("libsvm").load("mnist-digits-train.txt")
test = spark.read.format("libsvm").load("mnist-digits-test.txt")

# Cache data for multiple uses
training.cache()
test.cache()

DataFrame[label: double, features: vector]

In [8]:
training.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [9]:
training.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [10]:
test.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



### Your task

First of all, find out how many instances we have in our training / test split.

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer

In [12]:
# YOUR CODE HERE
print(training.count())
print(test.count())

60000
10000


Now train a Decision Tree on the training dataset using Spark MLlib. Use the default parameters for your classifier (You can use a different labelCol name)

You can refer to the Python example on this documentation page: [https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier)

In [13]:
# YOUR CODE HERE
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [19]:
'''
// StringIndexer: Read input column "label" (digits) and annotate them as categorical values.
'''
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
#test_label = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(test)

In [20]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")
pipeline = Pipeline(stages=[labelIndexer, dt])
model = pipeline.fit(training)

With the Decision Tree you just induced on the training data, predict the labels of the test set.
Print the predictions for the first 10 digits, and compare them with the labels.

In [28]:
predictions = model.transform(test)
predictions.select("prediction", "indexedLabel", "features").show(10)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(778,[202,203,204...|
|       6.0|         3.0|(778,[94,95,96,97...|
|       0.0|         0.0|(778,[128,129,130...|
|       5.0|         5.0|(778,[124,125,126...|
|       4.0|         8.0|(778,[150,151,159...|
|       0.0|         0.0|(778,[156,157,158...|
|       9.0|         8.0|(778,[149,150,151...|
|       4.0|         4.0|(778,[179,180,181...|
|       6.0|         9.0|(778,[129,130,131...|
|       4.0|         4.0|(778,[209,210,211...|
+----------+------------+--------------------+
only showing top 10 rows



In [29]:
# YOUR CODE HERE


The small sample above looks good, but not great!

Let's dig deeper. Compute the accuracy of our model, using the ```MulticlassClassificationEvaluator``` from MLlib.

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

In [30]:
# YOUR CODE HERE
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.3207 


Find out the max depth of the trained Decision Tree, and its total number of nodes.

In [32]:
treeModel = model.stages[1]
# summary only
print(model)
print(treeModel)
print(treeModel.numNodes)
print(treeModel.depth)

PipelineModel_9f14d33dc3c6
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_fe6967063e7d, depth=5, numNodes=63, numClasses=10, numFeatures=780
63
5


It appears that the default settings of the Decision Tree implemented in MLlib did not allow us to train a very powerful model!

Before starting to train a Decision Tree, you can tune the max depth it can reach using the [setMaxDepth()](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier.setMaxDepth) method. Train 21 different DTs, varying the max depth from 0 to 20, endpoints included (i.e., [0, 20]). For each value of the parameter, print the accuracy achieved on the test set, and the number of nodes contained in the given DT.

**IMPORTANT:** this parameter sweep can take 30 minutes or more, depending on how busy is your Colab instance. Notice how the induction time grows super-linearly!

In [38]:
# YOUR CODE HERE
for i in range(21):
  dt = DecisionTreeClassifier(maxDepth=i,labelCol="indexedLabel", featuresCol="features")
  pipeline = Pipeline(stages=[labelIndexer, dt])
  model = pipeline.fit(training)
  predictions = model.transform(test)
  evaluator = MulticlassClassificationEvaluator(
      labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
  accuracy = evaluator.evaluate(predictions)
  treeModel = model.stages[1]
  print("Max Depth of Tree: ", treeModel.depth)
  print("Test Error = %g " % (1.0 - accuracy))

  print("Number of Nodes: ",treeModel.numNodes)





Max Depth of Tree:  0
Test Error = 0.8865 
Number of Nodes:  1
Max Depth of Tree:  1
Test Error = 0.8007 
Number of Nodes:  3
Max Depth of Tree:  2
Test Error = 0.6558 
Number of Nodes:  7
Max Depth of Tree:  3
Test Error = 0.5112 
Number of Nodes:  15
Max Depth of Tree:  4
Test Error = 0.4062 
Number of Nodes:  31
Max Depth of Tree:  5
Test Error = 0.3207 
Number of Nodes:  63
Max Depth of Tree:  6
Test Error = 0.2517 
Number of Nodes:  121
Max Depth of Tree:  7
Test Error = 0.2106 
Number of Nodes:  245
Max Depth of Tree:  8
Test Error = 0.1815 
Number of Nodes:  471
Max Depth of Tree:  9
Test Error = 0.1517 
Number of Nodes:  929
Max Depth of Tree:  10
Test Error = 0.1341 
Number of Nodes:  1677
Max Depth of Tree:  11
Test Error = 0.1264 
Number of Nodes:  2701
Max Depth of Tree:  12
Test Error = 0.1209 
Number of Nodes:  3801
Max Depth of Tree:  13
Test Error = 0.1185 
Number of Nodes:  4797
Max Depth of Tree:  14
Test Error = 0.1175 
Number of Nodes:  5631
Max Depth of Tree:  15
T

Once you have working code for each cell above, **head over to Gradescope, read carefully the questions, and submit your solution for this Colab**!