<a href="https://colab.research.google.com/github/toddwalters/pgaiml-python-coding-examples/blob/main/deep-learning/C7/Transformers_from_Scratch_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow_text

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import (Add, Dense, Layer, Dropout, Embedding, LayerNormalization,
                                     MultiHeadAttention, TextVectorization, StringLookup)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers.schedules import LearningRateSchedule
import tensorflow_text as tf_text
# Shorthand for tf.data.AUTOTUNE; passed to Dataset.map and Dataset.prefetch
# when the input pipelines are built.
_AUTO = tf.data.AUTOTUNE

In [None]:
!wget https://www.manythings.org/anki/fra-eng.zip
!unzip fra-eng.zip

In [None]:
class Config:
  """Central place for the data, model and training settings of the notebook."""

  # ---- data pipeline ----
  DATA_FNAME = 'fra.txt'      # tab-separated sentence-pair file
  BATCH_SIZE = 256

  # ---- vocabulary limits for the two text vectorizers ----
  SOURCE_VOCAB_SIZE = 10000
  TARGET_VOCAB_SIZE = 10000

  # ---- positional encoding: maximum number of positions ----
  MAX_POS_ENCODING = 256

  # ---- depth of the two stacks ----
  ENCODER_NUM_LAYERS = 2      # layers in the encoder stack
  DECODER_NUM_LAYERS = 2      # layers in the decoder stack

  # ---- layer sizes ----
  D_MODEL = 128               # model (embedding) dimension
  DFF = 256                   # units in the feed-forward network
  NUM_HEADS = 4               # attention heads
  DROP_RATE = 0.1

  # ---- training ----
  EPOCHS = 25

  # ---- output location ----
  OUTPUT_DIR = 'output'


# Shared settings object used by the rest of the notebook.
config = Config()

In [None]:
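# Example (left disabled): create a separate Config and override a setting
# for an individual experiment.
# exp1 = Config()
# exp1.EPOCHS = 30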

# Create Attention Layers

In [None]:
class BaseAttention(Layer):
  """Common state for the attention layers defined in this notebook.

  Holds a MultiHeadAttention layer together with the Add and
  LayerNormalization layers used for the residual connection. It defines no
  call(); each subclass provides its own.

  Args:
    **kwargs: passed unchanged to MultiHeadAttention (for example num_heads,
      key_dim and dropout).
  """

  def __init__(self, **kwargs):
    # The base Layer constructor must run before sub-layers are assigned.
    # The earlier `super().init()` raised AttributeError, so no attention
    # layer could be instantiated.
    super().__init__()
    # Layer Definitions
    self.mha = MultiHeadAttention(**kwargs)
    self.layernorm = LayerNormalization()
    self.add = Add()

class GlobalSelfAttention(BaseAttention):
  """Self-attention over x, followed by a residual add and layer normalization."""

  def call(self, x):
    # Query, key and value are all the same sequence.
    attended = self.mha(query=x, value=x, key=x)
    residual = self.add([x, attended])
    return self.layernorm(residual)

class CrossAttention(BaseAttention):
  """Attention with x as query and context as key/value, then add & normalize."""

  def call(self, x, context):
    # The attention scores are requested alongside the outputs, but only the
    # outputs are used below.
    attended, _scores = self.mha(
        query=x,
        value=context,
        key=context,
        return_attention_scores=True,
    )
    residual = self.add([x, attended])
    return self.layernorm(residual)

class CausalAttention(BaseAttention):
  """Masked self-attention: MultiHeadAttention is called with use_causal_mask=True."""

  def call(self, x):
    attended = self.mha(
        query=x,
        value=x,
        key=x,
        use_causal_mask=True,
    )
    residual = self.add([x, attended])
    return self.layernorm(residual)

In [None]:
import random
def load_data(file_name, seed=None):
  """Read tab-separated sentence pairs from a text file and shuffle them.

  The first field of each line is taken as the source sentence and the second
  as the target sentence; any further fields on the line are ignored. Lines
  with fewer than two fields (for example blank lines) are skipped instead of
  breaking the pair unpacking.

  Args:
    file_name: path of the text file to read.
    seed: optional seed. When given, the shuffle uses a private
      random.Random(seed) and is reproducible; when None (the default) the
      module-level random generator is used.

  Returns:
    A (source, target) tuple of two equally long lists of strings.
  """
  # Decode explicitly as UTF-8 instead of relying on the platform default.
  with open(file_name, 'rt', encoding='utf-8') as textFile:
    lines = textFile.readlines()

  pairs = []
  for line in lines:
    fields = line.rstrip("\n").split("\t")
    if len(fields) >= 2:
      pairs.append(fields[:2])

  if seed is None:
    random.shuffle(pairs)
  else:
    random.Random(seed).shuffle(pairs)

  source = [src for src, _ in pairs]
  target = [trgt for _, trgt in pairs]
  return (source, target)

def splitting_dataset(source,target):
  """Cut two parallel sequences into contiguous train / validation / test parts.

  The first int(len(source) * 0.8) items form the training part, the next
  int(len(source) * 0.1) items the validation part, and everything after that
  the test part. No shuffling happens here.

  Returns:
    ((trainSource, trainTarget), (valSource, valTarget), (testSource, testTarget))
  """
  total = len(source)
  trainEnd = int(total * 0.8)
  valEnd = trainEnd + int(total * 0.1)

  def cut(start, stop):
    # The same slice is applied to both sequences so the pairs stay aligned.
    return (source[start:stop], target[start:stop])

  return (cut(0, trainEnd), cut(trainEnd, valEnd), cut(valEnd, None))

In [None]:
# Read the sentence pairs from the configured data file; load_data returns the
# source and target sentences as two shuffled, aligned lists.
source,target = load_data(file_name=config.DATA_FNAME)

In [None]:
# Preview the first five source sentences.
source[:5]

In [None]:
# Preview the first five target sentences (aligned with the source preview).
target[:5]

In [None]:
def make_dataset(splits, batchSize,sourceTextProcessor, targetTextProcessor, train=False):
  """Build a batched, vectorized tf.data pipeline from a (source, target) split.

  Args:
    splits: a (source, target) pair of aligned text sequences.
    batchSize: number of sentence pairs per batch.
    sourceTextProcessor: callable applied to each batch of source strings.
    targetTextProcessor: callable applied to each batch of target strings.
    train: when True, the examples are shuffled (buffer size equal to the
      dataset cardinality) before batching.

  Returns:
    A dataset yielding ((sourceTokens, targetInput), targetOutput) batches.
  """
  (sourceTexts, targetTexts) = splits
  pairs = tf.data.Dataset.from_tensor_slices((sourceTexts, targetTexts))

  def prepare_batch(sourceBatch, targetBatch):
    sourceTokens = sourceTextProcessor(sourceBatch)
    targetTokens = targetTextProcessor(targetBatch)
    # The input drops the last token and the output drops the first, so
    # position i of the input lines up with token i + 1 of the output.
    shiftedInput = targetTokens[:, :-1]
    shiftedOutput = targetTokens[:, 1:]
    return (sourceTokens, shiftedInput), shiftedOutput

  if train:
    pairs = pairs.shuffle(pairs.cardinality().numpy())

  batched = pairs.batch(batchSize)
  vectorized = batched.map(prepare_batch, _AUTO)
  return vectorized.prefetch(_AUTO)

In [None]:
def tf_lower_and_split_punct(text):
    """Standardize raw text for vectorization.

    Steps, in order: NFKD-normalize, lowercase, drop every character other
    than space, a-z and . ? ! , then surround each of those punctuation marks
    with spaces, strip outer whitespace and wrap the result in
    "[START]" ... "[END]".
    """
    # NFKD separates accents from their base letters, so the character filter
    # below keeps the base letter and drops the accent.
    decomposed = tf_text.normalize_utf8(text, "NFKD")
    lowered = tf.strings.lower(decomposed)

    filtered = tf.strings.regex_replace(lowered, "[^ a-z.?!,]", "")
    spaced = tf.strings.regex_replace(filtered, "[.?!,]", r" \0 ")

    trimmed = tf.strings.strip(spaced)
    return tf.strings.join(["[START]", trimmed, "[END]"], separator=" ")

In [None]:
# Read (and shuffle) the sentence pairs, then carve out the three splits.
source, target = load_data(file_name=config.DATA_FNAME)
(train, val, test) = splitting_dataset(source=source, target=target)

# Text-processing layers: one vectorizer per language. Each vocabulary is
# adapted on the training split only.
sourceTextProcessor = TextVectorization(
    max_tokens=config.SOURCE_VOCAB_SIZE,
    standardize=tf_lower_and_split_punct,
)
sourceTextProcessor.adapt(train[0])  # training source sentences

TargetTextProcessor = TextVectorization(
    max_tokens=config.TARGET_VOCAB_SIZE,
    standardize=tf_lower_and_split_punct,
)
TargetTextProcessor.adapt(train[1])  # training target sentences

In [None]:
# Training pipeline: the only one built with train=True, i.e. with shuffling.
trainDs = make_dataset(
    splits=train,
    batchSize=config.BATCH_SIZE,
    sourceTextProcessor=sourceTextProcessor,
    targetTextProcessor=TargetTextProcessor,
    train=True,
)

# Validation pipeline: same processing, original order.
validDs = make_dataset(
    splits=val,
    batchSize=config.BATCH_SIZE,
    sourceTextProcessor=sourceTextProcessor,
    targetTextProcessor=TargetTextProcessor,
    train=False,
)

# Test pipeline: same processing, original order.
testDs = make_dataset(
    splits=test,
    batchSize=config.BATCH_SIZE,
    sourceTextProcessor=sourceTextProcessor,
    targetTextProcessor=TargetTextProcessor,
    train=False,
)

In [None]:
# Display the training dataset object to inspect its element structure.
trainDs

In [None]:
class FeedForward(Layer):
  """Two Dense layers plus dropout, wrapped in a residual add and layer norm.

  Args:
    dff: units of the first (ReLU) Dense layer.
    dModel: units of the second Dense layer.
    dropoutRate: rate of the Dropout applied after the second Dense layer.
    **kwargs: forwarded to the Keras Layer constructor.
  """

  def __init__(self, dff, dModel, dropoutRate=0.1, **kwargs):
    super().__init__(**kwargs)

    # Dense(relu) -> Dense -> Dropout, applied in this order.
    expand = Dense(units=dff, activation='relu')
    project = Dense(units=dModel)
    drop = Dropout(rate=dropoutRate)
    self.seq = Sequential([expand, project, drop])

    self.add = Add()
    self.layernorm = LayerNormalization()

  def call(self, x):
    transformed = self.seq(x)
    residual = self.add([x, transformed])
    return self.layernorm(residual)

# Encoder

In [None]:
class EncoderLayer(Layer):
  """One encoder block: global self-attention followed by a feed-forward block.

  Args:
    dModel: model width; each attention head gets key_dim = dModel // numHeads.
    numHeads: number of attention heads.
    dff: hidden units of the feed-forward block.
    dropoutRate: dropout rate used by the attention and feed-forward blocks.
    **kwargs: forwarded to the Keras Layer constructor.
  """

  def __init__(self, dModel, numHeads, dff, dropoutRate=0.1, **kwargs):
    # `super` must be called to obtain the proxy object; the earlier
    # `super.__init__(**kwargs)` raised TypeError before any sub-layer existed.
    super().__init__(**kwargs)

    self.globalSelfAttention = GlobalSelfAttention(
        num_heads = numHeads,
        key_dim = dModel//numHeads,
        dropout = dropoutRate
    )

    self.ffn = FeedForward(dff=dff, dModel = dModel, dropoutRate = dropoutRate)

  def call(self,x):
    """Apply self-attention and then the feed-forward block to x."""
    x = self.globalSelfAttention(x)
    x = self.ffn(x)
    return x

class Encoder(Layer):
  """Positional embedding, dropout, then a stack of EncoderLayer blocks.

  Args:
    numLayers: number of EncoderLayer blocks in the stack.
    dModel: model width shared by the embedding and every block.
    numHeads: attention heads per block.
    sourceVocabSize: vocabulary size given to the positional embedding.
    maximumPositionEncoding: number of positions passed to the positional
      embedding.
    dff: hidden units of each feed-forward block.
    dropoutRate: dropout rate applied after the embedding and inside each block.
    **kwargs: forwarded to the Keras Layer constructor.
  """

  def __init__(self,
               numLayers,
               dModel,
               numHeads,
               sourceVocabSize,
               maximumPositionEncoding,
               dff, dropoutRate, **kwargs):
    # `super` must be called to obtain the proxy object; the earlier
    # `super.__init__(**kwargs)` raised TypeError.
    super().__init__(**kwargs)
    self.dModel = dModel
    self.numLayers = numLayers

    self.positionalEmbedding = PositionalEmbedding(vocabSize = sourceVocabSize, dModel = dModel,
                                                   maximumPositionEncoding = maximumPositionEncoding)

    self.encoderLayers = [
        EncoderLayer(dModel = dModel, dff = dff, numHeads = numHeads, dropoutRate = dropoutRate)
        for _ in range(numLayers)
    ]

    self.dropout = Dropout(rate=dropoutRate)

  def call(self,x):
    """Embed x, apply dropout, and run the result through every encoder block."""
    x = self.positionalEmbedding(x)
    x = self.dropout(x)
    for encoderLayer in self.encoderLayers:
      x = encoderLayer(x)
    return x

In [None]:
def positional_encoding(length,depth):
  """Build a sinusoidal position table as a float32 tensor.

  Row p holds sin(p * rate_i) for every rate, followed by cos(p * rate_i) for
  every rate, where rate_i = 1 / 10000 ** (i / (depth / 2)). All sine columns
  come first and all cosine columns second; they are not interleaved.

  Args:
    length: number of positions (rows).
    depth: total number of columns for an even depth. For an odd depth,
      np.arange rounds the half-width up and the table has depth + 1 columns.

  Returns:
    tf.float32 tensor with `length` rows.
  """
  halfDepth = depth/2
  positionColumn = np.arange(length)[:,np.newaxis]
  exponentRow = np.arange(halfDepth)[np.newaxis,:]/halfDepth

  rates = 1/(10000**exponentRow)
  angles = positionColumn * rates
  table = np.concatenate([np.sin(angles), np.cos(angles)],axis=-1)
  return tf.cast(table,dtype=tf.float32)

class PositionalEmbedding(Layer):
  """Token embedding scaled by sqrt(dModel) plus a fixed positional encoding.

  Args:
    vocabSize: input_dim of the token Embedding.
    dModel: output_dim of the token Embedding and depth of the position table.
    maximumPositionEncoding: number of positions precomputed in the table.
    **kwargs: forwarded to the Keras Layer constructor.
  """

  def __init__(self,vocabSize,dModel, maximumPositionEncoding,**kwargs):
    # Fixes relative to the previous version, which could not be constructed:
    #  * `super.__init__` -> `super().__init__` (the former raised TypeError);
    #  * the parameter was misspelled `maximomPositonEncoding`, while both the
    #    body and the Encoder caller use `maximumPositionEncoding`;
    #  * `position_encoding` -> `positional_encoding`, the function defined
    #    in this notebook.
    super().__init__(**kwargs)

    self.embedding = Embedding(
        input_dim = vocabSize, output_dim = dModel,mask_zero=True
    )

    self.posEncoding = positional_encoding(length = maximumPositionEncoding,
                                           depth=dModel)
    self.dModel = dModel

  def compute_mask(self, *args, **kwargs):
    # The arithmetic in call() yields a new tensor that no longer carries the
    # padding mask created by Embedding(mask_zero=True); delegating to the
    # embedding re-attaches it to this layer's output.
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self,x):
    """Embed token ids and add the encodings of the first seqLen positions.

    The size of the last axis of x is used as the sequence length
    (assumes x is a (batch, seqLen) tensor of token ids; confirm with caller).
    """
    seqLen = tf.shape(x)[-1]

    x = self.embedding(x)
    # The attribute is `dModel`; the previous `self.DModel` raised
    # AttributeError.
    x *= tf.math.sqrt(tf.cast(self.dModel,tf.float32))
    x += self.posEncoding[tf.newaxis, :seqLen, : ]
    return x