In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.tensorboard import SummaryWriter
from torchvision import models, transforms, io
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt
from tqdm.notebook import tqdm, trange
import numpy as np
import pandas as pd
import os

In [2]:
#### modified resnets are done

class ModResNet18(models.resnet.ResNet):
    """ResNet built from ``BasicBlock`` with the ``[2, 2, 2, 2]`` layer layout.

    The stem convolution ``conv1`` is replaced by a newly constructed 3->64
    channel, 7x7, stride-2 convolution without bias. When ``custom_classifier``
    is true, the final ``fc`` layer is swapped for a ``Classifier`` head with
    the same input width.

    Args:
        out_features: Number of network outputs.
        custom_classifier: Replace ``fc`` with a ``Classifier`` when true.
    """

    def __init__(self, out_features=10, custom_classifier=False):
        super().__init__(models.resnet.BasicBlock, [2, 2, 2, 2], num_classes=out_features)

        # Fresh stem convolution (created after the parent has initialised its own).
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)

        if not custom_classifier:
            return

        # Keep the input width of the stock head, but route it through Classifier.
        head_width = self.fc.in_features
        self.fc = Classifier(head_width, out_features)
        
class Classifier(nn.Module):
    """One fully connected layer followed by an element-wise tanh.

    Args:
        in_features: Width of the incoming feature vector.
        out_features: Number of outputs produced.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features, bias=True)

    def forward(self, x):
        """Project ``x`` with the linear layer and squash the result with tanh."""
        projected = self.linear(x)
        return projected.tanh()


In [4]:
#### TRAINING
class ANNTrainer:
    """Training harness bundling a model, its loaders, an Adam optimizer and a writer.

    On construction the model is moved to ``device`` and its weights are saved
    to ``'<name>_initial.pt'``; :meth:`find_lr` reloads that file when it is done.
    """

    def __init__(self, name, model, train_loader, validation_loader, writer,
                 criterion, device='cpu', lr=1e-2):
        """Store the collaborators, snapshot the initial weights and build the optimizer.

        Args:
            name: Prefix of every checkpoint file written by this trainer.
            model: Module to train; moved to ``device``.
            train_loader: Iterable of ``(X, y)`` batches. :meth:`find_lr`
                additionally calls ``len()`` on it.
            validation_loader: Iterable of ``(X, y)`` batches evaluated after
                every epoch.
            writer: Object providing ``add_scalar`` and ``add_scalars``.
            criterion: Callable ``criterion(y_pred, y)`` returning a scalar loss tensor.
            device: Device for the model and for every batch.
            lr: Initial learning rate of the Adam optimizer.
        """
        self.name = name

        self.train_loader = train_loader
        self.validation_loader = validation_loader
        self.model = model
        self.model.to(device)
        self.device = device

        # Snapshot of the untouched weights, reloaded at the end of find_lr().
        torch.save(self.model.state_dict(),
                   '{}_initial.pt'.format(self.name))

        self.criterion = criterion
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

        self.writer = writer

    def _to_device(self, X, y):
        """Move one batch to ``self.device`` (works for any device spec, not only 'cuda')."""
        return X.to(self.device), y.to(self.device)

    def _set_lr(self, lr):
        """Write ``lr`` into every parameter group of the optimizer."""
        for group in self.optimizer.param_groups:
            group['lr'] = lr

    def find_lr(self, init_value=1e-8, final_value=1):
        """Learning-rate sweep over one pass of the training loader.

        The learning rate starts at ``init_value`` and is multiplied by a
        constant factor after every batch so that it reaches ``final_value``
        on the last batch. Each batch loss is logged under the tag
        ``'Lr loss'`` with ``log10(lr) * 1000`` as the step. Afterwards the
        model weights are reloaded from ``'<name>_initial.pt'`` and the
        optimizer (moment estimates and learning rate) is put back to the
        state it had before the sweep, so the sweep leaves no trace.

        Args:
            init_value: Learning rate used for the first batch.
            final_value: Learning rate reached on the last batch.
        """
        import copy  # local import: only needed for the optimizer snapshot

        # At least one step so a single-batch loader does not divide by zero.
        n_steps = max(len(self.train_loader) - 1, 1)
        mult = (final_value / init_value) ** (1 / n_steps)

        optimizer_backup = copy.deepcopy(self.optimizer.state_dict())
        was_training = self.model.training
        self.model.train()

        lr = init_value
        self._set_lr(lr)
        try:
            for X, y in self.train_loader:
                X, y = self._to_device(X, y)

                loss = self.criterion(self.model(X), y)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                self.writer.add_scalar('Lr loss', loss.item(), np.log10(lr)*1000)

                lr *= mult
                self._set_lr(lr)
        finally:
            # Undo everything the sweep changed: weights, optimizer state, mode.
            self.model.load_state_dict(
                torch.load('{}_initial.pt'.format(self.name),
                           map_location=self.device))
            self.optimizer.load_state_dict(optimizer_backup)
            self.model.train(was_training)

    def assign_lr(self, lr):
        """Set the optimizer's learning rate to ``lr``."""
        self._set_lr(lr)

    def train(self, epochs, lr_scheduler=False, save_epoch=10):
        """Train for ``epochs`` epochs, validating and checkpointing after each one.

        After every epoch the mean training and validation losses are logged
        under the tag ``'Loss'``. Whenever the validation loss improves on the
        best value seen so far in this call (the first epoch included), the
        weights are saved to ``'<name>_state_dict.pt'``. Every ``save_epoch``
        epochs they are also saved to ``'<name>_epoch<k>_state_dict.pt'``.

        Args:
            epochs: Number of passes over the training loader.
            lr_scheduler: When true, step a ``CosineAnnealingLR`` scheduler
                (``T_max = epochs // 2``) once per epoch.
            save_epoch: Interval, in epochs, of the periodic checkpoints.
        """
        scheduler = None
        if lr_scheduler:
            # T_max must be positive; epochs // 2 is 0 for a single epoch.
            scheduler = optim.lr_scheduler.CosineAnnealingLR(self.optimizer,
                                                             max(1, epochs//2))

        best_vloss = float('inf')
        for e in trange(epochs):
            self.model.train()
            ls = self.train_epoch()
            if scheduler is not None:
                scheduler.step()

            avg_loss = np.mean(ls)
            avg_vloss = self._validate()

            if avg_vloss < best_vloss:
                # Remember the new best so later, worse epochs cannot overwrite it.
                best_vloss = avg_vloss
                torch.save(self.model.state_dict(),
                           '{}_state_dict.pt'.format(self.name))

            if (e+1)%save_epoch == 0:
                torch.save(self.model.state_dict(),
                           '{}_epoch{}_state_dict.pt'.format(self.name, e+1))

            self.writer.add_scalars('Loss',
                                    {'train': avg_loss,
                                     'validation': avg_vloss}, e)

    def _validate(self):
        """Return the mean per-batch validation loss (NaN for an empty loader)."""
        self.model.eval()
        total_loss = 0.0
        n_batches = 0
        with torch.no_grad():  # no graph needed for evaluation
            for X, y in self.validation_loader:
                X, y = self._to_device(X, y)
                total_loss += self.criterion(self.model(X), y).item()
                n_batches += 1
        return total_loss / n_batches if n_batches else float('nan')

    def train_epoch(self):
        """Run one optimisation pass over the training loader.

        Returns:
            List with the loss value of every batch, in iteration order.
        """
        running_loss = []
        for X, y in self.train_loader:
            X, y = self._to_device(X, y)

            loss = self.criterion(self.model(X), y)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            running_loss.append(loss.item())

        return running_loss


In [5]:
############################# ANGLE APPROACH
class AngleDataSet(Dataset):
    """Dataset yielding a frame image and the (sin, cos) of its head-body direction.

    The annotation CSV is addressed by position: columns 0 and 1 give the
    video name and frame id (the image file is
    ``<img_dir>/<video>_<frame_id>.jpg``); columns 8/9 are read as the head
    x/y and columns 11/12 as the body x/y coordinates.

    Args:
        annotation_file: Path of the annotation CSV.
        img_dir: Directory containing the frame images.
        transform: Optional callable applied to the loaded image; its result
            is converted to float.
    """

    # Positional indices of the coordinate columns in the annotation CSV.
    HEAD_X_COL, HEAD_Y_COL = 8, 9
    BODY_X_COL, BODY_Y_COL = 11, 12

    def __init__(self, annotation_file, img_dir, transform=None):
        self.labels = pd.read_csv(annotation_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Number of annotated frames."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for row ``idx``.

        ``target`` is a float tensor ``[sin, cos]`` of the direction pointing
        from the body point to the head point.

        Raises:
            ValueError: If the head and body points coincide or a coordinate
                is missing, since the direction is then undefined and would
                otherwise become a NaN label.
        """
        video, frame_id = self.labels.iloc[idx, [0, 1]]
        headx = self.labels.iloc[idx, self.HEAD_X_COL]
        heady = self.labels.iloc[idx, self.HEAD_Y_COL]
        bodyx = self.labels.iloc[idx, self.BODY_X_COL]
        bodyy = self.labels.iloc[idx, self.BODY_Y_COL]

        dx = headx - bodyx
        dy = heady - bodyy
        length = (dx**2 + dy**2)**0.5
        # `not length > 0` is also true for NaN, i.e. for missing coordinates.
        if not length > 0:
            raise ValueError(
                'Row {} ({}_{}): head and body points do not define a direction'
                .format(idx, video, frame_id))
        sina = dy/length
        cosa = dx/length

        filename = '{}_{}'.format(video, frame_id)
        img_path = os.path.join(self.img_dir, '{}.jpg'.format(filename))
        image = io.read_image(img_path)
        # NOTE(review): without a transform the image keeps the dtype returned
        # by read_image; only the transformed image is converted to float.
        if self.transform is not None:
            image = self.transform(image).float()

        return image, torch.tensor([sina, cosa]).float()

In [6]:
# Data locations: one shared image folder and one annotation CSV per split.
img_dir = 'TrainDataset/images'
annotation_train = 'TrainDataset/train.csv'
annotation_test = 'TrainDataset/test.csv'

# The same 224x224 resize is passed to both datasets as their transform.
input_transform = transforms.Resize(size=(224, 224))

train_dataset = AngleDataSet(annotation_file=annotation_train,
                             img_dir=img_dir,
                             transform=input_transform)
test_dataset = AngleDataSet(annotation_file=annotation_test,
                            img_dir=img_dir,
                            transform=input_transform)

In [7]:
# Training batches are reshuffled on every pass and the incomplete last batch is dropped.
train_loader = DataLoader(train_dataset, batch_size=40, shuffle=True, drop_last=True)
# Validation must cover every sample: no shuffling, and the final partial batch
# is kept (dropping it would skip samples, or leave the loader empty when the
# test split has fewer than 40 rows).
validation_loader = DataLoader(test_dataset, batch_size=40, shuffle=False, drop_last=False)
# Two outputs, matching the two-element target returned by AngleDataSet.
model = ModResNet18(out_features=2, custom_classifier=True)
criterion = nn.L1Loss()
writer = SummaryWriter('./runs/AngApproach1/run_01')

In [9]:
# Use the GPU when one is present; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
trainer = ANNTrainer('MRS_Angle', model=model, train_loader=train_loader,
                     validation_loader=validation_loader, criterion=criterion,
                     writer=writer, device=device)

In [10]:
# Learning-rate sweep with the default range (1e-8 to 1): each batch loss is
# logged under the 'Lr loss' tag and the initial weights are reloaded afterwards.
trainer.find_lr()

In [11]:
# Set the optimizer's learning rate to 1e-3.
# NOTE(review): presumably read off the 'Lr loss' curve logged by find_lr — confirm.
trainer.assign_lr(10**-3)

In [12]:
# Train for 500 epochs with the default options: no LR scheduler and a
# periodic checkpoint every 10 epochs.
trainer.train(epochs=500)

  0%|          | 0/500 [00:00<?, ?it/s]

In [13]:
# Write the angular difference (predicted minus actual orientation, in degrees,
# wrapped to [-180, 180)) for every validation sample to 'angular_differences.csv'.

def get_angle(sin_cos):
    """Return the angle in radians for rows laid out as ``[sin, cos]``.

    The dataset targets are built as ``[sin, cos]``, so the sine is the first
    argument of ``arctan2`` and the cosine the second.
    """
    return np.arctan2(sin_cos[:, 0], sin_cos[:, 1])


eval_model = ModResNet18(out_features=2, custom_classifier=True)
# map_location: the checkpoint may have been saved from a CUDA model, while
# the evaluation runs on the CPU.
eval_model.load_state_dict(torch.load('MRS_Angle_state_dict.pt', map_location='cpu'))
eval_model.to('cpu')
eval_model.eval()

batch_differences = []
with torch.no_grad():  # inference only, no autograd graph needed
    for x, y in validation_loader:
        y_pred = eval_model(x)
        batch_differences.append(get_angle(y_pred.numpy()) - get_angle(y.numpy()))

if batch_differences:
    angular_differences = np.concatenate(batch_differences).astype(np.float64)
else:
    angular_differences = np.empty(0)

# Wrap into [-pi, pi): a prediction of 179 deg against a target of -179 deg is
# off by -2 deg, not by 358 deg.
angular_differences = (angular_differences + np.pi) % (2 * np.pi) - np.pi

with open('angular_differences.csv', 'w') as out_file:
    for i in angular_differences:
        out_file.write(f'{i*180/np.pi}\n')