In [1]:
# mount google drive. data and utils
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My\ Drive/Coursera/Coursera_DeepLearningSpecialization/course5_coding_assignments/W4A1/

Mounted at /gdrive
/gdrive/My Drive/Coursera/Coursera_DeepLearningSpecialization/course5_coding_assignments/W4A1


In [2]:
!pip install transformers

Collecting transformers
  Downloading transformers-4.31.0-py3-none-any.whl (7.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m46.9 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.14.1 (from transformers)
  Downloading huggingface_hub-0.16.4-py3-none-any.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.8/268.8 kB[0m [31m28.4 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m104.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting safetensors>=0.3.1 (from transformers)
  Downloading safetensors-0.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m74.7 MB/s[0m eta [36m0:00:

In [3]:
import tensorflow as tf
import time
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras.layers import Embedding, MultiHeadAttention, Dense, Input, Dropout, LayerNormalization
from transformers import DistilBertTokenizerFast
from transformers import TFDistilBertForTokenClassification
from utils import create_padding_mask, create_look_ahead_mask, FullyConnected, positional_encoding
%load_ext autoreload
%autoreload 2


- Encoder
- Decoder
- Transformer

Resources:
- [TensorFlow - A Transformer Chatbot Tutorial with TensorFlow 2.0](https://medium.com/tensorflow/a-transformer-chatbot-tutorial-with-tensorflow-2-0-88bf59e66fe2)
- [ML Mastery - Implementing the Transformer Encoder from Scratch in TensorFlow and Keras](https://machinelearningmastery.com/implementing-the-transformer-encoder-from-scratch-in-tensorflow-and-keras/)
- [ML Mastery - Implementing the Transformer Decoder from Scratch in TensorFlow and Keras](https://machinelearningmastery.com/implementing-the-transformer-decoder-from-scratch-in-tensorflow-and-keras/)


## Encoder

In [4]:
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               embedding_dim,
               num_heads,
               fully_connected_dim,
               dropout_rate=0.1,
               layernorm_eps=1e-6,
               ):
    super(EncoderLayer, self).__init__()
    self.att = MultiHeadAttention(num_heads=num_heads,
                                  key_dim=embedding_dim,
                                  dropout=dropout_rate)
    self.ffn = tf.keras.Sequential([
        tf.keras.layers.Dense(fully_connected_dim, activation="relu"),
        tf.keras.layers.Dense(embedding_dim)
    ])

    self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
    self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)
    self.dropout1 = Dropout(dropout_rate)
    self.dropout2 = Dropout(dropout_rate)

  def call(self, x, training, mask):
    attn_output = self.att(x, x, x, mask)
    # attn_output = self.dropout1(attn_output, training = training)
    normed_output = self.layernorm1(x + attn_output)
    ffn_output = self.ffn(normed_output)
    ffn_output = self.dropout2(ffn_output, training = training)
    encoder_layer_out = self.layernorm2(normed_output + ffn_output)
    return encoder_layer_out


In [5]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self,
               num_layers,
               embedding_dim,
               num_heads,
               fully_connected_dim,
               input_vocab_size,
               maximum_position_encoding,
               dropout_rate=0.1,
               layernorm_eps=1e-6,
               ):
    super(Encoder, self).__init__()
    self.num_layers = num_layers
    self.embedding_dim = embedding_dim
    self.embedding = Embedding(input_vocab_size, embedding_dim)
    self.pos_encoding = positional_encoding(maximum_position_encoding,
                                            embedding_dim)
    self.enc_layers = [EncoderLayer(embedding_dim=embedding_dim,
                                    num_heads=num_heads,
                                    fully_connected_dim=fully_connected_dim,
                                    dropout_rate=dropout_rate,
                                    layernorm_eps=layernorm_eps)
                      for _ in range(num_layers)]
    self.dropout = Dropout(dropout_rate)

  def call(self,
           inputs,
           training,
           mask):
    """
    Args:
      - inputs:tf.Tensor (batch_size, input_seq_len, fully_connected_dim)
      - training:bool
      - mask:
    Returns:
      - outputs:tf.Tensor(batch_size, input_seq_len, fully_connected_dim)
    """

    seq_len = tf.shape(inputs)[1]
    # Pass input through the Embedding layer
    embeddings = self.embedding(inputs)
    # Scale embedding by multiplying it by the square root of the embedding dimension
    embeddings *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
    # Add the position encoding to embedding
    pos_encoding_output = embeddings + self.pos_encoding[:, :seq_len, :]
    outputs = self.dropout(pos_encoding_output, training = training)
    for i in range(self.num_layers):
      outputs = self.enc_layers[i](outputs, training, mask)
    return outputs

## Decoder

In [6]:
class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               embedding_dim,
               num_heads,
               fully_connected_dim,
               dropout_rate=0.1,
               layernorm_eps=1e-6):
    super(DecoderLayer, self).__init__()
    self.att1 = MultiHeadAttention(num_heads=num_heads,
                                  key_dim=embedding_dim,
                                  dropout=dropout_rate)
    self.att2 = MultiHeadAttention(num_heads=num_heads,
                                  key_dim=embedding_dim,
                                  dropout=dropout_rate)
    self.ffn = tf.keras.Sequential([
        tf.keras.layers.Dense(fully_connected_dim, activation="relu"),
        tf.keras.layers.Dense(embedding_dim)
    ])
    self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
    self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)
    self.layernorm3 = LayerNormalization(epsilon=layernorm_eps)
    self.dropout1 = Dropout(dropout_rate)
    self.dropout2 = Dropout(dropout_rate)
    self.dropout3 = Dropout(dropout_rate)

  def call(self,
           x,
           enc_output,
           training,
           look_ahead_mask,
           padding_mask):
    # att_output, att_weights_block
    attn_output1, attn_weights_block1 = self.att1(x, x, x, look_ahead_mask, return_attention_scores=True)
    attn_output1 = self.dropout1(attn_output1, training = training)
    normed_output1 = self.layernorm1(attn_output1 + x)
    attn_output2, attn_weights_block2 = self.att2(normed_output1, enc_output, enc_output, padding_mask, return_attention_scores=True)
    attn_output2 = self.dropout2(attn_output2, training = training)
    normed_output2 = self.layernorm2(normed_output1 + attn_output2)
    ffn_output = self.ffn(normed_output2)
    ffn_output = self.dropout3(ffn_output, training = training)
    normed_output3 = self.layernorm3(ffn_output + normed_output2)
    return normed_output3, attn_weights_block1, attn_weights_block2

In [7]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self,
               num_layers,
               embedding_dim,
               num_heads,
               fully_connected_dim,
               target_vocab_size,
               maximum_position_encoding,
               dropout_rate=0.1,
               layernorm_eps=1e-6):
    super(Decoder, self).__init__()
    self.num_layers = num_layers
    self.embedding_dim = embedding_dim
    self.embedding = Embedding(target_vocab_size, embedding_dim)
    self.pos_encoding = positional_encoding(maximum_position_encoding, embedding_dim)
    self.dec_layers = [DecoderLayer(embedding_dim=embedding_dim,
                                    num_heads=num_heads,
                                    fully_connected_dim=fully_connected_dim,
                                    dropout_rate=dropout_rate,
                                    layernorm_eps=layernorm_eps)
                      for _ in range(num_layers)]
    self.dropout = Dropout(dropout_rate)

  def call(self, x,
           encoding_output,
           training,
           look_ahead_mask,
           padding_mask):
    seq_len = tf.shape(x)[1]
    attention_weights = {}

    embeddings = self.embedding(x)  # (batch_size, target_seq_len, fully_connected_dim)
    embeddings *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
    pos_encoding_output = embeddings + self.pos_encoding[:, :seq_len, :]
    outputs = self.dropout(pos_encoding_output, training = training)

    for i in range(self.num_layers):
      outputs, block1, block2 = self.dec_layers[i](outputs,
                                             encoding_output,
                                             training,
                                             look_ahead_mask,
                                             padding_mask)
      attention_weights[f"decoder_layer{i+1}_block1_self_att"] = block1
      attention_weights[f"decoder_layer{i+1}_block2_decenc_att"] = block2

    return outputs, attention_weights


## Transformer

In [18]:
class Transformer(tf.keras.Model):
  def __init__(self,
               num_layers,
               embedding_dim,
               num_heads,
               fully_connected_dim,
               input_vocab_size,
               target_vocab_size,
               max_positional_encoding_input,
               max_positional_encoding_target,
               dropout_rate = 0.1,
               layernorm_eps = 1e-6
               ):
      super(Transformer, self).__init__()

      self.encoder = Encoder(num_layers=num_layers,
                               embedding_dim=embedding_dim,
                               num_heads=num_heads,
                               fully_connected_dim=fully_connected_dim,
                               input_vocab_size=input_vocab_size,
                               maximum_position_encoding=max_positional_encoding_input,
                               dropout_rate=dropout_rate,
                               layernorm_eps=layernorm_eps)

      self.decoder = Decoder(num_layers=num_layers,
                              embedding_dim=embedding_dim,
                              num_heads=num_heads,
                              fully_connected_dim=fully_connected_dim,
                              target_vocab_size=target_vocab_size,
                              maximum_position_encoding=max_positional_encoding_target,
                              dropout_rate=dropout_rate,
                              layernorm_eps=layernorm_eps)

      self.final_layer = Dense(target_vocab_size, activation='softmax')

  def call(self,
           input_sentence,
           output_sentence,
           training: bool,
           encoding_padding_mask,
           look_ahead_mask,
           decoding_padding_mask):
    encoding_output = self.encoder(input_sentence, training, encoding_padding_mask)
    decoding_output, attention_weights = self.decoder(output_sentence,
                                                      encoding_output,
                                                      training,
                                                      look_ahead_mask,
                                                      decoding_padding_mask)
    final_output = self.final_layer(decoding_output)
    return final_output, attention_weights


In [21]:
num_layers = 6
embedding_dim = 4
num_heads = 4
fully_connected_dim = 8
input_vocab_size = 30
target_vocab_size = 35
max_positional_encoding_input = 5
max_positional_encoding_target = 6
sentence_lang_a = np.array([[2, 1, 4, 3, 0]])
sentence_lang_b = np.array([[3, 2, 1, 0, 0]])
enc_padding_mask = create_padding_mask(sentence_lang_a)
dec_padding_mask = create_padding_mask(sentence_lang_b)
look_ahead_mask = create_look_ahead_mask(sentence_lang_a.shape[1])

trans = Transformer(num_layers,
              embedding_dim,
              num_heads,
              fully_connected_dim,
              input_vocab_size,
              target_vocab_size,
              max_positional_encoding_input,
              max_positional_encoding_target)

translation, weights = trans(
        sentence_lang_a,
        sentence_lang_b,
        False,
        enc_padding_mask,
        look_ahead_mask,
        dec_padding_mask
    )

translation

<tf.Tensor: shape=(1, 5, 35), dtype=float32, numpy=
array([[[0.02122542, 0.03951558, 0.03045573, 0.01886736, 0.01792597,
         0.01548912, 0.0270176 , 0.01604852, 0.02647521, 0.05826805,
         0.01791989, 0.03370154, 0.03252686, 0.02306971, 0.04643841,
         0.01117709, 0.01843351, 0.03842779, 0.02485414, 0.02299673,
         0.04306052, 0.03768206, 0.02643834, 0.00933093, 0.01064945,
         0.047329  , 0.05123758, 0.01787438, 0.02077915, 0.03096425,
         0.05060191, 0.01340286, 0.05078856, 0.03535206, 0.01367469],
        [0.02821478, 0.0307212 , 0.02532112, 0.01353185, 0.01700304,
         0.01946055, 0.02366202, 0.01418533, 0.02688376, 0.04471272,
         0.01996359, 0.04141346, 0.02982285, 0.01805617, 0.03065509,
         0.01011677, 0.01725758, 0.04791638, 0.02592303, 0.0316627 ,
         0.03940037, 0.05005689, 0.03631482, 0.01068982, 0.01044376,
         0.05724731, 0.05910851, 0.01487506, 0.01838412, 0.03088713,
         0.04143605, 0.01502385, 0.05358619, 0.027