In [None]:
!pip install unidecode

Collecting unidecode
  Downloading Unidecode-1.3.6-py3-none-any.whl (235 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.9/235.9 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.6


In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
from unidecode import unidecode
import random

# Data Preparation

## Load text

In [None]:
# Build (encoder input, decoder input, decoder target) triples from the lyrics file.
# Lines starting with ">>>", "<sos>" or "<eos>" are skipped; the remaining lines are
# consumed in non-overlapping pairs: the first line of a pair is the prompt and the
# second is the line the model has to produce.
# NOTE(review): skipped marker lines do not reset `prev`, so a pair can straddle a
# marker line — confirm this is intended.
input=[]            # "<sol> " + prompt line (shadows the builtin; kept because later cells use this name)
teacher_forcing=[]  # "<sol> " + response line, fed to the decoder
target=[]           # response line + " <eol>", the decoder's expected output
prev=None
# The context manager closes the handle, which the bare open() never did.
# Assumes the file is UTF-8 (the default on Colab) — TODO confirm.
with open("clean_lyrics_without_parenthese.txt","r",encoding="utf-8") as file:
  for line in file:
    if line.startswith((">>>","<sos>","<eos>")):
      continue
    line=unidecode(line)
    # Drop only the line terminator. The unconditional [:-1] slice also removed the
    # last real character of a final line that has no trailing newline.
    if line.endswith("\n"):
      line=line[:-1]
    if prev is None:
      prev=line
    else:
      input.append("<sol> "+prev)
      teacher_forcing.append("<sol> "+line)
      target.append(line+" <eol>")
      prev=None

In [None]:
# Spot-check the first five triples: prompt, decoder input, decoder target.
for idx in range(5):
  print(input[idx],teacher_forcing[idx],target[idx],"****",sep="\n")

<sol> I'm doing good, I'm on some new shit
<sol> Been saying "yes" instead of "no"
Been saying "yes" instead of "no" <eol>
****
<sol> I thought I saw you at the bus stop, I didn't though
<sol> I hit the ground running each night
I hit the ground running each night <eol>
****
<sol> I hit the Sunday matinee
<sol> You know the greatest films of all time were never made
You know the greatest films of all time were never made <eol>
****
<sol> I guess you never know, never know
<sol> And if you wanted me, you really should've showed
And if you wanted me, you really should've showed <eol>
****
<sol> And if you never bleed, you're never gonna grow
<sol> And it's alright now
And it's alright now <eol>
****


In [None]:
# Bundle the three parallel lists into one list of triples so that they can be
# shuffled together without losing their alignment.
corpus=[triple for triple in zip(input,teacher_forcing,target)]
for idx in range(5):
  print(corpus[idx])

("<sol> I'm doing good, I'm on some new shit", '<sol> Been saying "yes" instead of "no"', 'Been saying "yes" instead of "no" <eol>')
("<sol> I thought I saw you at the bus stop, I didn't though", '<sol> I hit the ground running each night', 'I hit the ground running each night <eol>')
('<sol> I hit the Sunday matinee', '<sol> You know the greatest films of all time were never made', 'You know the greatest films of all time were never made <eol>')
('<sol> I guess you never know, never know', "<sol> And if you wanted me, you really should've showed", "And if you wanted me, you really should've showed <eol>")
("<sol> And if you never bleed, you're never gonna grow", "<sol> And it's alright now", "And it's alright now <eol>")


In [None]:
# Shuffle with a dedicated, seeded generator so the train/validation split taken from
# this order further down is the same on every Restart & Run All; the unseeded global
# shuffle produced a different split on every run.
SHUFFLE_SEED=42
random.Random(SHUFFLE_SEED).shuffle(corpus)
for i in range(5):
  print(corpus[i])

('<sol> I can taste the midnight on your lips', '<sol> Makes me just wanna lean in for another kiss', 'Makes me just wanna lean in for another kiss <eol>')
('<sol> My daddy told me: "Slow down, boy, you\'re goin\' to blow it!"', "<sol> And I ain't gotta stop the beat a minute to tell Shady I love him", "And I ain't gotta stop the beat a minute to tell Shady I love him <eol>")
('<sol> Al final, baby, tu extranas como yo te toco', '<sol> Recuerda, de tu cuerpo se yo, mami', 'Recuerda, de tu cuerpo se yo, mami <eol>')
('<sol> I called an old friend thinking that the trouble would wait', '<sol> But then I jumped right in a week later, returned', 'But then I jumped right in a week later, returned <eol>')
("<sol> Know it's been a while, baby, a while", '<sol> Do you still feel the fire?', 'Do you still feel the fire? <eol>')


In [None]:
# Undo the zip: turn the shuffled list of triples back into three aligned tuples.
input,teacher_forcing,target=(tuple(column) for column in zip(*corpus))

In [None]:
# Same spot-check as before the shuffle: the three sequences must still line up.
for idx in range(5):
  print(input[idx],teacher_forcing[idx],target[idx],"****",sep="\n")

<sol> I can taste the midnight on your lips
<sol> Makes me just wanna lean in for another kiss
Makes me just wanna lean in for another kiss <eol>
****
<sol> My daddy told me: "Slow down, boy, you're goin' to blow it!"
<sol> And I ain't gotta stop the beat a minute to tell Shady I love him
And I ain't gotta stop the beat a minute to tell Shady I love him <eol>
****
<sol> Al final, baby, tu extranas como yo te toco
<sol> Recuerda, de tu cuerpo se yo, mami
Recuerda, de tu cuerpo se yo, mami <eol>
****
<sol> I called an old friend thinking that the trouble would wait
<sol> But then I jumped right in a week later, returned
But then I jumped right in a week later, returned <eol>
****
<sol> Know it's been a while, baby, a while
<sol> Do you still feel the fire?
Do you still feel the fire? <eol>
****


## Tokenize using text vectorization

Lyrics contain simple words, so subword tokenization seems unnecessary.

In [None]:
vocab_size=5000   # upper bound on the vocabulary kept by the vectorizer
max_length=25     # every vectorized line is padded/truncated to this many tokens
text_vec_layer=tf.keras.layers.TextVectorization(max_tokens=vocab_size,output_sequence_length=max_length)
# adapt() rebuilds the vocabulary from scratch on every call, so adapting on `input`
# and then on `target` left only the target lines' words in the vocabulary. Adapt once
# on both, so words that occur only in prompts (including the "<sol>" marker, which
# never appears in `target`) are counted as well.
text_vec_layer.adapt(list(input)+list(target))

In [None]:
# Number of (already shuffled) examples used for training; everything after that is
# held out. Named once here instead of repeating the literal in six slices.
train_size=25000
# Fail early with a clear message rather than building empty validation tensors.
assert len(input)>train_size,"corpus must contain more than train_size examples, otherwise the validation split is empty"

X_train=tf.constant(input[:train_size])
X_valid=tf.constant(input[train_size:])
X_train_dec=tf.constant(teacher_forcing[:train_size])
X_valid_dec=tf.constant(teacher_forcing[train_size:])
Y_train=tf.constant(target[:train_size])
Y_valid=tf.constant(target[train_size:])

In [None]:
# Run every raw-string split through the shared vectorizer to get integer token ids.
X_train_vec,X_valid_vec=text_vec_layer(X_train),text_vec_layer(X_valid)
X_train_dec_vec,X_valid_dec_vec=text_vec_layer(X_train_dec),text_vec_layer(X_valid_dec)
Y_train_vec,Y_valid_vec=text_vec_layer(Y_train),text_vec_layer(Y_valid)

## Embedding

In [None]:
# Stand-alone embedding layer; id 0 is treated as padding and masked out.
embed_size=256
embed_layer=tf.keras.layers.Embedding(
    input_dim=vocab_size,
    output_dim=embed_size,
    mask_zero=True,
)

In [None]:
# Embed the encoder-side and decoder-side token ids with the stand-alone layer.
# NOTE(review): nothing in the visible notebook reads these tensors afterwards (the
# model is fed the *_vec ids) — confirm whether this cell is still needed.
X_train_embed,X_valid_embed=embed_layer(X_train_vec),embed_layer(X_valid_vec)
X_train_dec_embed,X_valid_dec_embed=embed_layer(X_train_dec_vec),embed_layer(X_valid_dec_vec)

# Transformer

## Positional Encodings

In [None]:
def positional_encoding(length,depth):
  """Build the fixed sinusoidal position table.

  Args:
    length: number of positions (rows) to generate.
    depth: width of each encoding; must be even, because one half of the columns
      holds sines and the other half holds cosines.

  Returns:
    A float32 tensor of shape (length, depth) with the sines in the first depth/2
    columns and the cosines in the remaining ones.

  Raises:
    ValueError: if depth is odd.
  """
  if depth%2!=0:
    # An odd depth used to produce depth+1 columns, which only surfaced later as a
    # shape mismatch when the table was added to the embeddings.
    raise ValueError("depth must be even, got "+str(depth))
  half_depth=depth//2

  positions=np.arange(length)[:,np.newaxis]                 # (length, 1)
  depths=np.arange(half_depth)[np.newaxis,:]/half_depth     # (1, depth/2), values in [0, 1)

  angle_rates=1/(10000**depths)
  angle_rads=positions*angle_rates                          # (length, depth/2)

  # Named `table` so the local no longer shadows this function's own name.
  table=np.concatenate([np.sin(angle_rads),np.cos(angle_rads)],axis=-1)

  return tf.cast(table,dtype=tf.float32)

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
  """Token embedding scaled by sqrt(d_model) with the sinusoidal position table added."""

  def __init__(self,vocab_size,d_model):
    super().__init__()
    self.d_model=d_model
    self.embedding=tf.keras.layers.Embedding(input_dim=vocab_size,output_dim=d_model,mask_zero=True)
    # Table precomputed for 2048 positions; call() slices off what it needs.
    self.pos_encoding=positional_encoding(length=2048,depth=d_model)

  def compute_mask(self, *args, **kwargs):
    # Delegate to the embedding so its padding mask is the one reported for this layer.
    return self.embedding.compute_mask(*args,**kwargs)

  def call(self,tensor):
    seq_len=tf.shape(tensor)[1]
    embedded=self.embedding(tensor)
    scale=tf.math.sqrt(tf.cast(self.d_model,tf.float32))
    position_slice=self.pos_encoding[tf.newaxis,:seq_len,:]
    return embedded*scale+position_slice

## Attention layers

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  """Shared pieces of every attention block: multi-head attention, add, layer norm.

  All keyword arguments are forwarded to tf.keras.layers.MultiHeadAttention.
  """

  def __init__(self,**kwargs):
    super().__init__()
    mha_options=dict(kwargs)
    self.mha=tf.keras.layers.MultiHeadAttention(**mha_options)
    self.layernorm=tf.keras.layers.LayerNormalization()
    self.add=tf.keras.layers.Add()

In [None]:
class CrossAttention(BaseAttention):
  """Attention with queries from `tensor` and keys/values from `context`."""

  def call(self,tensor,context):
    attended,scores=self.mha(
        query=tensor,
        key=context,
        value=context,
        return_attention_scores=True,
    )
    # Keep the scores of the most recent call on the layer for later inspection.
    self.last_attention_scores=scores
    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([tensor,attended]))

In [None]:
class GlobalSelfAttention(BaseAttention):
  """Self-attention in which `tensor` serves as query, key and value."""

  def call(self,tensor):
    attended=self.mha(query=tensor,value=tensor,key=tensor)
    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([tensor,attended]))

In [None]:
class CausalSelfAttention(BaseAttention):
  """Self-attention over `tensor` with the causal mask switched on."""

  def call(self,tensor):
    attended=self.mha(query=tensor,value=tensor,key=tensor,use_causal_mask=True)
    # Residual connection followed by layer normalization.
    return self.layernorm(self.add([tensor,attended]))

In [None]:
class FeedForward(tf.keras.layers.Layer):
  """Two dense layers plus dropout, wrapped in a residual connection and layer norm."""

  def __init__(self,d_model,dff,dropout_rate=0.1):
    super().__init__()
    expand=tf.keras.layers.Dense(dff,activation="relu")   # widen to dff units
    project=tf.keras.layers.Dense(d_model)                # back to d_model units
    drop=tf.keras.layers.Dropout(dropout_rate)
    self.seq=tf.keras.Sequential([expand,project,drop])
    self.add=tf.keras.layers.Add()
    self.layer_norm=tf.keras.layers.LayerNormalization()

  def call(self,tensor):
    transformed=self.seq(tensor)
    return self.layer_norm(self.add([tensor,transformed]))

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: global self-attention followed by the feed-forward sublayer.

  Args (keyword-only):
    d_model: model width; also passed as key_dim to the attention layer.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward sublayer.
    dropout_rate: dropout used in both the attention and the feed-forward sublayer.
  """
  def __init__(self,*,d_model,num_heads,dff,dropout_rate=0.1):
    super().__init__()
    self.self_attention=GlobalSelfAttention(num_heads=num_heads,key_dim=d_model,dropout=dropout_rate)
    # dropout_rate is forwarded here as well; previously the feed-forward sublayer
    # silently fell back to its own default whatever value the caller passed.
    self.ffn=FeedForward(d_model,dff,dropout_rate)
  def call(self,tensor):
    tensor=self.self_attention(tensor)
    tensor=self.ffn(tensor)
    return tensor

In [None]:
class Encoder(tf.keras.layers.Layer):
  """Positional embedding, dropout, then a stack of `num_layers` encoder blocks."""

  def __init__(self,*,num_layers,d_model,num_heads,dff,vocab_size,dropout_rate=0.1):
    super().__init__()

    self.d_model=d_model
    self.num_layers=num_layers

    self.pos_embedding=PositionalEmbedding(vocab_size=vocab_size,d_model=d_model)
    block_options=dict(d_model=d_model,num_heads=num_heads,dff=dff,dropout_rate=dropout_rate)
    self.encoder_layers=[EncoderLayer(**block_options) for _ in range(num_layers)]
    self.dropout=tf.keras.layers.Dropout(dropout_rate)

  def call(self,vec):
    # Token ids -> embeddings with position information, then dropout.
    vec=self.dropout(self.pos_embedding(vec))
    # Feed the result through each encoder block in order.
    for block in self.encoder_layers:
      vec=block(vec)
    return vec

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: causal self-attention, cross-attention, feed-forward.

  Args (keyword-only):
    d_model: model width; also passed as key_dim to both attention layers.
    num_heads: number of attention heads.
    dff: hidden width of the feed-forward sublayer.
    dropout_rate: dropout used in the attention layers and the feed-forward sublayer.
  """
  def __init__(self,*,d_model,num_heads,dff,dropout_rate=0.1):
    super().__init__()
    self.causal_self_attention=CausalSelfAttention(num_heads=num_heads,key_dim=d_model,dropout=dropout_rate)
    self.cross_attention=CrossAttention(num_heads=num_heads,key_dim=d_model,dropout=dropout_rate)
    # dropout_rate is forwarded here as well; previously the feed-forward sublayer
    # silently fell back to its own default whatever value the caller passed.
    self.ffn=FeedForward(d_model,dff,dropout_rate)

  def call(self,tensor,context):
    tensor=self.causal_self_attention(tensor=tensor)
    tensor=self.cross_attention(tensor=tensor,context=context)

    # Expose the cross-attention scores of this call on the block itself.
    self.last_attention_scores=self.cross_attention.last_attention_scores

    tensor=self.ffn(tensor)
    return tensor

In [None]:
class Decoder(tf.keras.layers.Layer):
  """Positional embedding, dropout, then a stack of `num_layers` decoder blocks."""

  def __init__(self,*,num_layers,d_model,num_heads,dff,vocab_size,dropout_rate=0.1):
    super().__init__()

    self.d_model=d_model
    self.num_layers=num_layers
    self.pos_embedding=PositionalEmbedding(vocab_size=vocab_size,d_model=d_model)
    self.dropout=tf.keras.layers.Dropout(dropout_rate)
    block_options=dict(d_model=d_model,num_heads=num_heads,dff=dff,dropout_rate=dropout_rate)
    self.dec_layers=[DecoderLayer(**block_options) for _ in range(num_layers)]

    # Filled in by call() with the last block's cross-attention scores.
    self.last_attention_scores=None

  def call(self,tensor,context):
    # Token ids -> embeddings with position information, then dropout.
    tensor=self.dropout(self.pos_embedding(tensor))

    # Every block sees the same encoder output as `context`.
    for block in self.dec_layers:
      tensor=block(tensor,context)

    self.last_attention_scores=self.dec_layers[-1].last_attention_scores

    return tensor


In [None]:
class Transformer(tf.keras.Model):
  """Encoder-decoder model mapping (prompt ids, decoder ids) to next-token logits.

  Args (keyword-only):
    num_layers: number of blocks in both the encoder and the decoder.
    d_model: model width.
    num_heads: attention heads per attention layer.
    dff: hidden width of the feed-forward sublayers.
    input_vocab_size: vocabulary size of the encoder embedding.
    target_vocab_size: vocabulary size of the decoder embedding and the output layer.
    dropout_rate: dropout rate passed to the encoder and decoder.
  """
  def __init__(self,*,num_layers,d_model,num_heads,dff,input_vocab_size,target_vocab_size,dropout_rate=0.1):
    super().__init__()
    self.encoder=Encoder(num_layers=num_layers,d_model=d_model,num_heads=num_heads,dff=dff,vocab_size=input_vocab_size,dropout_rate=dropout_rate)
    self.decoder=Decoder(num_layers=num_layers,d_model=d_model,num_heads=num_heads,dff=dff,vocab_size=target_vocab_size,dropout_rate=dropout_rate)
    self.final_layer=tf.keras.layers.Dense(target_vocab_size)

  def call(self,inputs):
    """Return logits for `inputs`, a pair (encoder token ids, decoder token ids)."""
    # Local is named decoder_tokens so it no longer shadows the builtin `input`.
    context,decoder_tokens=inputs
    context=self.encoder(context)
    decoded=self.decoder(decoder_tokens,context)

    logits=self.final_layer(decoded)

    # Drop the Keras mask attached to the logits so the custom masked loss/metric are
    # not rescaled by it a second time. The attribute is `_keras_mask`: the earlier
    # spelling `__keras_mask` was name-mangled inside this class to
    # `_Transformer__keras_mask`, so the delete always raised AttributeError, was
    # swallowed below, and the mask was never removed.
    try:
      del logits._keras_mask
    except AttributeError:
      # No mask attached; nothing to drop.
      pass

    return logits

## Parameters

In [None]:
# Model hyperparameters used to build the Transformer in the next cell.
num_layers = 12      # encoder blocks and decoder blocks
d_model = 256        # model width
dff = 512            # hidden width of the feed-forward sublayers
num_heads = 8        # attention heads per attention layer
dropout_rate = 0.1

In [None]:
# Both vocabulary sizes come from `vocab_size`, the cap given to the shared text
# vectorizer, instead of a second hard-coded 5000 that could drift out of sync with it.
model=Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=vocab_size,
    target_vocab_size=vocab_size,
    dropout_rate=dropout_rate,
)

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Linear warmup followed by inverse-square-root decay.

  lr(step) = d_model**-0.5 * min(step**-0.5, step * warmup_steps**warmup_exponent)

  Args:
    d_model: model width; the whole schedule is scaled by d_model**-0.5.
    warmup_steps: base of the warmup slope.
    warmup_exponent: exponent applied to warmup_steps in the warmup branch. The
      default (-2) keeps this notebook's original behaviour. The two branches meet at
      step = warmup_steps**(-warmup_exponent/1.5), i.e. at warmup_steps**(4/3) for
      -2 and exactly at warmup_steps for -1.5.
      NOTE(review): the schedule in "Attention Is All You Need" uses -1.5; confirm
      that -2 (a much longer, flatter warmup) is deliberate.
  """
  def __init__(self,d_model,warmup_steps=6000,warmup_exponent=-2):
    super().__init__()

    # Keep the value as passed in for get_config(); the cast tensor is what __call__ uses.
    self._d_model_config=d_model
    self.d_model=tf.cast(d_model,tf.float32)

    self.warmup_steps=warmup_steps
    self.warmup_exponent=warmup_exponent

  def __call__(self,step):
    step=tf.cast(step,dtype=tf.float32)
    arg1=tf.math.rsqrt(step)                                  # decay branch
    arg2=step*(self.warmup_steps**self.warmup_exponent)       # warmup branch
    return tf.math.rsqrt(self.d_model)*tf.math.minimum(arg1,arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialized."""
    return {
        "d_model":self._d_model_config,
        "warmup_steps":self.warmup_steps,
        "warmup_exponent":self.warmup_exponent,
    }

In [None]:
# Adam driven by the warmup/decay schedule defined above.
learning_rate=CustomSchedule(d_model)
optimizer=tf.keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

## Loss and metrics

In [None]:
def masked_loss(label,pred):
  """Sparse categorical cross-entropy averaged over positions whose label is not 0.

  Args:
    label: integer token ids; id 0 marks positions to ignore.
    pred: logits for each position.

  Returns:
    Sum of the per-position losses at non-zero labels divided by their count.
  """
  cross_entropy=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True,reduction="none")
  per_position=cross_entropy(label,pred)

  # 1.0 where the label is a real token, 0.0 where it is id 0.
  keep=tf.cast(tf.not_equal(label,0),dtype=per_position.dtype)

  return tf.reduce_sum(per_position*keep)/tf.reduce_sum(keep)

def masked_accuracy(label, pred):
  """Fraction of positions with a non-zero label whose argmax prediction matches it.

  Args:
    label: integer token ids; id 0 marks positions to ignore.
    pred: logits for each position; the argmax is taken over axis 2.
  """
  predicted_ids=tf.argmax(pred,axis=2)
  label=tf.cast(label,predicted_ids.dtype)

  is_token=tf.not_equal(label,0)
  hits=tf.logical_and(tf.equal(label,predicted_ids),is_token)

  hit_count=tf.reduce_sum(tf.cast(hits,dtype=tf.float32))
  token_count=tf.reduce_sum(tf.cast(is_token,dtype=tf.float32))
  return hit_count/token_count

## Callbacks

In [None]:
# Write weights only, and only when the monitored training metric improves.
model_ckpt=tf.keras.callbacks.ModelCheckpoint(
    filepath="punchline",
    monitor="masked_accuracy",
    save_best_only=True,
    save_weights_only=True,
)

## Train model

In [None]:
# Attach the padding-aware loss and metric together with the scheduled Adam optimizer.
model.compile(
    optimizer=optimizer,
    loss=masked_loss,
    metrics=[masked_accuracy],
)

In [None]:
# The held-out split built earlier in the notebook is now passed as validation_data,
# so each epoch also reports val_loss / val_masked_accuracy. Before this change the
# validation tensors were computed but never used.
history=model.fit(
    x=(X_train_vec,X_train_dec_vec),
    y=Y_train_vec,
    validation_data=((X_valid_vec,X_valid_dec_vec),Y_valid_vec),
    batch_size=32,
    epochs=15,
    callbacks=[model_ckpt],
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


In [None]:
# Persist the weights as they are at the end of training.
model.save_weights(
    filepath='./09122300_checkpoints/09122300_checkpoint',
)

In [None]:
# Recursively pack the checkpoint directory written by save_weights into one archive.
!zip -r 09122300_checkpoints.zip 09122300_checkpoints

  adding: 09122300_checkpoints/ (stored 0%)
  adding: 09122300_checkpoints/09122300_checkpoint.data-00000-of-00001 (deflated 9%)
  adding: 09122300_checkpoints/09122300_checkpoint.index (deflated 81%)
  adding: 09122300_checkpoints/checkpoint (deflated 49%)


In [None]:
from google.colab import files
# Download the archive created by the zip cell above. The previous argument
# ("09122000_checkpoints.zip") did not match the name of the archive that was written.
files.download("09122300_checkpoints.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Inference using model

## Copy trained weights

In [None]:
# Unpack a previously saved checkpoint archive into the working directory.
# NOTE(review): this archive name (09122200) differs from the one zipped earlier in
# this notebook (09122300) — presumably a file uploaded from another run; confirm.
!unzip 09122200_checkpoints.zip

unzip:  cannot find or open 09120228_checkpoints.zip, 09120228_checkpoints.zip.zip or 09120228_checkpoints.zip.ZIP.


In [None]:
# Restore weights from the unpacked checkpoint directory.
# NOTE(review): the prefix (09122200) is not the one written by save_weights earlier
# in this notebook (09122300) — confirm which checkpoint is meant.
model.load_weights(
    filepath="09122200_checkpoints/09122200_checkpoint",
)

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7d21280faf50>

## Run inference

In [None]:
class Punchline(tf.Module):
  """Greedy decoder that turns a prompt line into a generated follow-up line.

  Attributes:
    text_vec: vectorization layer that maps a string to token ids.
    model: callable taking [encoder_ids, decoder_ids] and returning logits for each
      decoder position.
    vocab_list: text_vec.get_vocabulary(), used to map predicted ids back to words.
  """
  def __init__(self,text_vec,model):
    self.text_vec=text_vec
    self.model=model
    self.vocab_list=self.text_vec.get_vocabulary()

  def __call__(self,sentence,max_length=30):
    """Generate a follow-up line for `sentence`.

    Args:
      sentence: the prompt text, without the "<sol>" marker.
      max_length: maximum number of words to generate.

    Returns:
      The generated text as one string that starts with "<sol>".
    """
    sentence="<sol> "+sentence
    sentence=self.text_vec(sentence)
    sentence=sentence[tf.newaxis]   # add the batch dimension

    decoder_input="<sol>"
    decoder=self.text_vec(decoder_input)
    decoder=decoder[tf.newaxis]

    for i in range(max_length):
      # Position i must exist in the decoder sequence. With a vectorizer that
      # pads/truncates to a fixed width (as the layer built in this notebook does),
      # result[i] used to raise IndexError once i reached that width without an
      # "eol" having been produced.
      if i>=decoder.shape[1]:
        break

      predictions=self.model([sentence,decoder],training=False)
      result=tf.argmax(predictions,axis=-1)[0].numpy().tolist()
      word=self.vocab_list[result[i]]

      # "eol" ends the line. An empty vocabulary entry would add no token when the
      # text is re-vectorized, leaving position i+1 as padding, so stop there too.
      if word=="eol" or word=="":
        break

      # Append the word and re-vectorize the whole text for the next step.
      decoder_input=decoder_input+" "+word
      decoder=self.text_vec(decoder_input)
      decoder=decoder[tf.newaxis]

    return decoder_input

In [None]:
# Wire the fitted vectorizer and the trained model into the greedy decoder.
punchline=Punchline(text_vec=text_vec_layer,model=model)

In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("I'm doing good i'm on some new shit")
print(generated)

<sol> been saying yes instead of no


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("When you look at me The whole world fade")
print(generated)

<sol> if your love has [UNK] my name again


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("It's like snow on the beach")
print(generated)

<sol> like snow on the beach


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("Flying in a dream")
print(generated)

<sol> stars by the pocketful


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("The clouds are white on the blue sky")
print(generated)

<sol> and were standing in the middle of the night


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("Walk down the country road")
print(generated)

<sol> almost there dont be lonely


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("Empty bottles from sleepless nights")
print(generated)

<sol> theres a light in the sky


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("I promise you'll never find a person like me")
print(generated)

<sol> now you know that my name they got the floor


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("When you think of me")
print(generated)

<sol> hes in the middle of the night


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("nice to meet you")
print(generated)

<sol> well i hope youre happy


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("where have you been")
print(generated)

<sol> now girl i gotta keep you waiting


In [None]:
# Generate and show a follow-up line for this prompt.
generated=punchline("ayy ayy ayy")
print(generated)

<sol> bitch better have to keep you off
