## Preparing Data

### Number of Images and Labels and its shapes

In [6]:
import os
import re 

images_dir = '/mnt/0D6BEAD6291820B7/Wilgo/Datasets/sugar_beets/rgb'
masks_dir = '/mnt/0D6BEAD6291820B7/Wilgo/Datasets/sugar_beets/labels'

mask_file_pattern = re.compile(r'frame(\d+)_GroundTruth_color\.png')

mask_id_to_path_map = {}
for mask_filename in os.listdir(masks_dir):
    match = mask_file_pattern.search(mask_filename)
    if match:
        frame_id = match.group(1)
        mask_id_to_path_map[frame_id] = os.path.join(masks_dir, mask_filename)

image_paths = []
mask_paths = []

for image_filename in os.listdir(images_dir):
   
    if image_filename.startswith('rgb_') and image_filename.endswith('.png'):
        try:
         
            img_id_str = os.path.splitext(image_filename)[0].split('_')[1]
            frame_id_to_match = str(int(img_id_str))

            if frame_id_to_match in mask_id_to_path_map:
                image_paths.append(os.path.join(images_dir, image_filename))
                mask_paths.append(mask_id_to_path_map[frame_id_to_match])

        except (IndexError, ValueError):
            print(f"⚠️ Skipping unexpected image file format: {image_filename}")

image_paths.sort()
mask_paths.sort()

print(f"✅ Total valid pairs found: {len(image_paths)}")

if image_paths:
    print(f"\nExample Image Path: {image_paths[0]}")
    print(f"Example Mask Path:  {mask_paths[0]}")

✅ Total valid pairs found: 283

Example Image Path: /mnt/0D6BEAD6291820B7/Wilgo/Datasets/sugar_beets/rgb/rgb_00023.png
Example Mask Path:  /mnt/0D6BEAD6291820B7/Wilgo/Datasets/sugar_beets/labels/bonirob_2016-05-23-10-37-10_0_frame100_GroundTruth_color.png


### Plot the images and color masks

In [None]:
import matplotlib.pyplot as plt
import cv2
import os

fig, axs = plt.subplots(2, 5, figsize=(20, 8))
shown = 0  

idx = 0
while shown < 5 and idx < len(image_paths):
    img_path = image_paths[idx]
    mask_path = mask_paths[idx]

    if os.path.exists(img_path) and os.path.exists(mask_path):
        img = cv2.imread(img_path)
        mask = cv2.imread(mask_path)

        if img is not None and mask is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)

            axs[0, shown].imshow(img)
            axs[0, shown].set_title(f"Image {idx + 1}")
            axs[0, shown].axis('off')

            axs[1, shown].imshow(mask)
            axs[1, shown].set_title(f"Mask {idx + 1}")
            axs[1, shown].axis('off')

            shown += 1

    idx += 1

plt.tight_layout()
plt.show()


### Tranform color mask in class mask

In [None]:
import cv2
import numpy as np
from tqdm import tqdm

COLOR_MAP = {
    (0, 0, 0): 0,           
    (255, 0, 0): 1,          
    (0, 50, 255): 2,         
    (255, 150, 0): 3,       
    (255, 200, 0): 3         
}

def convert_mask_to_class(mask_rgb):
    mask_class = np.zeros(mask_rgb.shape[:2], dtype=np.uint8)
    for color, class_idx in COLOR_MAP.items():
        matches = np.all(mask_rgb == color, axis=-1)
        mask_class[matches] = class_idx
    return mask_class

masks_class = []


for path in tqdm(mask_paths, desc="Converting masks"):
    mask_rgb = cv2.imread(path)
    if mask_rgb is None:
        continue
    mask_rgb = cv2.cvtColor(mask_rgb, cv2.COLOR_BGR2RGB)
    mask_class = convert_mask_to_class(mask_rgb)
    masks_class.append(mask_class)


In [None]:
import numpy as np

mask = masks_class[3]
total_pixels = mask.size
classes, counts = np.unique(mask, return_counts=True)

print("Percentage distribution of classes:")
for cls, count in zip(classes, counts):
    percent = (count / total_pixels) * 100
    print(f"Class {cls}: {percent:.2f}%")

print("Shape of the original mask (RGB):", mask_rgb.shape)
print("Shape of the converted mask (indexed):", mask_class.shape)


## Dataset and Traning

### Creating dataset for training

In [None]:
import cv2
from tqdm import tqdm

all_img = []
all_mask = []


for img_path, mask_path in tqdm(zip(image_paths, mask_paths), total=len(image_paths), desc="Extracting images and masks"):

    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    mask_rgb = cv2.imread(mask_path)
    mask_rgb = cv2.cvtColor(mask_rgb, cv2.COLOR_BGR2RGB)
    mask_class = convert_mask_to_class(mask_rgb)

    all_img.append(image)
    all_mask.append(mask_class)

print(f"\nTotal of images and masks: {len(all_img)}")
print(f"Shape each one images: {all_img[0].shape}, mask: {all_mask[0].shape}")


### Dataset (Split)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import torchvision.models.segmentation as segmentation
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import time
import gc

class PatchDataset(Dataset):
    def __init__(self, images, masks):
        self.images = images
        self.masks = masks

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx].astype(np.float32) / 255.0
        mask = self.masks[idx].astype(np.int64)
        image = np.transpose(image, (2, 0, 1))
        return torch.tensor(image, dtype=torch.float32), torch.tensor(mask, dtype=torch.long)

dataset = PatchDataset(all_img, all_mask)


train_size = int(0.7 * len(dataset))
calibration_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - calibration_size
train_dataset, calibration_dataset, test_dataset = random_split(
    dataset, [train_size, calibration_size, test_size],
    generator=torch.Generator().manual_seed(42)
)

calibration_size_A = len(calibration_dataset) // 2
calibration_size_B = len(calibration_dataset) - calibration_size_A
calibration_A, calibration_B = random_split(
    calibration_dataset, [calibration_size_A, calibration_size_B],
    generator=torch.Generator().manual_seed(42)
)

batch_size = 2
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
calibration_loader_A = DataLoader(calibration_A, batch_size=batch_size, shuffle=False)
calibration_loader_B = DataLoader(calibration_B, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Train: {len(train_dataset)}")
print(f"Calibration A (Validação): {len(calibration_A)}")
print(f"Calibration B (Conformal): {len(calibration_B)}")
print(f"Test: {len(test_dataset)}")

### Model

#### DeepLabV3

In [None]:
# Modelo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = segmentation.deeplabv3_mobilenet_v3_large(weights='DEFAULT')
model.classifier[4] = nn.Conv2d(256, 4, kernel_size=(1, 1))
model = model.to(device)

# Loss e otimizador
class_weights = torch.tensor([0.5, 1.0, 2.0, 0.5]).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Mixed precision
scaler = torch.cuda.amp.GradScaler()

# Salvar melhor modelo
def save_best_model(model, path, iou_score, best_iou):
    if iou_score > best_iou:
        print(f"New best model! IoU: {iou_score:.4f}")
        torch.save(model.state_dict(), path)
        return iou_score
    return best_iou

# Treinamento usando mixed precision
def train_model(model, train_loader, val_loader, epochs=10):
    best_iou = 0.0
    for epoch in range(epochs):
        print(f"\n=== Epoch {epoch+1}/{epochs} ===")
        model.train()
        train_loss = 0.0
        start_time = time.time()

        for images, masks in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()

            with torch.cuda.amp.autocast():
                outputs = model(images)['out']
                loss = criterion(outputs, masks)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            train_loss += loss.item()

            # Libera memória
            torch.cuda.empty_cache()
            gc.collect()

        print(f"Training Loss: {train_loss/len(train_loader):.4f}")

        # Validação
        model.eval()
        val_loss = 0.0
        iou_scores = np.zeros(4)
        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(device), masks.to(device)
                with torch.cuda.amp.autocast():
                    outputs = model(images)['out']
                    loss = criterion(outputs, masks)
                val_loss += loss.item()
                preds = outputs.argmax(dim=1)
                for cls in range(4):
                    intersection = ((preds == cls) & (masks == cls)).sum().item()
                    union = ((preds == cls) | (masks == cls)).sum().item()
                    iou = intersection / union if union != 0 else 0
                    iou_scores[cls] += iou

                torch.cuda.empty_cache()
                gc.collect()

        iou_scores /= len(val_loader)
        mean_iou = np.mean(iou_scores)
        print(f"Validation Loss: {val_loss/len(val_loader):.4f}")
        for cls_idx, iou_score in enumerate(iou_scores):
            print(f"Class {cls_idx} IoU: {iou_score:.4f}")
        print(f"Mean IoU: {mean_iou:.4f}")

        best_iou = save_best_model(model, "best_model.pth", mean_iou, best_iou)

        if device.type == "cuda":
            print(f"GPU memory used: {torch.cuda.memory_allocated() / 1024 ** 2:.2f} MB")

# Rodar treinamento
train_model(model, train_loader, calibration_loader_A, epochs=10)

### Save the logits

In [None]:
import torch
from tqdm import tqdm

def save_logits_masks_images(model, loader, save_path):
    model.eval()
    all_logits = []
    all_masks = []
    all_images = []

    with torch.no_grad():
        for images, masks in tqdm(loader, desc="Saving logits, masks and images"):
            images = images.to(device)
            outputs = model(images)['out']

            all_logits.append(outputs.cpu())
            all_masks.append(masks.cpu())
            all_images.append(images.cpu())

    all_logits = torch.cat(all_logits)
    all_masks = torch.cat(all_masks)
    all_images = torch.cat(all_images)

    save_dict = {
        "logits": all_logits,
        "masks": all_masks,
        "images": all_images
    }

    torch.save(save_dict, save_path)
    print(f"Logits, masks e images have saved at: {save_path}")

model.load_state_dict(torch.load("best_model.pth", weights_only=True))
save_logits_masks_images(model, calibration_loader_B, "calibration_logits.pth")
save_logits_masks_images(model, test_loader, "test_logits.pth")
