In [1]:
# Import Tensorflow and Numpy
import tensorflow as tf
import numpy as np
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda, Cropping2D, ZeroPadding2D
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
import tensorflow_probability as tfp
tfd = tfp.distributions

# Switch TensorFlow to graph (non-eager) mode; the training code further down
# runs its ops through a Session instead of executing them eagerly.
# NOTE(review): the recorded output of this cell is an ImportError raised by the
# tensorflow_probability import (needs TF >= 2.8, found 2.6.2), so in that run
# the lines below were never reached -- confirm the environment before relying
# on graph mode being active.
tf.compat.v1.disable_eager_execution()

# Record the installed TensorFlow version alongside the results.
print(tf.__version__)

ImportError: This version of TensorFlow Probability requires TensorFlow version >= 2.8; Detected an installation of version 2.6.2. Please upgrade TensorFlow to proceed.

# Data preparation

In [3]:
# Directory the dataset array is read from.
# NOTE(review): the name `dir` shadows Python's builtin dir() for the rest of
# the notebook; it is left unchanged here because other cells may read it.
dir = './Dataset/array'

# allow_pickle=True lets np.load unpickle object arrays, which can execute
# arbitrary code -- only load files that come from a trusted source.
data = np.load(dir + '/male.npy', allow_pickle=True)

# Last expression of the cell: displays the array dimensions as the cell output.
data.shape

(2489, 256, 63)

# Generate model

In [None]:
# Per-sample input shape consumed by the encoders of Parallel_CVAE.
# NOTE(review): the array loaded above has shape (2489, 256, 63), which does not
# match (512, 64) per sample -- confirm how the data is padded/reshaped before
# it is fed to the model.
input_shape = (512, 64, 1) 
# Convolution settings read as globals by Parallel_CVAE.
# Presumably one entry per convolution layer (five layers) -- TODO confirm.
# The last stride entry is a (rows, cols) pair rather than a single int.
conv_filters=(512, 256, 128, 64, 32)
conv_kernels=(3, 3, 3, 3, 3)
conv_strides=(2, 2, 2, 2, (2,1))
# Latent dimensionality for the model below.
vector_dimension = 64
latent_space_dim = vector_dimension

In [None]:
class Parallel_CVAE(tf.keras.Model):
    """Convolutional VAE made of two parallel encoder/decoder branches.

    Branch 1 (``e1``/``d1``) and branch 2 (``e2``/``d2``) each encode the input
    into their own latent vector and decode it back to the input shape. The
    full reconstruction is the element-wise sum of both decoder outputs.

    The layer stacks are built from the notebook-level settings
    ``input_shape``, ``conv_filters``, ``conv_kernels`` and ``conv_strides``
    (one entry per convolution layer), which must be defined before the class
    is instantiated.
    """

    def __init__(self, latent_dim, alpha=1.0):
        """Create both encoders and both decoders.

        Args:
            latent_dim: Size of each branch's latent vector. Each encoder ends
                in a Dense layer of ``2 * latent_dim`` units (mean and
                log-variance).
            alpha: Regularization weight. The default is only a placeholder:
                the original code left the value unset, and no method of this
                class reads it yet.
        """
        super().__init__()
        self.latent_dim = latent_dim
        self.alpha = alpha
        self.lr = 0.0001

        self.e1 = self._build_encoder()
        self.e2 = self._build_encoder()
        self.d1 = self._build_decoder()
        self.d2 = self._build_decoder()

    @staticmethod
    def _stride_pair(stride):
        """Return a stride given as an int or a 2-sequence as ``(rows, cols)``."""
        try:
            rows, cols = stride
        except TypeError:
            # A scalar stride applies to both spatial axes.
            return stride, stride
        return rows, cols

    def _bottleneck_shape(self):
        """Return ``(rows, cols, channels)`` of the last encoder conv output.

        With ``padding='same'`` each convolution divides a spatial size by its
        stride, and each transposed convolution multiplies it back. The sizes
        must therefore be exactly divisible for the decoders to reproduce
        ``input_shape``.

        Raises:
            ValueError: If the three per-layer settings differ in length, or a
                spatial size is not divisible by a stride.
        """
        if not (len(conv_filters) == len(conv_kernels) == len(conv_strides)):
            raise ValueError(
                "conv_filters, conv_kernels and conv_strides must have the same length")
        rows, cols = input_shape[0], input_shape[1]
        for stride in conv_strides:
            row_stride, col_stride = self._stride_pair(stride)
            if rows % row_stride or cols % col_stride:
                raise ValueError(
                    "input_shape %r is not divisible by the strides %r"
                    % (input_shape, conv_strides))
            rows //= row_stride
            cols //= col_stride
        return rows, cols, conv_filters[-1]

    def _build_encoder(self):
        """Build one encoder: a conv layer per config entry, then Flatten and Dense."""
        # Validates the configuration before any layer is created.
        self._bottleneck_shape()
        stack = [tf.keras.layers.InputLayer(input_shape=input_shape)]
        for n_filters, kernel, stride in zip(conv_filters, conv_kernels, conv_strides):
            stack.append(tf.keras.layers.Conv2D(
                filters=n_filters, kernel_size=kernel, strides=stride,
                padding='same', activation='relu'))
        stack.append(tf.keras.layers.Flatten())
        # No activation: the output is split into mean and log-variance.
        stack.append(tf.keras.layers.Dense(self.latent_dim + self.latent_dim))
        return tf.keras.Sequential(stack)

    def _build_decoder(self):
        """Build one decoder that mirrors the encoder back up to ``input_shape``."""
        rows, cols, channels = self._bottleneck_shape()
        # Walk the conv settings backwards: widen back to the first encoder
        # width, then project onto the channel count of the input.
        out_filters = list(reversed(conv_filters))[1:] + [input_shape[-1]]
        kernels = list(reversed(conv_kernels))
        strides = list(reversed(conv_strides))
        last = len(out_filters) - 1

        stack = [
            tf.keras.layers.InputLayer(input_shape=(self.latent_dim,)),
            tf.keras.layers.Dense(units=rows * cols * channels, activation=tf.nn.relu),
            # Conv2DTranspose needs a 4-D input, so restore the feature-map shape.
            tf.keras.layers.Reshape(target_shape=(rows, cols, channels)),
        ]
        for index, (n_filters, kernel, stride) in enumerate(
                zip(out_filters, kernels, strides)):
            stack.append(tf.keras.layers.Conv2DTranspose(
                filters=n_filters, kernel_size=kernel, strides=stride, padding='same',
                # The final layer emits raw logits, hence no activation.
                activation=None if index == last else 'relu'))
        return tf.keras.Sequential(stack)

    def train(self, x_train, f0_train, formant_train, y_train,
              f0_fn=None, formant_fn=None, steps=1):
        """Run Adam steps on the joint loss and print the loss after each step.

        The joint loss is the L2 reconstruction loss between ``y_train`` and
        the summed branch outputs, plus an L2 loss per auxiliary estimator that
        is supplied. No KL term is included.

        Must be called in graph mode (``tf.compat.v1.disable_eager_execution()``).
        The arrays are embedded in the graph as constants and every call adds
        new ops plus a fresh optimizer, so pass reasonably sized batches.
        Weights live in the Keras backend session and persist between calls.

        Args:
            x_train: Input batch, shaped ``(batch,) + input_shape``.
            f0_train: F0 targets, compared against ``f0_fn(y1)``.
            formant_train: Formant targets, compared against ``formant_fn(y2)``.
            y_train: Reconstruction targets, same shape as the decoder output.
            f0_fn: Callable mapping the branch-1 reconstruction to an F0
                estimate. The original code left this estimator as a TODO;
                when None the F0 term is left out and a warning is issued.
            formant_fn: Same as ``f0_fn`` for the branch-2 reconstruction and
                the formant term.
            steps: Number of optimisation steps to run.

        Returns:
            The joint loss value of the last step, or None if ``steps < 1``.

        Raises:
            RuntimeError: If TensorFlow is executing eagerly.
        """
        import warnings

        if tf.executing_eagerly():
            raise RuntimeError(
                "Parallel_CVAE.train requires graph mode; call "
                "tf.compat.v1.disable_eager_execution() before building the model")

        # Data and targets are constants, not Variables: a Variable would be
        # trainable and the optimizer would move the targets toward the output.
        x = tf.constant(x_train, dtype=tf.float32, name="X")
        y_target = tf.constant(y_train, dtype=tf.float32, name="Y")

        # Each branch is evaluated once so the auxiliary losses and the
        # reconstruction loss are computed from the same latent samples.
        y1 = self.autoencode_z1(x)
        y2 = self.autoencode_z2(x)

        # Reconstruction loss (no KL term is applied here).
        joint_loss = tf.nn.l2_loss(y_target - (y1 + y2))

        # F0 loss
        if f0_fn is None:
            warnings.warn("no f0_fn given: the F0 loss term is omitted")
        else:
            f0_target = tf.constant(f0_train, dtype=tf.float32, name="F0")
            joint_loss = joint_loss + tf.nn.l2_loss(f0_fn(y1) - f0_target)

        # Formant loss
        if formant_fn is None:
            warnings.warn("no formant_fn given: the formant loss term is omitted")
        else:
            formant_target = tf.constant(formant_train, dtype=tf.float32, name="Formant")
            joint_loss = joint_loss + tf.nn.l2_loss(formant_fn(y2) - formant_target)

        # Only this model's weights are updated.
        optimiser = tf.compat.v1.train.AdamOptimizer(learning_rate=self.lr)
        train_op = optimiser.minimize(joint_loss, var_list=self.trainable_variables)

        # The Keras backend session outlives this call and initialises the
        # layer weights it tracks; re-initialising everything in a throwaway
        # session would discard whatever was learned.
        session = tf.compat.v1.keras.backend.get_session()
        # The optimizer's own variables are not tracked by Keras.
        session.run(tf.compat.v1.variables_initializer(optimiser.variables()))

        loss_value = None
        for _ in range(steps):
            _, loss_value = session.run([train_op, joint_loss])
            print(loss_value)
        return loss_value

    @tf.function
    def sample(self, mean, logvar):
        """Reparameterised draw: ``mean + eps * exp(logvar / 2)``, eps ~ N(0, 1)."""
        # tf.shape keeps this valid when the batch dimension is unknown.
        eps = tf.random.normal(shape=tf.shape(mean), dtype=mean.dtype)
        return eps * tf.exp(logvar * .5) + mean

    def encode_distrib_z1(self, x):
        """Return ``(mean, logvar)`` of branch 1's latent distribution for ``x``."""
        mean, logvar = tf.split(self.e1(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def encode_z1(self, x):
        """Sample a branch-1 latent vector for ``x``."""
        mean, logvar = self.encode_distrib_z1(x)
        return self.sample(mean, logvar)

    def encode_distrib_z2(self, x):
        """Return ``(mean, logvar)`` of branch 2's latent distribution for ``x``."""
        mean, logvar = tf.split(self.e2(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def encode_z2(self, x):
        """Sample a branch-2 latent vector for ``x``."""
        mean, logvar = self.encode_distrib_z2(x)
        return self.sample(mean, logvar)

    def decode_z1(self, z1, apply_sigmoid=False):
        """Decode ``z1`` with branch 1; returns logits, or sigmoid(logits) if asked."""
        logits = self.d1(z1)
        return tf.sigmoid(logits) if apply_sigmoid else logits

    def decode_z2(self, z2, apply_sigmoid=False):
        """Decode ``z2`` with branch 2; returns logits, or sigmoid(logits) if asked."""
        logits = self.d2(z2)
        return tf.sigmoid(logits) if apply_sigmoid else logits

    def decode(self, z1, z2):
        """Decode both latent vectors and return the sum of the two logit maps."""
        return self.d1(z1) + self.d2(z2)

    def autoencode_z1(self, x):
        """Encode ``x`` with branch 1, sample, and decode back to logits."""
        return self.decode_z1(self.encode_z1(x))

    def autoencode_z2(self, x):
        """Encode ``x`` with branch 2, sample, and decode back to logits."""
        return self.decode_z2(self.encode_z2(x))

    def autoencode(self, x):
        """Return the sum of both branches' reconstructions of ``x``."""
        return self.autoencode_z1(x) + self.autoencode_z2(x)

