In [1]:
!pip install --upgrade pip
!pip install -U albumentations ultralytics huggingface_hub torchmetrics 
!git clone https://github.com/sathishkumar67/ADIS.git
!mv /kaggle/working/ADIS/* /kaggle/working/

# necessary imports
import os
from huggingface_hub import hf_hub_download
from utils import unzip_file

# --- Where the dataset lives on the Hugging Face Hub and on local disk ---
REPO_ID = "pt-sk/ADIS"
FILENAME_IN_REPO = "dataset.zip"
LOCAL_DIR = os.getcwd()
REPO_TYPE = "dataset"
DATASET_PATH = f"{LOCAL_DIR}/{FILENAME_IN_REPO}"
DATASET_FOLDER_PATH = f"{LOCAL_DIR}/dataset"

# --- Label space ---
CLASSES = ['Cat', 'Cattle', 'Chicken', 'Deer', 'Dog', 'Squirrel', 'Eagle', 'Goat', 'Rodents', 'Snake']
# Derived from CLASSES so the two can never drift apart.
NUM_CLASSES = len(CLASSES)
BACKGROUND_CLASS_ID = 0
# The detector reserves one extra class id for background.
MODEL_NUM_CLASSES = NUM_CLASSES + 1

# Download and unzip the dataset only when it is not already extracted, so that
# re-running this cell does not fetch the multi-gigabyte archive again.
# NOTE(review): assumes the archive extracts to DATASET_FOLDER_PATH; if it does
# not, the folder never exists and the download simply runs every time.
if not os.path.isdir(DATASET_FOLDER_PATH):
    hf_hub_download(repo_id=REPO_ID, filename=FILENAME_IN_REPO, repo_type=REPO_TYPE, local_dir=LOCAL_DIR)
    unzip_file(DATASET_PATH, LOCAL_DIR)

    # The archive is no longer needed once extracted.
    os.remove(DATASET_PATH)

# Number of CPU cores (used later to size the DataLoader worker pools).
num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")

Collecting pip
  Downloading pip-25.0.1-py3-none-any.whl.metadata (3.7 kB)
Downloading pip-25.0.1-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.0.1
Collecting albumentations
  Downloading albumentations-2.0.5-py3-none-any.whl.metadata (41 kB)
Collecting ultralytics
  Downloading ultralytics-8.3.100-py3-none-any.whl.metadata (37 kB)
Collecting huggingface_hub
  Downloading huggingface_hub-0.30.1-py3-none-any.whl.metadata (13 kB)
Collecting torchmetrics
  Downloading torchmetrics-1.7.0-py3-none-any.whl.metadata (21 kB)
Collecting albucore==0.0.23 (from albumentations)
  Downloading albucore-0.0.23-py3-none-any.whl.metadata (5.3 kB)
Collecting simsimd>=5.9.2 (from a

dataset.zip:   0%|          | 0.00/9.57G [00:00<?, ?B/s]

Unzipping: 100%|██████████| 9.60G/9.60G [00:59<00:00, 160MB/s] 


Number of CPU cores: 4


In [2]:
from typing import Tuple
from tqdm import tqdm
import os
from PIL import Image
from functools import partial
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms.functional as TF
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torchvision.models.detection.ssdlite import SSDLiteClassificationHead
from torchvision.models.detection import _utils as det_utils
from torchmetrics.detection import MeanAveragePrecision

In [3]:
class SSDLITEOBJDET_DATASET(Dataset):
    """Object-detection dataset backed by images with YOLO-format ``.txt`` labels.

    Files are read from ``<root_dir>/<split>``. Every image (``.jpg``, ``.png``,
    ``.jpeg``) must have a label file with the same stem and a ``.txt``
    extension in the same directory.

    Each item is ``(image, target)`` where ``image`` is a float tensor resized to
    ``img_size x img_size`` and ``target`` is a dict with:

    * ``'boxes'``:  ``float32`` tensor of ``[xmin, ymin, xmax, ymax]`` rows in
      resized-image pixel coordinates.
    * ``'labels'``: ``int64`` tensor of class ids, shifted by +1 so that id 0 is
      left for background.

    Args:
        root_dir: Directory that contains one sub-directory per split.
        split: Name of the split sub-directory (e.g. ``'train'``).
        img_size: Side length, in pixels, that images are resized to.

    Raises:
        ValueError: If the number of images and ``.txt`` files differ, or an
            image has no matching label file.
    """

    def __init__(self, root_dir, split, img_size=320):
        self.img_size = img_size
        self.root = os.path.join(root_dir, split)

        # List the directory once; sorting keeps the sample order deterministic.
        entries = sorted(os.listdir(self.root))
        self.image_files = [os.path.join(self.root, f) for f in entries if f.endswith(('.jpg', '.png', '.jpeg'))]
        # A set gives O(1) membership tests for the per-image check below.
        found_labels = {os.path.join(self.root, f) for f in entries if f.endswith('.txt')}

        if len(self.image_files) != len(found_labels):
            raise ValueError("Mismatch between number of images and labels.")

        # Derive each label path from its image path instead of relying on two
        # independently sorted lists lining up: a stray .txt file next to two
        # images that share a stem would otherwise silently mispair samples.
        self.label_files = []
        for image_file in self.image_files:
            label_file = os.path.splitext(image_file)[0] + '.txt'
            if label_file not in found_labels:
                raise ValueError(f"Label file not found for image: {image_file}")
            self.label_files.append(label_file)

    def __len__(self):
        """Return the number of images in the split."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load, resize and return the ``(image, target)`` pair at ``idx``."""
        img_path, label_path = self.image_files[idx], self.label_files[idx]

        # Load image; the context manager closes the file handle deterministically.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        orig_width, orig_height = image.size

        # Load annotations
        boxes = []
        labels = []
        with open(label_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) != 5:
                    continue  # Skip invalid lines

                try:
                    cid, cx, cy, w, h = map(float, parts)
                    label = int(cid) + 1  # Add 1 for background class
                except (ValueError, OverflowError):
                    continue  # Skip malformed entries

                # Convert YOLO format (normalised centre/size) to absolute corner
                # coordinates, clamped to the image bounds.
                xmin = max((cx - w / 2) * orig_width, 0.0)
                ymin = max((cy - h / 2) * orig_height, 0.0)
                xmax = min((cx + w / 2) * orig_width, float(orig_width))
                ymax = min((cy + h / 2) * orig_height, float(orig_height))

                # Drop boxes without positive width and height (this also
                # rejects NaN coordinates); torchvision detection models
                # refuse such boxes during training.
                if not (xmax > xmin and ymax > ymin):
                    continue

                boxes.append([xmin, ymin, xmax, ymax])
                labels.append(label)

        # Handle empty labels (add dummy background box)
        # NOTE(review): this dummy box is also handed to the loss and to the mAP
        # metric as a ground-truth box of class 0 — confirm that is intended
        # rather than returning an empty (0, 4) boxes tensor.
        if len(boxes) == 0:
            boxes.append([0.0, 0.0, 1.0, 1.0])  # Small valid box
            labels.append(0)  # Background class

        # Convert and resize image
        image = TF.to_tensor(image)
        image = TF.resize(image, (self.img_size, self.img_size))

        # Scale boxes from original-image pixels to resized-image pixels
        scale_x = self.img_size / orig_width
        scale_y = self.img_size / orig_height
        boxes = torch.tensor(boxes, dtype=torch.float32) * torch.tensor([scale_x, scale_y, scale_x, scale_y])

        target = {
            'boxes': boxes,
            'labels': torch.tensor(labels, dtype=torch.int64)
        }

        return image, target
    
def collate_fn(batch):
    """Regroup a batch of samples into one tuple per field.

    ``batch`` is a sequence of equally long samples (here ``(image, target)``
    pairs). The result holds one tuple per position, e.g.
    ``((image_0, image_1, ...), (target_0, target_1, ...))``; nothing is stacked
    into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [None]:
import os
import time

class SSD_MOBILENET_V3_Large(nn.Module):
    def __init__(self, num_classes_with_bg:int) -> None:
        super(SSD_MOBILENET_V3_Large, self).__init__()
        self.num_classes_with_bg = num_classes_with_bg
        self.model = ssdlite320_mobilenet_v3_large(weights='COCO_V1', weights_backbone="DEFAULT") 
        self.model.head.classification_head = SSDLiteClassificationHead(
            in_channels=det_utils.retrieve_out_channels(self.model.backbone, (320, 320)),
            num_anchors=self.model.anchor_generator.num_anchors_per_location(),
            num_classes=self.num_classes_with_bg,
            norm_layer=partial(nn.BatchNorm2d, eps=0.001, momentum=0.03)
        )
    
    def configure_optimizers(self, lr: float = 0.0001, betas: Tuple[float, float] = (0.9, 0.999), weight_decay: float = 0.0001, eps: float = 1e-08, fused: bool = True) -> torch.optim.Optimizer:        
        # start with all of the candidate parameters (that require grad)
        param_dict = {pn: p for pn, p in self.named_parameters()}
        param_dict = {pn: p for pn, p in param_dict.items() if p.requires_grad}
        # create optim groups. Any parameters that is 2D will be weight decayed, otherwise no.
        # i.e. all weight tensors in matmuls + embeddings decay, all biases and layernorms don't.
        decay_params = [p for _, p in param_dict.items() if p.dim() >= 2]
        nodecay_params = [p for _, p in param_dict.items() if p.dim() < 2]

        # Create AdamW optimizer and use the fused version if available 
        return torch.optim.AdamW([{'params': decay_params, 'weight_decay': weight_decay},
                                    {'params': nodecay_params, 'weight_decay': 0.0}], 
                                    lr=lr, 
                                    betas=betas, 
                                    eps=eps, 
                                    fused=fused)
    
    def forward(self, images: torch.Tensor, targets: dict=None):
        return self.model(images, targets)
    
    def evaluate(self, dataset_root: str, device: torch.device|str, batch_size: int = 64):
        num_cores = os.cpu_count()
        print(f"Number of CPU cores: {num_cores}")
        
        metric = MeanAveragePrecision(
        iou_type="bbox",
        class_metrics=True,
        extended_summary=True)
        
        train_dataset = SSDLITEOBJDET_DATASET(dataset_root, split='train')
        val_dataset = SSDLITEOBJDET_DATASET(dataset_root, split='val')
        test_dataset = SSDLITEOBJDET_DATASET(dataset_root, split='test')
        
        # if device is a string
        if isinstance(device, str):
            device = device
        else:
            device = f"{device.type}:{device.index}"
        
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, num_workers=num_cores, pin_memory=True, pin_memory_device=device)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, num_workers=num_cores, pin_memory=True, pin_memory_device=device)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, num_workers=num_cores, pin_memory=True, pin_memory_device=device)
        
        splits = ['train', 'val', 'test']
        loaders = [train_loader, val_loader, test_loader]
        results = {}
        for split, loader in zip(splits, loaders):
            print(f"Evaluating {split} set")
            self.eval()
            metric.reset()
            progress_bar = tqdm(loader, desc=f"Evaluating {split} set", unit="batch")
            with torch.no_grad():
                for images, targets in progress_bar:
                    images = list(image.to(device) for image in images)
                    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
                    outputs = self(images)
                    metric.update(outputs, targets)
                results[split] =  metric.compute()
        

In [31]:
# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Build the detector, then restore its weights from the saved checkpoint.
# The checkpoint is deserialised onto the CPU first and the model is moved to
# the target device afterwards.
model = SSD_MOBILENET_V3_Large(num_classes_with_bg=MODEL_NUM_CLASSES)
model.load_state_dict(
    torch.load("ssd_checkpoint/checkpoint_1.pth", map_location="cpu")["model_state_dict"]
)
model.to(device)

SSD_MOBILENET_V3_Large(
  (model): SSD(
    (backbone): SSDLiteFeatureExtractorMobileNet(
      (features): Sequential(
        (0): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
            (1): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
            (2): Hardswish()
          )
          (1): InvertedResidual(
            (block): Sequential(
              (0): Conv2dNormActivation(
                (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
                (1): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
                (2): ReLU(inplace=True)
              )
              (1): Conv2dNormActivation(
                (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
                (1): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_runn

In [None]:
def train(num_epochs: int = 50, train_batch_size: int = 64, val_batch_size: int = 16, checkpoint_path: str = 'ssd_mobilenet_v3_finetuned.pth'):
    """Fine-tune the detector on the train split and report validation mAP per epoch.

    Runs ``num_epochs`` epochs of AdamW training with a StepLR schedule, prints
    the average training loss and the validation mAP after every epoch, and
    finally saves the model and optimizer state dicts to ``checkpoint_path``.

    Args:
        num_epochs: Number of passes over the training set.
        train_batch_size: Batch size of the (shuffled) training loader.
        val_batch_size: Batch size of the validation loader.
        checkpoint_path: File the final checkpoint is written to.
    """
    # Set device
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    use_cuda = device.type == 'cuda'

    # Load the model
    model = SSD_MOBILENET_V3_Large(num_classes_with_bg=MODEL_NUM_CLASSES)
    model.to(device)

    train_dataset = SSDLITEOBJDET_DATASET(DATASET_FOLDER_PATH, 'train')
    val_dataset = SSDLITEOBJDET_DATASET(DATASET_FOLDER_PATH, 'val')

    # Pinned memory and the pin device follow the selected device, so the
    # function also runs on CPU-only machines. The worker count is computed here
    # rather than read from a notebook-level variable.
    loader_kwargs = dict(
        collate_fn=collate_fn,
        num_workers=os.cpu_count() or 0,
        pin_memory=use_cuda,
        pin_memory_device=str(device) if use_cuda else "",
    )
    train_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, batch_size=val_batch_size, shuffle=False, **loader_kwargs)

    # Optimizer and scheduler. The fused AdamW kernel is only requested on CUDA.
    optimizer = model.configure_optimizers(lr=0.0001, betas=(0.9, 0.999), weight_decay=0.001, eps=1e-08, fused=use_cuda)
    # NOTE(review): step_size=3 with gamma=0.1 divides the learning rate by 10
    # every 3 epochs (about 1e-9 after 15 epochs) — confirm this schedule is
    # intended for a 50-epoch run.
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        num_batches = len(train_loader)

        # Wrap the loader in a progress bar that shows the current batch loss
        train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")

        for images, targets in train_bar:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # In training mode the model returns a dict of loss terms.
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            batch_loss = losses.detach().item()
            total_loss += batch_loss

            # Update progress bar with current batch loss
            train_bar.set_postfix(loss=batch_loss)

        avg_loss = total_loss / max(num_batches, 1)
        print(f"Epoch {epoch+1}/{num_epochs} | Avg Train Loss: {avg_loss:.4f}")

        lr_scheduler.step()

        # Validation
        model.eval()
        metric = MeanAveragePrecision()
        with torch.no_grad():
            for images, targets in val_loader:
                images = list(img.to(device) for img in images)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                predictions = model(images)
                metric.update(predictions, targets)

        map_result = metric.compute()
        print(f"Epoch {epoch+1} | Val mAP: {map_result['map']:.4f}")

    # Save model
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()}, checkpoint_path)

In [None]:
# Run the training function
# train()

In [None]:

#         elapsed = time.time() - start_time
        
#         results.append({
#             'split': split,
#             **split_metrics,
#             'time': f"{elapsed:.1f}s"
#         })
        
#         print(f"\nCompleted {split} split in {elapsed:.1f} seconds")
#         print(f"Split Metrics - mAP: {split_metrics['mAP']:.4f}, Precision: {split_metrics['Precision']:.4f}")

#     # Create DataFrame
#     df = pd.DataFrame(results).set_index('split')
#     numeric_cols = ['mAP', 'mAP_50', 'mAP_75', 'mAP_small', 'mAP_medium', 
#                    'mAP_large', 'Recall', 'Precision', 'F1']
#     df[numeric_cols] = df[numeric_cols].applymap(lambda x: f"{float(x):.4f}")
    
#     return df

# def evaluate_split(model, dataloader, device, metric):
#     """Evaluate with batch-level progress"""
#     model.eval()
#     metric.reset()
    
#     # Batch progress bar
#     batch_progress = tqdm(dataloader, 
#                         desc="Processing batches",
#                         leave=False,
#                         position=1)
    
#     with torch.no_grad():
#         for images, targets in batch_progress:
#             # Move data to device
#             images = list(img.to(device) for img in images)
#             targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            
#             # Inference
#             predictions = model(images)
#             metric.update(predictions, targets)
            
#             # Update progress description
#             batch_progress.set_postfix({
#                 'current_mAP': f"{metric.compute()['map'].item():.3f}",
#                 'batch_size': len(images)
#             })

#     # Final metrics
#     metrics = metric.compute()
    
#     return {
#         'mAP': metrics['map'].item(),
#         'mAP_50': metrics['map_50'].item(),
#         'mAP_75': metrics['map_75'].item(),
#         'mAP_small': metrics['map_small'].item(),
#         'mAP_medium': metrics['map_medium'].item(),
#         'mAP_large': metrics['map_large'].item(),
#         'Recall': metrics['mar_100'].item(),
#         'Class_APs': metrics['classes'].cpu().numpy().round(4),
#         'Precision': metrics['precision'].cpu().numpy().mean().round(4),
#         'Recall': metrics['recall'].cpu().numpy().mean().round(4),
#         'F1': (2 * (metrics['precision'] * metrics['recall']) / 
#               (metrics['precision'] + metrics['recall'] + 1e-16)).cpu().numpy().mean().round(4)
#     }

# def evaluate():
#     print("\n🚀 Starting Comprehensive Evaluation")
#     device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
#     print(f"🔧 Using device: {device}")
    
#     # Model loading
#     print("\n🔄 Loading model weights...")
#     start_load = time.time()
#     model = SSD_MOBILENET_V3_Large(num_classes_with_bg=MODEL_NUM_CLASSES)

#     print(f"✅ Model loaded in {time.time()-start_load:.1f}s")
    
#     # Evaluation
#     print("\n📊 Starting evaluation on all splits...")
#     metrics_df = evaluate_model(model, DATASET_FOLDER_PATH, device)
    
#     # Results display
#     print("\n🎯 Final Metrics Summary:")
#     print(metrics_df[['mAP', 'mAP_50', 'mAP_75', 'Recall', 'Precision', 'F1', 'time']])
    
#     print("\n📈 Class-wise Performance:")
#     class_df = pd.DataFrame(metrics_df['Class_APs'].tolist(), 
#                           index=metrics_df.index).T
#     class_df.columns = metrics_df.index
#     print(class_df.round(4))
    
#     print("\n🏁 Evaluation complete!")

# if __name__ == '__main__':
#     evaluate()