## Test with pretrained models

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os
import cpuinfo
from tqdm import tqdm
from torchinfo import summary
from PIL import Image
import gc
gc.enable()

### File paths

In [None]:
train_path = "../data/Train"    # paths for your training and testing dataset
test_path = "../data/Test"
input_parameter = ""            # paths for import and export custom model trainable parameters
output_parameter = ""

### Device of use

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = "cpu"
device_name = ""

if device == torch.device("cuda"):
    device_name = torch.cuda.get_device_name(device)
    print(f"Using GPU: {device_name}")
else:
    cpu_info = cpuinfo.get_cpu_info()
    device_name = cpu_info['brand_raw']
    print(f"Using CPU: {device_name}")

### Pretrain Model of use from torchvision

In [None]:
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
#model = efficientnet_b0(weights=None)
model = efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
print(f"Using model {type(model).__name__}")

epochs = 60
batch_size = 32 # adjust to your memory
optimizer = optim.AdamW(
    model.parameters(),
    lr=3e-4,            # learning rate
    weight_decay=1e-5,  # L2 regularization
    betas=(0.9, 0.999), # Adam beta parameters
)
loss_function = nn.CrossEntropyLoss()
scheduler = CosineAnnealingLR(  # learning rate scheduler
    optimizer,
    T_max=epochs,               # number of epochs before restart
    eta_min=1e-6,               # minimum learning rate
)

In [None]:
# Controling randomness for reproducibility
seed = 42
torch.manual_seed(seed)

### Data Loader and normalization

In [None]:
# Get dataset mean and std
print(f"CPU count: {os.cpu_count()}")
num_workers = min(4, os.cpu_count() // 2)  # Dynamically set num_workers
def compute_mean_std(dataset_path, batch_size=32, num_workers=None):
    """Compute channel-wise mean and standard deviation of an image dataset.
    
    Args:
        dataset_path: Path to dataset directory (ImageFolder format)
        batch_size: Number of images per batch
        num_workers: Number of parallel data loading workers
    
    Returns:
        Tuple of (mean, std) tensors for each channel (3 for RGB)
    """
    transform = transforms.Compose([
        transforms.Resize(224),          # Resize shorter side to 224px
        transforms.CenterCrop(224),      # Take center 224x224 crop
        transforms.ToTensor()            # Convert to [0,1] range tensor
    ])
    
    dataset = datasets.ImageFolder(root=dataset_path, transform=transform)
    loader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        shuffle=False,                  # Maintain deterministic order
        num_workers=num_workers
    )

    mean = 0.0                          # Accumulator for mean values
    std = 0.0                           # Accumulator for std values 
    n_images = 0                        # Total images processed

    for images, _ in tqdm(loader, desc="Computing mean/std"):
        batch_size = images.size(0)
        # Flatten spatial dimensions (height x width)
        images = images.view(batch_size, images.size(1), -1)  # (B, C, H*W)

        # Compute mean across spatial dimensions
        mean += images.mean(2).sum(0)   # Sum per-channel means (shape [C])

        # Compute unbiased std across spatial dimensions
        std += torch.std(images, dim=2, unbiased=True).sum(0)  # [C]

        n_images += batch_size

    # Average across all images
    mean /= n_images
    std /= n_images

    return mean, std

In [None]:
#mean_train, std_train = compute_mean_std(train_path)
#print(f"Train mean: {mean_train}, Train std: {std_train}")
#mean_test, std_test = compute_mean_std(test_path)
#print(f"Test mean: {mean_test}, Test std: {std_test}")

In [None]:
# Transform for training and testing datasets
transform_train = transforms.Compose([          # on training dataset
    transforms.RandomAffine(
        degrees=15,          # Rotation range
        translate=(0.1, 0.1),# Shift range (width, height)
        scale=(0.9, 1.1),    # Zoom range
        shear=10,            # Shear angle in degrees
        fill=(128, 128, 128) # Fill color for empty areas (e.g., gray)
    ),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),

    transforms.Resize(256),
    transforms.RandomCrop(224),

    transforms.ToTensor(),

    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),    # ImageNet mean and std
    #transforms.Normalize(mean=mean_train.tolist(), std=std_train.tolist())
    ])

transform_test = transforms.Compose([   # on test dataset
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),

    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    #transforms.Normalize(mean=mean_train.tolist(), std=std_train.tolist())
    ])

# Load datasets
train_dataset = datasets.ImageFolder(root=train_path, transform=transform_train)
test_dataset = datasets.ImageFolder(root=test_path, transform=transform_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

train_data = [
    (images.to(device), labels.to(device))
    for images, labels in tqdm(train_loader, desc=f"Preloading Train Data to {device_name}", leave=False)
]
val_data = [
    (images.to(device), labels.to(device))
    for images, labels in tqdm(test_loader, desc=f"Preloading Val Data to {device_name}", leave=False)
]

print(f"Classes: {train_dataset.classes}")

In [None]:
# Edit the output layer of the model
num_classes = len(train_dataset.classes)
num_features = model.classifier[1].in_features
print(f"Classifier - Input features: {num_features}, Output classes: {num_classes}")
model.classifier = nn.Sequential(
    nn.Dropout(p=0.5),
    nn.Linear(num_features, 128),
    nn.BatchNorm1d(128),
    nn.ReLU(),
    nn.Dropout(p=0.3),
    nn.Linear(128, num_classes),
    #nn.Softmax(dim=1)  # Use Softmax for multi-class classification (may not be needed for CrossEntropyLoss)
)

### Configure model parameters

In [None]:
# IF NEEDED
# Load custom weight and optimizer states
# if os.path.exists(input_parameter):
#     checkpoint = torch.load("test_weights.pth", map_location=device)
#     model.load_state_dict(checkpoint['model_state_dict'])
#     optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# # Selective layer freezing
# # change base on your model
# # "Early layers are often already well-optimized" by ChatGPT-O4
# for _, param in model.named_parameters():
#     param.requires_grad = True          # Unfreeze all layers first
# for name, param in model.named_parameters():
#     if "classifier" not in name:
#         param.requires_grad = False     # Freeze everything except the last classifier layer

In [None]:
# Move model to device
model.to(device)
print(f"Model is on {next(model.parameters()).device}")

In [None]:
# Print model architecture
print(summary(model, (batch_size, 3, 224, 224)))

### Training Epochs

In [None]:
previous_loss = float('inf')
for epoch in range(epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    train_bar = tqdm(train_data, desc=f"Epoch {epoch + 1}/{epochs} [Train]", leave=False)
    for images, labels in train_bar:
        # Forward pass
        outputs = model(images)
        loss = loss_function(outputs, labels)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate statistics
        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs.data, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

        # Update progress bar
        train_bar.set_postfix({
            'loss': f"{running_loss / total_train:.4f}",
            'acc': f"{100. * correct_train / total_train:.2f}%"
        })

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0

    val_bar = tqdm(val_data, desc=f"Epoch {epoch + 1}/{epochs} [Val]", leave=False)
    with torch.no_grad():
        for images, labels in val_bar:
            outputs = model(images)
            loss = loss_function(outputs, labels)

            val_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

            # Update progress bar
            val_bar.set_postfix({
                'loss': f"{val_loss / total_val:.4f}",
                'acc': f"{100. * correct_val / total_val:.2f}%"
            })

    # Update learning rate
    scheduler.step()

    # Print epoch summary
    print(f"Epoch {epoch + 1}/{epochs} - "
          f"Train Loss: {running_loss / total_train:.4f}, Train Acc: {100. * correct_train / total_train:.2f}% | "
          f"Val Loss: {val_loss / total_val:.4f}, Val Acc: {100. * correct_val / total_val:.2f}%")
    
    # Augment the dataset if validation loss is not improving
    if val_loss > previous_loss:
        print(f"Validation loss did not improve from {previous_loss / total_val:.4f}. Augmenting dataset...")
        del train_data
        torch.cuda.empty_cache()
        train_dataset = datasets.ImageFolder(root=train_path, transform=transform_train)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        train_data = [
            (images.to(device), labels.to(device))
            for images, labels in tqdm(train_loader, desc=f"Preloading Train Data to {device_name}", leave=False)
        ]
    previous_loss = min(val_loss, previous_loss)
    

### Training log and data export

In [None]:
# Code here
# iteration 3 template 2