In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import math
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score

# Switch TensorFlow to graph (non-eager) execution for the rest of the notebook.
tf.compat.v1.disable_eager_execution()

# Restrict TensorFlow to a single physical GPU.
# The index was previously hard-coded as gpus[1], which raises IndexError on a
# machine exposing fewer than two GPUs; it is now guarded and configurable.
GPU_INDEX = 1
gpus = tf.config.experimental.list_physical_devices('GPU')
if len(gpus) > GPU_INDEX:
    tf.config.experimental.set_visible_devices(gpus[GPU_INDEX], 'GPU')
else:
    # Leave the default device visibility untouched instead of crashing.
    print('GPU index %d not available (%d GPU(s) detected); leaving device visibility unchanged.' % (GPU_INDEX, len(gpus)))

In [None]:
# define the standalone discriminator model
def get_discriminator():
    """Build and compile the discriminator.

    A 768-dimensional embedding is passed through three
    Dense -> ELU -> Dropout(0.2) stages (128, 64 and 16 units) and then into
    three independent single-unit sigmoid heads:

    * ``condition``   - trained with mean squared error,
    * ``fr``          - trained with binary cross-entropy (fake/real),
    * ``class_label`` - trained with binary cross-entropy.

    All three losses are weighted equally (1.0).

    Returns:
        A compiled ``tf.keras.Model`` whose outputs are ordered
        ``[condition, fr, class_label]``; training targets must be passed in
        that same order.
    """
    # in_condition receives the embedding to be judged (e.g. the one estimated by G)
    in_condition = tf.keras.Input(shape=(768,))

    # shared feature extractor: three identical Dense/ELU/Dropout stages
    fe = in_condition
    for units in (128, 64, 16):
        fe = tf.keras.layers.Dense(units)(fe)
        fe = tf.keras.layers.ELU()(fe)
        fe = tf.keras.layers.Dropout(0.2)(fe)

    # three 1-unit sigmoid heads on top of the shared 16-d features
    out_layer1 = tf.keras.layers.Dense(1, activation='sigmoid', name='fr')(fe)
    out_layer2 = tf.keras.layers.Dense(1, activation='sigmoid', name='condition')(fe)
    out_layer3 = tf.keras.layers.Dense(1, activation='sigmoid', name='class_label')(fe)

    # NOTE: the output order is condition, fr, class_label (not creation order)
    model = tf.keras.Model(in_condition, [out_layer2, out_layer1, out_layer3])

    # `learning_rate` is the supported keyword; the old `lr` alias is deprecated
    # and is ignored or rejected by newer optimizer implementations.
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002)

    model.compile(
        loss={'condition': 'mse', 'fr': 'binary_crossentropy', 'class_label': 'binary_crossentropy'},
        loss_weights={'condition': 1., 'fr': 1., 'class_label': 1.},
        optimizer=opt,
    )
    return model

def get_generator():
    """Build the (uncompiled) generator model.

    Inputs, in order: a 1-d neutral indicator, a 32-d noise vector and a
    768-d embedding. The embedding is compressed 768 -> 512 -> 198, the
    198-d activation (layer name ``d_embed``) is concatenated with the two
    conditioning inputs, and the result is expanded 198 -> 512 -> 768 with a
    linear output layer.

    Returns:
        A ``tf.keras.Model`` mapping ``[indicator, noise, embedding]`` to a
        768-d output.
    """
    layers = tf.keras.layers

    # conditioning inputs: neutral indicator and noise
    neutral_in = tf.keras.Input(shape=(1,))
    noise_in = tf.keras.Input(shape=(32,))
    # embedding to transform
    embedding_in = tf.keras.Input(shape=(768,))

    # compress the embedding; the final activation is named so it can be looked up by name
    hidden = layers.Dense(512)(embedding_in)
    hidden = layers.LeakyReLU()(hidden)
    hidden = layers.Dense(198)(hidden)
    bottleneck = layers.LeakyReLU(name='d_embed')(hidden)

    # inject the conditioning inputs next to the compressed embedding
    decoded = layers.Concatenate()([neutral_in, noise_in, bottleneck])

    # expand back towards the embedding size
    for width in (198, 512):
        decoded = layers.Dense(width)(decoded)
        decoded = layers.LeakyReLU()(decoded)
    generated = layers.Dense(768, activation='linear')(decoded)

    return tf.keras.Model([neutral_in, noise_in, embedding_in], generated)
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    """Stack the generator and a frozen discriminator into one trainable model.

    Args:
        g_model: generator with three inputs (label/score, noise, embedding)
            and a single output that is fed to the discriminator.
        d_model: discriminator returning three outputs. Its ``trainable``
            flag is set to ``False`` here (a side effect on the passed-in
            object) so that training the combined model only updates the
            generator.

    Returns:
        A compiled ``tf.keras.Model`` taking the generator's inputs and
        returning the discriminator's three outputs. Losses are assigned by
        output position: ``mse`` for the first output and
        ``binary_crossentropy`` for the second and third, each weighted 1.0.
        For a discriminator built by ``get_discriminator`` that order is
        condition, fr, class_label.
    """
    # in_label1: emotion/neutral score, in_label2: gaussian noise,
    # in_embedding: original embedding
    in_label1, in_label2, in_embedding = g_model.input

    # freeze the discriminator inside the combined model
    d_model.trainable = False

    # feed the generated embedding to the discriminator
    gen_output = g_model.output
    gan_output1, gan_output2, gan_output3 = d_model(gen_output)

    model = tf.keras.Model(
        [in_label1, in_label2, in_embedding],
        [gan_output1, gan_output2, gan_output3],
    )

    # `learning_rate` is the supported keyword; the old `lr` alias is deprecated.
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002)

    # Losses are given positionally (one per output) rather than keyed on the
    # auto-generated output names (d_model.name, d_model.name + '_1', ...),
    # which depend on Keras' internal naming of nested-model outputs.
    model.compile(
        loss=['mse', 'binary_crossentropy', 'binary_crossentropy'],
        loss_weights=[1., 1., 1.],
        optimizer=opt,
    )
    return model

In [None]:
# Instantiate the generator first, then the discriminator, and stack both into the GAN.
g, d = get_generator(), get_discriminator()
gan = define_gan(g_model=g, d_model=d)

In [None]:
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping
# Early-stopping callback: watches 'val_loss' with a patience of 3.
callbacks = [
    EarlyStopping(patience=3, monitor='val_loss'),
]

def generate_real_samples_for_d(embeddings, scores, n_samples, current_batch, class_labels=None):
    """Draw a random batch of real embeddings with their discriminator targets.

    Args:
        embeddings: array indexable by an integer index array; rows are sampled
            with replacement along axis 0.
        scores: per-row score, indexed with the same row indices.
        n_samples: number of rows to draw.
        current_batch: unused; kept for interface compatibility.
        class_labels: optional per-row class labels, indexed with the sampled
            index array. Defaults to the notebook-level ``train_label1``.

    Returns:
        ``(embedding, score, label, cls)`` where ``embedding`` is
        ``embeddings[ix]``, ``score`` is a list of 1-element float arrays
        holding the sampled rows' scores, ``label`` is a list of 1-element
        arrays all equal to 1 (the "real" target) and ``cls`` is
        ``class_labels[ix]``.
    """
    if class_labels is None:
        # fall back to the global used by the original implementation
        class_labels = train_label1

    # np.random.randint is used explicitly; a bare `randint` is not imported in this notebook
    ix = np.random.randint(0, embeddings.shape[0], n_samples)
    embedding = embeddings[ix]

    score = []
    label = []
    for row in ix:
        # float slot: an integer slot (np.array([0])) silently truncates
        # fractional scores to 0 on assignment
        value = np.zeros(1, dtype=float)
        value[0] = scores[row]
        score.append(value)
        label.append(np.array([1]))
    return embedding, score, label, class_labels[ix]
 
# generate points in latent space as input for the generator
def generate_latent_points(n_samples):
    """Return ``n_samples`` noise vectors plus an (always empty) ``z_input`` list.

    Each noise vector is ``list(generate_known_gaussian(32)[0])``;
    ``generate_known_gaussian`` is defined outside this section.

    Returns:
        ``(noise, z_input)`` where ``noise`` is a list of ``n_samples`` lists
        and ``z_input`` is an empty list.
    """
    noise = [list(generate_known_gaussian(32)[0]) for _ in range(n_samples)]
    z_input = []
    return noise, z_input
 
def generate_fake_samples_for_d(embeddings, scores, n_samples, current_batch, generator, class_labels=None):
    """Generate a batch of fake embeddings with their discriminator targets.

    Rows are sampled at random (with replacement) from ``embeddings``; each
    sampled embedding is passed through ``generator`` together with its score
    and a fresh noise vector from ``generate_latent_points``.

    Args:
        embeddings: array indexable by an integer index array.
        scores: per-row score, indexed with the same row indices.
        n_samples: number of rows to draw.
        current_batch: unused; kept for interface compatibility.
        generator: model whose ``predict`` accepts ``[score, noise, embedding]``.
        class_labels: optional per-row class labels, indexed with the sampled
            index array. Defaults to the notebook-level ``train_label1``.

    Returns:
        ``(fake_embedding, score, label, cls)`` where ``fake_embedding`` is the
        generator output, ``score`` is a list of 1-element float arrays,
        ``label`` is a list of 1-element arrays all equal to 0 (the "fake"
        target) and ``cls`` is ``class_labels[ix]``.
    """
    if class_labels is None:
        # fall back to the global used by the original implementation
        class_labels = train_label1

    # np.random.randint is used explicitly; a bare `randint` is not imported in this notebook
    ix = np.random.randint(0, embeddings.shape[0], n_samples)
    embedding = embeddings[ix]

    score = []
    label = []
    for row in ix:
        # float slot: an integer slot (np.array([0])) silently truncates
        # fractional scores to 0 on assignment
        value = np.zeros(1, dtype=float)
        value[0] = scores[row]
        score.append(value)
        label.append(np.array([0]))

    # the second return value of generate_latent_points is not used here
    noise, _ = generate_latent_points(n_samples)
    fake_embedding = generator.predict([score, noise, embedding])
    return fake_embedding, score, label, class_labels[ix]

def define_encoder(g_model):
    """Return a model exposing the generator's ``d_embed`` layer output.

    The returned model takes the same three inputs as ``g_model`` and outputs
    the activation of the layer named ``"d_embed"``.
    """
    label_in, noise_in, embedding_in = g_model.input
    bottleneck = g_model.get_layer("d_embed").output
    return tf.keras.Model([label_in, noise_in, embedding_in], bottleneck)

In [None]:
def train(g_model, d_model, gan_model, embeddings, scores, n_epochs, test_embedding, n_batch=64):
    """Alternate discriminator and generator updates for ``n_epochs`` epochs.

    Per step: the discriminator is trained on half a batch of real samples and
    half a batch of generated samples, then the generator is trained through
    ``gan_model`` on a full batch whose fake/real target is 1.

    Args:
        g_model: generator, used to produce fake samples.
        d_model: discriminator, trained directly with ``train_on_batch``.
        gan_model: combined model used to update the generator.
        embeddings: array of embeddings, sampled by random row index.
        scores: per-row scores, indexed with the same row indices.
        n_epochs: number of passes; each pass runs
            ``embeddings.shape[0] // n_batch`` steps.
        test_embedding: unused; kept for interface compatibility.
        n_batch: generator batch size; the discriminator uses ``n_batch // 2``
            real and ``n_batch // 2`` fake samples per step.

    Side effects:
        Appends ``[g_loss, g_condition, d_loss, d_condition]`` to the
        notebook-level ``total_gan_loss`` list each step, reads the
        notebook-level ``train_label1`` and prints a progress line.
    """
    bat_per_epo = embeddings.shape[0] // n_batch
    half_batch = n_batch // 2
    for i in range(n_epochs):
        for j in range(bat_per_epo):
            # --- discriminator update: real half-batch, then fake half-batch ---
            embedding_real, score_real, label_real, cls_label = generate_real_samples_for_d(embeddings, scores, half_batch, j)
            d_loss1 = d_model.train_on_batch(embedding_real, [score_real, label_real, cls_label])
            fake_embedding, score_fake, label_fake, cls_label = generate_fake_samples_for_d(embeddings, scores, half_batch, j, g_model)
            d_loss2 = d_model.train_on_batch(fake_embedding, [score_fake, label_fake, cls_label])

            # --- generator update through the combined model ---
            # np.random.randint is used explicitly; a bare `randint` is not imported in this notebook
            ix = np.random.randint(0, embeddings.shape[0], n_batch)
            embedding = embeddings[ix]
            noise, _ = generate_latent_points(n_batch)
            score = []
            label = []
            for row in ix:
                # float slot: an integer slot (np.array([0])) silently
                # truncates fractional scores to 0 on assignment
                value = np.zeros(1, dtype=float)
                value[0] = scores[row]
                score.append(value)
                # the generator is trained against the "real" target (1)
                label.append(np.array([1]))
            g_loss = gan_model.train_on_batch([score, noise, embedding], [score, label, train_label1[ix]])

            # summary metrics, computed once and reused for history and progress line
            g_condition = (g_loss[1]+g_loss[2])/2
            d_total = (d_loss1[0]+d_loss2[0])/2
            d_condition = (((d_loss1[1]+d_loss1[2])/2) + ((d_loss2[1]+d_loss2[2])/2))/2
            total_gan_loss.append([g_loss[0], g_condition, d_total, d_condition])
            print('>%d, %d/%d, g_loss=%.3f, g_condition=%.3f, d_loss=%.3f, d_condition=%.3f' %(i+1, j+1, bat_per_epo, g_loss[0], g_condition, d_total, d_condition),end='\r')


In [None]:
# Loss histories. train() appends to total_gan_loss; the other lists are only initialised here.
total_d1_loss, total_d2_loss, total_gan_loss, total_d_loss = [], [], [], []

# 100 epochs with batch size 16. embedding, neutral_score and embedding2 are
# expected to be defined by cells outside this section.
train(g, d, gan, embedding, neutral_score, n_epochs=100, test_embedding=embedding2, n_batch=16)