# TO-DO
1. Replace the hand-written top-k accuracy with a built-in function

In [None]:
# IPython shell escape: print the NVIDIA driver's GPU status report.
!nvidia-smi

# Imports

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from enum import Enum

import numpy as np
import os
import matplotlib.pyplot as plt
import random
import scipy.io as io

import pytorch_lightning as pl

seed = 0

# Seed every RNG the notebook relies on (Python, PyTorch, NumPy) with the
# same value so that repeated runs draw the same random numbers.
for _seed_rng in (random.seed, torch.manual_seed, np.random.seed):
    _seed_rng(seed)

# Hyper-params

In [None]:
# Resolve the home directory portably: os.environ['HOME'] raises KeyError
# when HOME is unset (e.g. on Windows), whereas expanduser falls back to the
# platform's own notion of the user directory.
home_dir = os.path.expanduser('~')

# Dataset layout: each directory holds one file per sample index.
data_dir = os.path.join(home_dir, 'webots_code/data')
lpath = os.path.join(data_dir, 'lidar_samples')      # <idx>.npz with a 'lidar' array
cpath = os.path.join(data_dir, 'samples')            # <idx>.mat with a 'gps' entry
label_path = os.path.join(data_dir, 'sample_label')  # <idx>.mat with an 'ss' entry

# Value reported as the dataset length (sample indices 0 .. len_data-1).
len_data = 5710

# One row per base station, three values each.
# NOTE(review): rows look like (latitude, longitude, height) - confirm.
BS = np.array([
    [38.89328, -77.07611, 5],
    [38.89380, -77.07590, 5],
    [38.89393, -77.07644, 5],
])
num_BS = int(BS.shape[0])

BATCH_SIZE = 16  # samples per DataLoader batch
n_worker = 8     # DataLoader worker processes

# Presumably where trained weights are written; created up front so a later
# save cannot fail on a missing directory.
save_dir = os.path.join(home_dir, 'webots_code/model_state_dict')
os.makedirs(save_dir, exist_ok=True)

# Prefer CUDA when available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Utilities

In [None]:
def top_k_acc(y_true:torch.Tensor,y_pred:torch.Tensor,k=1):
    """Return the top-k accuracy of ``y_pred`` against ``y_true``.

    Args:
        y_true: Class index of every sample, one entry per row of ``y_pred``.
            Integer or float tensors are both accepted.
        y_pred: Per-class scores of shape ``[N, num_classes]``.
        k: A sample counts as correct when its label is among the ``k``
            highest-scoring classes of its row.

    Returns:
        The fraction of correct samples as a Python float, or ``0.0`` when
        ``y_pred`` has no rows.

    Raises:
        ValueError: If the number of labels differs from the number of rows.
    """
    n_samples = len(y_pred)
    if n_samples == 0:
        # Nothing to score; avoid dividing by zero.
        return 0.0

    labels = y_true.reshape(-1)
    if len(labels) != n_samples:
        raise ValueError(
            f'expected {n_samples} labels, got {len(labels)}'
        )

    # Indices of the k best classes per sample: [N, k].
    topk_idx = torch.topk(y_pred, k, dim=1)[1]

    # Compare every candidate with the sample's label in one shot; type
    # promotion lets float labels be compared with the integer indices.
    hits = (topk_idx == labels.to(topk_idx.device).unsqueeze(1)).any(dim=1)

    return hits.sum().item() / n_samples

# Creating Dataset and Dataloader

## Custom Dataset
Currently using the 'gps' field. The translation could be used instead of GPS.

In [None]:
class bs_dataset(Dataset):
    """Map-style dataset that reads one sample per index from disk.

    Every item is a dict with the keys:
        'lidar': float32 array loaded from ``<lpath>/<idx>.npz``
                 (original note: shape [10,240,240]).
        'gps':   float32 array of shape [3,] from ``<cpath>/<idx>.mat``.
        'BS':    float32 array holding the flattened base-station table.
        'label': index of the largest entry of 'ss' in
                 ``<label_path>/<idx>.mat``.

    Args:
        lpath: Directory containing the lidar ``.npz`` files.
        cpath: Directory containing the ``.mat`` files with the 'gps' entry.
        label_path: Directory containing the ``.mat`` files with 'ss'.
        len_data: Value reported by ``len()``.
        BS: Base-station table; it is flattened and attached to every sample.
    """
    def __init__(self,lpath:str,cpath:str,
                 label_path:str,len_data:int,BS:np.ndarray):

        self.lpath = lpath
        self.cpath = cpath
        self.label_path = label_path
        self.len_data = len_data
        self.BS = BS #[num_BS,3]

    def __getitem__(self,idx):
        """Load and return the sample dict for index ``idx``."""
        sample = dict()

        # Use the archive as a context manager so its file handle is closed;
        # astype() copies the data, so the array outlives the archive.
        with np.load(os.path.join(self.lpath, f'{idx}.npz')) as archive:
            sample['lidar'] = archive['lidar'].astype('float32')

        gps = io.loadmat(os.path.join(self.cpath, f'{idx}.mat'))['gps']
        sample['gps'] = gps.astype('float32').reshape((3,)) #[3,]

        # Flatten the instance's own table rather than relying on a
        # module-level base-station count.
        sample['BS'] = self.BS.reshape(-1).astype('float32')

        scores = io.loadmat(os.path.join(self.label_path, f'{idx}.mat'))['ss']
        # The label is the index of the maximum element.
        sample['label'] = np.argmax(scores.astype('float32'))

        return sample

    def __len__(self):
        """Return the configured number of samples."""
        return self.len_data

In [None]:
# Build the training dataset from the paths and constants configured above.
train_dataset = bs_dataset(
    lpath=lpath,
    cpath=cpath,
    label_path=label_path,
    len_data=len_data,
    BS=BS,
)

## Dataloader

In [None]:
# Shuffled training batches; the trailing partial batch is dropped, so every
# batch holds exactly BATCH_SIZE samples.
loader_options = dict(
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    num_workers=n_worker,
    pin_memory=True,
)
train_loader = DataLoader(train_dataset, **loader_options)

# Model
Based on Imperial Model

## Model Class

In [None]:
class bs_model(nn.Module):
    """CNN over the lidar input fused with GPS and base-station coordinates.

    Six Conv2d -> BatchNorm2d -> PReLU stages reduce the 10-channel lidar
    input. The result is flattened, projected to 64 features, concatenated
    with the GPS vector and the flattened base-station table, and mapped by a
    final linear layer to one score per base station.

    Args:
        num_bs: Number of base stations. It sets the width of the
            base-station input (3 values each) and the number of output
            scores. The default of 3 reproduces the original fixed layer
            sizes (76 inputs, 3 outputs).
    """
    def __init__(self, num_bs=3):
        super().__init__()
        self.channels = 5
        self.fchannel = 3
        self.num_bs = num_bs

        # Layers are created in a fixed order so that seeded initialization
        # and the state_dict keys stay stable.
        self.conv1 = nn.Conv2d(10,self.channels, 13, 1, 1)
        self.bn1 = nn.BatchNorm2d(self.channels)
        self.relu1 = nn.PReLU(num_parameters=self.channels)
        self.conv2 = nn.Conv2d(self.channels, self.channels, 13, 1, 1)
        self.bn2 = nn.BatchNorm2d(self.channels)
        self.relu2 = nn.PReLU(num_parameters=self.channels)
        self.conv3 = nn.Conv2d(self.channels, self.channels, 7, 2, 1)
        self.bn3 = nn.BatchNorm2d(self.channels)
        self.relu3 = nn.PReLU(num_parameters=self.channels)
        self.conv4 = nn.Conv2d(self.channels, self.channels, 7, 1, 1)
        self.bn4 = nn.BatchNorm2d(self.channels)
        self.relu4 = nn.PReLU(num_parameters=self.channels)
        self.conv5 = nn.Conv2d(self.channels, self.fchannel, 5, 2, 1)
        self.bn5 = nn.BatchNorm2d(self.fchannel)
        self.relu5 = nn.PReLU(num_parameters=self.fchannel)
        self.conv6 = nn.Conv2d(self.fchannel, self.fchannel, 5, (1, 2), 1)
        self.bn6 = nn.BatchNorm2d(self.fchannel)
        self.relu6 = nn.PReLU(num_parameters=self.fchannel)

        self.flatten = nn.Flatten()
        # 3675 = 3 channels * 49 * 25, the conv-stack output for a 240x240 input.
        self.linear7 = nn.Linear(3675,64)
        self.relu7 = nn.ReLU()
        # 64 lidar features + 3 GPS values + 3 values per base station.
        self.linear8 = nn.Linear(64 + 3 + 3 * num_bs, num_bs)

    def forward(self, x, gps, bs):
        """Return one score per base station for every sample in the batch.

        Args:
            x: Lidar batch with 10 channels; a 240x240 spatial size matches
                the fixed input width of ``linear7``.
            gps: Batch of GPS vectors, 3 values per sample.
            bs: Batch of flattened base-station tables, ``3 * num_bs``
                values per sample.
        """
        stages = (
            (self.conv1, self.bn1, self.relu1),
            (self.conv2, self.bn2, self.relu2),
            (self.conv3, self.bn3, self.relu3),
            (self.conv4, self.bn4, self.relu4),
            (self.conv5, self.bn5, self.relu5),
            (self.conv6, self.bn6, self.relu6),
        )
        for conv, norm, act in stages:
            x = act(norm(conv(x)))

        x = self.relu7(self.linear7(self.flatten(x)))

        # Fuse the lidar features with the positional inputs.
        out = torch.cat((x,gps,bs),dim=1)
        return self.linear8(out)


In [None]:
# # For testing model shape and size
# model = bs_model()
# lidar = torch.Tensor(2,10,240,240)
# gps = torch.Tensor(2,3)
# BS = torch.Tensor(2,9)
# model(lidar,gps,BS)

# Training
Using PyTorch Lightning

In [None]:
class BS_trainer(pl.LightningModule):
    """LightningModule that trains ``bs_model`` with a cross-entropy loss.

    Args:
        learning_rate: Learning rate handed to the Adam optimizer.
    """
    def __init__(self,learning_rate = 1e-3):
        super().__init__()
        self.model = bs_model()
        self.celoss = nn.CrossEntropyLoss()
        self.lr = learning_rate

    def forward(self,lidar,gps,BS):
        """Return the wrapped model's scores for one batch."""
        return self.model(lidar,gps,BS)

    @staticmethod
    def _unpack_batch(batch):
        """Return (lidar, gps, BS, label) cast to the dtypes used downstream."""
        return (
            batch['lidar'].float(),
            batch['gps'].float(),
            batch['BS'].float(),
            batch['label'].long(),
        )

    @staticmethod
    def _top1_accuracy(preds, labels):
        """Concatenate per-step predictions/labels and return top-1 accuracy.

        Concatenating (instead of filling a preallocated buffer) works for
        any batch size, including a smaller final batch.
        """
        y_pred = torch.cat(preds, dim=0)
        y_true = torch.cat(labels, dim=0)
        return top_k_acc(y_true, y_pred, k=1)

    def training_step(self,batch,batch_idx):
        """Compute and log the loss; keep predictions for the epoch summary."""
        lidar, gps, BS, label = self._unpack_batch(batch)

        yhat = self(lidar,gps,BS)
        loss = self.celoss(yhat,label)

        self.log('my_loss',loss)

        # Detach before moving to the CPU so no autograd graph is carried along.
        return {'loss':loss,'pred':yhat.detach().cpu(),'label':label.detach().cpu()}

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters."""
        return torch.optim.Adam(self.parameters(),self.lr)

    def training_epoch_end(self,train_out):
        """Print the top-1 accuracy over all training steps of the epoch."""
        if not train_out:
            return

        top1 = self._top1_accuracy([step['pred'] for step in train_out],
                                   [step['label'] for step in train_out])

        print(f'Train accuracies is {top1}')

    ## Uncomment while using validation dataset

#     def validation_step(self,batch,batch_idx):
#         lidar, gps, BS, label = self._unpack_batch(batch)
#         yhat = self(lidar,gps,BS)
#         return [yhat.detach().cpu(),label.detach().cpu()]
#
#     def validation_epoch_end(self,val_out):
#         if not val_out:
#             return
#         top1 = self._top1_accuracy([step[0] for step in val_out],
#                                    [step[1] for step in val_out])
#         print(f'Validation accuracy is {top1}')

    def test_step(self,batch,batch_idx):
        """Return [predictions, labels] for one test batch, both on the CPU."""
        lidar, gps, BS, label = self._unpack_batch(batch)

        yhat = self(lidar,gps,BS)

        return [yhat.detach().cpu(),label.detach().cpu()]

    def test_epoch_end(self,val_out):
        """Print the top-1 accuracy over all test steps."""
        if not val_out:
            return

        top1 = self._top1_accuracy([step[0] for step in val_out],
                                   [step[1] for step in val_out])
        print(f'Test accuracy is {top1}')

In [None]:
# Instantiate the Lightning module with its default learning rate (1e-3).
model = BS_trainer()

In [None]:
# Request a GPU only when CUDA is actually available, so the notebook also
# runs on CPU-only machines (gpus=0 makes the Trainer train on the CPU).
pl_trainer = pl.Trainer(reload_dataloaders_every_epoch = False,
                     gpus=1 if torch.cuda.is_available() else 0,
                     max_epochs = 50,
                     auto_lr_find = False
                     )

In [None]:
# Run training on the training loader only; no validation loader is passed.
pl_trainer.fit(model,train_loader)