In [1]:
import os
import random
import tensorflow as tf
from tensorflow.keras import layers

# COUCHE D'ENTRÉE DU DÉCODEUR

In [4]:
class TokenEmbedding(layers.Layer):
    """Decoder input layer: token embedding plus a learned position embedding.

    Args:
        num_vocab: Number of rows in the token embedding table.
        maxlen: Number of rows in the position embedding table; sequences
            passed to ``call`` must not be longer than this.
        num_hid: Width of both embedding tables.
    """

    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = tf.keras.layers.Embedding(num_vocab, num_hid)
        self.pos_emb = tf.keras.layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        """Embed token ids and add the embedding of each position index.

        Args:
            x: Integer tensor of token ids whose last axis is the sequence
                axis (assumed ``(batch, seq_len)`` -- TODO confirm with caller).

        Returns:
            ``self.emb(x)`` plus the position embeddings for indices
            ``0 .. seq_len - 1`` (broadcast over the leading axes).
        """
        # Use the dynamic length so shorter-than-maxlen inputs work.
        seq_len = tf.shape(x)[-1]
        x = self.emb(x)

        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.pos_emb(positions)
        return x + positions

# COUCHE D'ENTRÉE DE L'ENCODEUR

In [5]:
class SpeechFeatureEmbedding(layers.Layer):
    """Encoder input layer: a stack of three strided 1-D convolutions.

    Every convolution uses ``num_hid`` filters, kernel size 11, stride 2,
    ``"same"`` padding and a ReLU activation.

    Args:
        num_hid: Number of filters (output channels) of each convolution.
        maxlen: Accepted for signature compatibility; not used by this layer.
    """

    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        # All three convolutions share one configuration.
        conv_config = dict(
            filters=num_hid,
            kernel_size=11,
            strides=2,
            padding="same",
            activation="relu",
        )
        self.conv1 = layers.Conv1D(**conv_config)
        self.conv2 = layers.Conv1D(**conv_config)
        self.conv3 = layers.Conv1D(**conv_config)

    def call(self, x):
        """Run ``x`` through conv1, conv2 and conv3 in that order."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = conv(features)
        return features
        

# COUCHE D'ENCODEUR DU TRANSFORMATEUR

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block (self-attention + feed-forward).

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization.

    Args:
        embed_dim: Size of the last axis of the inputs; also used as the
            per-head ``key_dim`` and as the output width of the feed-forward
            stack.
        num_heads: Number of attention heads.
        ffn_dim: Hidden width of the feed-forward stack.
        rate: Dropout rate applied after each sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ffn_dim, rate=0.01):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(ffn_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the encoder block.

        Args:
            inputs: Tensor attended over itself (query and value are both
                ``inputs``).
            training: Forwarded to the dropout layers; ``None`` lets Keras
                decide from the calling context.

        Returns:
            Tensor produced by the second layer normalization.
        """
        att_output = self.att(inputs, inputs)
        att_output = self.dropout1(att_output, training=training)

        out1 = self.layernorm1(inputs + att_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


# Backward-compatible alias for the original (misspelled) class name.
TranssformerEncoder = TransformerEncoder
        

In [None]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Runs causal self-attention over the target, attention over the encoder
    output, then a feed-forward stack; each sub-layer is followed by dropout,
    a residual connection and layer normalization.

    Args:
        embed_dim: Size of the last axis of the target; also the per-head
            ``key_dim`` and the output width of the feed-forward stack.
        num_heads: Number of attention heads.
        ffn_dim: Hidden width of the feed-forward stack.
        rate: Currently unused -- the three dropout rates below are fixed
            (0.5 / 0.1 / 0.1). Kept so existing callers keep working.
    """

    def __init__(self, embed_dim, num_heads, ffn_dim, rate=0.1):
        super().__init__()
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.self_att = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.enc_att = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.self_dropout = tf.keras.layers.Dropout(0.5)
        self.enc_dropout = tf.keras.layers.Dropout(0.1)
        self.ffn_dropout = tf.keras.layers.Dropout(0.1)
        # Must be stored on self: call() reads self.ffn.
        self.ffn = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(ffn_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ]
        )

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention.

        This prevents flow of information from future tokens to current token.
        1's in the lower triangle, counting from the lower right corner.

        Args:
            batch_size: Scalar int32 tensor; the mask is tiled this many times.
            n_dest: Number of rows (query positions) of the mask.
            n_src: Number of columns (key positions) of the mask.
            dtype: Dtype the boolean mask is cast to.

        Returns:
            Tensor of shape ``[batch_size, n_dest, n_src]``.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        # Tile multiples: [batch_size, 1, 1].
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_out, target):
        """Apply the decoder block.

        Args:
            enc_out: Encoder output used as the value (and key) of the
                encoder-decoder attention.
            target: Embedded target sequence; axis 0 is read as the batch
                size and axis 1 as the sequence length.

        Returns:
            Tensor produced by the third layer normalization.
        """
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(target_att))
        enc_att_out = self.enc_att(target_norm, enc_out)
        enc_out_norm = self.layernorm2(self.enc_dropout(enc_att_out) + target_norm)
        ffn_out = self.ffn(enc_out_norm)
        ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm
        

# TRANSFORMER MODEL

In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer trained with a custom ``train_step``.

    Args:
        num_hid: Hidden width shared by the embeddings and all blocks.
        num_head: Number of attention heads in every block.
        num_ff: Hidden width of the feed-forward stacks.
        source_maxlen: Passed to ``SpeechFeatureEmbedding`` as ``maxlen``.
        target_maxlen: Size of the decoder position table; also bounds the
            number of greedy decoding steps in ``generate``.
        num_layers_enc: Number of encoder blocks.
        num_layers_dec: Number of decoder blocks.
        num_classes: Output vocabulary size (width of the classifier and
            depth of the one-hot targets).
    """

    def __init__(self,
                 num_hid=64,
                 num_head=2,
                 num_ff=128,
                 source_maxlen=100,
                 target_maxlen=100,
                 num_layers_enc=4,
                 num_layers_dec=1,
                 num_classes=10,):
        super().__init__()
        self.loss_metric = tf.keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid)

        # Encoder: input embedding followed by num_layers_enc encoder blocks.
        self.encoder = tf.keras.Sequential(
            [self.enc_input]
            + [TransformerEncoder(num_hid, num_head, num_ff) for _ in range(num_layers_enc)]
        )

        # Decoder blocks and output projection; decode(), call() and
        # generate() depend on these.
        self.dec_layers = [
            TransformerDecoder(num_hid, num_head, num_ff) for _ in range(num_layers_dec)
        ]
        self.classifier = layers.Dense(num_classes)

    @property
    def metrics(self):
        """Metrics Keras resets at the start of every epoch."""
        return [self.loss_metric]

    def decode(self, enc_out, target):
        """Embed ``target`` and run it through every decoder block."""
        y = self.dec_input(target)
        for dec_layer in self.dec_layers:
            y = dec_layer(enc_out, y)
        return y

    def call(self, inputs):
        """Forward pass.

        Args:
            inputs: Two-element sequence ``[source, target]``.

        Returns:
            Output of the classifier for every target position.
        """
        source, target = inputs[0], inputs[1]
        enc_out = self.encoder(source)
        dec_out = self.decode(enc_out, target)
        return self.classifier(dec_out)

    def _masked_loss(self, batch):
        """Compute the compiled loss for one batch with teacher forcing.

        ``batch`` is a dict with ``"source"`` and ``"target"`` entries. The
        decoder is fed ``target[:, :-1]`` and scored against ``target[:, 1:]``;
        positions whose target id is 0 get zero sample weight (presumably 0
        is the padding id -- confirm against the vectorizer).
        """
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        # NOTE(review): compiled_loss is the legacy Keras API; newer Keras
        # versions expose compute_loss instead -- confirm the target version.
        return self.compiled_loss(one_hot, preds, sample_weight=mask)

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        with tf.GradientTape() as tape:
            loss = self._masked_loss(batch)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def test_step(self, batch):
        """Processes one batch inside model.evaluate() / validation."""
        loss = self._masked_loss(batch)
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding.

        Args:
            source: Batch of encoder inputs; axis 0 is the batch size.
            target_start_token_idx: Token id every decoded sequence starts with.

        Returns:
            int32 tensor of shape ``(batch, target_maxlen)`` holding the start
            token followed by the greedily chosen ids.
        """
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        for _ in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            token_ids = tf.argmax(logits, axis=-1, output_type=tf.int32)
            # Only the prediction for the last position is new.
            last_token = tf.expand_dims(token_ids[:, -1], axis=-1)
            dec_input = tf.concat([dec_input, last_token], axis=-1)
        return dec_input
        
        

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
    """Prints target vs. greedy-decoded text for one batch every 5th epoch.

    Args:
        batch: Dict with ``"source"`` and ``"target"`` tensors.
        idx_to_token: Indexable mapping from token id to its string.
        target_start_token_idx: Id the decoder is started with.
        target_end_token_idx: Id at which a printed prediction is cut off.
    """

    def __init__(self, batch, idx_to_token, target_start_token_idx=27, target_end_token_idx=28):
        super().__init__()
        self.batch = batch
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def on_epoch_end(self, epoch, logs=None):
        """Decode the stored batch and print each target/prediction pair."""
        if epoch % 5 != 0:
            return
        source = self.batch["source"]
        target = self.batch["target"].numpy()
        preds = self.model.generate(source, self.target_start_token_idx)
        preds = preds.numpy()
        for i in range(target.shape[0]):
            # Iterate over the token ids of row i (not range() of the row).
            target_text = "".join(self.idx_to_char[idx] for idx in target[i, :])
            prediction = ""
            for idx in preds[i, :]:
                prediction += self.idx_to_char[idx]
                # The end token itself is included, then decoding output stops.
                if idx == self.target_end_token_idx:
                    break
            print(f"target:   {target_text.replace('-', '')}")
            print(f"predictions:  {prediction}\n")
                 

# BARÈME D'APPRENTISSAGE

In [None]:

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with linear warm-up followed by linear decay.

    The rate is the minimum of a warm-up line (from ``init_lr`` towards
    ``lr_after_warnup``) and a decay line (from ``lr_after_warnup`` down to,
    and clamped at, ``final_lr``), both evaluated per epoch.

    Args:
        init_lr: Learning rate at epoch 0.
        lr_after_warnup: Learning rate reached at the end of warm-up.
        final_lr: Lower bound of the decay phase.
        warnup_epochs: Number of warm-up epochs.
        decay_epochs: Number of epochs over which the rate decays.
        steps_per_epoch: Optimizer steps per epoch, used to turn a step
            count into an epoch index.
    """

    def __init__(self,
                 init_lr=0.00001,
                 lr_after_warnup=0.001,
                 final_lr=0.00001,
                 warnup_epochs=15,
                 decay_epochs=85,
                 steps_per_epoch=203,
                 ):
        super().__init__()
        self.init_lr = init_lr
        self.lr_after_warnup = lr_after_warnup
        self.final_lr = final_lr
        self.warnup_epochs = warnup_epochs
        self.decay_epochs = decay_epochs
        self.steps_per_epoch = steps_per_epoch

    def calculate_lr(self, epoch):
        """Return the learning rate for the given (float) epoch index."""
        # Guard the divisor so warnup_epochs == 1 does not divide by zero.
        warnup_span = max(self.warnup_epochs - 1, 1)
        warnup_lr = (
            self.init_lr
            + ((self.lr_after_warnup - self.init_lr) / warnup_span) * epoch
        )

        decay_lr = tf.math.maximum(
            self.final_lr,
            self.lr_after_warnup
            - (epoch - self.warnup_epochs)
            * (self.lr_after_warnup - self.final_lr)
            / self.decay_epochs,
        )

        return tf.math.minimum(warnup_lr, decay_lr)

    def __call__(self, step):
        """Map an optimizer step to a learning rate (entry point used by Keras)."""
        epoch = step // self.steps_per_epoch
        # The step is an integer; cast so it can be mixed with float rates.
        epoch = tf.cast(epoch, tf.float32)
        return self.calculate_lr(epoch)


# CRÉER ET ENTRAÎNER LE MODÈLE DE BOUT EN BOUT

In [None]:
# One fixed validation batch, decoded by the callback to monitor progress.
batch = next(iter(val_ds))

# Token-id -> string table used by the callback to turn ids back into text.
idx_to_char = vectorizer.get_vocabulary()
display_cb = DisplayCallback(batch, idx_to_char, target_start_token_idx=2, target_end_token_idx=3)

Model = Transformer(
    num_hid=200,
    num_head=2,
    num_ff=400,
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34)

# The model outputs raw logits, hence from_logits=True.
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1,)

learning_rate = CustomSchedule(
    init_lr=0.00001,
    lr_after_warnup=0.001,
    final_lr=0.00001,
    warnup_epochs=15,
    decay_epochs=85,
    steps_per_epoch=len(ds),)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
Model.compile(loss=loss_fn, optimizer=optimizer)

history = Model.fit(ds, validation_data=val_ds, callbacks=[display_cb], epochs=200)

