# Модель

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
!pip install Navec

In [None]:
!wget https://storage.yandexcloud.net/natasha-navec/packs/navec_hudlit_v1_12B_500K_300d_100q.tar

In [8]:
!pip install slovnet

Collecting slovnet
  Using cached slovnet-0.6.0-py3-none-any.whl (46 kB)
Collecting razdel
  Using cached razdel-0.5.0-py3-none-any.whl (21 kB)
Collecting navec
  Using cached navec-0.10.0-py3-none-any.whl (23 kB)
Installing collected packages: razdel, navec, slovnet
Successfully installed navec-0.10.0 razdel-0.5.0 slovnet-0.6.0
[0m

In [9]:
import numpy as np
import pandas as pd
from navec import Navec
from slovnet.model.emb import NavecEmbedding

import nltk
import tqdm
import re

import torch
from tqdm.notebook import tqdm



In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [11]:
VOCAB_SIZE = 50000
SEQUENCE_LEN = 5
MAX_LINES = 1

In [12]:
navec_path = 'navec_hudlit_v1_12B_500K_300d_100q.tar'
navec = Navec.load(navec_path)

In [17]:
train_dataset = pd.read_csv('/kaggle/input/russian-poetry-data/results_fin/train.csv')
test_dataset = pd.read_csv('/kaggle/input/russian-poetry-data/results_fin/test.csv')

In [18]:
train_dataset = train_dataset
test_dataset = test_dataset

In [19]:
def get_list_of_verses(dataset):
    verses = []
    for text in dataset['poetry']:
        lines = text.split('\n')
        verse = []
        count = 0
        for line in lines:
            if line.strip():  # Skip empty lines
                verse.append(line)
                count += 1
            if count == MAX_LINES:
                verses.append('\n'.join(verse))
                verse = []
                count = 0
        if verse:
            verses.append('\n'.join(verse))
    verses = [verse.replace('\n', ' ') for verse in verses]
    return verses


def get_vocabulary(dataset):
    poetry_data = dataset['poetry'].values
    all_words = ' '.join(poetry_data).split()
    word_freq = pd.Series(all_words).value_counts().reset_index()
    word_freq.columns = ['word', 'frequency']
    sorted_word_freq = word_freq.sort_values(by='frequency', ascending=False)
    top_words = [word for word in sorted_word_freq['word'] if word in navec][:VOCAB_SIZE]
    top_words.append('<unk>')
    return top_words

def get_word2idx(dataset):
    poetry_data = dataset['poetry'].values
    all_words = list(set(' '.join(poetry_data).split()))
    word2idx = {'<pad>':navec.vocab.words.index('<pad>'), \
                '<unk>': navec.vocab.words.index('<unk>')}
    for w in tqdm(all_words):
        if w in navec:
            word2idx[w] = navec.vocab.words.index(w)
        else:
            word2idx[w] = navec.vocab.words.index('<unk>')
    return word2idx
      
def get_idx2word(vocabulary):
    idx2word = {}
    for w in tqdm(vocabulary):
        idx2word[navec.vocab.words.index(w)] = w
    return idx2word

In [20]:
def batch_generator(dataset, word2idx, vocabulary, navec_path, batch_size=64, shuffle=True):
    navec = Navec.load(navec_path)
    
    verse_dataset = get_list_of_verses(dataset)
  
    X, Y = [], []
    for verse in verse_dataset:
        words = verse.split()[::-1] # Reverse words
        for i in range(len(words) - SEQUENCE_LEN):
            if (all(word in navec for word in words[i:i + SEQUENCE_LEN])
                and all(len(re.findall(r"[a-zA-Z]", word)) == 0 for word in words[i:i + SEQUENCE_LEN])):
                if words[i + SEQUENCE_LEN] in vocabulary:
                    X.append(tuple(words[i:i + SEQUENCE_LEN]))
                    Y.append(words[i + SEQUENCE_LEN])
                    
    # We remove repeating pairs
    unique_pairs = {}
    for x, y in zip(X, Y):
        unique_pairs[(x, y)] = 1
        
    X = [x for x, y in unique_pairs.keys()]
    Y = [y for x, y in unique_pairs.keys()]
          
    n_samples = len(X)

    list_of_indexes = np.linspace(
      0, n_samples - 1, n_samples, dtype=np.int64)
    List_X = []
    List_Y = []
  
    if shuffle:
        np.random.shuffle(list_of_indexes)
        
    for indx in list_of_indexes:
        List_X.append(X[indx])
        List_Y.append(Y[indx])
    
    n_batches = n_samples//batch_size
    if n_samples % batch_size != 0:
        n_batches += 1
        
    for k in range(n_batches):
        this_batch_size = batch_size
        
        if k == n_batches - 1:
            if n_samples % batch_size > 0:
                this_batch_size = n_samples % batch_size
                
        This_X = List_X[k*batch_size:k*batch_size + this_batch_size]
        This_Y = List_Y[k*batch_size:k*batch_size + this_batch_size]
        
        x_arr = np.zeros(shape=[len(This_X), SEQUENCE_LEN], dtype=np.int64)
        y_arr = np.zeros(shape=[len(This_Y)], dtype=np.int64)
        
        for i, sequence in enumerate(This_X):
            for j, word in enumerate(sequence):
                x_arr[i, j] = word2idx.get(word, navec.vocab.words.index('<unk>'))
                
        for i, word in enumerate(This_Y):
            target = idx2target.get(word2idx.get(word, navec.vocab.words.index('<unk>')))
            y_arr[i] = target
        
        x = torch.LongTensor(x_arr)
        y = torch.LongTensor(y_arr)

        yield x, y

In [21]:
def train_on_batch(model, batch_of_x, batch_of_y, optimizer, loss_function):
    model.train()
    model.zero_grad()
    
    output = model(batch_of_x.to(model.device))
    loss = loss_function(output, batch_of_y.to(model.device))
    
    loss.backward()
    optimizer.step()
    
    return loss.cpu().item()

In [22]:
def train_epoch(train_generator,  model, loss_function, optimizer, callback=None):
    epoch_loss = 0
    total = 0
    for it, (batch_of_x, batch_of_y) in enumerate(train_generator):
        local_loss = train_on_batch(
            model, batch_of_x, batch_of_y, optimizer, loss_function)
        train_generator.set_postfix({'train batch loss': local_loss})
        
        if callback is not None:
            callback(model, local_loss)

        epoch_loss += local_loss*len(batch_of_x)
        total += len(batch_of_x)
    
    return epoch_loss/total

In [23]:
def trainer(count_of_epoch, 
            batch_size,
            model,
            train_dataset,
            word2idx,
            loss_function,
            optimizer,
            callback=None):
    
    iterations = tqdm(range(count_of_epoch))
    voc = get_vocabulary(train_dataset)

    for it in iterations:
        optima = optimizer

        number_of_batch = len(train_dataset)//batch_size + (len(train_dataset)%batch_size>0)
        generator = tqdm(
            batch_generator(train_dataset, word2idx, voc, navec_path, batch_size), 
            leave=False, total=number_of_batch)
        
        epoch_loss = train_epoch(
            train_generator = generator, model = model, 
            loss_function = loss_function, 
            optimizer = optima, callback = callback)
        
        test_loss = test(
            batch_generator(test_dataset, word2idx, voc, navec_path, batch_size),
                            model, loss_function)

        iterations.set_postfix({'train epoch loss': epoch_loss})
        #print("Test loss: ", test_loss)
        
    

In [26]:
class LSTMModel(torch.nn.Module):
    @property
    def device(self):
        return next(self.parameters()).device
    
    def __init__(self,
                 hidden_dim,
                 num_layers,
                 output_dim,
                 bidirectional = False,
                 dropout=0.2):
        super(LSTMModel, self).__init__()
        navec_embedding = NavecEmbedding(navec)
        
        self.num_direction = int(bidirectional + 1)
        self.emb_dim = 300 # navec embeddings
        self.hidden_dim = hidden_dim
        
        self.embedding = navec_embedding
        for param in navec_embedding.parameters():
            param.requires_grad = False

        
        self.lstm = torch.nn.LSTM(
                    self.emb_dim, hidden_dim, num_layers, dropout=dropout)
        
        self.linear = torch.nn.Linear(
                     hidden_dim, output_dim)
        self.relu = torch.nn.ReLU()
        
    def forward(self, input):
        input = self.embedding(input)
        input = torch.transpose(input, 0, 1)
        d, _ = self.lstm(input)
        answers = self.relu(self.linear(d[-1, :, :])) # Select the last timestep's output
        answers.unsqueeze(1)
        return answers

In [25]:
word2idx = get_word2idx(train_dataset)
vocabulary = get_vocabulary(train_dataset)
idx2word = get_idx2word(vocabulary)

idx2target = {word2idx[w]: i for i, w in enumerate(vocabulary)}
target2idx = {i: word2idx[w] for i, w in enumerate(vocabulary)}

  0%|          | 0/137049 [00:00<?, ?it/s]

  0%|          | 0/50001 [00:00<?, ?it/s]

In [27]:
from torch.utils.tensorboard import SummaryWriter
from torchmetrics.functional import precision

class callback:
    def __init__(self, writer, test_dataset, word2idx, voc, navec_path, loss_function, delimiter=18, batch_size=1024):
        self.step = 0
        self.writer = writer
        self.test_dataset = test_dataset
        self.word2idx = word2idx
        self.delimiter = delimiter
        self.loss_function = loss_function
        self.batch_size = batch_size
        self.voc = voc
        self.navec_path = navec_path

    def forward(self, model, loss):
        self.step += 1
        prec = 0
        self.writer.add_scalar('lstm_final/train_loss', loss, self.step)

        if self.step % self.delimiter == 0:
            test_loss = 0
            prec = 0
            with torch.no_grad():
                for batch_of_x, batch_of_y in batch_generator(self.test_dataset, self.word2idx, self.voc, self.navec_path, self.batch_size):
                    x_batch = batch_of_x.to(model.device)
                    y_batch = batch_of_y.to(model.device)
                    output = model(x_batch)
                    test_loss += self.loss_function(output, y_batch).cpu().item() * len(x_batch)
                    #print(output.shape, y_batch.shape)
                    prec +=  precision(output, y_batch, num_classes=len(self.voc), task='multiclass')
                    test_loss /= len(self.test_dataset)
                    prec /= len(self.test_dataset)


                    self.writer.add_scalar('lstm_final/test_loss', test_loss, self.step)
                    self.writer.add_scalar('lstm_final/test_precision', prec, self.step)
          
    def __call__(self, model, loss):
        return self.forward(model, loss)

In [28]:
model = LSTMModel(output_dim=len(idx2word), num_layers=2, hidden_dim=300)

optimizer = torch.optim.Adam(
    list(model.parameters()), lr=3e-3)
loss_function = torch.nn.CrossEntropyLoss()

  torch.from_numpy(navec.pq.indexes),


In [29]:
writer = SummaryWriter(log_dir = 'lstm_final/model')
call = callback(writer, test_dataset, word2idx, get_vocabulary(train_dataset), 
                navec_path, loss_function, delimiter = 18)

In [None]:
trainer(count_of_epoch = 3,
        batch_size = 2048,
        model = model,
        train_dataset = train_dataset, 
        word2idx = word2idx,
        loss_function = loss_function,
        optimizer = optimizer,
        callback = call)

In [434]:
class ModelContainer:
    def __init__(self, model):
        self.model = model
    
    def get_model(self, list_values):
        
        while len(list_values) < SEQUENCE_LEN:
            random_number = np.random.randint(0, VOCAB_SIZE - 1)
            list_values.append(random_number)
            
        list_values = list_values[:SEQUENCE_LEN]
            
        input_tensor = torch.tensor([list_values])
        input_tensor.to(device)

        d = model(input_tensor)
        with torch.no_grad():
             return d.numpy()[0]

In [435]:
model_container = ModelContainer(model)
model_container.get_model([])

array([9.084438, 8.343832, 0.      , ..., 0.      , 0.      , 0.      ],
      dtype=float32)

# Бим path и фильтры

In [510]:
model.load_state_dict(torch.load('/kaggle/input/lstmmodel/model.pth'))

<All keys matched successfully>

## Рифма

In [31]:
!git clone https://github.com/sunn1t/Different-NLP-models-for-Russian-poetry-generation

Cloning into 'Different-NLP-models-for-Russian-poetry-generation'...
remote: Enumerating objects: 42, done.[K
remote: Counting objects: 100% (42/42), done.[K
remote: Compressing objects: 100% (35/35), done.[K
remote: Total 42 (delta 7), reused 37 (delta 5), pack-reused 0[K
Unpacking objects: 100% (42/42), 6.53 MiB | 1.58 MiB/s, done.


In [32]:
%cd Different-NLP-models-for-Russian-poetry-generation

/kaggle/working/Different-NLP-models-for-Russian-poetry-generation


In [36]:
!pip install -r requirements.txt

Collecting russ==0.0.2
  Downloading russ-0.0.2-py3-none-any.whl (23 kB)
Collecting dicttoxml==1.7.16
  Downloading dicttoxml-1.7.16-py3-none-any.whl (24 kB)
Collecting jsonpickle==3.0.1
  Downloading jsonpickle-3.0.1-py2.py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pygtrie>=2.2
  Downloading pygtrie-2.5.0-py3-none-any.whl (25 kB)
Installing collected packages: pygtrie, jsonpickle, dicttoxml, russ
Successfully installed dicttoxml-1.7.16 jsonpickle-3.0.1 pygtrie-2.5.0 russ-0.0.2
[0m

In [51]:
from src.metre_classifier.stress.dict import StressDict
from src.metre_classifier.stress.predictor import CombinedStressPredictor
from src.metre_classifier.markup.markup import Markup
from src.metre_classifier.markup.markup import Syllable
from src.metre_classifier.metre_classifier import MetreClassifier
from src.metre_classifier.stress.word import Stress, StressedWord
from src.metre_classifier.util.preprocess import get_first_vowel_position

In [77]:
Stress('ворона', stress_predictor.predict('ворона'))

ворона	[1, 3, 5]

In [93]:
{enumerate(Stress(s) for s in stress_predictor.predict(w))}

{<enumerate at 0x7fe61990c540>}

In [240]:
class StressVocabulary():
    def __init__(self, vocabulary, stress_predictor):
        self.word_to_index = {}
        self.index_to_word = {}
        
        for w in vocabulary:
            #stress = Stress(w, stress_predictor.predict(w))
            w_stressed = StressedWord(w, {enumerate(Stress(s) for s in stress_predictor.predict(w))})
            self.word_to_index[w_stressed] = idx2target[word2idx[w]]
            self.index_to_word[idx2target[word2idx[w]]] = w_stressed
    
    def get_word(self, index):
        return self.index_to_word[index]

In [241]:
#stress_predictor = CombinedStressPredictor()
sv = StressVocabulary(vocabulary, stress_predictor)

# Для рифм

In [98]:
VOWELS = "aeiouAEIOUаоэиуыеёюяАОЭИУЫЕЁЮЯ"
CLOSED_SYLLABLE_CHARS = "рлймнРЛЙМН"

In [222]:
from typing import List, Set
import copy

class Graphemes:
    @staticmethod
    def get_syllables(word: str) -> List[Syllable]:
        """
        Разделение слова на слоги.
        :param word: слово для разбивки на слоги.
        :return syllables: массив слогов слова.
        """
        syllables = []
        begin = 0
        number = 0

        # В случае наличия дефиса разбиваем слова на подслова, находим слоги в них, объединяем.
        if "-" in word:
            word_parts = word.split("-")
            word_syllables = []
            last_part_end = 0
            for part in word_parts:
                part_syllables = Graphemes.get_syllables(part)
                if len(part_syllables) == 0:
                    continue
                for i in range(len(part_syllables)):
                    part_syllables[i].begin += last_part_end
                    part_syllables[i].end += last_part_end
                    part_syllables[i].number += len(word_syllables)
                word_syllables += part_syllables
                last_part_end = part_syllables[-1].end + 1
            return word_syllables

        # Для слов или подслов, в которых нет дефиса.
        for i, ch in enumerate(word):
            if ch not in VOWELS:
                continue
            if i + 1 < len(word) - 1 and word[i + 1] in CLOSED_SYLLABLE_CHARS:
                if i + 2 < len(word) - 1 and word[i + 2] in "ьЬ":
                    # Если после сонорного согласного идёт мягкий знак, заканчиваем на нём. ("бань-ка")
                    end = i + 3
                elif i + 2 < len(word) - 1 and word[i + 2] not in VOWELS and \
                        (word[i + 2] not in CLOSED_SYLLABLE_CHARS or word[i + 1] == "й"):
                    # Если после сонорного согласного не идёт гласная или другой сонорный согласный,
                    # слог закрывается на этом согласном. ("май-ка")
                    end = i + 2
                else:
                    # Несмотря на наличие закрывающего согласного, заканчиваем на гласной.
                    # ("со-ло", "да-нный", "пол-ный")
                    end = i + 1
            else:
                # Если после гласной идёт не закрывающая согласная, заканчиваем на гласной. ("ко-гда")
                end = i + 1
            syllables.append(Syllable(begin, end, number, word[begin:end]))
            number += 1
            begin = end
        if get_first_vowel_position(word) != -1:
            # Добиваем последний слог до конца слова.
            syllables[-1] = Syllable(syllables[-1].begin, len(word), syllables[-1].number,
                                     word[syllables[-1].begin:len(word)])
        return syllables

# Рифмы

In [102]:
from collections import defaultdict
from typing import List

# -*- coding: utf-8 -*-
# Автор: Гусев Илья
# Описание: Класс рифм.


class RhymeProfile:
    def __init__(self, syllable_count: int, stressed_syllable_number: int,
                 stressed_syllable_text: str, next_syllable_text: str, next_char: str):
        self.syllable_count = syllable_count
        self.stressed_syllable_number = stressed_syllable_number
        self.stressed_syllable_text = stressed_syllable_text
        self.next_syllable_text = next_syllable_text
        self.next_char = next_char

    def __str__(self):
        return "Syllable count: {}; Stressed syllable: {}; " \
               "Stressed syllable text: {}; Next syllable: {}; " \
               "Next char: {}".format(self.syllable_count, self.stressed_syllable_number,
                                      self.stressed_syllable_text, self.next_syllable_text, self.next_char)

    def __repr__(self):
        return self.__str__()


class Rhymes(object):
    @staticmethod
    def is_rhyme(word1: StressedWord, word2: StressedWord, score_border: int=4, syllable_number_border: int=4) -> bool:
        profile1 = Rhymes.__get_rhyme_profile(word1)
        profile2 = Rhymes.__get_rhyme_profile(word2)
        score = 0
        for i, ch1 in enumerate(profile1.stressed_syllable_text):
            for j, ch2 in enumerate(profile2.stressed_syllable_text[i:]):
                if ch1 != ch2:
                    continue
                if ch1 in VOWELS:
                    score += 3
                else:
                    score += 1
        if profile1.next_syllable_text == profile2.next_syllable_text and profile1.next_syllable_text != '':
            score += 3
        elif profile1.next_char == profile2.next_char and profile1.next_char != '':
            score += 1
        return (profile1.stressed_syllable_number == profile2.stressed_syllable_number and
                profile1.syllable_count == profile2.syllable_count and
                profile1.stressed_syllable_number <= syllable_number_border and
                score >= score_border)

    @staticmethod
    def __get_rhyme_profile(word: StressedWord) -> 'RhymeProfile':
        profile = RhymeProfile(syllable_count=0,
                               stressed_syllable_number=-1,
                               stressed_syllable_text="",
                               next_syllable_text="",
                               next_char="")
        syllables = list(word.syllables)
        profile.syllable_count = len(syllables)
        for i, syllable in enumerate(reversed(syllables)):
            if syllable.stress == -1:
                continue
            profile.stressed_syllable_text = syllable.text
            profile.stressed_syllable_number = -i-1
            if i != 0:
                profile.next_syllable = syllables[-i].text
            if syllable.stress + 1 < len(word.text):
                profile.next_char = word.text[syllable.stress + 1]
            break
        return profile

## Фильтры

In [212]:
from collections import defaultdict
from typing import List

import numpy as np

In [213]:
class Filter(object):
    def filter_model(self, model: np.array, vocabulary: StressVocabulary) -> np.array:
        for i in range(len(model)):
            if not self.filter_word(vocabulary.get_word(i)):
                model[i] = 0.0
        return model

    def filter_words(self, words: List[StressedWord]) -> List[StressedWord]:
        return [word for word in words if self.filter_word(word)]

In [231]:
class MetreFilter(Filter):
    def __init__(self, metre_pattern: str):
        self.metre_pattern = metre_pattern
        self.position = len(metre_pattern) - 1

    def filter_word(self, word: StressedWord) -> bool:
        syllables = word.syllables
        syllables_count = len(syllables)
        if syllables_count == 0:
            return False
        if syllables_count > self.position + 1:
            return False
        for i in range(syllables_count):
            syllable = syllables[i]
            syllable_number = self.position - syllables_count + i + 1
            if syllables_count >= 2 and syllable.stress == -1 and self.metre_pattern[syllable_number] == "+":
                for j in range(syllables_count):
                    other_syllable = syllables[j]
                    other_syllable_number = other_syllable.number - syllable.number + syllable_number
                    if i != j and other_syllable.stress != -1 and self.metre_pattern[other_syllable_number] == "-":
                        return False
        return True
    def pass_word(self, word: StressedWord) -> None:
        self.position -= len(word.syllables)

    def revert_word(self, word: StressedWord) -> None:
        self.position += len(word.syllables)

    def reset(self) -> None:

        self.position = len(self.metre_pattern) - 1

    def is_completed(self):

        return self.position < 0

In [232]:
class RhymeFilter(Filter):
    """
    Фильтр по шаблону рифмы.
    """
    def __init__(self, rhyme_pattern: str, letters_to_rhymes: dict=None,
                 score_border=4):
        self.rhyme_pattern = rhyme_pattern
        self.position = len(self.rhyme_pattern) - 1
        self.letters_to_rhymes = defaultdict(set)
        self.score_border = score_border
        if letters_to_rhymes is not None:
            for letter, words in letters_to_rhymes.items():
                for word in words:
                    self.letters_to_rhymes[letter].add(word)

    def filter_word(self, word: StressedWord) -> bool:
        """
        Фильтрация слова по рифме в текущей позиции.
        :param word: слово.
        :return: подходит слово или нет.
        """
        if len(word.syllables) <= 1:
            return False
        if len(self.letters_to_rhymes[self.rhyme_pattern[self.position]]) == 0:
            return True
        first_word = list(self.letters_to_rhymes[self.rhyme_pattern[self.position]])[0]

        is_rhyme = Rhymes.is_rhyme(first_word, word, score_border=self.score_border, syllable_number_border=2 ) and \
            first_word.text != word.text
        return is_rhyme
    
    def pass_word(self, word: StressedWord) -> None:
        """
        Сдвинуть позицию в шаблоне рифмы на строчку.
        :param word: рифмующееся слово.
        """
        self.letters_to_rhymes[self.rhyme_pattern[self.position]].add(word)
        self.position -= 1

    def revert_word(self, word: StressedWord) -> None:
        """
        Сдвинуть позицию в шаблоне рифмы на строчку назад.
        :param word: рифмующееся слово.
        """
        self.position += 1
        self.letters_to_rhymes[self.rhyme_pattern[self.position]].remove(word)

    def is_completed(self):
        """
        :return: закончена ли генерация по фильтру?
        """
        return self.position < 0

    def reset(self) -> None:
        """
        Сброс позиции в шаблоне.
        """
        self.position = len(self.rhyme_pattern) - 1

In [511]:
class BeamPath(object):
    def __init__(self, indices: List[int], metre_filter: MetreFilter, rhyme_filter: RhymeFilter,
                 probability: float, line_ends: List[int]):
        self.indices = indices  # type: List[int]
        self.metre_filter = metre_filter  # type: MetreFilter
        self.rhyme_filter = rhyme_filter  # type: RhymeFilter
        self.probability = probability  # type: float
        self.line_ends = line_ends  # type: List[int]

    def put_line_end(self):
        self.line_ends.append(len(self.indices))

    def get_words(self, vocabulary: StressVocabulary) -> List[str]:
        return [vocabulary.get_word(word_index).text.lower() for word_index in self.indices]

    def get_poem(self, vocabulary: StressVocabulary) -> str:
        words = self.get_words(vocabulary)
        prev_end = 1
        lines = []
        for end in self.line_ends:
            line = " ".join(list(reversed(words[prev_end:end]))).capitalize()
            prev_end = end
            lines.append(line)
        return "\n".join(list(reversed(lines))) + "\n"

    def get_current_model(self, model_container: ModelContainer, vocabulary: StressVocabulary, use_rhyme: bool=False) -> np.array:
        model = model_container.get_model(self.indices)
        model = self.metre_filter.filter_model(model, vocabulary)
        if use_rhyme:
            model = self.rhyme_filter.filter_model(model, vocabulary)
        return model

    def is_empty(self) -> bool:
        return len(self.indices) == 0

    def __str__(self):
        return str(self.metre_filter.position) + " " + str(self.rhyme_filter.position) + " " + \
               str(self.probability) + " " + str(self.indices) + " " + str(self.line_ends)

    def __repr__(self):
        return self.__str__()

In [512]:
from numpy.random import choice

class Generator(object):

    def __init__(self, model_container: ModelContainer, vocabulary: StressVocabulary,):

        self.model_container = model_container  # type: ModelContainer
        self.vocabulary = vocabulary  # type: StressVocabulary

    def generate_poem(self, metre_schema: str="+-", rhyme_pattern: str="aabb", n_syllables: int=8,
                      letters_to_rhymes: dict=None, beam_width: int=4, rhyme_score_border: int=4) -> str:

        metre_pattern = ""
        while len(metre_pattern) <= n_syllables:
            metre_pattern += metre_schema
        metre_pattern = metre_pattern[:n_syllables]
        metre_filter = MetreFilter(metre_pattern)
        rhyme_filter = RhymeFilter(rhyme_pattern, letters_to_rhymes, score_border=rhyme_score_border)

        result_paths = []
        indices = []
        
        empty_path = BeamPath(indices, metre_filter, rhyme_filter, 1.0, [])
        paths = [empty_path]
        while len(paths) != 0:
            paths = self.__top_paths(paths, beam_width)
            for path in paths:
                #print(paths)
                result_paths.append(path)
            #for path in copy.deepcopy(paths):
            for path in paths:
                paths.pop(0)
                paths += self.generate_line_beam(path, beam_width)
            paths, to_result = self.__filter_path_by_rhyme(paths)
            result_paths += to_result
        if len(result_paths) == 0:
            return None
        best_path = self.__top_paths(result_paths, 1)[0]
        return best_path.get_poem(self.vocabulary)

    def generate_line_beam(self, path, beam_width=5):

        path.metre_filter.reset()
        paths = self.generate_paths(path, beam_width, use_rhyme=True)
        result_paths = []
        while len(paths) != 0:
            paths = self.__top_paths(paths, beam_width)
            for i, path in enumerate(copy.copy(paths)):
                new_paths = self.generate_paths(path, beam_width, use_rhyme=False)
                paths.pop(0)
                paths += new_paths
            paths, to_result = self.__filter_path_by_metre(paths)
            result_paths += to_result
        result_paths = self.__top_paths(result_paths, beam_width)
        for i in range(len(result_paths)):
            result_paths[i].put_line_end()
        return result_paths

    def generate_paths(self, path: BeamPath, beam_width: int=10, use_rhyme: bool=False):
        

        model = path.get_current_model(self.model_container, self.vocabulary, use_rhyme)
        if np.sum(model) == 0.0:
            return []
        if len(path.indices) != 0:
            new_indices = Generator.__choose(model, beam_width)
        else:
            new_indices = Generator.__choose_uniform(self.vocabulary.size(), beam_width)
        new_paths = []
        for index in new_indices:
            word = self.vocabulary.get_word(index)
            word_probability = model[index]
            metre_filter = copy.copy(path.metre_filter)
            metre_filter.pass_word(word)
            rhyme_filter = copy.copy(path.rhyme_filter)
            if use_rhyme:
                #rhyme_filter.letters_to_rhymes = copy.deepcopy(path.rhyme_filter.letters_to_rhymes)
                #rhyme_filter.letters_to_rhymes = path.rhyme_filter.letters_to_rhymes
                rhyme_filter.pass_word(word)
            new_paths.append(BeamPath(path.indices+[index], metre_filter, rhyme_filter,
                                      path.probability * word_probability, copy.copy(path.line_ends)))
        return new_paths

    @staticmethod
    def __top_paths(paths, n):
        if len(paths) <= n:
            return paths
        max_indices = np.array([p.probability for p in paths]).argsort()[-n:][::-1]
        max_paths = [path for i, path in enumerate(paths) if i in max_indices]
        return max_paths

    @staticmethod
    def __filter_path_by_metre(paths):
        result_paths = [path for path in paths if path.metre_filter.position == -1]
        ok_paths = [path for path in paths if path.metre_filter.position > -1]
        return ok_paths, result_paths

    @staticmethod
    def __filter_path_by_rhyme(paths):
        result_paths = [path for path in paths if path.rhyme_filter.position == -1]
        ok_paths = [path for path in paths if path.rhyme_filter.position > -1]
        return ok_paths, result_paths

    @staticmethod
    def __choose_uniform(size: int, n: int = 1):
        return [np.random.randint(1, size) for _ in range(n)]

    @staticmethod
    def __choose(model: np.array, n: int=1):
        
        norm_model = model / np.sum(model)
        try:
            return choice(range(len(norm_model)), n, p=norm_model, replace=False)
        except ValueError:
            return choice(range(len(norm_model)), n, p=norm_model, replace=True)

    @staticmethod
    def __top(model: np.array, n: int=1):
        return [i for i in model.argsort()[-n:][::-1] if model[i] != 0.0]

In [513]:
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abba", n_syllables=8,
                      letters_to_rhymes=None, beam_width=10, rhyme_score_border=4)
print(poem)

И и и но и и меня
И мы так и и где меня шкафу внутри сердцам повинность



In [514]:
np.random.seed(99)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abab", n_syllables=8,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и мы и что и давно
Как и и но и мы меня ругу полочке шалью сокрыла



In [515]:
np.random.seed(98)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abab", n_syllables=6,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и мы ты давно
И и мы и давно полного окошком наконец свернутый



In [516]:
np.random.seed(97)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abab", n_syllables=6,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и как и будто
Ну и и и давно козырной зловещая трюме этажа



In [517]:
np.random.seed(95)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="aa", n_syllables=6,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и и но будто опахала портить означен монеты



In [518]:
np.random.seed(95)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abba", n_syllables=6,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и и и будто
И и мне и давно опахала портить означен монеты



In [520]:
np.random.seed(93)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abba", n_syllables=6,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=4)
print(poem)

И и и что будто
И но и и только лихорадкой помчались мокрому являл



In [522]:
np.random.seed(11)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abba", n_syllables=8,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=2)
print(poem)

И и и и что и давно
Я и и и я и тебя дерзнула отдаст работай скверны



In [524]:
np.random.seed(22)
gen = Generator(model_container, sv)
poem = gen.generate_poem(metre_schema="+-", rhyme_pattern="abba", n_syllables=8,
                      letters_to_rhymes=None, beam_width=20, rhyme_score_border=2)
print(poem)

И и что но и я давно
И я и и был и меня грозны соседство приветливых действовать



In [508]:
type(model)

__main__.LSTMModel