In [1]:
import os
import cv2
import numpy as np
import random
from glob import glob
from tqdm.notebook import tqdm
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
import matplotlib.pyplot as plt

In [2]:
DATA_DIR = "data/stanford_dogs/Images"
OUTPUT_DIR = "data/Processed"
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 5
LR = 0.001

mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

In [3]:
def load_dataset(data_dir):
    all_paths = glob(os.path.join(data_dir, "*", "*.jpg"))
    all_labels = [os.path.basename(os.path.dirname(p)) for p in all_paths]
    return all_paths, all_labels

In [4]:
def resize_image(image, target_size=IMG_SIZE):
    return cv2.resize(image, target_size, interpolation=cv2.INTER_AREA)

def normalize_image(image):
    return image.astype(np.float32) / 255.0

def preprocess_image(path, target_size=IMG_SIZE, to_rgb=True, normalize=True):
    img = cv2.imread(path)
    if img is None:
        return None
    if to_rgb:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = resize_image(img, target_size)
    if normalize:
        img = normalize_image(img)
    return img

In [5]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=0.05):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std})"

In [6]:
final_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=5)], p=0.3),
    transforms.RandomApply([AddGaussianNoise(0., 0.1)], p=0.3),
    transforms.RandomApply([transforms.RandomErasing(p=1.0, scale=(0.1, 0.2))], p=0.3),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(degrees=15),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.05),
    transforms.RandomGrayscale(p=0.2),
    transforms.RandomAdjustSharpness(sharpness_factor=2.0, p=0.5),
    transforms.Normalize(mean, std)
])

In [7]:
def split_and_save_dataset(image_paths, labels):
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    SPLIT_RATIOS = {"train": 0.7, "validation": 0.15, "test": 0.15}

    class_to_paths = {}
    for path, label in zip(image_paths, labels):
        class_to_paths.setdefault(label, []).append(path)

    path_to_split = {}
    for label, paths in class_to_paths.items():
        random.shuffle(paths)
        total = len(paths)
        train_end = int(SPLIT_RATIOS["train"] * total)
        val_end = train_end + int(SPLIT_RATIOS["validation"] * total)
        for i, path in enumerate(paths):
            if i < train_end:
                path_to_split[path] = "train"
            elif i < val_end:
                path_to_split[path] = "validation"
            else:
                path_to_split[path] = "test"

    for split in SPLIT_RATIOS.keys():
        for label in class_to_paths:
            os.makedirs(os.path.join(OUTPUT_DIR, split, label), exist_ok=True)

    for path, label in tqdm(zip(image_paths, labels), total=len(image_paths), desc="Preprocessing"):
        img = preprocess_image(path)
        if img is None:
            print(f"[READ FAIL] {path}")
            continue
        save_img = (img * 255).astype(np.uint8)
        save_img = cv2.cvtColor(save_img, cv2.COLOR_RGB2BGR)
        split = path_to_split.get(path, "train")
        save_dir = os.path.join(OUTPUT_DIR, split, label)
        cv2.imwrite(os.path.join(save_dir, os.path.basename(path)), save_img)

In [8]:
def visualize_sample(image_path):
    img_cv2 = cv2.imread(image_path)
    img_rgb = cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(img_rgb)
    img_tensor = final_transform(img_pil)
    img_np = img_tensor.permute(1, 2, 0).numpy()
    img_np = (img_np * std + mean).clip(0, 1)

    import matplotlib.pyplot as plt
    plt.figure(figsize=(6, 6))
    plt.imshow(img_np)
    plt.axis('off')
    plt.title("Augmented Image")
    plt.show()

In [9]:
# if __name__ == "__main__":
#     image_paths, labels = load_dataset(DATA_DIR)
#     visualize_sample(image_paths[0])

In [10]:
class BaselineCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Flatten(),
            nn.Linear(128 * 28 * 28, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        return self.net(x)

In [11]:
def train_and_validate():
    train_ds = datasets.ImageFolder(os.path.join(OUTPUT_DIR, 'train'), transform=final_transform)
    val_ds = datasets.ImageFolder(os.path.join(OUTPUT_DIR, 'validation'), transform=final_transform)
    
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)

    model = BaselineCNN(num_classes=len(train_ds.classes)).to("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)

    device = "cuda" if torch.cuda.is_available() else "cpu"

    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0
        correct = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            pred = model(x)
            loss = criterion(pred, y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            correct += (pred.argmax(1) == y).sum().item()

        acc = correct / len(train_ds)
        print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}, Accuracy: {acc:.4f}")

    torch.save(model.state_dict(), "baseline_cnn.pth")
    print("Model saved as baseline_cnn.pth")

In [None]:
image_paths, labels = load_dataset(DATA_DIR)
split_and_save_dataset(image_paths, labels)

In [None]:
train_and_validate()
