In [None]:
# Enable GPU acceleration
import torch
# Prefer the CUDA device when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
# Create NN
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models import resnet34, ResNet34_Weights
# Samples per batch; train() and test() multiply the batch index by this
# value when printing how many samples have been processed.
BATCH = 1

class VisOdoNet(nn.Module):
    """Regress one scalar per video clip from a stack of 4-channel frames.

    Pipeline: a 5x5 convolution maps the 4 input channels to the 3 channels
    that ResNet-50 takes, the pretrained ResNet-50 turns every frame into a
    1000-dimensional vector, a 4-layer LSTM reads those vectors in frame
    order, and two fully connected layers map the LSTM's final hidden and
    cell states to a single non-negative value (the last activation is a ReLU).
    """

    def __init__(self):
        super().__init__()
        # Convolutional layer to learn spatial features and adjust channels (4 -> 3)
        self.conv1 = nn.Conv2d(4, 3, kernel_size=5, padding=2)

        # Pretrained ResNet; its 1000 outputs are used as per-frame features
        self.r50 = resnet50(weights=ResNet50_Weights.DEFAULT)

        # LSTM network to learn temporal features
        self.rnn1 = nn.LSTM(input_size=1000,
                            hidden_size=8, num_layers=4,
                            bidirectional=False, # should be monotonic
                            batch_first=True)

        # The head consumes the final hidden AND cell state of every LSTM
        # layer, flattened per clip: 2 * 4 layers * 8 units = 64 features.
        state_width = 2 * self.rnn1.num_layers * self.rnn1.hidden_size

        # And a few FC layers to tie it all together. The output width is one
        # value per clip; it must not depend on how many clips are in a batch.
        self.fc1 = nn.Linear(state_width, 16)
        self.fc2 = nn.Linear(16, 1)

    def forward(self, x):
        """Predict one value per clip.

        Args:
            x: clips shaped [B x T x 4 x H x W], or a single clip shaped
               [T x 4 x H x W] (treated as a batch of one).

        Returns:
            1-D tensor of length B.
        """
        if x.dim() == 4:
            x = x.unsqueeze(0)
        batch, frames = x.shape[0], x.shape[1]

        # Fold time into the batch axis so the 2-D layers see one frame per
        # row. Only these two axes are merged; an unqualified squeeze would
        # also drop any other axis that happens to have size 1.
        x = x.flatten(0, 1)                   # shape [B*T x 4 x H x W]
        x = F.relu(self.conv1(x))             # shape [B*T x 3 x H x W]

        # Use ResNet50 to reduce dimensionality
        x = self.r50(x)                       # shape [B*T x 1000]
        x = x.reshape(batch, frames, -1)      # shape [B x T x 1000]

        # Keep only the final states; each is [num_layers x B x hidden]
        _, (h, c) = self.rnn1(x)
        # Put the batch axis first before flattening so every row holds the
        # states of exactly one clip.
        h = h.transpose(0, 1).reshape(batch, -1)   # [B x 32] hidden state
        c = c.transpose(0, 1).reshape(batch, -1)   # [B x 32] cell state
        x = F.relu(torch.cat((h, c), dim=1))       # [B x 64]
        x = F.relu(self.fc1(x))               # shape [B x 16]
        x = F.relu(self.fc2(x))               # shape [B x 1]
        return x.reshape(-1)                  # shape [B]
 

In [None]:
from torch.utils.data import Dataset, DataLoader

class VideoSet(Dataset):
    """Dataset over a list of files saved with ``torch.save``.

    Each file is expected to load as a mapping with a ``"frames"`` entry (a
    sequence of equally shaped tensors) and a ``"target"`` entry whose first
    element is the label.
    """

    def __init__(self, tar_paths):
        # Paths are kept as given; files are only read when an item is requested.
        self.paths = tar_paths

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx: int):
        """Load file ``idx`` and return ``(frames, label)``.

        ``frames`` is the per-frame tensors stacked along a new leading axis
        (noted by the original author as 59x4x256x256, float32 — not checked
        here); ``label`` is the first element of the stored target.
        """
        record = torch.load(self.paths[idx], weights_only=True)
        frames = torch.stack(record["frames"], dim=0)
        return frames, record["target"][0]


In [None]:
import os
from random import sample

videos_path = "/home/tjw/Downloads/VisOdo"
# Sort so the candidate list does not depend on the order the filesystem
# happens to return directory entries in.
all_videos = sorted(os.listdir(videos_path))

n_train = int(0.7*len(all_videos))
n_test = int(0.3*len(all_videos))

# Shuffle once, then cut two adjacent slices. This guarantees the splits are
# disjoint and that the test split really contains n_test videos. Drawing a
# second independent sample and filtering out the training videos would keep
# only the ~30% of that sample that missed the training set, with a size
# that changes from run to run.
shuffled_videos = sample(all_videos, k=len(all_videos))
training_videos = [os.path.join(videos_path, v) for v in shuffled_videos[:n_train]]
testing_videos = [os.path.join(videos_path, v)
                  for v in shuffled_videos[n_train:n_train + n_test]]

In [None]:
# Wrap each split in a Dataset, then in a DataLoader that walks it in list
# order (no reshuffling between epochs). batch_size is not passed, so the
# DataLoader default applies.
train_set, test_set = VideoSet(training_videos), VideoSet(testing_videos)

train_loader = DataLoader(train_set, shuffle=False)
test_loader = DataLoader(test_set, shuffle=False)

In [None]:
import matplotlib.pyplot as plt
from random import choice

def train(epoch, model, device, optimizer, data_loader, loss_function):
    """Run one training epoch with gradient accumulation and plot the fit.

    Gradients from GRAD_ACCUM consecutive batches are summed (each loss is
    divided by GRAD_ACCUM first) before the optimizer takes a step. After the
    epoch, targets and predictions are sorted by target and drawn on a new
    matplotlib figure.

    Args:
        epoch: epoch number, used only in progress messages and the plot title.
        model: module to train; moved to ``device`` and put in train mode.
        device: device the model and every batch are moved to.
        optimizer: optimizer bound to ``model``'s parameters.
        data_loader: iterable of ``(frame, y)`` batches exposing ``.dataset``.
        loss_function: callable ``(output, y) -> scalar tensor``.

    Returns:
        The last batch's loss tensor, already divided by GRAD_ACCUM, or
        ``None`` when ``data_loader`` yields no batches.
    """
    GRAD_ACCUM = 10

    # Prepare model
    model = model.to(device)
    model = model.train()

    predictions = []
    correct = []
    pending = 0     # batches back-propagated since the last optimizer step
    loss = None

    # Start from clean gradients so nothing left over from earlier work is
    # mixed into this epoch's first accumulation window.
    optimizer.zero_grad()

    for batch_idx, (frame, y) in enumerate(data_loader):
        frame = frame.to(device)

        # Sanity check used during development: overwriting every clip with
        # all zeros / all ones (chosen at random) and setting y to 0 / 1
        # drove the loss to ~0, which showed the training loop itself works.

        y = y.to(device).type(torch.float)
        # Flatten before recording so batches of any size can be logged.
        correct.extend(y.detach().reshape(-1).tolist())

        # Calculate and record output & loss
        output = model(frame)
        predictions.extend(output.detach().reshape(-1).tolist())
        loss = loss_function(output, y)/GRAD_ACCUM
        loss.backward()
        pending += 1

        # Gradient accumulation
        if pending >= GRAD_ACCUM:
            optimizer.step()
            optimizer.zero_grad()
            pending = 0

        # Periodically report on training progress
        print(f"\rEpoch {epoch}: Training {batch_idx*BATCH}/{len(data_loader.dataset)} " +
              f"(Loss: {loss.item():02.4})", end=" "*10)

    if loss is None:
        # Nothing was loaded: there is no loss to report and nothing to plot.
        print(f"\rEpoch {epoch}: no training data" + " "*10)
        return None

    # Apply the final, partial accumulation window. Without this the last
    # (number of batches % GRAD_ACCUM) batches would never update the model
    # and their gradients would carry over into the next epoch.
    if pending:
        optimizer.step()
        optimizer.zero_grad()

    print(f"\rEpoch {epoch}: Trained {len(data_loader.dataset)}/{len(data_loader.dataset)} " +
              f"(Loss: {loss.item():02.4})" + " "*10)

    # Sort plotted values because otherwise they would be very noisy
    pairs = sorted(zip(correct, predictions))
    correct, predictions = zip(*pairs)

    # Plot to gauge accuracy of that epoch
    plt.figure()
    plt.plot(predictions, color="tab:blue", label="Predicted")
    plt.plot(correct, color="tab:red", label="Truth")
    plt.ylabel("Speed (m/s)")
    plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
    plt.title(f"Epoch {epoch}")
    plt.legend()

    return loss

In [None]:
def test(epoch, model, device, data_loader, loss_function):
    model = model.to(device)
    model = model.eval()
    test_loss = []
    map = []
    
    with torch.no_grad():
        for batch_idx, (frame, y) in enumerate(data_loader):
            # Load data into `device`
            frame = frame.to(device)
            y = y.to(device).type(torch.float)
            
            # Calculate prediction & loss
            output = model(frame)
            test_loss.append(loss_function(output, y).item())
            map.append(torch.mean(torch.abs((output - y) / y)) * 100)
            
            # Periodically report on testing progress
            print(f"\rEpoch {epoch}: Testing {batch_idx*BATCH}/{len(data_loader.dataset)}, estimated MAPE {torch.mean(torch.tensor(map)):02.4}%", end=" "*10)
        print(f"\rEpoch {epoch}: Testing {len(data_loader.dataset)}/{len(data_loader.dataset)}" + " "*30)
    
    # Report results
    test_loss = torch.mean(torch.tensor(test_loss))
    accuracy = torch.tensor(map)
    print(f"Test Result, epoch {epoch}: Avg loss {test_loss:04.4}, MAPE {torch.mean(accuracy):02.4}%")
    
    return accuracy

In [None]:
# Training length, then the model and the optimizer that updates it.
MAX_EPOCH = 50

model = VisOdoNet()
# AdamW with the AMSGrad variant and decoupled weight decay; the learning
# rate is not passed, so the optimizer's default is used.
optimizer = torch.optim.AdamW(
    model.parameters(),
    weight_decay=0.02,
    amsgrad=True,
)

def h_loss(x, y, delta=0.1):
    """Huber loss between prediction ``x`` and target ``y``.

    Args:
        x: predicted values.
        y: target values.
        delta: error magnitude at which the loss switches from quadratic to
            linear. Defaults to 0.1, the value this helper has always used.

    Returns:
        The loss as returned by ``F.huber_loss`` with its default reduction.
    """
    return F.huber_loss(input=x, target=y, delta=delta)

In [None]:
# Train & Test Loop!
# Each epoch: fit on the training loader with Huber loss, then score the
# held-out loader with MSE. Both return values are discarded.
# NOTE(review): training uses F.huber_loss with its default delta, not the
# h_loss helper (delta=0.1) defined in this notebook — confirm which is intended.
for epoch in range(1, MAX_EPOCH + 1):
    train(epoch, model, device, optimizer, train_loader, loss_function=F.huber_loss)
    test(epoch, model, device, test_loader, loss_function=F.mse_loss)
