![image.png](attachment:59538bc3-73cd-4f39-93e1-f5d506f11cde.png)
![image.png](attachment:8931759e-6b2b-4cf4-808a-39c61aafeba2.png)

This notebook uses Generative Adversarial Networks (GANs) to generate synthetic pneumonia-positive X-ray images. The goal is to generate hyperrealistic images that can be used to augment the existing dataset for better model training. The current demo has yet to achieve this goal, but the plan is to explore the application of Generative Adversarial Networks (GANs) to generate new and accurate medical images in the future.

**The primary aim is to train the GAN to generate highly realistic synthetic X-ray images that closely resemble the real ones in the dataset. These synthetic images can then be utilized for future data augmentation.**

As of the current demonstration, the generated images might not yet reach clinically accurate levels. However, **a more complex architecture has the potential to generate highly realistic and accurate pneumonia-positive X-ray images.** The synthetic images produced can effectively augment existing datasets of pneumonia-positive X-rays, leading to improved accuracy and performance of pneumonia detection models.

The main takeaway from this project is the potential of GANs in medical image generation for data augmentation, enabling the development of more effective diagnostic tools for pneumonia and other medical conditions.


# Configuration Details:

* GPU Accelerator: T4 (2 units)
* Dataset: **[Chest X-ray Pneumonia](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia)** 
* Dataset Size: 3875
* Output Image Size: 512x512
* Optimizer: Adam
* Learning Rate: 0.0002
* Loss Function: Binary Crossentropy
* Generator: 1 Dense, 6 Convolutional Transpose, 1 Convolutional
* Discriminator: 4 Convolutional, 4 MaxPooling, 1 Flatten, 2 Dense

# Import

In [None]:
import os
import time
import cv2 as cv
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.utils import np_utils
from keras.models import Sequential, Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.layers import Input, Dense, BatchNormalization, Conv2D, Conv2DTranspose, ReLU, LeakyReLU, Flatten, MaxPooling2D, Dropout, Reshape

# GPU Initialization

In [None]:
# Create a MirroredStrategy and report how many replicas (devices) it will
# keep in sync; the replica count is reused later to scale the batch size.
strategy = tf.distribute.MirroredStrategy()
print(f'DEVICES AVAILABLE: {strategy.num_replicas_in_sync}')

# Initialization

In [None]:
# Training hyper-parameters.
# 128x128 images were tried first; the notebook now works at 512x512.
BUFFER_SIZE = 64000
BATCH_SIZE = strategy.num_replicas_in_sync * 32  # 32 images for each replica
batch_size = BATCH_SIZE  # lowercase alias of the same value
EPOCHS = 50
latent_dim = 128  # length of the generator's input noise vector
image_size = (512, 512)  # target size handed to the image loader
input_size = [*image_size, 3]  # discriminator input shape: image size + 3 channels

# Data Preprocessing

In [None]:
# ImageDataGenerator is the simplest way to stream the images from disk.
# Pixel values are only scaled by 1/255: augmentation (shear, zoom and
# horizontal flip) was tried at first and then dropped as a bad idea.
datagen = ImageDataGenerator(rescale=1./255)

image_directory = '/kaggle/input/chest-xray-pneumonia/chest_xray'

# Stream shuffled batches from the 'train' split, restricted to the
# PNEUMONIA class folder and resized to `image_size`.
dataset = datagen.flow_from_directory(
    os.path.join(image_directory, 'train'),
    target_size=image_size,
    classes=['PNEUMONIA'],
    class_mode='binary',
    batch_size=BATCH_SIZE,
    shuffle=True,
)


In [None]:
# Number of batches in the dataset iterator (reused later as steps_per_epoch).
len(dataset)

# Generator Model

In [None]:
def gen_model():
    """Build the generator: latent vector -> 512x512x3 image.

    A dense projection is reshaped into an 8x8x256 feature map. Six stride-2
    transposed convolutions then double the resolution each time (8 -> 512),
    and a final sigmoid convolution emits three channels with values in (0, 1).

    If you hit an out-of-memory (OOM) error, lower the filter counts used by
    the transposed convolutions.
    """
    # Filters of each upsampling stage, expressed as multiples of 128.
    filter_multipliers = (2, 3, 3, 4, 5, 6)

    stack = [
        Input(shape=(latent_dim,)),
        Dense(8 * 8 * 256),
        Reshape((8, 8, 256)),
    ]
    for multiplier in filter_multipliers:
        stack.append(
            Conv2DTranspose(128 * multiplier, kernel_size=4, strides=2, padding='same')
        )
        stack.append(LeakyReLU(alpha=0.1))
    stack.append(Conv2D(3, kernel_size=4, padding='same', activation='sigmoid'))

    return Sequential(stack, name="generator")

# Discriminator Model

In [None]:
def disc_model():
    """Build the discriminator: image of shape `input_size` -> one sigmoid score.

    Four stages of (stride-2 convolution, batch norm, LeakyReLU, stride-2 max
    pooling) shrink the feature map, which is then flattened and passed through
    a dense head with dropout. The last layer applies a sigmoid, so the model
    outputs probabilities rather than logits.

    If you hit an out-of-memory (OOM) error, lower the filter counts (256 or
    less as the base) and keep reducing them until the error goes away.
    """
    stack = [Input(shape=input_size)]

    # Convolution filters per stage, expressed as multiples of 256.
    for multiplier in (1, 2, 3, 4):
        stack += [
            Conv2D(256 * multiplier, kernel_size=4, strides=2, padding='same'),
            BatchNormalization(),
            LeakyReLU(alpha=0.1),
            MaxPooling2D(strides=2),
        ]

    # Classification head.
    stack += [
        Flatten(),
        Dense(256 * 4),
        LeakyReLU(alpha=0.1),
        Dropout(0.2),
        Dense(1, activation='sigmoid'),
    ]

    return Sequential(stack, name="discriminator")



In [None]:
# To use multiple GPUs, the models (and later the optimizers and checkpoints)
# must be created inside the strategy scope, so both networks are built here.
with strategy.scope():
    generator, discriminator = gen_model(), disc_model()


In [None]:
# Print the layer-by-layer summary of the generator, then the discriminator.
for network in (generator, discriminator):
    network.summary()

In [None]:
# Helper function that hands out the images batch by batch.
def image_loader(generator):
    """Yield ``(images, labels)`` pairs from *generator*, one batch at a time.

    *generator* is any iterable whose items unpack into exactly two values;
    each item is re-emitted as a two-element tuple.
    """
    for batch in generator:
        images, labels = batch
        yield (images, labels)

# GAN with Custom Training Step

In [None]:
# GAN model with a custom training step (custom gradient computation).
class Gan(Model):
    """Keras model that trains a generator and a discriminator adversarially.

    Label convention used by ``train_step``: generated images are labelled 1
    and real images 0, so the generator is optimised towards label 0.
    """

    def __init__(self, discriminator, generator, latent_dim):
        """Store the two sub-models and the size of the latent noise vector.

        Args:
            discriminator: Model called on a batch of images; its output is
                compared against labels of shape ``(batch, 1)``.
            generator: Model called on latent vectors to produce images.
            latent_dim: Length of the noise vector fed to the generator.
        """
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, disc_opt, gen_opt, loss_function):
        """Attach the optimizers, the loss and the loss-tracking metrics.

        Args:
            disc_opt: Optimizer applied to the discriminator's weights.
            gen_opt: Optimizer applied to the generator's weights.
            loss_function: Callable ``loss(labels, predictions)``. It may
                return per-example values; ``train_step`` averages them.
                It must match the discriminator's output (probabilities
                vs. logits).
        """
        super().compile()
        self.disc_opt = disc_opt
        self.gen_opt = gen_opt
        self.loss_function = loss_function
        self.disc_loss_metric = tf.keras.metrics.Mean(name="disc_loss")
        self.gen_loss_metric = tf.keras.metrics.Mean(name="gen_loss")

    @property
    def metrics(self):
        """Metrics tracked by this model (discriminator loss, generator loss)."""
        return [self.disc_loss_metric, self.gen_loss_metric]

    # Custom training step.
    def train_step(self, data):
        """Run one discriminator update followed by one generator update.

        Args:
            data: A batch of real images, or a tuple/list whose first item
                is that batch. Anything after the images (e.g. the labels
                produced by the data loader) is ignored.

        Returns:
            Dict with the running means of ``disc_loss`` and ``gen_loss``.
        """
        # The dataset labels are not needed for GAN training.
        real_images = data[0] if isinstance(data, (tuple, list)) else data
        batch_size = tf.shape(real_images)[0]

        # ---- Discriminator update -------------------------------------
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode the noise into fake images. `training=True` is passed
        # explicitly on every sub-model call in this step so that
        # BatchNormalization/Dropout layers run in training mode instead of
        # depending on what Keras infers when the flag is omitted.
        generated_images = self.generator(random_latent_vectors, training=True)

        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Fake images are labelled 1, real images 0 (same order as above).
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add a little uniform noise in [0, 0.05) to the labels.
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images, training=True)
            # Reduce to a scalar explicitly: the loss may be per-example
            # (reduction=NONE), and differentiating a vector would silently
            # sum it over the batch.
            disc_loss = tf.reduce_mean(self.loss_function(labels, predictions))

        grads = tape.gradient(disc_loss, self.discriminator.trainable_weights)
        self.disc_opt.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # ---- Generator update -----------------------------------------
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Label 0 means "real" here, so these labels ask the generator to
        # produce images the discriminator scores as real.
        misleading_labels = tf.zeros((batch_size, 1))

        with tf.GradientTape() as tape:
            fake_images = self.generator(random_latent_vectors, training=True)
            predictions = self.discriminator(fake_images, training=True)
            gen_loss = tf.reduce_mean(
                self.loss_function(misleading_labels, predictions)
            )

        # Only the generator's weights are updated in this half of the step.
        grads = tape.gradient(gen_loss, self.generator.trainable_weights)
        self.gen_opt.apply_gradients(zip(grads, self.generator.trainable_weights))

        self.disc_loss_metric.update_state(disc_loss)
        self.gen_loss_metric.update_state(gen_loss)
        return {
            "disc_loss": self.disc_loss_metric.result(),
            "gen_loss": self.gen_loss_metric.result(),
        }
                                                 

In [None]:
# Helper function that plots and saves images from the current generator model.
# It was initially designed for use inside a callback, but works outside of one too.
# `current_epoch` only labels the plot and the file; it defaults to 1 for standalone calls.
def gen_images(current_epoch=1):
    """Generate two sample images with the global `generator` and plot them.

    The figure is shown and also saved as ``After epochs<NNNN>.png`` in the
    current working directory.

    Args:
        current_epoch: Epoch number used in the subplot titles and in the
            saved file name.
    """
    num_of_sample = 2
    noise = tf.random.normal([num_of_sample, latent_dim])
    generated_images = generator(noise, training=False)

    figure = plt.figure(figsize=(20, 20))
    for i in range(num_of_sample):
        plt.subplot(2, 2, i + 1)
        # Only channel 0 of each image is displayed, rendered in grayscale.
        plt.imshow(generated_images[i, :, :, 0], cmap='gray')
        plt.title(f"After epoch {current_epoch}")
        plt.axis('off')
    plt.savefig('After epochs{:04d}.png'.format(current_epoch))
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(figure)

# Callbacks 

In [None]:
# Callback that shows the GAN's progress and saves sample images after each epoch.
class Gan_Callback(tf.keras.callbacks.Callback):
    """Plot generator samples after every epoch and checkpoint both models.

    After each epoch a grid of generated images is shown and saved as
    ``After epochs<NNNN>.png``. The generator and discriminator are written
    to ``/kaggle/working`` when the 0-based epoch index is a multiple of 10
    and once more when training ends.
    """

    def __init__(self, num_images=2, latent_dim=128):
        """
        Args:
            num_images: Number of images to generate and plot per epoch.
            latent_dim: Length of the noise vector fed to the generator.
        """
        # Let the Keras base class initialise its own attributes.
        super().__init__()
        self.num_images = num_images
        self.latent_dim = latent_dim

    def _save_models(self):
        """Write the current generator and discriminator to disk."""
        self.model.generator.save('/kaggle/working/gen.h5')
        self.model.discriminator.save('/kaggle/working/disc.h5')

    def on_epoch_end(self, epoch, logs=None):
        """Plot/save sample images and checkpoint on every 10th epoch index."""
        # Use the callback's own latent size rather than a notebook global.
        latent_vectors = tf.random.normal(shape=(self.num_images, self.latent_dim))
        generated_images = self.model.generator(latent_vectors, training=False)
        # Scale to 0-255 and convert to a NumPy array once for plotting.
        generated_images = (generated_images * 255).numpy()

        # Two columns; at least two rows so the layout for up to four images
        # is unchanged, with more rows added when more images are requested.
        columns = 2
        rows = max(2, -(-self.num_images // columns))

        figure = plt.figure(figsize=(10, 10))
        for i in range(generated_images.shape[0]):
            plt.subplot(rows, columns, i + 1)
            # Only channel 0 of each image is displayed, rendered in grayscale.
            plt.imshow(generated_images[i, :, :, 0], cmap='gray')
            plt.title(f"After epoch {epoch+1}")
            plt.axis('off')
        plt.savefig('After epochs{:04d}.png'.format(epoch+1))
        plt.show()
        # Release the figure so one is not left open per epoch.
        plt.close(figure)

        # `epoch` is 0-based: this fires after the 1st, 11th, 21st, ... epoch.
        if epoch % 10 == 0:
            self._save_models()

    def on_train_end(self, logs=None):
        """Save the final weights, which the periodic checkpoint can miss."""
        self._save_models()

In [None]:
with strategy.scope():
    # To use multiple GPUs, the model and its optimizers must be created
    # inside the strategy scope.
    gan = Gan(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
    gan.compile(
        disc_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
        gen_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
        # The discriminator's last layer already applies a sigmoid, so its
        # outputs are probabilities, not logits: from_logits must be False,
        # otherwise the loss would apply a second sigmoid to them.
        # reduction=NONE is kept because parallel GPU training does not work
        # with the default reduction in a custom training step.
        loss_function=tf.keras.losses.BinaryCrossentropy(
            from_logits=False,
            reduction=tf.keras.losses.Reduction.NONE,
        ),
    )


# Training

In [None]:
# The actual training starts here: one epoch consumes every batch of the
# dataset once, and the callback plots samples after each epoch.
steps_per_epoch = len(dataset)
history = gan.fit(
    image_loader(dataset),
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    callbacks=[Gan_Callback(latent_dim=latent_dim, num_images=4)],
)
