In [1]:
import torch 
from torch import nn
import torch.nn.functional as F


import numpy as np

In [2]:
with open('poems.txt', 'r', encoding='utf8') as f:
    text = f.read()
    
    

all_characters = set(text)
all_characters = sorted(all_characters)
print(all_characters)
# nums --> letters
decoder = dict(enumerate(all_characters))
# letters --> nums
encoder = {char: index for index, char in decoder.items()}

# build list of all encoded chars for text
encoded_text = np.array([encoder[char] for char in text])

['\n', ' ', '!', '"', '#', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '=', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', '[', '\\', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '|', '~', '©', '«', '»', 'Ё', 'А', 'Б', 'В', 'Г', 'Д', 'Е', 'Ж', 'З', 'И', 'Й', 'К', 'Л', 'М', 'Н', 'О', 'П', 'Р', 'С', 'Т', 'У', 'Ф', 'Х', 'Ц', 'Ч', 'Ш', 'Щ', 'Ъ', 'Ы', 'Ь', 'Э', 'Ю', 'Я', 'а', 'б', 'в', 'г', 'д', 'е', 'ж', 'з', 'и', 'й', 'к', 'л', 'м', 'н', 'о', 'п', 'р', 'с', 'т', 'у', 'ф', 'х', 'ц', 'ч', 'ш', 'щ', 'ъ', 'ы', 'ь', 'э', 'ю', 'я', 'ё', '–', '—', '’', '“', '”', '„', '•', '…', '№']


In [3]:
def one_hot_encoder(encoded_text, num_unique_chars):
    
    # encoded_text --> batch of encoded text
    # num_unique_chars --> len(set(text))
    
    one_hot = np.zeros((encoded_text.size, num_unique_chars))
    one_hot = one_hot.astype(np.float32)
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0
    one_hot = one_hot.reshape((*encoded_text.shape, num_unique_chars))
    
    return one_hot

In [4]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    
    # X : encoded text of length seq_len
    # [0, 1, 2]
    # [1, 2, 3]
    
    # Y : encoded text shifted by one
    
    # [1, 2, 3]
    # [2, 3, 4]
    
    # how many chars per batch?
    char_per_batch = samp_per_batch * seq_len 
    # how many batches can we make, given the len of encoded text?
    num_batches_available = int(len(encoded_text) / char_per_batch)
    # cut off the end of the encoded text, that won't fit evenly into a batch
    encoded_text = encoded_text[ : num_batches_available * char_per_batch]
    # reshape the encoded text
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    
    for n in range(0, encoded_text.shape[1], seq_len):
        
        x = encoded_text[ : , n : n + seq_len]
        # zeros array to the same shape as x
        y = np.zeros_like(x)
        
        try:
            y[:, :-1] = x[:, 1:]
            y[:, -1] = encoded_text[:, n+seq_len]
            
        except:
            y[:, :-1] = x[:, 1:]
            y[:, -1] = encoded_text[:, 0]
            
        yield x, y

In [5]:
class CharModel(nn.Module):
    
    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5,use_gpu=False):
        
        
        # SET UP ATTRIBUTES
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu
        
        #CHARACTER SET, ENCODER, and DECODER
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char: ind for ind,char in decoder.items()}
        
        
        self.lstm = nn.LSTM(len(self.all_chars), num_hidden, num_layers, dropout=drop_prob, batch_first=True)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))
      
    
    def forward(self, x, hidden):
                  
        lstm_output, hidden = self.lstm(x, hidden)
        
        drop_output = self.dropout(lstm_output)
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        
        final_out = self.fc_linear(drop_output)

        return final_out, hidden
    
    
    def hidden_state(self, batch_size):
        '''
        Used as separate method to account for both GPU and CPU users.
        '''
        
        if self.use_gpu:
            
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
        else:
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                     torch.zeros(self.num_layers,batch_size,self.num_hidden))
        
        return hidden
        

In [6]:
model = CharModel(all_chars=all_characters,
                 num_hidden=512,
                 num_layers=3,
                 drop_prob=0.5,
                 use_gpu=True)

In [7]:
total_param = []

for p in model.parameters():
    total_param.append(int(p.numel()))
    
total_param

[333824,
 1048576,
 2048,
 2048,
 1048576,
 1048576,
 2048,
 2048,
 1048576,
 1048576,
 2048,
 2048,
 83456,
 163]

In [8]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

train_percent = 0.9
train_index = int(len(encoded_text) * train_percent)

train_data = encoded_text[:train_index]
val_data = encoded_text[train_index:]

In [9]:
# Variables

epochs = 80
batch_size = 250
seq_len = 200
tracker = 0
num_char = max(encoded_text) + 1

previous_loss = 100
min_loss = 100

In [None]:
# Set model to train
model.train()


# Check to see if using GPU
if model.use_gpu:
    model.cuda()

for i in range(epochs):
    
    hidden = model.hidden_state(batch_size)
    
    
    for x,y in generate_batches(train_data,batch_size,seq_len):
        
        tracker += 1
        
        # One Hot Encode incoming data
        x = one_hot_encoder(x,num_char)
        
        # Convert Numpy Arrays to Tensor
        
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)
        
        # Adjust for GPU if necessary
        
        if model.use_gpu:
            
            inputs = inputs.cuda()
            targets = targets.cuda()
            
        # Reset Hidden State
        # If we dont' reset we would backpropagate through all training history
        hidden = tuple([state.data for state in hidden])
        
        model.zero_grad()
        
        lstm_output, hidden = model.forward(inputs,hidden)
        loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
        loss.backward()
        
        # POSSIBLE EXPLODING GRADIENT PROBLEM!
        # LET"S CLIP JUST IN CASE
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)
        
        optimizer.step()
        
        
        
        ###################################
        ### CHECK ON VALIDATION SET ######
        #################################
        
        if tracker % 25 == 0:
            
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()
            
            for x,y in generate_batches(val_data,batch_size,seq_len):
                
                # One Hot Encode incoming data
                x = one_hot_encoder(x, num_char)
                

                # Convert Numpy Arrays to Tensor

                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)

                # Adjust for GPU if necessary

                if model.use_gpu:

                    inputs = inputs.cuda()
                    targets = targets.cuda()
                    
                # Reset Hidden State
                # If we dont' reset we would backpropagate through 
                # all training history
                val_hidden = tuple([state.data for state in val_hidden])
                
                lstm_output, val_hidden = model.forward(inputs,val_hidden)
                val_loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
                val_losses.append(val_loss.item())
            
            # Reset to training model after val for loop
            model.train()
    
            current_loss = val_loss.item()
        
            print(f"Epoch: {i} Step: {tracker} \nCurrent Validation Loss: {current_loss} \nPrevious Loss: {previous_loss}")

            if (current_loss < previous_loss):
                if (current_loss < min_loss):
                    print('Error decreased!')
                    torch.save(model.state_dict(), 'min_hidden512_layers3_poems.net')
                    print('Saved model.')
                    min_loss = current_loss
                else:
                    print('Error increased!')
                
            previous_loss = val_loss.item()
            print()

Epoch: 0 Step: 25 
Current Validation Loss: 3.4680662155151367 
Previous Loss: 100
Error decreased!
Saved model.

Epoch: 0 Step: 50 
Current Validation Loss: 3.4630939960479736 
Previous Loss: 3.4680662155151367
Error decreased!
Saved model.

Epoch: 1 Step: 75 
Current Validation Loss: 3.4641547203063965 
Previous Loss: 3.4630939960479736

Epoch: 1 Step: 100 
Current Validation Loss: 3.4623382091522217 
Previous Loss: 3.4641547203063965
Error decreased!
Saved model.

Epoch: 1 Step: 125 
Current Validation Loss: 3.467604160308838 
Previous Loss: 3.4623382091522217

Epoch: 2 Step: 150 
Current Validation Loss: 3.452200412750244 
Previous Loss: 3.467604160308838
Error decreased!
Saved model.

Epoch: 2 Step: 175 
Current Validation Loss: 3.434162139892578 
Previous Loss: 3.452200412750244
Error decreased!
Saved model.

Epoch: 2 Step: 200 
Current Validation Loss: 3.348487138748169 
Previous Loss: 3.434162139892578
Error decreased!
Saved model.

Epoch: 3 Step: 225 
Current Validation Loss: 

In [None]:
torch.save(model.state_dict(), 'hidden512_layers3_poems.net')

In [None]:
def predict_next_char(model, char, hidden=None, k=1):
    
    encoded_text = model.encoder[char]
    encoded_text = np.array([[encoded_text]])
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
    inputs = torch.from_numpy(encoded_text)
    
    if model.use_gpu:
        inputs = inputs.cuda()
        
    hidden = tuple([state.data for state in hidden])
    
    lstm_out, hidden = model(inputs, hidden)
    
    probs = F.softmax(lstm_out, dim=1).data
    
    if model.use_gpu:
        probs = probs.cpu()
        
    probs, index_positions = probs.topk(k)
    
    index_positions = index_positions.numpy().squeeze()
    
    probs = probs.numpy().flatten()
    
    probs = probs / probs.sum()
    
    char = np.random.choice(index_positions, p=probs)
    
    return model.decoder[char], hidden
    
    

In [None]:
def generate_text(model, size, seed='Дело было ', k=1):
    
    if model.use_gpu:
        model.cuda()
    else:
        model.cpu()
        
    model.eval()
    
    output_chars = [c for c in seed]
    
    hidden = model.hidden_state(1)
    
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
        
    output_chars.append(char)
    
    for i in range(size):
        
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        
        output_chars.append(char)
        
    return ''.join(output_chars)

In [None]:
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)

model.load_state_dict(torch.load('min_hidden512_layers3_poems.net'))
model.eval()

In [None]:
print(generate_text(model, 3000, seed='Сердце ', k=2))