In [2]:
# =========================
# IMPORT REQUIRED LIBRARIES
# =========================

import os
import boto3
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.utils import save_image
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


# =========================
# AWS S3 CONFIGURATION
# =========================

# Bucket that holds both the source images and the generated output.
BUCKET_NAME = "ai-bmi-predictor"

# Key prefixes inside the bucket. Both end with "/" because class folder
# names ("Loose/", "Tight/") are appended directly to them.
# NOTE(review): "orginal" looks like a misspelling, but it presumably mirrors
# the real S3 key layout -- confirm against the bucket before changing it.
INPUT_PREFIX = "tight and loose classifier/orginal dataset/Dataset/Training Data/"
OUTPUT_PREFIX = "tight and loose classifier/synthetic data/Training Data/"

# A single module-level client is shared by the dataset and the uploader.
print("Connecting to S3...")
s3 = boto3.client("s3")
print("Connected to S3")


# =========================
# IMAGE SETTINGS
# =========================

# Image geometry.
IMAGE_SIZE = 64   # images are resized to IMAGE_SIZE x IMAGE_SIZE
CHANNELS = 3      # images are converted to RGB when loaded

# Model / training hyper-parameters.
LATENT_DIM = 100  # length of the noise vector and of the label embedding
NUM_CLASSES = 2   # number of entries in the label embedding tables
BATCH_SIZE = 16
EPOCHS = 300

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Using device: {DEVICE}")


# =========================
# IMAGE TRANSFORMS
# =========================

# Pre-processing applied to every image read from S3:
#   1. resize to a fixed square,
#   2. convert to a float tensor,
#   3. normalise each of the three channels with mean 0.5 and std 0.5
#      (the generator ends in Tanh, and generated images are mapped back
#      with (img + 1) / 2 before saving).
transform = transforms.Compose(
    [
        transforms.Resize(size=(IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)


# =========================
# CUSTOM S3 DATASET
# =========================

class S3ImageDataset(Dataset):
    """Images stored in S3 under ``<prefix>Loose/`` and ``<prefix>Tight/``.

    Only the object keys are collected at construction time; the image bytes
    are fetched from S3 on every ``__getitem__`` call.

    Args:
        prefix: Key prefix (ending in ``/``) that contains the class folders.
        transform: Callable applied to each RGB ``PIL.Image`` before it is
            returned.

    Attributes:
        images: S3 object keys, one per sample.
        labels: Class index per sample (0 for ``Loose/``, 1 for ``Tight/``).
    """

    # Folder order defines the class index: Loose -> 0, Tight -> 1.
    CLASS_FOLDERS = ("Loose/", "Tight/")

    def __init__(self, prefix, transform):
        self.images = []
        self.labels = []
        self.transform = transform

        print("Loading images from S3...")

        # A single list_objects_v2 call returns at most 1000 keys, so walk
        # every page; otherwise large folders are silently truncated.
        paginator = s3.get_paginator("list_objects_v2")
        for label, folder in enumerate(self.CLASS_FOLDERS):
            pages = paginator.paginate(
                Bucket=BUCKET_NAME,
                Prefix=prefix + folder
            )
            for page in pages:
                # "Contents" is absent when nothing matches the prefix.
                for obj in page.get("Contents", []):
                    key = obj["Key"]
                    if not self._is_object_key(key):
                        continue
                    self.images.append(key)
                    self.labels.append(label)

        print(f"Total images loaded: {len(self.images)}")

    @staticmethod
    def _is_object_key(key):
        """Return False for "folder" placeholder keys (those ending in ``/``).

        Such zero-byte marker objects are listed together with the real
        files, but they are not images and cannot be decoded.
        """
        return not key.endswith("/")

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Download image ``idx`` and return ``(transformed_image, label)``."""
        obj = s3.get_object(Bucket=BUCKET_NAME, Key=self.images[idx])
        # Force three channels regardless of the stored image mode.
        img = Image.open(obj["Body"]).convert("RGB")
        img = self.transform(img)
        label = self.labels[idx]
        return img, label


# Index the training images in S3 and serve them in shuffled mini-batches.
dataset = S3ImageDataset(prefix=INPUT_PREFIX, transform=transform)
dataloader = DataLoader(
    dataset=dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

print("Dataset ready")


# =========================
# CONDITIONAL DCGAN GENERATOR
# =========================

class Generator(nn.Module):
    """Label-conditioned DCGAN generator.

    The class label is embedded into a ``LATENT_DIM``-wide vector,
    concatenated with the noise vector and treated as a 1x1 feature map.
    A stack of transposed convolutions then upsamples it
    (1 -> 4 -> 8 -> 16 -> 32 -> 64 pixels) into a ``CHANNELS``-channel image
    squashed by ``Tanh``.
    """

    def __init__(self):
        super().__init__()

        self.label_emb = nn.Embedding(NUM_CLASSES, LATENT_DIM)

        # Projection stage: 1x1 input -> 4x4 feature map.
        stages = [
            nn.ConvTranspose2d(
                LATENT_DIM * 2, 512, kernel_size=4, stride=1, padding=0, bias=False
            ),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
        ]

        # Each of these stages doubles the spatial resolution.
        for in_ch, out_ch in ((512, 256), (256, 128), (128, 64)):
            stages.extend(
                [
                    nn.ConvTranspose2d(
                        in_ch, out_ch, kernel_size=4, stride=2, padding=1, bias=False
                    ),
                    nn.BatchNorm2d(out_ch),
                    nn.ReLU(inplace=True),
                ]
            )

        # Output stage: final doubling, no normalisation, Tanh activation.
        stages.extend(
            [
                nn.ConvTranspose2d(
                    64, CHANNELS, kernel_size=4, stride=2, padding=1, bias=False
                ),
                nn.Tanh(),
            ]
        )

        self.net = nn.Sequential(*stages)

    def forward(self, noise, labels):
        """Generate images for ``labels`` from ``noise``.

        ``noise`` and the label embedding are concatenated along dim 1, then
        two trailing singleton dims are appended before the conv stack.
        """
        conditioned = torch.cat((noise, self.label_emb(labels)), dim=1)
        return self.net(conditioned[:, :, None, None])


# =========================
# CONDITIONAL DCGAN DISCRIMINATOR
# =========================

class Discriminator(nn.Module):
    """Label-conditioned DCGAN discriminator.

    The class label is embedded into ``IMAGE_SIZE * IMAGE_SIZE`` values,
    reshaped into one extra image plane and stacked onto the input image.
    Strided convolutions then reduce the result to a sigmoid score.
    """

    def __init__(self):
        super().__init__()

        self.label_emb = nn.Embedding(NUM_CLASSES, IMAGE_SIZE * IMAGE_SIZE)

        # Input stage: image channels plus the label plane, no batch norm.
        stages = [
            nn.Conv2d(CHANNELS + 1, 64, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
        ]

        # Each of these stages halves the spatial resolution.
        for in_ch, out_ch in ((64, 128), (128, 256), (256, 512)):
            stages.extend(
                [
                    nn.Conv2d(
                        in_ch, out_ch, kernel_size=4, stride=2, padding=1, bias=False
                    ),
                    nn.BatchNorm2d(out_ch),
                    nn.LeakyReLU(negative_slope=0.2, inplace=True),
                ]
            )

        # Output stage: one score channel followed by a sigmoid.
        stages.extend(
            [
                nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
                nn.Sigmoid(),
            ]
        )

        self.net = nn.Sequential(*stages)

    def forward(self, img, labels):
        """Score ``img`` given ``labels``; the result is viewed as ``(-1, 1)``."""
        label_plane = self.label_emb(labels)
        label_plane = label_plane.view(-1, 1, IMAGE_SIZE, IMAGE_SIZE)
        stacked = torch.cat((img, label_plane), dim=1)
        scores = self.net(stacked)
        return scores.view(-1, 1)


# =========================
# INITIALIZE MODELS
# =========================

# Build both networks on the selected device (generator first, so parameter
# initialisation consumes random numbers in the same order as before).
generator = Generator()
generator = generator.to(DEVICE)
discriminator = Discriminator()
discriminator = discriminator.to(DEVICE)

# Binary cross-entropy on the discriminator's sigmoid output.
criterion = nn.BCELoss()

# Adam with beta1 = 0.5 for both networks; the discriminator uses half the
# generator's learning rate.
g_optimizer = torch.optim.Adam(
    params=generator.parameters(),
    lr=2e-4,
    betas=(0.5, 0.999),
)
d_optimizer = torch.optim.Adam(
    params=discriminator.parameters(),
    lr=1e-4,
    betas=(0.5, 0.999),
)

print("Models initialized")


# =========================
# TRAINING LOOP
# =========================

print("Starting DCGAN training...")

# Alternating GAN training: for every mini-batch the discriminator is updated
# first, then the generator is updated against the freshly updated
# discriminator.
for epoch in range(EPOCHS):
    for batch_idx, (imgs, labels) in enumerate(dataloader):

        imgs, labels = imgs.to(DEVICE), labels.to(DEVICE)
        # The last batch of an epoch may be smaller than BATCH_SIZE, so the
        # targets and the noise are sized from the actual batch.
        batch_size = imgs.size(0)

        # BCE targets: "real" is 0.9 rather than 1.0 (smoothed), "fake" is 0.
        real = torch.full((batch_size, 1), 0.9, device=DEVICE)
        fake = torch.zeros(batch_size, 1, device=DEVICE)

        # Train Discriminator
        # Fake images are conditioned on the same labels as the real batch.
        noise = torch.randn(batch_size, LATENT_DIM, device=DEVICE)
        gen_imgs = generator(noise, labels)

        d_real = criterion(discriminator(imgs, labels), real)
        # detach() keeps the discriminator loss from back-propagating into
        # the generator.
        d_fake = criterion(discriminator(gen_imgs.detach(), labels), fake)
        d_loss = d_real + d_fake

        # zero_grad() here also clears the discriminator gradients left over
        # from the previous iteration's generator backward pass.
        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        # Train Generator
        # The same gen_imgs are re-scored (without detach) and pushed towards
        # the "real" target.
        # NOTE(review): the generator target is also the smoothed 0.9 value;
        # confirm whether 1.0 was intended here.
        g_loss = criterion(discriminator(gen_imgs, labels), real)

        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Log every fifth batch.
        if batch_idx % 5 == 0:
            print(
                f"Epoch [{epoch+1}/{EPOCHS}] "
                f"Batch [{batch_idx}/{len(dataloader)}] "
                f"D Loss: {d_loss.item():.4f} "
                f"G Loss: {g_loss.item():.4f}"
            )

    print(f"Epoch {epoch+1} completed")

print("Training finished")


# =========================
# GENERATE & UPLOAD IMAGES
# =========================

# Switch the generator to inference mode before sampling.
generator.eval()


def generate_and_upload(label, name, num_images=1000):
    """Sample images of one class from the generator and upload them to S3.

    Each image is written to a temporary PNG under ``/tmp``, uploaded to
    ``<OUTPUT_PREFIX><name>/<name>_<i>.png`` in ``BUCKET_NAME`` and then
    removed from local disk.

    Args:
        label: Class index fed to the generator's label embedding.
        name: Class name; used for the S3 sub-folder, the file names and the
            progress messages.
        num_images: How many images to generate (defaults to 1000).
    """
    print(f"Generating {name} images")

    # The conditioning label is the same for every sample, so build it once.
    lbl = torch.tensor([label], device=DEVICE)

    # Sampling needs no gradients; skipping the autograd graph saves memory
    # and time on every forward pass.
    with torch.no_grad():
        for i in range(num_images):
            noise = torch.randn(1, LATENT_DIM, device=DEVICE)

            img = generator(noise, lbl)
            # Map the Tanh output range [-1, 1] to [0, 1] for saving.
            img = (img + 1) / 2

            filename = f"{name}_{i}.png"
            path = f"/tmp/{filename}"

            try:
                save_image(img, path)

                s3.upload_file(
                    path,
                    BUCKET_NAME,
                    f"{OUTPUT_PREFIX}{name}/{filename}"
                )
            finally:
                # Do not leave one file per generated image behind in /tmp,
                # even when saving or uploading fails.
                if os.path.exists(path):
                    os.remove(path)

            if i % 100 == 0:
                print(f"{name}: {i}/{num_images} uploaded")

    print(f"{name} generation completed")


# Class indices match the dataset's folder order: 0 -> "Loose", 1 -> "Tight".
for class_index, class_name in enumerate(("Loose", "Tight")):
    generate_and_upload(class_index, class_name)

print("All DCGAN images generated and uploaded")


Connecting to S3...
Connected to S3
Using device: cuda
Loading images from S3...
Total images loaded: 100
Dataset ready
Models initialized
Starting DCGAN training...
Epoch [1/300] Batch [0/7] D Loss: 1.4071 G Loss: 3.3136
Epoch [1/300] Batch [5/7] D Loss: 2.4201 G Loss: 2.3094
Epoch 1 completed
Epoch [2/300] Batch [0/7] D Loss: 1.8791 G Loss: 2.7638
Epoch [2/300] Batch [5/7] D Loss: 1.8761 G Loss: 2.5804
Epoch 2 completed
Epoch [3/300] Batch [0/7] D Loss: 1.2539 G Loss: 2.8439
Epoch [3/300] Batch [5/7] D Loss: 1.2673 G Loss: 3.0392
Epoch 3 completed
Epoch [4/300] Batch [0/7] D Loss: 1.0817 G Loss: 2.7394
Epoch [4/300] Batch [5/7] D Loss: 1.2848 G Loss: 2.6787
Epoch 4 completed
Epoch [5/300] Batch [0/7] D Loss: 1.1077 G Loss: 2.5044
Epoch [5/300] Batch [5/7] D Loss: 1.4107 G Loss: 2.9071
Epoch 5 completed
Epoch [6/300] Batch [0/7] D Loss: 1.5348 G Loss: 2.1644
Epoch [6/300] Batch [5/7] D Loss: 1.4229 G Loss: 2.1121
Epoch 6 completed
Epoch [7/300] Batch [0/7] D Loss: 1.3412 G Loss: 2.583