In [1]:
!pip install tqdm



In [2]:
import os
import json
from glob import glob
import xml.etree.ElementTree as ET
from PIL import Image
from sklearn.model_selection import train_test_split
from collections import defaultdict

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
import numpy as np

os.environ['CUDA_VISIBLE_DEVICES'] = '0'  # Force GPU 0 (NVIDIA)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


print("CUDA Available:", torch.cuda.is_available())
print("Device Count:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("Current Device:", torch.cuda.current_device())
    print("Device Name:", torch.cuda.get_device_name(0))
    print("Device Capability:", torch.cuda.get_device_capability(0))

Using device: cuda
CUDA Available: True
Device Count: 1
Current Device: 0
Device Name: NVIDIA GeForce RTX 4060 Laptop GPU
Device Capability: (8, 9)


In [3]:
# Path configuration
BASE_DIR = os.path.abspath(".")
OPENLOGO_DIR = os.path.join(BASE_DIR, "openlogo")

JPEG_DIR = os.path.join(OPENLOGO_DIR, "JPEGImages")
ANNO_DIR = os.path.join(OPENLOGO_DIR, "Annotations")
IMAGESET_DIR = os.path.join(OPENLOGO_DIR, "ImageSets", "class_sep")

print("BASE_DIR:     ", BASE_DIR)
print("OPENLOGO_DIR: ", OPENLOGO_DIR)
print("JPEG_DIR:     ", JPEG_DIR)
print("ANNO_DIR:     ", ANNO_DIR)
print("IMAGESET_DIR: ", IMAGESET_DIR)

# Sanity check
some_imgs = glob(os.path.join(JPEG_DIR, "*.jpg"))[:5]
some_annos = glob(os.path.join(ANNO_DIR, "*.xml"))[:5]
print("\nSample images:", some_imgs[:2])
print("Sample annos: ", some_annos[:2])

BASE_DIR:      C:\Users\Srihari\ENPM703FinalProject
OPENLOGO_DIR:  C:\Users\Srihari\ENPM703FinalProject\openlogo
JPEG_DIR:      C:\Users\Srihari\ENPM703FinalProject\openlogo\JPEGImages
ANNO_DIR:      C:\Users\Srihari\ENPM703FinalProject\openlogo\Annotations
IMAGESET_DIR:  C:\Users\Srihari\ENPM703FinalProject\openlogo\ImageSets\class_sep

Sample images: ['C:\\Users\\Srihari\\ENPM703FinalProject\\openlogo\\JPEGImages\\1008198576.jpg', 'C:\\Users\\Srihari\\ENPM703FinalProject\\openlogo\\JPEGImages\\1016381746.jpg']
Sample annos:  ['C:\\Users\\Srihari\\ENPM703FinalProject\\openlogo\\Annotations\\1008198576.xml', 'C:\\Users\\Srihari\\ENPM703FinalProject\\openlogo\\Annotations\\1016381746.xml']


In [4]:
def parse_voc_xml(xml_path):
    """
    Parse one XML annotation. Return a list of objects.
    Each object is: {'class_name': str, 'bbox': [xmin, ymin, xmax, ymax]}
    """
    tree = ET.parse(xml_path)
    root = tree.getroot()

    objects = []
    for obj in root.findall("object"):
        cls_name = obj.find("name").text.strip()

        bbox_node = obj.find("bndbox")
        xmin = int(float(bbox_node.find("xmin").text))
        ymin = int(float(bbox_node.find("ymin").text))
        xmax = int(float(bbox_node.find("xmax").text))
        ymax = int(float(bbox_node.find("ymax").text))

        objects.append({
            "class_name": cls_name,
            "bbox": [xmin, ymin, xmax, ymax]
        })

    return objects

# Test
if len(some_annos) > 0:
    test_xml = some_annos[0]
    print("Testing parse_voc_xml on:", test_xml)
    print(parse_voc_xml(test_xml)[:3])

Testing parse_voc_xml on: C:\Users\Srihari\ENPM703FinalProject\openlogo\Annotations\1008198576.xml
[{'class_name': 'guinness', 'bbox': [333, 238, 595, 414]}]


In [5]:
def build_logo_samples(imageset_dir, jpeg_dir, anno_dir, 
                       train_suffix="_train.txt", test_size=0.2, random_state=42):
    """
    Build train/val samples from OpenLogo dataset.
    Returns: train_samples, val_samples, class_to_idx
    """
    all_samples = []

    list_files = glob(os.path.join(imageset_dir, f"*{train_suffix}"))
    list_files.sort()
    print("Found train list files:", len(list_files))

    for lf in list_files:
        with open(lf, "r") as f:
            ids = [line.strip() for line in f if len(line.strip()) > 0]

        for img_id in ids:
            img_path = os.path.join(jpeg_dir, img_id + ".jpg")
            xml_path = os.path.join(anno_dir, img_id + ".xml")

            if not (os.path.exists(img_path) and os.path.exists(xml_path)):
                continue

            objs = parse_voc_xml(xml_path)

            for obj in objs:
                cls_name = obj["class_name"]
                bbox = obj["bbox"]

                all_samples.append({
                    "img_path": img_path,
                    "bbox": bbox,
                    "class_name": cls_name
                })

    # Build class mapping
    classes = sorted(list({s["class_name"] for s in all_samples}))
    class_to_idx = {c: i for i, c in enumerate(classes)}
    print("Discovered classes:", len(class_to_idx))

    # Add numeric labels
    for s in all_samples:
        s["label"] = class_to_idx[s["class_name"]]

    # Stratified split
    y = [s["label"] for s in all_samples]
    train_s, val_s = train_test_split(
        all_samples,
        test_size=test_size,
        random_state=random_state,
        stratify=y
    )

    print(f"Total samples: {len(all_samples)} -> train {len(train_s)}, val {len(val_s)}")
    return train_s, val_s, class_to_idx


train_samples, val_samples, class_to_idx = build_logo_samples(
    IMAGESET_DIR, JPEG_DIR, ANNO_DIR,
    train_suffix="_train.txt", test_size=0.2, random_state=42
)

num_classes = len(class_to_idx)
print("num_classes:", num_classes)
print("\nSample:", train_samples[0])

Found train list files: 353
Discovered classes: 352
Total samples: 72652 -> train 58121, val 14531
num_classes: 352

Sample: {'img_path': 'C:\\Users\\Srihari\\ENPM703FinalProject\\openlogo\\JPEGImages\\CPA_Australia_sportslogo_22.jpg', 'bbox': [1, 330, 54, 352], 'class_name': 'cpa_australia', 'label': 104}


In [6]:
class LogoCropDataset(Dataset):
    def __init__(self, samples, transform=None):
        self.samples = samples
        self.transform = transform

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]
        path = s["img_path"]
        xmin, ymin, xmax, ymax = s["bbox"]
        label = s["label"]

        # Load image
        img = Image.open(path).convert("RGB")

        # Defensive clamp
        w, h = img.size
        xmin_c = max(0, min(xmin, w - 1))
        ymin_c = max(0, min(ymin, h - 1))
        xmax_c = max(0, min(xmax, w))
        ymax_c = max(0, min(ymax, h))

        if xmax_c <= xmin_c or ymax_c <= ymin_c:
            crop = img
        else:
            crop = img.crop((xmin_c, ymin_c, xmax_c, ymax_c))

        if self.transform is not None:
            crop = self.transform(crop)

        return crop, label

In [7]:
train_transform = T.Compose([
    T.Resize((224, 224)),
    T.RandomHorizontalFlip(p=0.5),
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transform = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_ds = LogoCropDataset(train_samples, transform=train_transform)
val_ds = LogoCropDataset(val_samples, transform=val_transform)

BATCH_SIZE = 32

train_loader = DataLoader(
    train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=(device.type == "cuda")
)

val_loader = DataLoader(
    val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=(device.type == "cuda")
)

print("Train batches:", len(train_loader), "Val batches:", len(val_loader))

Train batches: 1817 Val batches: 455


In [8]:
import torchvision.models as models

# Use pretrained ResNet18
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

# Replace final layer for 352 classes
model.fc = nn.Linear(model.fc.in_features, num_classes)

model = model.to(device)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
print("\nModel ready!")


Total parameters: 11,357,088
Trainable parameters: 11,357,088

Model ready!


In [9]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

print("Criterion:", criterion)
print("Optimizer:", optimizer)

Criterion: CrossEntropyLoss()
Optimizer: Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.0001
    maximize: False
    weight_decay: 0
)


In [10]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    running_correct = 0
    running_total = 0

    for images, labels in loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, preds = outputs.max(1)
        running_correct += (preds == labels).sum().item()
        running_total += labels.size(0)

    epoch_loss = running_loss / running_total
    epoch_acc = running_correct / running_total
    return epoch_loss, epoch_acc


@torch.no_grad()
def eval_one_epoch(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    running_correct = 0
    running_total = 0

    for images, labels in loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        running_loss += loss.item() * images.size(0)
        _, preds = outputs.max(1)
        running_correct += (preds == labels).sum().item()
        running_total += labels.size(0)

    epoch_loss = running_loss / running_total
    epoch_acc = running_correct / running_total
    return epoch_loss, epoch_acc

In [11]:
EPOCHS = 5

best_val_acc = 0.0
best_state_dict = None

history = {
    "train_loss": [],
    "train_acc": [],
    "val_loss": [],
    "val_acc": []
}

for epoch in range(EPOCHS):
    print(f"\n===== Epoch {epoch+1}/{EPOCHS} =====")
    
    # TRAINING
    model.train()
    running_loss = 0.0
    running_correct = 0
    running_total = 0
    
    total_batches = len(train_loader)
    print_every = max(1, total_batches // 10)  # Print 10 times per epoch
    
    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, preds = outputs.max(1)
        running_correct += (preds == labels).sum().item()
        running_total += labels.size(0)
        
        # Print progress
        if (batch_idx + 1) % print_every == 0 or (batch_idx + 1) == total_batches:
            batch_acc = running_correct / running_total
            progress = (batch_idx + 1) / total_batches * 100
            print(f"  [{progress:5.1f}%] Batch {batch_idx+1}/{total_batches} - loss: {loss.item():.4f}, acc: {batch_acc:.4f}")

    train_loss = running_loss / running_total
    train_acc = running_correct / running_total
    
    # VALIDATION
    model.eval()
    running_loss = 0.0
    running_correct = 0
    running_total = 0
    
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            _, preds = outputs.max(1)
            running_correct += (preds == labels).sum().item()
            running_total += labels.size(0)
    
    val_loss = running_loss / running_total
    val_acc = running_correct / running_total
    
    history["train_loss"].append(train_loss)
    history["train_acc"].append(train_acc)
    history["val_loss"].append(val_loss)
    history["val_acc"].append(val_acc)
    
    print(f"\n[Epoch {epoch+1}] train_loss={train_loss:.4f}  train_acc={train_acc:.4f}")
    print(f"[Epoch {epoch+1}] val_loss={val_loss:.4f}  val_acc={val_acc:.4f}")
    
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_state_dict = {k: v.cpu() for k, v in model.state_dict().items()}
        print(f"  New BEST model (val_acc={best_val_acc:.4f})")

print("\n=== Training Complete ===")
print(f"Best val_acc = {best_val_acc:.4f}")


===== Epoch 1/5 =====
  [ 10.0%] Batch 181/1817 - loss: 2.7166, acc: 0.3258
  [ 19.9%] Batch 362/1817 - loss: 1.7384, acc: 0.4659
  [ 29.9%] Batch 543/1817 - loss: 0.7490, acc: 0.5436
  [ 39.8%] Batch 724/1817 - loss: 2.0234, acc: 0.5953
  [ 49.8%] Batch 905/1817 - loss: 1.1550, acc: 0.6372
  [ 59.8%] Batch 1086/1817 - loss: 0.6894, acc: 0.6687
  [ 69.7%] Batch 1267/1817 - loss: 0.6966, acc: 0.6938
  [ 79.7%] Batch 1448/1817 - loss: 0.4722, acc: 0.7151
  [ 89.7%] Batch 1629/1817 - loss: 0.3734, acc: 0.7338
  [ 99.6%] Batch 1810/1817 - loss: 0.6523, acc: 0.7504
  [100.0%] Batch 1817/1817 - loss: 0.7508, acc: 0.7509

[Epoch 1] train_loss=1.4308  train_acc=0.7509
[Epoch 1] val_loss=0.3911  val_acc=0.9218
  New BEST model (val_acc=0.9218)

===== Epoch 2/5 =====
  [ 10.0%] Batch 181/1817 - loss: 0.7683, acc: 0.9296
  [ 19.9%] Batch 362/1817 - loss: 0.2478, acc: 0.9303
  [ 29.9%] Batch 543/1817 - loss: 0.2410, acc: 0.9334
  [ 39.8%] Batch 724/1817 - loss: 0.1974, acc: 0.9342
  [ 49.8%] Batc

In [14]:
if best_state_dict is not None:
    torch.save(best_state_dict, "best_custom_cnn_image_resnet18.pth")
    print("Saved best model -> best_custom_cnn_image_resnet18.pth")

with open("class_to_idx_image_resnet18.json", "w") as f:
    json.dump(class_to_idx, f, indent=2)
print("Saved class_to_idx_image_resnet18.json")

print("\nTraining curves:")
for i in range(EPOCHS):
    print(
        f"Epoch {i+1:02d}: "
        f"train_acc={history['train_acc'][i]:.4f}, "
        f"val_acc={history['val_acc'][i]:.4f}, "
        f"train_loss={history['train_loss'][i]:.4f}, "
        f"val_loss={history['val_loss'][i]:.4f}"
    )

Saved best model -> best_custom_cnn_image_resnet18.pth
Saved class_to_idx_image_resnet18.json

Training curves:
Epoch 01: train_acc=0.7509, val_acc=0.9218, train_loss=1.4308, val_loss=0.3911
Epoch 02: train_acc=0.9428, val_acc=0.9619, train_loss=0.3138, val_loss=0.1938
Epoch 03: train_acc=0.9707, val_acc=0.9705, train_loss=0.1535, val_loss=0.1391
Epoch 04: train_acc=0.9818, val_acc=0.9732, train_loss=0.0926, val_loss=0.1237
Epoch 05: train_acc=0.9881, val_acc=0.9743, train_loss=0.0610, val_loss=0.1155
