In [None]:
import os
import random
import pandas as pd
import numpy as np
import cv2

import seaborn as sns
sns.set_style('darkgrid')

import matplotlib
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers, layers, Model, Sequential

from collections import defaultdict
from tqdm import tqdm

# Load Data

In [None]:
# Side length (pixels) every image is resized to before being stored.
IMG_SIZE = 224
dataset_paths = [
    '/kaggle/input/breast-ultrasound-images-dataset/Dataset_BUSI_with_GT'
]

data_dict = defaultdict(list)  # class label (folder name) -> list of RGB images
error_count = 0                # files that could not be read or converted
image_paths = []
images = []
labels = []
masks = []

limit = 1000  # maximum number of images kept per class

for dataset_root in dataset_paths:
    for dirpath, _, filenames in os.walk(dataset_root):
        for filename in tqdm(filenames):

            # Segmentation masks live next to the images; only the images are loaded here.
            if 'mask' in filename:
                continue
            # The class label is the name of the folder that contains the file.
            label = dirpath.split('/')[-1]
            if len(data_dict[label]) >= limit:
                continue

            image_path = os.path.join(dirpath, filename)
            image = cv2.imread(image_path)
            # cv2.imread signals an unreadable / non-image file by returning None
            # instead of raising, so check for it explicitly.
            if image is None:
                error_count += 1
                continue

            try:
                image = cv2.resize(image, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_AREA)
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            except cv2.error:
                # Count OpenCV processing failures, but let unrelated errors
                # (and KeyboardInterrupt) propagate instead of hiding them.
                error_count += 1
                continue

            data_dict[label].append(image)
            image_paths.append(image_path)
            images.append(image)
            labels.append(label)

print("ErrorCount:", error_count)
print("Total Images:", len(images))

# Data Visualization

In [None]:
# Bar chart of how many images were loaded for each class.
plt.rcParams.update({'font.size': 14})
plt.figure(figsize=(12, 8))
class_names = list(data_dict.keys())
class_counts = [len(samples) for samples in data_dict.values()]
sns.barplot(x=class_names, y=class_counts)
plt.title("Distribution of BUSI data")

In [None]:
# Class names in the order their keys were inserted into data_dict while loading.
classes = [label for label in data_dict]
num_classes = len(classes)
classes

In [None]:
# Stack the images into one array and scale pixel values by 1/255.
# Casting to float32 first keeps the tensor at half the size of the float64
# array that dividing an integer image array by 255.0 would produce.
# NOTE(review): assumes the loaded images are 8-bit (values 0-255) — confirm.
x = np.stack(images, axis=0).astype(np.float32)
x /= 255.0

# Encode each string label as the position of its class in `classes`.
class_to_index = {name: position for position, name in enumerate(classes)}
y = np.array([class_to_index[label] for label in labels])

x.shape, y.shape

# Define some parameters

In [None]:
# Input shape
channels = 3

img_shape = (IMG_SIZE, IMG_SIZE, channels)        
latent_dim = 100

num_target = 1350
gen_batch = 40

# Define models

In [None]:
def build_generator(img_size=IMG_SIZE, num_blocks=4):
    """Build the generator: noise vector -> square image with values in [0, 1].

    A dense layer projects the noise to a small feature map, which is then
    doubled in size `num_blocks` times (UpSampling2D + Conv2D + BatchNorm + ReLU)
    and finally mapped to `channels` output channels through a sigmoid.

    Reads the module-level `latent_dim` (noise length) and `channels`.

    Args:
        img_size: Side length of the generated image. Must be a positive
            multiple of ``2 ** num_blocks``.
        num_blocks: Number of upsampling blocks.

    Returns:
        A Keras ``Model`` mapping ``(latent_dim,)`` noise to an image.

    Raises:
        ValueError: If `img_size` cannot be reached by `num_blocks` doublings.
    """
    k = 2 ** (num_blocks)
    # Without this check the floor division below would silently produce a
    # generator whose output is smaller than img_size.
    if img_size <= 0 or img_size % k != 0:
        raise ValueError(
            f"img_size ({img_size}) must be a positive multiple of "
            f"2**num_blocks ({k})"
        )

    depth = 2 ** (3 + num_blocks)
    w = h = img_size // k

    model = Sequential()

    # Fully connected projection of the noise, reshaped into a (w, h, depth) feature map
    model.add(layers.Dense(w * h * depth, input_shape=(latent_dim, ), activation="relu"))
    model.add(layers.Reshape((w, h, depth)))  # spatial size is img_size / k

    for i in range(num_blocks):

        model.add(layers.UpSampling2D())  # doubles the spatial size; after the last block it equals img_size
        model.add(layers.Conv2D(depth, kernel_size=3, padding="same"))
        model.add(layers.BatchNormalization(momentum=0.8))
        model.add(layers.Activation("relu"))

    # Project to the requested number of channels; sigmoid keeps outputs in [0, 1]
    model.add(layers.Conv2D(channels, kernel_size=3, padding="same"))
    model.add(layers.Activation("sigmoid"))

    model.summary()

    # Wrap the stack in a functional model that takes noise and outputs an image
    noise = layers.Input(shape=(latent_dim, ))
    img = model(noise)

    return Model(noise, img)

In [None]:
def build_discriminator():
    """Build the discriminator: image of shape `img_shape` -> probability in [0, 1].

    Four stride-2 convolution stages (32, 64, 128, 256 filters), each followed by
    LeakyReLU and dropout, then a single sigmoid unit. Batch normalisation is
    applied in every stage except the first. Prints the layer summary.

    Returns:
        A Keras ``Model`` mapping an image to a validity score.
    """
    net = Sequential()

    # First stage: takes the raw image, deliberately without batch normalisation
    net.add(layers.Conv2D(32, kernel_size=3, strides=2, input_shape=img_shape, padding='same'))
    net.add(layers.LeakyReLU(alpha=0.2))
    net.add(layers.Dropout(0.25))

    # Remaining stages share the same layout and differ only in filter count
    for n_filters in (64, 128, 256):
        net.add(layers.Conv2D(n_filters, kernel_size=3, strides=2, padding='same'))
        net.add(layers.BatchNormalization(momentum=0.8))
        net.add(layers.LeakyReLU(alpha=0.2))
        net.add(layers.Dropout(0.25))

    # Classification head
    net.add(layers.Flatten())
    net.add(layers.Dense(1, activation='sigmoid'))

    net.summary()

    image_in = layers.Input(shape=img_shape)
    return Model(image_in, net(image_in))

# Define helper functions

In [None]:
def show_imgs():
    """Sample nine noise vectors and display the generator's outputs in a 3x3 grid.

    Uses the module-level `generator` and `latent_dim`.
    """
    rows, cols = 3, 3
    latent_batch = np.random.normal(0, 1, (rows * cols, latent_dim))
    samples = generator.predict(latent_batch)

    fig, axes = plt.subplots(rows, cols, figsize=(12, 12))
    # axes.flat walks the grid row by row, matching the order of the samples
    for axis, sample in zip(axes.flat, samples):
        axis.imshow(sample)
        axis.axis('off')

    plt.show()
    plt.close()

In [None]:
def show_losses(losses):
    """Plot recorded discriminator and generator losses on one figure.

    Args:
        losses: Sequence of ``(discriminator_loss, generator_loss)`` pairs.
    """
    plt.rcParams.update({'font.size': 14})
    history = np.array(losses)
    fig, ax = plt.subplots(figsize=(15, 10))
    # Column 0 holds discriminator losses, column 1 generator losses
    ax.plot(history.T[0], label='Discriminator')
    ax.plot(history.T[1], label='Generator')
    ax.set_title("Training Losses")
    ax.legend()
    plt.show()

# Training configuration (CFG)

In [None]:
batch_size = 16
steps = 6000
display_interval = 1000

# Noisy training targets, drawn once and reused for every batch:
# "real" targets lie in (0.95, 1.0], "fake" targets in [0.0, 0.05).
valid = 1.0 - 0.05 * np.random.random((batch_size, 1))
fake = 0.05 * np.random.random((batch_size, 1))

# Apply the GAN to the Benign class (cells below are commented out)

In [None]:
# x_idx = 0
# X_train = x[y.flatten() == x_idx]
# current_label = classes[x_idx]
# current_label, X_train.shape

In [None]:
# Build the generator
# generator = build_generator()
# generator.summary()

In [None]:
# Build and compile the discriminator
# discriminator = build_discriminator()
# discriminator.summary()

In [None]:
# discriminator.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(0.0002, 0.5), metrics=['accuracy'])
# discriminator.trainable = False

# # The generator takes noise as input and generates imgs
# z = layers.Input(shape=(latent_dim, ))
# img = generator(z)
# # The discriminator takes generated images as input and determines validity
# validator = discriminator(img)
# # The combined model (stacked generator and discriminator)
# # Trains the generator to fool the discriminator
# combined = Model(z, validator)
# combined.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(0.0002, 0.5))
# combined.summary()

## Train

In [None]:
# checkpoint = f"{current_label}_best.h5"
# losses=[]

# for step in range(steps):
    
#     #  Train Discriminator
#     # Select a random half of images
#     idx = np.random.randint(0, X_train.shape[0], batch_size)
#     imgs = X_train[idx]

#     # Sample noise and generate a batch of new images
#     noise = np.random.normal(0, 1, (batch_size, latent_dim))
#     gen_imgs = generator.predict(noise)
    
#     # Train the discriminator (real classified as ones and generated as zeros)
#     d_loss_real, acc_real = discriminator.train_on_batch(imgs, valid)
#     d_loss_fake, acc_fake = discriminator.train_on_batch(gen_imgs, fake)
#     d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    
#     #  Train Generator
#     # Train the generator (wants discriminator to mistake images as real)
#     noise = np.random.normal(0, 1, (batch_size, latent_dim))
#     g_loss_1 = combined.train_on_batch(noise, valid)
#     noise = np.random.normal(0, 1, (batch_size, latent_dim))
#     g_loss_2 = combined.train_on_batch(noise, valid)
#     g_loss = np.add(g_loss_1, g_loss_2) * 0.5
    
#     gan_loss = np.add(d_loss, g_loss) * 0.5
    
#     # Plot the progress
#     if step % display_interval == 0:
#         losses.append((d_loss, g_loss))
#         print(f"GAN loss: {gan_loss}")
#         print(f"D_loss_real: {d_loss_real}, d_loss_fake: {d_loss_fake}")
#         print("%d [D loss: %f] [G loss: %f]" % (step, d_loss, g_loss))
#         show_imgs()

# generator.save(checkpoint)

In [None]:
# show_losses(losses)

In [None]:
# generator = tf.keras.models.load_model(checkpoint)

In [None]:
# s = X_train[:40]
# f, ax = plt.subplots(5,8, figsize=(16,10))
# for i, img in enumerate(s):
#         ax[i//8, i%8].imshow(img)
#         ax[i//8, i%8].axis('off')
        
# plt.show()

In [None]:
# noise = np.random.normal(size=(40, latent_dim))
# generated_images = generator.predict(noise)
# f, ax = plt.subplots(5,8, figsize=(16,10))
# for i, img in enumerate(generated_images):
#         ax[i//8, i%8].imshow(img)
#         ax[i//8, i%8].axis('off')
        
# plt.show()

In [None]:
# num_gen = round((num_target - len(data_dict[current_label])) / gen_batch) * gen_batch
# noise = np.random.normal(size=(num_gen, latent_dim))
# gen_images = generator.predict(noise, batch_size=batch_size)

In [None]:
# os.makedirs(current_label, exist_ok=True)

# for i, img in tqdm(enumerate(gen_images)):
#     save_path = f"{current_label}/gen_{i}.png"
#     plt.imsave(save_path, img)



# Apply the GAN to the Normal class

In [None]:
# Select the class by name rather than by a fixed position: `classes` follows the
# order in which the folders were walked, which can differ between filesystems.
target_class = 'normal'
x_idx = [name.lower() for name in classes].index(target_class)

# Keep only the images of the selected class as GAN training data
X_train = x[y.flatten() == x_idx]
current_label = classes[x_idx]
current_label, X_train.shape

In [None]:
# Build the generator
generator = build_generator()
generator.summary()

In [None]:
# Build and compile the discriminator
discriminator = build_discriminator()
discriminator.summary()

In [None]:
# Compile the discriminator on its own so it can be trained directly on
# real and generated batches.
discriminator.compile(
    optimizer=keras.optimizers.Adam(0.0002, 0.5),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Mark it non-trainable before stacking, so the combined model is meant to
# update only the generator.
discriminator.trainable = False

# Stacked model: noise -> generator -> image -> discriminator -> validity
z = layers.Input(shape=(latent_dim, ))
img = generator(z)
validator = discriminator(img)

# Training this model pushes the generator towards images the discriminator accepts
combined = Model(z, validator)
combined.compile(
    optimizer=keras.optimizers.Adam(0.0002, 0.5),
    loss='binary_crossentropy',
)
combined.summary()

## Train

In [None]:
# File the trained generator is saved to once the loop finishes
checkpoint = f"{current_label}_best.h5"
losses = []

for step in range(steps):

    #  Train Discriminator
    # Draw batch_size real images at random (indices are sampled with replacement)
    idx = np.random.randint(0, X_train.shape[0], batch_size)
    imgs = X_train[idx]

    # Sample noise and generate a batch of new images.
    # predict_on_batch runs a single forward pass; predict() would set up its
    # batching machinery and print a progress bar on every one of the steps.
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    gen_imgs = generator.predict_on_batch(noise)

    # Train the discriminator (real classified as ones and generated as zeros)
    d_loss_real, acc_real = discriminator.train_on_batch(imgs, valid)
    d_loss_fake, acc_fake = discriminator.train_on_batch(gen_imgs, fake)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    #  Train Generator
    # Two generator updates per discriminator update, each on fresh noise
    # (wants discriminator to mistake images as real)
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    g_loss_1 = combined.train_on_batch(noise, valid)
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    g_loss_2 = combined.train_on_batch(noise, valid)
    g_loss = np.add(g_loss_1, g_loss_2) * 0.5

    gan_loss = np.add(d_loss, g_loss) * 0.5

    # Record and report progress every display_interval steps
    if step % display_interval == 0:
        losses.append((d_loss, g_loss))
        print(f"GAN loss: {gan_loss}")
        print(f"D_loss_real: {d_loss_real}, d_loss_fake: {d_loss_fake}")
        print("%d [D loss: %f] [G loss: %f]" % (step, d_loss, g_loss))
        show_imgs()

generator.save(checkpoint)

In [None]:
# generator = tf.keras.models.load_model(checkpoint)

In [None]:
# Show the first 40 real training images in a 5 x 8 grid for visual reference
s = X_train[:40]
f, ax = plt.subplots(5, 8, figsize=(16, 10))
# ax.flat visits the grid row by row, i.e. cell n is ax[n // 8, n % 8]
for cell, real_img in zip(ax.flat, s):
    cell.imshow(real_img)
    cell.axis('off')

plt.show()

In [None]:
# Show 40 generated images in the same 5 x 8 layout used for the real ones
noise = np.random.normal(size=(40, latent_dim))
generated_images = generator.predict(noise)
f, ax = plt.subplots(5, 8, figsize=(16, 10))
# ax.flat visits the grid row by row, i.e. cell n is ax[n // 8, n % 8]
for cell, fake_img in zip(ax.flat, generated_images):
    cell.imshow(fake_img)
    cell.axis('off')

plt.show()

In [None]:
# Number of synthetic images: the gap between num_target and the images already
# loaded for this class, rounded to a multiple of gen_batch.
shortfall = num_target - len(data_dict[current_label])
num_gen = round(shortfall / gen_batch) * gen_batch

# Generate them all from fresh noise, batch_size samples at a time
noise = np.random.normal(size=(num_gen, latent_dim))
gen_images = generator.predict(noise, batch_size=batch_size)

In [None]:
# Write every generated image to ./<class label>/gen_<index>.png
os.makedirs(current_label, exist_ok=True)

# tqdm wraps the array itself (not the enumerate object) so it can read the
# length and display a total / ETA.
for i, img in enumerate(tqdm(gen_images)):
    save_path = f"{current_label}/gen_{i}.png"
    plt.imsave(save_path, img)


# Apply the GAN to the Malignant class

In [None]:
# Select the class by name rather than by a fixed position: `classes` follows the
# order in which the folders were walked, which can differ between filesystems.
target_class = 'malignant'
x_idx = [name.lower() for name in classes].index(target_class)

# Keep only the images of the selected class as GAN training data
X_train = x[y.flatten() == x_idx]
current_label = classes[x_idx]
current_label, X_train.shape

In [None]:
# Build the generator
generator = build_generator()
generator.summary()

In [None]:
# Build and compile the discriminator
discriminator = build_discriminator()
discriminator.summary()

In [None]:
# Compile the discriminator on its own so it can be trained directly on
# real and generated batches.
discriminator.compile(
    optimizer=keras.optimizers.Adam(0.0002, 0.5),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Mark it non-trainable before stacking, so the combined model is meant to
# update only the generator.
discriminator.trainable = False

# Stacked model: noise -> generator -> image -> discriminator -> validity
z = layers.Input(shape=(latent_dim, ))
img = generator(z)
validator = discriminator(img)

# Training this model pushes the generator towards images the discriminator accepts
combined = Model(z, validator)
combined.compile(
    optimizer=keras.optimizers.Adam(0.0002, 0.5),
    loss='binary_crossentropy',
)
combined.summary()

## Train

In [None]:
# File the trained generator is saved to once the loop finishes
checkpoint = f"{current_label}_best.h5"
losses = []

for step in range(steps):

    #  Train Discriminator
    # Draw batch_size real images at random (indices are sampled with replacement)
    idx = np.random.randint(0, X_train.shape[0], batch_size)
    imgs = X_train[idx]

    # Sample noise and generate a batch of new images.
    # predict_on_batch runs a single forward pass; predict() would set up its
    # batching machinery and print a progress bar on every one of the steps.
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    gen_imgs = generator.predict_on_batch(noise)

    # Train the discriminator (real classified as ones and generated as zeros)
    d_loss_real, acc_real = discriminator.train_on_batch(imgs, valid)
    d_loss_fake, acc_fake = discriminator.train_on_batch(gen_imgs, fake)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    #  Train Generator
    # Two generator updates per discriminator update, each on fresh noise
    # (wants discriminator to mistake images as real)
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    g_loss_1 = combined.train_on_batch(noise, valid)
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    g_loss_2 = combined.train_on_batch(noise, valid)
    g_loss = np.add(g_loss_1, g_loss_2) * 0.5

    gan_loss = np.add(d_loss, g_loss) * 0.5

    # Record and report progress every display_interval steps
    if step % display_interval == 0:
        losses.append((d_loss, g_loss))
        print(f"GAN loss: {gan_loss}")
        print(f"D_loss_real: {d_loss_real}, d_loss_fake: {d_loss_fake}")
        print("%d [D loss: %f] [G loss: %f]" % (step, d_loss, g_loss))
        show_imgs()

generator.save(checkpoint)

In [None]:
# Plot the (discriminator, generator) loss pairs recorded every display_interval steps
show_losses(losses)

In [None]:
# generator = tf.keras.models.load_model(checkpoint)

In [None]:
# Show the first 40 real training images in a 5 x 8 grid for visual reference
s = X_train[:40]
f, ax = plt.subplots(5, 8, figsize=(16, 10))
# ax.flat visits the grid row by row, i.e. cell n is ax[n // 8, n % 8]
for cell, real_img in zip(ax.flat, s):
    cell.imshow(real_img)
    cell.axis('off')

plt.show()

In [None]:
# Show 40 generated images in the same 5 x 8 layout used for the real ones
noise = np.random.normal(size=(40, latent_dim))
generated_images = generator.predict(noise)
f, ax = plt.subplots(5, 8, figsize=(16, 10))
# ax.flat visits the grid row by row, i.e. cell n is ax[n // 8, n % 8]
for cell, fake_img in zip(ax.flat, generated_images):
    cell.imshow(fake_img)
    cell.axis('off')

plt.show()

In [None]:
# Number of synthetic images: the gap between num_target and the images already
# loaded for this class, rounded to a multiple of gen_batch.
shortfall = num_target - len(data_dict[current_label])
num_gen = round(shortfall / gen_batch) * gen_batch

# Generate them all from fresh noise, batch_size samples at a time
noise = np.random.normal(size=(num_gen, latent_dim))
gen_images = generator.predict(noise, batch_size=batch_size)

In [None]:
# Write every generated image to ./<class label>/gen_<index>.png
os.makedirs(current_label, exist_ok=True)

# tqdm wraps the array itself (not the enumerate object) so it can read the
# length and display a total / ETA.
for i, img in enumerate(tqdm(gen_images)):
    save_path = f"{current_label}/gen_{i}.png"
    plt.imsave(save_path, img)