In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import random
import pickle
from sklearn.metrics import classification_report,accuracy_score,precision_score,recall_score,mean_absolute_error
import matplotlib.pyplot as plt
import gc

In [2]:
def get_x(data):
    x=torch.stack([t[0] for t in data])
    return x
def get_y(data):
    y=torch.stack([torch.tensor(t[1]) for t in data])
    return y
with open("..//data//train_data.pkl",'rb') as f:
    train_data=pickle.load(f)

x_train,y_train=get_x(train_data),get_y(train_data)
y_train_classification,y_train_localization=y_train[:,0].to(torch.long),y_train[:,1:]
del train_data
with open("..//data//val_data.pkl",'rb') as f:
    val_data=pickle.load(f)
x_val,y_val=get_x(val_data),get_y(val_data)
y_val_classification,y_val_localization=y_val[:,0].to(torch.long),y_val[:,1:]
del val_data

# print(x_val)

In [3]:
# Build a skip connection network

device='cuda'
class Block(nn.Module):
    expansion=4
    def __init__(self,in_channels,out_channels,downsample=None,stride=1):
        super(Block,self).__init__()
        self.conv1=nn.Conv2d(in_channels,out_channels,kernel_size=1,stride=1,padding=0,bias=False)
        self.bn1=nn.BatchNorm2d(out_channels)
        self.conv2=nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=stride,padding=1,bias=False)
        self.bn2=nn.BatchNorm2d(out_channels)
        self.conv3=nn.Conv2d(out_channels,out_channels*Block.expansion,kernel_size=1,stride=1,padding=0,bias=False)
        self.bn3=nn.BatchNorm2d(out_channels*Block.expansion)
        self.downsample=downsample
    def forward(self,x):
        identity=x
        out=self.conv1(x)
        out=self.bn1(out)
        out=F.relu(out)
        out=self.conv2(out)
        out=self.bn2(out)
        out=F.relu(out)
        out=self.conv3(out)
        out=self.bn3(out)
        if self.downsample is not None:
            identity=self.downsample(identity)
        out+=identity
        out=F.relu(out)

        return out

class ResNet(nn.Module):
    def __init__(self,layers,output_size):
        super(ResNet,self).__init__()
        self.in_channels=64
        self.conv1=nn.Conv2d(3,self.in_channels,kernel_size=7,stride=2,padding=3,bias=False)
        self.bn1= nn.BatchNorm2d(self.in_channels)
        self.maxpool1=nn.MaxPool2d(kernel_size=3,stride=2,padding=1)   

        #Define Block layers
        self.layer1=self._make_layer(layers[0],64,1)
        self.layer2=self._make_layer(layers[1],128,2)
        self.layer3=self._make_layer(layers[2],256,2)
        self.layer4=self._make_layer(layers[3],512,2)

        self.avgpool=nn.AdaptiveAvgPool2d((1,1))

        self.fc=nn.Linear(512*Block.expansion,output_size)

    def _make_layer(self,num_blocks,out_channels,stride=1):
        downsample=None
        if stride!=1 or self.in_channels!=out_channels*Block.expansion:
            downsample=nn.Sequential(nn.Conv2d(self.in_channels,out_channels*Block.expansion,kernel_size=1,stride=stride,padding=0,bias=False),
                                     nn.BatchNorm2d(out_channels*Block.expansion))
        layers=[]
        layers.append(Block(in_channels=self.in_channels,out_channels=out_channels,downsample=downsample,stride=stride))
        self.in_channels=out_channels*Block.expansion
        for _ in range(num_blocks-1):
            layers.append(Block(in_channels=self.in_channels,out_channels=out_channels))
        return nn.Sequential(*layers)
    def forward(self, inputs):
        inputs=inputs.to(device)
        out=self.conv1(inputs)
        
        out=self.bn1(out)
        out=self.maxpool1(out)
        out=self.layer1(out)
        out=self.layer2(out)
        out=self.layer3(out)
        out=self.layer4(out)
        out=self.avgpool(out)
        out=torch.flatten(out,1)
        out=self.fc(out)

        return out

layers=[3,4,6,3]
output_size=7
model=ResNet(layers,output_size).to(device)
best_model=ResNet(layers,output_size)
best_model.load_state_dict(model.state_dict())


<All keys matched successfully>

In [None]:
class CombinedLoss(nn.Module):
    def __init__(self, classification_weight=1.0, bbox_weight=1.0):
        super(CombinedLoss, self).__init__()
        self.classification_loss = nn.CrossEntropyLoss()
        self.bbox_loss = nn.MSELoss()
        self.classification_weight = classification_weight
        self.bbox_weight = bbox_weight

    def forward(self, class_logits, bbox_preds, labels, bbox_targets):
        classification_loss = self.classification_loss(class_logits, labels)
        bbox_loss = self.bbox_loss(bbox_preds, bbox_targets)
        return self.classification_weight * classification_loss + self.bbox_weight * bbox_loss
criterion = CombinedLoss()

In [None]:
out=model(x_train[:2])
out

In [5]:
def get_predictions(x,model):
    BATCH_SIZE=16
    classification_predictions=torch.tensor([])
    localization_predictions=torch.tensor([])
    model.eval()
    for i in range(0,len(x),BATCH_SIZE):
        with torch.no_grad():
            y_hat=model(x[i:i+BATCH_SIZE]).to('cpu')
        y_hat_classification=F.softmax(y_hat[:,:3],dim=1)
        y_hat_localization=y_hat[:,3:]
        _,pred=torch.max(y_hat_classification,1)
        classification_predictions=torch.cat((classification_predictions,pred),dim=0)
        localization_predictions=torch.cat((localization_predictions,y_hat_localization),dim=0)
        torch.cuda.empty_cache()
    model.train()
    return classification_predictions.to(torch.long),localization_predictions


In [None]:
EPOCHS=10
MINI_BATCH_SIZE=16
accumalation_step=2

optimizer=torch.optim.Adam(model.parameters(),lr=1e-2)


train_loss_progress=[]
train_loss_hist=[]
val_loss_hist=[]

training_accuracy_hist=[]
val_accuracy_hist=[]

training_mae_hist=[]
val_mae_hist=[]
best_acc=0
for epoch in range(EPOCHS):
    model.eval()
    torch.cuda.empty_cache()
    classification_predictions,localization_predictions=get_predictions(x_val,model)
    val_acc=accuracy_score(y_val_classification.numpy(), classification_predictions.numpy())
    val_mae=mean_absolute_error(y_val_localization.numpy(),localization_predictions.numpy())
    val_mae_hist.append(val_mae)
    val_accuracy_hist.append(val_acc)
    torch.cuda.empty_cache()

    classification_predictions,localization_predictions=get_predictions(x_train,model)
    training_acc=accuracy_score(y_train_classification.numpy(), classification_predictions.numpy())
    training_mae=mean_absolute_error(y_train_localization.numpy(),localization_predictions.numpy())
    training_mae_hist.append(training_mae)
    training_accuracy_hist.append(training_acc)
    print(training_acc,val_acc)
    print(training_mae,val_mae)
    if val_acc>best_acc:
        best_acc=val_acc
        best_model.load_state_dict(model.state_dict())
    model.train()
    torch.cuda.empty_cache()
    for i in range(0,len(x_train),MINI_BATCH_SIZE):
        y_hat=model(x_train[i:i+MINI_BATCH_SIZE]).to('cpu')
        loss=criterion(y_hat[:,:3], y_hat[:,3:], y_train_classification[i:i+MINI_BATCH_SIZE], y_train_localization[i:i+MINI_BATCH_SIZE])
        loss=loss/accumalation_step
        loss.backward()
        if (i+1)% accumalation_step==0:
            optimizer.step()
            optimizer.zero_grad()
            torch.cuda.empty_cache()


In [None]:
# plt.plot(train_loss_hist, label='List 1',color='blue')
# plt.plot(val_loss_hist, label='List 2',color='red')
# plt.show()
plt.plot(training_accuracy_hist[0:], label='List 1',color='blue')
plt.plot(val_accuracy_hist[0:], label='List 2',color='red')
plt.show()

classification_predictions,localization_predictions=get_predictions(x_val,best_model.to(device))
report = classification_report(y_val.numpy(), classification_predictions.numpy(), target_names=[str(i) for i in range(200)])
print(report)