In [1]:
# train a generative adversarial network on a one-dimensional function
from numpy import hstack
from numpy import zeros
from numpy import ones
from numpy.random import rand
from numpy.random import randn
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from matplotlib import pyplot as plt
import pandas as pd
import random

# DEFINE DISCRIMINATOR MODEL
Diskriminator bekommt ein SAMPLE aus den echten Daten (hier aus den 200 echten Werten nur 2) und wirft wahr oder falsch aus.

In [2]:
# define the standalone discriminator model
def define_discriminator(n_inputs=2):
    model = Sequential()
    model.add(Dense(25, activation='relu', kernel_initializer='he_uniform', input_dim=n_inputs))
    model.add(Dense(1, activation='sigmoid'))
    # compile model
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# DEFINE GENERATOR MODEL
Generator bekommt einen Punkt aus dem Latenten Raum undd generiert daraus ein neues Sample, bspw. Vektor mit input und output der Funktion (f(x) = x^2 dann x und y).

The weights in the generator model are updated based on the performance of the discriminator model.

Specifically, a new GAN model can be defined that stacks the generator and discriminator such that the generator receives as input random points in the latent space, generates samples that are fed into the discriminator model directly, classified, and the output of this larger model can be used to update the model weights of the generator.

In [3]:
# define the standalone generator model
def define_generator(latent_dim, n_outputs=2):
    model = Sequential()
    model.add(Dense(15, activation='relu', kernel_initializer='he_uniform', input_dim=latent_dim))
    model.add(Dense(n_outputs, activation='linear'))
    return model

# DEFINE GAN MODEL INCLUDING GENERATOR AND DISCRIMINATOR
Input: Point in latent space; Output: Binary classification (real, fake)

The weights in the discriminator are marked as not trainable, which only affects the weights as seen by the GAN model and not the standalone discriminator model.

In [4]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(generator, discriminator):
    # make weights in the discriminator not trainable
    discriminator.trainable = False
    # connect them
    model = Sequential()
    # add generator
    model.add(generator)
    # add the discriminator
    model.add(discriminator)
    # compile model
    model.compile(loss='binary_crossentropy', optimizer='adam')
    return model

# LOAD REAL DATA

In [5]:
# generate n real samples with class labels
def generate_real_samples(n):
    # generate inputs in [-0.5, 0.5]
    X1 = rand(n) - 0.5
    # generate outputs X^2
    X2 = X1 * X1
    # stack arrays
    X1 = X1.reshape(n, 1)
    X2 = X2.reshape(n, 1)
    X = hstack((X1, X2))
    # generate class labels
    y = ones((n, 1))
    return X, y

# TODO: LOAD OUR DATA AND BRING IT IN FORM

In [6]:
def generate_real_samples(path_sensor, n):
    # get number of rows
    fs = sum(1 for line in open(path_sensor)) - 1
    # get percentage from batchsize and filesize
    p = 100 * n / fs
    print('Percentage:', p)
    # keep the header, then take only p% of lines
    # if random from [0,1] interval is greater than 0.01 the row will be skipped
    X1= pd.read_csv(
             path_sensor,
             header=0, 
             skiprows=lambda i: i>0 and random.random() > p
    ).to_numpy()
    X1 = X1[:,0]
    # reshape data and c
    X1 = X1.reshape(n, 1)
    X2 = np.arange(n).reshape(n, 1)
    print(X1.shape, X2.shape)
    X = hstack((X1, X2))
    # create label
    y = np.ones((n, 1))
    return X, y

In [7]:
def generate_real_samples(path_sensor, n):
    #number of records in file (excludes header) fs->filesize
    fs = sum(1 for line in open(path_sensor)) - 1
    print('fs:', fs)
    # get random start point
    r = int(random.uniform(1, fs))
    print('r:', r)
    # check if start point has n numbers to the end
    if fs-r>=n:
        # read csv file skip r rows
        X1 = pd.read_csv(path_sensor, skiprows=r, nrows=n).to_numpy()
        X1 = X1[:,0]
        # reshape data and c
        X1 = X1.reshape(n, 1)
        X2 = np.arange(n).reshape(n, 1)
        print(X1.shape, X2.shape)
        X = hstack((X1, X2))
        # create label
        y = np.ones((n, 1))
        return X, y

# n RANDOM WERTE AUS DEM LATENTEN RAUM

In [8]:
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n):
    # generate points in the latent space
    x_input = randn(latent_dim * n)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n, latent_dim)
    return x_input

# PREDICT SAMPLES USING GENERATOR & RANDOM LATENT SAMPLES

In [9]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n):
    # generate points in latent space
    x_input = generate_latent_points(latent_dim, n)
    # predict outputs
    X = generator.predict(x_input)
    # create class labels
    y = zeros((n, 1))
    return X, y

# EVALUATING PERFORMANCE
1st Generate real samples

2nd generate latent points

3rd generate fake samples from 2nd

4th plot both samples in one plot and compare

Having both samples plotted on the same graph allows them to be directly compared to see if the same input and output domain are covered and whether the expected shape of the target function has been appropriately captured, at least subjectively.
# SUMMARIZE PERFORMANCE
The summarize_performance() function below can be called any time during training to create a scatter plot of real and generated points to get an idea of the current capability of the generator model.

We may also be interested in the performance of the discriminator model at the same time.

Specifically, we are interested to know how well the discriminator model can correctly identify real and fake samples. A good generator model should make the discriminator model confused, resulting in a classification accuracy closer to 50% on real and fake examples.

In [10]:
# evaluate the discriminator and plot real and fake points
def summarize_performance_plot(epoch, generator, discriminator, latent_dim, path_sensor, n=100):
    global accarr_real, accarr_fake
    # prepare real samples
    x_real, y_real = generate_real_samples(path_sensor, n)
    # evaluate discriminator on real examples
    _, acc_real = discriminator.evaluate(x_real, y_real, verbose=0)
    # prepare fake examples
    x_fake, y_fake = generate_fake_samples(generator, latent_dim, n)
    # evaluate discriminator on fake examples
    _, acc_fake = discriminator.evaluate(x_fake, y_fake, verbose=0)
    # summarize discriminator performance
    accarr_real.append(acc_real)
    accarr_fake.append(acc_fake)
    print('Epoch:', epoch, 'Accurracy real:', acc_real, 'Accurracy fake:', acc_fake)
    # scatter plot real and fake data points
    plt.scatter(x_real[:, 0], x_real[:, 1], color='red')
    plt.scatter(x_fake[:, 0], x_fake[:, 1], color='blue')
    plt.show()

In [11]:
# evaluate the discriminator and plot real and fake points
def summarize_performance(epoch, generator, discriminator, latent_dim, path_sensor, n=100):
    global accarr_real, accarr_fake
    # prepare real samples
    x_real, y_real = generate_real_samples(path_sensor, n)
    # evaluate discriminator on real examples
    _, acc_real = discriminator.evaluate(x_real, y_real, verbose=0)
    # prepare fake examples
    x_fake, y_fake = generate_fake_samples(generator, latent_dim, n)
    # evaluate discriminator on fake examples
    _, acc_fake = discriminator.evaluate(x_fake, y_fake, verbose=0)
    # summarize discriminator performance
    accarr_real.append(acc_real)
    accarr_fake.append(acc_fake)
    print('Epoch:', epoch, 'Accurracy real:', acc_real, 'Accurracy fake:', acc_fake)
    # scatter plot real and fake data points
    #plt.scatter(x_real[:, 0], x_real[:, 1], color='red')
    #plt.scatter(x_fake[:, 0], x_fake[:, 1], color='blue')
    #plt.show()

# TRAIN COMPOSITE MODEL
Input: random points in latent space, Output: Binary Classification (real, fake)

What is required is that we first update the discriminator model with real and fake samples, then update the generator via the composite model.

This requires combining elements from the train_discriminator() function defined in the discriminator section and the train_gan() function defined above. It also requires that the generate_fake_samples() function use the generator model to generate fake samples instead of generating random numbers.

Furthermore implement the evaluation functions every n_eval epochs.

In [12]:
# train the generator and discriminator
def train(g_model, d_model, gan_model, latent_dim, path_sensor, n_epochs=10000, n_batch=128, n_eval=2000):
    global accarr_real, accarr_fake
    # determine half the size of one batch, for updating the discriminator
    half_batch = int(n_batch / 2)
    print('train:', n_batch, half_batch)
    # manually enumerate epochs
    for i in range(n_epochs):
        
        # prepare real samples
        x_real, y_real = generate_real_samples(path_sensor, half_batch)
        # prepare fake examples
        x_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
        # update discriminator
        
        d_model.train_on_batch(x_real, y_real)
        d_model.train_on_batch(x_fake, y_fake)
        
        # prepare points in latent space as input for the generator
        x_gan = generate_latent_points(latent_dim, n_batch)
        # create inverted labels for the fake samples
        y_gan = ones((n_batch, 1))
        
        # update the generator via the discriminator's error
        gan_model.train_on_batch(x_gan, y_gan)
        
        # plot evaluation of the model every n_eval epochs
        if (i+1) % n_eval == 0:
            summarize_performance_plot(i, g_model, d_model, path_sensor, latent_dim)
        #evaluate the model every epoch
        else:
            summarize_performance(i, g_model, d_model, path_sensor, latent_dim)

# __MAIN__

In [13]:
# size of the latent space
latent_dim = 5
# number of epochs
n_epochs = 100
# batchsize
n_batch = 256
# evaluate every n epoch
n_eval = 10
# accuracy array[2, epoch] -> 2: acc real, accfake -> epoch: number epochs
accarr_real = []
accarr_fake = []
# path to sensor
path_sensor = '../data/Exercises_SS22/sleeplab_dataset_10hz/patient_29_male_7_years/BeinLI_10HZ.csv'
# create the discriminator
discriminator = define_discriminator()
# create the generator
generator = define_generator(latent_dim)
# create the gan
gan_model = define_gan(generator, discriminator)

# train model
train(generator, discriminator, gan_model, latent_dim, path_sensor, n_epochs, n_batch, n_eval)

Metal device set to: Apple M1 Pro
train: 256 128
fs: 327310
r: 65010
(128, 1) (128, 1)


2022-06-30 21:03:13.498900: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-06-30 21:03:13.499008: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)
2022-06-30 21:03:13.619297: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2022-06-30 21:03:13.650127: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:113] Plugin optimizer for device_type GPU is enabled.
2022-06-30 21:03:13.894980: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:113] Plugin optimizer for device_type GPU is enabled.
2022-06-30 21:03:14.137904: I tensorflow/core/grappler/o

fs: -1
r: 0


TypeError: cannot unpack non-iterable NoneType object

In [None]:
accarr_real = np.asarray(accarr_real).reshape((len(accarr_real),1))
accarr_fake = np.asarray(accarr_fake).reshape((len(accarr_fake),1))
    
plt.figure(figsize=(10, 7)) 
plt.legend()
    
plt.title('Development of the Accurracy of the Epochs')
plt.xlabel("Number of Epochs")
plt.ylabel("Accurracy")
    
plt.plot(np.arange(n_epochs), accarr_real, color='red', label='acc_real')
plt.plot(np.arange(n_epochs), accarr_fake, color='blue', label='acc_fake')

## sources:

https://machinelearningmastery.com/display-deep-learning-model-training-history-in-keras/

https://machinelearningmastery.com/how-to-code-a-wasserstein-generative-adversarial-network-wgan-from-scratch/