In [None]:
import tensorflow as tf
import numpy as np
import scipy
from scipy import sparse
from scipy.sparse import linalg

In [None]:
class lstm_cell:

    def __init__(self, sess, input_dim, hidden_dim, learn_rate, optimizer='adagrad'):
        self.sess = sess
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        
        #self.ct_state = np.random.normal(size=[1, seq_dim])
        #self.ht_state = np.random.normal(size=[1, seq_dim])
        
        # Начальное скрытое состояние ячейки.
        self.ct_state = np.zeros([1, hidden_dim])
        self.ht_state = np.zeros([1, hidden_dim])
        
        # Пара значений вход x и желаемый выход y.
        self.x = tf.placeholder(tf.float32, [1, input_dim])
        self.y = tf.placeholder(tf.float32, [1, input_dim])
        
        # Состояние сети. ht выходное значение, ct ячейка памяти.
        self.ct = tf.placeholder(tf.float32, [1, hidden_dim])
        self.ht = tf.placeholder(tf.float32, [1, hidden_dim])
        
        self.ft_wx,  self.ft_wh,  self.ft_b  = self.three() # Фильтр забывания (forget gate).
        self.it_wx,  self.it_wh,  self.it_b  = self.three() # Фильтр входа (input gate).
        self.ctt_wx, self.ctt_wh, self.ctt_b = self.three() # Фильтр кандидатов (candidate gate).
        self.ot_wx,  self.ot_wh,  self.ot_b  = self.three() # Фильтр выхода (output gate).
        
        self.ft = tf.math.sigmoid(tf.matmul(self.x, self.ft_wx) + tf.matmul(self.ht, self.ft_wh) + self.ft_b)
        self.it = tf.math.sigmoid(tf.matmul(self.x, self.it_wx) + tf.matmul(self.ht, self.it_wh) + self.it_b)
        self.ctt = tf.math.tanh(tf.matmul(self.x, self.ctt_wx) + tf.matmul(self.ht, self.ctt_wh) + self.ctt_b)
        self.ot = tf.math.sigmoid(tf.matmul(self.x, self.ot_wx) + tf.matmul(self.ht, self.ot_wh) + self.ot_b)
        
        self.ct_out = tf.math.multiply(self.ft, self.ct) + tf.math.multiply(self.it, self.ctt)
        self.ht_out = tf.math.multiply(self.ot, tf.math.tanh(self.ct_out))
        
        self.out_w = tf.Variable(tf.random.normal([hidden_dim, input_dim]), dtype=tf.float32)
        self.out_b = tf.Variable(tf.zeros([1, input_dim]), dtype=tf.float32)
        self.out = tf.matmul(self.ht_out, self.out_w) + self.out_b
        
        self.cost = tf.reduce_mean(tf.square(self.out - self.y))
        
        if optimizer == 'adagrad':
            self.train_op = tf.train.AdagradOptimizer(learn_rate).minimize(self.cost)
        elif optimizer == 'adam':
            self.train_op = tf.train.AdamOptimizer(learn_rate).minimize(self.cost)
        elif optimizer == 'gradientdescent':
            self.train_op = tf.train.GradientDescentOptimizer(learn_rate).minimize(self.cost)
        elif optimizer == 'adadelta':
            self.train_op = tf.train.AdadeltaOptimizer(learn_rate).minimize(self.cost)
        
        self.sess.run(tf.global_variables_initializer())
    
    # Метод производит сохранение полного состояния ячейки включая матрицы весов.
    def save_all(self):
        
        self.saved_ct_state = self.ct_state
        self.saved_ht_state = self.ht_state
        
        self.saved_ft_wx = self.sess.run(self.ft_wx)
        self.saved_ft_wh = self.sess.run(self.ft_wh)
        
        self.saved_it_wx = self.sess.run(self.it_wx)
        self.saved_it_wh = self.sess.run(self.it_wh)
        
        self.saved_ctt_wx = self.sess.run(self.ctt_wx)
        self.saved_ctt_wh = self.sess.run(self.ctt_wh)
        
        self.saved_ot_wx = self.sess.run(self.ot_wx)
        self.saved_ot_wh = self.sess.run(self.ot_wh)
    
    # Метод производит восстановление ранее сохранённого состояния ячейки включая матрицы весов.
    def restore_all(self):
        
        self.ct_state = self.saved_ct_state
        self.ht_state = self.saved_ht_state
        
        self.sess.run(tf.assign(self.ft_wx, self.saved_ft_wx))
        self.sess.run(tf.assign(self.ft_wh, self.saved_ft_wh))
        
        self.sess.run(tf.assign(self.it_wx, self.saved_it_wx))
        self.sess.run(tf.assign(self.it_wh, self.saved_it_wh))
        
        self.sess.run(tf.assign(self.ctt_wx, self.saved_ctt_wx))
        self.sess.run(tf.assign(self.ctt_wh, self.saved_ctt_wh))
        
        self.sess.run(tf.assign(self.ot_wx, self.saved_ot_wx))
        self.sess.run(tf.assign(self.ot_wh, self.saved_ot_wh))
        
    # Метод производит SVD сжатие слоёв ячейки. Можно установить различные факторы сжатия
    # для входного и скрытого слоёв.
    def svd_compress(self, factor_input, factor_hidden):

        # Делаем SVD разложение для каждого слоя.
        ft_wx_compr, ft_wx_norm, _, ft_wx_size = self.svd_compress_one(self.ft_wx, factor_input)
        ft_wh_compr, ft_wh_norm, _, ft_wh_size = self.svd_compress_one(self.ft_wh, factor_hidden)
        
        it_wx_compr, it_wx_norm, _, it_wx_size = self.svd_compress_one(self.it_wx, factor_input)
        it_wh_compr, it_wh_norm, _, it_wh_size = self.svd_compress_one(self.it_wh, factor_hidden)
        
        ctt_wx_compr, ctt_wx_norm, _, ctt_wx_size = self.svd_compress_one(self.ctt_wx, factor_input)
        ctt_wh_compr, ctt_wh_norm, _, ctt_wh_size = self.svd_compress_one(self.ctt_wh, factor_hidden)
        
        ot_wx_compr, ot_wx_norm, _, ot_wx_size = self.svd_compress_one(self.ot_wx, factor_input)
        ot_wh_compr, ot_wh_norm, _, ot_wh_size = self.svd_compress_one(self.ot_wh, factor_hidden)
        
        # Заменяем весовые коэффициенты данными после SVD разложения.
        self.sess.run(tf.assign(self.ft_wx, ft_wx_compr))
        self.sess.run(tf.assign(self.ft_wh, ft_wh_compr))
        
        self.sess.run(tf.assign(self.it_wx, it_wx_compr))
        self.sess.run(tf.assign(self.it_wh, it_wh_compr))
        
        self.sess.run(tf.assign(self.ctt_wx, ctt_wx_compr))
        self.sess.run(tf.assign(self.ctt_wh, ctt_wh_compr))
        
        self.sess.run(tf.assign(self.ot_wx, ot_wx_compr))
        self.sess.run(tf.assign(self.ot_wh, ot_wh_compr))
        
        # Считаем среднюю норму всех матриц весов после сжатия.
        norm = (ft_wx_norm + ft_wh_norm + it_wx_norm + it_wh_norm + ctt_wx_norm + ctt_wh_norm + ot_wx_norm + ot_wh_norm) / 8
        size = ft_wx_size + ft_wh_size + it_wx_size + it_wh_size + ctt_wx_size + ctt_wh_size + ot_wx_size + ot_wh_size
        
        return norm, size

    # Метод производит сингулярное разложение матрицы mat и оставляет только перых factor элементов матрицы.
    def svd_compress_one(self, mat, factor):
        
        A = self.sess.run(mat)
        #A = np.matrix([[1, 1, 1, 0, 0], [3, 3, 3, 0, 0], [4, 4, 4, 0, 0],
            #[5, 5, 5, 0, 0], [0, 2, 0, 4, 4], [0, 0, 0, 5, 5], [0, 1, 0, 2, 2]], dtype=np.double)
        m, n = A.shape
        
        # Вычисляем сколько сингулярных значений оставить.
        k_top = int(min(m, n) * factor)
        
        U, s, Vt = scipy.sparse.linalg.svds(A, k=k_top)
        #U, s, Vt = np.linalg.svd(A, full_matrices=False)
        S = np.diag(s)
        
        U_m, U_n = U.shape
        S_m, S_n = S.shape
        Vt_m, Vt_n = Vt.shape
        
        # Считаем суммарное количество элементов после разложения.
        size = U_m * U_n + S_m * S_n + Vt_m * Vt_n
        
        # Новая матрица с выкинутой чатью элементов. По размеру она совпадает с исходной матрицей.
        # Тем не менее таким образом симулируется поведение прореживания матрицы.
        B = np.dot(np.dot(U, S), Vt)
        norm = self.frob_norm(A, B)
        
        return B, norm, k_top, size

    # Норма Фробениуса здесь используется в качестве метрики схожести двух матриц.
    # Используется нормализационный множитель 1/n где n количество элементов матрицы.
    def frob_norm(self, A, B):
        
        assert A.shape == B.shape
        m, n = A.shape
        return np.sqrt(np.sum(np.square(A - B)) / (m * n))
    
    def clear_state(self):
        
        self.ct_state = np.zeros([1, self.hidden_dim])
        self.ht_state = np.zeros([1, self.hidden_dim])
    
    def three(self):
        # Для LSTM распространённая практика bias задавать как 1.
        # https://datascience.stackexchange.com/questions/17987/how-should-the-bias-be-initialized-and-regularized
        # Вектор входных значений переводим в скрытое состояние.
        return (tf.Variable(tf.random.normal([input_dim, hidden_dim]), dtype=tf.float32),
            tf.Variable(tf.random.normal([hidden_dim, hidden_dim]), dtype=tf.float32),
            tf.Variable(tf.zeros([1, hidden_dim]), dtype=tf.float32)) # <- попробуем на нулевом смещении
            #tf.Variable(tf.random.normal([1, hidden_dim]), dtype=tf.float32)) # <- пробуем на случайном смещении
            #tf.Variable(tf.ones([1, hidden_dim]), dtype=tf.float32)) # <- пробуем все 1
    
    def train(self, train_x, train_y, repeat):
                    
        for i in range(repeat):
            
            cost, _ = self.sess.run([self.cost, self.train_op],
                feed_dict={self.x:train_x, self.y:train_y, self.ct:self.ct_state, self.ht:self.ht_state})
            
        self.ct_state, self.ht_state = self.sess.run([self.ct_out, self.ht_out],
            feed_dict={self.x:train_x, self.ct:self.ct_state, self.ht:self.ht_state})
            
        return cost
    
    def test(self, test_x):
                
        self.ct_state, self.ht_state, out = self.sess.run([self.ct_out, self.ht_out, self.out],
            feed_dict={self.x:test_x, self.ct:self.ct_state, self.ht:self.ht_state})
        
        return out