# Master Thesis - Vignoli Lorenzo

---

## 🐙 Helyx training

## ⚙️ Import Libraries

In [1]:
# Libraries
import os
import glob
import random
import warnings
import numpy as np
from PIL import Image
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
import torch.nn.functional as F
import shutil
import plotly.graph_objects as go # type: ignore

import matplotlib.pyplot as plt
import seaborn as sns
import random
import cv2 # type: ignore
import sys
from torch.amp import GradScaler, autocast

# Silence every warning emitted by the libraries used in this notebook.
warnings.filterwarnings("ignore")

# Seed the Python, NumPy and PyTorch random number generators with one value.
seed = 1
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed)

# cuDNN flags: autotuner disabled, deterministic kernels NOT enforced.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = False

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

Using device: cuda


## 🐙 Plotting the data

In [2]:
# Get all dataset files matching the pattern
dataset_files = sorted(glob.glob("Dataset*.npz"))

# Accumulates the first three label columns of every archive
xyz = []

for file in dataset_files:

    # Loading: the context manager closes the .npz archive as soon as the
    # label array has been read, so no file handle is left open per dataset.
    with np.load(file) as data:
        y = data['y']
    xyz.append(y[:, :3])

xyz = np.concatenate(xyz, axis=0)
print(xyz.shape)

# Extract x, y, z
x_points, y_points, z_points = xyz[:, 0], xyz[:, 1], xyz[:, 2]

# Plot 3D: one marker per label
fig = go.Figure(data=[go.Scatter3d(
    x=x_points, y=y_points, z=z_points,
    mode='markers',
    marker=dict(size=3, color='blue')
)])

fig.update_layout(
    scene=dict(
        xaxis_title='X',
        yaxis_title='Y',
        zaxis_title='Z'
    ),
    title='3D Interactive Plot of Labels'
)

fig.show()

(12000, 3)


## ⚓ Heavy data augmentation

In [3]:
class ColorJitter:
    """Random photometric augmentation for an H x W x 3 image array.

    Brightness, contrast and saturation are each scaled by a factor drawn
    uniformly from ``[1 - amount, 1 + amount]`` (skipped when the amount is
    not positive). Afterwards the image is converted to gray with probability
    ``grayscale_prob`` and inverted (``255 - img``) with probability
    ``invert_prob``. The result is clipped to [0, 255] and returned as uint8.
    """

    def __init__(self, brightness=0.5, contrast=0.5, saturation=0.5, invert_prob=0.05, grayscale_prob=0.05):
        self.brightness = brightness
        self.contrast = contrast
        self.saturation = saturation
        self.invert_prob = invert_prob
        self.grayscale_prob = grayscale_prob

    @staticmethod
    def _luma(pixels):
        """Weighted sum over the last (channel) axis using fixed RGB weights."""
        return np.matmul(pixels, [0.2989, 0.5870, 0.1140])

    def __call__(self, img):
        work = img.astype(np.float32)

        # Brightness: global multiplicative gain.
        if self.brightness > 0:
            gain = random.uniform(1 - self.brightness, 1 + self.brightness)
            work = work * gain

        # Contrast: stretch around the per-channel spatial mean.
        if self.contrast > 0:
            channel_mean = work.mean(axis=(0, 1), keepdims=True)
            gain = random.uniform(1 - self.contrast, 1 + self.contrast)
            work = (work - channel_mean) * gain + channel_mean

        # Saturation: blend between the image and its gray version.
        if self.saturation > 0:
            gray_plane = self._luma(work)[..., np.newaxis]
            gain = random.uniform(1 - self.saturation, 1 + self.saturation)
            work = work * gain + gray_plane * (1 - gain)

        # Occasional full grayscale conversion (three identical channels).
        if random.random() < self.grayscale_prob:
            gray_plane = self._luma(work)
            work = np.stack([gray_plane] * 3, axis=2)

        # Occasional inversion.
        if random.random() < self.invert_prob:
            work = 255 - work

        return np.clip(work, 0, 255).astype(np.uint8)

In [4]:
class GaussianNoise:
    """Add i.i.d. Gaussian noise to an image and return a uint8 array.

    Args:
        mean: mean of the normal distribution the noise is drawn from.
        std: standard deviation of that distribution.
    """

    def __init__(self, mean=0.0, std=8.0):
        self.mean = mean
        self.std = std

    def __call__(self, img):
        # One noise sample per array element, drawn from NumPy's global RNG.
        perturbation = np.random.normal(loc=self.mean, scale=self.std, size=img.shape)
        noisy = img.astype(np.float32) + perturbation
        # Saturate to the 8-bit range before casting back.
        return np.clip(noisy, 0, 255).astype(np.uint8)

In [5]:
class GaussianBlur:
    """Blur an image with a random odd-sized Gaussian kernel, with probability ``p``.

    Args:
        p: probability of applying the blur on a given call.
        ksize_range: inclusive ``(low, high)`` bounds for the kernel side length.
            Only odd, positive sizes inside the range are used, because
            ``cv2.GaussianBlur`` rejects even kernel sizes.

    Raises:
        ValueError: when the blur is triggered and the range holds no odd
            positive size.
    """

    def __init__(self, p=0.5, ksize_range=(3, 7)):
        self.p = p
        self.ksize_range = ksize_range

    def _odd_kernel_sizes(self):
        """Return every odd, positive kernel size inside ``ksize_range``."""
        low, high = self.ksize_range[0], self.ksize_range[1]
        return [k for k in range(max(low, 1), high + 1) if k % 2 == 1]

    def __call__(self, img):
        if random.random() < self.p:
            candidates = self._odd_kernel_sizes()
            if not candidates:
                raise ValueError(
                    f"ksize_range={self.ksize_range!r} contains no odd positive kernel size"
                )
            k = random.choice(candidates)
            # sigma=0 lets OpenCV derive the standard deviation from the kernel size.
            img = cv2.GaussianBlur(img, (k, k), 0)
        return img

In [6]:
class Cutout:
    """Zero out up to ``num_holes_max`` random rectangles of an image, in place.

    Args:
        num_holes_max: upper bound (inclusive) on the number of rectangles;
            the actual count is drawn uniformly from ``0..num_holes_max``.
        max_size: upper bound on each rectangle side, as a fraction of the
            corresponding image side (the lower bound is fixed at 0.05).

    The input array is modified and returned (no copy is made). Both
    H x W and H x W x C arrays are accepted.
    """

    def __init__(self, num_holes_max=3, max_size=0.3):
        self.num_holes_max = num_holes_max
        self.max_size = max_size

    def __call__(self, img):
        h, w = img.shape[:2]
        num_holes = random.randint(0, self.num_holes_max)
        for _ in range(num_holes):
            # Clamp to the image size so that max_size > 1 cannot produce a
            # negative upper bound for the position draw below.
            hole_w = min(int(random.uniform(0.05, self.max_size) * w), w)
            hole_h = min(int(random.uniform(0.05, self.max_size) * h), h)
            x = random.randint(0, w - hole_w)
            y = random.randint(0, h - hole_h)
            # Index only the two spatial axes so grayscale (2-D) images work too.
            img[y:y + hole_h, x:x + hole_w] = 0
        return img

In [7]:
class LabelNoise:
    """Perturb a label array with additive Gaussian noise.

    Args:
        mean: mean of the normal distribution the noise is drawn from.
        std: standard deviation of that distribution.
    """

    def __init__(self, mean=0.0, std=0.01):
        self.mean = mean
        self.std = std

    def __call__(self, label):
        # A new array is returned; the input label is left untouched.
        return label + np.random.normal(loc=self.mean, scale=self.std, size=label.shape)

## ⏳ Load and process the Data

In [8]:
# Definition of augmentations: one instance of each transform, all built
# with their default parameters. They are applied in the dataset generation
# cell to the augmented copies of training samples.
color_jitter = ColorJitter()      # brightness / contrast / saturation / gray / invert
gaussian_noise = GaussianNoise()  # additive pixel noise
gaussian_blur = GaussianBlur()    # random-kernel blur
cutout = Cutout()                 # random zeroed rectangles
label_noise = LabelNoise()        # additive noise on the regression targets

In [None]:
# Saving original images plus augmented copies as per-sample .pt files.
# The cell is disabled by default (`if False`): flip the guard to regenerate
# the images/ and labels/ folders from the Dataset*.npz archives.
if False:

    # Remove existing folders and recreate them empty
    for split in ["train", "val", "test"]:
        shutil.rmtree(f"images/{split}", ignore_errors=True)
        shutil.rmtree(f"labels/{split}", ignore_errors=True)
        os.makedirs(f"images/{split}", exist_ok=True)
        os.makedirs(f"labels/{split}", exist_ok=True)

    dataset_files = sorted(glob.glob("Dataset*.npz"))

    # Compute total number of images; each archive is closed right after
    # its image count has been read.
    total_originals = 0
    for f in dataset_files:
        with np.load(f) as archive:
            total_originals += archive["X_images"].shape[0]
    all_indices = np.arange(total_originals)

    # Split train, validation, test (85-10-5 %): 15 % is held out, and one
    # third of that hold-out becomes the test set.
    train_idx, temp_idx = train_test_split(all_indices, test_size=0.15, random_state=seed)
    val_idx, test_idx = train_test_split(temp_idx, test_size=1/3, random_state=seed)

    # split_map[k] is the split name of the k-th original sample, counted
    # across all archives in sorted file order.
    split_map = np.full(total_originals, "", dtype=object)
    split_map[train_idx] = "train"
    split_map[val_idx] = "val"
    split_map[test_idx] = "test"

    # Next free file index inside each split folder
    global_index = {
        "train": 0,
        "val": 0,
        "test": 0
    }

    current_idx = 0

    for file in dataset_files:

        print(f"\n Processing {file}...")
        # Read both arrays, then release the archive's file handle.
        with np.load(file) as data:
            X_images = data["X_images"]
            y = data["y"]

        for i in range(len(X_images)):

            split = split_map[current_idx]
            assert split in {"train", "val", "test"}

            base_image = X_images[i]
            base_label = y[i]

            # Images are stored channel-first as uint8, labels as float32.
            image_tensor = torch.from_numpy(base_image).permute(2, 0, 1).to(torch.uint8)
            label_tensor = torch.from_numpy(base_label).to(torch.float32)

            idx_str = f"{global_index[split]:06d}"
            torch.save(image_tensor, f"images/{split}/img_{idx_str}.pt")
            torch.save(label_tensor, f"labels/{split}/label_{idx_str}.pt")
            global_index[split] += 1

            # Augmentation only if training: four extra copies per sample
            if split == "train":
                for _ in range(4):
                    aug_image = base_image.copy()
                    aug_label = base_label.copy()

                    # Augmentations (the order matters for the random streams)
                    aug_image = color_jitter(aug_image)
                    aug_image = gaussian_noise(aug_image)
                    aug_image = gaussian_blur(aug_image)
                    aug_image = cutout(aug_image)
                    aug_label = label_noise(aug_label)

                    aug_tensor = torch.from_numpy(aug_image).permute(2, 0, 1).to(torch.uint8)
                    aug_label_tensor = torch.from_numpy(aug_label).to(torch.float32)

                    idx_str = f"{global_index['train']:06d}"
                    torch.save(aug_tensor, f"images/train/img_{idx_str}.pt")
                    torch.save(aug_label_tensor, f"labels/train/label_{idx_str}.pt")
                    global_index["train"] += 1

            current_idx += 1
            percent = (current_idx / total_originals) * 100
            sys.stdout.write(f"\rProgress: {percent:.2f}%")
            sys.stdout.flush()

        # Drop the arrays before the next archive is loaded
        del X_images
        del y

    print("\n Finished dataset generation.")


 Processing Dataset1.npz...
Progress: 6.31%

In [None]:
class ImageRegressionDataset(Dataset):
    """Dataset that reads one image tensor and one label tensor per sample from disk.

    Args:
        image_paths: sequence of paths to serialized image tensors.
        label_paths: sequence of paths to serialized label tensors; entry
            ``i`` is used as the target of ``image_paths[i]``.
    """

    def __init__(self, image_paths, label_paths):
        self.image_paths = image_paths
        self.label_paths = label_paths

    def __len__(self):
        # The number of samples is defined by the image list alone.
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_file = self.image_paths[idx]
        label_file = self.label_paths[idx]

        # Both tensors are returned as float32.
        image = torch.load(image_file).to(torch.float32)   # [C, H, W] in [0, 255]
        label = torch.load(label_file).to(torch.float32)   # [6]
        return image, label

In [None]:
# Load sorted image and label paths
def get_paths(split):
    """Return ``(image_paths, label_paths)`` for the given split name.

    Every ``.pt`` file under ``images/<split>`` and ``labels/<split>`` is
    listed and each list is sorted, so entries at the same position are
    treated as a pair. The two lists must have the same length.
    """

    def _sorted_pt_files(folder):
        pt_names = filter(lambda name: name.endswith(".pt"), os.listdir(folder))
        return sorted(os.path.join(folder, name) for name in pt_names)

    image_paths = _sorted_pt_files(os.path.join("images", split))
    label_paths = _sorted_pt_files(os.path.join("labels", split))
    assert len(image_paths) == len(label_paths)
    return image_paths, label_paths

# Get paths for each split (folders are expected to exist already on disk)
train_img, train_lbl = get_paths("train")
val_img, val_lbl = get_paths("val")
test_img, test_lbl = get_paths("test")

# Create datasets
train_dataset = ImageRegressionDataset(train_img, train_lbl)
val_dataset = ImageRegressionDataset(val_img, val_lbl)
test_dataset = ImageRegressionDataset(test_img, test_lbl)

# Create dataloaders (optimized): 12 worker processes kept alive between
# epochs, with pinned host memory. Only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True,
                          num_workers=12, pin_memory=True, persistent_workers=True)

# Validation and test keep the on-disk order and use a smaller batch size.
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False,
                        num_workers=12, pin_memory=True, persistent_workers=True)

test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False,
                         num_workers=12, pin_memory=True, persistent_workers=True)


## ⚓ Network definition

In [None]:
# Color filters. They are not used by the network, which is kept generic
# (no hand-crafted color priors) for transfer learning and augmentation.
class RGBToHSV(nn.Module):
    """Convert a batch of RGB images to HSV, channel-first.

    Args:
        x: tensor indexed as ``x[:, c]`` with R, G, B in channels 0, 1, 2.
            Assumes floating-point values (the saturation thresholds used by
            the filters in this file suggest a [0, 1] range — TODO confirm).

    Returns:
        Tensor with channels (h, s, v) stacked on dim 1. Hue is normalized to
        [0, 1) and follows the same convention as ``colorsys.rgb_to_hsv``:
        red = 0, green = 1/3, blue = 2/3, achromatic pixels = 0.
    """

    def forward(self, x):
        r, g, b = x[:, 0], x[:, 1], x[:, 2]
        rgb = x[:, :3]
        maxc, _ = torch.max(rgb, dim=1)
        minc, _ = torch.min(rgb, dim=1)
        delta = maxc - minc

        v = maxc
        s = torch.where(v == 0, torch.zeros_like(v), delta / v)

        # Normalized distance of each channel from the maximum; the epsilon
        # only guards the division for achromatic pixels (delta == 0).
        rc = (maxc - r) / (delta + 1e-8)
        gc = (maxc - g) / (delta + 1e-8)
        bc = (maxc - b) / (delta + 1e-8)

        # Sector selection with explicit priority (r, then g, then b) so that
        # ties between channels give one well-defined answer.
        h = torch.where(
            maxc == r,
            bc - gc,
            torch.where(maxc == g, 2.0 + rc - bc, 4.0 + gc - rc),
        )
        # Hue is undefined for gray pixels; report 0 like colorsys does.
        h = torch.where(delta == 0, torch.zeros_like(h), h)
        # Negative values (red sector with b > g) wrap around to just below 1.
        h = (h / 6.0) % 1.0
        return torch.stack([h, s, v], dim=1)

class GreenFilter(nn.Module):
    """Binary mask of pixels whose HSV value falls in the green window.

    A pixel is selected when 0.25 < h < 0.45, s > 0.3 and v > 0.2.
    Returns a float mask with a singleton channel dimension inserted at dim 1.
    """

    def forward(self, x):
        hue, sat, val = RGBToHSV()(x).unbind(dim=1)
        in_hue_band = (hue > 0.25) & (hue < 0.45)
        colorful_and_bright = (sat > 0.3) & (val > 0.2)
        selected = in_hue_band & colorful_and_bright
        return selected.unsqueeze(1).float()

class BlueFilter(nn.Module):
    """Binary mask of pixels whose HSV value falls in the blue window.

    A pixel is selected when 0.55 < h < 0.75, s > 0.3 and v > 0.2.
    Returns a float mask with a singleton channel dimension inserted at dim 1.
    """

    def forward(self, x):
        hue, sat, val = RGBToHSV()(x).unbind(dim=1)
        in_hue_band = (hue > 0.55) & (hue < 0.75)
        colorful_and_bright = (sat > 0.3) & (val > 0.2)
        selected = in_hue_band & colorful_and_bright
        return selected.unsqueeze(1).float()

class RedFilter(nn.Module):
    """Binary mask of pixels whose HSV value falls in the red window.

    Red sits at both ends of the hue axis, so a pixel is selected when
    (h < 0.05 or h > 0.95), s > 0.3 and v > 0.2.
    Returns a float mask with a singleton channel dimension inserted at dim 1.
    """

    def forward(self, x):
        hue, sat, val = RGBToHSV()(x).unbind(dim=1)
        near_hue_ends = (hue < 0.05) | (hue > 0.95)
        selected = near_hue_ends & (sat > 0.3) & (val > 0.2)
        return selected.unsqueeze(1).float()

class BlackFilter(nn.Module):
    """Binary mask of dark pixels: HSV value channel below 0.3.

    Returns a float mask with a singleton channel dimension inserted at dim 1.
    """

    def forward(self, x):
        value_channel = RGBToHSV()(x)[:, 2]
        is_dark = value_channel < 0.3
        return is_dark.unsqueeze(1).float()

In [None]:
class ResidualBlock(nn.Module):
    """Two conv-BN stages with a skip connection and channel-wise dropout.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by both convolutions.
        kernel_size: side of the square convolution kernels; padding is
            ``kernel_size // 2``.

    When the channel counts differ, the skip path is a 1x1 convolution
    followed by batch norm; otherwise it is an empty ``nn.Sequential``
    (the input passes through unchanged).
    """

    def __init__(self, in_channels, out_channels, kernel_size=3):
        super().__init__()
        half_kernel = kernel_size // 2

        # First stage: conv -> BN -> ReLU -> Dropout2d
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=half_kernel)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.dropout1 = nn.Dropout2d(0.2)

        # Second stage: conv -> BN (activation comes after the residual sum)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, padding=half_kernel)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.dropout2 = nn.Dropout2d(0.2)

        # Skip path: projection only when the channel count changes
        if in_channels == out_channels:
            self.skip_adjust = nn.Sequential()
        else:
            self.skip_adjust = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        shortcut = self.skip_adjust(x)

        features = self.conv1(x)
        features = self.bn1(features)
        features = self.relu(features)
        features = self.dropout1(features)

        features = self.conv2(features)
        features = self.bn2(features)

        # Residual sum, then activation and dropout on the merged signal
        features += shortcut
        features = self.relu(features)
        return self.dropout2(features)

In [None]:
class NetCamera(nn.Module):
    """Residual CNN that regresses six values from an RGB image batch.

    The input is divided by 255 inside ``forward``, average-pooled by 2 and
    batch-normalized, then passed through six residual blocks
    (3 -> 16 -> 32 -> 48 -> 64 -> 96 -> 128 channels) with a 2x max-pool after
    blocks 2, 4 and 6. A global max-pool and a two-layer MLP produce the
    final 6-element output per sample.
    """

    def __init__(self):
        super().__init__()

        # Input stem: 2x down-sampling + normalization of the three channels
        self.pool_input = nn.AvgPool2d(kernel_size=2)
        self.bn_input = nn.BatchNorm2d(3)

        # Stage 1-2
        self.block1 = ResidualBlock(3, 16)
        self.drop1 = nn.Dropout(0.05)
        self.block2 = ResidualBlock(16, 32)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        self.drop2 = nn.Dropout(0.1)

        # Stage 3-4
        self.block3 = ResidualBlock(32, 48)
        self.drop3 = nn.Dropout(0.05)
        self.block4 = ResidualBlock(48, 64)
        self.pool4 = nn.MaxPool2d(kernel_size=2)
        self.drop4 = nn.Dropout(0.1)

        # Stage 5-6
        self.block5 = ResidualBlock(64, 96)
        self.drop5 = nn.Dropout(0.05)
        self.block6 = ResidualBlock(96, 128)
        self.pool6 = nn.MaxPool2d(kernel_size=2)
        self.drop6 = nn.Dropout(0.1)

        # Head: global max over space, then 128 -> 64 -> 6
        self.global_pool = nn.AdaptiveMaxPool2d(1)
        self.dense = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 6)
        )

    def forward(self, x):
        # Rescale raw pixel values before the stem
        x = self.bn_input(self.pool_input(x / 255.0))

        # Convolutional trunk, applied strictly in this order
        trunk = (
            self.block1, self.drop1,
            self.block2, self.pool2, self.drop2,
            self.block3, self.drop3,
            self.block4, self.pool4, self.drop4,
            self.block5, self.drop5,
            self.block6, self.pool6, self.drop6,
        )
        for layer in trunk:
            x = layer(x)

        pooled = torch.flatten(self.global_pool(x), 1)
        return self.dense(pooled)

## 😓 Training

In [None]:
class BalancedMSELoss(nn.Module):
    """Mean squared error with a fixed per-output weight.

    The first three outputs have weight 1 and the last three have weight
    ``70 / pi``. The weighted squared errors are averaged over all elements.
    """

    def __init__(self):
        super().__init__()
        unit_weight = 1.0
        scaled_weight = 70.0 / np.pi
        per_output = [unit_weight] * 3 + [scaled_weight] * 3
        # Registered as a buffer so it is stored with the module state.
        self.register_buffer("weights", torch.tensor(per_output, dtype=torch.float32))
        self.base_loss = nn.MSELoss(reduction='none')  # element-wise squared error

    def forward(self, y_pred, y_true):
        # The weights follow the predictions' device on every call.
        per_element = self.base_loss(y_pred, y_true)
        scaled = per_element * self.weights.to(y_pred.device)
        return scaled.mean()

In [None]:
model = NetCamera().to(device)

# Check the number of parameters
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params:,}")

# Weighted MSE over the six regression targets
criterion = BalancedMSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Multiply the learning rate by 0.8 after 10 epochs without improvement of
# the monitored value. The deprecated `verbose` argument is intentionally
# not passed: newer PyTorch releases reject it, and the training loop
# already prints the current learning rate every epoch.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.8,
    patience=10,
)

# Early-stopping state: best validation loss so far, number of epochs
# tolerated without improvement, and the current count of such epochs.
best_val_loss = float('inf')
patience = 100
counter = 0

In [None]:
# Histories: one average loss value per epoch, used for the loss plot
history_train = []
history_val = []

# Gradient scaler for mixed-precision training (device type hard-coded to CUDA)
scaler = GradScaler(device='cuda')

# Train for at most 1000 epochs; early stopping may end the loop sooner
for epoch in range(1000):
    model.train()
    train_losses = []
    total_batches = len(train_loader)

    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Forward pass and loss under automatic mixed precision
        with autocast(device_type='cuda'):
            preds = model(images)
            loss = criterion(preds, labels)

        # Scaled backward pass, optimizer step through the scaler, then
        # update of the scale factor for the next iteration
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        train_losses.append(loss.item())

        # In-place progress indicator for the current epoch
        progress = (i + 1) / total_batches * 100
        sys.stdout.write(f"\rEpoch {epoch + 1}: {progress:.1f}%")
        sys.stdout.flush()

    # Validation
    model.eval()
    val_losses = []

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            with autocast(device_type='cuda'):
                preds = model(images)
                loss = criterion(preds, labels)

            val_losses.append(loss.item())

    # Epoch averages are means over batches (not weighted by batch size)
    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    # The scheduler monitors the validation loss
    scheduler.step(avg_val_loss)
    history_train.append(avg_train_loss)
    history_val.append(avg_val_loss)

    # Summary (the carriage return overwrites the progress line)
    sys.stdout.write("\r")
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch + 1} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | LR: {current_lr:.6f}")

    # Checkpoint on improvement; otherwise count towards early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        # Saves the whole pickled module, not just its state_dict
        torch.save(model, "helyx_model.pt")
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break

# Drop the in-memory model; the best checkpoint is the file saved above
del model

In [None]:
# Training and validation loss per epoch on a single set of axes
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
for series, series_label in ((history_train, 'Train Loss'), (history_val, 'Val Loss')):
    loss_ax.plot(series, label=series_label)
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training vs Validation Loss')
loss_ax.legend()
loss_ax.grid(True)
loss_fig.tight_layout()
plt.show()

## ⏰ Testing

In [None]:
# Load full model. The checkpoint is a whole pickled nn.Module written by
# the training cell, so `weights_only=False` is required on PyTorch versions
# whose default only allows plain tensors/state_dicts.
# NOTE: unpickling executes arbitrary code — only load checkpoints you
# produced yourself. `map_location` lets a GPU checkpoint open on a CPU host.
model = torch.load("helyx_model.pt", map_location=device, weights_only=False)
model = model.to(device)
model.eval()

# Per-batch predictions and ground-truth labels, concatenated afterwards
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        preds = model(images)
        all_preds.append(preds.cpu().numpy())
        all_labels.append(labels.cpu().numpy())

all_preds = np.concatenate(all_preds, axis=0)
all_labels = np.concatenate(all_labels, axis=0)

# Plain (unweighted) MSE over every sample and every output component
mse = np.mean((all_preds - all_labels) ** 2)
print(f"Test MSE: {mse:.4f}")

# One subplot per output component: true vs predicted, in test-set order
param_names = ['X', 'Y', 'Z', 'Yaw', 'Pitch', 'Roll']
fig, axes = plt.subplots(2, 3, figsize=(15, 8))

for i, ax in enumerate(axes.flatten()):
    ax.plot(all_labels[:, i], label='True', alpha=0.7)
    ax.plot(all_preds[:, i], label='Pred', alpha=0.7)
    ax.set_title(param_names[i])
    ax.grid(True)

axes[0, 0].legend()
plt.tight_layout()
plt.show()