In [1]:
from lib.ekyn import get_ekyn_ids,load_ekyn_pt
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader,ConcatDataset
import math
from torch import nn
from lib.models import RegNet
from torch.nn.functional import relu
import matplotlib.pyplot as plt
import torch
from lib.datasets import EpochedDataset,SequencedDataset
from tqdm import tqdm
import numpy as np
import random

torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

CONFIG = {
    'WINDOW_SIZE':5000,
    'BATCH_SIZE':64,
    'LEARNING_RATE':3e-4,
    'DEVICE':'cuda',
    'SEQUENCE_LENGTH':5
}

train_idx,test_idx = train_test_split(get_ekyn_ids(),test_size=.25,random_state=0)

In [2]:
trainloader = DataLoader(ConcatDataset([SequencedDataset(idx=idx,condition=condition,sequence_length=CONFIG['SEQUENCE_LENGTH']) for idx in train_idx for condition in ['Vehicle','PF']]),batch_size=CONFIG['BATCH_SIZE'],shuffle=True)
devloader = DataLoader(ConcatDataset([SequencedDataset(idx=idx,condition=condition,sequence_length=CONFIG['SEQUENCE_LENGTH']) for idx in test_idx for condition in ['Vehicle','PF']]),batch_size=CONFIG['BATCH_SIZE'],shuffle=True)

In [3]:
from lib.models import Dumbledore
import json
from lib.models import RegNetY
class Dumbledore(nn.Module):
    def __init__(self,encoder_path,sequence_length,hidden_dim=8,frozen=True,embedding=False,layers=1) -> None:
        super().__init__()
        self.encoder_path = encoder_path
        self.sequence_length = sequence_length
        self.frozen = frozen
        self.embedding = embedding
        self.layers = layers
        self.encoder = self.get_encoder()
        if self.embedding:
            self.latent_dim = self.ENCODER_CONFIG['WIDTHI'][-1]
        else:
            self.latent_dim = 3
        self.lstm = nn.LSTM(self.latent_dim,hidden_dim,num_layers=self.layers,bidirectional=True,batch_first=True,dropout=.3)
        self.fc1 = nn.Linear(hidden_dim*2,3)
    def forward(self,x):
        x = self.encoder(x,return_embedding=self.embedding)
        x = x.view(-1,self.sequence_length,self.latent_dim)
        o,(h,c) = self.lstm(x)
        x = self.fc1(o[:,-1])
        return x
    def get_encoder(self):
        with open(f'{self.encoder_path}/config.json') as f:
            self.ENCODER_CONFIG = json.load(f)
        encoder = RegNetY(depth=self.ENCODER_CONFIG['DEPTHI'],width=self.ENCODER_CONFIG['WIDTHI'],stem_kernel_size=self.ENCODER_CONFIG['STEM_KERNEL_SIZE'])
        encoder.load_state_dict(torch.load(f'{self.encoder_path}/best.f1.pt'))
        if self.frozen:
            print("Model is freezing encoder")
            for p in encoder.parameters():
                p.requires_grad = False
        return encoder

model = Dumbledore(encoder_path=f'projects/6',sequence_length=CONFIG['SEQUENCE_LENGTH'],hidden_dim=16,layers=3,embedding=False)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(),lr=CONFIG['LEARNING_RATE'])
model.to(CONFIG['DEVICE']);
criterion.to(CONFIG['DEVICE']);

stage 0, depth 1
1250
block 0 8 8
stage 1, depth 2
625
block 1 8 16
625
block 1 16 16
stage 2, depth 4
313
block 2 16 32
313
block 2 32 32
313
block 2 32 32
313
block 2 32 32
stage 3, depth 1
157
block 3 32 64
Model is freezing encoder


In [4]:
trainlossi = []
trainf1 = []
devlossi = []
devf1 = []
model.train()
from lib.utils import training_loop,development_loop
for i in tqdm(range(10)):
    loss,f1 = training_loop(model=model,trainloader=trainloader,criterion=criterion,optimizer=optimizer,device=CONFIG['DEVICE'])
    trainlossi.append(loss)
    trainf1.append(f1)

    loss,f1 = development_loop(model=model,devloader=devloader,criterion=criterion,device=CONFIG['DEVICE'])
    devlossi.append(loss)
    devf1.append(f1)

    fig,ax = plt.subplots(nrows=1,ncols=2,figsize=(20,4))
    ax[0].plot(trainlossi)
    ax[0].plot(devlossi)
    ax[1].plot(trainf1)
    ax[1].plot(devf1)
    plt.savefig('loss.jpg')
    plt.close()

 70%|███████   | 7/10 [04:00<01:43, 34.42s/it]


KeyboardInterrupt: 

In [None]:
from lib.utils import evaluate
from sklearn.metrics import ConfusionMatrixDisplay,classification_report

loss,report,y_true,y_pred,y_logits = evaluate(dataloader=devloader,model=model,criterion=criterion,DEVICE=CONFIG['DEVICE'])
ConfusionMatrixDisplay.from_predictions(y_true,y_pred,normalize='true')
print(classification_report(y_true,y_pred))
print(loss)