# Install packages

In [None]:
import os 
import pandas as pd 
from os.path import isfile, join
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.metrics import recall_score,precision_score,f1_score
from sklearn.manifold import TSNE

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision.io import read_image
import torchvision.transforms as T
import torchvision
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import timeit
import unittest
import random
import torchvision.models as models
## Please DONOT remove these lines. 
torch.manual_seed(0)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(0)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Load data

In [None]:
from google.colab import output
!unzip ./drive/MyDrive/data-new.zip
output.clear()

In [None]:
!rm /content/data-new/wallpapers/train/.DS_Store
!rm /content/data-new/wallpapers/test/.DS_Store
!rm /content/data-new/wallpapers/.DS_Store
#!mkdir /content/drive/MyDrive/data
!find /content/data-new/ -name \.DS_Store -delete

In [None]:
dataDir= '/content/drive/MyDrive/data';
checkpointDir = 'modelCheckpoints';
train_folder = 'train';
test_folder  = 'test';
Symmetry_Groups = ['P1', 'P2', 'PM' ,'PG', 'CM', 'PMM', 'PMG', 'PGG', 'CMM','P4', 'P4M', 'P4G', 'P3', 'P3M1', 'P31M', 'P6', 'P6M']

In [None]:
label_mapping = dict(zip(Symmetry_Groups, range(17)))
label_mapping

In [None]:
files = []
for group in Symmetry_Groups:
    files.extend(os.listdir(join("/content/data-new/wallpapers/train",group)))
s = pd.Series(files).apply(lambda x: x.split("_")[0]).map(label_mapping)
pd.concat([pd.Series(files),s],axis=1).to_csv("/content/drive/MyDrive/data/train_csv")

In [None]:
files = []
for group in Symmetry_Groups:
    files.extend(os.listdir(join("/content/data-new/wallpapers/test",group)))
s = pd.Series(files).apply(lambda x: x.split("_")[0]).map(label_mapping)
pd.concat([pd.Series(files),s],axis=1).to_csv("/content/drive/MyDrive/data/test_csv")

In [None]:
# check availability of GPU and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_cuda =  torch.cuda.init()# if you have acess to a GPU, enable it to speed the training 

In [None]:
class normalise(object):
    def __call__(self,img):
      img = torch.from_numpy(np.asarray(img))
      img = img / 255
      return img

In [None]:
class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file,index_col=0)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0].split("_")[0] +"/"+self.img_labels.iloc[idx, 0])
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [None]:
train_dataset = CustomImageDataset("/content/drive/MyDrive/data/train_csv","/content/data-new/wallpapers/train" ,
                                   transform=transforms.Compose([normalise()]))
test_dataset  = CustomImageDataset("/content/drive/MyDrive/data/test_csv","/content/data-new/wallpapers/test",
                                  transform=transforms.Compose([normalise()]))

In [None]:
# create dataloaders for training and test datasets
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=250,shuffle=True)
#valid_dataloader = torch.utils.data.DataLoader(val_set,batch_size=250,shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=250,shuffle=True)


In [None]:
import matplotlib.pyplot as plt
im, target = next(iter(train_dataloader))
print(im.shape, target.shape)
plt.imshow(im[0].squeeze(0).detach().cpu().numpy(),cmap="gray")
plt.show()

In [None]:
jitter = T.ColorJitter(brightness=.5, hue=.3)
jitted_imgs = [jitter(im[0]) for _ in range(4)]
fig, ax = plt.subplots(1, 4,figsize=(20,20))
for i in range(4):
  ax[i].imshow(jitted_imgs[i].squeeze(0).detach().cpu().numpy(),cmap="gray")

In [None]:
rotater = T.RandomRotation(degrees=(0, 360))
rotated_imgs = [rotater(im[0]) for _ in range(4)]
fig, ax = plt.subplots(1, 4,figsize=(20,20))
for i in range(4):
  ax[i].imshow(rotated_imgs[i].squeeze(0).detach().cpu().numpy(),cmap="gray")

In [None]:

affine_transfomer = T.RandomAffine(degrees=(0, 0), translate=(0.1, 0.3), scale=(1,2 ))
affine_imgs = [affine_transfomer(im[0]) for _ in range(4)]
fig, ax = plt.subplots(1, 4,figsize=(20,20))
for i in range(4):
  ax[i].imshow(affine_imgs[i].squeeze(0).detach().cpu().numpy(),cmap="gray")

In [None]:
blurrer = T.GaussianBlur(kernel_size=(3, 5), sigma=(0.1, 0.5))
blurred_imgs = [blurrer(im[0]) for _ in range(4)]
fig, ax = plt.subplots(1, 4,figsize=(20,20))
for i in range(4):
  ax[i].imshow(blurred_imgs[i].squeeze(0).detach().cpu().numpy(),cmap="gray")

In [None]:
center_crops = [T.CenterCrop(size=size)(im[0]) for size in [(128,128)]]
fig, ax = plt.subplots(1, len(center_crops))
plt.imshow(center_crops[0].squeeze(0).detach().cpu().numpy(),cmap="gray")

In [None]:
augmentations = transforms.Compose([#T.ColorJitter(brightness=.5, hue=.3),
                                   # T.GaussianBlur(kernel_size=(3, 5), sigma=(0.1, 0.5)),
                                    T.RandomAffine(degrees=(0, 0), translate=(0.1, 0.3), scale=(1,2 )),
                                    T.RandomRotation(degrees=(0, 360)),
                                    T.CenterCrop(size=(256,256)),
                                    normalise() ])

In [None]:
aug_train_dataset = CustomImageDataset("/content/drive/MyDrive/data/train_csv","/content/data-new/wallpapers/train" ,
                                   transform=augmentations)
aug_test_dataset  = CustomImageDataset("/content/drive/MyDrive/data/test_csv","/content/data-new/wallpapers/test",
                                  transform=augmentations)

In [None]:
# create dataloaders for training and test datasets
aug_train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=250,shuffle=True)
#valid_dataloader = torch.utils.data.DataLoader(val_set,batch_size=250,shuffle=True)
aug_test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=250,shuffle=True)


In [None]:
import matplotlib.pyplot as plt
im, target = next(iter(aug_train_dataloader))
print(im.shape, target.shape)
fig, ax = plt.subplots(1, 4,figsize=(20,20))
for i in range(4):
  ax[i].imshow(im[i].squeeze(0).detach().cpu().numpy(),cmap="gray")

# Training functions

In [None]:
def get_subset(percent_of_data):
  return torch.utils.data.Subset(aug_train_dataset, random.sample(range(0,len(aug_train_dataset)),
                                                              int(percent_of_data*(len(aug_train_dataset)/100))) )

In [None]:
crs_entropy = nn.CrossEntropyLoss()
def multi_metrics(y_pred, y_test):
    y_pred_softmax = torch.log_softmax(y_pred, dim = 1)
    _, y_pred_tags = torch.max(y_pred_softmax, dim = 1)    
    
    correct_pred = (y_pred_tags == y_test).float()
    acc = correct_pred.sum() #/ len(correct_pred)
    precision = precision_score(y_test.detach().cpu().numpy(),y_pred_tags.detach().cpu().numpy(),average="micro")
    recall = recall_score(y_test.detach().cpu().numpy(),y_pred_tags.detach().cpu().numpy(),average="micro")
    f1 = f1_score(y_test.detach().cpu().numpy(),y_pred_tags.detach().cpu().numpy(),average="micro")
    #acc = torch.round(acc * 100)
    
    return acc,precision,recall,f1,y_pred_tags

def train(model, device, train_loader, optimizer, epoch,):
    model.train()
   
    loss_per_epoch = 0
    correct = 0
    total_prec = 0
    total_recall = 0
    total_f1 = 0
    confusion_matrix = np.zeros((17, 17))

    for batch_idx, (data, target) in enumerate(train_loader):
      
        inputs, labels = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(inputs)
        loss = crs_entropy(output, labels)
        loss.backward()
        optimizer.step()
  
        train_acc,pre, recall, f1,y_pred = multi_metrics(output, labels)
        correct += train_acc
        total_prec += pre
        total_recall += recall
        total_f1 += f1

        idx =  np.array((labels.view(-1).detach().cpu().numpy(),y_pred.view(-1).detach().cpu().numpy())).T
        np.add.at(confusion_matrix,(idx[:,0],idx[:,1]),1)

        if (batch_idx % 50 == 0) or (batch_idx == (len(train_loader)-1)):
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.sampler),
                100. * batch_idx / len(train_loader), loss.item()))
            print('\nTrain Accuracy: {} [{}/{} ({:.0f}%)]\tAccuracy:{}/{} ({:.0f}%)\n'.format(
                epoch, batch_idx * len(data), len(train_loader.sampler),
                100. * batch_idx / len(train_loader), correct, len(train_loader.sampler),
        100. * correct / len(train_loader.sampler)))
        loss_per_epoch += loss.item()

    print('\nTrain set: PRECISION: {:.4f}'.format(100. * total_prec / len(train_loader)))
    print('\nTrain set: RECALL: {:.4f}'.format(100. * total_recall / len(train_loader)))
    print('\nTrain set: F1: {:.4f}'.format(100. * total_f1 / len(train_loader)))

    return loss_per_epoch,100* correct/len(train_loader.sampler),confusion_matrix
    

In [None]:
def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    total_prec = 0
    total_recall = 0
    total_f1 = 0
    confusion_matrix = np.zeros((17, 17))
    with torch.no_grad():
        for data, target in test_loader:
            image,target = data.to(device), target.to(device)
            output = model(image)
            test_loss += crs_entropy(output, target).item()  
            test_acc,pre, recall, f1 ,y_pred= multi_metrics(output, target)
            correct += test_acc
            total_prec += pre
            total_recall += recall
            total_f1 += f1

            idx =  np.array((target.view(-1).detach().cpu().numpy(),y_pred.view(-1).detach().cpu().numpy())).T
            np.add.at(confusion_matrix,(idx[:,0],idx[:,1]),1)

    test_loss /= len(test_loader)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.sampler),
        100. * correct / len(test_loader.sampler)))
    
    print('\nTest set: PRECISION: {:.4f}'.format(100. * total_prec / len(test_loader)))
    print('\nTest set: RECALL: {:.4f}'.format(100. * total_recall / len(test_loader)))
    print('\nTest set: F1: {:.4f}'.format(100. * total_f1 / len(test_loader)))
    
    return test_loss,100. * correct / len(test_loader.sampler), confusion_matrix
   

# Transfer Learning

## ResNet

In [None]:
def reset_model_resent():
  resnet_model = models.resnet18(pretrained=False)
  resnet_model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  for param in resnet_model.parameters():
    param.requires_grad = True
  num_ftrs = resnet_model.fc.in_features
  resnet_model.fc = nn.Linear(num_ftrs, len(Symmetry_Groups))
  resnet_model = resnet_model.to(device)
  params_to_update = []
  for name,param in resnet_model.named_parameters():
    if param.requires_grad == True:
      params_to_update.append(param)
      # print("\t",name)
  return resnet_model,params_to_update

In [None]:
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 128

avg_train_acc= []
avg_valid_acc = []   
avg_train_loss = []
avg_valid_loss = [] 

criterion = nn.CrossEntropyLoss().cuda()

# K-fold Cross Validation model evaluation
train_subset = get_subset(20)
for fold, (train_ids, valid_ids) in enumerate(kfold.split(train_subset)):
    print(f'FOLD {fold}')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)

    trainloader = torch.utils.data.DataLoader(
                      train_subset, 
                      batch_size=batch_size, sampler=train_subsampler)
    
    validloader = torch.utils.data.DataLoader(
                      train_subset,
                      batch_size=batch_size, sampler=valid_subsampler)
    
    train_loss_per_epoch = []

    model,params_to_update = reset_model_resent()
    model = model.to(device)
    optimizer =  optim.Adam(params_to_update, lr=0.0001)
    
    start = timeit.default_timer()
    for epoch in range(1, 2):  
        train_loss,train_acc,cf_train =  train(model, device, trainloader, optimizer, epoch)
        train_loss_per_epoch.append(train_loss)

    #train_loss,train_acc,cf_train = (test(model, device, trainloader, criterion))    
    valid_loss,valid_acc,cf_valid = (test(model, device, validloader, criterion))

    stop = timeit.default_timer()
    print('Total time taken: {} seconds'.format(int(stop - start)) )

    avg_train_acc.append(train_acc.item())
    avg_valid_acc.append(valid_acc.item())  
    avg_train_loss.append(train_loss)
    avg_valid_loss.append(valid_loss)

    # Saving the model
    model_name = "Resnet"
    save_path = "/content/drive/MyDrive/data"+f'/model-{model_name}-fold-{fold}.pth'
    torch.save(model.state_dict(), save_path)
    for heatmap in [cf_train,cf_valid]:
      plt.figure(figsize=(15,10))
      sns.heatmap(heatmap,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
      plt.ylabel('True class')
      plt.xlabel('Predicted class')
      plt.show()

print(f'Train All Loss: {np.mean(avg_train_loss):.5f} | Train All Acc: {np.mean(avg_train_acc):.3f}| STD: {np.std(avg_train_loss)} | STD: {np.std(avg_train_acc)}')
print(f'Validation All Loss: {np.mean(avg_valid_loss):.5f} | Validation All Acc: {np.mean(avg_valid_acc):.3f}| STD: {np.std(avg_valid_loss)} | STD: {np.std(avg_valid_acc)} ')


In [None]:
test_model,_ = reset_model_resent()
test_model.load_state_dict(torch.load("/content/drive/MyDrive/data/model-Resnet-fold-"+str(np.argmax(avg_valid_acc))+".pth"))
test_model.to(device)
testloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=True)
test_loss,test_acc,cf_test= (test(test_model, device, testloader, criterion=nn.CrossEntropyLoss()))

plt.figure(figsize=(15,10))
sns.heatmap(cf_test,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

In [None]:
test_outputs = []
test_targets=[]
with torch.no_grad():
      for data, target in testloader:
          image,target = data.to(device), target.to(device)
          output = test_model(image)
          test_outputs.extend( output.detach().cpu().tolist())
          test_targets.extend(target.detach().cpu().tolist())
          
tsne = TSNE(2, verbose=1)
reduced_dim = tsne.fit_transform(test_outputs)
fig, ax = plt.subplots(figsize=(15,10))
scatter = ax.scatter(reduced_dim[:,0],reduced_dim[:,1],c = test_targets)
legend1 = ax.legend(*scatter.legend_elements(num=17),
                    loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

In [None]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

test_model.conv1.register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[4]
data=data.to(device)
data.unsqueeze_(0)
output = test_model(data)

k=0
act = activation['conv1'].squeeze()
fig,ax = plt.subplots(8,8,figsize=(12, 15))

for i in range(act.size(0)//8):
        for j in range(act.size(0)//8):
           ax[i,j].imshow(act[k].detach().cpu().numpy())
           k+=1    


## VGG16

In [None]:
def vgg16_model():
  vgg_model = models.vgg16(pretrained=False)
  vgg_model.features[0] = nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  for param in vgg_model.parameters():
    param.requires_grad = True
  num_ftrs = vgg_model.classifier[6].in_features
  vgg_model.classifier[6] = nn.Linear(num_ftrs, len(Symmetry_Groups))
  vgg_model = vgg_model.to(device)
  params_to_update = []
  for name,param in vgg_model.named_parameters():
    if param.requires_grad == True:
      params_to_update.append(param)
      #print("\t",name)
  return vgg_model,params_to_update

In [None]:
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 256

avg_train_acc= []
avg_valid_acc = []   
avg_train_loss = []
avg_valid_loss = [] 

criterion = nn.CrossEntropyLoss().cuda()

# K-fold Cross Validation model evaluation
train_subset = get_subset(20)
for fold, (train_ids, valid_ids) in enumerate(kfold.split(train_subset)):
    print(f'FOLD {fold}')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)

    trainloader = torch.utils.data.DataLoader(
                      train_subset, 
                      batch_size=batch_size, sampler=train_subsampler)
    
    validloader = torch.utils.data.DataLoader(
                      train_subset,
                      batch_size=batch_size, sampler=valid_subsampler)
    
    train_loss_per_epoch = []

    vgg_model,params_to_update = vgg16_model()
    vgg_model = vgg_model.to(device)
    optimizer =  optim.Adam(params_to_update, lr=0.0001)
    
    start = timeit.default_timer()
    for epoch in range(1, 2):  
        train_loss,train_acc,cf_train =  train(vgg_model, device, trainloader, optimizer, epoch)
        train_loss_per_epoch.append(train_loss)

    #train_loss,train_acc,cf_train = (test(model, device, trainloader, criterion))    
    valid_loss,valid_acc,cf_valid = (test(vgg_model, device, validloader, criterion))

    stop = timeit.default_timer()
    print('Total time taken: {} seconds'.format(int(stop - start)) )

    avg_train_acc.append(train_acc.item())
    avg_valid_acc.append(valid_acc.item())  
    avg_train_loss.append(train_loss)
    avg_valid_loss.append(valid_loss)

    # Saving the model
    model_name = "Vgg16"
    save_path = "/content/drive/MyDrive/data"+f'/model-{model_name}-fold-{fold}.pth'
    torch.save(vgg_model.state_dict(), save_path)
    for heatmap in [cf_train,cf_valid]:
      plt.figure(figsize=(15,10))
      sns.heatmap(heatmap,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
      plt.ylabel('True class')
      plt.xlabel('Predicted class')
      plt.show()

print(f'Train All Loss: {np.mean(avg_train_loss):.5f} | Train All Acc: {np.mean(avg_train_acc):.3f}| STD: {np.std(avg_train_loss)} | STD: {np.std(avg_train_acc)}')
print(f'Validation All Loss: {np.mean(avg_valid_loss):.5f} | Validation All Acc: {np.mean(avg_valid_acc):.3f}| STD: {np.std(avg_valid_loss)} | STD: {np.std(avg_valid_acc)} ')


In [None]:
test_model,_ = vgg16_model()
test_model.load_state_dict(torch.load("/content/drive/MyDrive/data/model-Vgg16-fold-"+str(np.argmax(avg_valid_acc))+".pth"))
test_model.to(device)
testloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=True)
test_loss,test_acc,cf_test= (test(test_model, device, testloader, criterion=nn.CrossEntropyLoss()))

plt.figure(figsize=(15,10))
sns.heatmap(cf_test,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

In [None]:
test_outputs = []
test_targets=[]
with torch.no_grad():
      for data, target in testloader:
          image,target = data.to(device), target.to(device)
          output = test_model(image)
          test_outputs.extend( output.detach().cpu().tolist())
          test_targets.extend(target.detach().cpu().tolist())
          
tsne = TSNE(2, verbose=1)
reduced_dim = tsne.fit_transform(test_outputs)
fig, ax = plt.subplots(figsize=(15,10))
scatter = ax.scatter(reduced_dim[:,0],reduced_dim[:,1],c = test_targets)
legend1 = ax.legend(*scatter.legend_elements(num=17),
                    loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

In [None]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

test_model.conv1.register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[4]
data=data.to(device)
data.unsqueeze_(0)
output = test_model(data)

k=0
act = activation['conv1'].squeeze()
fig,ax = plt.subplots(8,8,figsize=(12, 15))

for i in range(act.size(0)//8):
        for j in range(act.size(0)//8):
           ax[i,j].imshow(act[k].detach().cpu().numpy())
           k+=1    


## AlexNet

In [None]:
def alexnet_create_model():
  alexnet_model = models.alexnet(pretrained=False)
  alexnet_model.features[0] = nn.Conv2d(1, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
  for param in alexnet_model.parameters():
    param.requires_grad = True
  num_ftrs = alexnet_model.classifier[6].in_features
  alexnet_model.classifier[6] = nn.Linear(num_ftrs, len(Symmetry_Groups))
  alexnet_model = alexnet_model.to(device)
  params_to_update = []
  for name,param in alexnet_model.named_parameters():
    if param.requires_grad == True:
      params_to_update.append(param)
      #print("\t",name)
  return alexnet_model,params_to_update

In [None]:
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 256

avg_train_acc= []
avg_valid_acc = []   
avg_train_loss = []
avg_valid_loss = [] 

criterion = nn.CrossEntropyLoss().cuda()

# K-fold Cross Validation model evaluation
train_subset = get_subset(20)
for fold, (train_ids, valid_ids) in enumerate(kfold.split(train_subset)):
    print(f'FOLD {fold}')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)

    trainloader = torch.utils.data.DataLoader(
                      train_subset, 
                      batch_size=batch_size, sampler=train_subsampler)
    
    validloader = torch.utils.data.DataLoader(
                      train_subset,
                      batch_size=batch_size, sampler=valid_subsampler)
    
    train_loss_per_epoch = []

    alexnet_model,params_to_update = alexnet_create_model()
    alexnet_model = alexnet_model.to(device)
    optimizer =  optim.Adam(params_to_update, lr=0.0001)
    
    start = timeit.default_timer()
    for epoch in range(1, 2):  
        train_loss,train_acc,cf_train =  train(alexnet_model, device, trainloader, optimizer, epoch)
        train_loss_per_epoch.append(train_loss)

    #train_loss,train_acc,cf_train = (test(model, device, trainloader, criterion))    
    valid_loss,valid_acc,cf_valid = (test(alexnet_model, device, validloader, criterion))

    stop = timeit.default_timer()
    print('Total time taken: {} seconds'.format(int(stop - start)) )

    avg_train_acc.append(train_acc.item())
    avg_valid_acc.append(valid_acc.item())  
    avg_train_loss.append(train_loss)
    avg_valid_loss.append(valid_loss)

    # Saving the model
    model_name = "alexnet"
    save_path = "/content/drive/MyDrive/data"+f'/model-{model_name}-fold-{fold}.pth'
    torch.save(model.state_dict(), save_path)
    for heatmap in [cf_train,cf_valid]:
      plt.figure(figsize=(15,10))
      sns.heatmap(heatmap,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
      plt.ylabel('True class')
      plt.xlabel('Predicted class')
      plt.show()

print(f'Train All Loss: {np.mean(avg_train_loss):.5f} | Train All Acc: {np.mean(avg_train_acc):.3f}| STD: {np.std(avg_train_loss)} | STD: {np.std(avg_train_acc)}')
print(f'Validation All Loss: {np.mean(avg_valid_loss):.5f} | Validation All Acc: {np.mean(avg_valid_acc):.3f}| STD: {np.std(avg_valid_loss)} | STD: {np.std(avg_valid_acc)} ')


In [None]:
print(np.argmax(avg_valid_acc))

In [None]:
test_model,_ = alexnet_create_model()
test_model.load_state_dict(torch.load("/content/drive/MyDrive/data/model-alexnet-fold-"+str(np.argmax(avg_valid_acc))+".pth"))
test_model.to(device)
testloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=True)
test_loss,test_acc,cf_test= (test(test_model, device, testloader, criterion=nn.CrossEntropyLoss()))

plt.figure(figsize=(15,10))
sns.heatmap(cf_test,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

In [None]:
test_outputs = []
test_targets=[]
with torch.no_grad():
      for data, target in testloader:
          image,target = data.to(device), target.to(device)
          output = test_model(image)
          test_outputs.extend( output.detach().cpu().tolist())
          test_targets.extend(target.detach().cpu().tolist())
          
tsne = TSNE(2, verbose=1)
reduced_dim = tsne.fit_transform(test_outputs)
fig, ax = plt.subplots(figsize=(15,10))
scatter = ax.scatter(reduced_dim[:,0],reduced_dim[:,1],c = test_targets)
legend1 = ax.legend(*scatter.legend_elements(num=17),
                    loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

In [None]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

test_model.conv1.register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[4]
data=data.to(device)
data.unsqueeze_(0)
output = test_model(data)

k=0
act = activation['conv1'].squeeze()
fig,ax = plt.subplots(8,8,figsize=(12, 15))

for i in range(act.size(0)//8):
        for j in range(act.size(0)//8):
           ax[i,j].imshow(act[k].detach().cpu().numpy())
           k+=1    


# Custom/ Designed Networks 

## Wide Network

In [None]:
class BasicBlock_wide(nn.Module):
    expansion = 1
    def __init__(self):
        super(BasicBlock_wide, self).__init__()
        self.conv1 = nn.Conv2d(in_channels = 1, out_channels = 32,
                                            kernel_size = 7, padding = (2,2), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)

        self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 64,
                                            kernel_size = 5, padding = (1,1), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)

        self.conv3 = nn.Conv2d(in_channels = 64, out_channels = 128,
                                            kernel_size = 3, padding = (1,1), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)

        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(28800, 512)
        self.dropout = nn.Dropout(0.25)
        self.linear2 = nn.Linear(512, 17)


    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool3(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.dropout(x)
        x = self.linear2(x)

        return x

In [None]:
def wide_nw_create_model():
  wide_nw  = BasicBlock_wide()
  for param in wide_nw.parameters():
    param.requires_grad = True
  
  wide_nw = wide_nw.to(device)
  params_to_update = []
  for name,param in wide_nw.named_parameters():
    if param.requires_grad == True:
      params_to_update.append(param)
      #print("\t",name)
  return wide_nw,params_to_update

In [None]:
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 256

avg_train_acc= []
avg_valid_acc = []   
avg_train_loss = []
avg_valid_loss = [] 

criterion = nn.CrossEntropyLoss().cuda()

# K-fold Cross Validation model evaluation
train_subset = get_subset(40)
for fold, (train_ids, valid_ids) in enumerate(kfold.split(train_subset)):
    print(f'FOLD {fold}')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)

    trainloader = torch.utils.data.DataLoader(
                      train_subset, 
                      batch_size=batch_size, sampler=train_subsampler)
    
    validloader = torch.utils.data.DataLoader(
                      train_subset,
                      batch_size=batch_size, sampler=valid_subsampler)
    
    train_loss_per_epoch = []

    wide_model,params_to_update = wide_nw_create_model()
    wide_model = wide_model.to(device)
    optimizer =  optim.Adam(params_to_update, lr=0.0001)
    
    start = timeit.default_timer()
    for epoch in range(1, 2):  
        train_loss,train_acc,cf_train =  train(wide_model, device, trainloader, optimizer, epoch)
        train_loss_per_epoch.append(train_loss)

    #train_loss,train_acc,cf_train = (test(model, device, trainloader, criterion))    
    valid_loss,valid_acc,cf_valid = (test(wide_model, device, validloader, criterion))

    stop = timeit.default_timer()
    print('Total time taken: {} seconds'.format(int(stop - start)) )

    avg_train_acc.append(train_acc.item())
    avg_valid_acc.append(valid_acc.item())  
    avg_train_loss.append(train_loss)
    avg_valid_loss.append(valid_loss)

    # Saving the model
    model_name = "wide_nw"
    save_path = "/content/drive/MyDrive/data"+f'/model-{model_name}-fold-{fold}-40.pth'
    torch.save(wide_model.state_dict(), save_path)
    for heatmap in [cf_train,cf_valid]:
      plt.figure(figsize=(15,10))
      sns.heatmap(heatmap,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
      plt.ylabel('True class')
      plt.xlabel('Predicted class')
      plt.show()

print(f'Train All Loss: {np.mean(avg_train_loss):.5f} | Train All Acc: {np.mean(avg_train_acc):.3f}| STD: {np.std(avg_train_loss)} | STD: {np.std(avg_train_acc)}')
print(f'Validation All Loss: {np.mean(avg_valid_loss):.5f} | Validation All Acc: {np.mean(avg_valid_acc):.3f}| STD: {np.std(avg_valid_loss)} | STD: {np.std(avg_valid_acc)} ')


In [None]:
test_model,_ = wide_nw_create_model()
test_model.load_state_dict(torch.load("/content/drive/MyDrive/data/model-wide_nw-fold-"+str(np.argmax(avg_valid_acc))+"-40.pth"))
test_model.to(device)
testloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=True)
test_loss,test_acc,cf_test= (test(test_model, device, testloader, criterion=nn.CrossEntropyLoss()))

plt.figure(figsize=(15,10))
sns.heatmap(cf_test,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

In [None]:
test_outputs = []
test_targets=[]
with torch.no_grad():
      for data, target in testloader:
          image,target = data.to(device), target.to(device)
          output = test_model(image)
          test_outputs.extend( output.detach().cpu().tolist())
          test_targets.extend(target.detach().cpu().tolist())
          
tsne = TSNE(2, verbose=1)
reduced_dim = tsne.fit_transform(test_outputs)
fig, ax = plt.subplots(figsize=(15,10))
scatter = ax.scatter(reduced_dim[:,0],reduced_dim[:,1],c = test_targets)
legend1 = ax.legend(*scatter.legend_elements(num=17),
                    loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

In [None]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

test_model.conv1.register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[4]
data=data.to(device)
data.unsqueeze_(0)
output = test_model(data)

k=0
act = activation['conv1'].squeeze()
fig,ax = plt.subplots(6,6,figsize=(12, 15))

# for i in range(act.size(0)//6):
#         for j in range(act.size(0)//6):
#            ax[i,j].imshow(act[k].detach().cpu().numpy())
#            k+=1    

for act, ax in zip(act, ax.ravel()):
    ax.imshow(act.detach().cpu().numpy())

plt.show()

In [None]:
act.size()

## Skinny Network

In [None]:
class BasicBlock_skinny(nn.Module):
    expansion = 1
    def __init__(self):
        super(BasicBlock_skinny, self).__init__()
        self.conv1 = nn.Conv2d(in_channels = 1, out_channels = 36,
                                            kernel_size = 5, padding = (2,2), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)

        self.conv2 = nn.Conv2d(in_channels = 36, out_channels = 36,
                                            kernel_size = 5, padding = (1,1), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)

        self.conv3 = nn.Conv2d(in_channels = 36, out_channels = 36,
                                            kernel_size = 3, padding = (1,1), stride = (1,1), bias = True) 
        self.relu = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(kernel_size = (2,2), stride = 2)
        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(8100, 128)
        self.dropout = nn.Dropout(0.25)
        self.linear2 = nn.Linear(128, 17) 

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool3(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.dropout(x)
        x = self.linear2(x)

        return x

In [None]:
def skinny_nw_create_model():
  skinny_nw  = BasicBlock_skinny()
  for param in skinny_nw.parameters():
    param.requires_grad = True
  
  skinny_nw = skinny_nw.to(device)
  params_to_update = []
  for name,param in skinny_nw.named_parameters():
    if param.requires_grad == True:
      params_to_update.append(param)
      #print("\t",name)
  return skinny_nw,params_to_update

In [None]:
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
batch_size = 256

avg_train_acc= []
avg_valid_acc = []   
avg_train_loss = []
avg_valid_loss = [] 

criterion = nn.CrossEntropyLoss().cuda()

# K-fold Cross Validation model evaluation
train_subset = get_subset(40)
for fold, (train_ids, valid_ids) in enumerate(kfold.split(train_subset)):
    print(f'FOLD {fold}')

    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_ids)

    trainloader = torch.utils.data.DataLoader(
                      train_subset, 
                      batch_size=batch_size, sampler=train_subsampler)
    
    validloader = torch.utils.data.DataLoader(
                      train_subset,
                      batch_size=batch_size, sampler=valid_subsampler)
    
    train_loss_per_epoch = []

    skinny_model,params_to_update = skinny_nw_create_model()
    skinny_model = skinny_model.to(device)
    optimizer =  optim.Adam(params_to_update, lr=0.0001)
    
    start = timeit.default_timer()
    for epoch in range(1, 2):  
        train_loss,train_acc,cf_train =  train(skinny_model, device, trainloader, optimizer, epoch)
        train_loss_per_epoch.append(train_loss)

    #train_loss,train_acc,cf_train = (test(model, device, trainloader, criterion))    
    valid_loss,valid_acc,cf_valid = (test(skinny_model, device, validloader, criterion))

    stop = timeit.default_timer()
    print('Total time taken: {} seconds'.format(int(stop - start)) )

    avg_train_acc.append(train_acc.item())
    avg_valid_acc.append(valid_acc.item())  
    avg_train_loss.append(train_loss)
    avg_valid_loss.append(valid_loss)

    # Saving the model
    model_name = "skinny_nw"
    save_path = "/content/drive/MyDrive/data"+f'/model-{model_name}-fold-{fold}-40.pth'
    torch.save(skinny_model.state_dict(), save_path)
    for heatmap in [cf_train,cf_valid]:
      plt.figure(figsize=(15,10))
      sns.heatmap(heatmap,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
      plt.ylabel('True class')
      plt.xlabel('Predicted class')
      plt.show()

print(f'Train All Loss: {np.mean(avg_train_loss):.5f} | Train All Acc: {np.mean(avg_train_acc):.3f}| STD: {np.std(avg_train_loss)} | STD: {np.std(avg_train_acc)}')
print(f'Validation All Loss: {np.mean(avg_valid_loss):.5f} | Validation All Acc: {np.mean(avg_valid_acc):.3f}| STD: {np.std(avg_valid_loss)} | STD: {np.std(avg_valid_acc)} ')


In [None]:
test_model,_ = skinny_nw_create_model()
test_model.load_state_dict(torch.load("/content/drive/MyDrive/data/model-skinny_nw-fold-"+str(np.argmax(avg_valid_acc))+"-40.pth"))
test_model.to(device)
testloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=True)
test_loss,test_acc,cf_test= (test(test_model, device, testloader, criterion=nn.CrossEntropyLoss()))

plt.figure(figsize=(15,10))
sns.heatmap(cf_test,annot=True,fmt="g",xticklabels=Symmetry_Groups,yticklabels=Symmetry_Groups)
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

In [None]:
test_outputs = []
test_targets=[]
with torch.no_grad():
      for data, target in testloader:
          image,target = data.to(device), target.to(device)
          output = test_model(image)
          test_outputs.extend( output.detach().cpu().tolist())
          test_targets.extend(target.detach().cpu().tolist())
          
tsne = TSNE(2, verbose=1)
reduced_dim = tsne.fit_transform(test_outputs)
fig, ax = plt.subplots(figsize=(15,10))
scatter = ax.scatter(reduced_dim[:,0],reduced_dim[:,1],c = test_targets)
legend1 = ax.legend(*scatter.legend_elements(num=17),
                    loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

In [None]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

test_model.conv1.register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[4]
data=data.to(device)
data.unsqueeze_(0)
output = test_model(data)

k=0
act = activation['conv1'].squeeze()
fig,ax = plt.subplots(6,6,figsize=(12, 15))

# for i in range(act.size(0)//6):
#         for j in range(act.size(0)//6):
#            ax[i,j].imshow(act[k].detach().cpu().numpy())
#            k+=1    

for act, ax in zip(act, ax.ravel()):
    ax.imshow(act.detach().cpu().numpy())

plt.show()