In [1]:
import pylab as plt
import imageio
import os
import numpy as np
import pandas as pd
import time
import utils
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image,ImageReadMode
from torchvision import transforms

from torch.autograd import Function
from torch.autograd import Variable

from skimage.transform import resize
import cv2

In [2]:
def path_label_loader(path):
    """
    Args:
     path: Folder path containing subfolders of images
    Output:
     images: List of image path
     labels: Numpy array of labels
    """
    images = []
    labels = []
    
    for s_folder in os.listdir(path):
        label = int(''.join([s for s in s_folder if s.isdigit()]))
        img_folder = os.path.join(path, s_folder)
        
        for img in os.listdir(img_folder):
            if img.endswith(".jpg"):
                image_path = os.path.join(img_folder, img)
                images.append(image_path)
                labels.append(label)
                
    labels = np.array(labels) - 1
    images = np.array(images)
                
    return images, labels

In [3]:
class CustomImageDataset(Dataset):
    def __init__(self, img_dir,labels, transform = False):
        self.img_dir = img_dir
        self.transform = transform
        self.labels = labels


    def __len__(self):
        return len(self.img_dir)

    def __getitem__(self, idx):
        img_path = self.img_dir[idx]
        img = read_image(img_path, ImageReadMode.RGB).float()
        if self.transform != False:
            img = self.transform(img)
        label = self.labels[idx]
        
        return img, label

In [4]:
def create_data_loader(img_path, img_label, batch_size):
    transform = transforms.Compose([
        transforms.Resize(size = (128,128)),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])
        
    
    dataset = CustomImageDataset(img_path, img_label, transform)

    data_loader = DataLoader(
        dataset, batch_size=batch_size, shuffle=True
    )
    
    return data_loader

In [5]:
def eval_on_test_set(net, test_loader,is_cnn = True, verbose=1):
    
    total_error = 0
    batch_num = 0
    
    for test_data, test_label in test_loader:
        
        bs = test_label.shape[0]
        test_data=test_data.to(device)
        test_label=test_label.to(device)
        
        if is_cnn:
            inputs = test_data
        else:
            inputs = test_data.view(bs, 128*128*3)
        
        scores=net( inputs ) 

        error = utils.get_error( scores , test_label)

        total_error += error.item()
        
        batch_num += 1
        
    total_error = total_error / batch_num
    
    if verbose == 1:
        print( 'error rate on test set =', total_error*100 ,'percent\n')
        
    return total_error

In [6]:
def train_model(net, n_epoch, my_lr, train_loader, test_loader, is_cnn = True, momentum=0.9, verbose=1):
    """
    Train a given model with specified hyperparameters
    
    Args:
     net: NN model to be trained
     n_epoch: Number of epochs to train the model
     bs: Batch size for minibatch GD
     lr: Learning rate of GD
     train_data: Torch tensor of dim [N:rgb:width:height]
     train_label: Torch tensor of dim [N]
     verbose: Print out metrics during training if 1, default 1
    
    Output:
     net: Trained NN model
     records: Dictionary containing metrics history, including training loss/error for each epoch/minibatch, test error for each epoch
    """
    
    N = len(train_loader)
    
    net = net.to(device)
    
    train_loss_hist_mb = []
    train_loss_hist = []
    train_error_hist = []
    test_error_hist = []
    
    start=time.time()

    for epoch in range(1,n_epoch+1):

        if not epoch%5:
            my_lr = my_lr / 1.5

        # optimizer=torch.optim.SGD( net.parameters() , lr=my_lr, momentum=momentum )
        optimizer=torch.optim.Adam(net.parameters(), lr=my_lr)

        running_loss=0
        running_error=0
        num_batches=0
            
        for minibatch_data, minibatch_label in train_loader:


            # FORWARD AND BACKWARD PASS
            bs = minibatch_label.shape[0]

            optimizer.zero_grad()


            minibatch_data=minibatch_data.to(device)                
            minibatch_label=minibatch_label.to(device)

            if is_cnn:
                inputs = minibatch_data
            else:                    
                inputs = minibatch_data.view(bs, 128*128*3)


            inputs.requires_grad_()

            scores=net( inputs ) 

            loss =  criterion( scores , minibatch_label.long()) 

            loss.backward()
            
            optimizer.step()


            # COMPUTE STATS
            running_loss += loss.detach().item()

            error = utils.get_error( scores.detach() , minibatch_label)
            running_error += error.item()

            num_batches+=1   
            train_loss_hist_mb.append(running_error/num_batches)
                
        # AVERAGE STATS THEN DISPLAY
        total_loss = running_loss/num_batches
        total_error = running_error/num_batches
        elapsed = (time.time()-start)/60

        if verbose == 1:
            print('epoch=',epoch, '\t time=', elapsed,'min', '\t lr=', my_lr  ,'\t loss=', total_loss , '\t error=', total_error*100 ,'percent')
        test_error = eval_on_test_set(net, test_loader,is_cnn = is_cnn, verbose=verbose) 
        
        train_loss_hist.append(total_loss)
        train_error_hist.append(total_error)
        test_error_hist.append(test_error)
    
    records = {'train_loss_mb': train_loss_hist_mb,
              'train_loss': train_loss_hist,
              'train_error': train_error_hist,
              'test_error': test_error_hist}
        
    return records

## Classification Models

### MLP

In [7]:
class mlp(nn.Module):

    def __init__(self, input_size, hidden_size1, hidden_size2,  output_size):
        super(mlp , self).__init__()
        
        self.layer1 = nn.Linear(  input_size   , hidden_size1)
        self.layer2 = nn.Linear(  hidden_size1 , hidden_size2)
        self.layer3 = nn.Linear(  hidden_size2 , output_size)
        
    def forward(self, x):
        
        y       = self.layer1(x)
        y_hat   = torch.relu(y)
        z       = self.layer2(y_hat)
        z_hat   = torch.relu(z)
        scores  = self.layer3(z_hat)
        
        return scores

### LeNet

In [8]:
class LeNet5_convnet(nn.Module):

    def __init__(self):

        super(LeNet5_convnet, self).__init__()
        
        self.layer = nn.Sequential(
            # CL1:   3 * 128 x 128  -->    50 x 128 x 128 
            nn.Conv2d(3,   50,  kernel_size=3,  padding=1 ),
            nn.ReLU(),

            # MP1: 50 x 128 x 128 -->    50 x 64 x 64
            nn.MaxPool2d(2,2),

            # CL2:   50 x 64 x 64  -->    100 x 64 x 64 
            nn.Conv2d(50,  100,  kernel_size=3,  padding=1 ),
            nn.ReLU(),

            # MP2: 100 x 64 x 64 -->    100 x 32 x 32
            nn.MaxPool2d(2,2),
            
            # MP3: 100 * 32 * 32 -> 100 * 32 * 32 (Grad needed for later CAM)
            nn.Conv2d(100,100, kernel_size=3, padding=1),
            nn.ReLU()
        )
        
        self.fc_layer = nn.Sequential(
            # LL1:   100 x 32 x 32 = 102400 -->  100 
            nn.Linear(102400, 100),
            nn.ReLU(),

            # LL2:   100  -->  13 
            nn.Linear(100,13),
        )


    def forward(self, x):
        out = self.layer(x)
        out = out.view(-1, 102400)
        out = self.fc_layer(out)
    
        return out

In [9]:
class VGG16(nn.Module):

    def __init__(self):

        super(VGG19, self).__init__()
        
        self.layer = nn.Sequential(
            # CL1:   3 * 128 x 128  -->    65 x 128 x 128 
            nn.Conv2d(3,   64,  kernel_size=3,  padding=1 ),
            nn.ReLU(),

            # CL2: 64 x 128 x 128 -->    64 x 128 x 128
            nn.Conv2d(64,   64,  kernel_size=3,  padding=1 ),
            nn.ReLU(),
            
            # MP1: 64 x 128 x 128 -->    64 x 64 x 64
            nn.MaxPool2d(2,2),
            
            # CL3: 64 x 64 x 64  -->    128 x 64 x 64 
            nn.Conv2d(64,  128,  kernel_size=3,  padding=1 ),
            nn.ReLU(),

            # CL4: 128 x 64 x 64  -->    128 x 64 x 64 
            nn.Conv2d(128,  128,  kernel_size=3,  padding=1 ),
            nn.ReLU(),
            
            # MP2: 128 x 64 x 64 -->    128 x 32 x 32
            nn.MaxPool2d(2,2),
            
            # CL5: 128 x 32 x 32  -->    256 x 32 x 32 
            nn.Conv2d(128,256, kernel_size=3, padding=1),
            nn.ReLU(),
            
            # CL6: 256 x 32 x 32  -->    256 x 32 x 32     
            nn.Conv2d(256,256, kernel_size=3, padding=1),
            nn.ReLU(),
            
            # MP3: 256 x 32 x 32 -->    256 x 16 x 16
            nn.MaxPool2d(2,2),
            
            # CL7: 256 x 16 x 16  -->    512 x 16 x 16
            nn.Conv2d(256,512, kernel_size=3, padding=1),
            nn.ReLU(),
            
            # CL8: 512 x 16 x 16  -->    512 x 16 x 16
            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.ReLU(),
            
            # MP4: 512 x 16 x 16 -->    512 x 8 x 8
            nn.MaxPool2d(2,2)
            
        )
        
        self.fc_layer = nn.Sequential(
            # LL1:   512 x 8 x 8 = 32768 -->  4096 
            nn.Linear(32768, 4096),
            nn.ReLU(),
            
            # LL2:   4096 -->  4096 
            nn.Linear(4096, 4096),
            nn.ReLU(),
            
            # LL3:   4096 -->  13
            
            nn.Linear(4096, 13),
        )



    def forward(self, x):
        out = self.layer(x)
        out = out.view(-1, 32768)
        out = self.fc_layer(out)

    
        return out

## Load Data

In [10]:
path = "../data/clean_images_index"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device, "is running")

images, labels = path_label_loader(path)
print(len(images))
print(len(labels))

cuda is running
2061
2061


In [11]:
np.unique(labels, return_counts=True)

(array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12]),
 array([156, 152, 156, 170, 150, 129, 178, 141, 185, 159, 152, 161, 172]))

In [12]:
# Use 1648 images as training set and 413 images as test set
train_indice = np.random.choice(2061, 1648, replace=False)
train_path = images[train_indice]
train_label = labels[train_indice]
test_path = images[~train_indice]
test_label = labels[~train_indice]

In [13]:
train_loader = create_data_loader(train_path, train_label, 32)
test_loader = create_data_loader(test_path, test_label, 32)
criterion = nn.CrossEntropyLoss()

## Classification Model Training

In [None]:
my_mlp=mlp(128*128*3, 50, 50, 13)
my_mlp.to(device)
print(my_mlp)

In [None]:
mlp_record = train_model(net = my_mlp, n_epoch = 10, my_lr = 1e-4, train_loader = train_loader, test_loader = test_loader,is_cnn = False, verbose=1)

In [None]:
my_lenet = LeNet5_convnet()
my_lenet.to(device)
print(my_lenet)

In [None]:
lenet_record = train_model(net = my_lenet, n_epoch = 10, my_lr = 1e-4, train_loader = train_loader, test_loader = test_loader, verbose=1)

In [14]:
my_VGG16 = VGG16()
my_VGG16.to(device)
print(my_VGG16)

VGG19(
  (layer): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU()
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU()
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU()
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU()
    (14): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (15): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (16): ReLU()
    (17): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padd

In [17]:
VGG19_record = train_model(net = my_VGG19, n_epoch = 10, my_lr = 1e-4, train_loader = train_loader, test_loader = test_loader, verbose=1)

KeyboardInterrupt: 

In [16]:
torch.cuda.empty_cache()

## Guided Backprop ReLU

In [None]:
class GuidedBackpropRelu(Function):
    @staticmethod
    def forward(ctx,input):
        ctx.save_for_backward(input)
        return input.clamp(min=0)
    
    @staticmethod
    def backward(ctx,grad_output):
        input = ctx.saved_tensors[0]
        grad_input = grad_output.clone()
        grad_input[grad_input<0] = 0
        grad_input[input<0]=0
        return grad_input
     
guided_relu = GuidedBackpropRelu.apply

## Guided Backprop ReLU Model

In [None]:
class GuidedReluModel(nn.Module):
    def __init__(self,model,to_be_replaced,replace_to):
        super(GuidedReluModel,self).__init__()
        self.model = model
        self.to_be_replaced = to_be_replaced
        self.replace_to = replace_to
        self.layers=[]
        self.output=[]
        
        for m in self.model.modules():
            if isinstance(m,self.to_be_replaced):
                self.layers.append(self.replace_to )
                #self.layers.append(m)
            elif isinstance(m,nn.Conv2d):
                self.layers.append(m)
            elif isinstance(m,nn.BatchNorm2d):
                self.layers.append(m)
            elif isinstance(m,nn.Linear):
                self.layers.append(m)
            elif isinstance(m,nn.AvgPool2d):
                self.layers.append(m)
            elif isinstance(m,nn.MaxPool2d):
                self.layers.append(m)
                
        for i in self.layers:
            print(i)
        
        
    def reset_output(self):
        self.output = []
    
    def hook(self,grad):
        # out = grad[:,0,:,:].cpu().data#.numpy()
        out = grad[:,:,:,:].cpu().data#.numpy()
        print("out_size:",out.size())
        self.output.append(out)
        
    def get_visual(self,idx,original_img):
        grad = self.output[0][idx]
        return grad
        
    def forward(self,x):
        out = x 
        out.register_hook(self.hook)
        for i in self.layers[:-3]:
            out = i(out)
        out = out.view(out.size()[0],-1)
        for j in self.layers[-3:]:
            out = j(out)
        return out
    
    # def forward(self,x):
    #     out = x 
    #     out.register_hook(self.hook)
    #     for i in self.layers:
    #         out = i(out)
    #     out = out.view(out.size()[0],-1)
    #     # out = out.view(-1, 102400)
    #     return out
        

In [None]:
def axes_swap(x):
    '''Convert (3,n,n) to (n,n,3)'''
    x = x.swapaxes(0,1)
    x = x.swapaxes(1,2)
    return x

In [None]:
product = None

## Class Activation Map (CAM)

In [None]:
class CAM():
    def __init__(self,model):
        self.gradient = []
        self.model = model
        # self.h = self.model.model.layer[-2].register_backward_hook(self.save_gradient)
        self.h = self.model.model.layer[0].register_backward_hook(self.save_gradient) ## Note the change of index here, the indexed layer should have grad?
        
    def save_gradient(self,*args):
        #print("Gradient saved!!!!")
        grad_input = args[1]
        grad_output= args[2]
        self.gradient.append(grad_output[0])
        print('CAM grad: ', self.gradient[0].size())
        
    def get_gradient(self):
        return self.gradient[0]
    
    def remove_hook(self):
        self.h.remove()
            
    def normalize_cam(self,x):
        x = 2*(x-torch.min(x))/(torch.max(x)-torch.min(x)+1e-8)-1
        #x[x<torch.max(x)]=-1
        return x
    
    def visualize(self,cam_img,guided_img,img_var):
        # guided_img = guided_img.sum(dim=0) # Compress RGB dimension? 
        guided_img = guided_img.numpy()
        # cam_img = resize(cam_img.cpu().data.numpy(),output_shape=(28,28))
        cam_img = cam_img.cpu().data.numpy()
        x = img_var[:,:,:].cpu().data.numpy()
        
        x = axes_swap(x)
        guided_img = axes_swap(guided_img)
        # print('guided_img shape:',guided_img.shape)
        guided_img = np.sum(guided_img, axis=-1)
        # print('guided_img max:', guided_img.max())
        # print('x_max:', x.max())
        # print(x.shape)
        
        # x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)

        fig = plt.figure(figsize=(20, 12)) 
        
        plt.subplot(1,4,1)
        plt.title("Original Image")
        # plt.imshow(x,cmap="gray")
        plt.imshow(x)

        plt.subplot(1,4,2)
        plt.title("Class Activation Map")
        plt.imshow(cam_img)

        plt.subplot(1,4,3)
        plt.title("Guided Backpropagation")
        # plt.imshow(guided_img,cmap='gray')
        plt.imshow(guided_img)
        
        plt.subplot(1,4,4)
        plt.title("Guided x CAM")
        # plt.imshow(guided_img*cam_img,cmap="gray")
        global product
        product = guided_img*cam_img
        plt.imshow(guided_img*cam_img)
        plt.show()
    
    def get_cam(self,idx):
        grad = self.get_gradient()
        print('grad shape:', grad.shape)
        alpha = torch.sum(grad,dim=3,keepdim=True)
        alpha = torch.sum(alpha,dim=2,keepdim=True)
        print('alpha shape:', alpha.shape)
        
        cam = alpha[idx]*grad[idx]
        cam = torch.sum(cam,dim=0) # Default Compression(of features?)
        cam = self.normalize_cam(cam)
        self.remove_hook()
        
        return cam

## Detection Model

In [None]:
model = my_lenet ## Change when best model is used
guide = GuidedReluModel(model,nn.ReLU,guided_relu)
cam = CAM(guide)

In [None]:
guide.reset_output()
for image,label in test_loader:
    x = Variable(image,requires_grad=True).cuda()
    x = x/255./4.
    y_= Variable(label).cuda()
        
    output = guide.forward(x) 
    # print(output[0])
    output = torch.index_select(output,dim=1,index=y_)
    # print(output[0])
    output = torch.sum(output)
    # print(output)
    output.backward(retain_graph=True)
    
    for j in range(1):
        out = cam.get_cam(j)
        guided_img = guide.get_visual(j,x)
        cam.visualize(out,guided_img,x[j])
        
    break

In [None]:
print(product.max())
print(product.min())

In [None]:
np.unravel_index(product.argmax(), product.shape)

In [None]:
product[61,124]

In [None]:
np.unravel_index(product.argmin(), product.shape)

In [None]:
product[71, 32]

In [None]:
product[71,:]

In [None]:
pic = axes_swap(x[j]).cpu().data.numpy()

In [None]:
plt.imshow(pic)

In [None]:
import matplotlib.patches as patches
fig, ax = plt.subplots()

# Display the image
ax.imshow(pic)

# Create a Rectangle patch
rect = patches.Rectangle(np.unravel_index(product.argmax(), product.shape), 10, 10, linewidth=1, edgecolor='b', facecolor='none')

# Add the patch to the Axes
ax.add_patch(rect)