In [74]:
import os
import random
import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch import nn

In [15]:
def parse_record(raw_record):
    out_dict = {}
    raw_data = raw_record.split(";")

    out_dict["file_path"] = raw_data[0]
    tmp_keypoints = [data.split(",") for data in raw_data[1:9]]

    out_dict["keypoints"] = []
    for keypoint in tmp_keypoints:
        keypoint = [int(elem) for elem in keypoint]
        out_dict["keypoints"].append(keypoint)

    out_dict["position"] = [float(data) for data in raw_data[9:12]]

    out_dict["rotation"] = [float(data) for data in raw_data[12:]]

    return out_dict

In [16]:
class KeypointsDataset(Dataset):
    def __init__(self, ann_file):
        self.ann_file = ann_file
        self.samples = []
        with open(self.ann_file, 'r') as anns:
            line = anns.readline()
            while line:
                sample = parse_record(line)
                self.samples.append(sample)
                line = anns.readline()
        
    def __getitem__(self, idx):
        input_keypoints = self.samples[idx]['keypoints']
        input_vec = []
        for keypoint in input_keypoints:
            input_vec += keypoint
        position = self.samples[idx]['position']
        rotation = self.samples[idx]['rotation']
        output_orientation = position + rotation
        
        input_vec = torch.as_tensor(np.array(input_vec), dtype=torch.float32)
        
        output_vec = torch.as_tensor(np.array(output_orientation), dtype=torch.float32)
        
        sample = {'input': input_vec, 'output': output_vec}
        
        return sample
        
    def __len__(self):
        return len(self.samples)

In [17]:
x_train = []
y_train = []

with open('data/keypoints_data/annotations/ann.csv', 'r') as anns:
    line = anns.readline()
    while line:
        sample = parse_record(line)
        out = sample['position'] + sample['rotation']
        inp = []
        for keypoint in sample['keypoints']:
            inp += keypoint
        x_train.append(inp)
        y_train.append(out)
        line = anns.readline()
                
x_train = torch.as_tensor(x_train[:100], dtype=torch.float32)
y_train = torch.as_tensor(y_train[:100], dtype=torch.float32)
x_val = torch.as_tensor(x_train[-100:], dtype=torch.float32)
y_val = torch.as_tensor(y_train[-100:], dtype=torch.float32)

In [18]:
#dataset = KeypointsDataset('data/keypoints_data/annotations/ann.csv')
train_dataset = TensorDataset(x_train, y_train)
val_dataset = TensorDataset(x_val, y_val)

train_data_loader = DataLoader(train_dataset, batch_size=2, num_workers=4)
val_data_loader = DataLoader(val_dataset, batch_size=2, num_workers=4)

In [19]:
class MLPNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MLPNet, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_features=input_dim, out_features=hidden_dim), 
            nn.ReLU(), 
            nn.Linear(in_features=hidden_dim, out_features=hidden_dim), 
            nn.ReLU(), 
            nn.Linear(in_features=hidden_dim, out_features=output_dim)
        )
        
    def forward(self, x):
        x = self.layers(x)
        return x
        

In [20]:
model = MLPNet(24,100,6)

In [21]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
loss_fn = nn.MSELoss()

In [44]:
def loss_batch(model, loss_func, xb, yb, opt=None):
    loss = loss_func(model(xb), yb)

    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(xb)

def fit(epochs, model, loss_fn, opt, train_dl, valid_dl, lr_scheduler):
    for epoch in range(epochs):
        model.train()
        for xb, yb in train_data_loader:
            loss_batch(model, loss_fn, xb, yb, opt)
            pred = model(xb)
            loss = loss_fn(pred, yb)
            
            model.eval()
            with torch.no_grad():
                losses, nums = zip(
                *[loss_batch(model, loss_fn, xb, yb) for xb, yb in valid_dl]
                )
            val_loss = np.sum(np.multiply(losses, nums)) / np.sum(nums)

        print('Epoch: {}  Train loss: {:.2f}  Val loss: {:.2f}  Lr: {}'.format(
            epoch, 
            loss, 
            val_loss, 
            opt.param_groups[0]["lr"]))   
        lr_scheduler.step()

In [45]:
def get_data(train_ds, valid_ds, bs):
    return (
        DataLoader(train_ds, batch_size=bs, shuffle=True),
        DataLoader(valid_ds, batch_size=bs * 2),
    )

In [46]:
def get_model():
    model = MLPNet(24,100,6)
    return model, torch.optim.Adam(model.parameters(), lr=0.001)

In [68]:
#train_dl, valid_dl = get_data(train_dataset, val_dataset, 100)
#model, opt = get_model()
opt = torch.optim.Adam(model.parameters(), lr=0.00001)
fit(100, model, loss_fn, opt, train_dl, valid_dl, lr_scheduler)

Epoch: 0  Train loss: 21.59  Val loss: 16.69  Lr: 1e-05
Epoch: 1  Train loss: 22.11  Val loss: 16.65  Lr: 1e-05
Epoch: 2  Train loss: 21.67  Val loss: 16.53  Lr: 1e-05
Epoch: 3  Train loss: 21.39  Val loss: 16.60  Lr: 1e-05
Epoch: 4  Train loss: 22.14  Val loss: 16.54  Lr: 1e-05
Epoch: 5  Train loss: 21.64  Val loss: 16.50  Lr: 1e-05
Epoch: 6  Train loss: 22.10  Val loss: 16.50  Lr: 1e-05
Epoch: 7  Train loss: 21.33  Val loss: 16.43  Lr: 1e-05
Epoch: 8  Train loss: 21.50  Val loss: 16.50  Lr: 1e-05
Epoch: 9  Train loss: 21.26  Val loss: 16.37  Lr: 1e-05
Epoch: 10  Train loss: 21.55  Val loss: 16.33  Lr: 1e-05
Epoch: 11  Train loss: 20.89  Val loss: 16.40  Lr: 1e-05
Epoch: 12  Train loss: 21.56  Val loss: 16.33  Lr: 1e-05
Epoch: 13  Train loss: 20.66  Val loss: 16.32  Lr: 1e-05
Epoch: 14  Train loss: 21.60  Val loss: 16.30  Lr: 1e-05
Epoch: 15  Train loss: 20.93  Val loss: 16.20  Lr: 1e-05
Epoch: 16  Train loss: 20.81  Val loss: 16.20  Lr: 1e-05
Epoch: 17  Train loss: 21.24  Val loss: 1

In [81]:
idx = random.randint(0, len(val_dataset)-1)
pred = model(val_dataset[idx][0]).detach().numpy()
np.set_printoptions(formatter={'float': '{: 0.3f}'.format})
print(val_dataset[idx][1].numpy())
print(pred)

[ 0.800  0.570  19.250  57.990  346.560  55.690]
[ 0.691  0.316  18.363  59.384  343.465  54.969]


In [82]:
torch.save(model.state_dict(), 'mlp_model')