In [1]:
import os
import numpy as np
import math
import tensorflow as tf
import tensorflow.contrib.slim as slim
import h5py
from pylab import *
from libs.utils import weight_variable, bias_variable
from libs.readfile_fau import *
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import SGD
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec


Using TensorFlow backend.


In [2]:
#load the train and test data
filename_train = './data/FAU_train_256.arff'
filename_test =  './data/FAU_test_256.arff'
train_data, train_label, train_size = readFile(filename_train)
test_data, test_label, test_size = readFile(filename_test)
print("Train data：{0} , {1} , {2}".format(train_data.shape, train_label.shape, train_size))
print("Test data：{0} , {1} , {2}".format(test_data.shape, test_label.shape, test_size))
fau_a,fau_e,fau_n,fau_p,fau_r = separa_fau(train_data, train_label)

combine = [fau_a,fau_e,fau_n,fau_p,fau_r]
emotion = ['Anger', 'Emphatic', 'Neutral', 'Positive', 'Rest']
fake_emotion_txt = ['fake_a.txt', 'fake_e.txt', 'fake_n.txt', 'fake_p.txt', 'fake_r.txt']

batch_size = 128

input_height = 16
input_width = 16

d_dim = 256
g_dim = 100

gfc_dim = 1024
dfc_dim = 1024

g_filter_dim = 64
d_filter_dim = 64
color_dim = 1

learning_rate = 0.0002
beta1 = 0.5
epsilon = 1e-5
fake_num = 2000

x_d = tf.placeholder(tf.float32, shape = [None, d_dim])
x_g = tf.placeholder(tf.float32, shape = [None, g_dim])

t_vars = tf.trainable_variables()
var_d = [var for var in t_vars if 'd_' in var.name]
var_g = [var for var in t_vars if 'g_' in var.name]

# leaky relu activation function
def lrelu(x, leak=0.2, name="lrelu"):
      return tf.maximum(x, leak*x)
    
# batch normalization
def batch_norm(x, train, name='batch_norm'):
    return tf.contrib.layers.batch_norm( x, decay=0.9, updates_collections=None,
                                                                  epsilon=epsilon,
                                                                  scale=True,
                                                                  is_training=train,
                                                                  scope=name)

def conv2d(input_, output_dim,   k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02, name='conv2d'):
    with tf.variable_scope(name):
        kernel = tf.get_variable('kernel', [k_h, k_w, input_.get_shape()[-1], output_dim],initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, kernel, strides=[1, d_h, d_w, 1], padding='SAME')

        bias = tf.get_variable('bias', [output_dim], initializer=tf.constant_initializer(0.0))
        conv = tf.reshape(tf.nn.bias_add(conv, bias), conv.get_shape())

        return conv

def deconv2d(input_, output_shape,k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02,name="deconv2d", with_w=False):
    with tf.variable_scope(name):
        # filter : [height, width, output_channels, in_channels]
        kernel = tf.get_variable('kernel', [k_h, k_w, output_shape[-1], input_.get_shape()[-1]],initializer=tf.random_normal_initializer(stddev=stddev))
        deconv = tf.nn.conv2d_transpose(input_, kernel, output_shape=output_shape,strides=[1, d_h, d_w, 1])

        bias = tf.get_variable('biases', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
        deconv = tf.reshape(tf.nn.bias_add(deconv, bias), deconv.get_shape())

        if with_w:
              return deconv, kernel, bias
        else:
              return deconv
        
def linear(input_, output_size, scope=None, stddev=0.02, bias_start=0.0, with_w=False):
    input_ = tf.convert_to_tensor(input_, dtype=tf.float32)
    shape = input_.get_shape().as_list()

    with tf.variable_scope(scope or "Linear"):
        matrix = tf.get_variable("Matrix", [shape[1], output_size], tf.float32, tf.random_normal_initializer(stddev=stddev))
        bias = tf.get_variable("bias", [output_size],initializer=tf.constant_initializer(bias_start))
        if with_w:
              return tf.matmul(input_, matrix) + bias, matrix, bias
        else:
              return tf.matmul(input_, matrix) + bias

Train data：(9959, 256) , (9959, 5) , [881, 2093, 5590, 674, 721]
Test data：(8257, 256) , (8257, 5) , [611, 1508, 5377, 215, 546]


In [3]:
### GAN block ###
def sample_z(sample_num, z_dim):
    return  np.random.normal(0.0, 1.0, size=[sample_num, z_dim]) # use gaussian distribution to generate noise sample (mean=0, stddev=1)

def generator(z, is_training=True):
    # use ReLU as activation function except output layer
    # project z and reshape, 4x4x512
    z_linear, h0_w, h0_b = linear(z, 4*4*g_filter_dim*8, 'g_h0_linear', with_w=True)
    h0 = tf.reshape(z_linear, [-1, 4, 4, g_filter_dim*8])
    h0 = lrelu(batch_norm(h0, is_training, name='g_bn0'))
    # first deconvolution, 8x8x256
    h1, h1_w, h1_b = deconv2d(h0, [batch_size, 8, 8, g_filter_dim*4], name='g_h1_deconv', with_w=True)
    h1 = lrelu(batch_norm(h1, is_training, name='g_bn1'))
    # second deconvolution, 16x16x128
    h2, h2_w, h2_b = deconv2d(h1, [batch_size, 16, 16, g_filter_dim*2], name='g_h2_deconv', with_w=True)
    h2 = lrelu(batch_norm(h2, is_training, name='g_bn2'))
    # third deconvolution, 16x16x1
    h3, h3_w, h3_b = deconv2d(h2, [batch_size, 16, 16, color_dim], name='g_h3_deconv', with_w=True)
    # output layer use hyperbolic tangent (tanh)
    return tf.nn.tanh(h3)

def discriminator(x):
     # use leaky relu as activation function except output layer
    print(x.shape)
    feature = tf.reshape(x, [-1, 16,16, 1])
    # first convolution, 8x8x128
    h0 = lrelu(conv2d(feature, d_filter_dim*2, name='d_h0_conv'))
    # second convolution, 4x4x256
    h1 = lrelu(batch_norm(conv2d(h0, d_filter_dim*4, name='d_h1_conv'), name='d_bn1'))
    # third convolution, 2x2x512
    h2 = lrelu(batch_norm(conv2d(h1, d_filter_dim*8, name='d_h2_conv'), name='d_bn2'))
    # fully-connected layer, 1x1x1024
    h3 = linear(tf.reshape(h2, [batch_size, -1]), dfc_dim, 'd_h2_linear')
    # ouptut layer use sigmoid as activation function
    return tf.nn.sigmoid(h3), h3

def plot(num, samples):
    samples = np.concatenate((samples,np.zeros((samples.shape[0],16))),axis=1)
    fig = plt.figure(figsize=(4, 4))
    gs = gridspec.GridSpec(4, 4)
    gs.update(wspace=0.05, hspace=0.05)
    for i, sample in enumerate(samples):
        ax = plt.subplot(gs[i])
        plt.axis('off')
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        ax.set_aspect('equal')
        plt.imshow(sample.reshape(20, 20), cmap = 'gray')
    plt.savefig('./pic/figure %d .png'%(num))
    #plt.show()
    
def train_gan(emotion_class, emotion_name):
    gan_train_data = emotion_class # set the emotion class
  
    G_sample = generator(x_g)
    D_real, D_real_logits = discriminator(x_d)
    D_fake, D_fake_logits_ = discriminator(G_sample)
    # define the loss function
    d_loss_real = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(D_real_logits, tf.ones_like(D_real)))
    d_loss_fake = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(D_fake_logits_, tf.zeros_like(D_fake)))
    g_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(D_fake_logits_, tf.ones_like(D_fake)))
    d_loss = d_loss_real + d_loss_fake
    #d_loss = -tf.reduce_mean(tf.log(d_real) + tf.log(1. - d_fake))
    #g_loss = -tf.reduce_mean(tf.log(d_fake))
    
    # update discriminator
    d_optimizer = tf.train.AdamOptimizer(learning_rate, beta1=beta1).minimize(d_loss, var_list = var_d)

    # update generator
    g_optimizer = tf.train.AdamOptimizer(learning_rate, beta1=beta1).minimize(g_loss, var_list = var_g)
    
    num = 0
    global_step = 30001
    sess = tf.InteractiveSession()
    tf.global_variables_initializer().run()
    saver = tf.train.Saver()
    for step in range(global_step):
        offset = (step * batch_size) % (gan_train_data.shape[0] - batch_size)
        batch_x = gan_train_data[offset:(offset + batch_size), :]
        _ , d_loss_train = sess.run([d_optimizer, d_loss], feed_dict = {x_d: batch_x, x_g: sample_z(batch_size, g_dim)})
        _ , g_loss_train = sess.run([g_optimizer, g_loss], feed_dict = {x_g: sample_z(batch_size, g_dim)})
        if step % 2000 == 0:
            print("Step %d, discriminator loss %.5f , generator loss %.5f" % (step, d_loss_train, g_loss_train))
        if step % 1000 == 0: 
            g_sample_plot = g_sample.eval(feed_dict = {x_g: sample_z(16, g_dim)})
            plot(num,g_sample_plot)
            path_name = './gan_model/' + emotion_name
            save_path = saver.save(sess, path_name + '/model%d.ckpt'% (num))
            num+=1
            
def generate_fake_data():
    fake_emotion_list = [[], [], [], [], []]
    for i in range(5):
        fake_emotion_model = './gan_model/' + emotion[i] + '/model30.ckpt'
        with tf.Session() as sess:
            saver = tf.train.Saver()
            saver.restore(sess,  fake_emotion_model)
            g_sample = generator(x_g, False)
            new_fau_sample = g_sample.eval(feed_dict = {x_g: sample_z(fake_num, g_dim)})
           #plot(i, new_fau_sample)\n",
            fake_emotion_list[i] = new_fau_sample

    fake_a = fake_emotion_list[0]
    fake_e = fake_emotion_list[1]
    fake_n = fake_emotion_list[2]
    fake_p = fake_emotion_list[3]
    fake_r = fake_emotion_list[4]
    # save the fake data
    np.savetxt("./gan_data/Anger/fake_a.txt", fake_a)
    np.savetxt("./gan_data/Emphatic/fake_e.txt", fake_e)
    np.savetxt("./gan_data/Neutral/fake_n.txt", fake_n)
    np.savetxt("./gan_data/Positive/fake_p.txt", fake_p)
    np.savetxt("./gan_data/Rest/fake_r.txt", fake_r)
    print ('\nEnd of generating.\n')

In [4]:
### KNN block ###
#Euclidean distance
def euclidean(train, test, train_num):
    dist=[]
    for i in range(train_num):# number of training data: i
        dist_temp=[math.sqrt(sum(pow(train[i,:] - test, 2)))]
        dist.append(dist_temp)
    return dist

# Manhattan distance: |x1-x2|+|y1-y2|
def manhattan(train, test, train_num):
    dist=[]
    for i in range(train_num):# number of training data: i
        dist_temp=[(sum(abs(train[i,:] - test)))]
        dist.append(dist_temp)
    return dist

#Chebyshev distance:  max(|x2-x1|, |y2-y1|)
def chebyshev(train, test, train_num):
    dist=[]
    for i in range(train_num):# number of training data: i
        dist_temp=[np.max(abs((train[i,:] - test)))]
        dist.append(dist_temp)
    return dist

def knn(target_data):
    kvalue = 1 # set the k value
    result=[0,0,0,0,0]
    a_weight=1.0/881.0
    e_weight=1.0/2093.0
    n_weight=1.0/5590.0
    p_weight=1.0/674.0
    r_weight=1.0/721.0
    dist_choice = 'eu'# distance type (eu , manha , chebyshev)
    if(dist_choice == 'eu'):
        dist = euclidean(train_data, target_data, train_data.shape[0])
    if(dist_choice == 'manha'):
        dist = manhattan(train_data, target_data, train_data.shape[0])
    if(dist_choice == 'chebyshev'):
        dist = chebyshev(train_data, target_data, train_data.shape[0])

    dist_label = np.concatenate((train_label, dist), axis=1) # concatenate distance with the corresponding label
    sorted_distances = sorted(dist_label, key = lambda x : x[1]) # sort by the distance
    sorted_distances=np.asarray(sorted_distances) # convert into an array
    
    for j in range(kvalue): # record the knn result
        m = np.argmax(sorted_distances[j][0:5])
        if  m == 0: #Anger
            result[0]+=a_weight
        elif m == 1: #Emphatic
            result[1]+=e_weight
        elif m == 2: #Neutral
            result[2]+=n_weight
        elif m == 3: #Positive
            result[3]+=p_weight
        elif m == 4: #Rest
            result[4]+=r_weight
            
    temp = np.argmax(result)
    if temp == 0: #Anger
        return [[1,0,0,0,0]]
    elif temp == 1: #Emphatic
        return  [[0,1,0,0,0]]
    elif temp == 2: #Neutral
        return [[0,0,1,0,0]]
    elif temp == 3: #Positive
        return [[0,0,0,1,0]]
    elif temp == 4: #Rest
        return [[0,0,0,0,1]]

def nearest_img(target,image,label):
    nearest=[]
    for i in range(target.shape[0]):
        dist=[]
        for j in range(image.shape[0]-1):
            dist_temp=np.sum(np.absolute(target[i]-image[j]))
            dist.append(dist_temp)
        dist=np.asarray(dist)
        nearest.append(label[np.argmin(dist)])
    nearest=np.asarray(nearest)
    return nearest

In [5]:
### teacher-student model block ###
def _to_tensor(x, dtype):
    x = tf.convert_to_tensor(x)
    if x.dtype != dtype:
        x = tf.cast(x, dtype)
    return x

def weighted_loss(r):
    
    def new_loss(y_true, y_pred):
        _EPSILON = 10e-8
        y_pred /= tf.reduce_sum(y_pred, reduction_indices=len(y_pred.get_shape()) - 1, keep_dims=True)
        # manual computation of crossentropy
        epsilon = _to_tensor(_EPSILON, y_pred.dtype.base_dtype)
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)
        
        #y_true = tf.cast(y_true, tf.float64)
        rr = tf.cast(r, tf.float32)
        new_R = tf.matmul(rr, y_true, transpose_b=True)
        
        return - new_R * tf.reduce_sum(y_true * tf.log(y_pred), reduction_indices=len(y_pred.get_shape()) - 1)
        #return r * K.categorical_crossentropy(y_true, y_pred)

    return new_loss

def tea_stu_model(train_data, train_label, is_training):
    filename_test =  './data/FAU_test_256.arff'
    test_data, test_label, test_size = readFile(filename_test)
    test_data = normalize(test_data)
    input_dim = 256
    learning_rate = 0.01
    momentum = 0.5
    decay = 0.0005
    batch_size = 100
    epoch = 600
    # load the existing model and predict 
    if is_training == False:
        model = Sequential()
        model.add(Dense(30, activation='sigmoid', input_shape=(input_dim,)))
        model.add(Dense(5, activation='softmax'))

        model.load_weights("./model/model.h5")
        #model.summary()
        result = model.predict(train_data)
        use_data = result

        for i, (Target, Label) in enumerate( zip(test_label,use_data) ) :
            m = np.max(Label)# recognition result (max class)
            for j, value in enumerate(Label) : 
                if value == m :
                    if j == 0: # Anger
                        return [[1,0,0,0,0]]
                    if j == 1: # Emphatic
                        return [[0,1,0,0,0]]
                    if j == 2: # Neutral
                        return [[0,0,1,0,0]]
                    if j == 3: # Positive
                        return [[0,0,0,1,0]]
                    if j == 4: # Rest
                        return [[0,0,0,0,1]]

    # retraining the ts-model and predict
    else:
        filename_test =  './data/CS_Ftest_nor.arff'
        test_data, test_label, test_size = readFile(filename_test)
        print("Train data：{0} , {1}".format(train_data.shape, train_label.shape))
        print("Test data：{0} , {1}".format(test_data.shape, test_label.shape))
        weight_data = []
        weight_data.append([1.1,0.5,0.2,1.5,1.4])
        weight_data = np.asarray(weight_data)

        # use train data to generate teacher label
        model = Sequential()
        model.add(Dense(30, activation='sigmoid', input_shape=(input_dim,)))
        model.add(Dense(5, activation='softmax'))
        model.load_weights("./model/ts_46_model.h5")# load the existing model
        model.summary()
        
        sgd = SGD(lr=learning_rate, momentum=momentum, decay=decay)
        model.compile(loss=weighted_loss(r=weight_data), optimizer=sgd)
        history = model.fit(train_data, train_label,batch_size=100, epochs=600,verbose=2)    
        result = model.predict(train_data)
        np.save("teacher_label",result) # save the teacher label
        
        # use teacher label to retrain model and predict
        teacher_label = np.load("teacher_label.npy")
        train_label = teacher_label # use teacher_label
        
        model = Sequential()
        model.add(Dense(30, activation='sigmoid', input_shape=(input_dim,)))
        model.add(Dense(5, activation='softmax'))
        #model.load_weights("./model/ts_46_model.h5")# load the existing model
        model.summary()
        sgd = SGD(lr=learning_rate, momentum=momentum, decay=decay)
        model.compile(loss=weighted_loss(r=weight_data), optimizer=sgd)
        history = model.fit(train_data, train_label, batch_size=batch_size, epochs=epoch, verbose=2)    
        result = model.predict(test_data)
        
        AC_A = 0.0
        AC_E = 0.0
        AC_N = 0.0
        AC_P = 0.0
        AC_R = 0.0
        AC_WA = 0.0
        AC_UA = 0.0
        right=0# correct num
        matrix = np.zeros((5, 5))

        use_data = result

        for i, (Target, Label) in enumerate( zip(test_label,use_data) ) :  ### target:答案 label:辨識結果
            m = np.max(Label)#辨識結果（最大類別）
            for j, value in enumerate(Label) :  ### 分到哪類
                if value == m :
                    for k, num in enumerate(Target) :  ### 正確為哪類
                        if num == 1 :
                            matrix[k][j] += 1
                            break  # end of for k
                    break  # end of for j

        AC_A = matrix[0][0] / test_size[0]
        AC_E = matrix[1][1] / test_size[1]
        AC_N = matrix[2][2] / test_size[2]
        AC_P = matrix[3][3] / test_size[3]
        AC_R = matrix[4][4] / test_size[4]
        print("\n-------------實驗結果-------------")
        print ('A: {0} , E: {1} , N: {2} , P:{3} , R: {4}'.format(test_size[0],test_size[1],test_size[2],test_size[3],test_size[4],))
        print("Total {}:".format(len(use_data)))
        AC_UA = (AC_A + AC_E + AC_N + AC_P + AC_R) / 5
        AC_WA = (matrix[0][0] + matrix[1][1] + matrix[2][2] + matrix[3][3] + matrix[4][4]) / len(use_data)
        right = matrix[0][0] + matrix[1][1] + matrix[2][2] + matrix[3][3] + matrix[4][4]
        print('      A      E      N      P      R')
        print('A  %4d   %4d   %4d   %4d   %4d    ' % (matrix[0][0], matrix[0][1], matrix[0][2], matrix[0][3], matrix[0][4]))
        print('E  %4d   %4d   %4d   %4d   %4d    ' % (matrix[1][0], matrix[1][1], matrix[1][2], matrix[1][3], matrix[1][4]))
        print('N  %4d   %4d   %4d   %4d   %4d    ' % (matrix[2][0], matrix[2][1], matrix[2][2], matrix[2][3], matrix[2][4]))
        print('P  %4d   %4d   %4d   %4d   %4d    ' % (matrix[3][0], matrix[3][1], matrix[3][2], matrix[3][3], matrix[3][4]))
        print('R  %4d   %4d   %4d   %4d   %4d    ' % (matrix[4][0], matrix[4][1], matrix[4][2], matrix[4][3], matrix[4][4]))
        print('\nA: %f    E: %f     N: %f     P: %f     R: %f\n' % (AC_A*100, AC_E*100, AC_N*100, AC_P*100, AC_R*100))
        print('Correct: {}'.format(right))
        print("Correct rate: {}".format(AC_UA*100))
        print("=====================================================")

In [6]:
for emo_class, emo_name in zip(combine, emotion):
    print ('Class: %s training procedure' % (emo_name))
    train_gan(emo_class, emo_name)
print ('\nEnd of training\n')


Class: Anger training procedure
(?, 256)


ValueError: Cannot convert a partially known TensorShape to a Tensor: (?, 8, 8, 128)

In [8]:
#generate_fake_data()
add_data_amount = [0,0,0,0,0]
for i in range(5):
        fake_data = np.load('./fake_data/' + emotion[i] + '/' + "fake_data.npy")
        gan_label = [[0,0,0,0,0]]
        gan_label[0][i] += 1
        gan_label = np.asarray(gan_label)
        print (emotion[i])
        for j in range(2000):
            fake_data = reshape(fake_data, [-1,256])

            # run the knn procedure
            knn_label = knn(fake_data)
            knn_label = np.asarray(knn_label)
            # use teacher-student model to classify this fake data
            ts_label = tea_stu_model(fake_data,train_label, False)
            ts_label = np.asarray(ts_label)
            # whether to add this fake data to the training data
            print (gan_label)
            print ('knn_label: {0} , ts_label: {1}'.format(knn_label, ts_label))
            if np.array_equal(gan_label, knn_label) == True and np.array_equal(knn_label, ts_label) == False:
                print ('Add')
                train_data = np.concatenate((train_data, fake_data), axis = 0)
                train_label = np.concatenate((train_label, knn_label), axis = 0)
                add_data_amount[i] += 1
            else:
                print ('Discard')
        print ('{0} extends {1} data.'.format(emotion[i], add_data_amount[i]))
        
np.savetxt('./extend_train_data/new_train_data.txt', train_data, delimiter=' ', fmt='%f')
np.savetxt('./extend_train_data/new_train_data_label.txt', train_label, delimiter=' ', fmt='%d')

new_train_data, new_train_label = read_fake_data('new_train_data.txt', 'new_train_data_label.txt')
tea_stu_model(new_train_data, new_train_label, True)

Anger
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_label: [[0 0 0 1 0]]
Discard
[[1 0 0 0 0]]
knn_label: [[0 0 1 0 0]] , ts_l

KeyboardInterrupt: 