In [1]:
%cd ..

c:\Users\HP\OneDrive - University of Moratuwa\Desktop\E-Vision-Projects\Shelf_Product_Count_Generation


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import AutoImageProcessor, AutoModel
from torchvision import transforms
from PIL import Image
from pathlib import Path
import numpy as np
from tqdm import tqdm
import random

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# 1. Dataset for DINOv2
class ProductDataset(Dataset):
    """Reference-image dataset laid out as one sub-folder per product.

    Every directory directly under ``reference_dir`` is treated as a product
    whose ID is the folder name; every ``.jpg`` / ``.jpeg`` / ``.png`` file
    inside it (extension matched case-insensitively) is one sample.

    Args:
        reference_dir: Root folder containing one sub-folder per product.
        image_size: Side length images are resized to (square).
        augment: When True, random flip / rotation / colour jitter are applied
            on every access; when False only resize + normalisation.

    Attributes:
        images: Image paths (as strings), one per sample.
        product_ids: Product ID (folder name) for each sample.
        product_to_images: Product ID -> list of sample indices. Folders
            without any image still get an (empty) entry here.
        product_to_idx / idx_to_product: Mapping between product IDs that have
            at least one image and consecutive integer labels.
        num_products: Number of products that have at least one image.
    """

    # Lower-case suffixes accepted as images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, reference_dir='data/reference_images', 
                 image_size=224, augment=True):
        self.reference_dir = Path(reference_dir)
        self.image_size = image_size
        self.augment = augment
        
        # Load all images with their product IDs
        self.images = []
        self.product_ids = []
        self.product_to_images = {}
        
        product_folders = sorted(d for d in self.reference_dir.iterdir() if d.is_dir())
        
        for product_folder in product_folders:
            product_id = product_folder.name
            sample_indices = self.product_to_images.setdefault(product_id, [])
            
            for image_path in self._list_image_files(product_folder):
                sample_indices.append(len(self.images))
                self.images.append(str(image_path))
                self.product_ids.append(product_id)
        
        # Create product ID to integer mapping
        unique_products = sorted(set(self.product_ids))
        self.product_to_idx = {pid: idx for idx, pid in enumerate(unique_products)}
        self.idx_to_product = {idx: pid for pid, idx in self.product_to_idx.items()}
        self.num_products = len(unique_products)
        
        # Resize first, optional augmentation on the PIL image, then tensor
        # conversion and normalisation with the ImageNet mean/std.
        steps = [transforms.Resize((image_size, image_size))]
        if augment:
            steps += [
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomRotation(degrees=15),
                transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            ]
        steps += [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225]),
        ]
        self.transform = transforms.Compose(steps)
        
        print(f"Loaded {len(self.images)} images from {self.num_products} products")
    
    @staticmethod
    def _list_image_files(folder):
        """Return the image files directly inside ``folder``, sorted by path.

        The suffix is compared in lower case so files such as ``IMG_1.JPG``
        are picked up on case-sensitive filesystems too; directories that
        happen to carry an image suffix are ignored.
        """
        return sorted(
            entry for entry in Path(folder).iterdir()
            if entry.is_file()
            and entry.suffix.lower() in ProductDataset.IMAGE_EXTENSIONS
        )
    
    def __len__(self):
        """Number of image samples."""
        return len(self.images)
    
    def __getitem__(self, idx):
        """Return ``(image_tensor, product_idx, product_id)`` for sample ``idx``."""
        image_path = self.images[idx]
        product_id = self.product_ids[idx]
        product_idx = self.product_to_idx[product_id]
        
        # Close the file as soon as the pixels are decoded; convert() returns
        # an independent in-memory image, so it stays usable afterwards.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        image = self.transform(image)
        
        return image, product_idx, product_id

In [4]:
# 2. DINOv2 Model with Fine-tuning Head
class FineTunedDINOv2(nn.Module):
    """DINOv2 backbone followed by an MLP projection head.

    The CLS token of the backbone's last hidden state is projected to
    ``embedding_dim`` and L2-normalised, so every output row has unit length.

    Args:
        model_name: Name or path passed to ``AutoModel.from_pretrained``.
        embedding_dim: Size of the output embedding.
        freeze_backbone: When True, backbone parameters get
            ``requires_grad = False`` so only the projection head trains.
    """

    def __init__(self, model_name='facebook/dinov2-base', embedding_dim=512, freeze_backbone=False):
        super().__init__()
        
        # Load pre-trained DINOv2
        self.dinov2 = AutoModel.from_pretrained(model_name)
        
        # Take the backbone width from the loaded model's config rather than
        # guessing it from the checkpoint name, which silently picked 768 for
        # any name without 'base'/'small'/'large' (other variants, local paths).
        dinov2_dim = getattr(getattr(self.dinov2, 'config', None), 'hidden_size', None)
        if dinov2_dim is None:
            # Fallback for models that expose no config.hidden_size: keep the
            # previous name-based guess.
            if 'base' in model_name:
                dinov2_dim = 768
            elif 'small' in model_name:
                dinov2_dim = 384
            elif 'large' in model_name:
                dinov2_dim = 1024
            else:
                dinov2_dim = 768  # default
        
        # Freeze backbone if needed (for faster training)
        if freeze_backbone:
            for param in self.dinov2.parameters():
                param.requires_grad = False
        
        # Fine-tuning head (projection layer)
        self.projection_head = nn.Sequential(
            nn.Linear(dinov2_dim, 1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, embedding_dim),
            nn.LayerNorm(embedding_dim)
        )
        
    def forward(self, pixel_values):
        """Embed a batch of preprocessed images.

        Args:
            pixel_values: Image batch passed to the backbone as
                ``pixel_values`` (the callers in this notebook pass
                normalised ``(batch, 3, 224, 224)`` tensors).

        Returns:
            Tensor of shape ``(batch, embedding_dim)`` with unit L2 norm per row.
        """
        # DINOv2 forward pass
        outputs = self.dinov2(pixel_values=pixel_values)
        
        # Get CLS token (first token) - this is the image embedding
        cls_token = outputs.last_hidden_state[:, 0, :]  # (batch_size, dinov2_dim)
        
        # Project to desired embedding dimension
        embedding = self.projection_head(cls_token)
        
        # L2 normalize
        embedding = nn.functional.normalize(embedding, p=2, dim=1)
        
        return embedding

In [5]:
# 3. Triplet Loss for Metric Learning
class TripletLoss(nn.Module):
    """Margin-based triplet loss averaged over the batch.

    Per triplet the loss is ``max(0, d(a, p) - d(a, n) + margin)`` where ``d``
    is ``torch.nn.functional.pairwise_distance`` with its default settings.
    """

    def __init__(self, margin=0.5):
        super().__init__()
        self.margin = margin
    
    def forward(self, anchor, positive, negative):
        """Compute the mean triplet loss.

        Args:
            anchor: embeddings of the anchor images.
            positive: embeddings of images of the same product as the anchor.
            negative: embeddings of images of a different product.

        Returns:
            Scalar tensor: the hinge values averaged over all triplets.
        """
        pairwise = nn.functional.pairwise_distance
        gap_to_positive = pairwise(anchor, positive)
        gap_to_negative = pairwise(anchor, negative)
        
        # Triplets whose negative is already farther than positive + margin
        # are zeroed out by the ReLU.
        hinge = torch.relu(gap_to_positive - gap_to_negative + self.margin)
        return hinge.mean()

# 4. Triplet Sampler
class TripletSampler:
    """Draws random (anchor, positive, negative) sample indices from a dataset.

    Args:
        dataset: Object exposing ``product_ids``, a sequence holding the
            product ID of every sample (index-aligned with the dataset).

    Attributes:
        product_to_indices: Product ID -> list of sample indices.
    """

    def __init__(self, dataset):
        self.dataset = dataset
        self.product_to_indices = {}
        
        for idx, product_id in enumerate(dataset.product_ids):
            self.product_to_indices.setdefault(product_id, []).append(idx)
    
    def sample_triplet(self):
        """Sample anchor, positive, negative.

        Returns:
            ``(anchor_idx, positive_idx, negative_idx)``: anchor and positive
            belong to the same randomly chosen product, the negative to a
            different one. A product with a single image yields
            ``anchor_idx == positive_idx``.

        Raises:
            ValueError: If fewer than two products are available, because no
                negative can be drawn then.
        """
        # Built once per draw and reused for the negative candidates.
        products = list(self.product_to_indices)
        if len(products) < 2:
            raise ValueError(
                f"Triplet sampling needs at least 2 products, found {len(products)}"
            )
        
        # Random anchor product
        anchor_product = random.choice(products)
        anchor_pool = self.product_to_indices[anchor_product]
        
        # Need at least 2 images for this product
        if len(anchor_pool) < 2:
            # If only one image, duplicate it
            anchor_idx = positive_idx = anchor_pool[0]
        else:
            anchor_idx, positive_idx = random.sample(anchor_pool, 2)
        
        # Random negative product (different from anchor)
        negative_product = random.choice(
            [p for p in products if p != anchor_product]
        )
        negative_idx = random.choice(self.product_to_indices[negative_product])
        
        return anchor_idx, positive_idx, negative_idx

In [10]:
# 5. Training Function
def train_dinov2_model(
    reference_dir='data/reference_images',
    model_name='facebook/dinov2-base',
    embedding_dim=512,
    batch_size=16,
    num_epochs=50,
    learning_rate=0.0001,
    margin=0.5,
    freeze_backbone=False,
    save_path='models/dinov2_finetuned.pth',
    seed=None
):
    """Fine-tune DINOv2 on the reference images with a triplet loss.

    Each epoch runs ``ceil(num_images / batch_size)`` optimisation steps; every
    step draws ``batch_size`` random triplets from ``TripletSampler``. The
    checkpoint with the lowest mean training loss so far is written to
    ``save_path`` after each epoch.

    Args:
        reference_dir: Root folder with one sub-folder of images per product.
        model_name: Checkpoint name/path for the DINOv2 backbone.
        embedding_dim: Output embedding size of the projection head.
        batch_size: Number of triplets per optimisation step.
        num_epochs: Number of epochs (also the cosine schedule length).
        learning_rate: Initial AdamW learning rate.
        margin: Triplet-loss margin.
        freeze_backbone: Train only the projection head when True.
        save_path: Destination of the best checkpoint; its parent directory is
            created if missing.
        seed: Optional integer; when given, Python's ``random``, NumPy and
            torch RNGs are seeded before the dataset and model are built.

    Returns:
        The model as it is after the last epoch (still in training mode); this
        is not necessarily the state that was saved as best.

    Raises:
        ValueError: If ``batch_size`` is not positive, no images are found, or
            fewer than two products have images.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    
    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
    
    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Dataset
    train_dataset = ProductDataset(reference_dir, augment=True)
    if len(train_dataset) == 0:
        raise ValueError(f"No training images found under {reference_dir!r}")
    if train_dataset.num_products < 2:
        raise ValueError(
            "Triplet training needs images from at least 2 products, "
            f"found {train_dataset.num_products}"
        )
    
    # Triplets are sampled directly from the dataset, so no DataLoader is
    # iterated: doing so would load and augment a batch per step only to
    # discard it. The step count matches a non-dropping loader of the same
    # batch size.
    steps_per_epoch = (len(train_dataset) + batch_size - 1) // batch_size
    
    # Model
    model = FineTunedDINOv2(
        model_name=model_name,
        embedding_dim=embedding_dim,
        freeze_backbone=freeze_backbone
    )
    model = model.to(device)
    
    # Count trainable parameters
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Trainable parameters: {trainable_params:,}")
    
    # Loss and optimizer
    criterion = TripletLoss(margin=margin)
    optimizer = optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=learning_rate,
        weight_decay=0.01
    )
    scheduler = optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=num_epochs, eta_min=1e-6
    )
    
    # Triplet sampler
    triplet_sampler = TripletSampler(train_dataset)
    
    # Make sure the checkpoint directory exists before spending time training.
    Path(save_path).parent.mkdir(parents=True, exist_ok=True)
    
    def load_batch(indices):
        """Load, transform and stack the images at ``indices`` on ``device``."""
        return torch.stack([train_dataset[i][0] for i in indices]).to(device)
    
    # Training loop
    model.train()
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        num_batches = 0
        
        progress_bar = tqdm(range(steps_per_epoch), desc=f"Epoch {epoch+1}/{num_epochs}")
        
        for _ in progress_bar:
            # Sample triplets
            triplets = [triplet_sampler.sample_triplet() for _ in range(batch_size)]
            anchor_indices, positive_indices, negative_indices = zip(*triplets)
            
            # Get images - a failed load skips this step (best effort)
            try:
                anchor_images = load_batch(anchor_indices)
                positive_images = load_batch(positive_indices)
                negative_images = load_batch(negative_indices)
            except Exception as e:
                print(f"Error loading images: {e}")
                continue
            
            # Forward / backward - a failed step is reported and skipped
            try:
                optimizer.zero_grad()
                
                anchor_emb = model(anchor_images)
                positive_emb = model(positive_images)
                negative_emb = model(negative_images)
                
                # Loss
                loss = criterion(anchor_emb, positive_emb, negative_emb)
                
                # Backward pass
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                
                epoch_loss += loss.item()
                num_batches += 1
                
                progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})
            except Exception as e:
                print(f"Error in forward/backward pass: {e}")
                continue
        
        if num_batches == 0:
            print(f"Warning: No batches processed in epoch {epoch+1}")
            continue
            
        avg_loss = epoch_loss / num_batches
        scheduler.step()
        
        current_lr = optimizer.param_groups[0]['lr']
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f} - LR: {current_lr:.6f}")
        
        # Save best model
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save({
                'model_state_dict': model.state_dict(),
                'model_name': model_name,
                'embedding_dim': embedding_dim,
                'num_products': train_dataset.num_products,
                'product_to_idx': train_dataset.product_to_idx,
                'epoch': epoch,
                'loss': avg_loss
            }, save_path)
            print(f"✓ Saved best model (loss: {avg_loss:.4f})")
    
    print(f"\nTraining complete! Best loss: {best_loss:.4f}")
    return model

# 6. Load Trained Model
def load_trained_dinov2(model_path='models/dinov2_finetuned.pth', device='cuda'):
    """Load a checkpoint written by ``train_dinov2_model``.

    Args:
        model_path: Path to the saved checkpoint.
        device: Device to load onto. A CUDA device is replaced by ``'cpu'``
            when CUDA is not available, so the default also works on CPU-only
            machines.

    Returns:
        ``(model, product_to_idx)``: the model in eval mode on the chosen
        device, and the product-ID -> label mapping stored in the checkpoint.
    """
    if str(device).startswith('cuda') and not torch.cuda.is_available():
        print("CUDA not available - loading model on CPU")
        device = 'cpu'
    
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source.
    checkpoint = torch.load(model_path, map_location=device)
    
    model = FineTunedDINOv2(
        model_name=checkpoint['model_name'],
        embedding_dim=checkpoint['embedding_dim'],
        freeze_backbone=False
    )
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    model = model.to(device)
    
    return model, checkpoint['product_to_idx']

# 7. Extract Embeddings with Trained Model
def get_embedding_dinov2_trained(model, image, device='cuda'):
    """Extract an embedding for one image using a trained model.

    The model is run in eval mode (so dropout is inactive and the result is
    deterministic) and every module's previous train/eval flag is restored
    afterwards.

    Args:
        model: Module mapping an image batch to embeddings.
        image: A file path (``str`` or ``Path``), opened and converted to RGB,
            or an already loaded image accepted by the torchvision transforms.
        device: Used only when ``model`` has no parameters; otherwise the
            input is placed on the device of the model's parameters so the two
            can never mismatch.

    Returns:
        ``float32`` NumPy array of the model output for a batch of one image
        (leading dimension 1).
    """
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                         std=[0.229, 0.224, 0.225])
    ])
    
    if isinstance(image, (str, Path)):
        with Image.open(image) as raw_image:
            image = raw_image.convert('RGB')
    
    # Follow the model's own device; fall back to `device` for models without
    # parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device
    
    image_tensor = transform(image).unsqueeze(0).to(target_device)
    
    # Remember each module's mode so a model that is mid-training is handed
    # back exactly as it was.
    previous_modes = [(module, module.training) for module in model.modules()]
    model.eval()
    try:
        with torch.no_grad():
            embedding = model(image_tensor)
    finally:
        for module, was_training in previous_modes:
            module.training = was_training
    
    return embedding.cpu().numpy().astype('float32')

In [11]:
# 8. Usage Example
model = train_dinov2_model(
    reference_dir='data/reference_images',
    # 'facebook/dinov2-small' is a lighter alternative for faster training
    model_name='facebook/dinov2-base',
    embedding_dim=512,
    # Begin with a small batch; raise it when GPU memory allows
    batch_size=8,
    num_epochs=5,
    learning_rate=0.0001,
    margin=0.5,
    # True trains faster (head only); False fine-tunes everything for better accuracy
    freeze_backbone=False,
)

Using device: cpu
Loaded 166 images from 49 products
Trainable parameters: 87,893,760


Epoch 1/5: 100%|██████████| 21/21 [07:17<00:00, 20.84s/it, loss=0.4016]


Epoch 1/5 - Loss: 0.4513 - LR: 0.000091
✓ Saved best model (loss: 0.4513)


Epoch 2/5: 100%|██████████| 21/21 [07:39<00:00, 21.88s/it, loss=0.3107]


Epoch 2/5 - Loss: 0.4023 - LR: 0.000066
✓ Saved best model (loss: 0.4023)


Epoch 3/5: 100%|██████████| 21/21 [08:12<00:00, 23.44s/it, loss=0.3757]


Epoch 3/5 - Loss: 0.3787 - LR: 0.000035
✓ Saved best model (loss: 0.3787)


Epoch 4/5: 100%|██████████| 21/21 [07:10<00:00, 20.52s/it, loss=0.1203]


Epoch 4/5 - Loss: 0.3075 - LR: 0.000010
✓ Saved best model (loss: 0.3075)


Epoch 5/5: 100%|██████████| 21/21 [06:50<00:00, 19.55s/it, loss=0.2303]


Epoch 5/5 - Loss: 0.2295 - LR: 0.000001
✓ Saved best model (loss: 0.2295)

Training complete! Best loss: 0.2295
