# Training a Simple CNN

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from data.image_dataset import ImageDataset
from models.cnn import CNN
from models.nvgaze import NVGaze
from training.train import *
from training.eval import *
from data.convert_labels import *
import matplotlib.pyplot as plt
import time
from datetime import datetime

# for auto-reloading external modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
%load_ext autoreload
%autoreload 2

In [None]:
data_subset = '01_05_160_120_no_crop'
mode = 'reg'

In [None]:
train_dir_path = f'data/real/{data_subset}/train'
train_annotations_file_path = f'{train_dir_path}/{data_subset}_train.csv'
val_dir_path = f'data/real/{data_subset}/val'
val_annotations_file_path = f'{val_dir_path}/{data_subset}_val.csv'
test_dir_path = f'data/real/{data_subset}/test'
test_annotations_file_path = f'{test_dir_path}/{data_subset}_test.csv'

target_transform = None if mode == 'reg' else (lambda target: convert_labels(target))

train_dataset = ImageDataset(
  annotations_file=train_annotations_file_path,
  img_dir=train_dir_path,
  transform=None,
  target_transform=target_transform
)
val_dataset = ImageDataset(
  annotations_file=val_annotations_file_path,
  img_dir=val_dir_path,
  transform=None,
  target_transform=target_transform
)
test_dataset = ImageDataset(
  annotations_file=test_annotations_file_path,
  img_dir=test_dir_path,
  transform=None,
  target_transform=target_transform
)

In [None]:
# Create Dataloader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)

train_features, train_labels = next(iter(train_loader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Label batch shape: {train_labels.size()}")

In [None]:
# model = CNN(input_dims=(160, 120))
out_num_features = 2 if mode == 'reg' else 16
model = NVGaze(input_dims=(160, 120), out_num_features=out_num_features)
criterion = nn.MSELoss() if mode == 'reg' else nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.99, weight_decay=0.1, nesterov=True)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Set-up GPU device (if available) and move model
device = torch.device('mps' if torch.mps.is_available() else 'cpu')
model = model.to(device)
print(f'Using device: {device}')

results = train(model, device, mode, criterion, optimizer, train_loader, val_loader, epochs=200)

In [None]:
loss_history, train_perf_history, val_perf_history = results
plt.subplot(2, 1, 1)
plt.plot(loss_history, 'o')
plt.xlabel('Iteration')
plt.ylabel('Loss')

plt.subplot(2, 1, 2)
plt.plot(train_perf_history, '-o')
plt.plot(val_perf_history, '-o')
plt.legend(['Train', 'Val'], loc='upper right')
plt.xlabel('Epoch')
plt.ylabel('Error' if mode == 'reg' else 'Accuracy')

plt.subplots_adjust(hspace=0.5)
plt.show()

In [None]:
if mode == 'reg':
  train_error = evaluate_error(model, device, train_loader)
  val_error = evaluate_error(model, device, val_loader)
  test_error = evaluate_error(model, device, test_loader)
  print(f'Train / Val / Test error: {train_error.item():.6f} / {val_error.item():.6f} / {test_error.item():.6f}', end=' ')
  print(f'({np.rad2deg(train_error.item()):.2f} / {np.rad2deg(val_error.item()):.2f} / {np.rad2deg(test_error.item()):.2f})')
else:
  test_acc = evaluate_acc(model, device, test_loader)
  print(f'Test acc: {test_acc.item()}')

In [None]:
# Save current model
s = datetime.fromtimestamp(time.time()).strftime('%m-%d_%H:%M:%S')
PATH = f'models/saved_models/{s}.pt'
torch.save(model.state_dict(), PATH)