In [1]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# get text data, Shakespearean sonnet
with open(r'C:\Users\dearm\OneDrive\Documents\PYTORCH_NOTEBOOKS\Data\shakespeare.txt','r',encoding='utf8') as f:
    text = f.read()

In [3]:
print(text[:1000])


                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.


                     2
  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep su

In [4]:
# encode the entire text
all_characters = set(text)

In [5]:
# numbers -> letters
decoder = dict(enumerate(all_characters))

In [6]:
# decoder
decoder.items()

dict_items([(0, 'b'), (1, 'p'), (2, 'u'), (3, 'f'), (4, 'm'), (5, ' '), (6, ':'), (7, 'r'), (8, 'Y'), (9, '('), (10, 'a'), (11, '1'), (12, 'X'), (13, 'n'), (14, 'B'), (15, 'J'), (16, '3'), (17, '5'), (18, 'C'), (19, 's'), (20, 'w'), (21, '2'), (22, 'Q'), (23, 'R'), (24, 'd'), (25, 'E'), (26, '}'), (27, '0'), (28, 'A'), (29, '9'), (30, 'h'), (31, '&'), (32, 'T'), (33, 'F'), (34, '7'), (35, 'y'), (36, '`'), (37, 'v'), (38, 'N'), (39, '?'), (40, 'g'), (41, '['), (42, '_'), (43, 'k'), (44, 't'), (45, '-'), (46, 'H'), (47, ','), (48, '8'), (49, 'P'), (50, '<'), (51, '"'), (52, '>'), (53, 'W'), (54, ';'), (55, '|'), (56, 'e'), (57, 'D'), (58, 'G'), (59, 'L'), (60, 'c'), (61, 'O'), (62, '4'), (63, 'o'), (64, 'M'), (65, 'i'), (66, "'"), (67, 'V'), (68, '6'), (69, '!'), (70, ']'), (71, 'U'), (72, 'j'), (73, '.'), (74, 'x'), (75, ')'), (76, 'I'), (77, 'l'), (78, 'S'), (79, 'Z'), (80, 'z'), (81, 'q'), (82, '\n'), (83, 'K')])

In [7]:
# letters -> numbers
encoder = {char: ind for ind,char in decoder.items()}

In [8]:
encoded_text = np.array([encoder[char] for char in text])

In [9]:
encoded_text[:500]

array([82,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,
        5,  5,  5,  5,  5, 11, 82,  5,  5, 33,  7, 63,  4,  5,  3, 10, 65,
        7, 56, 19, 44,  5, 60,  7, 56, 10, 44,  2,  7, 56, 19,  5, 20, 56,
        5, 24, 56, 19, 65,  7, 56,  5, 65, 13, 60,  7, 56, 10, 19, 56, 47,
       82,  5,  5, 32, 30, 10, 44,  5, 44, 30, 56,  7, 56,  0, 35,  5,  0,
       56, 10,  2, 44, 35, 66, 19,  5,  7, 63, 19, 56,  5,  4, 65, 40, 30,
       44,  5, 13, 56, 37, 56,  7,  5, 24, 65, 56, 47, 82,  5,  5, 14,  2,
       44,  5, 10, 19,  5, 44, 30, 56,  5,  7, 65,  1, 56,  7,  5, 19, 30,
       63,  2, 77, 24,  5,  0, 35,  5, 44, 65,  4, 56,  5, 24, 56, 60, 56,
       10, 19, 56, 47, 82,  5,  5, 46, 65, 19,  5, 44, 56, 13, 24, 56,  7,
        5, 30, 56, 65,  7,  5,  4, 65, 40, 30, 44,  5,  0, 56, 10,  7,  5,
       30, 65, 19,  5,  4, 56,  4, 63,  7, 35,  6, 82,  5,  5, 14,  2, 44,
        5, 44, 30, 63,  2,  5, 60, 63, 13, 44,  7, 10, 60, 44, 56, 24,  5,
       44, 63,  5, 44, 30

In [10]:
# one-hot encoding
def one_hot_encoder(encoded_text, num_uni_chars):
    '''
    encoded_text : batch of encoded text
    
    num_uni_chars = number of unique characters (len(set(text)))
    '''
    
    # METHOD FROM:
    # https://stackoverflow.com/questions/29831489/convert-encoded_textay-of-indices-to-1-hot-encoded-numpy-encoded_textay
      
    # Create a placeholder for zeros.
    one_hot = np.zeros((encoded_text.size, num_uni_chars))
    
    # Convert data type for later use with pytorch (errors if we dont!)
    one_hot = one_hot.astype(np.float32)

    # Using fancy indexing fill in the 1s at the correct index locations
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0
    

    # Reshape it so it matches the batch sahe
    one_hot = one_hot.reshape((*encoded_text.shape, num_uni_chars))
    
    return one_hot

In [11]:
one_hot_encoder(np.array([1,2,0]),3)

array([[0., 1., 0.],
       [0., 0., 1.],
       [1., 0., 0.]], dtype=float32)

In [12]:
# creating training batches, batches of characters where next character in sequence is the label
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    
    '''
    Generate (using yield) batches for training.
    
    X: Encoded Text of length seq_len
    Y: Encoded Text shifted by one
    
    Example:
    
    X:
    
    [[1 2 3]]
    
    Y:
    
    [[ 2 3 4]]
    
    encoded_text : Complete Encoded Text to make batches from
    batch_size : Number of samples per batch
    seq_len : Length of character sequence
       
    '''
    
    # Total number of characters per batch
    # Example: If samp_per_batch is 2 and seq_len is 50, then 100
    # characters come out per batch
    char_per_batch = samp_per_batch * seq_len
    
    
    # Number of batches available to make
    # Use int() to roun to nearest integer
    num_batches_avail = int(len(encoded_text)/char_per_batch)
    
    # Cut off end of encoded_text that
    # won't fit evenly into a batch
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]
    
    
    # Reshape text into rows the size of a batch
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    

    # Go through each row in array
    for n in range(0, encoded_text.shape[1], seq_len):
        
        # Grab feature characters
        x = encoded_text[:, n:n+seq_len]
        
        # y is the target shifted over by 1
        y = np.zeros_like(x)
       
        #
        try:
            y[:, :-1] = x[:, 1:]
            y[:, -1]  = encoded_text[:, n+seq_len]
            
        # FOR POTENTIAL INDEXING ERROR AT THE END    
        except:
            y[:, :-1] = x[:, 1:]
            y[:, -1] = encoded_text[:, 0]
            
        yield x, y

In [14]:
# GPU check, would take a lot longer on CPU
torch.cuda.is_available()

True

In [15]:
# define the LSTM model
class CharModel(nn.Module):
    
    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5,use_gpu=False):
        
        
        # SET UP ATTRIBUTES
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu
        
        #CHARACTER SET, ENCODER, and DECODER
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char: ind for ind,char in decoder.items()}
        
        
        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))
      
    
    def forward(self, x, hidden):
                  
        
        lstm_output, hidden = self.lstm(x, hidden)
        
        
        drop_output = self.dropout(lstm_output)
        
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        
        
        final_out = self.fc_linear(drop_output)
        
        
        return final_out, hidden
    
    
    def hidden_state(self, batch_size):
        '''
        Used as separate method to account for both GPU and CPU users
        '''
        
        if self.use_gpu:
            
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
        else:
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden))
        
        return hidden

In [16]:
# instantiate the model
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)

In [17]:
# number of parameters should be same magnitude as number of characters in text
total_param  = []
for p in model.parameters():
    total_param.append(int(p.numel()))

In [18]:
sum(total_param)

5470292

In [19]:
len(encoded_text)

5445609

In [20]:
# define loss and optimizer functions
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

In [21]:
# percentage of data to be used for training
train_percent = 0.9

In [22]:
# find cutoff index for train set
train_ind = int(len(encoded_text) * (train_percent))

In [23]:
# set train and validation sets
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

In [25]:
# set training variables
epochs = 30
# batch size 
batch_size = 100

# length of sequence
seq_len = 100

# for printing report purposes
# always start at 0
tracker = 0

# number of characters in text
num_char = max(encoded_text)+1

In [26]:
# train the model!
# Set model to train
model.train()


# Check to see if using GPU
if model.use_gpu:
    model.cuda()

for i in range(epochs):
    
    hidden = model.hidden_state(batch_size)
    
    
    for x,y in generate_batches(train_data,batch_size,seq_len):
        
        tracker += 1
        
        # One Hot Encode incoming data
        x = one_hot_encoder(x,num_char)
        
        # Convert Numpy Arrays to Tensor
        
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)
        
        # Adjust for GPU if necessary
        
        if model.use_gpu:
            
            inputs = inputs.cuda()
            targets = targets.cuda()
            
        # Reset Hidden State
        # If we dont' reset we would backpropagate through all training history
        hidden = tuple([state.data for state in hidden])
        
        model.zero_grad()
        
        lstm_output, hidden = model.forward(inputs,hidden)
        loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
        loss.backward()
        
        # POSSIBLE EXPLODING GRADIENT PROBLEM!
        # LET"S CLIP JUST IN CASE
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)
        
        optimizer.step()
        
        
        
        ###################################
        ### CHECK ON VALIDATION SET ######
        #################################
        
        if tracker % 25 == 0:
            
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()
            
            for x,y in generate_batches(val_data,batch_size,seq_len):
                
                # One Hot Encode incoming data
                x = one_hot_encoder(x,num_char)
                

                # Convert Numpy Arrays to Tensor

                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)

                # Adjust for GPU if necessary

                if model.use_gpu:

                    inputs = inputs.cuda()
                    targets = targets.cuda()
                    
                # Reset Hidden State
                # If we dont' reset we would backpropagate through 
                # all training history
                val_hidden = tuple([state.data for state in val_hidden])
                
                lstm_output, val_hidden = model.forward(inputs,val_hidden)
                val_loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
                val_losses.append(val_loss.item())
            
            # Reset to training model after val for loop
            model.train()
            
            print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")

Epoch: 0 Step: 25 Val Loss: 3.2057440280914307
Epoch: 0 Step: 50 Val Loss: 3.1939539909362793
Epoch: 0 Step: 75 Val Loss: 3.197103500366211
Epoch: 0 Step: 100 Val Loss: 3.1606411933898926
Epoch: 0 Step: 125 Val Loss: 3.043724536895752
Epoch: 0 Step: 150 Val Loss: 2.9737250804901123
Epoch: 0 Step: 175 Val Loss: 2.8906137943267822
Epoch: 0 Step: 200 Val Loss: 2.754720449447632
Epoch: 0 Step: 225 Val Loss: 2.680861473083496
Epoch: 0 Step: 250 Val Loss: 2.598978042602539
Epoch: 0 Step: 275 Val Loss: 2.474199056625366
Epoch: 0 Step: 300 Val Loss: 2.3618345260620117
Epoch: 0 Step: 325 Val Loss: 2.281284809112549
Epoch: 0 Step: 350 Val Loss: 2.228391647338867
Epoch: 0 Step: 375 Val Loss: 2.1794230937957764
Epoch: 0 Step: 400 Val Loss: 2.1438920497894287
Epoch: 0 Step: 425 Val Loss: 2.100783109664917
Epoch: 0 Step: 450 Val Loss: 2.075124979019165
Epoch: 0 Step: 475 Val Loss: 2.033742904663086
Epoch: 1 Step: 500 Val Loss: 2.0101773738861084
Epoch: 1 Step: 525 Val Loss: 1.9862054586410522
Epoch:

In [None]:
# saving the model
model_name = 'example_hidden512_layers3_sonnet.net'

In [None]:
torch.save(model.state_dict(),model_name)

In [None]:
# load the model, must match exact settings as model used during training!
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)

In [None]:
model.load_state_dict(torch.load(model_name))
model.eval()

In [27]:
# generating predicted text
def predict_next_char(model, char, hidden=None, k=1):
        
        # Encode raw letters with model
        encoded_text = model.encoder[char]
        
        # set as numpy array for one hot encoding
        # NOTE THE [[ ]] dimensions!!
        encoded_text = np.array([[encoded_text]])
        
        # One hot encoding
        encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
        
        # Convert to Tensor
        inputs = torch.from_numpy(encoded_text)
        
        # Check for CPU
        if(model.use_gpu):
            inputs = inputs.cuda()
        
        
        # Grab hidden states
        hidden = tuple([state.data for state in hidden])
        
        
        # Run model and get predicted output
        lstm_out, hidden = model(inputs, hidden)

        
        # Convert lstm_out to probabilities
        probs = F.softmax(lstm_out, dim=1).data
        
        
        
        if(model.use_gpu):
            # move back to CPU to use with numpy
            probs = probs.cpu()
        
        
        # k determines how many characters to consider
        # for our probability choice.
        # https://pytorch.org/docs/stable/torch.html#torch.topk
        
        # Return k largest probabilities in tensor
        probs, index_positions = probs.topk(k)
        
        
        index_positions = index_positions.numpy().squeeze()
        
        # Create array of probabilities
        probs = probs.numpy().flatten()
        
        # Convert to probabilities per index
        probs = probs/probs.sum()
        
        # randomly choose a character based on probabilities
        char = np.random.choice(index_positions, p=probs)
       
        # return the encoded value of the predicted char and the hidden state
        return model.decoder[char], hidden

In [28]:
def generate_text(model, size, seed='The', k=1):
        
      
    
    # CHECK FOR GPU
    if(model.use_gpu):
        model.cuda()
    else:
        model.cpu()
    
    # Evaluation mode
    model.eval()
    
    # begin output from initial seed
    output_chars = [c for c in seed]
    
    # intiate hidden state
    hidden = model.hidden_state(1)
    
    # predict the next character for every character in seed
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
    
    # add initial characters to output
    output_chars.append(char)
    
    # Now generate for size requested
    for i in range(size):
        
        # predict based off very last letter in output_chars
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        
        # add predicted character
        output_chars.append(char)
    
    # return string of predicted text
    return ''.join(output_chars)

In [31]:
# write a sonnet with a starting seed word
print(generate_text(model, 1000, seed='The ', k=3))

The walls with the stage

                             Enter CAIUS

    This stands of the standing stand as she,
    As they are sent to make my soul that will
    As well defend the cheeks.
  CORIOLANUS. I have a man of him and make me.
  CLOWN. I am sure you are not, sir, which, in the face
    And state the mother of my strange things,
    And so much to your senses. I am sorry,
    And we assure the state to thee again.
    What should I see you then that I did see
    The man of my soul and my stomach to the stock
    When I am sure, as I have said, the maid
    Was not a man of honesty than those
    Will stranger so so much of me to sent you.  
    This shall not take mine own desire of him.
  KING RICHARD. I will not hear thee see the story of him,
    Will I not see thy self-a father's soldiers;
    And that the fool, the sea, with his dead soldiers
    And stand to soldiers, and his strokes to straight
    As towns are sense and fathers of their father.
    And so I should n