# CSE 547- Colab 7
## Decision Trees on Spark

Adapted From Stanford's CS246

### Setup

Let's setup Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u252-b09-1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 31 not upgraded.


Now we authenticate a Google Drive client to download the files we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [0]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [0]:
id='1aJrdYMVmmnUKYhLTlXtyB0FQ9gYJqCrs'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-train.txt')

id='1yLwxRaJIyrC03yxqbTKpedMmHEF86AAq'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-test.txt')

If you executed the cells above, you should be able to see the dataset we will use for this Colab under the "Files" tab on the left panel.

Next, we import some of the common libraries needed for our task.

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [0]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a **local runtime**).

In [8]:
spark

If you are running this Colab on the Google hosted runtime, the cell below will create a *ngrok* tunnel which will allow you to still check the Spark UI.

In [9]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2020-05-22 02:45:55--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 50.17.2.180, 52.54.124.219, 3.95.144.123, ...
Connecting to bin.equinox.io (bin.equinox.io)|50.17.2.180|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13773305 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2020-05-22 02:45:58 (6.36 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13773305/13773305]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


### Data Loading

![MNIST](https://upload.wikimedia.org/wikipedia/commons/thumb/2/27/MnistExamples.png/220px-MnistExamples.png)

In this Colab, we will be using the famous [MNIST database](https://en.wikipedia.org/wiki/MNIST_database), a large collection of handwritten digits that is widely used for training and testing in the field of machine learning.

For your convenience, the dataset has already been converted to the popular LibSVM format, where each digit is represented as a sparse vector of grayscale pixel values.

In [10]:
training = spark.read.format("libsvm").load("mnist-digits-train.txt")
test = spark.read.format("libsvm").load("mnist-digits-test.txt")

# Cache data for multiple uses
training.cache()
test.cache()

DataFrame[label: double, features: vector]

In [11]:
training.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
training.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [12]:
test.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



### Your task

First of all, find out how many instances we have in our training / test split.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [28]:
print(training.count())
print(test.count())

60000
10000


Now train a Decision Tree on the training dataset using Spark MLlib. Use the default parameters for your classifier (You can use a different labelCol name)

You can refer to the Python example on this documentation page: [https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier)

In [0]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(
  inputCol="label", 
  outputCol="indexedLabel"
  ).fit(training)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.


# Train a DecisionTree model.
dt = DecisionTreeClassifier(
    labelCol="indexedLabel", 
    featuresCol="features")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(training)




With the Decision Tree you just induced on the training data, predict the labels of the test set.
Print the predictions for the first 10 digits, and compare them with the labels.

In [14]:
# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(10)



+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(778,[202,203,204...|
|       6.0|         3.0|(778,[94,95,96,97...|
|       0.0|         0.0|(778,[128,129,130...|
|       5.0|         5.0|(778,[124,125,126...|
|       4.0|         8.0|(778,[150,151,159...|
|       0.0|         0.0|(778,[156,157,158...|
|       9.0|         8.0|(778,[149,150,151...|
|       6.0|         4.0|(778,[179,180,181...|
|       6.0|         9.0|(778,[129,130,131...|
|       4.0|         4.0|(778,[209,210,211...|
+----------+------------+--------------------+
only showing top 10 rows



The small sample above looks good, but not great!

Let's dig deeper. Compute the accuracy of our model, using the ```MulticlassClassificationEvaluator``` from MLlib.

In [60]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", 
    predictionCol="prediction", 
    metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g " % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))


Test Accuracy = 0.6766 
Test Error = 0.3234 


Find out the max depth of the trained Decision Tree, and its total number of nodes.

In [15]:
treeModel = model.stages
# summary only
print(treeModel)

[StringIndexer_40b7d4b6b2b4, DecisionTreeClassificationModel (uid=DecisionTreeClassifier_df5f7089c3eb) of depth 5 with 61 nodes]


It appears that the default settings of the Decision Tree implemented in MLlib did not allow us to train a very powerful model!

Before starting to train a Decision Tree, you can tune the max depth it can reach using the [setMaxDepth()](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier.setMaxDepth) method. Train 21 different DTs, varying the max depth from 0 to 20, endpoints included (i.e., [0, 20]). For each value of the parameter, print the accuracy achieved on the test set, and the number of nodes contained in the given DT.

**IMPORTANT:** this parameter sweep can take 30 minutes or more, depending on how busy is your Colab instance. Notice how the induction time grows super-linearly!

In [21]:
dt10 = dt.setMaxDepth(10)

# Chain indexers and tree in a Pipeline
pipeline10 = Pipeline(stages=[labelIndexer, dt10])
# Train model.  This also runs the indexers.
model10 = pipeline10.fit(training)
treeModel10 = model10.stages
# summary only
print(treeModel10)


[StringIndexer_40b7d4b6b2b4, DecisionTreeClassificationModel (uid=DecisionTreeClassifier_99df1978571e) of depth 10 with 1723 nodes]


In [22]:
dt20 = dt.setMaxDepth(20)

# Chain indexers and tree in a Pipeline
pipeline20 = Pipeline(stages=[labelIndexer, dt20])
# Train model.  This also runs the indexers.
model20 = pipeline20.fit(training)
treeModel20 = model20.stages
# summary only
print(treeModel20)


[StringIndexer_40b7d4b6b2b4, DecisionTreeClassificationModel (uid=DecisionTreeClassifier_99df1978571e) of depth 20 with 7353 nodes]


Once you have working code for each cell above, **head over to Gradescope, read carefully the questions, and submit your solution for this Colab**!