In [1]:
import torch
import sys 
import os 



In [2]:
# Make the project's `src` directory (located next to the current working
# directory, i.e. at <cwd>/../src) importable. The membership check keeps the
# cell idempotent: re-running it does not pile up duplicate sys.path entries.
src_dir = os.path.abspath(os.path.join(os.getcwd(), "..", "src"))
if src_dir not in sys.path:
    sys.path.append(src_dir)
from data.make_dataset import get_train_loader, get_test_loader, get_val_loader
from visualization.visualize import plot_data_distribution

In [3]:
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

Using mps device


In [4]:
# Root directory that holds the train/val/test splits. The default is the
# original local checkout path; set the PROJECT_DATA_DIR environment variable
# to point the notebook at another location without editing this cell.
data_dir = os.environ.get(
    "PROJECT_DATA_DIR",
    "/Users/wizzy/Documents/school/vision/project-1/data",
)
train_dataset_path = os.path.join(data_dir, "train")
val_dataset_path = os.path.join(data_dir, "val")
test_dataset_path = os.path.join(data_dir, "test")

# One loader per split, built by the project's data helpers.
train_loader = get_train_loader(train_dataset_path)
val_loader = get_val_loader(val_dataset_path)
test_loader = get_test_loader(test_dataset_path)

In [5]:
# Collect the three split loaders under the keys 'train', 'val' and 'test'.
dataloaders = {
    'train': train_loader,
    'val': val_loader,
    'test': test_loader,
}

In [6]:
# Class names as exposed by the training dataset's `classes` attribute;
# their count sizes the classifier head further down.
class_names = dataloaders['train'].dataset.classes

# Model structure, loss, and optimizer

In [7]:
from torchvision.models import resnet50, ResNet50_Weights
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from collections import Counter
import torch.nn as nn
import torch
from block.block import CBAMBlock, InceptionBlock, SEBlock

In [8]:
# Start from a ResNet-50 initialised with torchvision's default pretrained weights.
model = resnet50(weights=ResNet50_Weights.DEFAULT)

# Freeze every parameter whose name mentions layer1 or layer2. Matching is by
# name only, so everything else (including the stages modified below) stays
# trainable.
frozen_stages = ("layer1", "layer2")
for param_name, weight in model.named_parameters():
    if any(stage in param_name for stage in frozen_stages):
        weight.requires_grad = False

# Extend layer3 with a CBAM block: the original stage is kept as the first
# element of the new Sequential and the CBAM block (built for 1024 channels)
# runs right after it.
model.layer3 = nn.Sequential(model.layer3, CBAMBlock(1024))

# Extend layer4 the same way with an Inception block followed by an SE block.
model.layer4 = nn.Sequential(
    model.layer4,
    InceptionBlock(2048),  # 2048 = output channels of layer4
    SEBlock(256),
)

# Feature width fed to the classifier (output channels from InceptionBlock).
num_ftrs = 256

# Classification head: Linear -> LayerNorm -> GELU -> Dropout -> Linear, ending
# in one output per entry of class_names. The layer order is kept as-is so the
# positional indices inside the Sequential do not change.
head_layers = [
    nn.Linear(num_ftrs, 256),
    nn.LayerNorm(256),
    nn.GELU(),
    nn.Dropout(0.3),
    nn.Linear(256, len(class_names)),
]
model.fc = nn.Sequential(*head_layers)

# Inverse-frequency class weights for the weighted cross-entropy loss.
# Labels come from the training dataset's `samples`, whose entries are pairs
# with the label in second position.
all_labels = [label for _, label in dataloaders['train'].dataset.samples]
class_counts = Counter(all_labels)
num_samples = sum(class_counts.values())

# Size the weight vector from class_names so it always matches the number of
# outputs of the classifier head, even if some class has no training samples.
num_classes = len(class_names)
empty_classes = [class_names[i] for i in range(num_classes) if class_counts[i] == 0]
if empty_classes:
    # Fail early with a clear message instead of a bare ZeroDivisionError or
    # a weight/logit size mismatch surfacing later inside the loss.
    raise ValueError(f"No training samples found for classes: {empty_classes}")

class_weights = torch.tensor(
    [num_samples / class_counts[i] for i in range(num_classes)],
    dtype=torch.float32,
    device=device,
)
# Rescale so the weights sum to 1.
class_weights = class_weights / class_weights.sum()

# Loss with class weights + label smoothing
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.005)

# Optimise only the parameters that are still trainable (frozen ones are
# skipped), and drive the learning rate with cosine annealing warm restarts.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = AdamW(trainable_params, lr=1e-4, weight_decay=1e-4)
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=3, T_mult=2)


# Training

In [9]:
from models.train_model import train_model

In [10]:
# Sanity check: display the type of the object stored under the 'train' key.
type(dataloaders['train'])

torch.utils.data.dataloader.DataLoader

In [11]:
# NOTE: the training call below is commented out, so this cell does nothing
# when run. A later cell loads a checkpoint with the same file name
# ("resnet_cbam_incept.pt") — presumably the result of this call; confirm.
# trained_model, history = train_model(
#     model=model,
#     criterion=criterion,
#     optimizer=optimizer,
#     scheduler=scheduler,
#     dataloaders=dataloaders,
#     num_epochs=40,
#     early_stop_patience=10,
#     save_path="resnet_cbam_incept.pt"
# )

In [None]:
# Location of the saved weights. The default is the original local path; set
# the PROJECT_CHECKPOINT_PATH environment variable to load from elsewhere
# without editing this cell.
checkpoint_path = os.environ.get(
    "PROJECT_CHECKPOINT_PATH",
    "/Users/wizzy/Documents/school/vision/project-1/checkpoints/resnet_cbam_incept.pt",
)
# weights_only=True limits unpickling to tensors and plain containers, which is
# all a state_dict needs; map_location remaps the stored tensors to `device`.
state_dict = torch.load(checkpoint_path, map_location=device, weights_only=True)
# Kept as the last expression so the cell still displays the key-matching
# report returned by load_state_dict.
model.load_state_dict(state_dict)

<All keys matched successfully>

In [13]:
from models.predict_model import predict_model
# Switch to evaluation mode before inference: the network contains Dropout and
# BatchNorm layers, which behave differently in training mode, and nothing
# earlier in this notebook changes the mode after the model is built.
# NOTE(review): harmless if predict_model already calls model.eval() — confirm.
model.eval()
# NOTE(review): presumably writes the test-set predictions to "c6-pred.csv";
# verify against models.predict_model.
predict_model(model, dataloaders['test'], save_path="c6-pred.csv")