In [69]:
vocab_size = 2000
m = 32
embedding_size = 64
training_prompt = "Take me to the lakes"

## Installs, Imports

In [None]:
!pip install tensorflow_text
!pip install sentencepiece

In [14]:
import tensorflow as tf
import tensorflow_text as text
import tensorflow_datasets as tfds
import io
import datetime
import tqdm
import tensorflow.keras as K
import sentencepiece as sp
import math
import tqdm
import logging
import re

In [62]:
tf.autograph.set_verbosity(0, True)

## Dataset, Preprocessing, Tokenization

In [70]:
# Download Lyrics
path = K.utils.get_file("lyrics", origin="https://raw.githubusercontent.com/irenetrampoline/taylor-swift-lyrics/master/all_tswift_lyrics.txt")

In [71]:
# read lyrics into string
with open(path) as f:
    lyrics = f.read()

In [72]:
# basic pre-processing, i.e. removing annotations in "[]" and removing special characters
lyrics = re.sub(r'\[.*\]', "", lyrics)
lyrics = re.sub(r'[^\w\d\s]', "", lyrics)

In [73]:
# write pre-processed text to file
with open("lyrics.txt", "w") as f:
  f.write(lyrics)

In [74]:
# Train SentencePieceTokenizer on lyrics
sp.SentencePieceTrainer.train(input="lyrics.txt", model_prefix='tokenizer_model', model_type="unigram", vocab_size=vocab_size)

In [75]:
# deserialize the trained model file to load it in the correct format
trained_tokenizer_model = tf.io.gfile.GFile('tokenizer_model.model', "rb").read()

# load the model as a tokenizer that can be used inside a tensorflow model
tokenizer = text.SentencepieceTokenizer(
    model=trained_tokenizer_model, out_type=tf.int32, nbest_size=-1, alpha=1, reverse=False,
    add_bos=False, add_eos=False, return_nbest=False, name=None
)

# tokenize lyrics
tokenized_text = tokenizer.tokenize(lyrics)

# create sliding window inputs of length m+1 
input_text = text.sliding_window(tokenized_text, m + 1)

In [76]:
# check which token is used for padding
spm = sp.SentencePieceProcessor()
spm.load_from_serialized_proto(trained_tokenizer_model)
print(spm.piece_to_id('<pad>')) 

0


In [77]:
# create training data from tokenized text
train_data = tf.data.Dataset.from_tensor_slices(input_text)

# create targets
train_data = train_data.map(lambda x: (x[:m], x[m:m+1]))

# shuffle, batch
train_data = train_data.shuffle(1000)
train_data = train_data.batch(10)


## Model

In [78]:
class EmbeddingLayer(K.layers.Layer):
  
  def __init__(self, m=32, vocab_size=2000, embedding_size=64):
    super(EmbeddingLayer, self).__init__()

    self.m = m
    self.vocab_size = vocab_size
    self.embedding_size = embedding_size

    self.positional_embedding_layer = K.layers.Embedding(input_dim=self.m, output_dim=self.embedding_size)
    self.token_embedding_layer = K.layers.Embedding(input_dim=self.vocab_size, output_dim=self.embedding_size)

  def call(self, inputs):

    positional_embedding = self.positional_embedding_layer(tf.range(0, self.m))
    token_embedding = self.token_embedding_layer(inputs)
    y = positional_embedding + token_embedding
    
    return y

In [79]:
class TransformerBlock(K.layers.Layer):

  def __init__(self, embedding_size=64):

    super(TransformerBlock, self).__init__()

    self.embedding_size = embedding_size

    self.mha_layer = K.layers.MultiHeadAttention(num_heads=2, key_dim=embedding_size)

    self.dense_layer1 = K.layers.Dense(units=self.embedding_size, activation=K.activations.relu)
    self.dense_layer2 = K.layers.Dense(units=self.embedding_size)

    self.dropout_layer1 = K.layers.Dropout(rate=0.1)
    self.dropout_layer2 = K.layers.Dropout(rate=0.1)

    self.norm_layer1 = K.layers.LayerNormalization(epsilon=1e-6)
    self.norm_layer2 = K.layers.LayerNormalization(epsilon=1e-6)


  def call(self, inputs):
    
    out = self.mha_layer(inputs, inputs)
    out = self.dropout_layer1(out)
    out = K.layers.Add()([inputs, out])
    in_out = self.norm_layer1(out)

    in_out2 = self.dense_layer1(in_out)
    in_out2 = self.dense_layer2(in_out2)
    in_out2 = self.dropout_layer2(in_out2)

    y = K.layers.Add()([in_out, in_out2])
    y = self.norm_layer2(y)

    return y

In [80]:
class MyModel(K.Model):

  def __init__(self, tokenizer, m=32, vocab_size=2000, embedding_size=64):
    super(MyModel, self).__init__()

    self.tokenizer = tokenizer
    self.m = m
    self.vocab_size = vocab_size
    self.embedding_size = embedding_size

    self.optimizer = K.optimizers.Adam()
    self.loss_function = K.losses.SparseCategoricalCrossentropy(from_logits=True)
  
    self.metrics_list = [
                        tf.keras.metrics.Mean(name="loss"),
                        tf.keras.metrics.CategoricalAccuracy(name="acc"),
                        tf.keras.metrics.TopKCategoricalAccuracy(3,name="top-3-acc") 
                       ]

    self.embedding_layer = EmbeddingLayer(self.m, self.vocab_size, self.embedding_size)
    self.transformer_layer = TransformerBlock(self.embedding_size)

    self.global_pool_layer = K.layers.GlobalMaxPool1D()
    self.dense_layer = K.layers.Dense(units=self.vocab_size)

  def call(self, input):
    
    x = self.embedding_layer(input)
    x = self.transformer_layer(x)
    x = self.global_pool_layer(x)
    y = self.dense_layer(x)

    return y

  def reset_metrics(self):
    for metric in self.metrics:
      metric.reset_states()

  @tf.function
  def train_step(self, data):
    
    x, targets = data
    
    with tf.GradientTape() as tape:
      predictions = self(x, training=True)
      
      loss = self.loss_function(targets, predictions) + tf.reduce_sum(self.losses)
        
    gradients = tape.gradient(loss, self.trainable_variables)
    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
    # update loss metric
    self.metrics[0].update_state(loss)
        
    # for all metrics except loss, update states (accuracy etc.)
    for metric in self.metrics[1:]:
      metric.update_state(targets,predictions)

    # Return a dictionary mapping metric names to current value
    return {m.name: m.result() for m in self.metrics}

  def generate_text(self, prompt, top_k=50):
 
    #tokenize, add batch dimension and pad prompt
    prompt = self.tokenizer.tokenize(prompt)
    length = len(prompt)
    prompt = tf.expand_dims(prompt, axis=0)
    prompt = tf.pad(prompt, [[0,0], [self.m - prompt.shape[1], 0]], mode="CONSTANT", constant_values=0)

    # range of how many additional words to generate
    rng = self.m - length

    for i in range(rng):

      # get probability for tokens
      logits = self.call(prompt)

      # top_k sampling
      values, indices = tf.math.top_k(logits, k=top_k, sorted=True)
      indices = tf.cast(indices, tf.float32)

      # random next token
      next_token = tf.random.categorical(indices, num_samples=1, dtype=tf.int32, seed=42)

      # avoid pad token as next token
      while (next_token == 0):
        next_token = tf.random.categorical(indices, num_samples=1, dtype=tf.int32, seed=42)

      # add new word to prompt
      prompt = tf.concat((prompt, next_token), axis=1)

      # truncate beginning of prompt
      prompt = tf.slice(prompt, [0, 1], [1, self.m])

    print(self.tokenizer.detokenize(prompt))
    return self.tokenizer.detokenize(prompt)

## Tensorboard

In [None]:
# load tensorboard extension
%load_ext tensorboard

# define file-path for log file
file_path = "test_logs/test"

# define the tf file-writer (we usually use a separate one for train and validation)
summary_writer = tf.summary.create_file_writer(file_path)

loss_function = tf.keras.losses.MeanSquaredError()

# write 100 logs for loss


for i in range(100):
    
    # compute loss (here targets and predictions would come from the data and the model)
    targets = tf.constant([0.3,0.3,-0.8])
    predictions = targets + tf.random.normal(shape=targets.shape, stddev=100/(i+1)) # decreasing noise
    
    loss = loss_function(targets,predictions)
    
    # image batch (these would be obtained from the model)
    
    image_batch = tf.random.uniform(shape=(32,28,28,1),dtype=tf.float32)
    
    
    # audio batch (would be obtained from the model but here it's just a hard coded sine wave of 110hz)
    
    x = 2* math.pi*tf.cast(tf.linspace(0,32000*5, 32000*5), tf.float32)*110/32000
    x = tf.expand_dims(x, axis=0) # add batch dimension
    x = tf.expand_dims(x, axis=-1) # add last dimension
    x = tf.repeat(x, 32, axis=0) # repeat to have a batch of 32
    audio_batch = tf.math.sin(x) # obtain sine wave
    
    
    # text (this would be the output of a language model after one training epoch)
    
    text = tf.constant("This is the sampled output of a language model")
    
    
    # histogram (e.g. of activations of a dense layer during training)
    
    activations_batch = tf.random.normal(shape=(32,20,1))
    min_activations = tf.reduce_min(activations_batch, axis=None)
    max_activations = tf.reduce_max(activations_batch, axis=None)
    histogram = tf.histogram_fixed_width_bins(activations_batch, 
                                              value_range=[min_activations, max_activations])
    
    
    # now we want to write all the data to a log-file.
    with summary_writer.as_default():
        
        # save the loss scalar for the "epoch"
        tf.summary.scalar(name="loss", data=loss, step=i)
        
        # save a batch of images for this epoch (have to be between 0 and 1)
        tf.summary.image(name="generated_images",data = image_batch, step=i, max_outputs=32)
        
        # save the batch of audio for this epoch
        tf.summary.audio(name="generated_audio", data = audio_batch, 
                         sample_rate = 32000, step=i, max_outputs=32)
        
        # save the generated text for that epoch
        tf.summary.text(name="generated_text", data = text, step=i)
        
        # save a histogram (e.g. of activations in a layer)
        tf.summary.histogram(name="layer_N_activations", data = histogram, step=i)

In [None]:
# Define where to save the log
hyperparameter_string= "training"
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

train_log_path = f"logs/{hyperparameter_string}/{current_time}/train"

# log writer for training metrics
train_summary_writer = tf.summary.create_file_writer(train_log_path)

In [None]:
# open the tensorboard to inspect the data for the 100 steps
%tensorboard --logdir test_logs/

## Train

In [None]:
model = MyModel(tokenizer, m=m, vocab_size=vocab_size, embedding_size=embedding_size)

In [64]:
for epoch in range(100):
    
    print(f"Epoch {epoch}:")
    
    # Training:
    
    for data in tqdm.notebook.tqdm(train_data, position=0, leave=True):
      metrics = model.train_step(data)
    
    # print the metrics
    print([f"{key}: {value}" for (key, value) in zip(list(metrics.keys()), list(metrics.values()))])
    print(model.generate_text(training_prompt))
    
    # logging the validation metrics to the log file which is used by tensorboard
    #with train_summary_writer.as_default():
        #for metric in model.metrics:
            #tf.summary.scalar(f"{metric.name}", metric.result(), step=epoch)
    
    # reset all metrics (requires a reset_metrics method in the model)
    #model.reset_metrics()

    print("\n")

Epoch 0:


  0%|          | 0/6327 [00:00<?, ?it/s]

['loss: 6.032236576080322', 'acc: 0.0001264442253159359', 'top-3-acc: 0.0019756911788135767']
tf.Tensor([b'Take me to the lakes and in knowing that is Andd You nownt me itve it up all Im andingll itve itve'], shape=(1,), dtype=string)
tf.Tensor([b'Take me to the lakes and in knowing that is Andd You nownt me itve it up all Im andingll itve itve'], shape=(1,), dtype=string)


Epoch 1:


  0%|          | 0/6327 [00:00<?, ?it/s]

['loss: 5.65757942199707', 'acc: 0.002750162035226822', 'top-3-acc: 0.007507625967264175']
tf.Tensor([b'Take me to the lakes when just me the now were love now But that all my this were up know in love this isingll what were back'], shape=(1,), dtype=string)
tf.Tensor([b'Take me to the lakes when just me the now were love now But that all my this were up know in love this isingll what were back'], shape=(1,), dtype=string)


Epoch 2:


  0%|          | 0/6327 [00:00<?, ?it/s]

['loss: 5.414196491241455', 'acc: 0.0024972734972834587', 'top-3-acc: 0.007818467915058136']
tf.Tensor([b'Take me to the lakes we this when just your when of  But to nevernana what was like my was what itd back ant and'], shape=(1,), dtype=string)
tf.Tensor([b'Take me to the lakes we this when just your when of  But to nevernana what was like my was what itd back ant and'], shape=(1,), dtype=string)


Epoch 3:


  0%|          | 0/6327 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

## Generate Text

In [None]:
prompt = "Sweet tea in the summer"
model.generate_text(prompt)