RNNs are applied to sequences of points to predict, for each point, whether the user intended the pencil to be up or down.

In [1]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import wandb
import cv2
from os import listdir
from contextlib import ExitStack
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [2]:
print(torch.__version__)

1.9.0+cu102


# Dataset

In [3]:
class DrawingsDS(torch.utils.data.Dataset):
    """In-memory dataset of labeled drawing recordings, one CSV file per sample.

    Every entry of ``folder`` is read with pandas and converted to a dict (see
    :meth:`load_file`). Once all files are loaded, the per-feature mean and
    standard deviation of the inputs are computed over all rows of all files
    and every ``'input'`` array is standardized with them.

    Attributes:
        folder: directory the CSV files are read from.
        ds: list of per-file sample dicts, ordered by file name.
        n: number of input features.
        mean: per-feature mean of the raw inputs, shape ``(n,)``.
        std: per-feature standard deviation of the raw inputs, shape ``(n,)``.
        y_mean: mean of the ``label`` column over all rows.

    Raises:
        ValueError: if ``folder`` contains no files.
    """

    # CSV columns used as model inputs, in feature order.
    INPUT_COLUMNS = ['vx', 'vy', 'v', 'ax', 'ay', 'a']

    def __init__(self, folder="../../data/processed_labeled/"):
        self.folder = folder
        self.n = len(self.INPUT_COLUMNS)
        # listdir() returns entries in arbitrary order; sorting keeps sample
        # indices (and therefore seeded random splits) reproducible.
        self.ds = [self.load_file(f) for f in sorted(listdir(self.folder))]
        if not self.ds:
            raise ValueError(f"no data files found in {self.folder!r}")

        self.compute_mean()
        self.compute_std()

        # A constant feature has zero std; divide by 1 there so it is simply
        # centered instead of turning into NaN/inf.
        safe_std = np.where(self.std > 0, self.std, 1.0)
        for f in self.ds:
            f['input'] = (f['input'] - self.mean) / safe_std

    def compute_mean(self):
        """Set ``self.mean`` (per input feature) and ``self.y_mean`` (labels)."""
        self.mean = np.zeros(self.n)
        self.y_mean = 0
        tot = 0
        for f in self.ds:
            x = f['input']
            self.mean += np.sum(x, axis=0)
            self.y_mean += np.sum(f['output'])
            tot += x.shape[0]
        self.mean /= tot
        self.y_mean /= tot

    def compute_std(self):
        """Set ``self.std`` to the population std of each input feature.

        Must be called after :meth:`compute_mean`, since it reads ``self.mean``.
        """
        variance = np.zeros(self.n)
        tot = 0
        for f in self.ds:
            centered = f['input'] - self.mean
            variance += np.sum(np.square(centered), axis=0)
            tot += centered.shape[0]
        variance /= tot
        self.std = np.sqrt(variance)

    def load_file(self, f):
        """Read the CSV file named ``f`` inside ``self.folder``.

        Returns:
            dict with keys ``'raw_input'`` (integer ``x``/``y`` columns),
            ``'input'`` (the ``INPUT_COLUMNS`` as float64), ``'output'``
            (``label`` column), ``'dist'`` (``dist`` column) and ``'name'``
            (the file name ``f``).
        """
        from os.path import join as join_path

        # join() tolerates a folder given with or without a trailing separator.
        df = pd.read_csv(join_path(self.folder, f), index_col=0)
        # Builtin int replaces the np.int alias, which NumPy removed in 1.24.
        raw_input = df[['x', 'y']].to_numpy().astype(int)
        inputs = df[self.INPUT_COLUMNS].to_numpy().astype(np.double)
        output = df[['label']].to_numpy().astype(np.double)
        distance = df[['dist']].to_numpy().astype(np.double)
        return {
            'raw_input': raw_input,
            'input': inputs,
            'output': output,
            'name': f,
            'dist': distance
        }

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, idx):
        return self.ds[idx]

In [4]:
# Load every recording and print the normalization statistics plus the shapes
# of the first sample as a quick sanity check.
dataset = DrawingsDS()
print(dataset.mean)
print(dataset.y_mean)
print(dataset.std)
print(dataset[0]['input'].shape)
print(dataset[0]['output'].shape)
# Hold out one sixth of the recordings (10 of 60, i.e. the former fixed 50/10
# split) so the split keeps working when files are added or removed.
n_test = len(dataset) // 6
train_set, test_set = torch.utils.data.random_split(
    dataset, (len(dataset) - n_test, n_test)
)

[ 4.02705672e-01 -3.64267352e-01  1.45071593e+01 -1.34526336e-02
  1.64587564e-01  9.20813371e+00]
0.5606161200371796
[16.00648243 14.28259791 15.81256695 14.97948971 12.13628675 16.9384802 ]
(197, 6)
(197, 1)


In [5]:
def visualize(sample, pred, height=720, width=1280):
    """Display the points of ``sample`` selected by ``pred`` in an OpenCV window.

    The selected points are drawn as white pixels on a black canvas, the
    canvas is mirrored horizontally, and the call blocks until a key is
    pressed in the window.

    Args:
        sample: dict whose ``'raw_input'`` entry is an ``(N, 2)`` integer
            array of ``(x, y)`` pixel coordinates.
        pred: array-like with ``N`` elements (any shape that flattens to
            ``N``); non-zero / ``True`` entries select the points to draw.
        height: canvas height in pixels.
        width: canvas width in pixels.
    """
    # Flatten rather than squeeze so a single-point sequence still yields a
    # 1-D mask; astype(bool) accepts both boolean predictions and 0/1 labels.
    mask = np.asarray(pred).reshape(-1).astype(bool)
    pts = sample['raw_input'][mask]
    xs, ys = pts[:, 0], pts[:, 1]

    # Skip points outside the canvas: too-large indices would raise and
    # negative ones would silently wrap around to the opposite edge.
    inside = (xs >= 0) & (xs < width) & (ys >= 0) & (ys < height)

    img = np.zeros((height, width), dtype=np.uint8)
    img[ys[inside], xs[inside]] = 255
    img = cv2.flip(img, 1)

    cv2.imshow('frame', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

# Model

In [6]:
class AttnBlock(torch.nn.Module):
    """Multi-head self-attention followed by a ReLU-activated linear layer.

    Args:
        in_features: embedding size of the incoming sequence elements.
        num_heads: number of attention heads.
        out_features: size of each output element.
    """

    def __init__(self, in_features, num_heads, out_features):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, sequence, feature).
        attention = torch.nn.MultiheadAttention(
            embed_dim=in_features,
            num_heads=num_heads,
            batch_first=True,
        )
        projection = torch.nn.Sequential(
            torch.nn.Linear(in_features, out_features),
            torch.nn.ReLU(),
        )
        self.attn = attention
        self.fwd = projection
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        """Attend ``x`` to itself, project it and return the activated result."""
        # The attention module returns (output, weights); only the output is used.
        attended, _weights = self.attn(x, x, x)
        projected = self.fwd(attended)
        # `fwd` already ends with a ReLU, so this second one leaves values unchanged.
        return self.relu(projected)

In [7]:
class RecurrentModel(torch.nn.Module):
    """Per-timestep classifier: MLP encoder -> recurrent layers -> MLP head.

    Args:
        input_size: number of features per timestep.
        output_size: number of outputs (logits) per timestep.
        hidden_size_in: width of the encoder and input size of the RNN.
        hidden_size_out: hidden size of the RNN (per direction).
        num_layers: number of stacked recurrent layers.
        model: recurrent layer class, e.g. ``torch.nn.GRU`` or ``torch.nn.LSTM``.
        dropout: dropout between recurrent layers (only used when
            ``num_layers > 1``).
        bidirectional: whether the recurrent layers run in both directions.
    """

    def __init__(
        self,
        input_size=6,
        output_size=1,
        hidden_size_in=64,
        hidden_size_out=32,
        num_layers=2,
        model=torch.nn.GRU,
        dropout=0.2,
        bidirectional=True
    ):
        super().__init__()
        # The RNN output concatenates both directions when bidirectional, so
        # the head's input width depends on the flag instead of assuming 2x.
        num_directions = 2 if bidirectional else 1
        self.rnn = model(
            input_size = hidden_size_in,
            hidden_size = hidden_size_out,
            num_layers = num_layers,
            batch_first = True,
            # PyTorch applies RNN dropout only between stacked layers and
            # warns if it is non-zero for a single layer; pass 0 in that case.
            dropout = dropout if num_layers > 1 else 0.,
            bidirectional = bidirectional
        )
        self.before = torch.nn.Sequential(
            torch.nn.Linear(in_features=input_size,out_features=hidden_size_in),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=hidden_size_in,out_features=hidden_size_in)
        )
        self.after = torch.nn.Sequential(
            torch.nn.Linear(in_features=num_directions*hidden_size_out,out_features=hidden_size_out),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=hidden_size_out,out_features=output_size)
        )

    def forward(self,x):
        """Map ``(batch, seq, input_size)`` to ``(batch, seq, output_size)``."""
        output = self.before(x)
        # The final hidden state is not needed, only the per-timestep outputs.
        output, _ = self.rnn(output)
        output = self.after(output)
        return output

In [8]:
class ConvRecurrentModel(torch.nn.Module):
    """Per-timestep classifier: Conv1d encoder -> recurrent layers -> Conv1d head -> linear.

    All convolutions use ``kernel_size=3`` with ``padding=1``, so the sequence
    length is preserved from input to output.

    Args:
        input_size: number of features per timestep.
        output_size: number of outputs (logits) per timestep.
        hidden_size_in: channels produced by the encoder / input size of the RNN.
        hidden_size_out: hidden size of the RNN (per direction).
        num_layers: number of stacked recurrent layers.
        model: recurrent layer class, e.g. ``torch.nn.GRU`` or ``torch.nn.LSTM``.
        dropout: dropout between recurrent layers (only used when
            ``num_layers > 1``).
        bidirectional: whether the recurrent layers run in both directions.
    """

    def __init__(
        self,
        input_size=6,
        output_size=1,
        hidden_size_in=64,
        hidden_size_out=32,
        num_layers=2,
        model=torch.nn.GRU,
        dropout=0.2,
        bidirectional=True
    ):
        super().__init__()
        # The RNN output concatenates both directions when bidirectional, so
        # the head's input channels depend on the flag instead of assuming 2x.
        num_directions = 2 if bidirectional else 1
        self.rnn = model(
            input_size = hidden_size_in,
            hidden_size = hidden_size_out,
            num_layers = num_layers,
            batch_first = True,
            # PyTorch applies RNN dropout only between stacked layers and
            # warns if it is non-zero for a single layer; pass 0 in that case.
            dropout = dropout if num_layers > 1 else 0.,
            bidirectional = bidirectional
        )
        self.before = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=input_size,out_channels=hidden_size_in//2,kernel_size=3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_in//2,out_channels=hidden_size_in//2,kernel_size=3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_in//2,out_channels=hidden_size_in,kernel_size=3,padding=1)
        )
        self.after = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=num_directions*hidden_size_out,out_channels=hidden_size_out,kernel_size=3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_out,out_channels=hidden_size_out,kernel_size=3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_out,out_channels=hidden_size_out//2,kernel_size=3,padding=1)
        )
        self.fc = torch.nn.Linear(in_features=hidden_size_out//2,out_features=output_size)

    def forward(self,x):
        """Map ``(batch, seq, input_size)`` to ``(batch, seq, output_size)``."""
        # Conv1d wants (batch, channels, seq) while the RNN and the linear
        # layer want (batch, seq, features), hence the transposes around
        # each stage.
        x = torch.transpose(x,1,2)
        output = self.before(x)

        output = torch.transpose(output,2,1)
        # The final hidden state is not needed, only the per-timestep outputs.
        output, _ = self.rnn(output)

        output = torch.transpose(output,2,1)
        output = self.after(output)

        output = torch.transpose(output,2,1)
        output = self.fc(output)
        return output

In [9]:
# Smoke test: push one training sequence through an untrained
# ConvRecurrentModel and print the input and output sizes.
model = ConvRecurrentModel(dataset.n).double()
# Indexing with None prepends the batch dimension expected by the model.
sample = torch.tensor(train_set[0]['input'])[None]
print(sample.shape)
print(model(sample).shape)

torch.Size([1, 227, 6])
torch.Size([1, 227, 1])


In [10]:
# Smoke test: push one training sequence through an untrained RecurrentModel
# and print the input and output sizes.
model = RecurrentModel(dataset.n).double()
# Indexing with None prepends the batch dimension expected by the model.
sample = torch.tensor(train_set[0]['input'])[None]
print(sample.shape)
print(model(sample).shape)

torch.Size([1, 227, 6])
torch.Size([1, 227, 1])


# Training

In [11]:
def compute_metrics(pred,y):
    """Compute binary classification metrics for thresholded predictions.

    Args:
        pred: tensor of binary predictions (any shape).
        y: tensor of binary ground-truth labels with the same number of
            elements as ``pred``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``. Precision, recall and F1
        are reported as 0 when they are undefined (``zero_division=0``).
    """
    # Flatten instead of squeeze: a batch of several sequences would otherwise
    # stay 2-D (which scikit-learn reads as a multilabel problem) and a single
    # point would collapse to a 0-d array. `.cpu()` makes this work for
    # tensors living on an accelerator as well.
    pred_np = pred.detach().cpu().numpy().reshape(-1)
    y_np = y.detach().cpu().numpy().reshape(-1)
    accuracy = accuracy_score(y_np,pred_np)
    precision = precision_score(y_np,pred_np,zero_division=0)
    recall = recall_score(y_np,pred_np,zero_division=0)
    f1 = f1_score(y_np,pred_np,zero_division=0)
    return accuracy, precision, recall, f1

def save_model(model,model_name):
    """Write ``model.state_dict()`` to ``../../models/<model_name>.pt``.

    The target directory is created first if it does not exist yet, so the
    first checkpoint of a fresh checkout does not fail.
    """
    from pathlib import Path

    target = Path("../../models") / f"{model_name}.pt"
    target.parent.mkdir(parents=True, exist_ok=True)
    torch.save(model.state_dict(), str(target))
    

def epoch(loader,optimizer,model,loss,iteration_type='train',gradient_clipping=1.,name='current'):
    """Run one pass over ``loader`` and return the averaged metrics.

    Args:
        loader: iterable of batches; each batch is a dict with ``'input'``
            and ``'output'`` tensors.
        optimizer: optimizer stepped after every batch in ``'train'`` mode.
        model: network called on the inputs; its raw outputs are treated as
            logits.
        loss: callable ``loss(pred, y)`` returning a scalar tensor.
        iteration_type: ``'train'`` (train mode, random input scaling,
            backprop) or ``'test'`` (eval mode, gradients disabled,
            checkpointing).
        gradient_clipping: maximum gradient norm applied before each
            optimizer step; ``None`` disables clipping.
        name: checkpoint name passed to ``save_model`` in ``'test'`` mode.

    Returns:
        dict with keys ``'loss'``, ``'accuracy'``, ``'precision'``,
        ``'recall'`` and ``'f1'``, each averaged over the batches of
        ``loader``.

    Side effects:
        In ``'test'`` mode the best accuracy seen so far is remembered on the
        model as ``model.best_test_accuracy``; the model is saved only when a
        pass beats that value.
    """
    training = iteration_type == 'train'
    testing = iteration_type == 'test'
    # Use the objects passed in rather than module-level globals, so the
    # function works for whichever model/optimizer the caller hands over.
    if training:
        model.train()
    if testing:
        model.eval()

    metrics = {
        'loss': 0.,
        'accuracy': 0.,
        'precision': 0.,
        'recall': 0.,
        'f1': 0.
    }

    n = len(loader)
    with torch.set_grad_enabled(not testing):
        for sample in loader:
            # make predictions
            x = sample['input']
            y = sample['output'].squeeze()
            # Augmentation: scale the whole input by a random factor while training.
            augment = np.random.uniform(0.5,1.5) if training else 1.
            pred = model(augment*x).squeeze()
            # compute losses
            l = loss(pred,y)
            # apply backprop
            if training:
                optimizer.zero_grad()
                l.backward()
                if gradient_clipping is not None:
                    torch.nn.utils.clip_grad_norm_(
                        parameters = model.parameters(),
                        max_norm = gradient_clipping
                    )
                optimizer.step()

            acc, prec, rec, f1 = compute_metrics(torch.sigmoid(pred)>0.5,y)
            metrics['loss'] += l.item()/n
            metrics['accuracy'] += acc/n
            metrics['precision'] += prec/n
            metrics['recall'] += rec/n
            metrics['f1'] += f1/n

    if testing:
        # The best score must outlive a single call, otherwise every test
        # pass would overwrite the checkpoint; keep it on the model itself.
        best_acc = getattr(model, 'best_test_accuracy', 0.)
        if metrics['accuracy'] > best_acc:
            model.best_test_accuracy = metrics['accuracy']
            save_model(model,name)
    return metrics

In [12]:
# Hyper-parameters and run settings; the whole dict is also handed to wandb
# as the run configuration.
config = dict(
    EPOCHS=500,
    BATCH_SIZE=1,
    LEARNING_RATE=3e-4,
    NUM_WORKERS=2,
    PIN_MEMORY=True,
    MODEL_HIDDEN_SIZE_IN=64,
    MODEL_HIDDEN_SIZE_OUT=32,
    MODEL_NUM_LAYERS=1,
    WEIGHT_DECAY=0.,
    SCHEDULER_GAMMA=1.,
    SEED=179428,
    DROPOUT=0.2,
    GRADIENT_CLIPPING=None,
    BIDIRECTIONAL=True,
    NAME='current',
)

# Set to False to skip creating a wandb run.
log = True
if log:
    run = wandb.init(project="r-drawing", config=config)

wandb: Currently logged in as: lmagne (use `wandb login --relogin` to force relogin)


In [None]:
# %%wandb

# Seed both RNGs: torch drives the random split and the loader shuffling,
# numpy drives the input-scale augmentation applied in epoch().
torch.manual_seed(config["SEED"])
np.random.seed(config["SEED"])

dataset = DrawingsDS()
# Hold out one sixth of the recordings (10 of 60, i.e. the former fixed 50/10
# split) so the split keeps working when files are added or removed.
n_test = len(dataset) // 6
train_set, test_set = torch.utils.data.random_split(
    dataset, (len(dataset) - n_test, n_test)
)

MODEL = ConvRecurrentModel(
    input_size = dataset.n,
    hidden_size_in = config["MODEL_HIDDEN_SIZE_IN"],
    hidden_size_out = config["MODEL_HIDDEN_SIZE_OUT"],
    num_layers = config["MODEL_NUM_LAYERS"],
    dropout = config["DROPOUT"],
    model = torch.nn.GRU,
    bidirectional = config["BIDIRECTIONAL"]
).double()
# To resume from a saved checkpoint:
# MODEL.load_state_dict(torch.load("../../models/84_3.pt"))

# NOTE(review): the PyTorch docs suggest pos_weight = n_negative / n_positive,
# i.e. (1 - y_mean) / y_mean; 1 / y_mean weights the positive class more than
# that. Kept unchanged here -- confirm whether this is intentional.
LOSS = torch.nn.BCEWithLogitsLoss(pos_weight = torch.tensor([1./dataset.y_mean]))

OPTIMIZER = torch.optim.Adam(
    MODEL.parameters(),
    lr = config["LEARNING_RATE"],
    weight_decay = config["WEIGHT_DECAY"]
)

# Multiplies the learning rate by SCHEDULER_GAMMA every 250 epochs.
SCHEDULER = torch.optim.lr_scheduler.StepLR(
    OPTIMIZER,
    step_size = 250,
    gamma = config["SCHEDULER_GAMMA"]
)


train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = True
)

# Evaluation order does not affect the averaged metrics, so no shuffling.
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = False
)

# Per-epoch history, filled in the loop below.
train_loss = []
test_loss = []
train_acc = []
test_acc = []

if log:
    wandb.watch(MODEL)

try:
    for _ in tqdm(range(config["EPOCHS"])):
        train_metrics = epoch(train_loader,OPTIMIZER,MODEL,LOSS,'train',config["GRADIENT_CLIPPING"])
        test_metrics = epoch(test_loader,OPTIMIZER,MODEL,LOSS,'test',name=config["NAME"])
        train_loss.append(train_metrics["loss"])
        test_loss.append(test_metrics["loss"])
        train_acc.append(train_metrics["accuracy"])
        test_acc.append(test_metrics["accuracy"])
        if log:
            wandb.log({
                "loss_train" : train_metrics["loss"],
                "loss_test" : test_metrics["loss"],
                "accuracy_train" : train_metrics["accuracy"],
                "accuracy_test" : test_metrics["accuracy"],
                "precision_train" : train_metrics["precision"],
                "precision_test" : test_metrics["precision"],
                "recall_train" : train_metrics["recall"],
                "recall_test" : test_metrics["recall"],
                "f1_train" : train_metrics["f1"],
                "f1_test" : test_metrics["f1"]
            })
        SCHEDULER.step()
finally:
    # Close the wandb run even when training is interrupted or fails.
    if log:
        run.finish()

  return torch._C._cuda_getDeviceCount() > 0
 26%|██▌       | 129/500 [12:20<31:56,  5.17s/it]

In [None]:
# Show, for every held-out recording, the strokes predicted as "pencil down"
# followed by the ground-truth strokes.
MODEL.eval()
with torch.no_grad():
    # Iterate over the actual size of the test split instead of assuming 10.
    for k in range(len(test_set)):
        sample = test_set[k]
        print(sample['name'])
        x = torch.tensor(sample['input']).unsqueeze(0)
        pred = (torch.sigmoid(MODEL(x)) > 0.5).detach().numpy()
        visualize(sample,pred)
        visualize(sample,sample['output'])

In [None]:
# torch.save(MODEL.state_dict(), "../../models/84_3.pt")