In [None]:
import copy
import glob
import json
import os
import time

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import models, transforms
from PIL import Image

In [None]:
# --- Kaggle filesystem layout ---
# Scratch folder: the best checkpoint and the history file are written here.
OUTPUT_DIR = '/kaggle/working'
# Dataset root -- point this at wherever your copy of the dataset is mounted.
DATA_DIR = '/kaggle/input/dtd-dataset/dtd'
# The two sub-folders of the dataset root that the notebook reads from.
IMAGE_DIR, LABELS_DIR = (
    os.path.join(DATA_DIR, subfolder) for subfolder in ('images', 'labels')
)


In [None]:
# --- Experiment settings ---
# Numbered label files (train{i}.txt / val{i}.txt / test{i}.txt) that are
# merged into each phase's dataset: splits 1-5.
# NOTE(review): DTD's numbered splits appear to be alternative partitions of
# the same image pool, so merging several of them can put one image into more
# than one phase -- confirm the phases are disjoint before trusting the
# validation/test numbers.
SPLITS = list(range(1, 6))
MODEL_NAME = 'resnet50'    # name of the torchvision.models constructor
NUM_CLASSES = 47           # output width of the replacement classifier head
BATCH_SIZE = 16
NUM_EPOCHS = 40
LEARNING_RATE = 5e-4       # AdamW learning rate
WEIGHT_DECAY = 1e-2        # AdamW weight decay
STEP_LR_SIZE = 12          # StepLR: epochs between learning-rate decays
STEP_LR_GAMMA = 0.1        # StepLR: multiplicative decay factor
NUM_WORKERS = 2            # DataLoader worker processes

# Seed PyTorch's global RNG so that a Restart & Run All repeats the same
# random draws. Some GPU kernels may still be nondeterministic, so treat this
# as "repeatable up to backend nondeterminism", not bit-exact.
SEED = 42
torch.manual_seed(SEED)

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {DEVICE}")

In [None]:
# --- Dataset Definition ---
class DTDDataset(Dataset):
    """Image-classification dataset assembled from one or more label files.

    Each label file lists one image per line as a path relative to
    ``image_dir`` whose first ``/``-separated component is the class folder
    (``<class>/<file>``). The classes are the sub-directories of
    ``image_dir``, indexed in sorted order.

    Args:
        image_dir: Root folder containing one sub-directory per class.
        label_files: Iterable of label-file paths to merge. An image listed
            more than once is kept only once (first occurrence wins); listed
            files that do not exist on disk are skipped.
        transform: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned from ``__getitem__``.

    Raises:
        KeyError: If a listed image's class folder is not a sub-directory of
            ``image_dir``.
    """

    def __init__(self, image_dir, label_files, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        # Only directories are classes. Counting stray files (hidden files,
        # READMEs, ...) would shift every class index that sorts after them.
        self.class_names = sorted(
            entry for entry in os.listdir(image_dir)
            if os.path.isdir(os.path.join(image_dir, entry))
        )
        self.class_to_idx = {c: i for i, c in enumerate(self.class_names)}
        seen = set()
        for lf in label_files:
            with open(lf) as f:
                for line in f:
                    path = line.strip()
                    if not path:
                        # A blank line would otherwise resolve to image_dir
                        # itself, pass the existence check and fail on the
                        # class lookup with KeyError('').
                        continue
                    full = os.path.join(image_dir, path)
                    # isfile (not exists) so a directory is never taken for
                    # an image; missing files are skipped, as before.
                    if full in seen or not os.path.isfile(full):
                        continue
                    label = self.class_to_idx[path.split('/')[0]]
                    seen.add(full)
                    self.image_paths.append(full)
                    self.labels.append(label)

    def __len__(self):
        """Number of distinct images collected from the label files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; image is RGB, transformed if set."""
        # The context manager closes the file handle once the pixels have
        # been decoded into the converted copy, instead of waiting for GC.
        with Image.open(self.image_paths[idx]) as src:
            img = src.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

# --- Data Transforms ---
# Crop size, resize size and normalisation statistics are read from the
# preprocessing preset that ships with the selected weights.
weights = models.ResNet50_Weights.DEFAULT
_preset = weights.transforms()
input_size = _preset.crop_size[0]
resize_size = _preset.resize_size[0]
mean, std = _preset.mean, _preset.std


def _eval_pipeline():
    """Build a fresh resize -> center-crop -> tensor -> normalize pipeline."""
    return transforms.Compose([
        transforms.Resize(resize_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])


# Training uses random crop/flip/TrivialAugment; val and test each get their
# own copy of the evaluation pipeline.
_train_steps = [
    transforms.RandomResizedCrop(input_size),
    transforms.RandomHorizontalFlip(),
    transforms.TrivialAugmentWide(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
]
data_transforms = {
    'train': transforms.Compose(_train_steps),
    'val': _eval_pipeline(),
    'test': _eval_pipeline(),
}

# --- Prepare DataLoaders ---
def get_label_files(split, splits=None, labels_dir=None):
    """Return the label-file paths of one phase across the selected splits.

    Args:
        split: Phase prefix of the file names, e.g. 'train', 'val' or 'test'.
        splits: Iterable of split numbers to include. Defaults to the
            notebook-level ``SPLITS``.
        labels_dir: Folder that holds the label files. Defaults to the
            notebook-level ``LABELS_DIR``.

    Returns:
        A list with one ``<labels_dir>/<split><i>.txt`` path per entry of
        ``splits``, in the same order.
    """
    # Resolve the defaults at call time so edits to the config cell take
    # effect without re-defining this function.
    if splits is None:
        splits = SPLITS
    if labels_dir is None:
        labels_dir = LABELS_DIR
    return [os.path.join(labels_dir, f"{split}{i}.txt") for i in splits]

# One dataset per phase, each merging that phase's label files.
train = DTDDataset(IMAGE_DIR, get_label_files('train'), data_transforms['train'])
val   = DTDDataset(IMAGE_DIR, get_label_files('val'),   data_transforms['val'])
test  = DTDDataset(IMAGE_DIR, get_label_files('test'),  data_transforms['test'])

# Leakage guard: each phase is a union of several label files, so the same
# image can end up in more than one phase. Report any overlap loudly (without
# altering the data) because accuracy on shared images is not a generalisation
# measurement.
_phase_paths = {
    'train': set(train.image_paths),
    'val': set(val.image_paths),
    'test': set(test.image_paths),
}
for _first, _second in (('train', 'val'), ('train', 'test'), ('val', 'test')):
    _n_shared = len(_phase_paths[_first] & _phase_paths[_second])
    if _n_shared:
        print(f"WARNING: {_n_shared} image(s) are shared between the "
              f"'{_first}' and '{_second}' sets; metrics on them are not held-out.")

# Only the training loader shuffles; val/test keep file order.
dataloaders = {
    'train': DataLoader(train, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True),
    'val':   DataLoader(val,   batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True),
    'test':  DataLoader(test,  batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)
}
sizes = {x: len(dataloaders[x].dataset) for x in dataloaders}
print(f"Dataset sizes: {sizes}")

# --- Model Setup ---
# Look the backbone constructor up by name, load it with the selected weights
# and swap its final layer for a fresh NUM_CLASSES-way linear head.
_backbone_ctor = getattr(models, MODEL_NAME)
model = _backbone_ctor(weights=weights)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=num_ftrs, out_features=NUM_CLASSES)
model = model.to(DEVICE)

# Loss, optimizer over all parameters, and a step-wise learning-rate decay.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
)
scheduler = lr_scheduler.StepLR(
    optimizer,
    step_size=STEP_LR_SIZE,
    gamma=STEP_LR_GAMMA,
)


In [None]:
# --- Training Loop ---
def train_model(model, criterion, optimizer, scheduler, num_epochs=NUM_EPOCHS):
    """Train ``model`` and return it loaded with its best-validation weights.

    Every epoch runs a 'train' pass followed by a 'val' pass over the
    notebook-level ``dataloaders``. Whenever validation accuracy improves,
    the weights are snapshotted in memory and written to
    ``OUTPUT_DIR/best_model.pth``.

    Args:
        model: Network to optimise. It is not moved by this function, while
            every batch is sent to ``DEVICE``, so it must already be there.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch, after the
            training pass.
        num_epochs: Number of train+val epochs to run.

    Returns:
        ``(model, history)`` where ``history`` maps 'train_acc', 'val_acc',
        'train_loss' and 'val_loss' to per-epoch lists. If no validation
        pass ran (``num_epochs == 0``) the model is returned unchanged.
    """
    # -inf rather than 0 so the first validated epoch always yields a
    # snapshot, even when its accuracy is exactly 0.
    best_acc, best_wts = float('-inf'), None
    history = {'train_acc': [], 'val_acc': [], 'train_loss': [], 'val_loss': []}
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()
            running_loss, running_corrects, n_seen = 0.0, 0, 0
            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                if is_train:
                    optimizer.zero_grad()
                # Gradients are tracked only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    preds = outputs.argmax(dim=1)
                    if is_train:
                        loss.backward()
                        optimizer.step()
                # Weight the batch loss by batch size so the epoch figure is
                # a per-sample mean even when the last batch is smaller.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                n_seen += batch_size
            if is_train:
                scheduler.step()
            # Normalise by the samples actually seen; an empty loader gives
            # 0 instead of a ZeroDivisionError.
            epoch_loss = running_loss / n_seen if n_seen else 0.0
            epoch_acc = running_corrects / n_seen if n_seen else 0.0
            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)
            print(f" {phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
            if phase == 'val' and epoch_acc > best_acc:
                # deepcopy: state_dict() returns live references that later
                # optimizer steps would otherwise overwrite.
                best_acc, best_wts = epoch_acc, copy.deepcopy(model.state_dict())
                torch.save(best_wts, os.path.join(OUTPUT_DIR, 'best_model.pth'))
                print(f"  --> New best val acc: {best_acc:.4f}")
    # Nothing to restore when no validation pass ever ran.
    if best_wts is not None:
        model.load_state_dict(best_wts)
    return model, history

# Run training with the default epoch count. `model` is rebound to the network
# returned by train_model, and `history` holds the per-epoch loss/accuracy lists.
model, history = train_model(model, criterion, optimizer, scheduler)


In [None]:
# --- Testing ---
def test_model(model):
    model.eval(); running_corrects, total=0,0
    with torch.no_grad():
        for inputs, labels in dataloaders['test']:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = model(inputs)
            preds = outputs.argmax(dim=1)
            running_corrects += (preds==labels).sum().item()
            total += inputs.size(0)
    acc = running_corrects/total if total>0 else 0
    print(f"Test Accuracy: {acc:.4f}")
    return acc

# Score the trained model on the test loader.
test_acc = test_model(model)

# Optional: save training history
# (`json` comes from the import cell at the top of the notebook.)
with open(os.path.join(OUTPUT_DIR, 'history.json'), 'w') as f:
    json.dump(history, f)

print("Done.")
