In [None]:
import os
import re
from copy import deepcopy
from typing import List, Tuple, Union

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from PIL import Image

In [None]:
# GLOBALS

# Root folder of the dataset. Each teammate keeps their own line here and
# un-comments the one that matches their machine.
# DATASET_PATH = ... # Ania
# DATASET_PATH = ... # Witek
DATASET_PATH = 'dataset/'  # Jacek

# hyperparameters
EPOCHS = 100           # number of passes over the training loader
BATCH_SIZE = 64        # samples per DataLoader batch
LEARNING_RATE = 0.001  # step size handed to the Adam optimizer
NUM_WORKERS = 1        # DataLoader worker processes
TRAIN_SET_SIZE = 0.8   # probability that a new photo id lands in the training split

In [None]:
# Seed both random number generators with the same value so that the data
# split (NumPy) and the weight init / batch shuffling (PyTorch) are repeatable.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
class StorageDataset(torch.utils.data.Dataset):
    """Dataset that stores image *paths* and reads every image from disk on access.

    Layout expected under ``path``: one sub-directory of images per recording
    and, next to it, a label file named ``<sub-directory>.csv``. The CSV has no
    header; its first column is used as the index and the next two columns are
    read as ``speed`` and ``turn``. An image is matched to a CSV row through
    the first run of digits in its file name.
    """

    def __init__(self, path: str) -> None:
        """Index all labelled images found under ``path``."""
        self.path = path
        # Sorted so that the item order (and any seeded split built on top of
        # it) does not depend on the file system's listing order.
        self.dirs = sorted(d for d in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, d)))
        self.items = self._get_file_target_map()

    def _get_file_target_map(self) -> List[Tuple[str, Tuple[float, float, int]]]:
        """Build the list of ``(file_path, (speed, turn, photo_id))`` entries.

        Files whose name contains no digits, nested directories, and photo ids
        missing from the label file are skipped.
        """
        result = []
        for dir_name in self.dirs:
            dir_path = os.path.join(self.path, dir_name)
            labels = pd.read_csv(
                os.path.join(self.path, f'{dir_name}.csv'),
                header=None,
                index_col=0,
                names=['speed', 'turn'],
            )
            for file_name in sorted(os.listdir(dir_path)):
                file_path = os.path.join(dir_path, file_name)
                if not os.path.isfile(file_path):
                    continue
                # Stray files such as '.DS_Store' carry no id; skip them
                # instead of failing on a missing regex match.
                id_match = re.search(r'\d+', file_name)
                if id_match is None:
                    continue
                photo_id = int(id_match.group())
                if photo_id in labels.index:
                    target = labels.loc[photo_id]
                    result.append((file_path, (float(target['speed']), float(target['turn']), photo_id)))
        return result

    def __len__(self):
        """Return the number of labelled images."""
        return len(self.items)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Load image ``idx`` and return ``(image, target)`` as float32 tensors.

        The image tensor keeps the array layout produced by PIL; the target
        holds ``(speed, turn)`` only (the photo id is dropped).
        """
        path, target = self.items[idx]
        # Context manager closes the underlying file as soon as the pixels
        # have been copied into the tensor.
        with Image.open(path) as image:
            img = torch.tensor(np.asarray(image), dtype=torch.float32)
        target = torch.tensor(target[:2], dtype=torch.float32)
        return img, target

In [None]:
class MemoryDataset(torch.utils.data.Dataset):
    """Dataset that decodes every labelled image up front and keeps it in memory.

    Layout expected under ``path``: one sub-directory of images per recording
    and, next to it, a label file named ``<sub-directory>.csv``. The CSV has no
    header; its first column is used as the index and the next two columns are
    read as ``speed`` and ``turn``. An image is matched to a CSV row through
    the first run of digits in its file name.
    """

    def __init__(self, path: str) -> None:
        """Load all labelled images found under ``path``."""
        self.path = path
        # Sorted so that the item order (and any seeded split built on top of
        # it) does not depend on the file system's listing order.
        self.dirs = sorted(d for d in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, d)))
        self.items = self._get_file_target_map()

    def _get_file_target_map(self) -> List[Tuple['Image.Image', Tuple[float, float, int]]]:
        """Build the list of ``(image, (speed, turn, photo_id))`` entries.

        Files whose name contains no digits, nested directories, and photo ids
        missing from the label file are skipped.
        """
        result = []
        for dir_name in self.dirs:
            dir_path = os.path.join(self.path, dir_name)
            labels = pd.read_csv(
                os.path.join(self.path, f'{dir_name}.csv'),
                header=None,
                index_col=0,
                names=['speed', 'turn'],
            )
            for file_name in sorted(os.listdir(dir_path)):
                file_path = os.path.join(dir_path, file_name)
                if not os.path.isfile(file_path):
                    continue
                # Stray files such as '.DS_Store' carry no id; skip them
                # instead of failing on a missing regex match.
                id_match = re.search(r'\d+', file_name)
                if id_match is None:
                    continue
                photo_id = int(id_match.group())
                if photo_id in labels.index:
                    target = labels.loc[photo_id]
                    # Image.open() is lazy: it only reads the header and keeps
                    # the file open. copy() forces the pixel data to be decoded
                    # into a new in-memory image, after which the file handle
                    # can be released.
                    with Image.open(file_path) as image_file:
                        img = image_file.copy()
                    result.append((img, (float(target['speed']), float(target['turn']), photo_id)))
        return result

    def __len__(self):
        """Return the number of labelled images."""
        return len(self.items)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(image, target)`` for item ``idx`` as float32 tensors.

        The image tensor keeps the array layout produced by PIL; the target
        holds ``(speed, turn)`` only (the photo id is dropped).
        """
        img, target = self.items[idx]
        img = torch.tensor(np.asarray(img), dtype=torch.float32)
        target = torch.tensor(target[:2], dtype=torch.float32)
        return img, target

In [None]:
def _clone_with_items(dataset, items):
    """Return a copy of ``dataset`` whose ``items`` attribute is ``items``.

    Every other attribute is deep-copied; the original ``items`` list is
    deliberately NOT copied, since it would be thrown away immediately.
    """
    clone = dataset.__class__.__new__(dataset.__class__)
    for attr_name, attr_value in vars(dataset).items():
        if attr_name != 'items':
            setattr(clone, attr_name, deepcopy(attr_value))
    clone.items = items
    return clone


def train_test_split(dataset: Union['StorageDataset', 'MemoryDataset'], train_size: float = 0.8, shuffle: bool = True):
    """Randomly split ``dataset`` into a train and a test dataset, grouped by photo id.

    The photo id is the third element of each item's target tuple. The first
    time an id is seen it is assigned to the train split with probability
    ``train_size`` (drawn from NumPy's global RNG), otherwise to the test
    split; every later item with the same id follows that decision, so an id
    never appears in both splits. Because assignment is per id and random, the
    resulting split sizes only approximate ``train_size``.

    Args:
        dataset: Dataset exposing an ``items`` list of ``(sample, (speed, turn, photo_id))``.
        train_size: Probability in ``[0, 1]`` of assigning a new id to the train split.
        shuffle: Accepted for interface compatibility; currently has no effect
            (item order within each split follows ``dataset.items``).

    Returns:
        ``(train, test)``: two datasets of the same class as ``dataset``. They
        reference the original item objects; ``dataset`` itself is unchanged.

    Raises:
        ValueError: If ``train_size`` is outside ``[0, 1]``.
    """
    if not 0.0 <= train_size <= 1.0:
        raise ValueError(f'train_size must be within [0, 1], got {train_size}')

    train_keys, test_keys = set(), set()
    train_items, test_items = [], []
    for item in dataset.items:
        key = item[1][2]
        if key in train_keys:
            train_items.append(item)
        elif key in test_keys:
            test_items.append(item)
        elif np.random.random() < train_size:
            train_keys.add(key)
            train_items.append(item)
        else:
            test_keys.add(key)
            test_items.append(item)

    # Avoid deepcopy(dataset): it would duplicate every item (all images for an
    # in-memory dataset) twice, only for the copies to be replaced right away.
    return _clone_with_items(dataset, train_items), _clone_with_items(dataset, test_items)

In [None]:
# Index the images on disk; switch to the commented line to preload them instead.
dataset = StorageDataset(DATASET_PATH)
# dataset = MemoryDataset(DATASET_PATH)

# Split by photo id, then wrap each split in an identically configured loader.
train_dataset, test_dataset = train_test_split(dataset, train_size=TRAIN_SET_SIZE, shuffle=True)
train, test = (
    torch.utils.data.DataLoader(
        split,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
    )
    for split in (train_dataset, test_dataset)
)

In [None]:
class Model(nn.Module):
    """Pretrained ResNet-18 whose classifier is replaced by a 2-output linear head."""

    def __init__(self) -> None:
        """Fetch the pretrained backbone from torch.hub and swap in the new head."""
        super().__init__()
        backbone = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
        # Replace the original classification layer with a 2-value regressor.
        backbone.fc = nn.Linear(512, 2)
        self.resnet18 = backbone

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the backbone and clamp both outputs to the range [-1, 1]."""
        return torch.clamp(self.resnet18(x), min=-1.0, max=1.0)

In [None]:
# Build the network, move it to the GPU, then compile it with TorchScript.
model = torch.jit.script(Model().cuda())

In [None]:
# Mean squared error between predicted and labelled (speed, turn).
criterion = nn.MSELoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
train_len = len(train_dataset)
device = torch.device('cuda')

# Example input handed to the ONNX exporter when a checkpoint is written.
dummy_input = torch.randn(1, 3, 224, 224, device='cuda')
os.makedirs('models', exist_ok=True)

# Best test loss seen so far across ALL epochs. It must live outside the epoch
# loop: resetting it every epoch would make each epoch look like a new best
# and export a model every time.
best_loss = np.inf

for epoch in range(EPOCHS):
    print(f'epoch {epoch}')
    train_loss_history, test_loss_history = [], []

    # ---- training pass ----
    model.train()
    for i, (x_train, y_train) in enumerate(train):
        if not i % 5:
            print(f'{i}/{int(train_len / BATCH_SIZE)}', end='\r')
        # Move the last axis to position -3 before feeding the network
        # (assumes the dataset yields channel-last images -- TODO confirm).
        x_train = torch.movedim(x_train, -1, -3)
        x_train = x_train.to(device)
        y_train = y_train.to(device)
        optimizer.zero_grad()
        y_hat = model(x_train)
        loss = criterion(y_hat, y_train)
        loss.backward()
        optimizer.step()
        train_loss_history.append(loss.item())
    print(f'train loss: {sum(train_loss_history) / len(train_loss_history)}')

    # ---- evaluation pass (no gradients) ----
    model.eval()
    with torch.no_grad():
        for x_test, y_test in test:
            x_test = torch.movedim(x_test, -1, -3)
            x_test = x_test.to(device)
            y_test = y_test.to(device)
            y_hat = model(x_test)
            loss = criterion(y_hat, y_test)
            test_loss_history.append(loss.item())
    # Mean of the per-batch losses.
    test_loss = sum(test_loss_history) / len(test_loss_history)

    # Export a checkpoint only when the test loss improves on the best so far.
    if test_loss < best_loss:
        best_loss = test_loss
        _ = torch.onnx.export(model, dummy_input, f'models/resnet18_{epoch}_{test_loss:.4f}.onnx', verbose=False)
        print(f'test loss: {test_loss} (best, saving model)')
    else:
        print(f'test loss: {test_loss}')