## This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
!pip install torch tqdm -q

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm 

In [3]:
with open("/kaggle/input/ukr-poems/poems.txt", "r") as file:
    poetry = "".join(file.readlines())

In [4]:
poetry = poetry.lower()
poetry[:100]

'<і смеркає, і світає,\nдень божий минає,\nі знову люд потомлений\nі все спочиває.\nтілько я, мов окаянни'

In [5]:
import string
ukrainian_alphabet = "абвгґдеєжзиіїйклмнопрстуфхцчшщьюя"

punctuation = ".,-:;?!…"

special_tokens = "<> \n"

In [6]:
chars = tuple(set(ukrainian_alphabet + punctuation + special_tokens))

char_to_idx = {ch: idx for idx, ch in enumerate(chars)}
idx_to_char = {idx: ch for idx, ch in enumerate(chars)}

In [7]:
def clean_text(text):
    """Removes characters not in the 'chars' set."""
    text = "".join(ch if ch in chars else "" for ch in text)
    text = text.replace("><", "\n")
    text = text[1:-1]
    return text


cleaned_poetry = clean_text(poetry)

In [8]:
encoded_poetry = [char_to_idx[c] for c in cleaned_poetry]

In [9]:
# Defining method to make mini-batches for training
def get_batches(arr, batch_size, seq_length):
    # determine the flattened batch size, i.e. sequence length times batch size
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total

    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))

    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [10]:
def one_hot_encode(arr, n_labels):
    
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot

In [26]:
class LSTM(nn.Module):
    
    def __init__(self, chars, device, n_hidden=256, n_layers=2, drop_prob=0.5, bidirectional=False):
        super().__init__()
        
        self.device = device
        
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        

        self.n_chars = len(chars)
        self.int2char = dict(enumerate(chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        

        self.lstm = nn.LSTM(self.n_chars, n_hidden, n_layers, 
                            dropout=drop_prob, batch_first=True, bidirectional=bidirectional)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc = nn.Linear(n_hidden, self.n_chars)
        
    def forward(self, x, hidden):
        ''' Forward pass through the network. 
            The inputs are x, and the hidden & cell state in a tuple. '''
        

        r_output, hidden = self.lstm(x, hidden)
        
        out = self.dropout(r_output)

        out = out.contiguous().view(-1, self.n_hidden)
        
        out = self.fc(out)
        
        return out, hidden
    
    def init_hidden(self, batch_size=1):
        ''' Initializes hidden state '''

        weight = next(self.parameters()).data

        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device))
        
        return hidden

In [27]:
# Declaring the train method
def train(model, data, device, optimizer, criterion, epochs=10, batch_size=10,
          seq_length=50, clip=5):
    model.train()

    for epoch in range(epochs):
        h = model.init_hidden(batch_size)
        total_loss = 0
        for x, y in get_batches(data, batch_size, seq_length):
            x = one_hot_encode(x, model.n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            inputs, targets = inputs.to(device), targets.to(device)

            model.zero_grad()

            output, h = model(inputs, h)
            h = (h[0].detach(), h[1].detach())



            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward(retain_graph=True)

            total_loss += loss.item()


            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()

        if (epoch + 1) % 100 == 0:
            checkpoint_path = os.path.join("model_checkpoints", f"model_epoch_{epoch+1}.pt")
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': total_loss/batch_size,
            }, checkpoint_path)
            print(f"Checkpoint saved at {checkpoint_path}")


        print("Epoch: {}/{}:".format(epoch + 1, epochs),
              "Loss: {:.4f}:".format(total_loss/batch_size))
        total_loss = 0

In [None]:
import os

device = "cuda" if torch.cuda.is_available() else "cpu"

checkpoint_dir = "model_checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True) 

# Define the model
n_hidden=512
n_layers=2

model = LSTM(chars, device, n_hidden, n_layers).to(device)

# Declaring the hyperparameters
batch_size = 128
seq_length = 100
epochs = 600 

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# train the model
train(model, np.array(encoded_poetry), device, optimizer, criterion, epochs=epochs,
      batch_size=batch_size, seq_length=seq_length)

Epoch: 1/600: Loss: 3.5147:
Epoch: 2/600: Loss: 2.8579:
Epoch: 3/600: Loss: 2.6663:
Epoch: 4/600: Loss: 2.5461:
Epoch: 5/600: Loss: 2.4465:
Epoch: 6/600: Loss: 2.3691:
Epoch: 7/600: Loss: 2.3131:
Epoch: 8/600: Loss: 2.2659:
Epoch: 9/600: Loss: 2.2275:
Epoch: 10/600: Loss: 2.1957:
Epoch: 11/600: Loss: 2.1681:
Epoch: 12/600: Loss: 2.1449:
Epoch: 13/600: Loss: 2.1242:
Epoch: 14/600: Loss: 2.1064:
Epoch: 15/600: Loss: 2.0903:
Epoch: 16/600: Loss: 2.0750:
Epoch: 17/600: Loss: 2.0598:
Epoch: 18/600: Loss: 2.0475:
Epoch: 19/600: Loss: 2.0361:
Epoch: 20/600: Loss: 2.0236:
Epoch: 21/600: Loss: 2.0132:
Epoch: 22/600: Loss: 2.0024:
Epoch: 23/600: Loss: 1.9936:
Epoch: 24/600: Loss: 1.9843:
Epoch: 25/600: Loss: 1.9765:
Epoch: 26/600: Loss: 1.9687:
Epoch: 27/600: Loss: 1.9606:
Epoch: 28/600: Loss: 1.9544:
Epoch: 29/600: Loss: 1.9472:
Epoch: 30/600: Loss: 1.9444:
Epoch: 31/600: Loss: 1.9360:
Epoch: 32/600: Loss: 1.9303:
Epoch: 33/600: Loss: 1.9253:
Epoch: 34/600: Loss: 1.9188:
Epoch: 35/600: Loss: 1.

In [29]:
def predict(model, char, device, h=None, top_k=5):
        ''' Given a character & hidden state, predict the next character.
            Returns the predicted character and the hidden state.
        '''

        # tensor inputs
        x = np.array([[model.char2int[char]]])
        x = one_hot_encode(x, model.n_chars)
        inputs = torch.from_numpy(x).to(device)

        with torch.no_grad():
            # get the output of the model
            out, h = model(inputs, h)

            # get the character probabilities
            # move to cpu for further processing with numpy etc.
            p = F.softmax(out, dim=1).data.cpu()

            # get the top characters with highest likelihood
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()

            # select the likely next character with some element of randomness
            # for more variability
            p = p.numpy().squeeze()
            char = np.random.choice(top_ch, p=p/p.sum())

        # return the encoded value of the predicted char and the hidden state
        return model.int2char[char], h

In [30]:
def sample(model, size, device, prime='A', top_k=None):
    # method to generate new text based on a "prime"/initial sequence.
    # Basically, the outer loop convenience function that calls the above
    # defined predict method.
    model.eval() # eval mode

    # Calculate model for the initial prime characters
    chars = [ch for ch in prime]
    with torch.no_grad():
        # initialize hidden with 0 in the beginning. Set our batch size to 1
        # as we wish to generate one sequence only.
        h = model.init_hidden(batch_size=1)
        for ch in prime:
            char, h = predict(model, ch, device, h=h, top_k=top_k)

        # append the characters to the sequence
        chars.append(char)

        # Now pass in the previous/last character and get a new one
        # Repeat this process for the desired length of the sequence to be
        # generated
        for ii in range(size):
            char, h = predict(model, chars[-1], device, h=h, top_k=top_k)
            chars.append(char)

    return ''.join(chars)

In [32]:
checkpoint_path = "model_checkpoints/model_epoch_600.pt"
checkpoint = torch.load(checkpoint_path)

model_loaded = LSTM(chars, device, n_hidden, n_layers).to(device)


model_loaded.load_state_dict(checkpoint['model_state_dict'])

  checkpoint = torch.load(checkpoint_path)


<All keys matched successfully>

In [33]:
print(sample(model_loaded, 1000, device, prime='соняшник у полі', top_k=5))

соняшник у полі співає:
серце, стане і своє, і за мене вовчий
такої великі плоди. сидів у полкаві.
ти не славний, не просто загадай,
не винен така прокляте й на мале.
а я з подаленя серце співанем.
все наталює вітер в палети.
з тебе віти святого насарти,
при тебе бачить від серденька підвів.
саранські серця з ваші пахощів
в серці від них був на світі несуть.
там вона повівала від себе з дому…
а все ж дуже вираз тобі спала.
про те, що нас не проститься в світі,
як потопа і в сонці, дола світля!
ми страшніше, якби винула вона,
неначе в своїй відьма стільки сила.
проклята буде в морозок принесла,
на світі ж прості вічність принусеш.
та в тебе ніччю не вмію, загубив,
і тоді вітер підкупив в дорогу.
тільки так не те сивий навіть мала,
як встав світанку і все сміх мене.
а всі досидять наш достого невідомо,
скрізь від своїх вариних не встидаю,
і все забув сестри при мій драмину,
засвітивсь навколо в невідрожі,
про сліпим скорінням над водою,
як повідряному пора не встане.
і навіть в світі жив