# Text Generation with Neural Networks

## Imports

In [1]:
import torch
from torch import nn
import torch.nn.functional as F
import mlflow
import mlflow.pytorch
import numpy as np
import matplotlib.pyplot as plt
torch.manual_seed(0)

<torch._C.Generator at 0x7f52f27ef170>

## Get Text Data

We will train on a corpus of Shakespeare's writing and try to generate 'Shakespearean' text.

The excerpt printed below is the opening of Sonnet 1, one of the 154 sonnets by William Shakespeare first published in 1609. Like many of the others, it deals with beauty, procreation, and the transience of life, urging the beautiful to reproduce so that their beauty lives on in their offspring. The file itself is far longer than one sonnet: half of it, used as the training split later in this notebook, holds about 2.7 million characters.

In [2]:
with open('../../data/shakespeare.txt','r',encoding='utf8') as f:
    text = f.read()

In [3]:
print('First 600 chars: \n')
print(text[:600])

First 600 chars: 


                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else th


## Preparing textual data

We need to encode our data to give the model a proper numerical representation of our text.

In [4]:
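# Unique characters found in the text. Sorting makes the index order the same
# on every run; iteration order of a plain set of strings can change between sessions.
all_characters = sorted(set(text))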
# all_characters

In [5]:
len(all_characters)

84

In [6]:
decoder = dict(enumerate(all_characters))
# Assigns a unique integer to each character. This mapping is used later
# to turn predicted indices back into characters.
# decoder

In [7]:
encoder = {char: ind for ind, char in decoder.items()} 
# Reverses the decoder dictionary: maps each character to its assigned
# integer, which is what we use to encode the text.
# encoder

In [8]:
# Run these once to persist the mappings: CharModel.__init__ loads them from these paths.
# torch.save(decoder, 'models/decoder.pt')
# torch.save(encoder, 'models/encoder.pt')

In [9]:
encoded_text = np.array([encoder[char] for char in text])
# Encodes the entire text as an array of integers, each integer being the
# encoder index of the character at that position in the text.
# encoded_text
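
As a quick sanity check on the two mappings, the sketch below rebuilds them on a toy string and round-trips it. The names `toy_text`, `toy_encoder`, and `toy_decoder` are illustrative and not part of the notebook, and sorting the character set is an assumption made here so that the mapping is the same on every run.

```python
import numpy as np

toy_text = "to be or not to be"

# Sorted so the index assignment is deterministic (an assumption of this sketch)
toy_chars = sorted(set(toy_text))
toy_decoder = dict(enumerate(toy_chars))
toy_encoder = {char: ind for ind, char in toy_decoder.items()}

# Encode every character, then decode the integers back into a string
toy_encoded = np.array([toy_encoder[char] for char in toy_text])
toy_roundtrip = "".join(toy_decoder[int(i)] for i in toy_encoded)

print(toy_encoded[:5])
print(toy_roundtrip == toy_text)
```

If the round trip ever fails, the encoder and decoder were built from different character orderings, which is the main thing to guard against when the mappings are saved to disk and reloaded.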

## One Hot Encoding

One-hot encoding turns each integer index into a vector whose length equals the number of unique characters: the position belonging to that character holds 1 and every other position holds 0.

This gives the model a fixed-size numerical input in which no character is numerically 'larger' than another, an ordering that the raw integer codes would wrongly suggest.

In [10]:
def one_hot_encoder(encoded_text, num_uni_chars):
    '''
    encoded_text : batch of integer-encoded text (NumPy array)
    
    num_uni_chars : number of unique characters (len(set(text)))
    '''

    # Start from an array of zeros, one row per encoded character
    one_hot = np.zeros((encoded_text.size, num_uni_chars))
    
    # Convert data type for later use with pytorch
    one_hot = one_hot.astype(np.float32)

    # Using indexing fill in the 1s at the correct index locations
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0
    
    # Reshape it so it matches the batch shape
    one_hot = one_hot.reshape((*encoded_text.shape, num_uni_chars))
    
    return one_hot
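
For a small batch, the effect of one-hot encoding can be checked directly. The sketch below uses `np.eye` indexing, which is an alternative to the fill-by-index approach in `one_hot_encoder` and not the notebook's own implementation; the helper name `one_hot_via_eye` is hypothetical.

```python
import numpy as np

def one_hot_via_eye(indices, num_classes):
    """Look up rows of an identity matrix: row i is the one-hot vector for class i."""
    return np.eye(num_classes, dtype=np.float32)[indices]

# A batch of 2 sequences, each 2 characters long, over a 4-character vocabulary
toy_batch = np.array([[0, 2],
                      [1, 3]])
toy_one_hot = one_hot_via_eye(toy_batch, 4)

print(toy_one_hot.shape)   # (2, 2, 4)
print(toy_one_hot[0, 1])   # [0. 0. 1. 0.]
```

The output keeps the batch shape and adds one trailing axis of vocabulary size, which is the layout an LSTM built with `batch_first=True` expects.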

# Creating Training Batches

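Training proceeds on batches: small groups of fixed-length character sequences that are fed to the model one step at a time. For each input sequence `x`, the target `y` is the same sequence shifted one character to the left, so the model learns to predict the next character at every position.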

In [11]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    
    '''
    Generate (using yield) batches for training.
    
    X: Encoded Text of length seq_len
    Y: Encoded Text shifted by one
    
    Example:
    
    X:
    
    [[1 2 3]]
    
    Y:
    
    [[ 2 3 4]]
    
    encoded_text : Complete Encoded Text to make batches from
    samp_per_batch : Number of samples (sequences) per batch
    seq_len : Length of character sequence
       
    '''
    
    # Total number of characters per batch
    # Example: If samp_per_batch is 2 and seq_len is 50, then 100
    # characters come out per batch.
    char_per_batch = samp_per_batch * seq_len
    
    # Number of batches available to make
    # Floor division drops any partial batch at the end
    num_batches_avail = len(encoded_text) // char_per_batch
    
    # Cut off end of encoded_text that
    # won't fit evenly into a batch
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]
    
    # Reshape text into rows the size of a batch
    encoded_text = encoded_text.reshape((samp_per_batch, -1))

    # Step across the columns in windows of seq_len.
    for n in range(0, encoded_text.shape[1], seq_len):
        # Grab feature characters
        x = encoded_text[:, n:n+seq_len]
        # y is the target: x shifted over by 1
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        try:
            y[:, -1] = encoded_text[:, n+seq_len]
        except IndexError:
            # Last window: there is no next column, so wrap around to the first one
            y[:, -1] = encoded_text[:, 0]
            
        yield x, y
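
To see the shift concretely, here is a minimal sketch that applies the same windowing to a toy array of 20 integers. The function `shifted_batches` is a compact stand-in written for this illustration, not the notebook's `generate_batches`, though it is meant to mirror its behaviour, including the wrap-around on the last window.

```python
import numpy as np

def shifted_batches(data, rows, seq_len):
    """Yield (x, y) windows where y is x shifted left by one position."""
    usable = (len(data) // (rows * seq_len)) * rows * seq_len
    grid = data[:usable].reshape(rows, -1)
    for n in range(0, grid.shape[1], seq_len):
        x = grid[:, n:n + seq_len]
        # Column that follows the window; wraps to column 0 after the last window
        next_col = grid[:, (n + seq_len) % grid.shape[1]]
        y = np.concatenate([x[:, 1:], next_col[:, None]], axis=1)
        yield x, y

toy_data = np.arange(20)
toy_batches = list(shifted_batches(toy_data, rows=2, seq_len=5))
first_x, first_y = toy_batches[0]

print(first_x)   # [[ 0  1  2  3  4] [10 11 12 13 14]]
print(first_y)   # [[ 1  2  3  4  5] [11 12 13 14 15]]
```

Each row of the reshaped grid is a contiguous stretch of the text, so consecutive batches continue where the previous one stopped. That continuity is what makes it meaningful to carry the hidden state from one batch to the next during training.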

# Creating the LSTM Model

In [12]:
class CharModel(nn.Module):
    
    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5, use_gpu=False):
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu
        
        self.all_chars = all_chars
        self.decoder = torch.load('models/decoder.pt')
        self.encoder = torch.load('models/encoder.pt')
        
        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))
      
    
    def forward(self, x, hidden):
        lstm_output, hidden = self.lstm(x, hidden)       
        drop_output = self.dropout(lstm_output)
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        final_out = self.fc_linear(drop_output)
        
        return final_out, hidden
    
    
    def hidden_state(self, batch_size):
        if self.use_gpu:
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
        else:
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden))
        
        return hidden
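
The tensor shapes flowing through the model are easy to get wrong, so the following sketch traces them with toy sizes. It builds a bare `nn.LSTM` and `nn.Linear` rather than `CharModel` itself (which needs the saved encoder and decoder files), and the sizes are arbitrary values chosen for illustration.

```python
import torch
from torch import nn

vocab_size, hidden_size, num_layers = 10, 16, 2   # toy sizes, not the notebook's
batch, seq_len = 4, 5

lstm = nn.LSTM(vocab_size, hidden_size, num_layers, batch_first=True)
fc = nn.Linear(hidden_size, vocab_size)

x = torch.zeros(batch, seq_len, vocab_size)       # stands in for a one-hot batch
h0 = torch.zeros(num_layers, batch, hidden_size)  # initial hidden state
c0 = torch.zeros(num_layers, batch, hidden_size)  # initial cell state

out, (hn, cn) = lstm(x, (h0, c0))
# Flatten batch and time so every position becomes one row of logits
logits = fc(out.contiguous().view(-1, hidden_size))

print(out.shape)     # torch.Size([4, 5, 16])
print(logits.shape)  # torch.Size([20, 10])
print(hn.shape)      # torch.Size([2, 4, 16])
```

The flattened logits have one row per character position, which is why the training loop reshapes the targets to a vector of length `batch_size*seq_len` before computing the loss.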
        

## Instance of the Model

In [57]:
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True
)

### Optimizer and Loss

In [58]:
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()
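
`nn.CrossEntropyLoss` takes raw logits of shape `(N, C)` and integer class indices of shape `(N,)`, and averages over `N` by default. A useful reference point is the loss of a model that knows nothing: with uniform predictions over 84 characters the loss is `ln(84)`. The NumPy sketch below reproduces that baseline; `cross_entropy` is a helper written for this illustration, not a PyTorch function.

```python
import numpy as np

def cross_entropy(logits, targets):
    """Mean cross-entropy from raw logits (N, C) and integer targets (N,)."""
    shifted = logits - logits.max(axis=1, keepdims=True)   # numerical stability
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    return -log_probs[np.arange(len(targets)), targets].mean()

vocab_size = 84                                  # unique characters reported above
uniform_logits = np.zeros((6, vocab_size))       # equal score for every character
toy_targets = np.array([0, 5, 10, 20, 40, 83])

baseline = cross_entropy(uniform_logits, toy_targets)
print(baseline, np.log(vocab_size))
```

Any validation loss well below this baseline means the model has learned something about which characters tend to follow which.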

## Training Data and Validation Data

In [59]:
# fraction of data to be used for training (the rest is held out for validation)
train_percent = 0.5

In [60]:
int(len(encoded_text) * (train_percent))

2722804

In [61]:
train_ind = int(len(encoded_text) * (train_percent))

In [62]:
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

# Training the Network

## Variables

In [63]:
# Epochs to train for
epochs = 30
# batch size 
batch_size = 128
# Length of sequence
seq_len = 100
# for printing report purposes
# always start at 0
tracker = 0
# number of unique characters (vocabulary size)
num_char = int(encoded_text.max()) + 1
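
With these settings the number of optimizer steps per epoch follows from simple arithmetic. The sketch below works it out from the training split size printed earlier; the variable names `train_chars`, `chars_per_batch`, `steps_per_epoch`, and `dropped_chars` are introduced here for illustration.

```python
train_chars = 2722804      # size of the training split printed above
batch_size = 128
seq_len = 100

# Each batch consumes batch_size sequences of seq_len characters
chars_per_batch = batch_size * seq_len
steps_per_epoch = train_chars // chars_per_batch
dropped_chars = train_chars - steps_per_epoch * chars_per_batch

print(chars_per_batch)   # 12800
print(steps_per_epoch)   # 212
print(dropped_chars)     # 9204
```

The result of 212 steps is consistent with the step counter in the training log, which advances by 212 every epoch.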

In [64]:
mlflow.set_experiment('RNN text generation')

<Experiment: artifact_location='/phoenix/mlflow/260869678564322375', creation_time=1712947833783, experiment_id='260869678564322375', last_update_time=1712947833783, lifecycle_stage='active', name='RNN text generation', tags={}>

In [65]:
mlflow.start_run(run_name='RNN Text Generation')

mlflow.log_param("epochs", epochs)
mlflow.log_param("batch_size", batch_size)

# Set model to train
model.train()

# Check to see if using GPU
if model.use_gpu:
    torch.cuda.manual_seed_all(0)
    model.cuda()

for i in range(epochs):
    
    hidden = model.hidden_state(batch_size)
    
    
    for x,y in generate_batches(train_data, batch_size, seq_len):
        
        tracker += 1
        
        # One Hot Encode incoming data
        x = one_hot_encoder(x, num_char)
        
        # Convert Numpy Arrays to Tensor
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)
        
        # Adjust for GPU if necessary
        if model.use_gpu:
            inputs = inputs.cuda()
            targets = targets.cuda()
            
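        # Detach the hidden state from the previous batch's graph: its values
        # carry over, but gradients do not flow back across batches
        hidden = tuple([state.data for state in hidden])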
        
        model.zero_grad()
        
        lstm_output, hidden = model(inputs, hidden)
        loss = criterion(lstm_output, targets.view(batch_size*seq_len).long())
        
        loss.backward()
        
        # Clipping gradients to avoid explosion
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        
        optimizer.step()
        
        if tracker % 100 == 0:
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()
            
            # No gradients are needed for validation
            with torch.no_grad():
                for x,y in generate_batches(val_data, batch_size, seq_len):
                    x = one_hot_encoder(x, num_char)
                    inputs = torch.from_numpy(x)
                    targets = torch.from_numpy(y)
                    
                    if model.use_gpu:
                        inputs = inputs.cuda()
                        targets = targets.cuda()
                    
                    val_hidden = tuple([state.data for state in val_hidden])
                    
                    lstm_output, val_hidden = model(inputs, val_hidden)
                    val_loss = criterion(lstm_output, targets.view(batch_size*seq_len).long())
            
                    val_losses.append(val_loss.item())
            
            # Log the mean over all validation batches rather than only the last one
            mlflow.log_metric("Val Loss", float(np.mean(val_losses)), step=tracker)
        
            model.train()
            
    print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")


mlflow.end_run()

Epoch: 0 Step: 212 Val Loss: 2.607346296310425
Epoch: 1 Step: 424 Val Loss: 2.0837960243225098
Epoch: 2 Step: 636 Val Loss: 1.8453664779663086
Epoch: 3 Step: 848 Val Loss: 1.7025185823440552
Epoch: 4 Step: 1060 Val Loss: 1.6200140714645386
Epoch: 5 Step: 1272 Val Loss: 1.5581440925598145
Epoch: 6 Step: 1484 Val Loss: 1.5168195962905884
Epoch: 7 Step: 1696 Val Loss: 1.4805821180343628
Epoch: 8 Step: 1908 Val Loss: 1.4503957033157349
Epoch: 9 Step: 2120 Val Loss: 1.4314182996749878
Epoch: 10 Step: 2332 Val Loss: 1.4215327501296997
Epoch: 11 Step: 2544 Val Loss: 1.4067339897155762
Epoch: 12 Step: 2756 Val Loss: 1.4023497104644775
Epoch: 13 Step: 2968 Val Loss: 1.3925913572311401
Epoch: 14 Step: 3180 Val Loss: 1.387810468673706
Epoch: 15 Step: 3392 Val Loss: 1.3735700845718384
Epoch: 16 Step: 3604 Val Loss: 1.368165135383606
Epoch: 17 Step: 3816 Val Loss: 1.3656190633773804
Epoch: 18 Step: 4028 Val Loss: 1.3587713241577148
Epoch: 19 Step: 4240 Val Loss: 1.3619024753570557
Epoch: 20 Step: 4
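
The loss values above are cross-entropies in nats, which can be hard to interpret on their own. One common reading is perplexity, `exp(loss)`: roughly the number of equally likely characters the model is choosing between at each step. The sketch below converts three of the logged values; the dictionary `logged_losses` simply copies numbers from the output above.

```python
import math

# Validation losses copied from the training log (epoch -> loss)
logged_losses = {
    0: 2.607346296310425,
    9: 1.4314182996749878,
    19: 1.3619024753570557,
}

# Perplexity is the exponential of the cross-entropy
perplexities = {epoch: math.exp(loss) for epoch, loss in logged_losses.items()}

for epoch, ppl in perplexities.items():
    print(f"epoch {epoch}: perplexity {ppl:.2f}")
```

For comparison, a model that guesses uniformly over the 84 characters would have a perplexity of 84, so by epoch 19 the model has narrowed its effective choice to about 4 characters per step.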

## Saving the Model

In [15]:
model_name = 'dict_torch_rnn_model.pt'

In [22]:
torch.save(model.state_dict(), f'models/{model_name}')

## Load Model

In [13]:
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=False
)

In [16]:
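# The weights were saved from a GPU model; map_location lets them load on a CPU-only machine
model.load_state_dict(torch.load(f'models/{model_name}', map_location='cpu'))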
model.eval()

CharModel(
  (lstm): LSTM(84, 512, num_layers=3, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc_linear): Linear(in_features=512, out_features=84, bias=True)
)
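
The save and load pattern used here can be tried in isolation. This is a minimal sketch with a tiny `nn.Linear` layer and an in-memory buffer standing in for the model file; the names `source`, `restored`, and `buffer` are illustrative.

```python
import io
import torch
from torch import nn

torch.manual_seed(0)
source = nn.Linear(3, 2)

# Save only the parameters (the state dict), not the whole module object
buffer = io.BytesIO()
torch.save(source.state_dict(), buffer)
buffer.seek(0)

# Rebuild the same architecture, then load the parameters onto the CPU
restored = nn.Linear(3, 2)
restored.load_state_dict(torch.load(buffer, map_location="cpu"))
restored.eval()

same = all(
    torch.equal(a, b)
    for a, b in zip(source.state_dict().values(), restored.state_dict().values())
)
print(same)   # True
```

Because a state dict holds only tensors, the receiving model must be constructed with the same hyperparameters (here `num_hidden=512` and `num_layers=3`) before the weights are loaded.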

# Generating Predictions

--------

In [17]:
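def predict_next_char(model, char, hidden=None, k=1):
    
    # Encode the input character as a one-hot array of shape (1, 1, vocab size)
    encoded_text = model.encoder[char]
    encoded_text = np.array([[encoded_text]])
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
    inputs = torch.from_numpy(encoded_text)

    if model.use_gpu:
        inputs = inputs.cuda()

    # Start from a zero state if none is given, otherwise detach the incoming one
    if hidden is None:
        hidden = model.hidden_state(1)
    hidden = tuple([state.data for state in hidden])
    lstm_out, hidden = model(inputs, hidden)
    probs = F.softmax(lstm_out, dim=1).data

    if model.use_gpu:
        probs = probs.cpu()

    # Keep the top 'k' next-character probabilities and renormalize them
    probs, index_positions = probs.topk(k)
    # flatten() keeps a 1-D array even when k=1; squeeze() would return a 0-d
    # array, which np.random.choice interprets as a population size
    index_positions = index_positions.numpy().flatten()
    probs = probs.numpy().flatten()
    probs = probs/probs.sum()
    char = np.random.choice(index_positions, p=probs)

    return model.decoder[char], hidden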
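
The sampling step at the end of `predict_next_char` is top-k sampling: keep the `k` most probable characters, renormalize their probabilities, and draw from that shortlist. Here is a minimal NumPy sketch of the same idea on a hand-written distribution; `sample_top_k` and the probabilities are made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)   # seeded so repeated runs draw the same samples

def sample_top_k(probs, k, rng):
    """Sample an index from the k most probable entries of a 1-D distribution."""
    top_idx = np.argsort(probs)[-k:]                 # indices of the k largest entries
    top_p = probs[top_idx] / probs[top_idx].sum()    # renormalize over the shortlist
    return int(rng.choice(top_idx, p=top_p))

toy_probs = np.array([0.05, 0.40, 0.10, 0.30, 0.15])
draws = [sample_top_k(toy_probs, k=3, rng=rng) for _ in range(200)]

# Only indices 1, 3 and 4 (the three largest probabilities) can appear
print(sorted(set(draws)))
```

With `k=1` the choice is deterministic and always returns the most likely character, which tends to produce repetitive text. Larger values of `k` trade some coherence for variety.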

In [18]:
def generate_text(model, size, seed='The', k=1):
    
    if(model.use_gpu):
        model.cuda()
    else:
        model.cpu()
        
    model.eval()
    output_chars = [c for c in seed]
    hidden = model.hidden_state(1)
    
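    # Prime the hidden state by feeding the seed one character at a time;
    # only the prediction made after the last seed character is kept
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)

    output_chars.append(char)
    # Generate the remaining characters, feeding each prediction back in
    for i in range(size):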
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        output_chars.append(char)
        
    return ''.join(output_chars)

#### Generating 1000 characters of text from the seed 'Confidence '

In [20]:
print(generate_text(model, 1000, seed='Confidence ', k=3))

Confidence to the Castle.
  PAROLLES. The stars of treason, shall I see your singer his
    child that I have been my hand to her when they will stay to
    me to had the season of the courage in the wilt and the storms of
    the state of heaven is send out them and broke their contrary time and
    seal and soldiers that I would not tell thine other to the way
    and to her soul, they are the controversy and chair, to see the word
    of the counterfeit and thanks that that they are that he was the
    soldier and his head, and that they are as the world in home. I am
    stars, sir.                      Exeunt COMINIUS

    Thou art the with and the chiding that shall stir
    That with the world is sent. Therefore that she said he
    They should not buy the success of his service,
    And when I seek me so.
  CASSIUS. Why, she is so,
    That with the charge that was the chang'd of hell
    That the wind should. I will not be, there is the court
    And wars to speak, and wherefo

#### Generating 1000 characters of text from the seed 'Love '

In [28]:
print(generate_text(model, 1000, seed='Love ', k=3))

Love of the King.
                            [Stands on their contemplations]
  CORIOLANUS. With him, my lord, the wind and here it should
    That stay the sun as shorts as sound to see the state,
    That we will sen thy statue, that the words
    With such as he is so.
  SUFFOLK. If I should start and send thy hand it in
    This world as thou art straight and store to him,
    Who shakes me with this cause to stop thy field,
    That which I have a sheep of trumpets, when they should
    Have stay into the world, we had been and strange
    An enemy.
  BRUTUS. The most so fair a man a prince of truth.
    Who is to be a marking on this soul
    To the comparation of a sea of thousand?
    I have been such a season that I would
    I should have speak'd a stocks that thou shouldst see
    A shape and son. I have not so to see the strain
    This she was all that shall be strange and truth,
    We have been, whom he should see me, then will not  
    And take his head as standed at 