In [None]:
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import tensorflow as tf
from sklearn.model_selection import train_test_split

from align import AlignDlib
%matplotlib inline

# Shared aligner instance; the data-loading cell calls getLargestFaceBoundingBox()
# and align() on it for every reference image.
# NOTE(review): presumably 'landmarks.dat' is a dlib facial-landmark model — confirm.
alignment = AlignDlib('drive/models/landmarks.dat')
def load_image(path):
    """Read an image from disk and return it with its channel axis reversed.

    Args:
        path: Path of the image file to read.

    Returns:
        The array returned by ``cv2.imread`` with the last axis reversed,
        i.e. BGR converted to RGB.

    Raises:
        OSError: If OpenCV could not read or decode the file.
    """
    img = cv2.imread(path, 1)
    if img is None:
        # cv2.imread reports failure by returning None rather than raising;
        # fail here with the offending path instead of a cryptic TypeError
        # on the slice below.
        raise OSError('Could not read image: {}'.format(path))
    # OpenCV loads images with colour channels in BGR order, so reverse them.
    return img[...,::-1]

In [None]:
# Pre-computed embeddings and the labels they were saved with.
face_embeddings = list(np.load('embedded_face.npy'))
face_emb_labels = list(np.load('embedded_face_labels.npy'))
voice_embeddings = list(np.load('voice_embeddings_input.npy'))
voice_emb_labels = list(np.load('voice_embeddings_labels.npy'))

voice_embedd = []
voice_embedd_labels = []
face_npy = []

# For each label 0..8 keep, in file order, at most as many voice embeddings
# as there are face embeddings carrying that label.
for k in range(9):
    limit = face_emb_labels.count(k)
    count = 0
    for i, label in enumerate(voice_emb_labels):
        if count == limit:
            break
        if label == k:
            voice_embedd.append(voice_embeddings[i])
            voice_embedd_labels.append(label)
            count += 1
            
directory_path = 'Dataset'
directory_path1 = 'Dataset1'
face_npy = []
# Build the list of target face images, one entry per selected voice sample.
# For each label k the SAME aligned face is appended repeatedly: once per
# file found in Dataset/<k>, capped at the number of face embeddings with
# that label.
for k in [0,1,2,3,4,5,6,7,8]:
    count = 0
    limit = face_emb_labels.count(k)
    print(k)
    # Reset per identity so a face aligned for the previous label can never
    # leak into this one when Dataset1/<k> turns out to be empty.
    jc_aligned = None
    for file in os.listdir(os.path.join(directory_path,str(k))):
        if count == limit:
            break
        if count == 0:
            # NOTE(review): every file in Dataset1/<k> is aligned but only the
            # result for the last one listed survives the loop — confirm this
            # is the intended "reference face" selection.
            reference_dir = os.path.join(directory_path1,str(k))
            for file1 in os.listdir(reference_dir):
                file_path = os.path.join(reference_dir,file1)
                jc_orig = load_image(file_path)
                bb = alignment.getLargestFaceBoundingBox(jc_orig)
                jc_aligned = alignment.align(64, jc_orig, bb, landmarkIndices=AlignDlib.OUTER_EYES_AND_NOSE)
                # Scale pixel values by 1/255.
                jc_aligned = jc_aligned/255.0
            if jc_aligned is None:
                raise ValueError('No reference face image found in {}'.format(reference_dir))
        face_npy.append(jc_aligned)
        count += 1
                                 
# Each face image must pair with exactly one selected voice embedding.
# As a bare expression in the middle of a cell this comparison was neither
# displayed nor enforced, so make it an explicit check.
assert len(face_npy) == len(voice_embedd_labels), \
    'face/voice sample count mismatch: {} vs {}'.format(len(face_npy), len(voice_embedd_labels))

# Fixed random_state keeps the train/test partition reproducible across runs.
train_face, test_face, train_voice, test_voice = train_test_split(face_npy, voice_embedd, test_size=0.2, random_state=42)

In [None]:
# Module-level bookkeeping read and updated by next_batch().
num_examples = len(train_face)   # size of the training split
index_in_epoch = 0               # offset of the next batch within the current epoch
epochs_completed = 0             # number of full passes finished so far

In [None]:
def next_batch(batch_size):
    """Return the next ``batch_size`` (face, voice) training pairs.

    Walks through the module-level ``train_face`` / ``train_voice`` lists in
    order. When the requested batch would run past the end of the data, the
    epoch counter is incremented, both lists are reshuffled with one shared
    permutation, and the batch is taken from the start of the reshuffled data
    (the unread tail of the previous epoch is skipped).

    Args:
        batch_size: Number of pairs to return; must not exceed ``num_examples``.

    Returns:
        A tuple ``(faces, voices)`` of two equally long list slices.

    Raises:
        AssertionError: If ``batch_size`` is larger than ``num_examples``.
    """
    global train_face
    global train_voice
    global index_in_epoch
    global epochs_completed

    # Validate before touching any state: previously this check ran only after
    # the epoch counter had been bumped and the data reshuffled.
    assert batch_size <= num_examples

    start = index_in_epoch
    index_in_epoch += batch_size

    # When all training data has been used, reorder it randomly.
    if index_in_epoch > num_examples:
        # finished epoch
        epochs_completed += 1
        # shuffle faces and voices with the same permutation so pairs stay aligned
        perm = np.arange(num_examples)
        np.random.shuffle(perm)
        train_face = [train_face[i] for i in perm]
        train_voice = [train_voice[i] for i in perm]
        # start next epoch
        start = 0
        index_in_epoch = batch_size
    end = index_in_epoch
    return train_face[start:end], train_voice[start:end]


In [None]:
def conv2d(x, in_channel, output_channels, name, reuse=False):
    """Apply a 5x5, stride-2, 'SAME'-padded convolution followed by a bias add.

    Args:
        x: Input tensor passed to ``tf.nn.conv2d``.
        in_channel: Number of input channels (third dimension of the kernel).
        output_channels: Number of output channels.
        name: Variable scope holding the layer's ``w`` and ``b``.
        reuse: Forwarded to ``tf.variable_scope``; when true the existing
            variables under ``name`` are reused instead of created.

    Returns:
        The convolution output with the bias added.
    """
    with tf.variable_scope(name, reuse=reuse):
        # tf.get_variable (unlike tf.Variable) honours the scope's `reuse`
        # flag, so weights are actually shared when reuse=True.
        w = tf.get_variable('w', shape=[5,5,in_channel,output_channels],
                            initializer=tf.truncated_normal_initializer(stddev=0.1))
        b = tf.get_variable('b', shape=[output_channels],
                            initializer=tf.zeros_initializer())

        conv = tf.nn.conv2d(x, w, [1,2,2,1], padding='SAME') + b
    return conv
  
def deconv2d(x, output_shape, name, reuse=False):
    """Apply a 5x5, stride-2 transposed convolution followed by a bias add.

    Args:
        x: Input tensor; its last static dimension is used as the kernel's
            input-channel count.
        output_shape: Full output shape passed to ``tf.nn.conv2d_transpose``;
            its last entry is the number of output channels.
        name: Variable scope holding the layer's ``w`` and ``b``.
        reuse: Forwarded to ``tf.variable_scope``; when true the existing
            variables under ``name`` are reused instead of created.

    Returns:
        The transposed-convolution output with the bias added.
    """
    in_channels = int(x.get_shape()[-1])
    out_channels = output_shape[-1]
    with tf.variable_scope(name, reuse=reuse):
        # tf.get_variable (unlike tf.Variable) honours the scope's `reuse`
        # flag, so weights are actually shared when reuse=True.
        # conv2d_transpose kernels are laid out [h, w, out_channels, in_channels].
        w = tf.get_variable('w', shape=[5,5,out_channels,in_channels],
                            initializer=tf.truncated_normal_initializer(stddev=0.1))
        b = tf.get_variable('b', shape=[out_channels],
                            initializer=tf.zeros_initializer())

        deconv = tf.nn.conv2d_transpose(x, w, output_shape=output_shape, strides=[1,2,2,1]) + b
    return deconv
  
def dense(x,input_dim,output_dim,name, reuse = False):
    """Fully connected layer computing ``x @ w + b``.

    Args:
        x: Input tensor passed to ``tf.matmul``.
        input_dim: Number of input features (rows of ``w``).
        output_dim: Number of output features (columns of ``w``).
        name: Variable scope holding the layer's ``w`` and ``b``.
        reuse: Forwarded to ``tf.variable_scope``; when true the existing
            variables under ``name`` are reused instead of created.

    Returns:
        The affine transform of ``x``; no activation is applied here.
    """
    with tf.variable_scope(name,reuse=reuse):
        # tf.get_variable (unlike tf.Variable) honours the scope's `reuse`
        # flag, so weights are actually shared when reuse=True.
        w = tf.get_variable('w', shape=[input_dim,output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=0.1))
        b = tf.get_variable('b', shape=[output_dim],
                            initializer=tf.zeros_initializer())

    return tf.matmul(x,w)+b
  
def encoder(input_embedding):
    """Map a 128-d input embedding to the parameters of the latent Gaussian.

    Two ReLU dense layers feed two linear heads.

    Args:
        input_embedding: Tensor fed to the first 128->128 dense layer.

    Returns:
        A tuple ``(z_mean, z_logstd)`` of two 128-wide linear outputs.
    """
    h1 = tf.nn.relu(dense(input_embedding,128,128,'dense_1'))

    h2 = tf.nn.relu(dense(h1,128,128,'dense_2'))

    z_mean = dense(h2, 128, 128, 'z_mean_dense')
    z_logstd = dense(h2,128,128,'z_stddev_dense')

    # The return must sit at the same indent as the body; at 2 spaces it was
    # an IndentationError.
    return z_mean,z_logstd

def decoder(z,reuse=False):
    """Decode a 128-d latent code into 64x64x3 image logits.

    The code is projected to 16*16*32 values, reshaped to a 16x16x32 map and
    passed through two stride-2 transposed convolutions.

    Args:
        z: Latent tensor fed to the 128 -> 16*16*32 dense layer.
        reuse: Forwarded to every layer's variable scope.

    Returns:
        The output of the second transposed convolution; no sigmoid is
        applied here.
    """
    z_fc = dense(z,128,16*16*32,'z_fc_dense',reuse)

    z_matrix = tf.nn.relu(tf.reshape(z_fc,[-1,16,16,32]))

    # NOTE: both output shapes hard-code a batch dimension of 64, so the
    # decoder only accepts batches of exactly 64 codes.
    h1 = tf.nn.relu(deconv2d(z_matrix,[64,32,32,16],'deconv_1',reuse))
    h2 = deconv2d(h1, [64,64,64,3], 'deconv2', reuse)

    # The return must sit at the same indent as the body; at 2 spaces it was
    # an IndentationError.
    return tf.identity(h2)

def training_step(sess,input_faces,input_clips):
    """Run one optimizer update on a batch; returns nothing.

    Args:
        sess: Session used to run the ``optimizer`` op.
        input_faces: Batch fed to the ``input_images`` placeholder.
        input_clips: Batch fed to the ``input_voices`` placeholder.
    """
    feed = {input_images: input_faces, input_voices: input_clips}
    sess.run(optimizer, feed_dict=feed)

def loss_step(sess,input_faces,input_clips):
    """Evaluate and return the ``loss`` tensor for one batch.

    Args:
        sess: Session used to evaluate ``loss``.
        input_faces: Batch fed to the ``input_images`` placeholder.
        input_clips: Batch fed to the ``input_voices`` placeholder.

    Returns:
        Whatever ``sess.run`` returns for ``loss``.
    """
    feed = {input_images: input_faces, input_voices: input_clips}
    batch_loss = sess.run(loss, feed_dict=feed)
    return batch_loss

def generation_step(sess, z_samples):
    """Decode caller-supplied latent codes into images.

    The original body referenced ``self`` and a ``generator`` op, neither of
    which exists at module level. Instead, the latent tensor ``z`` of the
    existing graph is fed directly, which bypasses the encoder and the
    sampling step, and the sigmoid-activated decoder output is fetched.

    Args:
        sess: Session used to run the graph.
        z_samples: Latent codes to decode; they must match the shape of the
            graph's ``z`` tensor.

    Returns:
        Whatever ``sess.run`` returns for ``generated_images_sigmoid``.
    """
    # The parameter name shadows the module-level `z_samples` placeholder, so
    # the graph's latent tensor `z` is used as the feed key instead.
    return sess.run(generated_images_sigmoid, feed_dict = {z: z_samples})

def recognition_step(sess, input_faces, input_clips):
    """Run the full encoder/decoder pass and return the generated images.

    Args:
        sess: Session used to run the graph.
        input_faces: Batch fed to the ``input_images`` placeholder.
        input_clips: Batch fed to the ``input_voices`` placeholder.

    Returns:
        Whatever ``sess.run`` returns for ``generated_images_sigmoid``.
    """
    # `eturn` in the original was a typo for `return` (a SyntaxError).
    return sess.run(generated_images_sigmoid, feed_dict = {input_images:input_faces,input_voices:input_clips})
    

In [None]:
# Start from an empty graph so re-running this cell does not pile up ops.
tf.reset_default_graph()
latent_dim = 128
batch_size = 64

# placeholder for input images. Input images are RGB 64x64
input_images = tf.placeholder(tf.float32, shape=[None,64,64,3])
# placeholder for the 128-d voice embeddings that condition the generation
input_voices = tf.placeholder(tf.float32, shape=[None,128])

input_images_flat = tf.reshape(input_images, [-1,64*64*3])

# placeholder for z_samples, intended for generating new images.
# It is not connected to any op in this cell.
z_samples = tf.placeholder(tf.float32, shape=[None,latent_dim])

# encoder: voice embedding -> mean and log-std of the latent Gaussian
z_mean,z_logstd = encoder(input_voices)

# decoder input: reparameterised sample z = mean + std * eps.
# The encoder output is a log standard deviation (see its name and the
# exp(2*z_logstd) variance term in latent_loss below), so std = exp(z_logstd).
# The previous exp(.5*z_logstd) treated it as a log-variance, which disagreed
# with the KL term.
samples = tf.random_normal([batch_size,latent_dim], 0, 1, dtype = tf.float32 )
z = z_mean + (tf.exp(z_logstd)*samples)

# decoder: latent code -> image logits, plus a sigmoid view for display
generated_images = decoder(z)
generated_images_sigmoid = tf.sigmoid(generated_images)
generated_images_flat = tf.reshape(generated_images, [-1,64*64*3])

# loss calculation

# Numerically stable sigmoid cross-entropy between the logits and the target
# pixels, summed over all pixels of each image.
generation_loss = tf.reduce_sum(tf.maximum(generated_images_flat, 0) - generated_images_flat * input_images_flat\
                                + tf.log(1 + tf.exp(-tf.abs(generated_images_flat))), 1)

# KL divergence between N(z_mean, exp(z_logstd)^2) and the standard normal.
latent_loss = 0.5 * tf.reduce_sum(tf.square(z_mean) + tf.exp(2*z_logstd) - 2*z_logstd - 1, 1)

loss = tf.reduce_mean(latent_loss + generation_loss)

# optimizer

learning_rate = 1e-3
optimizer = tf.train.AdamOptimizer(learning_rate).minimize(loss)

In [None]:
num_epochs = 5000
interval = 200   # print the loss every `interval` steps within an epoch

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # next_batch() bumps the global epochs_completed counter, which drives
    # both loops below.
    while(epochs_completed < num_epochs):
        current_epoch = epochs_completed
        step = 0
        # Keep training until next_batch() reports that the epoch rolled over.
        while(epochs_completed < current_epoch+1):
            input_faces,input_clips = next_batch(batch_size)
            training_step(sess,input_faces,input_clips)
            step+=1
            if(step%interval==0):
                print('loss : {} '.format(loss_step(sess,input_faces,input_clips)))
        print('Epochs {} completed'.format(current_epoch))

    # plt.imsave does not create missing directories.
    if not os.path.isdir('test_vae_sample'):
        os.makedirs('test_vae_sample')

    j=0
    # Only full batches are fed to the graph. The "+ 1" keeps the final full
    # batch when len(test_voice) is an exact multiple of batch_size;
    # previously that batch was silently skipped.
    for k in range(0, len(test_voice) - batch_size + 1, batch_size):
        output_images = recognition_step(sess,test_face[k:k+batch_size], test_voice[k:k+batch_size])
        # Sigmoid outputs are in [0, 1]; rescale to 8-bit pixel values.
        output_images = output_images * 255
        output_images = output_images.astype(np.uint8)
        for image in output_images:
            plt.imsave('test_vae_sample/'+str(j)+'.png',image)
            j+=1


In [None]:
# Write every image still held in `output_images` to disk as a numbered JPEG.
# After the training cell, `output_images` holds only the final test batch.
# NOTE(review): 'drivea/' looks like it may be a typo for 'drive/' — confirm.
for i, image in enumerate(output_images):
    plt.imsave('drivea/sample/'+str(i)+'.jpg',image)