Defining the GAN Architecture
1. The Generator
2. The Discriminator
3. Connecting the Generator & Discriminator
4. Compiling the Model

The Generator:

In [None]:
# Generator's output shape must be the same shape as the cropped real image: (210, 180, 1)

from tensorflow.keras import layers, models

dim = 100

def build_generator(latent_dim, output_shape=(210, 180, 1)):
    model = models.Sequential(name="ChemoCraft_Generator")
    print("Building Generator")

    model.add(layers.Input(shape=(latent_dim,)))
    model.add(layers.Dense(128, activation="relu"))
    model.add(layers.Dense(256 * 32, activation="relu"))
    model.add(layers.Reshape(target_shape=(16, 16, 32)))
    model.add(layers.Conv2DTranspose(filters=32, kernel_size=5, strides=6, padding="same", activation="relu"))
    model.add(layers.Conv2DTranspose(filters=8, kernel_size=3, strides=5, padding="same", activation="relu"))
    
    prev_out = model.layers[-1].output.shape

    model.add(layers.Conv2D(1, kernel_size=(prev_out[1]-output_shape[0]+1, prev_out[2]-output_shape[1]+1), strides=1, padding="valid", activation="tanh"))

    return model

chemocraft_generator = build_generator(latent_dim=dim)
chemocraft_generator.summary()

The Discriminator:

In [None]:
# Discriminator's input shape should be the same shape as the generator's output: (210, 180, 1)

shape = (210, 180, 1)

def build_discriminator(input_shape=shape):

    model = models.Sequential(name="ChemoCraft_Discriminator")
    print("Building Discriminator Model")

    model.add(layers.Input(shape=input_shape))
    model.add(layers.Conv2D(filters=16, kernel_size=9, strides=5, padding="same", activation="relu")) 
    model.add(layers.Conv2D(filters=32, kernel_size=5, strides=4, padding="same", activation="relu"))
    model.add(layers.Conv2D(filters=128, kernel_size=3, strides=3, padding="same", activation="relu"))
    model.add(layers.Flatten())
    model.add(layers.Dense(1, activation="sigmoid", name="output"))

    return model
   
chemocraft_discriminator = build_discriminator(shape)
chemocraft_discriminator.summary()

Connecting Generator & Discriminator through the GAN:

In [None]:
def compile_gan(generator, discriminator, latent_dim):
    discriminator.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    z = layers.Input(shape=(latent_dim,))
    img = generator(z)
    discriminator.trainable = False
    validity = discriminator(img)
    gan = models.Model(z, validity)
    gan.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return gan

chemocraft_gan = compile_gan(chemocraft_generator, chemocraft_discriminator,latent_dim=dim)
chemocraft_gan.summary()

Setting up Data Pipeline for Training:

In [None]:
s3 = boto3.resource('s3')
bucket_name = 'chemocraft-data'
folder_path = 'MICCAI_BraTS2020_TrainingData/'
bucket = s3.Bucket(bucket_name)

folder_path = 'tanmay/brain_slices/'

keys = []

for obj in bucket.objects.filter(Prefix=folder_path):
    if obj.key.endswith('.png'):
        sample_key = obj.key.split('/')[-3] # Getting the Brain numbers.
        if sample_key not in keys:
            keys.append(sample_key)

print(len(keys))

In [None]:
from keras.preprocessing.image import load_img

folder_path = 'tanmay/brain_slices/'

def load_images(bucket_name, folder_path, folder_suffix):
    directory = f"{folder_path}{folder_suffix}/"
    print(f"Loading images from S3 Bucket: {bucket_name}{directory}")
    images = []

    for obj in bucket_name.objects.filter(Prefix=directory):
        if obj.key.endswith('.png'):
            try:
                file_stream = io.BytesIO(obj.get()['Body'].read())
                image = load_img(file_stream, target_size=(210, 180), color_mode='grayscale')
                print(f"Adding {obj.key.removeprefix("tanmay/brain_slices/")} into an array.")
                image = np.array(image) / 255.0  # Normalize to [0, 1]
                images.append(image)
            
            except Exception as e:
                print(f"Error loading image {obj.key}: {e}")

    return np.array(images)

my_arr = load_images(bucket_name=bucket, folder_path=folder_path, folder_suffix="320/flair") # Testing functionality on a small folder
print(my_arr.shape)

Batch Training:

In [None]:
import tensorflow as tf

def train_gan(generator, latent_dim, discriminator, gan, training_array, batch_size):
    for _ in range(len(training_array) // batch_size):
        # Select random batch of real images
        idx = np.random.randint(0, len(training_array), batch_size)
        real_slices = np.array([training_array[i] for i in idx])

        # Generate fake images
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        fake_slices = generator.predict(noise)

        # Train the discriminator
        d_loss_real = discriminator.train_on_batch(real_slices, np.ones((batch_size, 1)))
        d_loss_fake = discriminator.train_on_batch(fake_slices, np.zeros((batch_size, 1)))
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # Train the generator
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        g_loss = gan.train_on_batch(noise, np.ones((batch_size, 1)))

        print(f"D Loss: {d_loss[0]}, G Loss: {g_loss[0]}")

In [None]:
folder_path = 'tanmay/brain_slices/'

epochs = 150

for epoch in range(epochs):
    for key in keys:
        brain_array = load_images(bucket_name=bucket, folder_path=folder_path, folder_suffix=key)
        print("Training Gan now:")
        train_gan(generator=chemocraft_generator, latent_dim=dim, discriminator=chemocraft_discriminator, gan=chemocraft_gan, training_array=brain_array, batch_size=5)
        print(f"Epoch {epoch + 1}/{epochs}")