In [1]:
#Libraries
import pandas as pd
import numpy as np
#for visualizations
import matplotlib.pyplot as plt

#PyTorch Libraries
import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import torchvision.transforms.v2 as transforms

from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchvision.datasets import ImageFolder

#image class processing
from PIL import Image

#for progress bar
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

file_path = r"C:\Users\Owner\Documents\datsci\Pythonic\archive (2)\CCIH"

# 64x64 RGB, TANH-style normalization
train_transform_64 = transforms.Compose([
    transforms.ToImage(),                # ensure tensor
    transforms.Resize((64, 64)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToDtype(torch.float32, scale=True),  # [0,1]
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5])       # -> [-1,1]
])

eval_transform_64 = transforms.Compose([
    transforms.ToImage(),
    transforms.Resize((64, 64)),
    transforms.ToDtype(torch.float32, scale=True),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5])
])

# Datasets
train_dataset_64 = ImageFolder(root=file_path + r"\train",
                               transform=train_transform_64)

gan_train_dataset_64 = ImageFolder(root=file_path + r"\train",
                                   transform=eval_transform_64)

valid_dataset_64 = ImageFolder(root=file_path + r"\valid",
                               transform=eval_transform_64)

test_dataset_64  = ImageFolder(root=file_path + r"\test",
                               transform=eval_transform_64)

batch_size = 32

train_loader_64     = DataLoader(train_dataset_64,     batch_size=batch_size, shuffle=True)
gan_train_loader_64 = DataLoader(gan_train_dataset_64, batch_size=batch_size, shuffle=True)
valid_loader_64     = DataLoader(valid_dataset_64,     batch_size=batch_size, shuffle=True)
test_loader_64      = DataLoader(test_dataset_64,      batch_size=batch_size, shuffle=False)

num_classes = 2
print("Classes:", train_dataset_64.classes)
print("Classes to index :", train_dataset_64.class_to_idx)


Classes: ['Acc', 'Nat']
Classes to index : {'Acc': 0, 'Nat': 1}


In [3]:
class CrashClassifierCNN(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        self.features = nn.Sequential(
            # (3, 64, 64) -> (64, 32, 32)
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),

            # (64, 32, 32) -> (128, 16, 16)
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),

            # (128, 16, 16) -> (256, 8, 8)
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),

            # (256, 8, 8) -> (512, 4, 4)
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU()
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 4 * 4, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


In [4]:
def train_classifier_epoch(data_loader, model, optimizer, loss_fn):
    model.train()
    total_loss = 0.0
    total_correct = 0
    total = 0

    for X, y in data_loader:
        optimizer.zero_grad()
        logits = model(X)
        loss = loss_fn(logits, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * len(y)
        preds = logits.argmax(dim=1)
        total_correct += (preds == y).sum().item()
        total += len(y)

    avg_loss = total_loss / total
    avg_acc  = total_correct / total
    return avg_loss, avg_acc


def eval_classifier_epoch(data_loader, model, loss_fn):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total = 0

    with torch.no_grad():
        for X, y in data_loader:
            logits = model(X)
            loss = loss_fn(logits, y)

            total_loss += loss.item() * len(y)
            preds = logits.argmax(dim=1)
            total_correct += (preds == y).sum().item()
            total += len(y)

    avg_loss = total_loss / total
    avg_acc  = total_correct / total
    return avg_loss, avg_acc

In [5]:
baseline_model_64 = CrashClassifierCNN(num_classes=2)
ce_loss = nn.CrossEntropyLoss()
optimizer_base = torch.optim.Adam(baseline_model_64.parameters(), lr=1e-3)

num_cls_epochs = 10  # you can increase later

for epoch in range(num_cls_epochs):
    train_loss, train_acc = train_classifier_epoch(
        train_loader_64, baseline_model_64, optimizer_base, ce_loss
    )
    valid_loss, valid_acc = eval_classifier_epoch(
        valid_loader_64, baseline_model_64, ce_loss
    )
    print(
        f"[Baseline] Epoch {epoch:02d} | "
        f"train_loss={train_loss:.3f}, train_acc={train_acc:.3f} | "
        f"valid_loss={valid_loss:.3f}, valid_acc={valid_acc:.3f}"
    )

test_loss_base, test_acc_base = eval_classifier_epoch(
    test_loader_64, baseline_model_64, ce_loss
)
print(f"\nBaseline Test: loss={test_loss_base:.3f}, acc={test_acc_base:.3f}")


[Baseline] Epoch 00 | train_loss=0.554, train_acc=0.819 | valid_loss=0.753, valid_acc=0.779
[Baseline] Epoch 01 | train_loss=0.149, train_acc=0.943 | valid_loss=1.168, valid_acc=0.746
[Baseline] Epoch 02 | train_loss=0.093, train_acc=0.969 | valid_loss=1.066, valid_acc=0.730
[Baseline] Epoch 03 | train_loss=0.065, train_acc=0.983 | valid_loss=1.563, valid_acc=0.697
[Baseline] Epoch 04 | train_loss=0.057, train_acc=0.981 | valid_loss=1.629, valid_acc=0.697
[Baseline] Epoch 05 | train_loss=0.041, train_acc=0.978 | valid_loss=2.027, valid_acc=0.713
[Baseline] Epoch 06 | train_loss=0.040, train_acc=0.985 | valid_loss=1.246, valid_acc=0.746
[Baseline] Epoch 07 | train_loss=0.036, train_acc=0.987 | valid_loss=1.662, valid_acc=0.738
[Baseline] Epoch 08 | train_loss=0.042, train_acc=0.980 | valid_loss=1.517, valid_acc=0.713
[Baseline] Epoch 09 | train_loss=0.041, train_acc=0.985 | valid_loss=1.261, valid_acc=0.705

Baseline Test: loss=1.600, acc=0.585


In [6]:
latent_dim   = 100
num_classes  = 2
total_latent_dim = latent_dim + num_classes  # noise + one-hot labels

# Generator: (noise+label) -> (3, 64, 64)
generator_cnn = nn.Sequential(
    nn.Linear(total_latent_dim, 4 * 4 * 512),
    nn.Unflatten(1, (512, 4, 4)),

    nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.ReLU(True),

    nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.ReLU(True),

    nn.ConvTranspose2d(128, 64,  kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(True),

    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh()   # -> [-1, 1]
)

# Discriminator: (image + label maps) -> realism logit
in_channels_D = 3 + num_classes

discriminator_cnn = nn.Sequential(
    nn.Conv2d(in_channels_D, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.LeakyReLU(0.2, inplace=True),

    nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True),

    nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True),

    nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True),

    nn.Flatten(1),
    nn.Linear(512 * 4 * 4, 1)   # BCEWithLogitsLoss
)


In [7]:
def train_gan_epoch(data_loader,
                    discriminator,
                    generator,
                    discriminator_optimizer,
                    generator_optimizer,
                    loss_function,
                    latent_dim,
                    num_classes):

    discriminator.train()
    generator.train()

    total_d_loss = 0.0
    total_g_loss = 0.0
    total_batches = 0

    for real, labels in data_loader:
        batch_size = real.size(0)
        total_batches += 1

        # one-hot labels
        y_onehot = F.one_hot(labels, num_classes=num_classes).float()

        # label maps for D: expand to (N, num_classes, H, W)
        label_maps = y_onehot.unsqueeze(2).unsqueeze(3).expand(
            -1, -1, real.size(2), real.size(3)
        )

        ########################
        # 1. Train Discriminator
        ########################
        Z = torch.randn(batch_size, latent_dim)
        Z_cond = torch.cat((Z, y_onehot), dim=1)
        fake = generator(Z_cond)   # (N, 3, 64, 64)

        real_input = torch.cat((real, label_maps), dim=1)
        fake_input = torch.cat((fake, label_maps), dim=1)

        X_disc = torch.cat((real_input, fake_input), dim=0)
        y_disc = torch.cat(
            (torch.ones(batch_size), torch.zeros(batch_size))
        ).unsqueeze(1)

        pred = discriminator(X_disc)
        loss_d = loss_function(pred, y_disc)

        discriminator_optimizer.zero_grad()
        loss_d.backward()
        discriminator_optimizer.step()

        total_d_loss += loss_d.item()

        ########################
        # 2. Train Generator
        ########################
        Z = torch.randn(batch_size, latent_dim)
        Z_cond = torch.cat((Z, y_onehot), dim=1)
        gen_fake = generator(Z_cond)

        gen_fake_input = torch.cat((gen_fake, label_maps), dim=1)
        y_gen = torch.ones(batch_size).unsqueeze(1)

        pred_fake = discriminator(gen_fake_input)
        loss_g = loss_function(pred_fake, y_gen)

        generator_optimizer.zero_grad()
        loss_g.backward()
        generator_optimizer.step()

        total_g_loss += loss_g.item()

    avg_d_loss = total_d_loss / total_batches
    avg_g_loss = total_g_loss / total_batches

    return total_d_loss, avg_d_loss, total_g_loss, avg_g_loss

In [None]:
loss_function_gan = nn.BCEWithLogitsLoss()

discriminator_optimizer = torch.optim.Adam(
    discriminator_cnn.parameters(), lr=2e-4, betas=(0.5, 0.999)
)
generator_optimizer = torch.optim.Adam(
    generator_cnn.parameters(), lr=2e-4, betas=(0.5, 0.999)
)

num_gan_epochs = 40  # more than before

for epoch in range(num_gan_epochs):
    total_d, avg_d, total_g, avg_g = train_gan_epoch(
        gan_train_loader_64,
        discriminator_cnn,
        generator_cnn,
        discriminator_optimizer,
        generator_optimizer,
        loss_function_gan,
        latent_dim,
        num_classes
    )
    print(f"[GAN] Epoch {epoch:02d} | D_loss={avg_d:.3f} | G_loss={avg_g:.3f}")


[GAN] Epoch 00 | D_loss=0.051 | G_loss=0.394
[GAN] Epoch 01 | D_loss=0.001 | G_loss=0.070


In [None]:
def sample_conditioned_fake_images(generator, class_label, n_samples=16):
    labels = torch.full((n_samples,), class_label, dtype=torch.long)
    y_onehot = F.one_hot(labels, num_classes=num_classes).float()

    Z = torch.randn(n_samples, latent_dim)
    Z_cond = torch.cat((Z, y_onehot), dim=1)

    with torch.no_grad():
        fake = generator(Z_cond)   # (n_samples, 3, 64, 64)
    return fake, labels

def to_img(x):
    # from [-1,1] to [0,1] for plotting
    return (x + 1) / 2

# Plot fake crash (class 0)
fake_crash, _ = sample_conditioned_fake_images(generator_cnn, class_label=0, n_samples=16)

fig, ax = plt.subplots(4, 4, figsize=(5,5))
ax = ax.flatten()
for i in range(16):
    img = to_img(fake_crash[i]).permute(1,2,0).cpu().numpy()
    ax[i].imshow(img)
    ax[i].set_axis_off()
plt.suptitle("Generated crash vehicles (Class = 0)")
plt.tight_layout()
plt.show()

# Plot fake intact (class 1)
fake_intact, _ = sample_conditioned_fake_images(generator_cnn, class_label=1, n_samples=16)

fig, ax = plt.subplots(4, 4, figsize=(5,5))
ax = ax.flatten()
for i in range(16):
    img = to_img(fake_intact[i]).permute(1,2,0).cpu().numpy()
    ax[i].imshow(img)
    ax[i].set_axis_off()
plt.suptitle("Generated intact vehicles (Class = 1)")
plt.tight_layout()
plt.show()


In [None]:
def as_tensors(loader):
    X_list, y_list = [], []
    for X, y in loader:
        X_list.append(X)
        y_list.append(y)
    return torch.cat(X_list, dim=0), torch.cat(y_list, dim=0)

X_real, y_real = as_tensors(gan_train_loader_64)
print("Real train:", X_real.shape, y_real.shape)


In [None]:
def generate_cgan_samples(generator, n_per_class, latent_dim, num_classes=2):
    all_imgs = []
    all_labels = []

    for c in range(num_classes):
        labels = torch.full((n_per_class,), c, dtype=torch.long)
        y_onehot = F.one_hot(labels, num_classes=num_classes).float()
        Z = torch.randn(n_per_class, latent_dim)
        Z_cond = torch.cat((Z, y_onehot), dim=1)

        with torch.no_grad():
            fake = generator(Z_cond)   # (n_per_class, 3, 64, 64)

        all_imgs.append(fake)
        all_labels.append(labels)

    X_fake = torch.cat(all_imgs, dim=0)
    y_fake = torch.cat(all_labels, dim=0)
    return X_fake, y_fake

# Count real examples per class
n_real_crash  = (y_real == 0).sum().item()
n_real_intact = (y_real == 1).sum().item()
print("Real counts:", n_real_crash, n_real_intact)

# Choose how many fakes per class
n_per_class = max(n_real_crash, n_real_intact)

X_fake, y_fake = generate_cgan_samples(
    generator_cnn,
    n_per_class=n_per_class,
    latent_dim=latent_dim,
    num_classes=num_classes
)

print("Fake set:", X_fake.shape, y_fake.shape)


In [None]:
X_aug = torch.cat([X_real, X_fake], dim=0)
y_aug = torch.cat([y_real, y_fake], dim=0)

aug_dataset = TensorDataset(X_aug, y_aug)
aug_loader  = DataLoader(aug_dataset, batch_size=32, shuffle=True)

print("Augmented train set:", X_aug.shape, y_aug.shape)

In [None]:
aug_model = CrashClassifierCNN(num_classes=2)
optimizer_aug = torch.optim.Adam(aug_model.parameters(), lr=1e-3)

num_aug_epochs = 10  # can increase later

for epoch in range(num_aug_epochs):
    train_loss, train_acc = train_classifier_epoch(
        aug_loader, aug_model, optimizer_aug, ce_loss
    )
    valid_loss, valid_acc = eval_classifier_epoch(
        valid_loader_64, aug_model, ce_loss
    )
    print(
        f"[GAN-Aug] Epoch {epoch:02d} | "
        f"train_loss={train_loss:.3f}, train_acc={train_acc:.3f} | "
        f"valid_loss={valid_loss:.3f}, valid_acc={valid_acc:.3f}"
    )

test_loss_aug, test_acc_aug = eval_classifier_epoch(
    test_loader_64, aug_model, ce_loss
)

print("\n=== Final Test Comparison (64x64, RGB, Tanh) ===")
print(f"Baseline Test:      loss={test_loss_base:.3f}, acc={test_acc_base:.3f}")
print(f"GAN-Augmented Test: loss={test_loss_aug:.3f}, acc={test_acc_aug:.3f}")