# Import dependencies

In [1]:
import tensorflow as tf
import tensorflow_io as tfio
from tensorflow.keras.layers.experimental.preprocessing import PreprocessingLayer
from typing import List, Optional

2022-03-09 16:34:43.406933: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-03-09 16:34:43.407001: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


# Augmentor

In [2]:
class SpecAugment(PreprocessingLayer):
    def __init__(self, 
                 freq_mask_prob: float = 0.5,
                 freq_mask_param: float = 10,
                 time_mask_prob: float = 0.5,
                 time_mask_param: float = 10):
        self.freq_mask_prob = freq_mask_prob
        self.freq_mask_param = freq_mask_param
        self.time_mask_prob = time_mask_prob
        self.time_mask_param = time_mask_param
    
    def call(self, features):
        prob = tf.random.uniform([])
        augmented = tfio.audio.freq_mask(features, param=self.freq_mask_param)
        features = tf.cond(prob >= self.freq_mask_prob,
                           lambda: augmented,
                           lambda: features)
        
        prob = tf.random.uniform([])
        augmented = tfio.audio.time_mask(features, param=self.time_mask_param)
        features = tf.cond(prob >= self.time_mask_prob,
                           lambda: augmented,
                           lambda: features)

        return features

# Convolution Subsampling

In [3]:
class ConvSubsampling(tf.keras.layers.Layer):
    def __init__(self,
                 filters: List[int],
                 kernel_size: List[int] = [3, 3],
                 num_blocks: int = 1,
                 num_layers_per_block: int = 2,
                 dropout_rate: float = 0.0,
                 name: str = "ConvSubsampling",
                 **kwargs):
        
        super(ConvSubsampling, self).__init__(name=name, **kwargs)
        
        self.conv_blocks = tf.keras.Sequential()
        for i in range(num_blocks):
            convs = tf.keras.Sequential()
            for _ in range(num_layers_per_block):
                conv = tf.keras.layers.Conv2D(filters=filters[i],
                                              kernel_size=kernel_size[i],
                                              padding='same')
                dropout = tf.keras.layers.Dropout(rate=dropout_rate)
                relu = tf.keras.layers.ReLU()

                convs.add(conv)
                convs.add(dropout)
                convs.add(relu)
            
            self.conv_blocks.add(convs)
    
    def call(self, inputs, training=False, **kwargs):
        outputs = self.conv_blocks(inputs, training=training)

        return outputs


batch_size, seq_len1, seq_len2, dim = 3, 1, 15, 512
a = tf.random.uniform([batch_size, seq_len1, seq_len2, dim],
                       minval=-40,
                       maxval=40)
conv_sub = ConvSubsampling(filters=[512, 512])
b = conv_sub(a)
print(b)

2022-03-09 16:34:46.075546: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-03-09 16:34:46.075620: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-03-09 16:34:46.075665: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (asus): /proc/driver/nvidia/version does not exist
2022-03-09 16:34:46.076482: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


tf.Tensor(
[[[[ 0.          0.          1.7592382  ...  7.3736258   0.
    10.7482    ]
   [ 0.          0.          3.9257488  ...  3.7242112   0.
     0.        ]
   [ 0.          0.          7.5487366  ...  1.0469484   0.
     8.181978  ]
   ...
   [ 0.          0.          1.0650812  ...  5.8976912   0.
    11.519484  ]
   [ 0.          0.          2.8196242  ...  0.          0.
     1.1583123 ]
   [ 0.          2.7070963   2.9822922  ...  0.          0.
     2.8085012 ]]]


 [[[ 0.          0.          0.         ...  3.8675451   0.
     7.451819  ]
   [ 0.          0.          0.         ... 10.164973    0.
     0.        ]
   [ 0.          0.85858864  0.         ... 11.796834    0.
    11.693396  ]
   ...
   [ 0.          0.          5.906206   ...  7.208153    0.
     9.236406  ]
   [ 0.          0.          0.         ...  2.6752915   0.
     6.1389604 ]
   [ 0.          0.          0.         ...  8.276555    0.
     0.        ]]]


 [[[ 0.          0.          0.         ...

# Feed Forward Module

In [4]:
class FeedForwardModule(tf.keras.layers.Layer):
    def __init__(self,
                 ffn_dim: int,
                 dropout_rate: float = 0.4,
                 expansion_factor: int = 4,
                 output_reduction_factor: int = 0.5,
                 name: str = "FeedForwardModule",
                 **kwargs):
        super(FeedForwardModule, self).__init__(name=name, **kwargs)
        self.output_reduction_factor = output_reduction_factor

        self.ffn = tf.keras.Sequential([
            tf.keras.layers.LayerNormalization(),
            tf.keras.layers.Dense(ffn_dim * expansion_factor),
            tf.keras.layers.Activation(tf.nn.silu),     # Swish activation with beta=1
            tf.keras.layers.Dropout(dropout_rate),
            tf.keras.layers.Dense(ffn_dim),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()


    def call(self, inputs, training=False, **kwargs):
        outputs = self.ffn(inputs, training=training)
        outputs = self.add([inputs, outputs * self.output_reduction_factor])
        
        return outputs

x = tf.random.uniform([batch_size, seq_len2, dim],
                       minval=-40,
                       maxval=40)
ffn_module = FeedForwardModule(ffn_dim=512)
y = ffn_module(x)
print(y)

tf.Tensor(
[[[ 32.955734  -12.525145   -3.701473  ...  20.555428   11.570231
   -10.307843 ]
  [ -6.0838504 -35.29328   -28.255804  ... -16.09479    28.688818
    20.9091   ]
  [ 26.145418   28.98612    -2.8145013 ...  23.870813   29.649874
    -6.3953967]
  ...
  [ 27.683733   30.917841   -4.138617  ... -29.21204   -16.682503
   -37.47012  ]
  [ 19.270308   35.466263   37.385265  ... -32.332584   31.857155
    37.144943 ]
  [  3.6893954  24.89962    38.809315  ... -23.52759   -20.726103
    23.821571 ]]

 [[ -1.7030383 -13.7418585  34.011215  ... -30.73392    -5.105972
    16.99659  ]
  [-23.19826    12.8521185 -32.671764  ... -18.971943    1.9581873
    18.207022 ]
  [ -0.8717662 -30.154459  -10.048917  ... -27.246363  -25.186249
   -22.200712 ]
  ...
  [ 16.074629   10.158666  -38.069565  ...  36.170956  -26.120712
    10.051651 ]
  [-13.962652   13.160518   12.935482  ...  39.650917  -30.30099
    31.140192 ]
  [-14.427951   28.662128  -27.069307  ... -33.504307   23.567274
   -13.

# Convolution Module

In [5]:
class GLU(tf.keras.layers.Layer):
    def __init__(self,
                 name: str = "GLU",
                 **kwargs):
        super().__init__(name=name, **kwargs)

    def call(self, inputs, **kwargs):
        mat1, mat2 = tf.split(inputs, 2, axis=-1)
        mat2 = tf.nn.sigmoid(mat2)

        return tf.math.multiply(mat1, mat2)

In [6]:
class ConvolutionModule(tf.keras.layers.Layer):
    def __init__(self,
                 filters: int,
                 expansion_factor: int = 2,
                 kernel_size: int = 3,
                 dropout_rate: float = 0.4,
                 name: str = "ConvolutionModule",
                 **kwargs):
        super(ConvolutionModule, self).__init__(name=name, **kwargs)

        self.conv_module = tf.keras.Sequential([
            tf.keras.layers.LayerNormalization(),
            tf.keras.layers.Conv1D(filters=filters * expansion_factor,      # Pointwise Conv
                                   kernel_size=1),
            GLU(),
            tf.keras.layers.Conv1D(filters=filters,                         # 1D Depthwise Conv
                                   kernel_size=kernel_size,
                                   padding='same',
                                   groups=filters),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation(tf.nn.silu),
            tf.keras.layers.Conv1D(filters=filters,                         # Pointwise Conv
                                   kernel_size=1),
            tf.keras.layers.Dropout(rate=dropout_rate)
        ])
        self.add = tf.keras.layers.Add()

    def call(self, inputs, training=False, **kwargs):
        outputs = self.conv_module(inputs, training=training)
        outputs = self.add([inputs, outputs])
        
        return outputs


x = tf.random.uniform([batch_size, seq_len2, dim],
                       minval=-40,
                       maxval=40)
cv = ConvolutionModule(filters=512)
y = cv(x)
print(y)

tf.Tensor(
[[[ 31.696167   -10.663067   -37.48425    ... -20.835714    11.380675
   -12.265628  ]
  [ 21.004318    18.080875   -33.19178    ...   9.084221   -12.156484
    21.89629   ]
  [ 37.9657      34.2879       5.5267434  ...  -1.2475196   22.66422
     7.633396  ]
  ...
  [  4.7922487   26.616556   -34.79274    ...  -4.6261277   33.255142
    14.239597  ]
  [  2.743063    17.43611     30.946697   ...  26.331295     4.661812
   -29.321018  ]
  [ 19.288813   -24.23975     29.891933   ...   3.2656744  -36.67496
    24.931236  ]]

 [[  0.15628254   3.2264771  -29.631634   ...  22.652111     2.2430909
   -29.765686  ]
  [ 25.349346   -12.552378   -28.984999   ...  39.130886   -15.609497
   -18.216927  ]
  [  1.0560838   34.77839     31.27607    ...  -0.43130168  37.300343
   -20.287409  ]
  ...
  [-19.600626    -9.761104   -11.008283   ... -25.728521     0.72996974
     4.361849  ]
  [  7.263018    34.226643   -14.928892   ... -18.79405      0.8717155
    34.57973   ]
  [ 23.465448   

# Multi Headed Self-Attention Module

In [7]:
class PositionalEncoding(tf.keras.layers.Layer):
    """
    Implements the sinusoidal positional encoding function
    Based on https://nlp.seas.harvard.edu/2018/04/03/attention.html#positional-encoding
    """
    def __init__(self,
                 d_model: int = 512,
                 name: str = "PositionalEncoding",
                 **kwargs):
        self.d_model = d_model
        super(PositionalEncoding, self).__init__(name=name, **kwargs)

    def build(self, input_shape):
        d_model = input_shape[-1]
        assert d_model == self.d_model, f"d_model must be equal to the last dimension of the input, which is {self.d_model}"

    @staticmethod
    def encode(max_len, d_model):
        pe = tf.zeros([max_len, d_model])
        position = tf.expand_dims(tf.range(0, max_len), axis=1)
        position = tf.cast(position, dtype=tf.float32)
        div_term = tf.math.exp(tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / float(d_model)))
        
        # Have to set up this way cause Tensorflow not allow assigning to EagerTensor
        pe = tf.Variable(pe)
        pe[:, 0::2].assign(tf.math.sin(position * div_term))
        pe[:, 1::2].assign(tf.math.cos(position * div_term))
        pe = tf.convert_to_tensor(pe)
        pe = tf.expand_dims(pe, axis=0)

        return pe

    def call(self, inputs, **kwargs):
        print(tf.shape(inputs))
        max_len, d_model = tf.shape(inputs)[-2], tf.shape(inputs)[-1]
        pe = self.encode(max_len, d_model)
        # outputs = tf.math.add(inputs, pe)

        return pe


pos = PositionalEncoding()
b = pos(a)
print(b)

tf.Tensor([  3   1  15 512], shape=(4,), dtype=int32)
tf.Tensor(
[[[ 0.00000000e+00  1.00000000e+00  0.00000000e+00 ...  1.00000000e+00
    0.00000000e+00  1.00000000e+00]
  [ 8.41470957e-01  5.40302277e-01  8.21856201e-01 ...  1.00000000e+00
    1.03663326e-04  1.00000000e+00]
  [ 9.09297407e-01 -4.16146815e-01  9.36414778e-01 ...  1.00000000e+00
    2.07326651e-04  1.00000000e+00]
  ...
  [-5.36572933e-01  8.43853951e-01 -8.36262643e-01 ...  9.99999166e-01
    1.24395953e-03  9.99999225e-01]
  [ 4.20167029e-01  9.07446802e-01 -2.57669855e-02 ...  9.99999046e-01
    1.34762272e-03  9.99999106e-01]
  [ 9.90607381e-01  1.36737227e-01  8.06904018e-01 ...  9.99998868e-01
    1.45128614e-03  9.99998927e-01]]], shape=(1, 15, 512), dtype=float32)


In [8]:
class RelativeMHA(tf.keras.layers.Layer):
    """
    Multi-head Attention with Relative Positional Embedding
    Based on https://github.com/sooftware/conformer/blob/main/conformer/attention.py
    """
    def __init__(self,
                 num_heads: int = 8,
                 d_model: int = 512,
                 dropout_rate: float = 0.4,
                 name: str = "RelativeMHA",
                 **kwargs):
        super(RelativeMHA, self).__init__(name=name, **kwargs)
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_head = d_model // num_heads

        self.dropout = tf.keras.layers.Dropout(rate=dropout_rate)

        self.query_linear = tf.keras.layers.Dense(d_model)
        self.key_linear = tf.keras.layers.Dense(d_model)
        self.value_linear = tf.keras.layers.Dense(d_model)
        self.pos_linear = tf.keras.layers.Dense(d_model)
        self.out_linear = tf.keras.layers.Dense(d_model)

        self.u_bias = tf.Variable(tf.keras.initializers.HeUniform()([self.num_heads, self.d_head]))
        self.v_bias = tf.Variable(tf.keras.initializers.HeUniform()([self.num_heads, self.d_head]))


    def build(self, input_shape):
        d_model = input_shape[-1]
        assert d_model == self.d_model, f"d_model must be equal to the last dimension of the input, which is {self.d_model}"
        assert d_model % self.num_heads == 0, f"num_heads must be divisible by {d_model}"
    
    def call(self, 
             query: tf.Tensor,
             key: tf.Tensor,
             value: tf.Tensor,
             pos_embedding: tf.Tensor,
             training=False,
             attention_mask: Optional[tf.Tensor] = None) -> tf.Tensor:

        batch_size, seq_len = tf.shape(query)[0], tf.shape(query)[2]
        query = tf.reshape(self.query_linear(query, training=training), [batch_size, -1, self.num_heads, self.d_head])
        key = tf.transpose(tf.reshape(self.key_linear(key, training=training), [batch_size, -1, self.num_heads, self.d_head]), perm=[0, 2, 1, 3])
        value = tf.transpose(tf.reshape(self.value_linear(value, training=training), [batch_size, -1, self.num_heads, self.d_head]), perm=[0, 2, 1, 3])
        pos_embedding = tf.reshape(self.pos_linear(pos_embedding, training=training), [batch_size, -1, self.num_heads, self.d_head])

        content_score = tf.linalg.matmul(tf.transpose(query + self.u_bias, perm=[0, 2, 1, 3]), tf.transpose(key, perm=[0, 1, 3, 2]))
        pos_score = tf.linalg.matmul(tf.transpose(query + self.v_bias, perm=[0, 2, 1, 3]), tf.transpose(pos_embedding, perm=[0, 2, 3, 1]))
        pos_score = self._relative_shift(pos_score)

        score = (content_score + pos_score) / tf.math.sqrt(float(self.d_model))

        if attention_mask is not None:
            attention_mask = tf.expand_dims(attention_mask, axis=1)
            score = tf.where(attention_mask, tf.fill(tf.shape(score), -1e9), score)

        attn = tf.nn.softmax(score, axis=-1)
        attn = self.dropout(attn, training=training)
        context = tf.transpose(tf.linalg.matmul(attn, value), perm=[0, 2, 1, 3])
        context = self.out_linear(tf.reshape(context, [batch_size, -1, seq_len, self.d_model]), training=training)

        return context

    @staticmethod
    def _relative_shift(pos_score: tf.Tensor) -> tf.Tensor:
        batch_size, num_heads, seq_len1, seq_len2 = tf.shape(pos_score)
        zeros = tf.zeros([batch_size, num_heads, seq_len1, 1])
        padded_pos_score = tf.concat([zeros, pos_score], axis=-1)

        padded_pos_score = tf.reshape(padded_pos_score, [batch_size, num_heads, seq_len2 + 1, seq_len1])
        pos_score = tf.reshape(padded_pos_score[:, :, 1:], tf.shape(pos_score))

        return pos_score

In [9]:
class MultiHeadedSelfAttention(tf.keras.layers.Layer):
    def __init__(self,
                 num_heads: int = 8,
                 d_model: int = 512,
                 dropout_rate: float = 0.4,
                 name: str = "MultiHeadedSelfAttention",
                 **kwargs):
        super(MultiHeadedSelfAttention, self).__init__(name=name, **kwargs)
        self.layer_norm = tf.keras.layers.LayerNormalization()
        self.positional_encoding = PositionalEncoding(d_model)
        self.attention = RelativeMHA(num_heads=num_heads, d_model=d_model, dropout_rate=dropout_rate)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
    
    def call(self, inputs: tf.Tensor, training=False, mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        batch_size = tf.shape(inputs)[0]
        pos_embedding = self.positional_encoding(inputs)
        pos_embedding = tf.concat([pos_embedding for _ in range(batch_size)], axis=0)
        
        x = self.layer_norm(inputs, training=training)
        x = self.attention(x, x, x, pos_embedding, training=training, attention_mask=mask)
        x = self.dropout(x, training=training)  

        return x

In [10]:
class MHSAModule(tf.keras.layers.Layer):
    def __init__(self,
                 head_size: int,
                 num_heads: int = 8,
                 d_model: int = 512,
                 dropout_rate: float = 0.4,
                 name: str = "MHSAModule",
                 **kwargs):
        super(MHSAModule, self).__init__(name=name, **kwargs)
        self.layer_norm = tf.keras.layers.LayerNormalization()
        self.positional_encoding = PositionalEncoding(d_model)
        self.attention = tf.keras.layers.MultiHeadAttention(num_heads=num_heads,
                                                            key_dim=head_size)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.add = tf.keras.layers.Add()

    def call(self, inputs: tf.Tensor, training=False, mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        batch_size = tf.shape(inputs)[0]
        pos_embedding = self.positional_encoding(inputs)
        pos_embedding = tf.concat([pos_embedding for _ in range(batch_size)], axis=0)
        pos_embedding = tf.cast(pos_embedding, dtype=inputs.dtype)

        outputs = self.layer_norm(inputs, training=training)
        outputs = self.add([outputs, pos_embedding])
        outputs = self.attention(outputs, outputs, outputs, attention_mask=mask, training=training)
        outputs = self.dropout(outputs, training=training)
        outputs = self.add([inputs, outputs])
        
        return outputs

# Conformer Block

In [11]:
class ConformerBlock(tf.keras.layers.Layer):
    def __init__(self,
                 num_blocks: int = 1,
                 encoder_dim: int = 512,
                 num_heads: int = 8,
                 dropout_rate: float = 0.4,
                 name: str = "ConformerBlock",
                 **kwargs):
        super(ConformerBlock, self).__init__(name=name, **kwargs)
        self.num_blocks = num_blocks
        self.ff_module = FeedForwardModule(encoder_dim)
        self.attention = MultiHeadedSelfAttention(num_heads=num_heads, d_model=encoder_dim, dropout_rate=dropout_rate)
        # self.attention = MHSAModule(head_size=encoder_dim, 
        #                             num_heads=num_heads, 
        #                             d_model=encoder_dim, 
        #                             dropout_rate=dropout_rate)
        self.conv = ConvolutionModule(encoder_dim)
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, inputs: tf.Tensor, training=False, mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        for _ in range(self.num_blocks):
            x = self.ff_module(inputs, training=training)
            x = self.attention(x, training=training, mask=mask)
            x = self.conv(x, training=training)
            x = self.ff_module(x, training=training)
            x = self.layer_norm(x, training=training)

        return x

# Conformer

In [12]:
class ConformerEncoder(tf.keras.Model):
    def __init__(self,
                 num_conv_filters: List[int],
                 num_blocks: int = 1,
                 encoder_dim: int = 512,
                 num_heads: int = 8,
                 dropout_rate: float = 0.4,
                 num_classes:int = 10,
                 include_top: bool = True,
                 name: str = "ConformerEncoder",
                 **kwargs):
        super(ConformerEncoder, self).__init__(name=name, **kwargs)
        self.include_top = include_top
        self.conv_subsampling = ConvSubsampling(filters=num_conv_filters, dropout_rate=dropout_rate)
        self.linear = tf.keras.layers.Dense(encoder_dim)
        self.out_linear = tf.keras.layers.Dense(num_classes)
        self.relu = tf.keras.layers.Activation(tf.nn.relu)
        self.log_softmax = tf.keras.layers.Activation(tf.nn.log_softmax)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.conformer_block = ConformerBlock(num_blocks=num_blocks, encoder_dim=encoder_dim, num_heads=num_heads, dropout_rate=dropout_rate)

    def call(self, inputs: tf.Tensor, training=False, mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        x = self.conv_subsampling(inputs, training=training)
        x = self.linear(x, training=training)
        x = self.relu(x, training=training)
        x = self.dropout(x, training=training)
        x = self.conformer_block(x, training=training, mask=mask)

        if self.include_top:
            x = self.out_linear(x, training=training)
            x = self.log_softmax(x, training=training)

        return x

# Test

In [13]:
batch_size, seq_len, dim = 3, 15, 512

inputs = tf.random.uniform((batch_size, seq_len, dim),
                            minval=-40,
                            maxval=40)

model = ConformerEncoder(num_conv_filters=[512, 512], num_blocks=1, encoder_dim=512, num_heads=8, dropout_rate=0.4, num_classes=10, include_top=True)

In [14]:
inputs = tf.expand_dims(inputs, axis=1)
outputs = model(inputs)

tf.Tensor([  3   1  15 512], shape=(4,), dtype=int32)


In [15]:
print(tf.shape(outputs))

tf.Tensor([ 3  1 15 10], shape=(4,), dtype=int32)
