In [1]:
from google.colab import drive
drive.mount('/content/gdrive/')

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).


In [2]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.dataset import random_split

import torchvision.datasets
import torchvision.transforms as transforms
from torchvision.utils import save_image

import numpy as np
import matplotlib.pyplot as plt

import pandas as pd
import skimage as ski

In [3]:
if torch.cuda.is_available():
    device = "cuda"
    torch.cuda.empty_cache()
    print(torch.cuda.memory_summary(device=None, abbreviated=False))

|                  PyTorch CUDA memory summary, device ID 0                 |
|---------------------------------------------------------------------------|
|            CUDA OOMs: 0            |        cudaMalloc retries: 0         |
|        Metric         | Cur Usage  | Peak Usage | Tot Alloc  | Tot Freed  |
|---------------------------------------------------------------------------|
| Allocated memory      |       0 B  |       0 B  |       0 B  |       0 B  |
|       from large pool |       0 B  |       0 B  |       0 B  |       0 B  |
|       from small pool |       0 B  |       0 B  |       0 B  |       0 B  |
|---------------------------------------------------------------------------|
| Active memory         |       0 B  |       0 B  |       0 B  |       0 B  |
|       from large pool |       0 B  |       0 B  |       0 B  |       0 B  |
|       from small pool |       0 B  |       0 B  |       0 B  |       0 B  |
|---------------------------------------------------------------

In [4]:
x_data = np.load('/content/gdrive/MyDrive/Exponenta/WindowsShield/Data/data_x.npy')
y_data = np.load('/content/gdrive/MyDrive/Exponenta/WindowsShield/Data/data_y.npy')
x_data = x_data[1:,:,:,:]
y_data = y_data[1:]

x_data_tensor = torch.FloatTensor(x_data)
y_data_tensor = torch.Tensor(y_data)

#Norm
x_data_tensor = (x_data_tensor - torch.min(x_data_tensor))/(torch.max(x_data_tensor) - torch.min(x_data_tensor))
dataset = TensorDataset(x_data_tensor, y_data_tensor)  #combine to tuple structure

#here we form train and test
batch_size = 12
part = 0.8
train_lenght = int(x_data_tensor.shape[0]*part)
test_lenght = int(x_data_tensor.shape[0] - train_lenght)

train_set, test_set = random_split(dataset, [train_lenght,test_lenght])
train_loader = DataLoader(train_set, batch_size=batch_size, drop_last=False, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, drop_last=False, shuffle=True)

In [5]:
dataset_number = len(dataset)
trainData_number = len(train_set)
testData_number = len(test_set)
print(f'len of DataSet\t: {dataset_number}')
print(f'len of trainData\t: {trainData_number}')
print(f'len of testData\t: {testData_number}')

len of DataSet	: 981
len of trainData	: 784
len of testData	: 197


In [6]:
class SelfAttentionBlock(nn.Module):
    """ Self attention Layer"""
    def __init__(self, in_dim):
        super(SelfAttentionBlock, self).__init__()
        
        self.chanel_in = in_dim
        #self.activation = activation
        
        self.query_conv = nn.Conv2d(in_channels = in_dim , out_channels = in_dim, kernel_size= 1)   #in_dim//8
        self.key_conv = nn.Conv2d(in_channels = in_dim , out_channels = in_dim, kernel_size= 1)
        self.value_conv = nn.Conv2d(in_channels = in_dim , out_channels = in_dim, kernel_size= 1)
        self.gamma = nn.Parameter(torch.zeros(1))

        self.softmax  = nn.Softmax(dim=-1) #
        
    def forward(self,x):
        """
            inputs :
                x : input feature maps( B X C X W X H)
            returns :
                out : self attention value + input feature 
                attention: B X N X N (N is Width*Height)
        """
        m_batchsize, C, width ,height = x.size()
        #print('atten: ', m_batchsize,C,width ,height)
        proj_query  = self.query_conv(x).view(m_batchsize,-1,width*height).permute(0,2,1) # B X CX(N)
        proj_key =  self.key_conv(x).view(m_batchsize,-1,width*height) # B X C x (*W*H)
        energy =  torch.bmm(proj_query,proj_key) # transpose check
        attention = self.softmax(energy) # BX (N) X (N) 
        proj_value = self.value_conv(x).view(m_batchsize,-1,width*height) # B X C X N

        out = torch.bmm(proj_value,attention.permute(0,2,1))
        #print('atten2:',out.shape)
        out = out.view(m_batchsize,C,width,height)
        
        out = self.gamma*out + x
        
        return out

In [7]:
class InnerBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(InnerBlock, self).__init__()
        
        self.InnerOperation = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding='same'),
                                   nn.BatchNorm2d(out_channels),
                                   nn.ReLU(inplace=True),
                                   nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding='same'),
                                   nn.BatchNorm2d(out_channels))
    def forward(self, x):
        out_block = self.InnerOperation(x)
        return out_block
       
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ResBlock, self).__init__()
        
        self.ResConnect = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding='same'))
    
    def forward(self, x):
        out_ResBlock = self.ResConnect(x)
        return out_ResBlock

In [8]:
class ResNetCustom(nn.Module):
    def __init__(self, in_channels=1, out_channels=8, number_class=10):
        super(ResNetCustom, self).__init__()
        
        self.number_class = number_class
        
        self.Layer_1 = InnerBlock(in_channels, out_channels)
        self.Layer_2 = InnerBlock(out_channels, out_channels*2)
        self.Layer_3 = InnerBlock(out_channels*2, out_channels*4)
        self.Layer_4 = InnerBlock(out_channels*4, out_channels*4)
        
        self.Skip_1 = ResBlock(in_channels, out_channels)
        self.Skip_2 = ResBlock(out_channels, out_channels*2)
        self.Skip_3 = ResBlock(out_channels*2, out_channels*4)
        self.Skip_4 = ResBlock(out_channels*4, out_channels*4)
        
        self.Last_Conv = nn.Sequential(nn.Conv2d(out_channels*4, out_channels*1, kernel_size=3, stride=1, padding='same'),
                                       SelfAttentionBlock(out_channels*1))
  
    def forward(self, x):
        
        out_1 = self.Layer_1(x)
        res_1 = self.Skip_1(x)
        out_1 = nn.ReLU()(out_1 + res_1)
        
        out_2 = self.Layer_2(out_1)
        res_2 = self.Skip_2(out_1)
        out_2 = nn.ReLU()(out_2 + res_2)
        
        out_3 = self.Layer_3(out_2)
        res_3 = self.Skip_3(out_2)
        out_3 = nn.ReLU()(out_3 + res_3)
        
        
        out_4 = self.Layer_4(out_3)
        res_4 = self.Skip_4(out_3)
        out_4 = nn.ReLU()(out_4 + res_4)
        out = self.Last_Conv(out_4)
        #plt.imshow(out[0][0].cpu().detach().numpy())
        #plt.show()
        out = nn.MaxPool2d(kernel_size=4, stride=2)(out_4)
        
        #Fully Connected Leyer: Output has 10 labels
        input_layer = torch.reshape(out, (out.shape[0],-1)).to(device)
        out = nn.Linear(input_layer.shape[1], self.number_class*5).to(device)(input_layer)  #bias=False
        out = nn.ReLU()(out)
        out = nn.Linear(self.number_class*5, self.number_class).to(device)(out)
        return out 

#x = ResNetCustom(1,8,10)
#print(x(torch.randn(12,1,28,28)).shape)

In [9]:
#weights initializing
def init_all(model, init_func, *params, **kwargs):
    for p in model.parameters():
        init_func(p, *params, **kwargs)

net = ResNetCustom(in_channels=3, out_channels=32*2, number_class=6).to(device)
init_all(net, torch.nn.init.normal_, mean=0.0, std=1)

In [10]:
#Parameters info
from prettytable import PrettyTable

def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad: continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params+=params
    print(table)
    print(f"Total Trainable Params: {total_params}")
    return total_params
    
count_parameters(net)

+---------------------------------+------------+
|             Modules             | Parameters |
+---------------------------------+------------+
| Layer_1.InnerOperation.0.weight |    1728    |
|  Layer_1.InnerOperation.0.bias  |     64     |
| Layer_1.InnerOperation.1.weight |     64     |
|  Layer_1.InnerOperation.1.bias  |     64     |
| Layer_1.InnerOperation.3.weight |   36864    |
|  Layer_1.InnerOperation.3.bias  |     64     |
| Layer_1.InnerOperation.4.weight |     64     |
|  Layer_1.InnerOperation.4.bias  |     64     |
| Layer_2.InnerOperation.0.weight |   73728    |
|  Layer_2.InnerOperation.0.bias  |    128     |
| Layer_2.InnerOperation.1.weight |    128     |
|  Layer_2.InnerOperation.1.bias  |    128     |
| Layer_2.InnerOperation.3.weight |   147456   |
|  Layer_2.InnerOperation.3.bias  |    128     |
| Layer_2.InnerOperation.4.weight |    128     |
|  Layer_2.InnerOperation.4.bias  |    128     |
| Layer_3.InnerOperation.0.weight |   294912   |
|  Layer_3.InnerOper

2595777

In [11]:
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, weight_decay=1e-4, momentum=0)   #lr=0.001, weight_decay=1e-6, Adam
#For tuning the lr
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer = optimizer, 
                                                       mode = 'min', 
                                                       factor = 0.1, 
                                                       patience = 10,
                                                       threshold = 1e-4,
                                                       verbose = 'True')

#scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9, last_epoch=- 1, verbose=True)
Loss_function = nn.CrossEntropyLoss()

In [12]:
def accuracy_train_batch(prediction, target, trainData_number, train_loader, index_b):
    global score_positive
    #target = target.to(device)
    prediction = nn.Softmax(dim=1)(prediction)
    index_max = torch.argmax(prediction, dim=1)
    for i in range(len(target)):
      if (target[i] == index_max[i]):
          score_positive +=1
    if (index_b == len(train_loader) - 1):
      print(f'Epoch Accuracy: {round(score_positive/trainData_number, 3)}')

In [None]:
#learning network
epoche = 100
history = []
history_epoch = []
loss_sum = 0
score_positive=0

for iteration in range(epoche):
    for index_b, (feature, target) in enumerate(train_loader):
        target = target.long().to(device)
        prediction = net(feature.to(device))

        Loss = Loss_function(prediction, target.to(device))
        loss_sum += Loss.item()
        
        #net.zero_grad()
        optimizer.zero_grad()
        Loss.backward()
        optimizer.step()
        history.append(Loss.item())
        accuracy_train_batch(prediction, target, trainData_number, train_loader, index_b)
    scheduler.step(Loss)
    print('Current Learning Rate:', scheduler.optimizer.param_groups[0]['lr'])
    print(f'Epoche №: {iteration}')
    print(f'Loss per an epoch: {loss_sum/len(train_loader)}')
    history_epoch.append(loss_sum/len(train_loader))
    loss_sum=0
    score_positive=0

In [None]:
fig, ax = plt.subplots()
ax.plot(history, label=f'Loss = {history[-1]}')
ax.legend()
fig.set_figheight(6)
fig.set_figwidth(9)
plt.xlabel('Number Batch')
plt.ylabel('Loss Value')
plt.grid()
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.plot(history_epoch, label = f'Average Train Loss: {history[-1]}')
ax.legend()
fig.set_figheight(6)
fig.set_figwidth(9)
plt.xlabel('Number Epoch')
plt.ylabel('Loss Value')
plt.grid()
plt.show()

In [None]:
torch.save(net, '/content/gdrive/MyDrive/Exponenta/WindowsShield/Data/ResNetAttention.pt')

In [None]:
#testing
score_positive = 0
score_negative = 0
N_batch = 0
for index_b, (feature, target) in enumerate(test_loader):
    target = target.long().to(device)
    prediction = net(feature.to(device))
    prediction = nn.Softmax(dim=1)(prediction)
    index_max = torch.argmax(prediction, dim=1)
    for i in range(len(target)):
      if (target[i] == index_max[i]):
          score_positive +=1
      else:
          score_negative +=1
print(f'Test Accuracy: {round(score_positive/testData_number, 3)}')
print(f'Test Negative: {round(score_negative/testData_number, 3)}')