In [None]:
import os

import pandas as pd
import numpy as np

# Dataset location and the label.csv columns that are kept.
root = "/work/dataset"
path_col = "path"
label_cols = ["vel", "ang"]

# One label frame per clip directory found directly under `root`.
pairs = []
# Sort the listing: os.listdir returns entries in arbitrary order, which would
# make the row order of the combined frame differ between runs.
for clip_path in sorted(os.listdir(root)):
    # Skip hidden entries (names starting with ".") and anything that is not a
    # directory.
    if clip_path.startswith(".") or not os.path.isdir(os.path.join(root, clip_path)):
        continue

    clip_df = pd.read_csv(
        os.path.join(root, clip_path, "label.csv"),
        usecols=[path_col, *label_cols],
    )

    # Prefix every path with the clip directory name so it becomes relative to
    # `root` (assumes the per-clip CSV stores paths relative to the clip
    # directory -- TODO confirm).
    clip_df[path_col] = clip_df[path_col].apply(lambda p: os.path.join(clip_path, p))

    pairs.append(clip_df)


# ignore_index=True gives the combined frame a unique 0..N-1 index; without it
# each clip contributes its own 0..k labels and the index contains duplicates.
df = pd.concat(pairs, axis=0, ignore_index=True)

In [None]:
def filter_df(df):
    """Return only the rows of ``df`` whose "vel" value is strictly positive."""
    positive_vel = df["vel"] > 0
    return df[positive_vel]
    
# Keep only the rows with a strictly positive "vel"; `df` itself is left untouched.
filtered_df = filter_df(df)

In [None]:
from sklearn.model_selection import train_test_split

def train_dev_test_split(df, random_state=0):
    """Randomly split ``df`` into train, dev and test subsets.

    2% of the rows are held out and that hold-out is then divided in half, so
    dev and test each receive roughly 1% of the data and train the remaining
    98%.

    Args:
        df: The data to split (anything accepted by ``train_test_split``).
        random_state: Seed forwarded to both ``train_test_split`` calls so the
            split is reproducible across runs. Pass ``None`` for an unseeded
            split.

    Returns:
        A ``(train_set, dev_set, test_set)`` tuple.
    """
    train_set, held_out = train_test_split(
        df, test_size=0.02, random_state=random_state
    )
    # The first half of the hold-out becomes the test set, the second the dev set.
    test_set, dev_set = train_test_split(
        held_out, test_size=0.5, random_state=random_state
    )

    return train_set, dev_set, test_set

def map_splits(df):
    """Split ``df`` and return all rows in one frame with a "split" column.

    The "split" column holds the position of the subset in the tuple returned
    by ``train_dev_test_split``: 0 = train, 1 = dev, 2 = test.

    Args:
        df: The frame to split.

    Returns:
        A single DataFrame containing every row of the three subsets, with the
        extra integer "split" column appended as the last column.
    """
    subsets = train_dev_test_split(df)

    # assign() returns a new frame, so the subsets produced by the splitter are
    # not mutated in place.
    labelled = [subset.assign(split=i) for i, subset in enumerate(subsets)]

    # Return the combined frame, not just the last subset of the loop.
    return pd.concat(labelled, axis=0)

# Both calls work on the filtered frame so the preview matches what the export
# cell writes (it also filters before calling map_splits).
train, dev, test = train_dev_test_split(filtered_df)
split = map_splits(filtered_df)
# Show how many rows carry each split code instead of dumping the whole frame.
print(split["split"].value_counts().sort_index())

In [None]:
import os

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

# Dataset location and the label.csv columns that are kept.
root = "/work/dataset"
path_col = "path"
label_cols = ["vel", "ang"]

# One label frame per clip directory found directly under `root`.
pairs = []
# Sort the listing: os.listdir returns entries in arbitrary order, which would
# make the row order of the exported file differ between runs.
for clip_path in sorted(os.listdir(root)):
    # Skip hidden entries (names starting with ".") and anything that is not a
    # directory -- this also skips the combined label.csv written below.
    if clip_path.startswith(".") or not os.path.isdir(os.path.join(root, clip_path)):
        continue

    clip_df = pd.read_csv(
        os.path.join(root, clip_path, "label.csv"),
        usecols=[path_col, *label_cols],
    )

    # Prefix every path with the clip directory name so it becomes relative to
    # `root` (assumes the per-clip CSV stores paths relative to the clip
    # directory -- TODO confirm).
    clip_df[path_col] = clip_df[path_col].apply(lambda p: os.path.join(clip_path, p))

    pairs.append(clip_df)


# ignore_index=True gives the combined frame a unique 0..N-1 index instead of
# repeating each clip's own 0..k labels.
df = pd.concat(pairs, axis=0, ignore_index=True)

# filter_df and map_splits are defined in earlier cells of this notebook.
df = filter_df(df)

splits = map_splits(df)

# index=False: the row index carries no information, and writing it would add
# an unnamed extra column when the file is read back.
splits.to_csv(os.path.join(root, "label.csv"), index=False)

In [8]:
import os

import pandas as pd

import torch
import numpy as np

from torchvision.io import read_image
from PIL import Image

from torch.utils.data import Dataset
import torchvision.transforms as tsfms
from torch.utils.data import DataLoader
from torch.cuda.amp import GradScaler, autocast
import torch.nn as nn


# Mini-batch size shared by every DataLoader created in this cell.
batch = 64
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Dataset directory; CustomDataset reads `label.csv` and the images from here.
root = "/work/dataset"
# Target size handed to tsfms.Resize, which interprets a pair as (height, width).
img_size = (64, 48)

class CustomDataset(Dataset):
    """Image-regression dataset backed by ``<root>/label.csv``.

    Each row of the CSV provides an image path (column "path", relative to
    ``root``), two regression targets (columns "vel" and "ang") and an integer
    "split" code. Only the rows of the requested split are kept.

    Args:
        root: Directory that contains ``label.csv`` and the images.
        split: ``"train"``, ``"dev"`` or ``"test"`` (mapped to codes 0, 1, 2),
            or the integer code itself.
        transform: Optional callable applied to the PIL image.
        target_transform: Optional callable applied to the target array.

    Raises:
        ValueError: If ``split`` is a string other than the three known names.
    """

    # Integer codes stored in the "split" column of label.csv.
    _SPLIT_CODES = {"train": 0, "dev": 1, "test": 2}

    def __init__(self, root, split, transform=None, target_transform=None):
        self.path_col = "path"
        self.label_cols = ["vel", "ang"]

        self.root = root
        self.transform = transform
        self.target_transform = target_transform

        if isinstance(split, str):
            if split not in self._SPLIT_CODES:
                raise ValueError(
                    f"unknown split {split!r}; expected one of {sorted(self._SPLIT_CODES)}"
                )
            split = self._SPLIT_CODES[split]

        df = pd.read_csv(os.path.join(root, "label.csv"))

        # where()/dropna() return new frames rather than filtering in place, so
        # the selection has to be assigned; otherwise every split would contain
        # all rows of the file.
        df = df.loc[df["split"] == split].dropna()

        # Renumber the rows 0..len-1 so the positional indices produced by a
        # sampler address the rows of this split.
        self.df = df.drop(columns=["split"]).reset_index(drop=True)

    def __getitem__(self, idx):
        """Return ``(image, vel_ang)`` for the ``idx``-th row of the split.

        ``vel_ang`` is a float32 array holding the "vel" and "ang" values, in
        that order, before ``target_transform`` is applied.
        """
        row = self.df.iloc[idx]

        image = Image.open(os.path.join(self.root, row[self.path_col]))

        vel_ang = row[self.label_cols].values.astype(np.float32)

        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            vel_ang = self.target_transform(vel_ang)

        return image, vel_ang

    def __len__(self):
        """Number of rows in the selected split."""
        return len(self.df)

# Deterministic preprocessing only (no augmentation, no normalisation); used to
# measure the per-channel statistics of the training images.
resize = tsfms.Compose([
    tsfms.ToTensor(),
    tsfms.Resize(img_size),
])


train_set = CustomDataset(root, transform=resize, split="train")
dataloader = DataLoader(train_set, batch_size=batch, num_workers=6)

# Running per-channel sums of E[x] and E[x^2]. Each batch mean is weighted by
# the number of images in the batch so that a smaller final batch does not skew
# the result, and `n` counts images (not channels).
mean = 0
mean_squared = 0
n = 0
with torch.no_grad():
    for x, _ in dataloader:
        x = x.to(device)
        batch_len = x.shape[0]
        n += batch_len
        # Reduce over batch, height and width; keep the channel dimension.
        mean += x.mean(dim=(0, 2, 3)) * batch_len
        mean_squared += (x ** 2).mean(dim=(0, 2, 3)) * batch_len

if n == 0:
    raise RuntimeError("training split is empty; cannot compute normalisation statistics")

with torch.no_grad():
    mean = mean / n
    mean_squared = mean_squared / n
    # Var[x] = E[x^2] - E[x]^2; clamp at zero so rounding error on a
    # near-constant channel cannot produce a NaN from the square root.
    std = torch.sqrt(torch.clamp(mean_squared - mean ** 2, min=0))

mean = mean.cpu()
std = std.cpu()

print(f"mean: {mean}, std: {std}")

# Colour augmentation, applied to training images only. ColorJitter requires
# 0 <= hue <= 0.5 (a larger value raises ValueError), so 0.5 is the maximum
# allowed jitter.
color_jitter = tsfms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5)
# Normalise with the per-channel statistics measured on the training split.
normalize = tsfms.Normalize(mean=mean, std=std)

# Evaluation pipeline: identical to training but without the colour jitter.
tsfm_list = [
    tsfms.ToTensor(),
    tsfms.Resize(img_size),
    normalize,
]

train_tsfm = tsfms.Compose([
    tsfms.ToTensor(),
    tsfms.Resize(img_size),
    color_jitter,
    normalize,
])

test_tsfm = tsfms.Compose(tsfm_list)

train_set = CustomDataset(root, transform=train_tsfm, split="train")
dev_set = CustomDataset(root, transform=test_tsfm, split="dev")
test_set = CustomDataset(root, transform=test_tsfm, split="test")

# Only the training loader shuffles; dev and test keep the file order.
train_dataloader = DataLoader(train_set, batch_size=batch, num_workers=6, shuffle=True, pin_memory=True)
dev_dataloader = DataLoader(dev_set, batch_size=batch, num_workers=6, pin_memory=True)
test_dataloader = DataLoader(test_set, batch_size=batch, num_workers=6, pin_memory=True)

# Small CNN regressor: three strided convolutions followed by two linear
# layers; the final layer emits two raw values per image.
model = nn.Sequential(
    nn.Conv2d(3, 16, 4, 2),
    nn.LeakyReLU(),
    nn.Conv2d(16, 32, 3, 2),
    nn.LeakyReLU(),
    nn.Conv2d(32, 64, 2, 2),
    nn.LeakyReLU(),
    nn.Flatten(),
    nn.LazyLinear(100),
    nn.LeakyReLU(),
    nn.LazyLinear(2)
)

model = model.to(device)

# The lazy linear layers infer their input size on the first forward pass. Do a
# dry run with one dummy image of the training resolution so their parameters
# are materialised before the optimizer is attached to them.
with torch.no_grad():
    model(torch.zeros(1, 3, *img_size, device=device))

# The rest of the cell refers to the network as `model`; `modelx` is kept as an
# alias of the same module for code that still uses the old name.
modelx = model

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

criterion = nn.MSELoss()

# Loss scaler used together with autocast for mixed-precision training.
scaler = GradScaler()

# Number of passes over the training set.
epoch = 100

for e in range(epoch):
    # Sum of the per-batch losses and number of batches seen in this epoch.
    train_loss = 0.0
    i = 0
    for x, y in train_dataloader:
        x = x.to(device)
        y = y.to(device)

        optimizer.zero_grad()

        with autocast():
            raw = model(x)
            # Squash the raw outputs into bounded ranges: column 0 into
            # (0, 1.2) via a scaled sigmoid, column 1 into (-0.7, 0.7) via a
            # scaled tanh. Built out of place so autograd never sees an
            # in-place write into the network output.
            y_pred = torch.stack(
                (1.2 * torch.sigmoid(raw[:, 0]), 0.7 * torch.tanh(raw[:, 1])),
                dim=1,
            )

            loss = criterion(y_pred, y)

        # Mixed-precision step: scale the loss, step through the scaler, then
        # let it adjust its scale factor.
        scaler.scale(loss).backward()

        scaler.step(optimizer)

        scaler.update()

        # .item() stores a plain float, so no tensor is kept alive for logging.
        train_loss += loss.item()
        i += 1

        # Log the running mean once every 100 batches.
        if i % 100 == 0:
            print(f"epoch: {e}, running_loss: {train_loss / i}")

    # Guard against an empty loader instead of dividing by zero.
    epoch_loss = train_loss / i if i else float("nan")
    print(f"epoch: {e} train loss: {epoch_loss}")
        
# Evaluate on the dev split: mean of the per-batch MSE losses.
model.eval()

loss = 0.0
i = 0
with torch.no_grad():
    for x, y in dev_dataloader:
        x = x.to(device)
        y = y.to(device)

        with autocast():
            raw = model(x)
            # Same output squashing as in training: column 0 into (0, 1.2),
            # column 1 into (-0.7, 0.7).
            y_pred = torch.stack(
                (1.2 * torch.sigmoid(raw[:, 0]), 0.7 * torch.tanh(raw[:, 1])),
                dim=1,
            )

            loss += criterion(y_pred, y).item()
        i += 1

# Guard against an empty dev split instead of dividing by zero.
dev_loss = loss / i if i else float("nan")
print(f"dev loss: {dev_loss}")