In [None]:
import os
import sys
import optuna
import torch
from torchinfo import summary
from torchvision.transforms import v2
from torch.utils.data import DataLoader
# Make the project root importable so the `modules` package below resolves.
# `__file__` is typically not defined inside a notebook kernel, so the literal
# string '__file__' is used instead: abspath() joins it onto the current
# working directory and dirname() strips it again, which yields the directory
# the notebook is executed from.
module_dir = os.path.dirname(os.path.abspath('__file__'))
# Guard the append so that re-running this cell does not keep adding the same
# entry to sys.path.
if os.path.dirname(module_dir) not in sys.path:
    sys.path.append(os.path.dirname(module_dir))


from modules.data import PhoneDataset
from modules.models import PhoneNet
from modules.training import EarlyStopping, ModelTrainer
from modules.transforms import (
    RandomHorizontalFlip,
    RandomVerticalFlip,
    RandomTranslation
)
from modules.utilities import (
    get_data,
    visualize_augmentations,
    train_test_split
)

In [None]:
# ---- Experiment configuration ----------------------------------------------
# Location of the data, relative to the directory the notebook is run from.
folder = os.path.join('..', 'find_phone_data')

# Reproducibility. `seed` is passed to the train/test split and to the Optuna
# sampler further down. torch's global RNG is seeded here as well so that
# everything drawing from it (DataLoader shuffling, default weight
# initialisation) is repeatable after "Restart Kernel -> Run All".
seed = 0
torch.manual_seed(seed)

# Data handling.
test_size = 0.1
batch_size = 128

# Early stopping.
# NOTE(review): `patience` and `min_delta` are not referenced by any later
# cell of this notebook; the EarlyStopping instance is created with a
# hard-coded 15. Confirm which value is intended.
patience = 5
min_delta = 0.0

# Optimisation. The loss is summed (not averaged) per batch, so accumulated
# batch losses can be divided by the dataset size afterwards.
# `optimizer` and `scheduler` are the classes themselves, not instances; they
# are handed to ModelTrainer as-is.
loss_fn = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.Adam
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau
epochs = 50
lr = 1e-2

In [None]:
# Collect the image paths and the location lookup from the data folder.
image_paths, locations = get_data(folder)

# Sanity check: show the first path and the location stored for it.
# `locations` is indexed by the bare file name, not by the full path.
print(
    image_paths[0],
    locations[os.path.basename(image_paths[0])],
    sep='\n',
)

In [None]:
# Split paths and locations into a training and a held-out part.
# `test_size` and `seed` come from the configuration cell.
(
    train_image_paths,
    test_image_paths,
    train_locations,
    test_locations,
) = train_test_split(image_paths, locations, test_size, seed)

# Sanity check: first training path and the location stored under its file name.
print(
    train_image_paths[0],
    train_locations[os.path.basename(train_image_paths[0])],
    sep='\n',
)

In [None]:
# Augmentation pipeline: flips along both axes followed by a translation,
# each constructed with its default arguments.
aware_transforms = v2.Compose(
    [RandomHorizontalFlip(), RandomVerticalFlip(), RandomTranslation()]
)

# Only the training dataset receives the augmentation pipeline; the test
# dataset is built without a transform argument.
train_dataset = PhoneDataset(
    train_image_paths,
    train_locations,
    aware_transforms,
)
test_dataset = PhoneDataset(
    test_image_paths,
    test_locations,
)

# Visual check of the augmentations on the training dataset.
visualize_augmentations(
    train_dataset,
    train_locations,
    random_img=True,
)

In [None]:
# Batch both datasets; only the training batches are shuffled.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

# Peek at a single training batch to check tensor shapes and dtypes.
for X, y in train_dataloader:
    print(
        f"Shape of X [N, C, H, W]: {X.shape} {X.dtype}",
        f"Shape of y: {y.shape} {y.dtype}",
        sep="\n",
    )
    break

In [None]:
# Pick the compute device: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# NOTE(review): the configuration cell defines `patience = 5` and
# `min_delta = 0.0`, but neither is used here; 15 is hard-coded instead.
# Confirm which value is intended.
# NOTE(review): this single EarlyStopping instance is shared by every call of
# `model_trainer` (all Optuna trials and the final training run). Verify that
# its state is reset between runs.
early_stopping = EarlyStopping(15)

# Bundle the data, loss, optimizer/scheduler classes and device into one
# trainer that is reused for every training run in this notebook.
model_trainer = ModelTrainer(
    train_dataloader, test_dataloader,
    loss_fn, optimizer,
    early_stopping, scheduler,
    device,
)

In [None]:
def objective(trial) -> float:
    """
    Objective function for the hyper-parameter optimization task.

    Builds a PhoneNet from the hyper-parameters suggested by ``trial``, trains
    it with the notebook-level ``model_trainer`` and returns the summed test
    loss divided by the number of test samples.

    Parameters
    ----------
    trial: optuna.trial.Trial
        A Trial instance from optuna. Is set automatically.

    Return
    ------
    score: float
        The score on the test set of the model trained with the hyper-parameters
        given by trial.
    """
    in_channels = 3
    factor = trial.suggest_int('factor', 2, 3)
    n_res_layers = trial.suggest_int('n_res_layers', 2, 4)
    # The range 1..1 pins this value; it is still registered with the trial so
    # that it shows up in `study.best_params`.
    n_dense_layers = trial.suggest_int('n_dense_layers', 1, 1)
    p = trial.suggest_float('p', 0.1, 0.7)

    model = PhoneNet(in_channels, factor, n_res_layers, n_dense_layers, p)

    model_trainer(epochs, model, lr, silent=True)

    model.eval()
    size = len(test_dataloader.dataset)
    score = 0.0
    # Evaluation only: disable autograd so no graph is built per batch.
    with torch.no_grad():
        for X, y in test_dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            # `loss_fn` sums over the batch, so the running total can be
            # normalised by the dataset size afterwards.
            score += loss_fn(pred, y).item()
    return score / size

In [None]:
# Seeded TPE sampler so the sequence of suggested hyper-parameters is repeatable.
sampler = optuna.samplers.TPESampler(
    seed=seed,
    multivariate=True,
    constant_liar=True,
    warn_independent_sampling=False,
)
# NOTE(review): a pruner is only consulted when the objective reports
# intermediate values; the objective in this notebook does not, so this pruner
# presumably never stops a trial. Confirm whether pruning is wanted.
pruner = optuna.pruners.HyperbandPruner()
n_trials = 50

# NOTE(review): no `storage` is given, so `load_if_exists=True` presumably has
# nothing to load from. Confirm whether the study is meant to be persisted.
study = optuna.create_study(
    study_name='phone-model',
    direction='minimize',
    sampler=sampler,
    pruner=pruner,
    load_if_exists=True,
)

# Run sequentially; the wall-clock budget is two minutes per trial on average.
study.optimize(
    objective,
    n_trials=n_trials,
    timeout=2 * 60 * n_trials,
    n_jobs=1,
    show_progress_bar=True,
    gc_after_trial=True,
)

In [None]:
# Hyper-parameters of the best trial of the study. The study was created with
# direction='minimize', so "best" means the lowest objective value.
best_params = study.best_params
best_params  # bare last expression so the notebook displays the value

In [None]:
# Input dimensions (channels, height, width) passed to the model summary.
# NOTE(review): 326 x 490 is presumably the image size of the dataset — confirm.
c_in = 3
h_in = 326
w_in = 490

# Unpack the tuned hyper-parameters into the names used to build the final model.
factor, n_res_layers, n_dense_layers, p = (
    best_params[key]
    for key in ('factor', 'n_res_layers', 'n_dense_layers', 'p')
)

In [None]:
# Build the final model from the best hyper-parameters found by the study.
model = PhoneNet(
    c_in,
    factor,
    n_res_layers,
    n_dense_layers,
    p,
)

# Layer-by-layer overview for one full batch of inputs; kept as the last
# expression of the cell so the notebook renders the table.
summary(
    model=model,
    input_size=(batch_size, c_in, h_in, w_in),
    col_names=["input_size", "output_size", "num_params"],
    col_width=20,
    row_settings=["var_names"],
)

In [None]:
# Train the final model with the shared trainer. Unlike the calls made inside
# `objective`, `silent=True` is not passed here.
model_trainer(epochs, model, lr)

In [None]:
# A prediction counts as correct when the squared distance between target and
# prediction (summed over the last dimension) is below this value.
# NOTE(review): 0.05 is compared against a *squared* distance, which
# corresponds to a radius of about 0.224. If the intended tolerance is a
# radius of 0.05, the threshold should be 0.05 ** 2 — confirm.
correct_threshold = 0.05


def _count_correct(dataloader) -> float:
    """
    Count the samples of ``dataloader`` that the notebook-level ``model``
    predicts within ``correct_threshold``.

    Parameters
    ----------
    dataloader: iterable of (X, y) batches
        Batches are moved to the notebook-level ``device`` before prediction.

    Return
    ------
    n_correct: float
        Number of samples whose squared error is below the threshold.
    """
    n_correct = 0
    # Evaluation only: disable autograd so no graph is built per batch.
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            correct = torch.sum((y - pred) ** 2, dim=-1) < correct_threshold
            n_correct += correct.type(torch.float).sum().item()
    return n_correct


model.eval()

# Use dedicated names for the sample counts so the configuration value
# `test_size` (the split ratio) is not overwritten; otherwise re-running the
# split cell after this one would receive a sample count instead of a ratio.
num_train_samples = len(train_dataloader.dataset)
train_correct = _count_correct(train_dataloader)

num_test_samples = len(test_dataloader.dataset)
test_correct = _count_correct(test_dataloader)

print(f'Train set accuracy: {train_correct / num_train_samples}')
print(f'Test set accuracy: {test_correct / num_test_samples}')