# Import libraries

In [None]:
!pip install opencv-python
!pip install librosa
!pip install moviepy

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [2]:
import inspect
from typing import List

import cv2
import glob
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
import librosa
import librosa.display

from PIL import Image
from moviepy.editor import VideoFileClip

In [3]:
## Limit memory grow
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Set memory growth to True for each GPU
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        print(e)

1 Physical GPUs, 1 Logical GPUs


# Generate data

#### 1. Extracting visual and audio 

In [None]:
video_file = 'dia0_utt0.mp4'
visual_folder = 'visual_frames'
os.makedirs(visual_folder, exist_ok=True)

clip = VideoFileClip(video_file)
frames = [frame for frame in clip.iter_frames()]
for i, frame in enumerate(frames):
    image = Image.fromarray(frame)
    image.save(os.path.join(visual_folder, f'frame_{i:05d}.jpg'))

audio = clip.audio
audio.write_audiofile('dia0_utt0_audio.wav')

#### 2. Extract face from frame

In [None]:
face_folder = 'face_frames'
os.makedirs(face_folder, exist_ok=True)

In [None]:
modelFile = "models/res10_300x300_ssd_iter_140000_fp16.caffemodel"
configFile = "models/deploy.prototxt"
net = cv2.dnn.readNetFromCaffe(configFile, modelFile)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

In [None]:
lst_frames = glob.glob('visual_frames/*.jpg')

In [None]:
for f in lst_frames:
    bn = os.path.splitext(os.path.basename(f))[0]
    frameOrig = cv2.imread(f)
    frame = cv2.resize(frameOrig, (640, 480))
    blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0), swapRB=True, crop=False)
    net.setInput(blob)
    detections = net.forward()
    for i in range(detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        if confidence > 0.2:
            box = detections[0, 0, i, 3:7] * np.array([frame.shape[1], frame.shape[0], frame.shape[1], frame.shape[0]])
            (start_x, start_y, end_x, end_y) = box.astype(int)

            # Extract the face region from the frame
            face = frame[start_y:end_y, start_x:end_x]

            # Save the face as a JPG image
            face_filename = f'{face_folder}/face_{bn}.jpg'
            cv2.imwrite(face_filename, face)

#### 3. Transform audio waveform to log-mel spectrograme

In [None]:
audio_file = 'dia0_utt0_audio.wav'
y, sr = librosa.load(audio_file)

In [None]:
# Compute the mel spectrogram
mel_spec = librosa.feature.melspectrogram(y=y, sr=sr)
# Convert to log scale (dB)
log_mel_spec = librosa.power_to_db(mel_spec, ref=np.max)

In [None]:
# Display the log-mel spectrogram
fig = plt.figure(figsize=(10, 4))
ax = plt.Axes(fig, [0., 0., 1., 1.])
#plt.colorbar(format='%+2.0f dB')
#plt.title('Log-Mel Spectrogram')
ax.set_axis_off()
fig.add_axes(ax)
librosa.display.specshow(log_mel_spec, sr=sr, x_axis='time', y_axis='mel')
plt.tight_layout()
plt.show()

In [None]:
output_image = 'dia0_utt0_spectrogram_tight.png'
fig.savefig(output_image, dpi=300, bbox_inches='tight', pad_inches=0)

#### 4. Randomly generate data samples

In [4]:
def scale_one(tensor):
    sum_values = tf.reduce_sum(tensor, axis=-1, keepdims=True)
    return tensor / sum_values

In [5]:
x_train_vis = tf.random.uniform(shape=(1000,500,1024))
x_val_vis   = tf.random.uniform(shape=(100,500,1024))
x_train_aud = tf.random.uniform(shape=(1000,500,512))
x_val_aud   = tf.random.uniform(shape=(100,500,512))

y_train_au  = tf.cast(tf.random.uniform(shape=(1000,500,10), minval=0, maxval=1)>0.5, dtype=tf.int32)
y_val_au    = tf.cast(tf.random.uniform(shape=(100,500,10), minval=0, maxval=1)>0.5, dtype=tf.int32)
y_train_emo = tf.cast(tf.random.uniform(shape=(1000,500,8), minval=0, maxval=1), dtype=tf.float32)
y_val_emo   = tf.cast(tf.random.uniform(shape=(100,500,8), minval=0, maxval=1), dtype=tf.float32)
y_train_emo = scale_one(y_train_emo)
y_val_emo   = scale_one(y_val_emo)

In [6]:
train_ds = tf.data.Dataset.from_tensor_slices(
    (x_train_vis, x_train_aud, y_train_au, y_train_emo)).shuffle(1000).batch(32)
val_ds = tf.data.Dataset.from_tensor_slices(
    (x_val_vis, x_val_aud, y_val_au, y_val_emo)).batch(32)

# Build Temporal Convolutional Network
![TCNs](https://cdn-images-1.medium.com/max/1000/1*1cK-UEWHGaZLM-4ITCeqdQ.png)

This module is used to encode both visual and audio representation

In [7]:
def is_power_of_two(num: int):
    return num != 0 and ((num & (num - 1)) == 0)

def adjust_dilations(dilations: list):
    if all([is_power_of_two(i) for i in dilations]):
        return dilations
    else:
        new_dilations = [2 ** i for i in dilations]
        return new_dilations

#### 1. Build Residual Layer

In [8]:
class ResidualBlock(tf.keras.layers.Layer):
    """ Defines the residual block for the WaveNet TCN
    Args:
        nb_filters: The number of convolutional filters to use in this block
        kernel_size: The size of the convolutional kernel
        padding: The padding used in the convolutional layers, 'same' or 'causal'.
        activation: The final activation used in o = Activation(x + F(x))
        dropout_rate: Float between 0 and 1. Fraction of the input units to drop.
        kernel_initializer: Initializer for the kernel weights matrix (Conv1D).
        use_batch_norm: Whether to use batch normalization in the residual layers or not.
        use_layer_norm: Whether to use layer normalization in the residual layers or not.
        use_weight_norm: Whether to use weight normalization in the residual layers or not.
        kwargs: Any initializers for Layer class.
    """
    def __init__(self,
                 dilation_rate: int,
                 nb_filters: int,
                 kernel_size: int,
                 padding: str,
                 activation: str = 'relu',
                 dropout_rate: float = 0,
                 kernel_initializer: str = 'he_normal',
                 use_batch_norm: bool = False,
                 use_layer_norm: bool = False,
                 use_weight_norm: bool = False,
                 **kwargs):
        
        self.dilation_rate = dilation_rate
        self.nb_filters = nb_filters
        self.kernel_size = kernel_size
        self.padding = padding
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm
        self.use_weight_norm = use_weight_norm
        self.kernel_initializer = kernel_initializer
        self.layers = []
        self.shape_match_conv = None
        self.res_output_shape = None
        self.final_activation = None
        
        super(ResidualBlock, self).__init__(**kwargs)
        
    def _build_layer(self, layer):
        """ Helper function for building layer
        Args:
            layer: Appends layer to internal layer list and builds it based on the current output
                   shape of ResidualBlocK. Updates current output shape.
        """
        self.layers.append(layer)
        self.layers[-1].build(self.res_output_shape)
        self.res_output_shape = self.layers[-1].compute_output_shape(self.res_output_shape)
        
    def build(self, input_shape):

        with tf.keras.backend.name_scope(self.name):  # name scope used to make sure weights get unique names
            self.layers = []
            self.res_output_shape = input_shape

            for k in range(2):  # dilated conv block.
                name = 'conv1D_{}'.format(k)
                with tf.keras.backend.name_scope(name):  # name scope used to make sure weights get unique names
                    conv = tf.keras.layers.Conv1D(
                        filters=self.nb_filters,
                        kernel_size=self.kernel_size,
                        dilation_rate=self.dilation_rate,
                        padding=self.padding,
                        name=name,
                        kernel_initializer=self.kernel_initializer
                    )
                    if self.use_weight_norm:
                        from layers import WeightNormalization
                        # wrap it. WeightNormalization API is different than BatchNormalization or LayerNormalization.
                        with tf.keras.backend.name_scope('norm_{}'.format(k)):
                            conv = WeightNormalization(conv)
                    self._build_layer(conv)

                with tf.keras.backend.name_scope('norm_{}'.format(k)):
                    if self.use_batch_norm:
                        self._build_layer(BatchNormalization())
                    elif self.use_layer_norm:
                        self._build_layer(LayerNormalization())
                    elif self.use_weight_norm:
                        pass  # done above.

                with tf.keras.backend.name_scope('act_and_dropout_{}'.format(k)):
                    self._build_layer(tf.keras.layers.Activation(self.activation, name='Act_Conv1D_{}'.format(k)))
                    self._build_layer(tf.keras.layers.SpatialDropout1D(rate=self.dropout_rate, name='SDropout_{}'.format(k)))

            if self.nb_filters != input_shape[-1]:
                # 1x1 conv to match the shapes (channel dimension).
                name = 'matching_conv1D'
                with tf.keras.backend.name_scope(name):
                    # make and build this layer separately because it directly uses input_shape.
                    # 1x1 conv.
                    self.shape_match_conv = tf.keras.layers.Conv1D(
                        filters=self.nb_filters,
                        kernel_size=1,
                        padding='same',
                        name=name,
                        kernel_initializer=self.kernel_initializer
                    )
            else:
                name = 'matching_identity'
                self.shape_match_conv = tf.keras.layers.Lambda(lambda x: x, name=name)

            with tf.keras.backend.name_scope(name):
                self.shape_match_conv.build(input_shape)
                self.res_output_shape = self.shape_match_conv.compute_output_shape(input_shape)

            self._build_layer(tf.keras.layers.Activation(self.activation, name='Act_Conv_Blocks'))
            self.final_activation = tf.keras.layers.Activation(self.activation, name='Act_Res_Block')
            self.final_activation.build(self.res_output_shape)  # probably isn't necessary

            # this is done to force Keras to add the layers in the list to self._layers
            for layer in self.layers:
                self.__setattr__(layer.name, layer)
            self.__setattr__(self.shape_match_conv.name, self.shape_match_conv)
            self.__setattr__(self.final_activation.name, self.final_activation)

            super(ResidualBlock, self).build(input_shape)
    
    def call(self, inputs, training=None, **kwargs):
        """
        Args:
            inputs: The previous layer in the model
            training: boolean indicating whether the layer should behave in training mode or in inference mode
        
        Returns: A tuple where the first element is the residual model tensor, and the second
                 is the skip connection tensor.
        """
        x1 = inputs
        for layer in self.layers:
            training_flag = 'training' in dict(inspect.signature(layer.call).parameters)
            x1 = layer(x1, training=training) if training_flag else layer(x1)
        x2 = self.shape_match_conv(inputs)
        x1_x2 = self.final_activation(tf.keras.layers.add([x2, x1], name='Add_Res'))
        return [x1_x2, x1]
    
    def compute_output_shape(self, input_shape):
        return [self.res_output_shape, self.res_output_shape]

#### 2. Build TCN layer

In [9]:
class TCN(tf.keras.layers.Layer):
    """Creates a TCN layer.
        Input shape:
            A 3D tensor with shape (batch_size, timesteps, input_dim).
            
        Args:
            nb_filters: The number of filters to use in the convolutional layers. Can be a list.
            kernel_size: The size of the kernel to use in each convolutional layer.
            dilations: The list of the dilations. Example is: [1, 2, 4, 8, 16, 32, 64].
            nb_stacks : The number of stacks of residual blocks to use.
            padding: The padding to use in the convolutional layers, 'causal' or 'same'.
            use_skip_connections: Boolean. If we want to add skip connections from input to each residual blocK.
            return_sequences: Boolean. Whether to return the last output in the output sequence, or the full sequence.
            activation: The activation used in the residual blocks o = Activation(x + F(x)).
            dropout_rate: Float between 0 and 1. Fraction of the input units to drop.
            kernel_initializer: Initializer for the kernel weights matrix (Conv1D).
            use_batch_norm: Whether to use batch normalization in the residual layers or not.
            use_layer_norm: Whether to use layer normalization in the residual layers or not.
            use_weight_norm: Whether to use weight normalization in the residual layers or not.
            go_backwards: Boolean (default False). If True, process the input sequence backwards and
            return the reversed sequence.
            return_state: Boolean. Whether to return the last state in addition to the output. Default: False.
            kwargs: Any other arguments for configuring parent class Layer. For example "name=str", Name of the model.
                    Use unique names when using multiple TCN.
                    
        Returns:
            A TCN layer.
    """
    def __init__(self,
                 nb_filters=64,
                 kernel_size=3,
                 nb_stacks=1,
                 dilations=(1, 2, 4, 8, 16, 32),
                 padding='causal',
                 use_skip_connections=True,
                 dropout_rate=0.0,
                 return_sequences=True,
                 activation='relu',
                 kernel_initializer='he_normal',
                 use_batch_norm=False,
                 use_layer_norm=False,
                 use_weight_norm=False,
                 go_backwards=False,
                 return_state=False,
                 **kwargs):
        self.return_sequences = return_sequences
        self.dropout_rate = dropout_rate
        self.use_skip_connections = use_skip_connections
        self.dilations = dilations
        self.nb_stacks = nb_stacks
        self.kernel_size = kernel_size
        self.nb_filters = nb_filters
        self.activation_name = activation
        self.padding = padding
        self.kernel_initializer = kernel_initializer
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm
        self.use_weight_norm = use_weight_norm
        self.go_backwards = go_backwards
        self.return_state = return_state
        self.skip_connections = []
        self.residual_blocks = []
        self.layers_outputs = []
        self.build_output_shape = None
        self.slicer_layer = None  # in case return_sequence=False
        self.output_slice_index = None  # in case return_sequence=False
        self.padding_same_and_time_dim_unknown = False  # edge case if padding='same' and time_dim = None
        
        if self.use_batch_norm + self.use_layer_norm + self.use_weight_norm > 1:
            raise ValueError('Only one normalization can be specified at once.')

        if isinstance(self.nb_filters, list):
            assert len(self.nb_filters) == len(self.dilations)
            if len(set(self.nb_filters)) > 1 and self.use_skip_connections:
                raise ValueError('Skip connections are not compatible '
                                 'with a list of filters, unless they are all equal.')

        if padding != 'causal' and padding != 'same':
            raise ValueError("Only 'causal' or 'same' padding are compatible for this layer.")

        # initialize parent class
        super(TCN, self).__init__(**kwargs)
        
    @property
    def receptive_field(self):
        return 1 + 2 * (self.kernel_size - 1) * self.nb_stacks * sum(self.dilations)
    
    def build(self, input_shape):

        # member to hold current output shape of the layer for building purposes
        self.build_output_shape = input_shape

        # list to hold all the member ResidualBlocks
        self.residual_blocks = []
        total_num_blocks = self.nb_stacks * len(self.dilations)
        if not self.use_skip_connections:
            total_num_blocks += 1  # cheap way to do a false case for below

        for s in range(self.nb_stacks):
            for i, d in enumerate(self.dilations):
                res_block_filters = self.nb_filters[i] if isinstance(self.nb_filters, list) else self.nb_filters
                self.residual_blocks.append(ResidualBlock(dilation_rate=d,
                                                          nb_filters=res_block_filters,
                                                          kernel_size=self.kernel_size,
                                                          padding=self.padding,
                                                          activation=self.activation_name,
                                                          dropout_rate=self.dropout_rate,
                                                          use_batch_norm=self.use_batch_norm,
                                                          use_layer_norm=self.use_layer_norm,
                                                          use_weight_norm=self.use_weight_norm,
                                                          kernel_initializer=self.kernel_initializer,
                                                          name='residual_block_{}'.format(len(self.residual_blocks))))
                # build newest residual block
                self.residual_blocks[-1].build(self.build_output_shape)
                self.build_output_shape = self.residual_blocks[-1].res_output_shape

        # this is done to force keras to add the layers in the list to self._layers
        for layer in self.residual_blocks:
            self.__setattr__(layer.name, layer)

        self.output_slice_index = None
        if self.padding == 'same':
            time = self.build_output_shape.as_list()[1]
            if time is not None:  # if time dimension is defined. e.g. shape = (bs, 500, input_dim).
                self.output_slice_index = int(self.build_output_shape.as_list()[1] / 2)
            else:
                # It will known at call time. c.f. self.call.
                self.padding_same_and_time_dim_unknown = True

        else:
            self.output_slice_index = -1  # causal case.
        self.slicer_layer = tf.keras.layers.Lambda(lambda tt: tt[:, self.output_slice_index, :], name='Slice_Output')
        self.slicer_layer.build(self.build_output_shape.as_list())
        
    def compute_output_shape(self, input_shape):
        if not self.built:
            self.build(input_shape)
        if not self.return_sequences:
            batch_size = self.build_output_shape[0]
            batch_size = batch_size.value if hasattr(batch_size, 'value') else batch_size
            nb_filters = self.build_output_shape[-1]
            return [batch_size, nb_filters]
        else:
            # Compatibility tensorflow 1.x
            return [v.value if hasattr(v, 'value') else v for v in self.build_output_shape]
        
    def call(self, inputs, training=None, **kwargs):
        x = inputs

        if self.go_backwards:
            # reverse x in the time axis
            x = tf.reverse(x, axis=[1])

        self.layers_outputs = [x]
        self.skip_connections = []
        for res_block in self.residual_blocks:
            try:
                x, skip_out = res_block(x, training=training)
            except TypeError:  # compatibility with tensorflow 1.x
                x, skip_out = res_block(K.cast(x, 'float32'), training=training)
            self.skip_connections.append(skip_out)
            self.layers_outputs.append(x)

        if self.use_skip_connections:
            if len(self.skip_connections) > 1:
                # Keras: A merge layer should be called on a list of at least 2 inputs. Got 1 input.
                x = tf.keras.layers.add(self.skip_connections, name='Add_Skip_Connections')
            else:
                x = self.skip_connections[0]
            self.layers_outputs.append(x)

        if not self.return_sequences:
            # case: time dimension is unknown. e.g. (bs, None, input_dim).
            if self.padding_same_and_time_dim_unknown:
                self.output_slice_index = K.shape(self.layers_outputs[-1])[1] // 2
            x = self.slicer_layer(x)
            self.layers_outputs.append(x)
        return x
    
    def get_config(self):
        """
        Returns the config of a the layer. This is used for saving and loading from a model
        :return: python dictionary with specs to rebuild layer
        """
        config = super(TCN, self).get_config()
        config['nb_filters'] = self.nb_filters
        config['kernel_size'] = self.kernel_size
        config['nb_stacks'] = self.nb_stacks
        config['dilations'] = self.dilations
        config['padding'] = self.padding
        config['use_skip_connections'] = self.use_skip_connections
        config['dropout_rate'] = self.dropout_rate
        config['return_sequences'] = self.return_sequences
        config['activation'] = self.activation_name
        config['use_batch_norm'] = self.use_batch_norm
        config['use_layer_norm'] = self.use_layer_norm
        config['use_weight_norm'] = self.use_weight_norm
        config['kernel_initializer'] = self.kernel_initializer
        config['go_backwards'] = self.go_backwards
        config['return_state'] = self.return_state
        return config

In [10]:
class PositionalEncoder(tf.keras.layers.Layer):
    def __init__(self, timesteps, projection_dim):
        super().__init__()
        self.timesteps  = timesteps
        self.projection = tf.keras.layers.Dense(units=projection_dim)
        self.position_embedding = tf.keras.layers.Embedding(
            input_dim=self.timesteps, output_dim=projection_dim
        )

    def call(self, patch):
        positions = tf.range(start=0, limit=self.timesteps, delta=1)
        encoded   = self.projection(patch) + self.position_embedding(positions)
        return encoded

#### 3. Build Fusion module

In [11]:
class TCAN(tf.keras.layers.Layer):
    """Temporal Co-Attention Networks
    Inputs:
        list of modalities [M1, M2]
        M1 has shape [batch_size, timestep, feature_dim_1]
        M2 has shape [batch_size, timestep, feature_dim_2]
    Args:
        hidden_units: output dimension of feature channel to be returned
    Output:
        A fused features with shape [batch_size, timestep, 2*hidden_units]
    """
    def __init__(self, hidden_units):
        super(TCAN, self).__init__()
        
        self.hidden_units = hidden_units
        
        # Attention weights
        self.attention_weights1 = tf.keras.layers.Dense(1)
        self.attention_weights2 = tf.keras.layers.Dense(1)
        
        # Temporal co-attention
        self.co_attention1 = tf.keras.layers.Dense(hidden_units)
        self.co_attention2 = tf.keras.layers.Dense(hidden_units)
        
    def call(self, inputs):
        modality1_features, modality2_features = inputs
        
        # Compute attention scores for modality 1
        attention_scores1 = self.attention_weights1(modality1_features)
        
        # Compute attention scores for modality 2
        attention_scores2 = self.attention_weights2(modality2_features)
        
        # Compute attention weights for modality 1
        attention_weights1 = tf.nn.softmax(attention_scores1, axis=1)
        
        # Compute attention weights for modality 2
        attention_weights2 = tf.nn.softmax(attention_scores2, axis=1)
        
        # Apply attention to modality 1
        attended_modality1 = modality1_features * attention_weights1
        
        # Apply attention to modality 2
        attended_modality2 = modality2_features * attention_weights2
        
        # Compute temporal co-attention
        co_attention1 = self.co_attention1(attended_modality1)
        co_attention2 = self.co_attention2(attended_modality2)
        
        # Concatenate co-attention features
        co_attention_features = tf.concat([co_attention1, co_attention2], axis=-1)
        
        return co_attention_features

In [12]:
class MTRN(tf.keras.layers.Layer):
    """Multimodal Temporal Relation Networks
    Inputs:
        list of modalities [M1, M2]
        M1 has shape [batch_size, timestep, feature_dim_1]
        M2 has shape [batch_size, timestep, feature_dim_2]
    Args:
        hidden_units: output dimension of feature channel to be returned
    Output:
        A fused features with shape [batch_size, timestep, hidden_units]
    """
    def __init__(self, hidden_units):
        super(MTRN, self).__init__()

        self.hidden_units = hidden_units

        # Temporal relation modeling
        self.temporal_relation1 = tf.keras.layers.Dense(hidden_units)
        self.temporal_relation2 = tf.keras.layers.Dense(hidden_units)
        self.temporal_relation3 = tf.keras.layers.Dense(hidden_units)

    def call(self, inputs):
        modality1_features, modality2_features = inputs

        # Compute temporal relations
        relation1 = self.temporal_relation1(modality1_features)
        relation2 = self.temporal_relation2(modality2_features)
        relation3 = tf.multiply(relation1, relation2)

        # Fusion with element-wise sum
        fused_output = tf.add(tf.add(relation1, relation2), relation3)

        return fused_output

#### 4. Build full model

In [13]:
class MultimodalEmotion(tf.keras.Model):
    """ Multimodal audio-visual for single task
    Args:
        timesteps: number of timesteps
        output_dim: the output size of prediction
        tcn_config_model1: set of configuration for the first model 
        tcn_config_model2: set of configuration for the second model
        fused_mode: type of fision technique
        fused_units: dimension size of the fusion output
        output_activation: activation function in text for output prediction
        use_pe: whether to use ppositional encoding or not

    Return:
        tensorflow model
    """
    def __init__(self,
                 timesteps,
                 output_dim,
                 tcn_config_model1, # config of TCN for visual 
                 tcn_config_model2, # config of TCn for audio
                 fused_mode=None, # if None, simplify concatenate
                 fused_units = 32,
                 output_activation='softmax',
                 use_pe=False,
                 **kwargs):
        # initialize parent class
        super(MultimodalEmotion, self).__init__(**kwargs)

        self.timesteps   = timesteps
        self.output_dim  = output_dim
        self.output_activation = output_activation
        self.fused_mode  = fused_mode
        self.fused_units = fused_units
        self.tcn_config_model1 = tcn_config_model1
        self.tcn_config_model2 = tcn_config_model2
        self.use_pe = use_pe
        # Encoding layers
        self.tcn_model1 = TCN(**tcn_config_model1, name='tcn_model_1')
        self.tcn_model2 = TCN(**tcn_config_model2, name='tcn_model_2')
        # Fusion layer
        if self.fused_mode=='TCAN':
            self.fused_layer = TCAN(self.fused_units)
        elif self.fused_mode=='MTRN':
            self.fused_layer = MTRN(self.fused_units)
        #else:
        #    self.fused_layer = tf.keras.layers.Concatenate(axis=-1)
        # Add positional Encoding
        if self.use_pe:
            self.pe_layer = PositionalEncoder(self.timesteps, self.fused_units)
        # Output layer
        self.output_layer = tf.keras.layers.Dense(self.output_dim)
        
    def call(self, inputs, training=None, **kwargs):
        x_1, x_2 = inputs
        
        x_1 = self.tcn_model1(x_1, training=training)
        x_2 = self.tcn_model2(x_2, training=training)
        
        x_fused = self.fused_layer([x_1, x_2])

        if self.use_pe:
            x_fused   = self.pe_layer(x_fused)

        x_out = self.output_layer(x_fused)
        if self.output_activation=='softmax':
            x_out = tf.nn.softmax(x_out)
        elif self.output_activation=='tanh':
            x_out = tf.keras.activations.tanh(x_out)

        return x_out

    def get_config(self):
        config = super(MultimodalEmotion, self).get_config()
        config['timesteps']         = self.timesteps
        config['output_dim']        = self.output_dim
        config['output_activation'] = self.output_activation
        config['tcn_config_model1'] = self.tcn_config_model1
        config['tcn_config_model2'] = self.tcn_config_model2
        config['fused_mode']        = self.fused_mode
        config['fused_units']       = self.fused_units
        config['use_pe']            = self.use_pe
        return config  

In [14]:
tcn_config = {
    'nb_filters':256,
    'kernel_size':3,
    'nb_stacks':1,
    'dilations':(1, 2, 4, 8, 16, 32),
    'padding':'causal',
    'use_skip_connections':True,
    'dropout_rate':0.0,
    'return_sequences':True,
    'activation':'relu',
    'kernel_initializer':'he_normal',
    'use_batch_norm':False,
    'use_layer_norm':False,
    'use_weight_norm':False,
    'go_backwards':False,
    'return_state':False
}

In [15]:
## Multimodal model for AU prediction
multi_model_au = MultimodalEmotion(
                          timesteps=x_train_vis.shape[1],
                          output_dim=10,
                          output_activation='sigmoid',
                          tcn_config_model1=tcn_config,
                          tcn_config_model2=tcn_config,
                          fused_mode='TCAN',
                          fused_units=128,
                          use_pe=True,
                  )
## Multimodal model for emotion recognisiotn
multi_model_emo = MultimodalEmotion(
                          timesteps=x_train_vis.shape[1],
                          output_dim=8,
                          output_activation='softmax',
                          tcn_config_model1=tcn_config,
                          tcn_config_model2=tcn_config,
                          fused_mode='TCAN',
                          fused_units=128,
                          use_pe=True,
                  )

# Training Strategy

#### 1. Define loss function and metric

In [16]:
def multi_label_loss(y_true, y_pred):
    y_true_flat = tf.reshape(y_true, shape=(-1, y_true.shape[-1]))
    y_pred_flat = tf.reshape(y_pred, shape=(-1, y_true.shape[-1]))

    bce_loss = tf.keras.losses.BinaryCrossentropy()
    loss = bce_loss(y_true_flat, y_pred_flat)
    return loss

In [17]:
import tensorflow as tf

class MultiLabelMetrics(tf.keras.metrics.Metric):
    def __init__(self, name='multi_label_metrics', **kwargs):
        super(MultiLabelMetrics, self).__init__(name=name, **kwargs)
        self.precision = self.add_weight(name='precision', initializer='zeros')
        self.recall = self.add_weight(name='recall', initializer='zeros')
        self.f1_score = self.add_weight(name='f1_score', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(y_true, tf.bool)
        y_pred = tf.cast(y_pred, tf.bool)

        true_positives = tf.reduce_sum(tf.cast(tf.logical_and(y_true, y_pred), tf.float32), axis=(0, 1))
        predicted_positives = tf.reduce_sum(tf.cast(y_pred, tf.float32), axis=(0, 1))
        actual_positives = tf.reduce_sum(tf.cast(y_true, tf.float32), axis=(0, 1))

        precision = true_positives / (predicted_positives + tf.keras.backend.epsilon())
        recall = true_positives / (actual_positives + tf.keras.backend.epsilon())
        f1_score = 2 * (precision * recall) / (precision + recall + tf.keras.backend.epsilon())

        self.precision.assign_add(tf.reduce_mean(precision))
        self.recall.assign_add(tf.reduce_mean(recall))
        self.f1_score.assign_add(tf.reduce_mean(f1_score))

    def result(self):
        return self.precision, self.recall, self.f1_score

    def reset_states(self):
        self.precision.assign(0.0)
        self.recall.assign(0.0)
        self.f1_score.assign(0.0)

#### 2. Define training and test functions

In [18]:
learning_rate = 1e-1
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [19]:
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = MultiLabelMetrics(name='train_accuracy')

val_loss = tf.keras.metrics.Mean(name='val_loss')
val_accuracy = MultiLabelMetrics(name='val_accuracy')

In [20]:
@tf.function
def train_step(xs, y_true):
    """
    xs: list of input data
    y_true: grouth truth
    """
    with tf.GradientTape() as tape:
        y_pred = multi_model_au(xs, training=True)
        loss   = multi_label_loss(y_true, y_pred)
    gradients  = tape.gradient(loss, multi_model_au.trainable_variables)
    optimizer.apply_gradients(zip(gradients, multi_model_au.trainable_variables))

    train_loss(loss)
    train_accuracy.update_state(y_true, y_pred)

In [21]:
def test_step(xs, y_true):
    """
    xs: list of input data
    y_true: grouth truth
    """
    y_pred = multi_model_au(xs, training=True)
    loss   = multi_label_loss(y_true, y_pred)

    val_loss(loss)
    val_accuracy.update_state(y_true, y_pred)

#### 3. Training

In [22]:
EPOCHS = 20

for epoch in range(EPOCHS):
    # Reset the metrics at the start of the next epoch
    train_loss.reset_states()
    train_accuracy.reset_states()
    val_loss.reset_states()
    val_accuracy.reset_states()

    for xt_1, xt_2, yt_1, yt_2 in train_ds:
        train_step([xt_1, xt_2], yt_1)

    for xv_1, xv_2, yv_1, yv_2 in val_ds:
        test_step([xv_1, xv_2], yv_1)

    pre, rec, f1 = train_accuracy.result()
    val_pre, val_rec, val_f1 = val_accuracy.result()
    
    print(
      f'Epoch {epoch + 1}, '
      f'Loss: {train_loss.result()}, '
      f'F1: {f1.numpy()}, '
      f'Validation Loss: {val_loss.result()}, '
      f'Validation F1: {val_f1.numpy()}'
    )

Epoch 1, Loss: 7.454263687133789, F1: 21.33376121520996, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 2, Loss: 7.685193061828613, F1: 21.330249786376953, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 3, Loss: 7.686321258544922, F1: 21.33146858215332, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 4, Loss: 7.685826301574707, F1: 21.331247329711914, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 5, Loss: 7.684256076812744, F1: 21.32970428466797, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 6, Loss: 7.684277057647705, F1: 21.331274032592773, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 7, Loss: 7.68722677230835, F1: 21.334400177001953, Validation Loss: 7.677295684814453, Validation F1: 2.6612868309020996
Epoch 8, Loss: 7.685286521911621, F1: 21.331878662109375, Validation Loss: 7.677295684814453, Validation F1: