# MD-cGAN

## Kaggle environment

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the whole /kaggle/input tree and print the full path of every file found
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Data reading

In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.utils import shuffle

# Class names; the position of a name in this list is its integer class id
# (glioma -> 0, meningioma -> 1, notumor -> 2, pituitary -> 3)
labels = ['glioma', 'meningioma', 'notumor', 'pituitary']

# Load the data (train)
X_train = []
y_train = []

# Image size: every image is resized to image_size x image_size
image_size = 112

for label in labels:
    folder_path = os.path.join('../input/brain-tumor-mri-dataset/', 'Training', label)
    for filename in tqdm(os.listdir(folder_path)):
        image = cv2.imread(os.path.join(folder_path, filename))
        # cv2.imread returns None (it does not raise) for a file it cannot decode;
        # skip such files instead of crashing inside cv2.resize
        if image is None:
            continue
        image = cv2.resize(image, (image_size, image_size))
        # Convert image to grayscale if needed
        # image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        X_train.append(image)
        y_train.append(label)

# Convert lists to arrays
X_train = np.array(X_train)
y_train = np.array(y_train)

# Convert the class names to integer class ids
y_train_new = [labels.index(label) for label in y_train]

# Shuffle images, class names and class ids with one shared permutation so that
# all three stay aligned index-by-index
X_train, y_train, y_train_new = shuffle(X_train, y_train, y_train_new, random_state=1970)

# Create the dataset: (images, integer class ids)
dataset_bio = (X_train, np.asarray(y_train_new))

In [None]:
import matplotlib.pyplot as plt

# Define the grid size for visualization
grid_width = 4
grid_height = 4

# Create subplots
fig, axes = plt.subplots(grid_width, grid_height)
fig.set_size_inches(8, 8)

img_idx = 0
for i in range(grid_width):
    for j in range(grid_height):
        # Turn off axis and set title
        axes[i, j].axis('off')
        # Take the title from the integer class id, which is shuffled together
        # with X_train, so the name always matches the image shown
        axes[i, j].set_title(labels[y_train_new[img_idx]])

        # Display the image
        axes[i, j].imshow(X_train[img_idx])

        img_idx += 1

# Adjust the spacing between subplots
plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0.2, hspace=0.4)

# Save the image file
plt.savefig("img_real.png", bbox_inches="tight")


## Discriminator model

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Dense, Reshape, Concatenate, Conv2D, LeakyReLU, Flatten, Dropout
from tensorflow.keras.optimizers import Adam

def define_discriminator(in_shape=(112, 112, 3), n_classes=4):
    """Build and compile one label-conditioned discriminator.

    The class label is embedded, projected to one value per pixel and attached
    to the image as an extra channel before the convolutional stack.

    Args:
        in_shape: (height, width, channels) of the input images.
        n_classes: number of distinct class labels accepted by the embedding.

    Returns:
        A compiled Keras model taking [image, label] and producing a single
        sigmoid output.
    """
    height, width = in_shape[0], in_shape[1]

    # Label branch: class id -> 50-d embedding -> height*width values -> 1 channel
    in_label = Input(shape=(1,))
    label_map = Embedding(n_classes, 50)(in_label)
    label_map = Dense(height * width)(label_map)
    label_map = Reshape((height, width, 1))(label_map)

    # Image branch, with the label map appended by the Concatenate layer
    in_image = Input(shape=in_shape)
    features = Concatenate()([in_image, label_map])

    # Four stride-2 3x3 convolutions, each followed by LeakyReLU
    features = Conv2D(128, (3, 3), strides=(2, 2), padding='same', input_shape=in_shape)(features)
    features = LeakyReLU(alpha=0.2)(features)
    for _ in range(3):
        features = Conv2D(128, (3, 3), strides=(2, 2), padding='same')(features)
        features = LeakyReLU(alpha=0.2)(features)

    # Classifier head: flatten, dropout, single sigmoid unit
    features = Flatten()(features)
    features = Dropout(0.4)(features)
    out_layer = Dense(1, activation='sigmoid')(features)

    model = Model([in_image, in_label], out_layer)
    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
        metrics=['accuracy'],
    )
    return model


In [None]:
from tensorflow.keras.utils import plot_model

# Build one discriminator with the default arguments so it can be inspected
model = define_discriminator()

# Print the layer-by-layer summary
model.summary()

# Write the architecture diagram (with shapes and layer names) to discriminator_plot.png
plot_model(model, to_file='discriminator_plot.png', show_shapes=True, show_layer_names=True)


## Generator model

In [None]:
from tensorflow.keras.layers import Conv2DTranspose

# Define the generator model
# Provide the latent space dimension and number of classes
def define_generator(latent_dim, n_classes=4):
    """Build the label-conditioned generator (returned uncompiled).

    A latent vector is projected to a 7x7x128 feature map, the embedded label
    is attached as one extra 7x7 channel, and four stride-2 transposed
    convolutions follow before the final 3-channel tanh convolution.

    Args:
        latent_dim: length of the latent input vector.
        n_classes: number of distinct class labels accepted by the embedding.

    Returns:
        A Keras model taking [latent_vector, label].
    """
    base = 7  # side length of the starting feature map

    # Label branch: class id -> 50-d embedding -> 7*7 values -> one 7x7 channel
    in_label = Input(shape=(1,))
    label_map = Embedding(n_classes, 50)(in_label)
    label_map = Dense(base * base)(label_map)
    label_map = Reshape((base, base, 1))(label_map)

    # Latent branch: latent vector -> 7x7x128 feature map
    in_lat = Input(shape=(latent_dim,))
    gen = Dense(128 * base * base)(in_lat)
    gen = LeakyReLU(alpha=0.2)(gen)
    gen = Reshape((base, base, 128))(gen)

    # Attach the label channel to the feature map
    gen = Concatenate()([gen, label_map])

    # Four upsampling stages: 7x7 -> 14x14 -> 28x28 -> 56x56 -> 112x112
    for _ in range(4):
        gen = Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same')(gen)
        gen = LeakyReLU(alpha=0.2)(gen)

    # Output image: 3 channels, tanh activation
    out_layer = Conv2D(3, (7, 7), activation='tanh', padding='same')(gen)

    # The generator is only trained through the combined model, so it is not compiled here
    return Model([in_lat, in_label], out_layer)


In [None]:
# Build a generator with a 100-dimensional latent space and 4 classes so it can be inspected
model = define_generator(latent_dim=100, n_classes=4)
# Print the layer-by-layer summary
model.summary()
# Write the architecture diagram (with shapes and layer names) to generator_plot.png
plot_model(model, to_file='generator_plot.png', show_shapes=True, show_layer_names=True)


## Combined generator-discriminator model

In [None]:
# Calculate Median
import tensorflow_probability as tfp
# Calculate Mean, Maximum and Minimum
from tensorflow.math import reduce_mean, reduce_max, reduce_min

# Calculate Mean, Maximum and Minimum
from tensorflow.keras.layers import Average
from tensorflow.keras.layers import Minimum
from tensorflow.keras.layers import Maximum


# Define the combined generator-discriminator model for updating the generator
def define_gan(g_model, d_model, num_d, parameter):
    """Build and compile the composite model used to train the generator.

    The generator output (plus its label input) is fed to the first ``num_d``
    discriminators. With more than one discriminator their outputs are merged
    into a single per-sample score according to ``parameter``.

    Args:
        g_model: generator model whose inputs are [noise, label].
        d_model: list of discriminator models; they are marked non-trainable.
        num_d: number of discriminators from ``d_model`` to use.
        parameter: how to merge the discriminator outputs when ``num_d`` is
            not 1: "Mean", "Maximum", "Minimum" or "Median". Ignored when
            ``num_d`` is 1.

    Returns:
        The compiled composite Keras model taking [noise, label].

    Raises:
        ValueError: if ``num_d`` is not 1 and ``parameter`` is not one of the
            supported names.
    """
    # Fail early with a clear error instead of hitting an undefined output later
    if num_d != 1 and parameter not in ("Mean", "Maximum", "Minimum", "Median"):
        raise ValueError("Parámetro estadístico no válido: %r" % (parameter,))

    # Freeze the discriminator models so they don't train through this model
    for n_d in range(num_d):
        d_model[n_d].trainable = False

    # Get the noise input and label from the generator model
    gen_noise, gen_label = g_model.input
    # Get the output image from the generator model
    gen_output = g_model.output

    # Connect the generator's image and label as inputs to the discriminator(s)
    if num_d == 1:
        gan_output = d_model[0]([gen_output, gen_label])
    else:
        y = [d_model[n_d]([gen_output, gen_label]) for n_d in range(num_d)]

        # Merge element-wise across discriminators so that every branch keeps
        # one score per sample (the batch axis is never reduced)
        if parameter == "Mean":
            gan_output = Average()(y)
        elif parameter == "Maximum":
            gan_output = Maximum()(y)
        elif parameter == "Minimum":
            gan_output = Minimum()(y)
        else:
            # "Median": axis=0 is the discriminator axis of the stacked outputs.
            # NOTE(review): y is a Python list of Keras tensors; confirm that the
            # installed TFP version accepts it here.
            gan_output = tfp.stats.percentile(y, q=50, axis=0, interpolation='midpoint')

    # Define the combined GAN model
    model = Model([gen_noise, gen_label], gan_output)
    # Compile the model
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    return model


## Load samples

In [None]:
# Load the brain tumor MRI images
def load_real_samples():
    """Return the training set as [images, labels] with images scaled to [-1, 1].

    Reads the module-level ``dataset_bio`` tuple (images, integer labels).

    Returns:
        A two-element list ``[X, trainy]`` where ``X`` is a float32 array with
        a channel axis and values in [-1, 1], and ``trainy`` is the label
        array exactly as stored in ``dataset_bio``.
    """
    # Load the training data
    trainX, trainy = dataset_bio

    # Only a grayscale stack (N, H, W) lacks a channel axis. Colour images
    # already have one, and adding another would produce (N, H, W, C, 1).
    X = expand_dims(trainX, axis=-1) if trainX.ndim == 3 else trainX

    # Convert from integer to float
    X = X.astype('float32')

    # Scale the pixel values from [0, 255] to [-1, 1]
    X = (X - 127.5) / 127.5

    return [X, trainy]


## Define modules

In [None]:
from numpy import expand_dims, ones, zeros, asarray
from numpy.random import randint, randn
from matplotlib import pyplot


# Select a random group of real images
def generate_real_samples(dataset, n_samples):
    """Draw ``n_samples`` random real examples (with replacement).

    Args:
        dataset: pair ``(images, labels)`` of index-aligned arrays.
        n_samples: number of examples to draw.

    Returns:
        ``([X, labels], y)`` where ``X`` and ``labels`` are the drawn images
        and class labels, and ``y`` is an ``(n_samples, 1)`` array of ones
        (the "real" target).
    """
    all_images, all_labels = dataset

    # Random positions in [0, number of images)
    picks = randint(0, all_images.shape[0], n_samples)
    batch = [all_images[picks], all_labels[picks]]

    # Target 1 marks every drawn sample as real
    return batch, ones((n_samples, 1))


# Generate points in the latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes=4):
    """Sample generator inputs: latent vectors plus random class labels.

    Args:
        latent_dim: length of each latent vector.
        n_samples: number of latent vectors to draw.
        n_classes: labels are drawn uniformly from ``0 .. n_classes - 1``.

    Returns:
        ``[z_input, labels]`` where ``z_input`` has shape
        ``(n_samples, latent_dim)`` and ``labels`` has shape ``(n_samples,)``.
    """
    # Standard-normal draws, arranged as one row per sample
    z_input = randn(latent_dim * n_samples).reshape(n_samples, latent_dim)

    # One random class id per sample
    class_ids = randint(0, n_classes, n_samples)
    return [z_input, class_ids]


# Use the generator to generate fake samples with class labels (0 for fake)
def generate_fake_samples(g_model, latent_dim, n_samples):
    """Produce ``n_samples`` generated images with their conditioning labels.

    Args:
        g_model: generator model; called through ``predict``.
        latent_dim: length of the latent vectors fed to the generator.
        n_samples: number of images to generate.

    Returns:
        ``([images, labels], y)`` where ``y`` is an ``(n_samples, 1)`` array
        of zeros (the "fake" target).
    """
    # Random latent vectors and random class labels
    latent, class_ids = generate_latent_points(latent_dim, n_samples)

    # Run the generator on them
    fakes = g_model.predict([latent, class_ids])

    # Target 0 marks every generated sample as fake
    return [fakes, class_ids], zeros((n_samples, 1))


# Save a plot of generated images (4x4 grid)
def save_plot(examples, epoch, num_d, parameter, n=4):
    """Save the first ``n * n`` examples as an ``n`` x ``n`` image grid.

    Only channel 0 of each example is drawn, using the reversed gray colormap.
    NOTE(review): the images are 3-channel; confirm that showing a single
    inverted channel is intended.

    Args:
        examples: array indexed as ``[sample, row, col, channel]``.
        epoch: zero-based epoch index; the file name uses ``epoch + 1``.
        num_d: number of discriminators (used in the file name).
        parameter: aggregation name (used in the file name).
        n: grid side length.
    """
    for cell in range(1, n * n + 1):
        # Subplot positions are 1-based, example indices 0-based
        pyplot.subplot(n, n, cell)
        pyplot.axis('off')
        pyplot.imshow(examples[cell - 1, :, :, 0], cmap='gray_r')

    pyplot.savefig('plot_%s_%dD_e%03d.png' % (parameter, num_d, epoch + 1))
    pyplot.close()


# Evaluate the discriminator model, generate images, and save the generator model
def summarize_performance(epoch, g_model, d_model, num_d, dataset, latent_dim, parameter, n_samples=100):
    """Report discriminator accuracy, save a sample grid and save the generator.

    Args:
        epoch: zero-based epoch index; output file names use ``epoch + 1``.
        g_model: generator model.
        d_model: list of discriminator models.
        num_d: number of discriminators from ``d_model`` to evaluate.
        dataset: ``[images, labels]`` of real samples.
        latent_dim: length of the generator's latent input vector.
        parameter: aggregation name (used in output file names).
        n_samples: number of real and of fake samples used for evaluation.
    """
    [X_real, labels_real], y_real = generate_real_samples(dataset, n_samples)
    [X_fake, labels_fake], y_fake = generate_fake_samples(g_model, latent_dim, n_samples)

    for n_d in range(num_d):
        # The last entry of evaluate()'s result is taken as the accuracy
        # (the discriminators are compiled with an accuracy metric)
        real_metrics = d_model[n_d].evaluate([X_real, labels_real], y_real, verbose=0)
        fake_metrics = d_model[n_d].evaluate([X_fake, labels_fake], y_fake, verbose=0)
        print('>Accuracy real_%d: %.0f%%, fake_%d: %.0f%%' % (n_d + 1, real_metrics[-1] * 100, n_d + 1,
                                                               fake_metrics[-1] * 100))

    # 100 latent points sized for this generator; the random labels are
    # replaced by a fixed cycle 0, 1, 2, 3, 0, 1, ...
    latent_points = generate_latent_points(latent_dim, 100)[0]
    labels = asarray([x for _ in range(25) for x in range(4)])
    X = g_model.predict([latent_points, labels])
    # Map the generator output from [-1, 1] to [0, 1] for plotting
    X = (X + 1) / 2.0
    save_plot(X, epoch, num_d, parameter, n=4)

    filename = 'mod_%s_%dD_%03d.h5' % (parameter, num_d, epoch + 1)
    g_model.save(filename)


# Train the generator and discriminator models
def train(g_model, d_model, num_d, gan_model, dataset, latent_dim, n_epochs, n_batch, parameter):
    """Alternate discriminator and generator updates for ``n_epochs`` epochs.

    Each step trains every discriminator on half a batch of real and half a
    batch of generated samples, then trains the generator through
    ``gan_model`` on a full batch with target 1. Every 10th epoch
    ``summarize_performance`` is called.

    Args:
        g_model: generator model.
        d_model: list of discriminator models.
        num_d: number of discriminators from ``d_model`` to train.
        gan_model: composite model used to update the generator.
        dataset: ``[images, labels]`` of real samples.
        latent_dim: length of the generator's latent input vector.
        n_epochs: number of epochs.
        n_batch: batch size for the generator update.
        parameter: aggregation name, forwarded to ``summarize_performance``.
    """
    steps_per_epoch = int(dataset[0].shape[0] / n_batch)
    half_batch = int(n_batch / 2)

    for epoch in range(n_epochs):
        for _step in range(steps_per_epoch):
            real_batch, y_real = generate_real_samples(dataset, half_batch)
            fake_batch, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)

            # Every discriminator sees the same real and fake half-batches.
            # The returned loss/metric values are not used further.
            for n_d in range(num_d):
                discriminator = d_model[n_d]
                d_loss_real, _ = discriminator.train_on_batch(real_batch, y_real)
                d_loss_fake, _ = discriminator.train_on_batch(fake_batch, y_fake)

            # Generator update: fresh latent points, all labelled as "real"
            gan_inputs = generate_latent_points(latent_dim, n_batch)
            g_loss = gan_model.train_on_batch(gan_inputs, ones((n_batch, 1)))

        # Checkpoint and report every 10th epoch
        if (epoch + 1) % 10 == 0:
            summarize_performance(epoch, g_model, d_model, num_d, dataset, latent_dim, parameter, n_samples=100)


## Parameters

In [None]:
import time

# --- Experiment configuration ---
image_shape = (112, 112, 3)   # image shape (height, width, channels)
n_epochs = 200                # number of epochs
n_batch = 128                 # batch size
latent_dim = 100              # size of the latent space (100, 10, 50, 500)
num_d = 7                     # number of discriminators
parameter = "Mean"            # aggregation (Mean, Maximum, Minimum, Median)

# --- Model construction ---
# One independently initialised discriminator per slot
d_model = [define_discriminator() for _ in range(num_d)]
g_model = define_generator(latent_dim)
gan_model = define_gan(g_model, d_model, num_d, parameter)

# --- Data ---
dataset = load_real_samples()

# --- Training, timed with wall-clock time ---
inicio = time.time()
train(g_model, d_model, num_d, gan_model, dataset, latent_dim, n_epochs, n_batch, parameter)
fin = time.time()
print(f'Training time: {fin - inicio} seconds')


## Clean output

In [None]:
import os

# Remove files
def remove_files_with_extension(directory, extension):
    """Delete every entry directly inside ``directory`` whose name ends with ``extension``.

    Sub-directories are not descended into.

    Args:
        directory: path of the directory to clean.
        extension: name suffix to match, e.g. ".db".
    """
    matching = [entry for entry in os.listdir(directory) if entry.endswith(extension)]
    for entry in matching:
        os.remove(os.path.join(directory, entry))

# Directory to clean and the file-name suffix of the files to delete
directory = "/kaggle/working"
extension = ".db"

# Delete every file in the directory whose name ends with that suffix
remove_files_with_extension(directory, extension)

## Generate images

In [None]:
from tensorflow.keras.models import Model
from keras.models import load_model

# Load the generator checkpoint written at epoch 150 of the Mean / 7-discriminator run
model_saved = load_model('mod_Mean_7D_150.h5')

# 16 latent vectors of length 100 (the random labels returned here are replaced below)
latent_points, labels = generate_latent_points(100, 16)

# Fixed labels: the cycle 0, 1, 2, 3 repeated four times
labels = np.tile(np.arange(4), 4)

# Generate the images and map them from [-1, 1] to [0, 1] for display
X = (model_saved.predict([latent_points, labels]) + 1) / 2.0

In [None]:
# Show the 16 generated images in a 4x4 grid
grid_width = 4
grid_height = 4
fig, axes = plt.subplots(grid_width, grid_height)
fig.set_size_inches(8, 8)

labels = ['glioma', 'meningioma', 'notumor', 'pituitary']
# Images were generated with labels cycling 0..3, so the column index of a
# cell (position modulo the row length) is also its class id
for img_idx, axis in enumerate(axes.ravel()):
    axis.axis('off')
    axis.set_title(labels[img_idx % grid_height])
    axis.imshow(X[img_idx])

plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0.2, hspace=0.4)

# Save the image file
plt.savefig("example.png", bbox_inches="tight")

# Fréchet Inception Distance (FID)

## Calculate FID

In [None]:
import numpy as np
from numpy import cov
from numpy import trace, sum
from scipy.linalg import sqrtm
from keras.applications.inception_v3 import InceptionV3, preprocess_input
from keras.datasets.mnist import load_data
from skimage.transform import resize

# Scale the images
def scale_images(images, new_shape):
    """Resize every image in ``images`` to ``new_shape``.

    Args:
        images: iterable of image arrays.
        new_shape: target shape passed to ``skimage.transform.resize``.

    Returns:
        A NumPy array stacking the resized images.
    """
    resized = []
    for image in images:
        # order=0 selects nearest-neighbour interpolation
        resized.append(resize(image, new_shape, order=0))
    return np.asarray(resized)

# Calculate FID
def calculate_fid(model, images1, images2):
    """Compute the Fréchet distance between the features of two image sets.

    Args:
        model: object whose ``predict`` maps a batch of images to a 2-D
            array of features (one row per image).
        images1: first batch of images.
        images2: second batch of images.

    Returns:
        ``||mu1 - mu2||^2 + Tr(S1 + S2 - 2 * sqrt(S1 @ S2))`` where ``mu`` and
        ``S`` are the feature means and covariances of each set.
    """
    feats_a = model.predict(images1)
    feats_b = model.predict(images2)

    # Per-feature mean and feature covariance (rows are samples)
    mean_a = feats_a.mean(axis=0)
    mean_b = feats_b.mean(axis=0)
    cov_a = cov(feats_a, rowvar=False)
    cov_b = cov(feats_b, rowvar=False)

    # Squared distance between the means
    mean_term = sum((mean_a - mean_b) ** 2.0)

    # Matrix square root of the covariance product; keep only the real part
    # when sqrtm returns a complex result
    cov_root = sqrtm(cov_a.dot(cov_b))
    cov_root = cov_root.real if np.iscomplexobj(cov_root) else cov_root

    return mean_term + trace(cov_a + cov_b - 2.0 * cov_root)

# Prepare the InceptionV3 model used as the FID feature extractor
# (no classification head, global average pooling -> one feature vector per image)
model = InceptionV3(include_top=False, pooling='avg', input_shape=(299, 299, 3))

# Parameters
n_packages = 100
n_classes = 4
num_images = n_packages * n_classes

# Generate latent points (the random labels returned here are replaced below)
latent_points, labels = generate_latent_points(100, num_images, n_classes)

# Labels: the cycle 0 .. n_classes-1 repeated n_packages times
labels = np.asarray([x for _ in range(n_packages) for x in range(n_classes)])

# Load the real dataset
dataset = load_real_samples()
n_samples = 400
[X_real, labels_real], y_real = generate_real_samples(dataset, n_samples)
X_real = X_real.astype('float32')

# Scale the real images
X_real = scale_images(X_real, (299, 299, 3))

# load_real_samples() returns pixels in [-1, 1] but preprocess_input expects
# [0, 255]; map back to [0, 255] first so the network does not receive
# near-constant inputs
X_real = (X_real + 1.0) * 127.5

# Preprocess the real images
X_real = preprocess_input(X_real)

# FID for the selected models
fid_epoch = []
epoch = []

# One checkpoint every 10 epochs: epochs 10, 20, ..., 200
for i in range(20):
    model_saved = 'mod_Mean_7D_%03d.h5' % ((i + 1) * 10)
    model_saved = load_model(model_saved)

    # Generate fake images
    X_fake = model_saved.predict([latent_points, labels])
    X_fake = X_fake.astype('float32')

    # Scale the fake images
    X_fake = scale_images(X_fake, (299, 299, 3))

    # The generator ends in tanh, so its output is in [-1, 1]; map to
    # [0, 255] before preprocess_input, exactly as for the real images
    X_fake = (X_fake + 1.0) * 127.5

    # Preprocess the fake images
    X_fake = preprocess_input(X_fake)

    # Calculate FID
    fid = calculate_fid(model, X_real, X_fake)
    fid_epoch.append(fid)
    epoch.append((i + 1) * 10)
    print('FID: %.3f' % fid)

## Plot FID

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# x: epochs of the evaluated checkpoints; y: the FID computed for each of them
x = epoch
y = fid_epoch

# Create a new figure and axis
fig, ax = plt.subplots()

# Plot the data points with diamond markers
ax.plot(x, y, marker='D')

# Set labels and title
ax.set_xlabel('epoch')
ax.set_ylabel('fid')
ax.set_title('FID per Epoch')

# Show the plot
plt.show()

# Save the figure as an image file; saving goes through the figure object
# itself, so it does not depend on which figure is current after show()
#fig.savefig("FID.png", bbox_inches='tight')
fig.savefig("FID 7D Mean 200E.png")


In [None]:
# Display the list of FID values (one per evaluated checkpoint) as the cell output
fid_epoch