In [None]:
import random
import scipy
from scipy.io import arff
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
import IPython
import time

import sklearn
from sklearn import preprocessing
from sklearn import datasets 
from sklearn.impute import SimpleImputer
from sklearn.model_selection import *
from sklearn.metrics import *
from sklearn.decomposition import PCA
from sklearn import tree
from sklearn.tree import export_graphviz
from sklearn.manifold import TSNE

from sklearn.tree import DecisionTreeClassifier

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision.transforms import *

In [None]:
#data sets
DATA_PATH = 'dataset/'
IMAGE_PATH = 'image/'
files = []
dfs = []
dfs_test = []

In [None]:
def plot_image(X):
    plt.figure(figsize=(50,5))
    for i,x in enumerate(X[:10]):
        plt.subplot(1,10,i+1)
        try:
            plt.imshow(np.array(x))
        except:
            plt.imshow(np.array(x).transpose(1,2,0))
    
def dimension_reduction(x_train, x_test, n_components):
    print("Reducting dimensions...")
    pca = PCA(n_components=n_components, random_state=33)
    pca.fit(x_train)
    x_train= pca.transform(x_train)
    x_test = pca.transform(x_test)
    return x_train, x_test, pca

In [None]:
def load_CIFAR_10(training_batches=5, test_batches=1):
    X_train=y_train=X_test=y_test = None
    for b in range(1,training_batches+1):
        with open(DATA_PATH+'data_batch_%s'%b,'rb') as file:
            data = pickle.load(file,encoding='bytes')
            X = np.array(data[b'data'])
            y = np.array(data[b'labels'])
            try:
                X_train = np.concatenate((X_train,X))
                y_train = np.concatenate((y_train,y))
                print('train set batch: %d'%b)
            except:
                print('train set batch: %d'%b)
                X_train = X
                y_train = y
                
    for b in range(1,test_batches+1):
        with open(DATA_PATH+'test_batch','rb') as file:
            data = pickle.load(file,encoding='bytes')
            X = np.array(data[b'data'])
            y = np.array(data[b'labels'])
            try:
                X_test = np.concatenate((X_test,X))
                y_test = np.concatenate((y_test,y))
            except:
                print('test set batch: %d'%b)
                X_test = X
                y_test = y
    return X_train,y_train,X_test,y_test

## PCA Projection


In [None]:
def scatter_two_component(comp1,comp2,y_train,classes,save_as):
    plt.figure(figsize=(20,15))
    color_map = plt.cm.get_cmap('Accent')
    
    #plot without labels (faster)
    plt.scatter(comp1,comp2,c=y_train,cmap=color_map)

    #plot labels
    labels = np.array(classes)[y_train]
    class_num = set()
    for x1,x2,c,l in zip(comp1,comp2,color_map(y_train),labels):
        if len(class_num)==10:
            break
        plt.scatter(x1,x2,c=[c],label=l)
        class_num.add(l)
        
    #remvoe duplicate labels    
    hand, labl = plt.gca().get_legend_handles_labels()
    handout=[]
    lablout=[]
    for h,l in zip(hand,labl):
        if l not in lablout:
            lablout.append(l)
            handout.append(h)
    plt.title(save_as)
    plt.xlabel('Component One')
    plt.ylabel('Component Two')
    plt.legend(handout, lablout,fontsize=20)
    plt.savefig(IMAGE_PATH+save_as)
    plt.show()

In [None]:
# X_train,y_train,X_test,y_test = load_CIFAR_10()
# X_train,X_test,pca = dimension_reduction(X_train,X_test,n_components=50)

In [None]:
# pca.explained_variance_ratio_

In [None]:
# scatter_two_component(X_train[:,0],X_train[:,1],y_train,classes,'PCA')

## t-SNE Embedding

In [None]:
# try:
#     with open('X_embedded','rb') as file:
#         X_embedded = pickle.load(file)
# except:
#     X_embedded = TSNE(n_components=2).fit_transform(X_train)
#     with open('X_embedded','wb') as file:
#         pickle.dump(X_embedded,file)

In [None]:
# scatter_two_component(X_embedded[:,0],X_embedded[:,1],y_train,classes,'t-SNE')

# CNN

In [None]:
X_train,y_train,X_test,y_test = load_CIFAR_10()

In [None]:
X_train = torch.tensor(X_train.reshape(-1,3,32,32))
X_test = torch.tensor(X_test.reshape(-1,3,32,32))
y_train = torch.tensor(y_train)
y_test = torch.tensor(y_test)

In [None]:
# tranform functions
T = Compose([
    ToPILImage(),
#     CenterCrop(32),
#     ColorJitter(brightness=1, contrast=0, saturation=0, hue=0),
#     Pad(2, fill=(100,100,100), padding_mode='constant'),
#     Grayscale(3),    
    RandomHorizontalFlip(p=0.5),
#     RandomAffine(30, translate=None, scale=None, shear=None, resample=False, fillcolor=0),
#     RandomCrop(32, padding=None, pad_if_needed=False, fill=0, padding_mode='constant'),
#     RandomPerspective(distortion_scale=0.3, p=0.5, interpolation=3),
#     RandomRotation(10, resample=False, expand=False, center=None),
    RandomResizedCrop(32, scale=(0.75, 1), ratio=(0.75, 1.25), interpolation=2),
    ToTensor(),
#     Normalize((0.4753,0.4623,0.4149), (0.2414,0.236,0.2419)),
#     Normalize((0.5,0.5,0.5), (0.5,0.5,0.5)),
])

# tranform functions
T2 = Compose([
    ToPILImage(),
    ToTensor(),
])

# tranform functions
T3 = Compose([
    ToPILImage(),
#     CenterCrop(32),
#     ColorJitter(brightness=1, contrast=0, saturation=0, hue=0),
#     Pad(2, fill=(100,100,100), padding_mode='constant'),
    Grayscale(3),    
#     RandomHorizontalFlip(p=0.5),
#     RandomAffine(30, translate=None, scale=None, shear=None, resample=False, fillcolor=0),
#     RandomCrop(32, padding=None, pad_if_needed=False, fill=0, padding_mode='constant'),
#     RandomPerspective(distortion_scale=0.3, p=0.5, interpolation=3),
#     RandomRotation(10, resample=False, expand=False, center=None),
#     RandomResizedCrop(32, scale=(0.75, 1), ratio=(0.75, 1.25), interpolation=2),
    ToTensor(),
    Normalize((0.4753,0.4623,0.4149), (0.2414,0.236,0.2419)),
#     Normalize((0.5,0.5,0.5), (0.5,0.5,0.5)),
])

In [None]:
X_new = torch.stack([T(x) for x in X_train])
X_train = torch.stack([T2(x) for x in X_train])
X_test = torch.stack([T2(x) for x in X_test])

In [None]:
C = X_train.shape[1]
H = X_train.shape[2]
W = X_train.shape[3]
X_aug = torch.stack([X_new,X_train]).reshape(-1,C,H,W)

In [None]:
plot_image(X_train)

In [None]:
X_train = X_aug
y_train = torch.stack([y_train,y_train]).reshape(-1,)

X_train = torch.stack([T3(x) for x in X_train])
X_test = torch.stack([T3(x) for x in X_test])

In [None]:
plot_image(X_new)

In [None]:
plot_image(X_train)

In [None]:
#cnn parameters
image_channels = X_train.shape[1]
image_width = X_train.shape[2]
num_filters = 32
num_filters2 = 64
num_filters3 = 128
filter_size = 5
pool_size = 2
# final_input = (((image_width+1-filter_size)//pool_size+1-filter_size)//pool_size)**2*num_filters2#without padding
final_input = (image_width//pool_size//pool_size//pool_size)**2*num_filters3#with padding

model = torch.nn.Sequential(
    torch.nn.Conv2d(image_channels, num_filters, filter_size, padding=filter_size//2),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(pool_size, pool_size),
    
    torch.nn.Conv2d(num_filters, num_filters2, filter_size, padding=filter_size//2),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(pool_size, pool_size),
    
    torch.nn.Conv2d(num_filters2, num_filters3, filter_size, padding=filter_size//2),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(pool_size, pool_size),
    
    torch.nn.Flatten(),
    torch.nn.Linear(final_input, final_input//2),
    torch.nn.ReLU(),
    
    torch.nn.Linear(final_input//2, final_input//4),
    torch.nn.ReLU(),
    
    torch.nn.Linear(final_input//4, final_input//16),
    torch.nn.ReLU(),

    torch.nn.Linear(final_input//16,10),
)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

if device.type != 'cpu':
    print(device)
    model.to(device)

In [None]:
def train_models(model,model2,X_train,X_test,y_train,y_test,final_input,batch_size,num_epoch):

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)

    #mini-batch training loop
    for epoch in range(num_epoch):

        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            X, y = data
            if device.type != 'cpu':
                X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
#             optimizer2.zero_grad()

            # forward + backward + optimize

            y_pred = model(X)

            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()

            #avg batch loss
            running_loss += loss.item()
            batch = X_train.shape[0]//batch_size//3
            if (i+1) % batch == 0:
                print('Epoch: %d, Batch: %5d had avg loss: %.3f'%(epoch+1,i+1,running_loss/batch))
                running_loss = 0.0
        if (epoch+1) % 10 ==0:
            torch.save(model.state_dict(), 'model_%s.pt'%(epoch+1))
    torch.save(model.state_dict(), 'model.pt')
    print('Training Finished')
    return model

In [None]:
# %%time
batch_size=16
num_epoch=1
# create torch Dataset class from tensors
train_set = torch.utils.data.TensorDataset(X_train, y_train)
test_set = torch.utils.data.TensorDataset(X_test, y_test)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

try:#try to load trained model 
    model.load_state_dict(torch.load('model.pt'))
    print("Trained model loaded")

except:
    print('Model not found, start straining...')
    model = train_models(model,None,X_train,X_test,y_train,y_test,final_input,batch_size,num_epoch)

In [None]:
# num_epoch=100
# model = train_models(batch_size,num_epoch,train_loader)

In [None]:
correct = 0
total = 0
with torch.no_grad():
    for data in test_loader:
        X, y = data
        if device.type != 'cpu':
            X, y = X.to(device), y.to(device) 
        y_proba = model(X)
        _, y_pred = torch.max(y_proba.data, 1)
        total += y.size(0)
        correct += (y_pred == y).sum().item()

print('Accuracy of the network on the 10000 test images: %.2f %%' % (100 * correct / total))

In [None]:
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))
with torch.no_grad():
    for data in test_loader:
        X, y = data
        if device.type != 'cpu':
            X, y = X.to(device), y.to(device) 
        y_proba = model(X)
        _, y_pred = torch.max(y_proba, 1)
        c = (y_pred == y).squeeze()
        for i in range(4):
            label = y[i]
            class_correct[label] += c[i].item()
            class_total[label] += 1


for i in range(10):
    print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))

# Activation maximization

In [None]:
def plot_activation_image(target):
    zeros = np.zeros((1,3,32,32)).astype(np.float32)
    gray = zeros+0.5
    X = torch.tensor(gray, requires_grad=True)
    target = torch.empty(1, dtype=torch.long).random_(target,target+1)
    # plot_image(gray)
    criterion = nn.CrossEntropyLoss()

    for i in range(100):
        # y probabilities [p0,p1,...,p9]
        y_proba = model(X)

        #backward
        loss = criterion(y_proba, target)
        loss.backward()

        #minimize loss
        X.data -= 0.15*X.grad.data    

        X.grad.data.zero_()
        
#         if i%10 ==0:
#             IPython.display.display(plt.gcf())       # Tell Jupyter to show Matplotlib's current figure
#             IPython.display.clear_output(wait=True)  # Tell Jupyter to clear whatever it's currently showing
#             time.sleep(0.02)
    plt.title("Target class: %s"%classes[target])
    plot_image(X[0].detach().numpy())
    plt.savefig(IMAGE_PATH+classes[target]+'_activation')
    print('finished')

In [None]:
# this is animation
model = model.cpu()
plt.figure(figsize=(25,10))
for i in range(1):
    plt.subplot(2,5,i+1)
    plot_activation_image(i)