In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Activation, Dense, Input
from tensorflow.keras.layers import Conv2D, Flatten
from tensorflow.keras.layers import Reshape, Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import concatenate

from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.datasets import mnist
from tensorflow.keras import backend as K
from tensorflow.keras import regularizers
from tensorflow.keras.utils import to_categorical

import numpy as np
import argparse

import sys

import math
import matplotlib.pyplot as plt
import os

import pandas as pd
from sklearn import preprocessing 
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

import tensorflow as tf
# Make tf.function-wrapped code (including Keras train/predict steps) execute
# eagerly, op by op, instead of as compiled graphs.
# NOTE(review): presumably enabled for debugging or to work around a graph-mode
# issue; eager execution is slower, so confirm it is still needed.
tf.config.run_functions_eagerly(True)

# Colab-only: mount Google Drive at /content/drive. Every dataset, model and
# output path used later in this notebook lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


# Build discriminator and generator

In [None]:
def build_generator(inputs, n_features, activation='sigmoid', labels=None):
    """Assemble the generator network of the (conditional) GAN.

    The noise vector (optionally concatenated with the class labels) is
    projected by a dense layer, reshaped to ``(n_features, 128)`` and then
    passed through a stack of BatchNorm -> ReLU -> Dense stages of widths
    128, 64, 32 and 1, so each sample comes out with shape ``(n_features, 1)``.

    Args:
        inputs: Keras tensor holding the latent noise (z-code).
        n_features: number of features of one synthesized sample.
        activation: name of the output activation, or None for no activation.
        labels: optional Keras tensor with the conditioning labels; when given
            it is concatenated to the noise along axis 1 and becomes a second
            model input.

    Returns:
        A Keras ``Model`` named ``'generator'``.
    """
    # widths of the Dense layers applied along the last axis
    hidden_units = [128, 64, 32, 1]
    first_width = hidden_units[0]

    if labels is None:
        # unconditional case: the noise vector is the only model input
        model_inputs = inputs
        net = inputs
    else:
        # conditional case: noise and labels are both model inputs
        model_inputs = [inputs, labels]
        net = concatenate(model_inputs, axis=1)

    # project to n_features * 128 values and lay them out as (n_features, 128)
    net = Dense(n_features * first_width)(net)
    net = Reshape((n_features, first_width))(net)

    for width in hidden_units:
        net = BatchNormalization()(net)
        net = Activation('relu')(net)
        net = Dense(width,
                    bias_regularizer=regularizers.l2(1e-4),
                    kernel_regularizer=regularizers.l2(1e-4))(net)

    if activation is not None:
        net = Activation(activation)(net)

    # the generator output is the synthesized data
    return Model(model_inputs, net, name='generator')

In [None]:
def build_discriminator(inputs, n_features, activation='sigmoid', labels=None):
    """Assemble the discriminator network of the (conditional) GAN.

    When ``labels`` is given, the labels are projected by a dense layer to
    ``n_features`` values, reshaped to ``(n_features, 1)`` and concatenated to
    the data along the last axis. The result goes through a stack of
    LeakyReLU -> Dense stages (32, 64, 128 units), is flattened and reduced to
    a single output unit.

    Args:
        inputs: Keras tensor with the data to classify; it is concatenated
            with a ``(n_features, 1)`` label tensor, so it is expected to have
            shape ``(n_features, 1)`` per sample.
        n_features: number of features of one sample. It sizes the label
            projection, so it must match the data width.
        activation: name of the output activation, or None for no activation.
        labels: optional Keras tensor with the conditioning labels. When None
            an unconditional discriminator taking only ``inputs`` is built.

    Returns:
        A Keras ``Model`` named ``'discriminator'`` whose inputs are
        ``[inputs, labels]`` (or just ``inputs`` when ``labels`` is None).
    """
    layers_size = [32, 64, 128]

    x = inputs
    if labels is not None:
        # project the labels to one value per feature so they can be stacked
        # next to the data as an extra channel
        y = Dense(n_features)(labels)
        y = Reshape((n_features, 1))(y)
        x = concatenate([x, y])
        model_inputs = [inputs, labels]
    else:
        model_inputs = inputs

    for units in layers_size:
        x = LeakyReLU(alpha=0.2)(x)
        x = Dense(units,
                  bias_regularizer=regularizers.l2(1e-4),
                  kernel_regularizer=regularizers.l2(1e-4))(x)

    x = Flatten()(x)
    # single output unit: with a sigmoid activation it is the probability
    # that the input data is real
    outputs = Dense(1)(x)
    if activation is not None:
        outputs = Activation(activation)(outputs)

    return Model(model_inputs, outputs, name='discriminator')

# Training

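Fréchet distance (FID-style metric) between real and generated samples, computed directly on the feature vectors (no Inception network is involved)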

In [None]:

# function to compute the FID between two distributions
def calculate_fid(act1, act2):
  """Compute the Frechet distance between two sets of feature vectors.

  Each input is summarized by its mean vector and covariance matrix, and the
  distance is

      ||mu1 - mu2||^2 + Tr(sigma1 + sigma2 - 2 * (sigma1 . sigma2)^(1/2))

  where ``^(1/2)`` is the *matrix* square root (not an element-wise one), so
  two identical sets have distance 0.

  Args:
    act1: 2-D array-like, one sample per row and one feature per column.
    act2: 2-D array-like with the same number of columns as ``act1``.

  Returns:
    The distance as a float.
  """
  # local import: scipy is only needed by this function
  from scipy.linalg import sqrtm

  act1 = np.asarray(act1, dtype=float)
  act2 = np.asarray(act2, dtype=float)

  mu1 = act1.mean(axis=0)
  mu2 = act2.mean(axis=0)
  # atleast_2d keeps the single-feature case (0-d covariance) working
  sigma1 = np.atleast_2d(np.cov(act1, rowvar=False))
  sigma2 = np.atleast_2d(np.cov(act2, rowvar=False))

  # sum of squared differences between the means
  ssdiff = np.sum((mu1 - mu2) ** 2.0)

  # matrix square root of the product of the covariances
  covmean = sqrtm(sigma1.dot(sigma2))
  if not np.isfinite(covmean).all():
    # (near-)singular product: retry with a small ridge on the diagonals
    offset = 1e-6 * np.eye(sigma1.shape[0])
    covmean = sqrtm((sigma1 + offset).dot(sigma2 + offset))
  if np.iscomplexobj(covmean):
    # numerical error can leave a tiny imaginary component; discard it
    covmean = covmean.real

  fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)

  return float(fid)

Training function

In [None]:
def train(models, data, y, params):
    """Alternately train the discriminator and the generator of the conditional GAN.

    Each of the ``train_steps`` iterations does two things:

    1. Discriminator step: a batch of real samples (drawn with replacement)
       and a batch of generated samples are concatenated and the
       discriminator is trained to output 1.0 for real and 0.0 for fake.
    2. Generator step: the adversarial model is trained on fresh noise with
       target 1.0. The caller is expected to have frozen the discriminator
       inside the adversarial model, so only the generator is updated.

    After the loop the generator is saved to Google Drive as
    ``<model_name>.h5``.

    Args:
        models: tuple ``(generator, discriminator, adversarial)`` of compiled
            Keras models.
        data: NumPy array of real samples, one sample per row.
        y: one-hot labels of the real samples, aligned with ``data`` rows.
        params: tuple ``(batch_size, latent_size, train_steps, num_labels,
            model_name, r)``; ``r`` (the run counter) is unpacked but not
            used here.
    """
    # the GAN models
    generator, discriminator, adversarial = models
    # input data and labels
    x_train = data
    y_train = y
    # network parameters and runs counter r
    batch_size, latent_size, train_steps, num_labels, model_name, r = params

    # one-hot labels of a 16-sample preview batch cycling through the classes
    # (only used for the informational print below)
    noise_class = np.eye(num_labels)[np.arange(0, 16) % num_labels]
    # number of elements in train dataset
    train_size = x_train.shape[0]

    print(model_name,
          "Labels for generated data: ",
          np.argmax(noise_class, axis=1))

    # uncomment and adapt the path to save training logs

    #with open ("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/trainingMMH/discriminator_"+str(train_steps)+".csv","a") as discr_file:
    #  discr_file.write("step,loss,accuracy,FID"+"\n")
    #with open ("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/trainingMMH/adversarial_"+str(train_steps)+".csv","a") as adv_file:
    #  adv_file.write("step,loss,accuracy"+"\n")

    # train the GAN for train_steps iterations
    for i in range(train_steps):
        # ---- discriminator step ----
        # randomly pick real data (with replacement) and their one-hot labels
        rand_indexes = np.random.randint(0, train_size, size=batch_size)
        real_data = x_train[rand_indexes]
        real_labels = y_train[rand_indexes]

        # noise from a uniform distribution and uniformly random one-hot
        # labels; no explicit probabilities, so any num_labels works
        noise = np.random.uniform(-1.0, 1.0, size=[batch_size, latent_size])
        fake_labels = np.eye(num_labels)[np.random.choice(num_labels,
                                                          batch_size)]

        # generate fake data conditioned on the fake labels; verbose=0 keeps
        # Keras from printing a progress bar on every iteration
        fake_data = generator.predict([noise, fake_labels], verbose=0)
        # drop the trailing axis: (batch, n_features, 1) -> (batch, n_features)
        fake_data = np.reshape(fake_data,
                               (fake_data.shape[0], fake_data.shape[1]))

        # real + fake data (and their one-hot labels) = 1 training batch
        x = np.concatenate((real_data, fake_data))
        labels = np.concatenate((real_labels, fake_labels))

        # targets: first half (real) is 1.0, second half (fake) is 0.0
        disc_targets = np.ones([2 * batch_size, 1])
        disc_targets[batch_size:, :] = 0.0

        # distance between the real and the generated batch; it is only
        # consumed by the optional CSV logging below
        fid = calculate_fid(real_data, fake_data)

        loss, acc = discriminator.train_on_batch([x, labels], disc_targets)
        # save to file
        #with open ("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/trainingMMH/discriminator_"+str(train_steps)+".csv","a") as discr_file:
        #  discr_file.write(str(i)+","+ str(loss)+","+str(acc)+","+str(fid)+"\n")
        log = "%d: [discriminator loss: %f, accuracy: %f]" % (i, loss, acc)

        # ---- generator step (through the adversarial model) ----
        # fresh noise and fresh random one-hot labels
        noise = np.random.uniform(-1.0,
                                  1.0,
                                  size=[batch_size, latent_size])
        fake_labels = np.eye(num_labels)[np.random.choice(num_labels,
                                                          batch_size)]
        # the generated data are labelled as real (1.0) so that the generator
        # is pushed towards fooling the discriminator
        adv_targets = np.ones([batch_size, 1])

        # unlike in the discriminator step the fake data are not materialized:
        # they flow straight into the discriminator inside the adversarial model
        loss, acc = adversarial.train_on_batch([noise, fake_labels],
                                               adv_targets)
        log = "%s [adversarial loss: %f, accuracy adv: %f]" % (log, loss, acc)
        #with open ("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/trainingMMH/adversarial_"+str(train_steps)+".csv","a") as adv_file:
        #  adv_file.write(str(i)+","+ str(loss)+","+str(acc)+"\n")
        print(log)

    # save the trained generator so it can be reloaded for future generation
    generator.save("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/training_MMH/"+model_name + ".h5")

Build and train GAN

In [None]:
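# Build the GAN components (discriminator, generator, adversarial) and train them.
# r is the index of the current run (it is appended to the saved model's name).
# train_steps is the number of training iterations (one batch per iteration), not full epochs.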

def build_and_train_models(data, y, train_steps,r):
    """Build the conditional GAN (discriminator, generator, adversarial) and train it.

    Args:
        data: 2-D NumPy array of real samples, one sample per row and one
            feature per column.
        y: integer class label of each sample; classes are assumed to be
            numbered 0 .. max(y).
        train_steps: number of training iterations (one batch each).
        r: index of the current run, appended to the saved model's name.
    """
    # number of classes as a plain Python int, so it can be used in shapes
    num_labels = int(np.amax(y)) + 1
    y = to_categorical(y, num_classes=num_labels)

    n_features = data.shape[1]  # columns in the data
    input_shape = (n_features, 1)
    label_shape = (num_labels, )  # one entry per class
    # CHANGE MODEL NAME (OVER OR UNDER)
    model_name = "fatigue_over"+str(r)
    # network parameters
    latent_size = 100  # size of the noise vector
    batch_size = 64
    lr = 5e-5  # learning rate

    # build DISCRIMINATOR model
    inputs = Input(shape=input_shape, name='discriminator_input')
    labels = Input(shape=label_shape, name='class_labels')

    # sigmoid output: 'binary_crossentropy' (and the accuracy metric) expect
    # probabilities in [0, 1], not unbounded linear scores
    discriminator = build_discriminator(inputs, n_features, 'sigmoid', labels)
    # each compiled model gets its own Adam instance: an optimizer keeps
    # per-variable state, and the two models train different variables
    discriminator_optimizer = Adam(learning_rate=lr, beta_1=0.9, epsilon=1e-07)
    discriminator.compile(loss='binary_crossentropy',
                          optimizer=discriminator_optimizer,
                          metrics=['accuracy'])
    discriminator.summary()

    # build GENERATOR model
    input_shape = (latent_size, )
    inputs = Input(shape=input_shape, name='z_input')
    generator = build_generator(inputs, n_features, 'sigmoid', labels)
    generator.summary()

    # build ADVERSARIAL model = generator + discriminator
    # freeze the weights of discriminator during adversarial training
    discriminator.trainable = False
    outputs = discriminator([generator([inputs, labels]), labels])
    adversarial = Model([inputs, labels],
                        outputs,
                        name=model_name)
    adversarial_optimizer = Adam(learning_rate=lr, beta_1=0.9, epsilon=1e-07)
    adversarial.compile(loss='binary_crossentropy',
                        optimizer=adversarial_optimizer,
                        metrics=['accuracy'])
    adversarial.summary()

    # train discriminator and adversarial networks
    models = (generator, discriminator, adversarial)
    params = (batch_size, latent_size, train_steps, num_labels, model_name,r)
    train(models, data, y, params)

# MAIN
Import the original real dataset

In [None]:
# Name of the column holding the class label (the GAN conditioning target).
targetName="fatiguestate1"

# Real dataset of the over-40 cohort. To work on the under-40 cohort instead,
# swap in the commented-out line below.
data = pd.read_excel("/content/drive/MyDrive/Colab Notebooks/Github XAIGAN/Real Data and Rules/MMH_over40.xlsx")
# data = pd.read_excel("/content/drive/MyDrive/Colab Notebooks/Github XAIGAN/Real Data and Rules/MMH_under40.xlsx")

# datadd is a second name for the full frame (features + target);
# dataoutput is the target column alone.
datadd = data
dataoutput = datadd[targetName]

# preview the first rows
datadd.head()

In [None]:
# Feature frame: the full frame without the target column. It is derived from
# `datadd` (which this cell never rebinds) rather than from `data` itself, so
# the cell can be re-run without a KeyError on the already-dropped column.
data = datadd.drop(columns=[targetName])
# replace missing values with the median of their column
data = data.fillna(data.median())

X = data.values
y = dataoutput.values

# Data scaling in (0, 1); the fitted scaler is reused later to map the
# generated data back to the original scale
scaler = MinMaxScaler((0,1))
# scaled input data
inputdata = scaler.fit_transform(X)
inputdata.shape[0] # check number of samples

108

Execution of the GAN runs r = 0, ..., N-1. At each iteration:

*   Build and train the GAN network
*   Use the trained GAN model to generate the r-th fake dataset 



In [None]:

N=10                # number of independent GAN runs
train_steps=5000    # training iterations per run
N_SAMPLES = 400     # synthetic samples generated per run
LATENT_SIZE = 100   # must match latent_size used in build_and_train_models
n_features = inputdata.shape[1]    # feature columns of the scaled data
num_labels = int(np.amax(y)) + 1   # classes are numbered 0 .. max(y)

for r in range(N):
  build_and_train_models(inputdata,y,train_steps,r) #inputdata: scaled features (0,1); y target values;
  # load the generator from the same location train() saved it to
  model=load_model("/content/drive/MyDrive/Colab Notebooks/GAN_fatica/training_MMH/fatigue_over"+str(r)+".h5")

  # generate random noise and random one-hot classes, then use the trained
  # generator to synthesize a fake dataset of N_SAMPLES samples
  noise_input = np.random.uniform(-1.0, 1.0, size=[N_SAMPLES, LATENT_SIZE])
  noise_class = np.eye(num_labels)[np.random.choice(num_labels, N_SAMPLES)]
  gendata = model.predict([noise_input,noise_class])
  gendata = gendata.reshape((N_SAMPLES, n_features)) # one row per sample, one column per feature
  gendata = scaler.inverse_transform(gendata) #reconvert to original scale
  class_ids = np.argmax(noise_class, axis=1) #reverse from one-hot format (plain NumPy array)
  # save the generated dataset to file
  gendata = pd.DataFrame(gendata,columns=data.columns)
  gendata['Output']=class_ids
  gendata.to_excel("/content/drive/MyDrive/Colab Notebooks/Github XAIGAN/Real Data and Rules/generated_MMH/MMH_over"+str(r)+".xlsx")
