In [None]:
import sys
sys.modules[__name__].__dict__.clear()

In [1]:

import numpy as np
import tensorflow as tf
from keras.layers import Input,Concatenate,Activation, Dense, Dropout, Flatten, Conv2D, Conv2DTranspose, BatchNormalization, LeakyReLU, Reshape
from keras.models import Model
from numpy.random import randn
from numpy.random import randint
from tensorflow import keras

#Tx: time dimension
#nx: number of features

# generator
def make_generator_model(latent_dim,Tx,nx):
    dim1= 2
    dim2=int(nx/2)
    depth=256

    in_label = Input(shape=(1,))
    in_lat = Input(shape=(latent_dim,))
    merge = Concatenate(axis=1)([in_lat, in_label])

    n_nodes = dim1 * dim2 * depth
    gen = Dense(n_nodes)(merge)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU()(gen)
    gen = Reshape((dim1, dim2, depth))(gen)


    gen = Conv2DTranspose(128,(3,3), strides=(1,1), padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU()(gen)
    gen=Dropout(0.2)(gen)

    gen = Conv2DTranspose(64,(3,3), strides=(1,1), padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU()(gen)
    gen=Dropout(0.2)(gen)


    gen = Conv2DTranspose(1,(3,3), strides=(2,2), padding='same')(gen)
    #gen = BatchNormalization()(gen)
    #gen = LeakyReLU()(gen)
    #gen=Dropout(0.5)(gen)
    out_layer = Activation('tanh')(gen)
    model = Model([in_lat, in_label], out_layer,name='Generator')
    #opt = Adam(lr=0.0002, beta_1=0.5)
    #model.compile(loss='binary_crossentropy', optimizer=opt)
    model.summary()




    #gen = Conv2D(1,(3,3), strides=(1,1), padding='same')(gen)
    #out_layer = Activation('tanh')(gen)
    #model = Model([in_lat, in_label], out_layer)
    #model.summary()
    return model


#discriminator
def make_discriminator_model(Tx,nx):

    dim1=Tx
    dim2=nx

    in_image= Input (shape=[dim1, dim2, 2])
    fe = Conv2D(64,(3,3), strides=(2,2), padding='same')(in_image)
    fe= LeakyReLU(alpha=0.2)(fe)
    fe=Dropout(0.2)(fe)

    fe = Conv2D(128, (3,3), strides=(2,2),padding='same')(fe)
    #fe = BatchNormalization()(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.2)(fe)

    fe = Conv2D(256,(3,3), strides=(2,2), padding='same')(fe)
    #fe = BatchNormalization()(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.2)(fe)

    fe = Conv2D(512,(3,3), strides=(2,2), padding='same')(fe)
    #fe = BatchNormalization()(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.2)(fe)

    fe = Flatten()(fe)
    #real/fake output
    out1 =  Dense(1,activation='linear')(fe)

    model = Model(in_image, out1,name='Critic')

    #opt = Adam(lr=0.0002, beta_1=0.5)
    #model.compile(loss='binary_crossentropy', optimizer=opt)
    model.summary()

    return model


# gradient penalty
def gradient_penalty(batch_size, crit, real_images, fake_images):
    alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)
    diff = fake_images - real_images
    interpolated = real_images + alpha * diff
    with tf.GradientTape() as gp_tape:
        gp_tape.watch(interpolated)
        # 1. Get the discriminator output for this interpolated image.
        pred = crit(interpolated, training=True)
        # 2. Calculate the gradients w.r.t to this interpolated image.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp



    #gradient = gradient.flatten()
    #gradient_norm = tf.norm(gradient, ord=2, axis=1)
    #penalty = ((gradient_norm -1)**2).mean()
    #return penalty

# generator loss
def get_gen_loss(crit_fake_pred):
    return -tf.reduce_mean(crit_fake_pred)
#discriminator loss
def get_crit_loss (crit_fake_pred, crit_real_pred, gp, c_lambda):
    real_loss = tf.reduce_mean(crit_real_pred)
    fake_loss = tf.reduce_mean(crit_fake_pred)
    d_cost = fake_loss - real_loss
    crit_loss = d_cost + gp*c_lambda
    return crit_loss

# make one hot images from the ground true labels
def one_hot_images(dim,labels):
    Lx = np.zeros((dim[0],dim[1],dim[2],1))
    for i in range(labels.shape[0]):
        if labels[i]== 0:
            Lx[i] = np.zeros((1,dim[1],dim[2],1))
        elif labels[i]== 1:
            Lx[i] = np.ones((1,dim[1],dim[2],1))

    #X_new = np.concatenate((X,Lx),axis=3)
    return  Lx


# select real samples
def generate_real_samples(dataset, n_samples):
    # split into images and labels
    images, labels = dataset
    # choose random instances
    ix = randint(0, images.shape[0], n_samples)
    # select images and labels
    X, labels = images[ix], labels[ix]
    # concatenate the one hot images of labels to the X
    dim = X.shape
    Lx = one_hot_images(dim,labels)
    X = np.concatenate((X,Lx),axis=3)
    return X, labels


# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes=2):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    z_input = x_input.reshape(n_samples, latent_dim)
    # generate labels
    labels = randint(0, n_classes, n_samples) #check these labels!


    return [z_input, labels]



# train the generator and discriminator
def train(g_model, d_model, dataset, latent_dim ,gen_loss, disc_loss, n_epochs=700, n_batch=8):


    d_optimizer = keras.optimizers.Adam(learning_rate=0.0002,beta_1=0.5,beta_2=0.9)
    g_optimizer = keras.optimizers.Adam(learning_rate=0.0002,beta_1=0.5,beta_2=0.9)

    n_steps=5000
    c_lambda = 5


    for i in range(n_steps):
        #train the critic 3 times more than the generator
        for j in range(3):
            # get randomly selected 'real' samples
            X_real, y_real = generate_real_samples(dataset, n_batch)
            # generate fake samples
            dim = X_real.shape
            z_input, labels_fake = generate_latent_points(latent_dim, n_batch)
            Lx = one_hot_images(dim,labels_fake)
            with tf.GradientTape() as tape:

                images = g_model([z_input, labels_fake], training= True)

                # concatenate the one hot images of labels to the images
                X_fake = np.concatenate((images,Lx),axis=3)

                # get the logits for fake images
                crit_fake_pred = d_model(X_fake, training= True)
                # get the logits for real images
                crit_real_pred = d_model(X_real, training= True)

                # gradient penalty
                gp = gradient_penalty(n_batch, d_model, X_real, X_fake)
                d_loss = get_crit_loss(crit_fake_pred, crit_real_pred, gp, c_lambda)
            # Get the gradients w.r.t the discriminator loss
            d_gradient = tape.gradient(d_loss, d_model.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            d_optimizer.apply_gradients(zip(d_gradient, d_model.trainable_variables))


        # prepare points in latent space as input for the generator
        z_input, z_labels = generate_latent_points(latent_dim, n_batch)

        dim=[]
        dim.append(n_batch)
        dim.append(X_real.shape[1])
        dim.append(X_real.shape[2])
        dim.append(X_real.shape[3])
        label_image = one_hot_images(dim,z_labels)
        with tf.GradientTape() as tape:
            # Generate fake images using the generator
            generated_images = g_model([z_input, z_labels], training=True)
            # Get the discriminator logits for fake images
            Input_new_fake = tf.concat([generated_images, label_image], axis=3)
            crit_fake_pred = d_model(Input_new_fake, training=True)
            g_loss = get_gen_loss(crit_fake_pred)



        # Get the gradients w.r.t the generator loss
        gen_gradient = tape.gradient(g_loss, g_model.trainable_variables)
        # Update the weights of the generator using the generator optimizer
        g_optimizer.apply_gradients(
            zip(gen_gradient, g_model.trainable_variables)
        )


        gen_loss.append(g_loss)
        disc_loss.append(d_loss)




    return gen_loss, disc_loss, g_model, d_model




# size of the latent space
latent_dim = 100
# create the discriminator
discriminator = make_discriminator_model(Tx,nx)
# create the generator
generator = make_generator_model(latent_dim,Tx,nx)

# load image data
dataset = [X_train, Y_train]  # you need to change this part x_train dimension: (m,Tx,nx) m: number of samples, Tx: time dimension, nx: number of features

gen_loss = []
disc_loss =[]

# train model
gen_loss, disc_loss, generator,discriminator = train(generator, discriminator, dataset, latent_dim,gen_loss, disc_loss)