# Setup

In [None]:
# mount google drive
from google.colab import drive

drive.mount("/content/drive")

Mounted at /content/drive


In [2]:
!mkdir -p /content/dataset
!cp '/content/drive/MyDrive/Colab Notebooks/computer_vision/1/hw1-data.tar.gz' '/content/dataset'
!tar -xf '/content/dataset/hw1-data.tar.gz' -C '/content/dataset'

In [3]:
%cd '/content/drive/My Drive/Colab Notebooks/computer_vision/1'

/content/drive/My Drive/Colab Notebooks/computer_vision/1


In [4]:
!apt install htop nvtop 1>/dev/null 2>/dev/null

# `regnety_160`

In [None]:
import os
import zipfile
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
import timm
from PIL import Image

# ----------------------------
# Setup directories and transforms
# ----------------------------
data_dir = "/content/dataset/data"
train_dir = os.path.join(data_dir, "train")
val_dir = os.path.join(data_dir, "val")
test_dir = os.path.join(data_dir, "test")

# Define transforms (modify as needed)
train_transforms = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)
val_transforms = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# ----------------------------
# Load training and validation data
# ----------------------------
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(val_dir, transform=val_transforms)

# Reverse the mapping so that index -> class label
idx_to_class = {v: k for k, v in train_dataset.class_to_idx.items()}

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)

# ----------------------------
# Create and fine-tune the model
# ----------------------------
num_classes = len(train_dataset.classes)  # should be 100
model = timm.create_model("regnety_160", pretrained=True, num_classes=num_classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 10  # adjust as needed

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}")

model.safetensors:   0%|          | 0.00/335M [00:00<?, ?B/s]

Epoch 1/10, Loss: 0.9746
Epoch 2/10, Loss: 0.3943
Epoch 3/10, Loss: 0.3108
Epoch 4/10, Loss: 0.2408
Epoch 5/10, Loss: 0.2242
Epoch 6/10, Loss: 0.1974
Epoch 7/10, Loss: 0.1724
Epoch 8/10, Loss: 0.1514
Epoch 9/10, Loss: 0.1501
Epoch 10/10, Loss: 0.1408
Saved predictions to prediction.csv
Created zip file pred.zip


In [None]:
# ----------------------------
# Define a custom test dataset
# ----------------------------
class TestDataset(Dataset):
    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.transform = transform
        self.images = [fname for fname in os.listdir(test_dir) if fname.endswith(".jpg")]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.test_dir, img_name)
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, img_name


test_dataset = TestDataset(test_dir, transform=val_transforms)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# ----------------------------
# Inference on the test set
# ----------------------------
model.eval()
predictions = []
with torch.no_grad():
    for images, names in test_loader:
        images = images.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        preds = preds.cpu().numpy()
        for name, pred in zip(names, preds):
            # Map numeric prediction back to the original class label using our reversed mapping.
            label = idx_to_class[pred]
            predictions.append((name.split(".")[0], label))

# ----------------------------
# Save predictions: remove existing prediction.csv and pred.zip if they exist.
# ----------------------------
csv_filename = "prediction.csv"
zip_filename = "pred.zip"

if os.path.exists(csv_filename):
    os.remove(csv_filename)
if os.path.exists(zip_filename):
    os.remove(zip_filename)

# Save predictions to CSV in the required format: image_name,pred_label
df = pd.DataFrame(predictions, columns=["image_name", "pred_label"])
df.to_csv(csv_filename, index=False)
print(f"Saved predictions to {csv_filename}")

# Create a zip file containing the prediction.csv file
with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
    zipf.write(csv_filename)
print(f"Created zip file {zip_filename}")

Saved predictions to prediction.csv
Created zip file pred.zip


# `seresnext101_64x4d`

In [None]:
    import os
    import zipfile
    import pandas as pd
    import numpy as np
    from collections import Counter

    import torch
    from torch.utils.data import DataLoader, Dataset, Subset
    from torchvision import transforms, datasets
    import timm
    from PIL import Image

    # ----------------------------
    # Setup directories and transforms
    # ----------------------------
    data_dir = "/content/dataset/data"
    train_dir = os.path.join(data_dir, "train")
    val_dir = os.path.join(data_dir, "val")
    test_dir = os.path.join(data_dir, "test")

    # Define transforms (adjust as needed)
    train_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                            [0.229, 0.224, 0.225])
    ])
    val_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                            [0.229, 0.224, 0.225])
    ])

    # ----------------------------
    # Load training and validation data
    # ----------------------------
    train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
    val_dataset = datasets.ImageFolder(val_dir, transform=val_transforms)

    # Reverse the mapping so that index -> class label
    idx_to_class = {v: k for k, v in train_dataset.class_to_idx.items()}

    # Create DataLoaders for validation (if needed)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)

    # ----------------------------
    # Parameters and device
    # ----------------------------
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    num_classes = len(train_dataset.classes)  # should be 100
    ensemble_size = 10     # Number of bagging models in the ensemble
    num_epochs = 20       # Adjust epochs as needed
    batch_size = 80

    # ----------------------------
    # Bagging Ensemble: Train multiple models on bootstrapped training sets
    # ----------------------------
    ensemble_models = []
    for i in range(ensemble_size):
        print(f"Training model {i+1}/{ensemble_size}")
        # Create bootstrapped sample indices for training dataset
        indices = np.random.choice(len(train_dataset), size=len(train_dataset), replace=True).tolist()
        bagged_dataset = Subset(train_dataset, indices)
        bagged_loader = DataLoader(bagged_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

        # Create model instance using seresnext101_64x4d with pretrained weights
        model = timm.create_model('seresnext101_64x4d', pretrained=True, num_classes=num_classes)
        model = model.to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
        criterion = torch.nn.CrossEntropyLoss()

        best_acc = 0.0  # Initialize best accuracy for this model

        # Training loop for the current ensemble model
        for epoch in range(num_epochs):
            model.train()
            running_loss = 0.0
            for images, labels in bagged_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            avg_loss = running_loss / len(bagged_loader)
            print(f"  Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

            # ----------------------------
            # Validation evaluation and model saving
            # ----------------------------
            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in val_loader:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    _, preds = torch.max(outputs, 1)
                    correct += (preds == labels).sum().item()
                    total += labels.size(0)
            val_acc = correct / total
            print(f"    Validation Accuracy: {val_acc:.4f}")

            # Save the model if the current epoch's accuracy is the best so far
            if val_acc > best_acc:
                best_acc = val_acc
                model_save_path = f"./saved_models/model{i+1}_{epoch+1}.pth"
                torch.save(model.state_dict(), model_save_path)
                print(f"    Model improved. Saved model to {model_save_path}")
            model.train()
        ensemble_models.append(model)
        print(f"Finished training model {i+1}\n")

Training model 1/10
  Epoch 1/20, Loss: 1.2066
    Validation Accuracy: 0.7467
    Model improved. Saved model to ./saved_models/model1_1.pth
  Epoch 2/20, Loss: 0.2785
    Validation Accuracy: 0.8167
    Model improved. Saved model to ./saved_models/model1_2.pth
  Epoch 3/20, Loss: 0.1526
    Validation Accuracy: 0.8267
    Model improved. Saved model to ./saved_models/model1_3.pth
  Epoch 4/20, Loss: 0.1111
    Validation Accuracy: 0.7900
  Epoch 5/20, Loss: 0.0950
    Validation Accuracy: 0.8133
  Epoch 6/20, Loss: 0.0767
    Validation Accuracy: 0.8300
    Model improved. Saved model to ./saved_models/model1_6.pth
  Epoch 7/20, Loss: 0.0757
    Validation Accuracy: 0.8133
  Epoch 8/20, Loss: 0.0671
    Validation Accuracy: 0.7967
  Epoch 9/20, Loss: 0.0464
    Validation Accuracy: 0.8033
  Epoch 10/20, Loss: 0.0649
    Validation Accuracy: 0.8133
  Epoch 11/20, Loss: 0.0509
    Validation Accuracy: 0.8367
    Model improved. Saved model to ./saved_models/model1_11.pth
  Epoch 12/20

In [None]:
# ----------------------------
# Define a custom test dataset to include image names
# ----------------------------
class TestDataset(Dataset):
    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.transform = transform
        self.images = [fname for fname in os.listdir(test_dir) if fname.endswith(".jpg")]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.test_dir, img_name)
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, img_name


test_dataset = TestDataset(test_dir, transform=val_transforms)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# ----------------------------
# Inference on the test set using majority voting
# ----------------------------
print("Starting inference on test set using ensemble models...")
# Set all models to evaluation mode
for model in ensemble_models:
    model.eval()

predictions = []
with torch.no_grad():
    for images, names in test_loader:
        images = images.to(device)
        # Collect predictions from each ensemble member
        ensemble_preds = []
        for model in ensemble_models:
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            ensemble_preds.append(preds.cpu().numpy())

        # Convert list of arrays to a 2D numpy array of shape (ensemble_size, batch_size)
        ensemble_preds = np.array(ensemble_preds)
        # Transpose to shape (batch_size, ensemble_size)
        ensemble_preds = ensemble_preds.T

        # For each image in the batch, apply majority voting
        for name, preds in zip(names, ensemble_preds):
            vote = Counter(preds).most_common(1)[0][0]
            # Map numeric prediction back to the original class label using reversed mapping
            label = idx_to_class[vote]
            predictions.append((name.split(".")[0], label))


# ----------------------------
# Save predictions: remove existing prediction.csv and pred.zip if they exist.
# ----------------------------
csv_filename = "prediction.csv"
zip_filename = "pred.zip"

if os.path.exists(csv_filename):
    os.remove(csv_filename)
if os.path.exists(zip_filename):
    os.remove(zip_filename)

# Save predictions to CSV with the required format: image_name,pred_label
df = pd.DataFrame(predictions, columns=["image_name", "pred_label"])
df.to_csv(csv_filename, index=False)
print(f"Saved predictions to {csv_filename}")

# Create a zip file containing the prediction.csv file
with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
    zipf.write(csv_filename)
print(f"Created zip file {zip_filename}")

Starting inference on test set using ensemble models...
Saved predictions to prediction.csv
Created zip file pred.zip


# Check param count

In [29]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")

Total parameters: 86388884
Trainable parameters: 86388884
