In [1]:
import os
import traceback
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import shutil
import numpy as np
import random as  rnd

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Input

from termcolor import colored

# Seed Python's built-in `random` module (imported as `rnd`) for reproducibility.
rnd.seed(32)

In [2]:
# Read the whole corpus into memory as one string.
# The encoding is pinned to UTF-8 so decoding does not depend on the platform's
# locale default and agrees with the UTF-8 character split done later on.
tmp_txt = '/kaggle/input/text-corpus/shakespeare_data.txt'
with open(tmp_txt, encoding='utf-8') as file:
    text = file.read()

In [8]:
# Character-level vocabulary: two reserved entries ('[UNK]' at index 0 and the
# empty string at index 1) followed by every distinct corpus character, sorted.
vocab = ['[UNK]', '', *sorted(set(text))]

In [9]:
# Contiguous 80% / 10% / remainder split of the raw text (no shuffling).
total_chars = len(text)
n_train = int(0.8 * total_chars)
n_val = int(0.1 * total_chars)
n_test = total_chars - (n_train + n_val)

val_end = n_train + n_val
train_ds, val_ds, test_ds = text[:n_train], text[n_train:val_end], text[val_end:]


In [10]:
# Number of characters in the train, validation and test splits.
len(train_ds), len(val_ds), len(test_ds)

(4227103, 528387, 528389)

In [105]:
def sequence_to_tensor(sequence, vocab):
    """Map text to vocabulary indices, one index per UTF-8 character.

    Args:
        sequence: Text to encode. A Python list of strings is first joined
            with newline characters; anything else is passed through as-is.
        vocab: Vocabulary handed to ``tf.keras.layers.StringLookup``.

    Returns:
        The output of the ``StringLookup`` layer applied to the characters
        of ``sequence``.
    """
    if isinstance(sequence, list):
        sequence = '\n'.join(sequence)

    # A fresh lookup layer is built on every call from the given vocabulary.
    char_to_idx = tf.keras.layers.StringLookup(vocabulary=vocab, mask_token=None)
    return char_to_idx(tf.strings.unicode_split(sequence, input_encoding='UTF-8'))

In [12]:
def idx_to_chars(indexes, vocab):
    """Decode vocabulary indices back into a joined string tensor.

    Args:
        indexes: Integer indices produced with the same ``vocab``.
        vocab: Vocabulary handed to the inverted ``StringLookup`` layer.

    Returns:
        The looked-up characters concatenated along the last axis.
    """
    idx_to_char = tf.keras.layers.StringLookup(
        vocabulary=vocab, mask_token=None, invert=True
    )
    decoded = idx_to_char(indexes)
    return tf.strings.reduce_join(decoded, axis=-1)

In [13]:
def split_input_target(sequence):
    """Build a next-element prediction pair from one sequence.

    Args:
        sequence: Any sliceable sequence.

    Returns:
        ``(inputs, targets)`` where ``inputs`` drops the last element and
        ``targets`` drops the first, so ``targets[i]`` follows ``inputs[i]``.
    """
    return sequence[:-1], sequence[1:]

In [14]:
def prepare_dataset(text, vocab, sequence_length=30, batch_size=64, shuffle_buffer=10000):
    """Turn raw text into a shuffled, batched dataset of (input, target) id sequences.

    Args:
        text: A string, or a list of strings that is joined with newlines.
        vocab: Vocabulary forwarded to ``sequence_to_tensor``.
        sequence_length: Length of each input (and target) sequence.
        batch_size: Number of sequence pairs per batch.
        shuffle_buffer: Size of the shuffle buffer (defaults to the value
            that used to be hard-coded).

    Returns:
        A ``tf.data.Dataset`` yielding batches of ``(input, target)`` pairs,
        where each target is its input shifted by one position.
    """
    if isinstance(text, list):
        text = '\n'.join(text)

    char_ids = sequence_to_tensor(text, vocab)
    id_stream = tf.data.Dataset.from_tensor_slices(tensors=char_ids)

    # Windows hold one extra id so that dropping the last / first element
    # yields input and target sequences of exactly `sequence_length`.
    windows = id_stream.batch(sequence_length + 1, drop_remainder=True)
    pairs = windows.map(split_input_target)

    return (
        pairs
        .shuffle(shuffle_buffer)
        .batch(batch_size)
        # tf.data.AUTOTUNE replaces the deprecated tf.data.experimental alias.
        .prefetch(tf.data.AUTOTUNE)
    )

In [15]:
# Seed TensorFlow's random ops before building the (shuffled) training pipeline.
tf.random.set_seed(32)
train_dataset = prepare_dataset(train_ds, vocab, sequence_length=100, batch_size=32)

In [16]:
class GRULM(tf.keras.Model):
    """GRU language model: Embedding -> GRU -> Dense with log-softmax output.

    Args:
        vocab_size: Number of ids; used for both the embedding table size and
            the width of the output layer.
        rnn_units: Number of units in the GRU layer.
        embedding_dim: Width of each embedding vector.
    """

    def __init__(self, vocab_size=256, rnn_units=128, embedding_dim=256):
        # The base constructor takes no positional arguments for a subclassed
        # model; the instance must not be passed explicitly.
        super().__init__()

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(vocab_size, activation='log_softmax')

    def call(self, inputs, states=None, return_states=False, training=False):
        """Run the model on a batch of id sequences.

        Args:
            inputs: Ids fed to the embedding layer.
            states: Optional initial GRU state; when ``None`` the GRU's own
                initial state is used.
            return_states: If true, also return the final GRU state.
            training: Forwarded to every layer.

        Returns:
            The dense layer output for every position, or a tuple
            ``(output, final_gru_state)`` when ``return_states`` is true.
        """
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_states:
            return x, states
        return x

In [17]:
def compile_model(model):
    """Compile ``model`` with Adam and sparse categorical cross-entropy.

    The loss is created with ``from_logits=True``, so it normalises the
    model outputs itself before computing the cross-entropy.

    Args:
        model: A Keras model to compile in place.

    Returns:
        The same ``model`` object, now compiled.
    """
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
    )
    return model

In [18]:
def log_perplexity(preds, target):
    """Average negative log-likelihood per non-padding token.

    For each sequence the log-probabilities assigned to the target ids are
    averaged over the non-padding positions; the result is the negated mean
    of those per-sequence averages.

    Args:
        preds: Array-like of log-probabilities with the vocabulary on the
            last axis (anything ``np.asarray`` accepts, including eager
            tensors).
        target: Integer array-like of target ids, shaped like ``preds``
            without its last axis. Ids equal to ``PADDING_ID`` are ignored;
            ids outside the vocabulary contribute 0, as a one-hot encoding
            would.

    Returns:
        The log-perplexity as a NumPy float. Sequences made up entirely of
        padding are left out of the average instead of producing 0/0; if
        every sequence is all padding the result is NaN.
    """
    PADDING_ID = 1

    preds = np.asarray(preds)
    target = np.asarray(target)

    # Gather the log-probability of each target id directly instead of
    # materialising a one-hot tensor over the whole vocabulary.
    in_vocab = (target >= 0) & (target < preds.shape[-1])
    safe_target = np.where(in_vocab, target, 0)
    log_p = np.take_along_axis(preds, safe_target[..., np.newaxis], axis=-1)[..., 0]
    log_p = np.where(in_vocab, log_p, 0.0)

    non_pad = 1.0 - np.equal(target, PADDING_ID)
    log_p = log_p * non_pad

    seq_sums = np.atleast_1d(np.sum(log_p, axis=-1))
    token_counts = np.atleast_1d(np.sum(non_pad, axis=-1))

    # Skip sequences without any real token: their average is undefined.
    has_tokens = token_counts > 0
    if not np.any(has_tokens):
        return np.float64('nan')

    log_ppx = np.mean(seq_sums[has_tokens] / token_counts[has_tokens])

    return -log_ppx

In [35]:
# Hyperparameters of the language model.
vocab_size = len(vocab)   # number of entries in the vocabulary given to StringLookup
embedding_dim = 256       # width of each embedding vector
rnn_units = 512           # number of units in the GRU layer

model = GRULM(vocab_size=vocab_size, rnn_units=rnn_units, embedding_dim=embedding_dim)

In [20]:
# Build the weights for (batch, sequence_length) id inputs, then run `call`
# once on a symbolic input before printing the summary.
BATCH_SIZE = 64
model.build(input_shape=(BATCH_SIZE, 100))
# `shape` must be a tuple: `(100)` is just the integer 100, `(100,)` is the
# one-dimensional per-example shape that is intended here.
model.call(inputs=Input(shape=(100,)))
model.summary()

Model: "grulm"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, 100, 256)          20992     
                                                                 
 gru (GRU)                   [(None, 100, 512),        1182720   
                              (None, 512)]                       
                                                                 
 dense (Dense)               (None, 100, 82)           42066     
                                                                 
Total params: 1245778 (4.75 MB)
Trainable params: 1245778 (4.75 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [25]:
# Compile the model (Adam + sparse categorical cross-entropy, see compile_model)
# and train it on the training split for EPOCHS passes.
EPOCHS = 10
model = compile_model(model)
history = model.fit(train_dataset, epochs=EPOCHS)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [40]:
output_dir = './first-nlp-model/'

# Best-effort cleanup of a previous checkpoint: a missing directory (or any
# other removal error) is ignored, so this cell can be re-run safely.
shutil.rmtree(output_dir, ignore_errors=True)

model.save_weights(output_dir)

In [41]:
# Recreate the architecture with the same hyperparameters and restore the
# weights saved above.
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 512

model = GRULM(vocab_size=vocab_size, embedding_dim=embedding_dim, rnn_units = rnn_units)
# The model consumes integer id sequences shaped (batch, sequence_length);
# the vocabulary size is not an input dimension. The batch size is left open.
model.build(input_shape=(None, 100))
model.load_weights('./first-nlp-model/')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7e1a7393d270>

In [33]:
# `val_dataset` is not created in any visible earlier cell, so build it here
# with the same settings as `train_dataset`; otherwise this cell fails on a
# fresh kernel (Restart & Run All).
val_dataset = prepare_dataset(val_ds, sequence_length=100, vocab=vocab, batch_size=32)
full_dataset = train_dataset.concatenate(val_dataset)

In [36]:
# Re-compile (this creates a fresh Adam optimizer) and train on the
# concatenated train + validation data for EPOCHS passes.
EPOCHS = 10
model = compile_model(model)
history = model.fit(full_dataset, epochs=EPOCHS)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [37]:
# Held-out split, windowed into 100-character sequences and batched by 64.
test_dataset = prepare_dataset(test_ds, sequence_length=100, vocab=vocab, batch_size=64)

In [106]:
# Qualitative check on one test batch: print the input text, the model's
# most likely next character at every position, and the true target text.
sample_idx = 5
for x, target in test_dataset.take(1):
    print(idx_to_chars(x[sample_idx], vocab).numpy())

    preds, status = model(x, states=None, return_states=True, training=False)

    greedy_ids = tf.math.argmax(preds[sample_idx], axis=-1)
    print(idx_to_chars(greedy_ids, vocab).numpy())
    print(idx_to_chars(target[sample_idx], vocab).numpy())


b' power, and so stand aloof for more serious\n\twooing. But I protest to thee, pretty one, my\n\tauthorit'
b'truers\nand to dhrnd usonf,mor tere.ttrvous,\tTirdng  Wut t wratest th she   arosty lnes\nay \tfrnhority'
b'power, and so stand aloof for more serious\n\twooing. But I protest to thee, pretty one, my\n\tauthority'


In [172]:
# Score the whole test split as a single long sequence (a batch of one).
eval_ids = sequence_to_tensor(test_ds, vocab)
input_ids, target_ids = split_input_target(eval_ids)

batched_inputs = tf.expand_dims(input_ids, 0)
batched_targets = tf.expand_dims(target_ids, 0)
preds, status = model(batched_inputs, states=None, return_states=True, training=False)

# Log-perplexity of the predictions, and its exponential (the perplexity).
log_ppx = log_perplexity(preds, batched_targets)
print(f'log perplexity: {log_ppx}\nperplexity:  {np.exp(log_ppx)}')

log perplexity: 1.3044070955527076
perplexity:  3.685503294490197


In [51]:
def temperature_random_sampling(log_probs, temperature=1.0):
    """Sample an index by adding temperature-scaled Gumbel noise and taking the argmax.

    Args:
        log_probs: Tensor of scores with the candidates on the last axis.
        temperature: Multiplier on the noise; ``0`` reduces to a plain
            argmax, larger values make the choice more random.

    Returns:
        The argmax over the last axis of the noise-perturbed scores.
    """
    eps = 1e-6
    # Uniform samples are kept strictly inside (0, 1) so both logs are finite.
    uniform = tf.random.uniform(shape=log_probs.shape, minval=eps, maxval=1.0 - eps)

    # Transform the uniform samples into Gumbel-distributed noise.
    gumbel_noise = -tf.math.log(-tf.math.log(uniform))

    perturbed = log_probs + gumbel_noise * temperature
    return tf.math.argmax(perturbed, axis=-1)

In [152]:
class GenerativeModel(tf.keras.Model):
    """Wraps a trained language model to generate text one character at a time.

    Args:
        model: Model called as ``model(ids, states=..., return_states=True)``
            and returning ``(scores, states)``.
        vocab: Vocabulary used to encode the input text and decode the
            sampled ids.
        temperature: Sampling temperature passed to
            ``temperature_random_sampling``.
    """

    def __init__(self, model, vocab, temperature=1.0):
        super().__init__()
        self.model = model
        self.vocab = vocab
        self.temperature = temperature

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for ``inputs``.

        Args:
            inputs: String tensor holding the text to condition on.
            states: Recurrent state from the previous step, or ``None``.

        Returns:
            ``(next_char, states)``: the sampled character as a string tensor
            with a leading axis of size 1, and the updated recurrent state.
        """
        # Use the vocabulary given to this instance rather than a global of
        # the same name; copy it into a plain list for the lookup layers.
        vocab = list(self.vocab)

        input_idx = sequence_to_tensor(inputs, vocab)

        predicted_logits, states = self.model(
            input_idx, states=states, return_states=True
        )

        # Keep only the scores of the last position of the first sequence.
        predicted_logits = predicted_logits[0, -1, :]

        predicted_idx = temperature_random_sampling(
            predicted_logits, self.temperature
        )

        predicted_chars = idx_to_chars([predicted_idx], vocab)

        return tf.expand_dims(predicted_chars, 0), states

    def generate_n_steps(self, prefix, n_steps):
        """Generate ``n_steps`` characters following ``prefix``.

        Args:
            prefix: Python string that seeds the generation.
            n_steps: Number of characters to sample.

        Returns:
            ``prefix`` followed by the generated characters, as a Python str.
        """
        states = None
        next_char = tf.constant([prefix])
        result = [next_char]

        # After the first step only the newly sampled character is fed back;
        # the context is carried by the recurrent state.
        for _ in range(n_steps):
            next_char, states = self.generate_one_step(next_char, states=states)
            result.append(next_char)

        return tf.strings.join(result)[0].numpy().decode('utf-8')

In [171]:
# Seed TensorFlow's random ops before sampling.
tf.random.set_seed(32)

# Wrap the trained model for sampling with temperature 0.45.
gen = GenerativeModel(model, vocab, temperature=0.45)

# Generate 500 characters after the prefix 'Dear', followed by a separator line.
print(gen.generate_n_steps(n_steps=500, prefix='Dear'),'\n','_'*30)

Dear the stakes and sack and promise of the same.

LUCETTA	I do not take my lord and all these fiends,
	That she will do not the better have the law
	And seek the bolt under the heart.

	[Exit PROTEUS]

	How now, my lord, I will perceive excellent
	of the stars of a man as we are but second to his country.

	[Exit POLONIUS]

	Look, how now, kinsman! no more than the matter?

EDGAR	The soul of death I will not see the way
	To the king is at home, the sea, and for the
	king of perjury that we did not  
 ______________________________
