In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import pandas as pd
from PIL import Image
import os
import time

# Configuration
class Config:
    root_dir = 'D:/celeba/img_align_celeba/img_align_celeba'
    csv_path = 'D:/celeba/list_attr_celeba.csv'
    partition_csv_path = 'D:/celeba/list_eval_partition.csv'
    num_classes = 40
    batch_size = 64
    lr = 1e-3
    grad_accum_steps = 2
    grad_clip = 1.0
    max_epochs = 10
    pretrained_weights = models.ResNet50_Weights.IMAGENET1K_V2
    early_stop_patience=5
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    attribute_names = [
        '5_o_Clock_Shadow', 'Arched_Eyebrows', 'Attractive', 'Bags_Under_Eyes',
        'Bald', 'Bangs', 'Big_Lips', 'Big_Nose', 'Black_Hair', 'Blond_Hair',
        'Blurry', 'Brown_Hair', 'Bushy_Eyebrows', 'Chubby', 'Double_Chin',
        'Eyeglasses', 'Goatee', 'Gray_Hair', 'Heavy_Makeup', 'High_Cheekbones',
        'Male', 'Mouth_Slightly_Open', 'Mustache', 'Narrow_Eyes', 'No_Beard',
        'Oval_Face', 'Pale_Skin', 'Pointy_Nose', 'Receding_Hairline', 'Rosy_Cheeks',
        'Sideburns', 'Smiling', 'Straight_Hair', 'Wavy_Hair', 'Wearing_Earrings',
        'Wearing_Hat', 'Wearing_Lipstick', 'Wearing_Necklace', 'Wearing_Necktie', 'Young'
    ]

# Dataset class with header handling
class CelebADataset(Dataset):
    def __init__(self, root_dir, csv_path, partition_csv_path, split, transform=None):
        # Load attributes
        self.df = pd.read_csv(csv_path).replace(-1, 0)
        
        # Load partition data with header
        partition_df = pd.read_csv(partition_csv_path)
        partition_df['partition'] = partition_df['partition'].astype(int)
        
        # Merge datasets
        self.df = self.df.merge(partition_df, on='image_id', how='inner')
        
        # Convert split name to code
        split_codes = {'train': 0, 'valid': 1, 'validation': 1, 'test': 2}
        split = split.lower()
        if split not in split_codes:
            raise ValueError(f"Invalid split: {split}. Use train/valid/test")
            
        split_code = split_codes[split]
        self.df = self.df[self.df['partition'] == split_code].copy()
        
        # Validate dataset
        if len(self.df) == 0:
            raise ValueError(f"No samples found for {split} partition")
        
        self.root_dir = root_dir
        self.transform = transform
        self.labels = self.df.drop(['image_id', 'partition'], axis=1).values.astype('float32')

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.df.iloc[idx]['image_id'])
        try:
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except Exception as e:
            print(f"Error loading {img_path}: {str(e)}")
            image = torch.zeros(3, 224, 224)
        return image, self.labels[idx]

# Initialize device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Transforms
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(Config.mean, Config.std)
])

eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(Config.mean, Config.std)
])

# Create datasets
try:
    train_set = CelebADataset(
        root_dir=Config.root_dir,
        csv_path=Config.csv_path,
        partition_csv_path=Config.partition_csv_path,
        split='train',
        transform=train_transform
    )

    val_set = CelebADataset(
        root_dir=Config.root_dir,
        csv_path=Config.csv_path,
        partition_csv_path=Config.partition_csv_path,
        split='valid',
        transform=eval_transform
    )
except Exception as e:
    print(f"Error creating dataset: {str(e)}")
    raise

# Data loaders
train_loader = DataLoader(
    train_set,
    batch_size=Config.batch_size,
    shuffle=True,
    num_workers=0,
    pin_memory=True
)

val_loader = DataLoader(
    val_set,
    batch_size=Config.batch_size,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

print(f"Training samples: {len(train_set)}, Validation samples: {len(val_set)}")

Using device: cuda
Training samples: 162770, Validation samples: 19867


In [22]:
# Create data loaders
train_loader = DataLoader(train_set, Config.batch_size, shuffle=True, pin_memory=True)
val_loader = DataLoader(val_set, Config.batch_size, pin_memory=True)
# Optimized Model Class
class AttributeClassifier:
    def __init__(self, config):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = models.resnet50(weights=config.pretrained_weights)
        
        # Freeze layers
        for param in self.model.parameters():
            param.requires_grad = False
        for param in self.model.layer3.parameters():
            param.requires_grad = True
        for param in self.model.layer4.parameters():
            param.requires_grad = True
            
        # Modified head
        self.model.fc = nn.Linear(self.model.fc.in_features, config.num_classes)
        self.model.to(self.device)
        
        # Optimizer
        self.optimizer = optim.AdamW(
            filter(lambda p: p.requires_grad, self.model.parameters()),
            lr=config.lr,
            weight_decay=0.01
        )
        self.criterion = nn.BCEWithLogitsLoss()

    def train(self, train_loader, val_loader):
        best_f1 = 0
        epochs_no_improve = 0
        scaler = torch.cuda.amp.GradScaler()
        
        for epoch in range(self.config.max_epochs):
            # Training phase
            self.model.train()
            epoch_train_loss = 0.0
            
            for batch_idx, (inputs, labels) in enumerate(train_loader):
                inputs = inputs.to(self.device, non_blocking=True)
                labels = labels.to(self.device, non_blocking=True)
                
                # Mixed precision forward pass
                with torch.cuda.amp.autocast():
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels) / self.config.grad_accum_steps
                
                # Backward pass with gradient scaling
                scaler.scale(loss).backward()
                
                # Gradient accumulation steps
                if (batch_idx + 1) % self.config.grad_accum_steps == 0:
                    # Unscale before clipping
                    scaler.unscale_(self.optimizer)
                    
                    # Gradient clipping
                    torch.nn.utils.clip_grad_norm_(
                        self.model.parameters(),
                        self.config.grad_clip
                    )
                    
                    # Optimizer step
                    scaler.step(self.optimizer)
                    scaler.update()
                    self.optimizer.zero_grad(set_to_none=True)
                
                epoch_train_loss += loss.item() * self.config.grad_accum_steps
            
            # Calculate average training loss
            avg_train_loss = epoch_train_loss / len(train_loader)
            
            # Validation phase
            avg_val_loss, avg_f1 = self._validate(val_loader)
            
            # Early stopping logic
            if avg_f1 > best_f1:
                best_f1 = avg_f1
                epochs_no_improve = 0
                torch.save(self.model.state_dict(), 'best_model.pth')
                print(f"Epoch {epoch+1}/{self.config.max_epochs}")
                print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
                print(f"Val F1: {avg_f1:.4f}* (Best)")
            else:
                epochs_no_improve += 1
                print(f"Epoch {epoch+1}/{self.config.max_epochs}")
                print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
                print(f"Val F1: {avg_f1:.4f} (No improvement {epochs_no_improve}/{self.config.early_stop_patience})")
            
            if epochs_no_improve >= self.config.early_stop_patience:
                print(f"\nEarly stopping triggered after {epoch+1} epochs!")
                break
    
        # Load best model weights
        self.model.load_state_dict(torch.load('best_model.pth'))
        print("Training complete. Loaded best model weights.")

    def _validate(self, val_loader):
        self.model.eval()
        val_loss = 0.0
        total_f1 = 0.0
        
        with torch.no_grad(), torch.cuda.amp.autocast():
            for inputs, labels in val_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device).float()
                outputs = self.model(inputs)
                
                # Calculate loss
                val_loss += self.criterion(outputs, labels).item()
                
                # Calculate F1
                preds = (torch.sigmoid(outputs) > 0.5).float()
                total_f1 += self._calculate_f1(preds, labels)
    
        return val_loss/len(val_loader), total_f1/len(val_loader)



    def _calculate_f1(self, preds, labels):
            # Convert boolean masks to float for calculations
        preds = preds.bool()
        labels = labels.bool()
            
        tp = (preds & labels).sum(0, dtype=torch.float32)
        fp = (preds & ~labels).sum(0, dtype=torch.float32)
        fn = (~preds & labels).sum(0, dtype=torch.float32)
            
        precision = tp / (tp + fp + 1e-9)
        recall = tp / (tp + fn + 1e-9)
        f1 = 2 * (precision * recall) / (precision + recall + 1e-9)
        return f1.mean().item()
        
    def test(self, test_loader, top_ks=[5, 10, 20, 30]):
        self.model.eval()
        results = {
            'strict': 0.0,
            'mean': 0.0,
            'top_acc': {k: 0.0 for k in top_ks},
            'per_attribute': {}
        }
        attr_correct = torch.zeros(self.config.num_classes).to(self.device)
        total_samples = 0
        
        with torch.no_grad(), torch.amp.autocast(device_type='cuda', dtype=torch.float16):
            for inputs, labels in test_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                batch_size = inputs.size(0)
                
                outputs = self.model(inputs)
                probs = torch.sigmoid(outputs)
                preds = (probs > 0.5).float()
                
                # Per-image metrics
                results['strict'] += (preds == labels).all(dim=1).sum().item()
                results['mean'] += (preds == labels).float().mean(dim=1).sum().item()
                
                # Per-attribute metrics
                attr_correct += (preds == labels).sum(dim=0)
                
                # Top-k calculations
                for k in top_ks:
                    topk_probs, topk_indices = torch.topk(probs, k, dim=1)
                    correct = torch.gather(labels, 1, topk_indices).sum(dim=1)
                    results['top_acc'][k] += (correct.float() / k).sum().item()
                
                total_samples += batch_size
    
        # Calculate final metrics
        results['strict'] /= total_samples
        results['mean'] /= total_samples
        for k in top_ks:
            results['top_acc'][k] /= total_samples
        
        # Per-attribute accuracies
        per_attr_acc = (attr_correct / total_samples).cpu().numpy()
        for idx, acc in enumerate(per_attr_acc):
            results['per_attribute'][self.config.attribute_names[idx]] = acc
    
        # Print results
        print("\n=== Test Results ===")
        print(f"Strict Accuracy: {results['strict']:.4f}")
        print(f"Mean Accuracy: {results['mean']:.4f}")
        for k in sorted(top_ks):
            print(f"Top-{k} Accuracy: {results['top_acc'][k]:.4f}")
        
        # Print per-attribute accuracies
        # Inside the test() method, replace the print block with:
        print("\nPer-Attribute Accuracy Ranking:")
        print("-" * 65)
        print(f"{'Rank':<5}{'Attribute':<30}{'Accuracy':<10} | {'Rank':<5}{'Attribute':<30}{'Accuracy':<10}")
        print("-" * 65)
        
        sorted_attrs = sorted(results['per_attribute'].items(), key=lambda x: x[1], reverse=True)
        for i in range(0, len(sorted_attrs), 2):
            line = ""
            # First column
            if i < len(sorted_attrs):
                name, acc = sorted_attrs[i]
                line += f"{i+1:<5}{name:<30}{acc:.4f}    "
            else:
                line += " " * 45
                
            # Second column
            line += "| "
            if i+1 < len(sorted_attrs):
                name, acc = sorted_attrs[i+1]
                line += f"{i+2:<5}{name:<30}{acc:.4f}"
            
            print(line)

In [None]:
        
# Training Execution
if __name__ == "__main__":
    config = Config()
    classifier = AttributeClassifier(config)
    classifier.train(train_loader, val_loader)
    print("Training completed. Best model saved as 'celeba_teacher.pth'")

In [24]:
# Usage Example
if __name__ == "__main__":
    config = Config()
    
    # Create test dataset
    test_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(config.mean, config.std)
    ])
    
    test_set = CelebADataset(
        root_dir=config.root_dir,
        csv_path=config.csv_path,
        partition_csv_path=config.partition_csv_path,
        split='test',
        transform=test_transform
    )
    
    test_loader = DataLoader(
        test_set,
        batch_size=config.batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True
    )
    
    # Initialize and load model
    classifier = AttributeClassifier(config)
    classifier.model.load_state_dict(torch.load('best_model.pth'))
    
    # Run evaluation
    strict_acc, mean_acc = classifier.test(test_loader)


=== Test Results ===
Strict Accuracy: 0.0250
Mean Accuracy: 0.9134
Top-5 Accuracy: 0.9115
Top-10 Accuracy: 0.7423
Top-20 Accuracy: 0.4588
Top-30 Accuracy: 0.3083

Per-Attribute Accuracy Ranking:
-----------------------------------------------------------------
Rank Attribute                     Accuracy   | Rank Attribute                     Accuracy  
-----------------------------------------------------------------
1    Eyeglasses                    0.9970    | 2    Wearing_Hat                   0.9916
3    Bald                          0.9904    | 4    Male                          0.9852
5    Gray_Hair                     0.9830    | 6    Sideburns                     0.9791
7    Goatee                        0.9755    | 8    Pale_Skin                     0.9716
9    Mustache                      0.9704    | 10   No_Beard                      0.9644
11   Blurry                        0.9638    | 12   Double_Chin                   0.9619
13   Blond_Hair                    0.9614   

TypeError: cannot unpack non-iterable NoneType object