In [10]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

## Get Text Data

In [11]:
# Read the complete corpus into memory as one string.
with open('./datasets/borges_full.txt', mode='r', encoding='utf8') as corpus_file:
    text = corpus_file.read()

In [12]:
# Preview the first 1000 characters of the raw corpus.
print(text[:1000])



JORGE LUIS BORGES 


1929: Segundo Premio Municipal de Li- 
teratura. 

1944: Gran Premio de Honor de ia So- 
ciedad Argentina de Escritores. 

1949: Miembro de la Academia Goethea- 
na de San Pablo, Brasil. 

1950: Presidente de la Sociedad Argen- 
tina de Escritores (hasta 1953), 

1955: Director de i a Biblioteca Na- 
cional (hasta 1973). 

Miembro de número de la Academia 
Argentina de Letras, 

Director del Instituto de Literatura 
í Alemana de la Facultad de Filosofía y 
! Letras de la Universidad de Buenos 
Aires. 

1956: Primer Premio Nacional de Lite- 
ratura. 

Doctor honorís causa de ia Universi- 
dad de Cuyo (Mendoza), 

Profesor titular de Literatura Inglesa 
y Norteamericana de la Facultad de Fi- 
losofía y Letras de la Universidad de 
Buenos Aires, 

i 

1961: Premio Internacional de Literatu- 
ra Formentor, Mallorca. 

Commsndatore del Gobierno de Ita- 
lia. 

1962: Commandeur da 1‘Ordre des 
L&ttres et des Arta del Gobierno de 
Francia. 

1963: Gran Premio del Fondo 

In [13]:
# Build the vocabulary in sorted order. Iterating a bare set of strings gives a
# different order on every kernel restart (string hashing is randomised per
# process), which would silently change the char <-> index mapping and make a
# checkpoint saved in one session meaningless in the next.
all_characters = sorted(set(text))
len(all_characters)

131

In [14]:
# index -> character lookup, in the iteration order of the vocabulary
decoder = {index: char for index, char in enumerate(all_characters)}
# character -> index lookup (inverse of `decoder`)
encoder = {char: index for index, char in decoder.items()}
# The whole corpus as an array of integer character indices
encoded_text = np.array(list(map(encoder.__getitem__, text)))

In [15]:
def one_hot_encoder(encoded_text, num_uni_chars):
    '''
    One-hot encode a batch of integer-encoded characters.

    encoded_text : array-like of integer character indices, any shape
                   (plain lists are accepted and converted to a numpy array)

    num_uni_chars : number of unique characters (len(set(text))); this is the
                    length of the trailing one-hot axis

    Returns a float32 numpy array of shape (*encoded_text.shape, num_uni_chars)
    holding a single 1.0 per character position.
    '''

    # METHOD FROM:
    # https://stackoverflow.com/questions/29831489/convert-encoded_textay-of-indices-to-1-hot-encoded-numpy-encoded_textay

    # Accept lists/tuples as well as arrays (.size/.shape are needed below).
    encoded_text = np.asarray(encoded_text)

    # Allocate the placeholder directly as float32 (the dtype PyTorch expects
    # later on) instead of creating a float64 array and copying it.
    one_hot = np.zeros((encoded_text.size, num_uni_chars), dtype=np.float32)

    # Using fancy indexing fill in the 1s at the correct index locations:
    # row i gets its 1.0 in the column given by the i-th flattened index.
    one_hot[np.arange(encoded_text.size), encoded_text.ravel()] = 1.0

    # Reshape it so it matches the batch shape, with the one-hot axis last.
    return one_hot.reshape((*encoded_text.shape, num_uni_chars))

In [22]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    '''
    Build a generator of (x, y) batches for next-character prediction.

    encoded_text : 1-D numpy array of integer-encoded characters
    samp_per_batch : number of sequences (rows) in every batch
    seq_len : number of characters in every sequence

    The text is truncated to a whole number of batches and reshaped into
    `samp_per_batch` rows; batches are consecutive `seq_len`-wide column
    windows over those rows. Each yielded pair is

        x : (samp_per_batch, seq_len) slice of the input characters
        y : same shape, x shifted left by one character (the targets)

    For the final window there is no following column, so the last target of
    each row wraps around to the first character of that same row.

    Raises ValueError if encoded_text is shorter than one full batch.
    '''

    char_per_batch = samp_per_batch * seq_len
    num_batches_avail = len(encoded_text) // char_per_batch
    if num_batches_avail == 0:
        # Without this check the reshape below fails with a cryptic numpy error.
        raise ValueError(
            f"encoded_text has {len(encoded_text)} characters, but one batch needs "
            f"samp_per_batch * seq_len = {char_per_batch}"
        )

    # Drop the tail that does not fill a complete batch, then one row per sample.
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]
    encoded_text = encoded_text.reshape((samp_per_batch, -1))

    def batching(encoded_text, seq_len):
        row_len = encoded_text.shape[1]

        # Slide a seq_len-wide window over the columns.
        for n in range(0, row_len, seq_len):

            # Grab feature characters
            x = encoded_text[:, n:n+seq_len]

            # y is the target shifted over by 1
            y = np.zeros_like(x)
            y[:, :-1] = x[:, 1:]

            # The last target is the character right after the window; an
            # explicit bounds check replaces the old bare `except:` fallback.
            next_col = n + seq_len
            if next_col < row_len:
                y[:, -1] = encoded_text[:, next_col]
            else:
                y[:, -1] = encoded_text[:, 0]

            yield x, y

    return batching(encoded_text, seq_len)

# Sanity check: pull one batch with the default sizes and inspect the feature shape.
gen = generate_batches(encoded_text)
x, y = next(gen)
x.shape


(10, 50)

# Model architecture

In [23]:
class CharModel(nn.Module):
    '''
    Character-level language model: stacked LSTM -> dropout -> linear layer
    producing one logit per character of the vocabulary.
    '''

    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5,use_gpu=False):
        '''
        all_chars : iterable of the unique characters (the vocabulary)
        num_hidden : size of the LSTM hidden state
        num_layers : number of stacked LSTM layers
        drop_prob : dropout probability, used between LSTM layers and on the LSTM output
        use_gpu : when True, hidden_state() creates its tensors on the "mps" device
        '''

        # SET UP ATTRIBUTES
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu

        #CHARACTER SET, ENCODER, and DECODER
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        # Invert this model's own decoder. The previous code read the notebook
        # global `decoder`, which fails on a fresh kernel and is wrong whenever
        # that global was built from a different character set.
        self.encoder = {char: ind for ind, char in self.decoder.items()}

        # batch_first=True: inputs are (batch, seq_len, num_chars)
        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)

        self.dropout = nn.Dropout(drop_prob)

        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))

    def forward(self, x, hidden):
        '''
        x : one-hot float tensor of shape (batch, seq_len, num_chars)
        hidden : (h, c) tuple as returned by hidden_state() or a previous call

        Returns (final_out, hidden); final_out holds the logits flattened to
        (batch * seq_len, num_chars).
        '''

        lstm_output, hidden = self.lstm(x, hidden)

        drop_output = self.dropout(lstm_output)

        # Merge the batch and time axes so the linear layer scores every position.
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)

        final_out = self.fc_linear(drop_output)

        return final_out, hidden

    def hidden_state(self, batch_size):
        '''
        Used as separate method to account for both GPU and CPU users.

        Returns a zero-initialised (h, c) tuple, each of shape
        (num_layers, batch_size, num_hidden), on "mps" when use_gpu is set
        and on the CPU otherwise.
        '''

        device = torch.device("mps" if self.use_gpu else "cpu")
        shape = (self.num_layers, batch_size, self.num_hidden)

        return (torch.zeros(shape, device=device),
                torch.zeros(shape, device=device))
        

## Training Data and Validation Data

In [24]:
# The first `train_percent` fraction of the corpus is used for training,
# everything after it is held out for validation.
train_percent = 0.1
train_ind = int(train_percent * len(encoded_text))
train_data, val_data = encoded_text[:train_ind], encoded_text[train_ind:]

In [25]:
# Every GPU code path in this notebook moves tensors to Apple's "mps" device,
# so MPS (not CUDA) availability decides whether use_gpu=True can work.
torch.backends.mps.is_available()

False

In [26]:
## HyperParams
epochs = 20
batch_size = 1
seq_len = 100
# Global step counter, incremented once per training batch.
# NOTE(review): it starts at 10 rather than 0, so the first validation pass
# (every 25th step) happens after 15 training batches - confirm this is intended.
tracker = 10
num_lstm_layers = 3
dropout_p = .03
hidden_dim = 256
# Width of the one-hot vectors. This must equal the input size the model is
# built with (len(all_chars)); taking it from the vocabulary avoids a slow
# Python-level max() over the whole corpus and yields a plain int.
num_char = len(all_characters)

model = CharModel(
    all_chars=all_characters,
    num_hidden=hidden_dim,
    num_layers=num_lstm_layers,
    drop_prob=dropout_p,
    use_gpu=False,
)


# Count the batches in one epoch (this exhausts the generator, so the
# training loop has to create a fresh one).
data_iter = generate_batches(train_data,batch_size,seq_len)
print(f" size of generator {sum(1 for _ in data_iter)}")
train_data.shape

 size of generator 2045


(204531,)

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Device for the model and every batch; the same choice CharModel.hidden_state makes.
device = torch.device("mps" if model.use_gpu else "cpu")
model.to(device)
model.train()


def _prepare_batch(x, y):
    """One-hot encode the features and return (inputs, targets) tensors on `device`."""
    inputs = torch.from_numpy(one_hot_encoder(x, num_char)).to(device)
    # CrossEntropyLoss wants a flat vector of int64 class indices, one per logit row.
    targets = torch.from_numpy(y).reshape(-1).long().to(device)
    return inputs, targets


for i in range(epochs):

    # Start every epoch from a zero hidden state.
    hidden = model.hidden_state(batch_size)

    for x, y in generate_batches(train_data, batch_size, seq_len):

        tracker += 1
        inputs, targets = _prepare_batch(x, y)

        # If we don't detach we would backpropagate through all training history
        hidden = tuple(state.detach() for state in hidden)

        optimizer.zero_grad()
        lstm_output, hidden = model(inputs, hidden)
        loss = criterion(lstm_output, targets)
        loss.backward()

        # LET"S CLIP JUST IN CASE
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        optimizer.step()

        ###################################
        ### CHECK ON VALIDATION SET ######
        #################################

        if tracker % 25 == 0:

            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()

            # No gradients are needed for evaluation; skipping the autograd
            # graph saves memory and time.
            with torch.no_grad():
                for x, y in generate_batches(val_data, batch_size, seq_len):
                    inputs, targets = _prepare_batch(x, y)
                    lstm_output, val_hidden = model(inputs, val_hidden)
                    val_losses.append(criterion(lstm_output, targets).item())

            # Reset to training mode after the validation loop
            model.train()

            # Report the mean over the whole validation set, not just the
            # loss of whichever batch happened to come last.
            mean_val_loss = np.mean(val_losses) if val_losses else float("nan")
            print(f"Epoch: {i} Step: {tracker} Val Loss: {mean_val_loss}")

-------
------

## Saving the Model

https://pytorch.org/tutorials/beginner/saving_loading_models.html

In [51]:
# Be careful NOT to overwrite an existing checkpoint: the save cell below
# writes to this file name.
model_name = 'borges_first_pass.net'

In [52]:
# Save only the model's state_dict (its parameters) under `model_name`.
torch.save(model.state_dict(),model_name)

## Load Model

In [53]:
# The constructor arguments below have to be identical to the ones the saved
# network was trained with; otherwise the stored weights will not fit.
model = CharModel(all_characters, hidden_dim, num_lstm_layers, dropout_p, use_gpu=False)

In [54]:
# map_location="cpu" remaps the stored tensors onto the CPU, so a checkpoint
# written while the model lived on another device (e.g. "mps") still loads on
# a machine without that device. The model above is built with use_gpu=False.
model.load_state_dict(torch.load(model_name, map_location="cpu"))
# Switch to evaluation mode (disables dropout); the returned module is displayed.
model.eval()

CharModel(
  (lstm): LSTM(91, 256, num_layers=3, batch_first=True, dropout=0.03)
  (dropout): Dropout(p=0.03, inplace=False)
  (fc_linear): Linear(in_features=256, out_features=91, bias=True)
)

# Generating Predictions

--------

In [55]:
def predict_next_char(model, char, hidden=None, k=1):
    '''
    Feed one character to the model and sample the character that follows it.

    model : object exposing encoder, decoder, all_chars, use_gpu and
            hidden_state(), callable as model(inputs, hidden)
    char : the current character; must be a key of model.encoder
    hidden : (h, c) state from the previous step; None starts from a fresh
             zero state for a batch of one
    k : sample among the k most probable next characters (capped at the
        vocabulary size); k=1 always picks the most probable one

    Returns (next_char, hidden) with the updated hidden state.
    '''
    if hidden is None:
        hidden = model.hidden_state(1)

    # One-hot encode the character as a (batch=1, seq_len=1, num_chars) float32 tensor.
    inputs = torch.zeros(1, 1, len(model.all_chars), dtype=torch.float32)
    inputs[0, 0, model.encoder[char]] = 1.0

    if model.use_gpu:
        inputs = inputs.to("mps")

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        hidden = tuple(state.detach() for state in hidden)
        lstm_out, hidden = model(inputs, hidden)
        probs = F.softmax(lstm_out, dim=1)

    # numpy can only read CPU tensors, so bring the result back from the GPU.
    probs = probs.cpu()

    k = min(k, probs.shape[-1])
    probs, index_positions = probs.topk(k)

    # flatten() (not squeeze()) keeps a 1-D array even for k=1; a 0-d array
    # would make np.random.choice treat the index as a population size.
    index_positions = index_positions.numpy().flatten()

    # Renormalise the top-k probabilities so they sum to 1.
    probs = probs.numpy().flatten().astype(np.float64)
    probs = probs / probs.sum()

    # randomly choose a character based on probabilities
    char_index = int(np.random.choice(index_positions, p=probs))

    # return the decoded predicted char and the hidden state
    return model.decoder[char_index], hidden

In [56]:
def generate_text(model, size, seed='The', k=1):
    '''
    Generate text by repeatedly sampling the next character from the model.

    model : character model, passed through to predict_next_char
    size : number of sampling steps to run after the seed has been consumed
    seed : non-empty string used to warm up the hidden state; every character
           must be in the model's vocabulary
    k : sample each character among the k most probable candidates

    Returns the seed followed by size + 1 generated characters (one predicted
    right after the seed, then one per sampling step).

    Raises ValueError if seed is empty.
    '''
    if not seed:
        # With nothing to feed the model there is no first prediction to start from.
        raise ValueError("seed must contain at least one character")

    if model.use_gpu:
        model.to("mps")
    else:
        model.cpu()

    model.eval()

    output_chars = list(seed)

    # Run the seed through the model to build up the hidden state; only the
    # prediction that follows the last seed character is kept.
    hidden = model.hidden_state(1)
    for seed_char in seed:
        next_char, hidden = predict_next_char(model, seed_char, hidden, k=k)
    output_chars.append(next_char)

    for _ in range(size):

        # predict based off very last letter in output_chars
        next_char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)

        # add predicted character
        output_chars.append(next_char)

    # return string of predicted text
    return ''.join(output_chars)

In [57]:
# Generate text from the seed 'El universo ' with size=1000 and k=3.
# print() is used so that newlines in the generated text are rendered.
print(generate_text(model, 1000, seed='El universo ', k=3))

El universo del poenio de la praceria de presiontas enta laseras en pranatina des pora en
tomesas cristertan y las pracion de la raatianta en en tontorao de las dencara de la riata de la poesa a las calte de poraateros consas era ento de parecaro el poena de la reretara de las porcera asa el esa el estrota la paesaros eltoraas del precero estas arastorades ena
ses cara
ento listoracan en en poera de preracras de poraas de los paena antorios que la porata el tranataras de presas des pricena la laciateria de la porrecana lan presara, de lo priciés de paraa una lan esto el pareces de portaco de las decarios del
dentician el eltariantes del prinaces de palosonte los tronatasas de caratarianan en estorio del
estasas, esterante de lo rastas de las deciante de la

uasa esta del palacian del crentino en esitaraa en elto lertentara al entarera de las poratas del pracecoros crensorta des prantera de los decas erta de la raasa el lo sastentaro del lacatoraca las pritira an esitrita del caratario 