# MIE524 - Lab 2 - Machine Learning


### Setup
Let's set up Spark on the Colab environment.

In [None]:
# Install PySpark and PyDrive, plus a headless Java 8 JDK.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Point JAVA_HOME at the JDK installed by the apt command above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Authenticate a Google Drive client to download the files needed in the Spark job.


In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# `auth` comes from google.colab, so this cell is meant to run inside Colab.
auth.authenticate_user()
gauth = GoogleAuth()
# Hand the application-default credentials to PyDrive.
gauth.credentials = GoogleCredentials.get_application_default()
# `drive` is the client used further down to download the dataset files.
drive = GoogleDrive(gauth)

Load packages.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Initialize Spark context.

In [None]:
# Spark configuration: serve the Spark UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# Create the context, or reuse the one that is already running. Calling the
# SparkContext constructor directly raises when this cell is executed a
# second time, because only one SparkContext may be active at once.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The session attaches to the active context.
spark = SparkSession.builder.getOrCreate()

### Data
![MNIST](https://upload.wikimedia.org/wikipedia/commons/thumb/2/27/MnistExamples.png/220px-MnistExamples.png)

We will be using the [MNIST](https://en.wikipedia.org/wiki/MNIST_database) dataset throughout this lab. MNIST is a large collection of handwritten digits that is widely used for training and testing in the field of machine learning.

This loads the MNIST dataset in the LibSVM format, where each digit is represented as a sparse vector of grayscale pixel values.

In [None]:
# Google Drive file IDs of the two MNIST splits, keyed by the local
# filename each one is saved under.
mnist_files = {
    'mnist-digits-train.txt': '1aJrdYMVmmnUKYhLTlXtyB0FQ9gYJqCrs',
    'mnist-digits-test.txt': '1yLwxRaJIyrC03yxqbTKpedMmHEF86AAq',
}

# The loop variable is `file_id` rather than `id`, so the builtin id() is
# not shadowed for the rest of the notebook.
for filename, file_id in mnist_files.items():
    downloaded = drive.CreateFile({'id': file_id})
    downloaded.GetContentFile(filename)

In [None]:
def load_mnist_libsvm(path):
    """Read one LibSVM file into a Spark DataFrame with 784 features per row."""
    reader = spark.read.format("libsvm")
    reader = reader.option("numFeatures","784")
    return reader.load(path)

training = load_mnist_libsvm("mnist-digits-train.txt")
test = load_mnist_libsvm("mnist-digits-test.txt")

# Both frames are read several times below, so keep them cached
training.cache()
test.cache()

In [None]:
# Display the leading rows without truncating the column values.
training.show(truncate=False)

In [None]:
# Column names and types of the training split.
training.printSchema()

In [None]:
# Column names and types of the test split (loaded the same way as training).
test.printSchema()


## PART 1 - Random Forest
We will build a random forest model from scratch.

In [None]:
import numpy as np
import math
import random

from tqdm import tqdm

from sklearn import tree
from sklearn.utils import resample

from pyspark.ml.feature import StringIndexer, VectorIndexer


Define parameters

**n_estimators**: int - The number of classification trees that are used.

**max_features**: int - The maximum number of features that the classification trees are allowed to use.

**min_samples_split**: int - The minimum number of samples needed to make a split when building a tree.

**min_impurity_decrease**: float - A node is split only if the split decreases the impurity by at least this value.

**max_depth**: int - The maximum depth of a tree.

**n_samples**: float - The fraction of the training rows sampled for each tree; each tree is trained on roughly n_samples * X.shape[0] rows.

In [None]:
# Hyper-parameters of the hand-built forest
n_estimators, max_features = 20, 20
min_samples_split, min_impurity_decrease = 2, 0
max_depth = None
n_samples = 0.05

# Dimensionality of the feature vectors, read off the first training row
first_row = training.select('features').first()
n_features = len(first_row[0])
# Fall back to sqrt(n_features) when max_features is unset (None or 0)
max_features = max_features or int(math.sqrt(n_features))

Initialize the set of decision trees. We will use scikit-learn's decision tree implementation.

In [None]:
# One untrained scikit-learn decision tree per estimator, all built from
# the hyper-parameters defined above.
trees = [
    tree.DecisionTreeClassifier(
        min_samples_split=min_samples_split,
        min_impurity_decrease=min_impurity_decrease,
        max_depth=max_depth,
    )
    for _ in range(n_estimators)
]

Let's create sample subsets for training for each tree.

In [None]:
# Create a sample subset (drawn with replacement) for each decision tree.
# The seed is offset per tree: sampling the same DataFrame with the same
# seed returns the same rows, which would hand every tree an identical
# subset and defeat the purpose of bagging.
subsets = []
for i in tqdm(range(n_estimators)):
    subsets.append(training.sample(True, n_samples, 524 + i))

In [None]:
# Convert data to pandas dataframe to use with scikit
subsets_pd = []
for subset in tqdm(subsets):
    # Collect each Spark sample once and derive both X and y from the same
    # pandas frame; calling toPandas() twice runs the Spark job twice.
    subset_pd = subset.toPandas()
    # Each 'features' entry is a vector; toArray() expands it to one dense row.
    X_subset = pd.DataFrame([vec.toArray() for vec in subset_pd['features']])
    y_subset = subset_pd[['label']]
    subsets_pd.append([X_subset, y_subset])

Create the training loop (*fit* function).

In [None]:
# training loop
for i in range(n_estimators):
    X_subset, y_subset = subsets_pd[i]

    # Feature bagging: draw distinct feature indices. Sampling with
    # replacement can pick the same column more than once, which wastes part
    # of the max_features budget on duplicates.
    idx = np.random.choice(n_features, size=min(max_features, n_features), replace=False)
    # Save the indices of the features for prediction
    trees[i].feature_indices = idx
    # Choose the features corresponding to the indices
    X_subset = X_subset.iloc[:, idx]
    # Fit the tree to the data; the single-column label frame is flattened
    # because scikit-learn expects a 1-D target.
    trees[i].fit(X_subset, y_subset.values.ravel())

Now we are ready to make some predictions.

In [None]:
# Collect the test split once; both X_test and y_test come from this frame
# (calling toPandas() twice converts the whole test set twice).
test_pd = test.toPandas()
X_test = pd.DataFrame([vec.toArray() for vec in test_pd['features']])
y_test = test_pd[['label']]

# One column of predictions per tree
y_preds = np.empty((X_test.shape[0], len(trees)))
for i, t in enumerate(trees):
    # Each tree only sees the feature columns it was trained on
    idx = t.feature_indices
    y_preds[:, i] = t.predict(X_test.iloc[:, idx])

# Majority vote: for each sample keep the most common class prediction.
# bincount needs non-negative integers, hence the cast.
y_pred = [np.bincount(votes.astype('int')).argmax() for votes in y_preds]

Putting everything together...

In [None]:
class RandomForest():
    """Random Forest classifier built from scikit-learn decision trees.

    Every tree is fitted on its own bootstrap sample of the rows and on its
    own random subset of the feature columns; predictions are made by a
    majority vote over the trees.

    Parameters:
    -----------
    n_estimators: int
        The number of classification trees that are used.
    max_features: int
        The number of feature columns each tree is trained on. When falsy
        (None or 0) it is set to int(sqrt(n_features)) during fit.
    min_samples_split: int
        The minimum number of samples needed to make a split when building a tree.
    min_impurity_decrease: float
        A node will be split if this split induces a decrease of the impurity
        greater than or equal to this value.
    max_depth: int or None
        The maximum depth of a tree. None and float("inf") both mean
        "no limit".
    n_samples: float
        Fraction of the rows drawn (with replacement) for each tree; every
        tree is trained on round(n_samples * X.shape[0]) rows.
    random_state: int
        Seed of the generator that draws the row and feature subsets.
    """
    def __init__(self, n_estimators=100, max_features=None, min_samples_split=2,
                 min_impurity_decrease=0, max_depth=float("inf"), n_samples = 1,
                 random_state=524):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.min_samples_split = min_samples_split
        self.min_impurity_decrease = min_impurity_decrease
        self.max_depth = max_depth
        self.n_samples = n_samples
        self.random_state = random_state

        # scikit-learn documents max_depth as "int or None"; translate the
        # infinite default into None instead of passing a float through.
        unlimited = max_depth is None or math.isinf(max_depth)
        tree_depth = None if unlimited else max_depth

        # Initialize decision trees
        self.trees = [
            tree.DecisionTreeClassifier(
                min_samples_split=self.min_samples_split,
                min_impurity_decrease=self.min_impurity_decrease,
                max_depth=tree_depth)
            for _ in range(n_estimators)
        ]

    def fit(self, X, y):
        """Fit every tree on its own row sample and feature subset.

        X holds one row per sample (DataFrame or 2-D array-like). y holds
        the matching labels in the same row order; it may be a
        single-column DataFrame, a Series or an array.
        """
        X = pd.DataFrame(X)
        n_rows, n_features = X.shape
        # If max_features have not been defined => select it as
        # sqrt(n_features)
        if not self.max_features:
            self.max_features = int(math.sqrt(n_features))
        # Features are drawn without replacement, so cap at what is available
        n_selected = min(self.max_features, n_features)
        n_drawn = int(round(self.n_samples * n_rows))

        # Flatten to 1-D: no dependency on a column called 'label', and it is
        # the target shape scikit-learn expects.
        labels = np.asarray(y).ravel()

        # A single generator advanced across the loop gives every tree a
        # different sample; re-seeding per tree would repeat the same rows.
        rng = np.random.RandomState(self.random_state)

        for estimator in tqdm(self.trees):
            # Bootstrap: row positions drawn with replacement
            rows = rng.randint(0, n_rows, size=n_drawn)
            # Feature bagging: distinct column positions
            idx = rng.choice(n_features, size=n_selected, replace=False)
            # Save the indices of the features for prediction
            estimator.feature_indices = idx
            estimator.fit(X.iloc[rows, idx], labels[rows])

    def predict(self, X):
        """Return the majority-vote class for every row of X as a list.

        Votes are counted with np.bincount, so class labels must be
        non-negative integers.
        """
        X = pd.DataFrame(X)
        y_preds = np.empty((X.shape[0], len(self.trees)))
        # Let each tree make a prediction on the features it was trained on
        for i, estimator in enumerate(self.trees):
            idx = estimator.feature_indices
            y_preds[:, i] = estimator.predict(X.iloc[:, idx])

        # For each sample select the most common class prediction
        return [np.bincount(votes.astype('int')).argmax() for votes in y_preds]

Let's train and predict using the RandomForest class.

In [None]:
# Initialize classifier with the same hyper-parameter values used in the
# step-by-step walkthrough above.
clf = RandomForest(n_estimators = 20, max_features = 20,
                   min_samples_split = 2, min_impurity_decrease = 0,
                   max_depth = None, n_samples = 0.05)

In [None]:
# Prepare data
def _to_pandas_xy(spark_df):
    """Split a Spark DataFrame with 'features' and 'label' columns into a
    dense pandas feature frame and a single-column integer label frame."""
    # Collect once: toPandas() pulls the whole split to the driver, so doing
    # it separately for X and y doubles the most expensive step.
    collected = spark_df.toPandas()
    X = pd.DataFrame([vec.toArray() for vec in collected['features']])
    y = collected[['label']].astype(int)
    return X, y

X_training, y_training = _to_pandas_xy(training)
X_test, y_test = _to_pandas_xy(test)

In [None]:
# Fit model to data (trains every tree of the forest on the pandas frames
# prepared in the previous cell)
clf.fit(X_training, y_training)

In [None]:
# Majority-vote predictions of the hand-built forest for the test split.
y_pred = clf.predict(X_test)

### Evaluate model performance

In [None]:
from sklearn.metrics import classification_report, ConfusionMatrixDisplay

# Text report comparing the true test labels with the forest's predictions.
print(classification_report(y_test, y_pred))


In [None]:
# Build the confusion-matrix plot straight from true and predicted labels.
disp = ConfusionMatrixDisplay.from_predictions(y_test, y_pred)
disp.figure_.suptitle("Confusion Matrix")
# Also print the raw counts as text.
print(f"Confusion matrix:\n{disp.confusion_matrix}")

plt.show()

Let's compare with scikit's implementation of random forest.

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Same hyper-parameters as the hand-built forest. random_state pins the
# bootstrap and feature draws so the comparison is repeatable across runs.
sk_clf = RandomForestClassifier(n_estimators = 20, max_features = 20,
                   min_samples_split = 2, min_impurity_decrease = 0,
                   max_depth = None, max_samples = 0.05,
                   random_state = 524)

# y_training is a single-column DataFrame; scikit-learn expects a 1-D
# target and warns about column vectors, so flatten it first.
sk_clf.fit(X_training, y_training.values.ravel())

y_pred_sk = sk_clf.predict(X_test)

In [None]:
# Same report as above, this time for scikit-learn's random forest.
print(classification_report(y_test, y_pred_sk))


Let's see MLlib's performance, which can fully leverage the datatype of libsvm.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset (training + test) to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(training.union(test))

# Automatically identify categorical features, and index them.
# NOTE(review): maxCategories is NOT set here, so VectorIndexer's default
# threshold decides which features are treated as categorical (the Spark
# documentation lists the default as 20 - confirm for the installed version).
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(training.union(test))


# Train a RandomForest model with 20 trees on the indexed columns.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=20)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model. This also runs the indexers.
model = pipeline.fit(training)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.show(5)


In [None]:
# Collect true and predicted labels in one pass, so both lists come from
# the same rows in the same order.
# 'label' / 'predictedLabel' hold the original digits, whereas
# 'indexedLabel' / 'prediction' hold StringIndexer indices; reporting on the
# indices would label the per-class metrics by index instead of by digit.
prediction_rows = predictions.select("label", "predictedLabel").collect()

y_test_spark = [int(row["label"]) for row in prediction_rows]
# predictedLabel is a string; it is parsed via float() first because the
# indexed label column is numeric (values such as "7.0").
y_pred_spark = [int(float(row["predictedLabel"])) for row in prediction_rows]

In [None]:
# Same scikit-learn report, applied to the labels collected from MLlib.
print(classification_report(y_test_spark, y_pred_spark))


## PART 2 - Deep Learning
We will build a simple feed-forward neural network in this part.

![FFW](https://images.deepai.org/django-summernote/2019-06-06/5c17d9c2-0ad4-474c-be8d-d6ae9b094e74.png)

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt

### Torch Introduction

Initializing a tensor

In [None]:
# Build a tensor directly from a nested Python list.
data = [[1, 2],[3, 4]]
x_data = torch.tensor(data)

print(x_data)

Initialize a tensor with random values

In [None]:
# Random tensor with the same shape as x_data.
x_rand = torch.rand_like(x_data, dtype=torch.float) # overrides the datatype of x_data
print(x_rand)

Attributes of a tensor

In [None]:
# A 3 x 4 tensor of random values; the cells below keep using `tensor`.
tensor = torch.rand(3,4)

print(f"Shape of tensor: {tensor.shape}")
print(f"Datatype of tensor: {tensor.dtype}")
print(f"Device tensor is stored on: {tensor.device}")
print(tensor)

Standard operations on tensors

In [None]:
# Indexing and slicing; `...` stands for all leading dimensions.
print(f"First row: {tensor[0]}")
print(f"First column: {tensor[:, 0]}")
print(f"Last column: {tensor[..., -1]}")

In [None]:
# In-place assignment: zero out the second column of `tensor`.
tensor[:,1] = 0
print(tensor)

Arithmetic operations on tensors

In [None]:
# matrix multiplication of `tensor` with its transpose; y1 and y2 are
# computed with equivalent spellings (operator form vs. method form)

y1 = tensor @ tensor.T
y2 = tensor.matmul(tensor.T)


In [None]:
# element-wise product of the elements of the two tensors; z1 and z2 are
# computed with equivalent spellings (operator form vs. method form)

z1 = tensor * tensor
z2 = tensor.mul(tensor)

### Prepare data

In [None]:
# Datasets: the arguments shared by both splits are collected once; only
# the `train` flag differs between them.
mnist_kwargs = dict(root="data", download=True, transform=ToTensor())

training_data = datasets.MNIST(train=True, **mnist_kwargs)
test_data = datasets.MNIST(train=False, **mnist_kwargs)

In [None]:
# dataloader - used to perform mini batch or stochastic gradient descent by acting as an iterable.
# Each iteration yields one batch of `batch_size` (image, label) pairs.
batch_size = 100

# Both loaders shuffle, so the batch order differs from one pass to the next.
train_loader = DataLoader(dataset=training_data,shuffle=True,batch_size=batch_size)
test_loader = DataLoader(dataset=test_data,shuffle=True,batch_size=batch_size)

In [None]:
# Inspect the shapes of a single batch, then stop after the first one.
for X, y in test_loader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Pull one batch from the test loader
examples = iter(test_loader)
example_data, example_targets = next(examples)

# Lay the first six digits of that batch out on a 2 x 3 grid
for slot, image in enumerate(example_data[:6], start=1):
    plt.subplot(2, 3, slot)
    # image[0] is the single channel of the image
    plt.imshow(image[0], cmap='gray')
plt.show()

### Create the neural network

`super()` returns a temporary object of the superclass that then allows you to call that superclass’s methods.

In [None]:
# Create a neural network with one hidden layer (two linear layers) and a ReLU activation function
from torch import nn
class net(nn.Module):
    """Feed-forward classifier: Linear -> ReLU -> Linear.

    Parameters:
    -----------
    input_size: int
        Number of values in each (flattened) input sample.
    output_size: int
        Number of raw class scores produced per sample.
    hidden_size: int
        Width of the single hidden layer.
    """
    def __init__(self, input_size, output_size, hidden_size):
        super().__init__()
        # The attribute names are also the state_dict key prefixes, so they
        # must stay stable for saved parameters to load again.
        self.l1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch of flattened inputs to per-class scores."""
        hidden = self.relu(self.l1(x))
        return self.l2(hidden)

# Each image is flattened to 28 * 28 values before it enters the network.
input_size = 28 * 28
# One output score per digit class.
output_size = 10
hidden_size = 500

model = net(input_size, output_size, hidden_size)
# Print the layer structure of the freshly built model.
print(model)

Define loss function

In [None]:
# Cross-entropy loss, applied in the training loop to the network's raw
# output scores and the integer class labels.
criterion = nn.CrossEntropyLoss()

Define the optimizer (specify optimization method, learning rate, etc.)

In [None]:
# Adam optimizer over all parameters of `model`.
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### Training loop

In [None]:
num_epochs = 2
n_total_steps = len(train_loader)

# Loss values recorded every 100 steps, plotted in the next cell
lossval = []
for j in range(num_epochs):
    for i, (x_train, y_train) in enumerate(train_loader):
        # prediction: flatten every image to a 28*28 vector first.
        # Named `logits` so the loop does not overwrite the `y_pred` list
        # produced by the random-forest cells earlier in the notebook.
        logits = model(x_train.reshape(-1, 28*28))

        #calculating loss
        loss = criterion(logits,y_train.reshape(-1))

        # calculating accuracy as a percentage (the log line prints a '%'
        # sign). Divide by the real size of this batch rather than the
        # nominal batch_size, because the last batch can be shorter.
        correct = (logits.argmax(1) == y_train).sum().item()
        accuracy_batch = 100 * correct / y_train.size(0)

        #backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #print batch information
        if (i+1) % 100 == 0:
            print (f'Epoch [{j+1}/{num_epochs}], Step[{i+1}/{n_total_steps}], Train Accuracy: {accuracy_batch:.2f}%, Loss: {loss.item():.4f}')
            lossval.append(loss.item())


In [None]:
# Training loss over time: one point per 100 training steps.
plt.plot(lossval)

### Save and load a model

In [None]:
# Persist only the learned parameters (the state_dict), not the module object.
torch.save(model.state_dict(), 'model_nn.pth')

In [None]:
# Rebuild a network with the same layer sizes that were used for training,
# then load the saved parameters into it.
input_size = 28 * 28
output_size = 10
hidden_size = 500

model = net(input_size, output_size, hidden_size)
model.load_state_dict(torch.load('model_nn.pth'))
# Switch the module to evaluation mode before predicting.
model.eval()

### Predict

In [None]:
# Inference only, so gradient tracking is switched off.
with torch.no_grad():
    y_test_nn = []
    y_pred_nn = []
    # The loop variables are x_batch / y_batch so they do not overwrite the
    # y_test label frame that the random-forest evaluation cells rely on.
    for x_batch, y_batch in test_loader:
        outputs = model(x_batch.reshape(-1, 28*28))
        # the index of the largest score is the predicted digit
        predicted = outputs.argmax(dim=1)
        y_pred_nn += predicted.tolist()
        y_test_nn += y_batch.tolist()

In [None]:
from sklearn.metrics import classification_report, ConfusionMatrixDisplay

# Same report as in Part 1, now for the neural network's predictions.
print(classification_report(y_test_nn, y_pred_nn))