In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Dataset, DataLoader
from PIL import Image


### Let's analyze the files

In [None]:
# Every competition path hangs off the same root folder, so spell it once.
DATA_ROOT = '/kaggle/input/binary-biplob-art-attack/Artist Classification'

TRAIN_PATH = f'{DATA_ROOT}/train'                 # training images
TEST_PATH = f'{DATA_ROOT}/kaggle_test'            # images to predict for the submission
CLASS_MAPPINGS = f'{DATA_ROOT}/class_mapping.yml' # label metadata file

In [None]:
# List the entries of the training directory (the next cell counts them as
# the classes). This shows the names themselves, not how many there are.
os.listdir(TRAIN_PATH)

In [None]:
# The class count is taken to be the number of entries in the training directory.
print('number of classes', len(os.listdir(TRAIN_PATH)))

### The class mapping file (`class_mapping.yml`) contains the metadata of the dataset. When submitting, make sure you substitute each label name with its number.

In [None]:
# Read the mapping file as plain text and show it verbatim, so the
# label-name / label-number table can be inspected by eye.
with open(CLASS_MAPPINGS) as mapping_file:
    class_mappings = mapping_file.read()

print(class_mappings)

### As the images are organised in one folder per class, we use torchvision's `ImageFolder` to load the dataset from that folder

In [None]:
# Untransformed view of the training folder, used only for inspection below.
# TRAIN_PATH holds exactly the path that was previously repeated here.
dataset = datasets.ImageFolder(root=TRAIN_PATH)

In [None]:
# Class names as discovered by ImageFolder in the training directory.
dataset.classes

### Splitting the Dataset

In [None]:
# --- Data-pipeline settings (tune here; everything below reads these) -------
SPLIT_SEED = 42          # seeds the shuffle that decides the train/val split
TRAIN_FRACTION = 0.8     # share of the images that goes to the training split
BATCH_SIZE = 32
NUM_WORKERS = 4          # background workers per DataLoader
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# 1. Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 2. Transforms. Training images get random augmentation; validation images
#    only get the deterministic resize + centre crop, so validation scores
#    are comparable from epoch to epoch.
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),          # Larger size for better detail
    transforms.RandomResizedCrop(224),      # Random crop for variation
    transforms.RandomHorizontalFlip(0.5),   # Horizontal flip
    transforms.RandomRotation(15),          # Slight rotation
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.1),      # Occasional grayscale
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),  # ImageNet normalization
])

val_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),             # Center crop for validation
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# 3. Two views of the SAME folder that differ only in their transform, so the
#    train and validation subsets can each use their own preprocessing.
train_dataset_temp = datasets.ImageFolder(root=TRAIN_PATH, transform=train_transform)
val_dataset_temp = datasets.ImageFolder(root=TRAIN_PATH, transform=val_transform)

print(f"Classes: {train_dataset_temp.classes}")
print(f"Number of classes: {len(train_dataset_temp.classes)}")

# 4. One shuffled index list, cut once: the two subsets are disjoint even
#    though they index two different dataset objects.
dataset_size = len(train_dataset_temp)
train_size = int(TRAIN_FRACTION * dataset_size)
val_size = dataset_size - train_size

indices = list(range(dataset_size))
np.random.seed(SPLIT_SEED)  # For reproducibility
np.random.shuffle(indices)

train_indices = indices[:train_size]
val_indices = indices[train_size:]

# Wrap each view in a Subset restricted to its share of the indices.
from torch.utils.data import Subset

train_dataset = Subset(train_dataset_temp, train_indices)
val_dataset = Subset(val_dataset_temp, val_indices)

# 5. Data loaders: only the training loader reshuffles.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,  # Faster data loading
    pin_memory=True,          # Better GPU transfer
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Batch size: {BATCH_SIZE}")  # reports the value actually used above

### Training Code

In [None]:


# Banner summarising the experiment that the next cell sets up and runs.
for banner_line in (
    "✅ Optimized ConvNeXt-Tiny ready!",
    "🎯 Expected: Should match or beat EfficientNet-B3's 0.87",
    "🔧 Key changes: Lower LR, higher weight decay, simpler head, cosine scheduler",
):
    print(banner_line)

In [None]:
# 5. Model: ConvNeXt-Tiny initialised from the 'IMAGENET1K_V1' weights
from torchvision.models import convnext_tiny
import torch.nn.functional as F

model = convnext_tiny(weights='IMAGENET1K_V1')

# Replace the last classifier layer with dropout + a linear layer sized to
# our classes; the rest of the network is kept as loaded.
num_features = model.classifier[2].in_features  # Get the correct input features
model.classifier[2] = nn.Sequential(
    nn.Dropout(0.2),
    nn.Linear(num_features, len(train_dataset_temp.classes)),  # Direct to classes
)
model = model.to(device)

print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

# 6. Loss & optimizer
criterion = nn.CrossEntropyLoss(label_smoothing=0.05)  # Label smoothing for better generalization
optimizer = optim.AdamW(
    model.parameters(),
    lr=0.0005,
    weight_decay=0.05,
    betas=(0.9, 0.999),
)

# Cosine learning-rate schedule that restarts every T_0 epochs.
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=5, T_mult=1, eta_min=1e-6)


# 7. Training loop with early stopping
epochs = 150      # upper bound; early stopping normally ends the run sooner
best_val_acc = 0.0
patience = 7      # epochs without a new best validation accuracy before stopping
patience_counter = 0

# Per-epoch history, read by the plotting cell further down.
train_losses = []
train_accuracies = []
val_accuracies = []

print("Starting training...")
for epoch in range(epochs):
    # ---- Training phase ----
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Print progress every 50 batches
        if batch_idx % 50 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Batch {batch_idx}/{len(train_loader)}, Loss: {loss.item():.4f}')

    train_acc = 100 * correct / total
    avg_train_loss = running_loss / len(train_loader)

    train_losses.append(avg_train_loss)
    train_accuracies.append(train_acc)

    # ---- Validation phase (no gradients, eval-mode layers) ----
    model.eval()
    val_correct = 0
    val_total = 0
    val_loss = 0.0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_acc = 100 * val_correct / val_total
    val_accuracies.append(val_acc)

    # Advance the cosine schedule by one epoch. This scheduler's step() takes
    # an optional *epoch* argument, not a metric: passing val_acc here (as a
    # ReduceLROnPlateau would expect) would make the learning rate follow the
    # validation accuracy instead of the epoch count.
    scheduler.step()

    print(f"Epoch {epoch+1}/{epochs}:")
    print(f"  Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"  Val Loss: {val_loss/len(val_loader):.4f}, Val Acc: {val_acc:.2f}%")
    print(f"  Learning Rate: {optimizer.param_groups[0]['lr']:.6f}")

    # Checkpoint on every new best validation accuracy; otherwise count
    # towards early stopping.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': val_acc,
        }, 'best_model.pth')
        print(f"  ✅ New best validation accuracy: {val_acc:.2f}%")
    else:
        patience_counter += 1
        print(f"  No improvement. Patience: {patience_counter}/{patience}")

    if patience_counter >= patience:
        print(f"Early stopping triggered after {epoch+1} epochs")
        break

    print("-" * 50)

print(f"Training completed! Best validation accuracy: {best_val_acc:.2f}%")

In [None]:
# Restore the weights of the best epoch saved by the training loop.
# map_location makes the checkpoint load onto whatever device this session
# has, e.g. a CPU-only session reading a checkpoint written on a GPU.
checkpoint = torch.load('best_model.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"Loaded best model with validation accuracy: {checkpoint['val_acc']:.2f}%")


def _eval_pipeline(*extra_steps):
    """Build the evaluation preprocessing, optionally with extra PIL-level steps.

    The base pipeline is resize to 256x256, centre crop to 224, tensor
    conversion and ImageNet normalisation. ``extra_steps`` are inserted
    after the centre crop and before the tensor conversion.
    """
    return transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.CenterCrop(224),
        *extra_steps,
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])


# Plain test transform (same steps as validation)
test_transform = _eval_pipeline()

# Test Time Augmentation: three views per image whose predictions are averaged.
tta_transforms = [
    _eval_pipeline(),                                        # unchanged view
    _eval_pipeline(transforms.RandomHorizontalFlip(p=1.0)),  # Always flip
    # The angle is drawn at random on every call, so this view (and therefore
    # the averaged prediction) is not exactly repeatable between runs.
    _eval_pipeline(transforms.RandomRotation(degrees=5)),
]

# Enhanced Custom Dataset for test folder
class InferenceDataset(Dataset):
    """Unlabelled image folder for inference.

    Every ``.png`` / ``.jpg`` / ``.jpeg`` file (extension matched
    case-insensitively) directly inside ``folder_path`` is one item; items
    are ordered by their sorted file paths.

    With ``tta_transforms`` set, an item is ``(views, path)`` where ``views``
    holds one transformed copy of the image per TTA transform. Otherwise an
    item is ``(image, path)``, with ``transform`` applied when it is given.
    """

    def __init__(self, folder_path, transform=None, tta_transforms=None):
        self.folder_path = folder_path

        image_suffixes = ('.png', '.jpg', '.jpeg')
        found = []
        for entry in os.listdir(folder_path):
            if entry.lower().endswith(image_suffixes):
                found.append(os.path.join(folder_path, entry))
        found.sort()
        self.image_paths = found

        self.transform = transform
        self.tta_transforms = tta_transforms

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = Image.open(img_path).convert("RGB")

        # TTA mode takes priority: one transformed view per TTA pipeline.
        if self.tta_transforms:
            return [make_view(img) for make_view in self.tta_transforms], img_path

        if self.transform:
            img = self.transform(img)
        return img, img_path

# Test dataset in TTA mode: each item is (list of augmented views, file path).
test_dataset = InferenceDataset(TEST_PATH, tta_transforms=tta_transforms)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)  # Smaller batch for TTA

# Inference with Test Time Augmentation
model.eval()
results = []

print("Starting inference with Test Time Augmentation...")
with torch.no_grad():
    for batch_idx, (images_list, paths) in enumerate(test_loader):
        # images_list holds one batched tensor per TTA view, so each view
        # needs a single forward pass for the whole batch rather than one
        # pass per image.
        view_probs = [
            torch.softmax(model(view_batch.to(device)), dim=1)
            for view_batch in images_list
        ]

        # Average the class probabilities over the views -> (batch, classes).
        avg_probs = torch.stack(view_probs).mean(dim=0).cpu()
        pred_classes = avg_probs.argmax(dim=1).tolist()

        for row, (path, pred_class) in enumerate(zip(paths, pred_classes)):
            results.append({
                "image_id": os.path.basename(path),
                "class_id": pred_class,
                "confidence": round(avg_probs[row, pred_class].item(), 4),
            })

        if batch_idx % 20 == 0:
            # len(results) is the number of images actually finished so far.
            print(f"Processed {len(results)}/{len(test_dataset)} images")

# Predictions as a DataFrame, one row per test image in sorted-file-path order.
df = pd.DataFrame(results)
print(f"Generated predictions for {len(df)} images")
print("\nPrediction distribution:")
print(df['class_id'].value_counts().sort_index())

# Load sample submission and create final submission
submission = pd.read_csv('/kaggle/input/binary-biplob-art-attack/sample_submission.csv')

# The assignment below aligns on the DataFrame index. A row-count mismatch
# would silently leave NaN labels (or drop predictions), so say so loudly.
if len(submission) != len(df):
    print(f"⚠️ Row count mismatch: sample submission has {len(submission)} rows, "
          f"predictions have {len(df)}")

# NOTE(review): this assumes the sample submission lists the images in the
# same order as the sorted test file paths — confirm, or join on the image
# id column instead.
# NOTE(review): class_id is the index assigned by ImageFolder; confirm it
# matches the numbering in class_mapping.yml.
submission['label'] = df['class_id']
submission.to_csv('submission.csv', index=False)

print(f"\n✅ Submission saved! Shape: {submission.shape}")
submission.head()

In [None]:
# Visualize training progress
import matplotlib.pyplot as plt

# Both panels share one 1-based epoch axis, so a point on the loss curve and
# a point on the accuracy curves at the same x refer to the same epoch.
epochs_completed = len(train_accuracies)
epoch_axis = range(1, epochs_completed + 1)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Left panel: average training loss per epoch
ax1.plot(epoch_axis, train_losses, label='Training Loss', color='blue')
ax1.set_title('Training Loss Over Time')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True)

# Right panel: training vs validation accuracy per epoch
ax2.plot(epoch_axis, train_accuracies, label='Training Accuracy', color='blue')
ax2.plot(epoch_axis, val_accuracies, label='Validation Accuracy', color='red')
ax2.set_title('Model Accuracy Over Time')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.savefig('training_progress.png', dpi=300, bbox_inches='tight')
plt.show()

# Print final statistics
print("📊 Training Summary:")
print(f"  - Epochs completed: {epochs_completed}")
print(f"  - Best validation accuracy: {best_val_acc:.2f}%")
print(f"  - Final training accuracy: {train_accuracies[-1]:.2f}%")
print(f"  - Final validation accuracy: {val_accuracies[-1]:.2f}%")

# Summary statistics of the per-image prediction confidence (cell output)
df['confidence'].describe()

In [None]:
# Re-read the sample submission from disk (the same file the inference cell
# loads), replacing the `submission` frame currently in memory.
submission=pd.read_csv('/kaggle/input/binary-biplob-art-attack/sample_submission.csv')

In [None]:
# Display the submission frame as the cell output.
submission

In [None]:
# First rows of the predictions frame built by the inference cell.
df.head()

In [None]:
# Column assignment aligns on the DataFrame index, so row i of the
# predictions becomes the label of row i of the submission.
# NOTE(review): relies on both frames listing the images in the same order — confirm.
submission['label']=df['class_id']

In [None]:

# Write the submission file without the index column; this overwrites the
# 'submission.csv' already written by the inference cell.
submission.to_csv('submission.csv',index=False)