## Import dependencies

In [1]:
from __future__ import print_function
import numpy as np
import torch
import torchvision.datasets as datasets
import torchvision
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import seaborn as sns
import matplotlib.patheffects as path_effects
import argparse
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
from torch.utils.data.sampler import SubsetRandomSampler
import os

from sklearn.metrics import confusion_matrix
import itertools


## Download MNIST

In [None]:
mnist_dataset = datasets.MNIST(root='./data', train=True, 
                               transform=None, target_transform=None, download=True)


## Load in MNIST

In [None]:
train_dataset = datasets.MNIST(root='./data', train=True, 
                               transform=None, target_transform=None, download=False)
test_dataset = datasets.MNIST(root='./data', train=False, 
                               transform=None, target_transform=None, download=False)


In [None]:
train_all_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size
)

test_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size
)

## Partition MNIST

In [None]:
class_counts = {}
for i in range(10):
    class_counts[i] = []

for batch_idx, (data, target) in enumerate(train_dataset):
    class_counts[int(target)].append(batch_idx)
    
subset_indices_train = np.array([], dtype='int8')
subset_indices_valid = np.array([], dtype='int8')

np.random.seed(0)
for c in class_counts:
    t_size = int(len(class_counts[c])*0.85)
    t = np.random.choice(class_counts[c], size=t_size)
    v = []
    for i in range(len(class_counts[c])):
        if i not in t:
            v.append(int(i))
    
    subset_indices_train = np.concatenate((subset_indices_train, t), axis=None)
    subset_indices_valid = np.concatenate((subset_indices_valid, v), axis=None)

In [None]:
train_dataset = datasets.MNIST('./data', train=True, download=False,
            transform=transforms.Compose([       # Data preprocessing
                transforms.ToTensor(),           # Add data augmentation here
                transforms.Normalize((0.1307,), (0.3081,))
            ]))

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size,
    sampler=SubsetRandomSampler(subset_indices_train)
)
val_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size,
    sampler=SubsetRandomSampler(subset_indices_valid)
)


## Load My Net

In [None]:

class MyConvNet(nn.Module):

    def __init__(self):
        super(MyConvNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(5,5), stride=1, padding=2, padding_mode = 'reflect')
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(5,5), stride=1, padding=2, padding_mode = 'reflect')
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3,3), stride=1, padding=2, padding_mode = 'reflect')
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=(3,3), stride=1, padding=2, padding_mode = 'reflect')
        
        self.dropout1 = nn.Dropout2d(0.5)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(128, 96)
        self.bn1 = nn.BatchNorm1d(num_features=96)
        self.fc2 = nn.Linear(96, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)

        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 4)
        x = self.dropout2(x)

        x = self.conv3(x)
        x = F.relu(x)
        x = F.avg_pool2d(x, 2)
        x = self.dropout2(x)
        
        x = self.conv4(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout2(x)

        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.bn1(x)
        
        x = F.relu(x)
        x = self.fc2(x)

        output = F.log_softmax(x, dim=1)
        return output

In [None]:
aug_model = MyConvNet()
aug_model.load_state_dict(torch.load("mnist_model4_aug.pt"))

## Present at least 9 examples from the test set where your classifier made a mistake. 

In [None]:
# def test(model, test_loader, subset_indices_valid):
incorrect_images = []
true_incorrect_labels = []
pred_incorrect_labels = []

incorrect_count = 0
correct_count = 0


aug_model.eval()    # Set the model to inference mode
test_loss = 0
correct = 0
with torch.no_grad():   # For the inference step, gradient is not computed
    for data, target in test_loader:
        data, target = data, target
        output = model(data)
        test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
        pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
        for p in range(len(pred)):
            if pred[p] != target[p]:
                incorrect_count += 1
                incorrect_images.append(data[p])
                true_incorrect_labels.append(target[p])
                pred_incorrect_labels.append(pred[p])
            else:
                correct_count += 1
        correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(test_loader.dataset)

total = len(subset_indices_valid)


In [None]:
# incorrect_images = []
print(incorrect_count)
print(correct_count)

In [None]:
for i in range(0, 10):
    img = incorrect_images[i]
    plt.imshow(img[0, :, :])
    plt.title("True = "+str(true_incorrect_labels[i].item()) +
             ", Predicted = "+str(pred_incorrect_labels[i].item()))
    plt.show()
    

## Visualize at least 9 of the learned kernels from the first layer of your network. 


In [None]:
conv1_weights = aug_model.conv1.weight.data
conv2_weights = aug_model.conv2.weight.data
conv3_weights = aug_model.conv3.weight.data

In [None]:
kernel_display = []
for i in range(10):
    kernel_conv1 = conv1_weights[i,0,:,:]
    kernel_display.append(kernel_conv1)


In [None]:
for i in range(0, 10):
    img = kernel_display[i]
    plt.imshow(img)
    plt.title("Kernel of First Conv Layer")
    plt.show()
    

## Create Confusion Matrix for test set

In [None]:

cm = confusion_matrix(np.array(Y_test), np.array(Y_pred), normalize=None)

def plot_confusion_matrix(cm, classes, normalize=False, title='Normalized Confusion matrix', cmap=plt.cm.Blues):
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
plot_confusion_matrix(cm, class_names, normalize=True)

## Use your network to convert each image in the test set into a feature vector (taken from just before the final linear layer). Visualize this high-dimensional embedding in 2D using tSNE (each class should have its own color).

In [None]:
def forward_process(model, x):
#     print(x.shape)
    x = model.conv1(x)
    x = F.relu(x)
    x = F.max_pool2d(x, 2)
    x = model.dropout1(x)
#         print(x.shape)

    x = model.conv2(x)
    x = F.relu(x)
    x = F.max_pool2d(x, 4)
    x = model.dropout2(x)
#         print(x.shape)
#         print(x)

    x = model.conv3(x)
    x = F.relu(x)
    x = F.avg_pool2d(x, 2)
    x = model.dropout2(x)
#         print(x.shape)

    x = model.conv4(x)
    x = F.relu(x)
    x = F.max_pool2d(x, 2)
    x = model.dropout2(x)
#         print(x.shape)

    x = torch.flatten(x, 1)
#         print(x.shape)
    x = model.fc1(x)
    x = model.bn1(x)

    x = F.relu(x)
    return x

In [None]:
# def test(model, test_loader, subset_indices_valid):
embedding_info = []
targets = []


aug_model.eval()    # Set the model to inference mode
test_loss = 0
correct = 0
with torch.no_grad():   # For the inference step, gradient is not computed
    for data, target in test_loader:
        data, target = data, target
        output = model(data)
        test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
        pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
#         print(data.shape)
#         print(len(pred))

        emb = forward_process(aug_model, data)
        embedding_info.append(emb)
        targets.append(target[0])
        
        correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(test_loader.dataset)

total = len(subset_indices_valid)


In [None]:
embedding_info2 = torch.cat(embedding_info, axis=0)

In [None]:
test_tsne = TSNE(random_state=123).fit_transform(embedding_info2)

In [None]:
# Y = test_tsne.fit_transform(embedding_info2)
Y = test_tsne
# ax.set_title("Perplexity=%d" % perplexity)
plt.scatter(Y[:, 0], Y[:, 1], s=0.1, cmap='viridis', c='r')


plt.show()

In [None]:
x = test_tsne
colors = np.array(targets)

num_classes = len(np.unique(colors))
palette = np.array(sns.color_palette("hls", num_classes))

# create a scatter plot.
f = plt.figure(figsize=(8, 8))
ax = plt.subplot(aspect='equal')
sc = ax.scatter(x[:,0], x[:,1], lw=0, s=40, c=palette[colors.astype(np.int)])
plt.xlim(-25, 25)
plt.ylim(-25, 25)
ax.axis('off')
ax.axis('tight')

# add the labels for each digit corresponding to the label
txts = []

for i in range(num_classes):

    # Position of each label at median of data points.
    print(x[colors == i, :].shape)
    xtext, ytext = np.median(x[colors == i, :], axis=0)
    txt = ax.text(xtext, ytext, str(i), fontsize=24)
    txt.set_path_effects([
        path_effects.Stroke(linewidth=5, foreground="w"),
        path_effects.Normal()])
    txts.append(txt)


## Choose one image I0 with feature vector x0 from the test set. Find the 8 images I1, I2, …, I8 in the test set whose feature vectors are closest in Euclidean distance to x0. Repeat this process for at least 3 more choices of I0.


In [None]:
def euclidean(x, y):
    distance = 0
    for i in range(len(x)):
        distance += ((x[i]-y[i])**2)
    return np.sqrt(distance)

In [None]:
image0_choices = []
image0_targets = []
for data, target in test_loader:
    data, target = data, target
    image0_choices.append(data)
    image0_targets.append(target[0])
    if len(image0_choices) ==3:
        break

## Image 1

In [None]:
# for i in range(len(image0_choices)):
emb_img0 = embedding_info[0][0]

distances = []
for j in range(len(embedding_info)):
#     if i == j:
#         continue
    check_emb = embedding_info[j][0]
    dist = euclidean(emb_img0, check_emb)
    distances.append(dist)

In [None]:
k = 9
distances_to_img0 = np.array(distances)
idx = np.argsort(distances_to_img0)[:k]


In [None]:
plt.imshow(test_images_all[0][0,:,:])
plt.show()

for index in idx[1:]:
    plt.imshow(test_images_all[index][0,:,:])
    plt.show()


## Image 3

In [None]:
# for i in range(len(image0_choices)):
emb_img0 = embedding_info[1][0]

distances = []
for j in range(len(embedding_info)):
#     if i == j:
#         continue
    check_emb = embedding_info[j][0]
    dist = euclidean(emb_img0, check_emb)
    distances.append(dist)
    
k = 10
distances_to_img0 = np.array(distances)
idx = np.argsort(distances_to_img0)[:k]
plt.imshow(test_images_all[1][0,:,:])
plt.show()

for index in idx[2:]:
    plt.imshow(test_images_all[index][0,:,:])
    plt.show()


## Image 4

In [None]:
# for i in range(len(image0_choices)):
emb_img0 = embedding_info[2][0]

distances = []
for j in range(len(embedding_info)):
#     if i == j:
#         continue
    check_emb = embedding_info[j][0]
    dist = euclidean(emb_img0, check_emb)
    distances.append(dist)
    
k = 11
distances_to_img0 = np.array(distances)
idx = np.argsort(distances_to_img0)[:k]
plt.imshow(test_images_all[2][0,:,:])
plt.show()

for index in idx[3:]:
    plt.imshow(test_images_all[index][0,:,:])
    plt.show()


## Image 3

In [None]:
# for i in range(len(image0_choices)):
emb_img0 = embedding_info[3][0]

distances = []
for j in range(len(embedding_info)):
#     if i == j:
#         continue
    check_emb = embedding_info[j][0]
    dist = euclidean(emb_img0, check_emb)
    distances.append(dist)
    
k = 12
distances_to_img0 = np.array(distances)
idx = np.argsort(distances_to_img0)[:k]
plt.imshow(test_images_all[3][0,:,:])
plt.show()

for index in idx[4:]:
    plt.imshow(test_images_all[index][0,:,:])
    plt.show()
