In [2]:
import logging
import time
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_text
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers import Tokenizer, trainers
from datasets import load_dataset

In [3]:
# import requests

# # URL of the file
# url = "https://archive.org/stream/obrascompletasd03saavgoog/obrascompletasd03saavgoog_djvu.txt"

# # Send an HTTP request to the URL
# response = requests.get(url)

# # Check if the request was successful
# if response.status_code == 200:
#     # Write the contents of the response to a file
#     with open("cervantes_text.txt", "wb") as file:
#         file.write(response.content)
#     print("File downloaded and saved as 'cervantes_text.txt'")
# else:
#     print("Failed to retrieve the file")

In [4]:
# Train a byte-pair-encoding (BPE) tokenizer on the corpus, save it, and
# encode the whole corpus into a flat list of token ids.
vocab_size = 5000

# Declare the unknown token on the model itself so characters that are missing
# from the learned vocabulary (e.g. in a generation prompt) map to "[UNK]".
# "[UNK]" is already reserved through the trainer's special tokens below.
tokenizer = Tokenizer(BPE(unk_token="[UNK]"))

# Configure the trainer. Special tokens are inserted into the vocabulary in the
# order given, so "[PAD]" receives id 0 -- the id the model and the masked
# loss/accuracy in this notebook treat as padding.
trainer = BpeTrainer(special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"], vocab_size=vocab_size)

# Corpus used both for training the tokenizer and for building the dataset.
# files = ['/root/Projects/Transformers/shakespear.txt']
corpus_path = '/root/Projects/Transformers/cervantes_text.txt'
files = [corpus_path]

# Read the corpus with an explicit encoding so the result does not depend on
# the platform's default locale.
with open(corpus_path, 'r', encoding='utf-8') as f:
    text = f.read()

tokenizer.train(files, trainer)
tokenizer.save("my_custom_bpe_tokenizer.json")

# `tokens` is the full corpus as a list of integer token ids.
tokens = tokenizer.encode(text).ids






In [5]:
# Windowing and batching hyper-parameters.
sequence_length = 128  # tokens per training window
val_split = 0.20       # fraction of windows held out, taken from the end of the corpus
batch_size = 96
d_model = 192  # NOTE: reassigned to 128 in the model-configuration cell before it is first used

token_array = np.asarray(tokens)
if len(token_array) <= sequence_length:
    raise ValueError(
        f"Need more than {sequence_length} tokens to build one training window, "
        f"got {len(token_array)}."
    )

# Every window holds sequence_length + 1 consecutive tokens: the first
# sequence_length of them are the model input and the same span shifted by one
# token is the target. sliding_window_view (numpy >= 1.20) builds all windows
# without a Python-level loop; the contiguous copies are ordinary writable arrays.
windows = np.lib.stride_tricks.sliding_window_view(token_array, sequence_length + 1)
xs = np.ascontiguousarray(windows[:, :-1])
ys = np.ascontiguousarray(windows[:, 1:])

# Calculate the number of samples in the validation set
val_size = int(len(xs) * val_split)

# Split on an explicit index: `xs[:-val_size]` would yield an EMPTY training
# set whenever val_size rounds down to 0.
split_at = len(xs) - val_size
x_train, x_val = xs[:split_at], xs[split_at:]
y_train, y_val = ys[:split_at], ys[split_at:]

# The model takes a (context, tokens) pair as input; the same window is fed in
# both slots.
train_dataset = tf.data.Dataset.from_tensor_slices(((x_train, x_train), y_train)).shuffle(buffer_size=10000).batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices(((x_val, x_val), y_val)).batch(batch_size)

# Peek at a single training batch. `x` is the input pair, so x[0] is one batch
# of windows, while y[0] is the first target row of the batch.
for x, y in train_dataset.take(1):
    print(x[0].shape, y[0].shape)
    tr_sample = x

(96, 128) (128,)


In [6]:
def positional_encoding(length, depth):
  """Build a sinusoidal positional-encoding table.

  The first half of the columns holds sines and the second half cosines of
  `position / 10000**(k / (depth / 2))`.

  Args:
    length: number of positions (rows) in the table.
    depth: number of columns in the table. For an odd `depth` the sine half
      gets one more column than the cosine half.

  Returns:
    A float32 tensor of shape `(length, depth)`.
  """
  half_depth = depth / 2

  positions = np.arange(length)[:, np.newaxis]                # (length, 1)
  depths = np.arange(half_depth)[np.newaxis, :] / half_depth  # (1, ceil(depth / 2))

  angle_rates = 1 / (10000**depths)         # (1, ceil(depth / 2))
  angle_rads = positions * angle_rates      # (length, ceil(depth / 2))

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1)

  # For an odd `depth` the concatenation is one column wider than requested,
  # which would not broadcast against a `depth`-wide embedding; trim it.
  pos_encoding = pos_encoding[:, :int(depth)]

  return tf.cast(pos_encoding, dtype=tf.float32)

class PositionalEmbedding(tf.keras.layers.Layer):
  """Token embedding with a fixed sinusoidal positional encoding added.

  Args:
    vocab_size: size of the token vocabulary passed to the embedding layer.
    d_model: embedding width; also the width of the positional table.
    max_length: number of positions pre-computed in the positional table.
      Inputs longer than this cannot be encoded. Defaults to 2048.
  """

  def __init__(self, vocab_size, d_model, max_length=2048):
    super().__init__()
    self.d_model = d_model
    # mask_zero=True makes the embedding treat token id 0 as padding.
    self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
    self.pos_encoding = positional_encoding(length=max_length, depth=d_model)

  def compute_mask(self, *args, **kwargs):
    """Delegate mask computation to the underlying embedding layer."""
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self, x):
    """Embed `x`, scale by sqrt(d_model) and add the positional encoding.

    `x` is indexed on axis 1 for its sequence length, i.e. it is expected to be
    token ids shaped `(batch, seq_len)`.
    """
    length = tf.shape(x)[1]
    x = self.embedding(x)
    # This factor sets the relative scale of the embedding and positional_encoding.
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    # Only the first `length` rows of the table are needed for this input.
    x = x + self.pos_encoding[tf.newaxis, :length, :]
    return x
  
class BaseAttention(tf.keras.layers.Layer):
  """Common sub-layers for attention blocks.

  Creates a `MultiHeadAttention` layer plus the `LayerNormalization` and `Add`
  layers; subclasses wire them together in their own `call`.
  """

  def __init__(self, **mha_kwargs):
    super(BaseAttention, self).__init__()
    # Every keyword argument is forwarded unchanged to MultiHeadAttention.
    self.mha = tf.keras.layers.MultiHeadAttention(**mha_kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

class CausalSelfAttention(BaseAttention):
  """Self-attention with a causal mask, followed by a residual add and layer norm."""

  def call(self, x):
    # Query, key and value are all `x`; the causal mask is requested from the
    # attention layer itself.
    attended = self.mha(query=x, key=x, value=x, use_causal_mask=True)
    residual = self.add([x, attended])
    return self.layernorm(residual)
  
class FeedForward(tf.keras.layers.Layer):
  """Two-layer feed-forward block with dropout, a residual add and layer norm.

  Args:
    d_model: width of the block's output (second Dense layer).
    dff: width of the hidden ReLU layer.
    dropout_rate: rate of the dropout applied after the second Dense layer.
  """

  def __init__(self, d_model, dff, dropout_rate=0.1):
    super(FeedForward, self).__init__()
    hidden = tf.keras.layers.Dense(dff, activation='relu')
    projection = tf.keras.layers.Dense(d_model)
    dropout = tf.keras.layers.Dropout(dropout_rate)
    self.seq = tf.keras.Sequential([hidden, projection, dropout])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    # Residual connection around the dense stack, then normalisation.
    return self.layer_norm(self.add([x, self.seq(x)]))
  
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: causal self-attention followed by a feed-forward block.

  Args:
    d_model: model width; used as the attention `key_dim` and the
      feed-forward output width.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward block.
    dropout_rate: dropout rate used by both the attention layer and the
      feed-forward block.
  """

  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super(DecoderLayer, self).__init__()

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    # Forward the configured rate: previously the feed-forward block silently
    # fell back to its own default regardless of `dropout_rate`.
    self.ffn = FeedForward(d_model, dff, dropout_rate=dropout_rate)

  def call(self, x, context=None):
    """Apply the block to `x`.

    `context` is accepted so existing callers keep working, but it is not
    used: this layer has no cross-attention.
    """
    x = self.causal_self_attention(x=x)

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x
class Decoder(tf.keras.layers.Layer):
  """Stack of `DecoderLayer`s on top of a positional token embedding.

  Args:
    num_layers: number of decoder layers in the stack.
    d_model: model width.
    num_heads: attention heads per layer.
    dff: hidden width of each layer's feed-forward block.
    vocab_size: vocabulary size for the token embedding.
    dropout_rate: dropout rate for the embedding output and for each layer.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.dec_layers = [
        DecoderLayer(
            d_model=d_model,
            num_heads=num_heads,
            dff=dff,
            dropout_rate=dropout_rate,
        )
        for _ in range(num_layers)
    ]

    # Initialised here but never updated by this class.
    self.last_attn_scores = None

  def call(self, x, context):
    # `x` arrives as token ids of shape (batch, target_seq_len) and leaves the
    # embedding as (batch_size, target_seq_len, d_model).
    hidden = self.dropout(self.pos_embedding(x))

    # `context` is handed to every layer unchanged.
    for layer in self.dec_layers:
      hidden = layer(hidden, context)

    # Still (batch_size, target_seq_len, d_model).
    return hidden
  
class Transformer(tf.keras.Model):
  """Decoder-only model: a `Decoder` stack followed by a dense projection to logits.

  `input_vocab_size` is accepted by the constructor but is not used by this
  class; the embedding and the output layer are both sized by
  `target_vocab_size`.
  """

  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super(Transformer, self).__init__()

    decoder_config = dict(
        num_layers=num_layers,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
        vocab_size=target_vocab_size,
        dropout_rate=dropout_rate,
    )
    self.decoder = Decoder(**decoder_config)

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    # Keras `.fit` requires all model inputs in the first argument, so the
    # (context, token ids) pair arrives packed together.
    context, token_ids = inputs
    hidden = self.decoder(token_ids, context)  # (batch_size, target_len, d_model)

    # Project onto the vocabulary: (batch_size, target_len, target_vocab_size).
    logits = self.final_layer(hidden)

    try:
      # Drop the keras mask, so it doesn't scale the losses/metrics.
      # b/250038731
      del logits._keras_mask
    except AttributeError:
      pass

    # Only the logits are returned.
    return logits

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with a linear warm-up followed by inverse-sqrt decay.

  lr(step) = d_model**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)

  Args:
    d_model: model width used to scale the learning rate.
    warmup_steps: number of steps over which the rate rises linearly.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    # Keep the value as passed in so `get_config` can return it unchanged.
    self._d_model_config = d_model
    self.d_model = tf.cast(d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)                    # decay branch
    arg2 = step * (self.warmup_steps ** -1.5)     # warm-up branch

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialised."""
    return {
        "d_model": self._d_model_config,
        "warmup_steps": self.warmup_steps,
    }
  

def masked_loss(label, pred):
  """Sparse categorical cross-entropy averaged over non-padding positions.

  Positions whose label is 0 are treated as padding and excluded from the
  average.

  Args:
    label: integer class ids; 0 marks padding.
    pred: unnormalised logits for each position of `label`.

  Returns:
    Scalar mean loss over the unmasked positions, or 0 when every position is
    padding.
  """
  mask = label != 0
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  loss = loss_object(label, pred)

  mask = tf.cast(mask, dtype=loss.dtype)
  loss *= mask

  # A batch made only of padding has a zero mask sum; a plain division would
  # give NaN (0/0) and poison training, so return 0 in that case.
  loss = tf.math.divide_no_nan(tf.reduce_sum(loss), tf.reduce_sum(mask))
  return loss


def masked_accuracy(label, pred):
  """Fraction of non-padding positions where the arg-max prediction is correct.

  Positions whose label is 0 are treated as padding and ignored.

  Args:
    label: integer class ids; 0 marks padding.
    pred: per-class scores; the predicted class is the arg-max over axis 2.

  Returns:
    Scalar accuracy over the unmasked positions, or 0 when every position is
    padding.
  """
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)
  match = label == pred

  mask = label != 0
  match = match & mask

  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  # Avoid NaN (0/0) when the batch contains nothing but padding.
  return tf.math.divide_no_nan(tf.reduce_sum(match), tf.reduce_sum(mask))


In [7]:
# Model and optimisation hyper-parameters.
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=vocab_size,
    target_vocab_size=vocab_size,
    dropout_rate=dropout_rate)

# The schedule and optimizer are created exactly once.
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])

# Build the model by running it on some dummy data, so the cell does not
# depend on a loop variable left over from an earlier cell.
dummy_input = (tf.random.uniform((1, sequence_length), minval=0, maxval=vocab_size, dtype=tf.int32),
               tf.random.uniform((1, sequence_length), minval=0, maxval=vocab_size, dtype=tf.int32))
output = transformer(dummy_input)

# Print the model summary (the model is built at this point)
transformer.summary()

transformer.fit(train_dataset,
                epochs=10,
                validation_data=val_dataset)

Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder (Decoder)           multiple                  3278848   
                                                                 
 dense_8 (Dense)             multiple                  645000    
                                                                 
Total params: 3,923,848
Trainable params: 3,923,848
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10


KeyboardInterrupt: 

In [None]:
def generate_text(transformer, tokenizer, prompt, max_length=300):
    """Sample a continuation of `prompt` from the model, one token at a time.

    Relies on the notebook-level `sequence_length` for the size of the context
    window fed to the model.

    Args:
        transformer: model called with a `[tokens, tokens]` pair of
            `(1, seq_len)` token-id tensors and returning per-position logits.
        tokenizer: tokenizer providing `encode(...).ids` and `id_to_token`.
        prompt: text to start the generation from.
        max_length: number of tokens to sample after the prompt.

    Returns:
        The prompt tokens followed by the sampled tokens, joined into a string.

    Raises:
        ValueError: if `prompt` encodes to no tokens.
    """
    input_tokens = list(tokenizer.encode(prompt).ids)
    if not input_tokens:
        raise ValueError("prompt produced no tokens; provide a non-empty prompt")

    for _ in range(max_length):
        # Feed only the most recent `sequence_length` tokens and do NOT pad a
        # short prompt: right-padding with id 0 would make the model predict
        # the token that follows a run of padding instead of the prompt.
        window = tf.convert_to_tensor([input_tokens[-sequence_length:]])
        logits = transformer([window, window])[0, -1]

        # Normalise in float64: float32 probabilities can miss the
        # sum-to-one tolerance that np.random.choice enforces.
        probs = tf.nn.softmax(tf.cast(logits, tf.float64)).numpy()
        probs /= probs.sum()
        sampled_token = int(np.random.choice(len(probs), p=probs))

        input_tokens.append(sampled_token)

    pieces = [tokenizer.id_to_token(token_id) for token_id in input_tokens]

    # `id_to_token` yields None for ids outside the tokenizer's vocabulary
    # (possible when the output layer is wider than the learned vocabulary).
    text = ''.join(piece.replace('▁', ' ') for piece in pieces if piece is not None)

    return text

# Sample 500 new tokens after a Spanish seed sentence and show the result.
prompt = "En un lugar de la mancha de cuyo nombre no quiero acordarme."  # You can change this prompt
generated_text = generate_text(
    transformer,
    tokenizer,
    prompt,
    max_length=500,
)
print(generated_text)