In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import random
import pickle
from sklearn.metrics import classification_report,accuracy_score,precision_score,recall_score
import matplotlib.pyplot as plt

In [21]:
def get_data(folder):
    """Load one randomly chosen pickled shard from ``folder`` as tensors.

    The shard is unpickled and indexed as ``data[i][0]`` (sample) and
    ``data[i][1]`` (label) for every ``i`` in ``range(len(data))``.

    Args:
        folder: Directory that contains the pickled shard files.

    Returns:
        Tuple ``(x, y)``. ``x`` is a float32 tensor with all samples stacked
        along a new leading dimension; ``y`` is an int64 tensor with the
        labels stacked the same way. Both are empty when the shard is empty.

    Raises:
        ValueError: If ``folder`` contains no regular files.
    """
    data_files = [f for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))]
    if not data_files:
        raise ValueError(f'no data files found in {folder!r}')
    data_file = random.choice(data_files)

    # Read from the folder that was actually requested (the path used to be
    # hard-coded to 'data', so the test folder was never read).
    # NOTE: pickle.load can execute arbitrary code - only load trusted shards.
    with open(os.path.join(folder, data_file), 'rb') as f:
        data = pickle.load(f)

    count = len(data)
    if count == 0:
        return torch.empty(0, dtype=torch.float32), torch.empty(0, dtype=torch.long)

    # torch.from_numpy rejects Python lists; as_tensor accepts NumPy arrays,
    # tensors and plain Python numbers alike, and stack builds the batch axis.
    x = torch.stack([torch.as_tensor(data[i][0]) for i in range(count)]).to(torch.float32)
    y = torch.stack([torch.as_tensor(data[i][1]) for i in range(count)]).to(torch.long)
    return x, y
# Initial shards: get_data picks one file at random from each folder.
x_train, y_train = get_data('data')
x_test, y_test = get_data('test_data')



torch.Size([3, 224, 224])

In [4]:
# Build a skip connection network

# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
class Block(nn.Module):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    The block outputs ``out_channels * Block.expansion`` channels. The 3x3
    convolution carries the ``stride``. When ``downsample`` is given it is
    applied to the block input before the residual addition; otherwise the
    input itself is added, so it must already match the output shape.
    """
    expansion = 4

    def __init__(self, in_channels, out_channels, downsample=None, stride=1):
        super().__init__()
        widened = out_channels * Block.expansion

        # 1x1 convolution: map the input to the bottleneck width.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                               stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # 3x3 convolution: the only layer that applies the stride.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # 1x1 convolution: expand to the block's output width.
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1,
                               stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)

        # Optional module that reshapes the shortcut to the output shape.
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return F.relu(out)

class ResNet(nn.Module):
    """Residual network assembled from bottleneck ``Block`` modules.

    Layout: 7x7 stride-2 convolution, batch norm, ReLU and 3x3 stride-2
    max-pool, followed by four stages of ``Block`` modules (base widths 64,
    128, 256, 512), global average pooling and a linear classifier.

    Args:
        layers: Sequence of four ints, the number of blocks in each stage.
        output_size: Number of values produced by the final linear layer.
        input_channels: Channel count of the input images. Defaults to 1,
            the value that used to be hard-coded.
    """
    def __init__(self,layers,output_size,input_channels=1):
        super(ResNet,self).__init__()
        # Running channel count; _make_layer updates it after every stage.
        self.in_channels=64
        self.conv1=nn.Conv2d(input_channels,self.in_channels,kernel_size=7,stride=2,padding=3,bias=False)
        self.bn1= nn.BatchNorm2d(self.in_channels)
        self.maxpool1=nn.MaxPool2d(kernel_size=3,stride=2,padding=1)

        #Define Block layers
        self.layer1=self._make_layer(layers[0],64,1)
        self.layer2=self._make_layer(layers[1],128,2)
        self.layer3=self._make_layer(layers[2],256,2)
        self.layer4=self._make_layer(layers[3],512,2)

        self.avgpool=nn.AdaptiveAvgPool2d((1,1))

        # After layer4 the feature width is 512*Block.expansion, which is what
        # self.in_channels now holds. The classifier must match that width;
        # sizing it as 64*Block.expansion made every forward pass fail.
        self.fc=nn.Linear(self.in_channels,output_size)

    def _make_layer(self,num_blocks,out_channels,stride=1):
        """Build one stage of ``num_blocks`` blocks and advance ``self.in_channels``.

        Only the first block receives ``stride`` and, when the shortcut shape
        would not match the block output, a 1x1 convolution + batch norm
        ``downsample`` module. The remaining blocks use stride 1 and an
        identity shortcut.
        """
        downsample=None
        if stride!=1 or self.in_channels!=out_channels*Block.expansion:
            downsample=nn.Sequential(nn.Conv2d(self.in_channels,out_channels*Block.expansion,kernel_size=1,stride=stride,padding=0,bias=False),
                                     nn.BatchNorm2d(out_channels*Block.expansion))
        layers=[]
        layers.append(Block(in_channels=self.in_channels,out_channels=out_channels,downsample=downsample,stride=stride))
        self.in_channels=out_channels*Block.expansion
        for _ in range(num_blocks-1):
            layers.append(Block(in_channels=self.in_channels,out_channels=out_channels))
        return nn.Sequential(*layers)

    def forward(self, inputs):
        """Return the classifier output for a batch of images.

        ``inputs`` is moved to the device that holds the model's weights, so
        callers may pass CPU tensors regardless of where the model lives.
        """
        # Follow the model's own device instead of a notebook-level global.
        inputs=inputs.to(self.conv1.weight.device)
        out=self.conv1(inputs)
        out=self.bn1(out)
        # Non-linearity of the standard ResNet stem (conv -> BN -> ReLU -> pool);
        # it was missing between the batch norm and the max-pool.
        out=F.relu(out)
        out=self.maxpool1(out)
        out=self.layer1(out)
        out=self.layer2(out)
        out=self.layer3(out)
        out=self.layer4(out)
        out=self.avgpool(out)
        out=torch.flatten(out,1)
        out=self.fc(out)
        return out

# Blocks per stage ([3, 4, 6, 3] is the ResNet-50 stage layout) and the size
# of the classifier output.
layers = [3, 4, 6, 3]
output_size = 200

# Build the network, then move its parameters to the selected device.
model = ResNet(layers, output_size)
model = model.to(device)


In [None]:
def get_predictions(x, net=None, batch_size=500):
    """Predict a class index for every sample in ``x``, one batch at a time.

    Args:
        x: Input tensor; it is sliced along its first dimension.
        net: Module to evaluate. Defaults to the notebook-level ``model``.
        batch_size: Number of samples per forward pass.

    Returns:
        int64 CPU tensor holding, for each sample, the index of the largest
        value along dimension 1 of the network output. Empty when ``x`` is
        empty.
    """
    if net is None:
        net = model

    # Evaluate in eval mode and restore the caller's mode afterwards, so a call
    # made while training does not update batch-norm running statistics.
    was_training = net.training
    net.eval()
    batch_predictions = []
    try:
        # No autograd graph is needed for inference; skipping it keeps the
        # memory use of these large batches down.
        with torch.no_grad():
            for start in range(0, len(x), batch_size):
                logits = net(x[start:start + batch_size])
                batch_predictions.append(logits.argmax(dim=1).to('cpu'))
    finally:
        net.train(was_training)

    if not batch_predictions:
        return torch.empty(0, dtype=torch.long)
    return torch.cat(batch_predictions, dim=0).to(torch.long)


In [None]:
# Training configuration.
EPOCHS=10
MINI_BATCH_SIZE=32
# Number of mini-batches whose gradients are summed before one optimizer step.
accumalation_step=1
loss_fn=nn.CrossEntropyLoss()
# The optimizer class is torch.optim.Adam; torch.optim.adam is a module and
# cannot be called.
optimizer=torch.optim.Adam(model.parameters(),lr=1e-2)


train_loss_progress=[]   # loss of every training mini-batch
train_loss_hist=[]       # mean training loss per epoch
val_loss_hist=[]         # validation loss per epoch (first 200 test samples)

training_accuracy_hist=[]
val_accuracy_hist=[]
for epoch in range(EPOCHS):
    # Evaluate before training, so entry k describes the model after k epochs.
    model.eval()
    with torch.no_grad():
        y_hat = model(x_test[:200]).to('cpu')
        loss_val= loss_fn(y_hat,y_test[:200])
    val_loss_hist.append(loss_val.item())
    predictions=get_predictions(x_test)
    val_acc=accuracy_score(y_test.numpy(), predictions.numpy())
    val_accuracy_hist.append(val_acc)
    print(val_acc)

    predictions=get_predictions(x_train)
    training_accuracy_hist.append(accuracy_score(y_train.numpy(), predictions.numpy()))

    model.train()
    batch_starts=range(0,len(x_train),MINI_BATCH_SIZE)
    num_batches=len(batch_starts)
    epoch_loss=0.0
    # Count mini-batches, not sample offsets: the offset i advances by
    # MINI_BATCH_SIZE, so testing (i+1) % accumalation_step would skip every
    # optimizer step for most accumulation settings.
    for step,i in enumerate(batch_starts,start=1):
        y_hat=model(x_train[i:i+MINI_BATCH_SIZE]).to('cpu')
        loss=loss_fn(y_hat,y_train[i:i+MINI_BATCH_SIZE])
        train_loss_progress.append(loss.item())
        epoch_loss+=loss.item()
        # Scale so the accumulated gradient is the mean over the group.
        loss=loss/accumalation_step
        loss.backward()
        # Also step on the last batch so leftover gradients are applied
        # instead of leaking into the next epoch.
        if step%accumalation_step==0 or step==num_batches:
            optimizer.step()
            optimizer.zero_grad()
    if num_batches:
        train_loss_hist.append(epoch_loss/num_batches)

    # Switch to another randomly chosen shard for the next epoch.
    x_train,y_train=get_data('data')
    x_test,y_test=get_data('test_data')

In [None]:
# plt.plot(train_loss_hist, label='List 1',color='blue')
# plt.plot(val_loss_hist, label='List 2',color='red')
# plt.show()
# Accuracy curves, one point per epoch.
plt.plot(training_accuracy_hist, label='train accuracy', color='blue')
plt.plot(val_accuracy_hist, label='validation accuracy', color='red')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
plt.show()

model.eval()
predictions=get_predictions(x_test)
# Pass the full label set explicitly: without `labels`, classification_report
# raises when the evaluated shard (or the predictions) does not contain every
# class, because the number of target_names no longer matches.
class_ids=list(range(output_size))
report = classification_report(
    y_test.numpy(),
    predictions.numpy(),
    labels=class_ids,
    target_names=[str(i) for i in class_ids],
    zero_division=0,  # classes with no samples report 0 without a warning
)
print(report)