<a href="https://colab.research.google.com/github/vladiant/MachineLearningUtils/blob/main/SemiSupervisedGAN/SGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Semi-Supervised GAN (SGAN)

Based on https://machinelearningmastery.com/semi-supervised-generative-adversarial-network/

In [1]:
from numpy import expand_dims, zeros, ones, asarray
from numpy.random import randn, randint

from tensorflow.keras.datasets.mnist import load_data

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Reshape, Conv2D, Conv2DTranspose, LeakyReLU, Dropout, Lambda, Flatten, Activation
from tensorflow.keras.optimizers import Adam

from tensorflow.keras.utils import plot_model

from tensorflow.keras import backend

from matplotlib import pyplot

Custom activation function

In [2]:
def custom_activation(output):
    logexpsum = backend.sum(backend.exp(output), axis=-1, keepdims=True)
    result = logexpsum / (logexpsum + 1.0)
    return result

Define the standalone discriminator model

In [3]:
def define_discriminator(in_shape=(28, 28, 1), n_classes=10):
    # Image input
    in_image = Input(shape=in_shape)
    # Downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)
    # Downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # Downsample
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    # Flatten feature maps
    fe = Flatten()(fe)
    # Dropout
    fe = Dropout(0.4)(fe)
    # Output layer nodes
    fe = Dense(n_classes)(fe)
    # Supervised output
    c_out_layer = Activation('softmax')(fe)
    
    # Define and compile supervices discriminator model
    c_model = Model(in_image, c_out_layer)
    c_model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=0.0002, beta_1=0.5))
    
    # Unsupervised output
    d_out_layer = Lambda(custom_activation)(fe)    
    # Define and compile discriminator model
    d_model = Model(in_image, d_out_layer)
    d_model.compile(loss='binary_crossentropy', optimizer=Adam(lr=0.0002, beta_1=0.5))
    return d_model, c_model

Define the standalone generator model

In [4]:
def define_generator(latent_dim):
    # Image generator input
    in_lat = Input(shape=(latent_dim,))
    # Foundations for 7x7 image
    n_nodes = 128 * 7 * 7
    gen = Dense(n_nodes)(in_lat)
    gen = LeakyReLU(alpha=0.2)(gen)
    gen = Reshape((7, 7, 128))(gen)
    # Upsample to 14x14
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    # Upsample to 28x28
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
    # Output
    out_layer = Conv2D(1, (7,7), activation='tanh', padding='same')(gen)
    # Define model
    model = Model(in_lat, out_layer)
    return model

Define the combined generator and discriminator model, for updating the generator

In [5]:
def define_gan(g_model, d_model):
    # Make weights in the discriminator not trainable
    d_model.trainable = False
    # Connect image output from generator as input to discriminator
    gan_output = d_model(g_model.output)
    # Define the gan model as taking noise and outputting a classification
    model = Model(g_model.input, gan_output)
    # Compile model
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    return model

Load the images

In [6]:
def load_real_images():
    # Load dataset
    (trainX, trainY), (_, _) = load_data()
    # Expand to 3D, e.g. add channels
    X = expand_dims(trainX, axis=-1)
    # Convert from ints to floats
    X = X.astype('float32')
    # Scale from [0..255] to [-1, 1]
    X = (X - 127.5) / 127.5
    print(X.shape, trainY.shape)
    return [X, trainY]

Select a supervised subset of the dataset, ensure classes are balanced

In [7]:
def select_supervised_samples(dataset, n_samples=100, n_classes=10):
    X, y = dataset
    X_list, y_list = list(), list()
    n_per_class = int(n_samples / n_classes)
    for i in range(n_classes):
        # Get all images for this class
        X_with_class = X[y == i]
        # Choose random instances
        ix = randint(0, len(X_with_class), n_per_class)
        # Add to list
        [X_list.append(X_with_class[j]) for j in ix]
        [y_list.append(i) for j in ix]
    
    return asarray(X_list), asarray(y_list)

Select real samples

In [8]:
def generate_real_samples(dataset, n_samples):
    # Split into images and labels
    images, labels = dataset
    # Choose random instances
    ix = randint(0, images.shape[0], n_samples)
    # Select images and labels
    X, labels = images[ix], labels[ix]
    # Generate class labels
    y = ones((n_samples, 1))
    return [X, labels], y

Generate points in latent space as input for the generator

In [9]:
def generate_latent_points(latent_dim, n_samples):
    # Generate points in the latent space
    z_input = randn(latent_dim * n_samples)
    # Reshape into a batch of inputs for the network
    z_input = z_input.reshape(n_samples, latent_dim)
    return z_input

Use the generator to generate n fake samples with class labels

In [10]:
def generate_fake_samples(generator, latent_dim, n_samples):
    # Generate points in latent space
    z_input = generate_latent_points(latent_dim, n_samples)
    # Predict outputs
    images = generator.predict(z_input)
    # Create class labels
    y = zeros((n_samples, 1))
    return images, y

Generate samples and save as a plot and save the model

In [11]:
def summarize_performance(step, g_model, c_model, latent_dim, dataset, n_samples=100):
    # Prepare fake samples
    X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
    # Scale from [-1, 1] to [0, 1]
    X = (X + 1) / 2.0
    # Plot images
    for i in range(100):
        # Define subplot
        pyplot.subplot(10, 10, 1+i)
        # Turn off axis
        pyplot.axis('off')
        # Plot raw pixel data
        pyplot.imshow(X[i, :, :, 0], cmap='gray_r')
        
    # Save plot to file
    filename1 = f'generated_plot_{step+1:04}.png'
    pyplot.savefig(filename1)
    pyplot.close()
    
    # Evaluate the classifier model
    x, y = dataset
    acc = c_model.evaluate(x, y, verbose=0)
    print(f"Classifier Accuracy: {acc*100:.3f}")
    
    # Save the generator model
    filename2 = f"g_model_{step+1:04}.h5"
    g_model.save(filename2)
    
    # Save the classifier model
    filename3 = f"c_model_{step+1:04}.h5"
    c_model.save(filename3)
    
    print(f">Saved {filename1}, {filename2}, {filename3}")

Train the generator and discriminator

In [12]:
def train(g_model, d_model, c_model, gan_model, dataset, latent_dim, n_epochs=20, n_batch=100):
    # Select supervised dataset
    X_sup, y_sup = select_supervised_samples(dataset)
    print(X_sup.shape, y_sup.shape)
    
    # Calculate the number of batches per training epoch
    bat_per_epo = int(dataset[0].shape[0] / n_batch)
    # Calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # Calculate the size of half a batch of samples
    half_batch = int(n_batch / 2)
    print(f"n_epochs={n_epochs}, n_batch={n_batch}, 1/2={half_batch}, b/e={bat_per_epo}, steps={n_steps}")
    
    # Manually enumerate epochs
    for i in range(n_steps):
        # Update supervised discriminator (c)
        [Xsup_real, ysup_real], _ = generate_real_samples([X_sup, y_sup], half_batch)
        c_loss = c_model.train_on_batch(Xsup_real, ysup_real)
        # Update unsupervised discriminator (d)
        [X_real, _], y_real = generate_real_samples(dataset, half_batch)
        d_loss1 = d_model.train_on_batch(X_real, y_real)
        X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
        d_loss2 = d_model.train_on_batch(X_fake, y_fake)
        # Update generator (g)
        X_gan, y_gan = generate_latent_points(latent_dim, n_batch), ones((n_batch, 1))
        g_loss = gan_model.train_on_batch(X_gan, y_gan)
        # Summarize loss on this batch
        print(f">{i+1}, c[{c_loss:.3f}], d[{d_loss1:.3f},{d_loss2:.3f}], d[{g_loss:.3f}] ")
        # Evaluate model performance so often
        if (i+1) % (bat_per_epo * 1) == 0:
            summarize_performance(i, g_model, c_model, latent_dim, dataset)

Size of the latent space

In [13]:
latent_dim = 100

Create the discriminator models

In [14]:
d_model, c_model = define_discriminator()

  "The `lr` argument is deprecated, use `learning_rate` instead.")


Create the generator

In [15]:
g_model = define_generator(latent_dim)

Create the gan

In [16]:
gan_model = define_gan(g_model, d_model)

  "The `lr` argument is deprecated, use `learning_rate` instead.")


Load image data

In [17]:
dataset = load_real_images()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
(60000, 28, 28, 1) (60000,)


Train model

In [18]:
train(g_model, d_model, c_model, gan_model, dataset, latent_dim)

[1;30;43mПредаденият резултат е скъсен до последните 5000 реда.[0m
>7026, c[0.002], d[0.708,0.881], d[1.265] 
>7027, c[0.001], d[0.709,0.614], d[1.042] 
>7028, c[0.004], d[0.652,0.736], d[1.168] 
>7029, c[0.001], d[0.726,0.775], d[1.102] 
>7030, c[0.001], d[0.757,0.663], d[1.145] 
>7031, c[0.002], d[0.652,0.826], d[1.152] 
>7032, c[0.001], d[0.768,0.819], d[1.260] 
>7033, c[0.002], d[0.805,0.612], d[1.070] 
>7034, c[0.002], d[0.778,0.868], d[1.135] 
>7035, c[0.002], d[0.629,0.677], d[1.060] 
>7036, c[0.003], d[0.704,0.702], d[1.235] 
>7037, c[0.004], d[0.738,0.833], d[1.064] 
>7038, c[0.001], d[0.622,0.840], d[1.089] 
>7039, c[0.001], d[0.916,0.652], d[1.179] 
>7040, c[0.002], d[0.582,0.777], d[1.155] 
>7041, c[0.002], d[0.671,0.735], d[1.132] 
>7042, c[0.001], d[1.037,0.845], d[1.113] 
>7043, c[0.001], d[0.728,0.733], d[1.149] 
>7044, c[0.001], d[0.612,0.722], d[1.140] 
>7045, c[0.001], d[0.650,0.689], d[0.969] 
>7046, c[0.001], d[0.572,1.076], d[1.090] 
>7047, c[0.001], d[0.700,0.6