In [1]:
import numpy as np
import keras
from keras import layers
import re
from keras.backend import dot

# Обработка данных

In [2]:
def preprocess(line: str) -> str:
    words = list(filter(len, line.strip().lower().split()))
    for i, word in enumerate(words):
        word = re.sub('[?!]', '.', word)
        words[i] = re.sub('[^a-z.]', '', word)
    return ' '.join(words)

def read_preprocess(filename: str) -> list[str]:
    text = ''
    for line in open(filename):
        text += preprocess(line) + '.'

    sentences = map(str.strip, text.split('.'))
    sentences = filter(len, sentences)
    sentences = list(map(lambda s: s + '.', sentences))
    return sentences


data = read_preprocess('/Users/alv.popov/study/ml/data/lab7/book.txt')
data[:3]

['to sherlock holmes she is always the woman.',
 'i have seldom heard him mention her under any other name.',
 'in his eyes she eclipses and predominates the whole of her sex.']

In [3]:
max_len = max(map(len, data))
max_len

547

In [4]:
all_symbols = set()
for line in data:
    all_symbols.update(line)

all_symbols = list(all_symbols)
len(all_symbols)

28

In [5]:
sym2one_code = dict()
for i, s in enumerate(all_symbols):
    cur = np.zeros(len(all_symbols), dtype='int')
    cur[i] = 1
    sym2one_code[s] = cur


def encode_text(text: str) -> np.ndarray:
    res = []
    for sym in text:
        res.append(sym2one_code[sym])
    return np.array(res)


data_one_hot = list(map(encode_text, data))
len(data[0]), data_one_hot[0].shape, data[0], data_one_hot[0]

(43,
 (43, 28),
 'to sherlock holmes she is always the woman.',
 array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0]]))

In [6]:
def xy_split(sentences):
    x, y = [], []
    l = 10
    for sentence in sentences:
        if len(sentence) <= l:  # не обрабатываем маленькие последовательности
            continue

        for i in range(len(sentence) - l):
            x.append(sentence[i:i+l])
            y.append(sentence[i+l])
    return np.array(x), np.array(y)

x, y = xy_split(data_one_hot)
print(x.shape, y.shape)

(464340, 10, 28) (464340, 28)


In [11]:
import keras.backend as backend
from keras import initializers
from keras.activations import tanh, sigmoid
import tensorflow as tf
from keras.layers.rnn.rnn_utils import generate_zero_filled_state_for_cell

class RNNCell(keras.layers.Layer):

    def __init__(self, units, **kwargs):
        self.units = units
        super(RNNCell, self).__init__(**kwargs)

    def build(self, input_shape):
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units * 4),
            name='kernel',
            initializer=initializers.get('glorot_uniform')
        )
        self.recurrent_kernel = self.add_weight(
            shape=(self.units, self.units * 4),
            name='recurrent_kernel',
            initializer=initializers.get('orthogonal')
        )

        def bias_initializer(_, *args, **kwargs):
            return backend.concatenate([
                initializers.get('zeros')((self.units,), *args, **kwargs),
                initializers.get('ones')((self.units,), *args, **kwargs),
                initializers.get('zeros')((self.units * 2,), *args, **kwargs),
            ])
        self.bias = self.add_weight(
          shape=(self.units * 4,),
          name='bias',
          initializer=bias_initializer,
        )

        self.built = True

    def call(self, inputs, states):
        h1, c1 = states  # previous memory and carry state

        k_i, k_f, k_c, k_o = tf.split(self.kernel, num_or_size_splits=4, axis=1)
        x_i = backend.dot(inputs, k_i)
        x_f = backend.dot(inputs, k_f)
        x_c = backend.dot(inputs, k_c)
        x_o = backend.dot(inputs, k_o)

        b_i, b_f, b_c, b_o = tf.split(self.bias, num_or_size_splits=4, axis=0)
        x_i = backend.bias_add(x_i, b_i)
        x_f = backend.bias_add(x_f, b_f)
        x_c = backend.bias_add(x_c, b_c)
        x_o = backend.bias_add(x_o, b_o)

        i = sigmoid(x_i + backend.dot(h1, self.recurrent_kernel[:, :self.units]))
        f = sigmoid(x_f + backend.dot(h1, self.recurrent_kernel[:, self.units:self.units * 2]))
        o = sigmoid(x_o + backend.dot(h1, self.recurrent_kernel[:, self.units * 3:]))
        cs = tanh(x_c + backend.dot(h1, self.recurrent_kernel[:, self.units * 2:self.units * 3]))

        c = f * c1 + i * cs
        h = o * tanh(c)
        return h, [h, c]

    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        print(inputs, batch_size, dtype)
        return list(generate_zero_filled_state_for_cell(self, inputs, batch_size, dtype))


lstm_model = keras.Sequential([
    keras.Input((None, x.shape[-1])),
    keras.layers.RNN(RNNCell(128)),
    keras.layers.Dropout(0.3),
    layers.Dense(len(all_symbols), activation='softmax'),
])

lstm_model.compile(loss='categorical_crossentropy', optimizer='adam')

lstm_model.fit(x, y, epochs=10)

OperatorNotAllowedInGraphError: Exception encountered when calling layer "rnn_4" (type RNN).

Iterating over a symbolic `tf.Tensor` is not allowed in Graph execution. Use Eager execution or decorate this function with @tf.function.

Call arguments received by layer "rnn_4" (type RNN):
  • inputs=tf.Tensor(shape=(None, None, 28), dtype=float32)
  • mask=None
  • training=None
  • initial_state=None
  • constants=None

In [None]:
def finish_sentence(model, prefix: str) -> str:
    result_string = prefix
    encoded_suffix = encode_text(preprocess(prefix)).tolist()

    for _ in range(max_len):
        prediction = model.predict(np.array([encoded_suffix]))
        ind = np.random.choice(range(len(prediction[0])), p=prediction[0])
        new_sym = all_symbols[ind]
        result_string += new_sym
        if new_sym == '.':
            break
        encoded_suffix = encoded_suffix[1:]
        encoded_suffix.append(sym2one_code[new_sym])

    if result_string[-1] != '.':
        result_string += '.'

    return result_string

In [None]:
finish_sentence(lstm_model, 'Do you note ')

In [None]:
finish_sentence(lstm_model, 'Twice burglars in my pay ransacked her house')

In [None]:
finish_sentence(lstm_model, 'hello mister')

# Марковская цепь

In [None]:
class Markov:
    def __init__(self, texts: list[str], n: int = 10):
        edges = dict()
        for text in texts:
            for i in range(len(text) - n - 1):
                cur = text[i : i + n]
                next = text[i + 1 : i + n + 1]
                edges.setdefault(cur, dict())
                edges[cur].setdefault(next, 0)
                edges[cur][next] += 1

        for k1, v in edges.items():
            s = sum(v.values())
            for k2 in v.keys():
                edges[k1][k2] /= s

        self.p = edges
        self.n = n

    def predict(self, line: str) -> str:

        for _ in range(max_len):
            if line[-1] == '.':
                break

            suffix = line[-self.n:]
            if suffix not in self.p:
                prediction = '.'
            else:
                prediction = max(self.p[suffix], key=self.p[suffix].get)
            line += prediction[-1]

        if line[-1] != '.':
            line += '.'
        return line

model = Markov(data)

In [None]:
model.predict('Do you note ')

In [None]:
model.predict('Twice burglars in my pay ransacked her house')

In [None]:
model.predict('hello mister')