## LSTM by Hand

来源: https://www.bilibili.com/video/BV1FV41117Uz/

In [1]:
import tensorflow as tf
import numpy as np
import jieba
import pandas as pd

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
# Notebook-wide pandas display settings: show every column, cap cell width at
# 80 characters, show up to 999 rows, and render floats with 4 decimals.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 80)
pd.set_option('display.precision', 4)
pd.set_option('display.max_rows', 999)
# Fixed-point formatting avoids scientific notation and shows 4 decimal places.
pd.set_option('display.float_format', '{:.4f}'.format)

In [2]:
# Turn on jieba's paddle mode.
# NOTE(review): the jieba.cut(...) calls later in this notebook do not pass
# use_paddle=True, so this call may be unnecessary — confirm before removing.
jieba.enable_paddle()

Paddle enabled successfully......


In [3]:
# Seed TensorFlow's global generator so the random tensors created in this
# notebook are reproducible after "Restart Kernel -> Run All".
tf.random.set_seed(42)

batch_size = 4        # number of sequences per batch
sequence_length = 5   # time steps per sequence
input_size = 30       # features per time step
output_size = 20      # width of the LSTM hidden state / output

# Random toy input of shape [batch_size, sequence_length, input_size].
x = tf.random.uniform((batch_size, sequence_length, input_size))

In [4]:
x.shape

TensorShape([4, 5, 30])

In [5]:
# LSTM's input: [batch_size, sequence_length, input_size]
# LSTM's output1 (hidden state at every step): [batch_size, sequence_length, output_size]
#        output2 (hidden state at the last step): [batch_size, output_size]


In [6]:
xt = x[:, 0, :]

In [7]:
xt.shape

TensorShape([4, 30])

In [8]:
# Parameters of the hand-written LSTM, one set per gate:
# f = forget gate, i = input gate, o = output gate, c = candidate cell state.

# Input-to-hidden kernels, each [input_size, output_size].
wf, wi, wo, wc = (
    tf.random.uniform((input_size, output_size)) for _ in range(4)
)

# Hidden-to-hidden (recurrent) kernels, each [output_size, output_size].
uf, ui, uo, uc = (
    tf.random.uniform((output_size, output_size)) for _ in range(4)
)

# Biases, each [1, output_size] so they broadcast over the batch dimension.
bf, bi, bo, bc = (
    tf.random.uniform((1, output_size)) for _ in range(4)
)



In [9]:
# Unrolled LSTM forward pass over the time axis of `x`.
#
#   f_t  = sigmoid(x_t Wf + h_{t-1} Uf + bf)     forget gate
#   i_t  = sigmoid(x_t Wi + h_{t-1} Ui + bi)     input gate
#   o_t  = sigmoid(x_t Wo + h_{t-1} Uo + bo)     output gate
#   c~_t = tanh   (x_t Wc + h_{t-1} Uc + bc)     candidate cell state
#   c_t  = f_t * c_{t-1} + i_t * c~_t
#   h_t  = o_t * tanh(c_t)
#
# The hidden and cell state start at zero. With h_0 = c_0 = 0 the first step
# reduces to c_1 = i_1 * c~_1, so no special case is needed for i == 0.
ht = tf.zeros((x.shape[0], output_size))
ct = tf.zeros_like(ht)

sequence_outputs = []
for i in range(x.shape[1]):
    # Slice the i-th time step (not always step 0): [batch_size, input_size].
    xt = x[:, i, :]

    # Every gate sees both the current input and the previous hidden state.
    ft = tf.sigmoid(tf.matmul(xt, wf) + tf.matmul(ht, uf) + bf)
    it = tf.sigmoid(tf.matmul(xt, wi) + tf.matmul(ht, ui) + bi)
    ot = tf.sigmoid(tf.matmul(xt, wo) + tf.matmul(ht, uo) + bo)
    cht = tf.tanh(tf.matmul(xt, wc) + tf.matmul(ht, uc) + bc)

    ct = ft * ct + it * cht
    ht = ot * tf.tanh(ct)

    sequence_outputs.append(ht)

InternalError: Blas GEMM launch failed : a.shape=(4, 30), b.shape=(30, 20), m=4, n=20, k=30 [Op:MatMul]

In [None]:
# Stack the per-step hidden states along a new time axis in position 1, which
# gives [batch_size, sequence_length, output_size] directly.
sequence_outputs = tf.stack(sequence_outputs, axis=1)

In [None]:
sequence_outputs

In [None]:
class CustomLSTM(tf.keras.layers.Layer):
    """Minimal LSTM layer written out gate by gate.

    Input:
        [batch_size, sequence_length, input_size]
    Output:
        return_sequence=True:  [batch_size, sequence_length, output_size]
        return_sequence=False: [batch_size, output_size] (last hidden state)

    Per time step t, with h_0 = c_0 = 0:
        f_t  = sigmoid(x_t Wf + h_{t-1} Uf + bf)
        i_t  = sigmoid(x_t Wi + h_{t-1} Ui + bi)
        o_t  = sigmoid(x_t Wo + h_{t-1} Uo + bo)
        c~_t = tanh   (x_t Wc + h_{t-1} Uc + bc)
        c_t  = f_t * c_{t-1} + i_t * c~_t
        h_t  = o_t * tanh(c_t)

    Args:
        output_size: width of the hidden state / layer output.
        return_sequence: if True return the hidden state of every time step,
            otherwise only the last one.
        **kwargs: forwarded to `tf.keras.layers.Layer` (e.g. `name`).

    Raises:
        ValueError: from `call` when the time dimension of the input is not
            statically known, because the loop is unrolled in Python.
    """

    def __init__(self, output_size, return_sequence=False, **kwargs):
        super(CustomLSTM, self).__init__(**kwargs)
        self.output_size = output_size
        self.return_sequence = return_sequence

    def build(self, input_shape):
        input_size = int(input_shape[-1])
        units = self.output_size

        # `name`, `shape` and `initializer` are passed by keyword because the
        # positional order of `add_weight` differs between Keras versions.

        # Input-to-hidden kernels.
        self.wf = self.add_weight(name='wf', shape=(input_size, units), initializer='glorot_uniform')
        self.wi = self.add_weight(name='wi', shape=(input_size, units), initializer='glorot_uniform')
        self.wo = self.add_weight(name='wo', shape=(input_size, units), initializer='glorot_uniform')
        self.wc = self.add_weight(name='wc', shape=(input_size, units), initializer='glorot_uniform')

        # Hidden-to-hidden (recurrent) kernels.
        self.uf = self.add_weight(name='uf', shape=(units, units), initializer='glorot_uniform')
        self.ui = self.add_weight(name='ui', shape=(units, units), initializer='glorot_uniform')
        self.uo = self.add_weight(name='uo', shape=(units, units), initializer='glorot_uniform')
        self.uc = self.add_weight(name='uc', shape=(units, units), initializer='glorot_uniform')

        # Biases start at zero; shape (1, units) broadcasts over the batch.
        self.bf = self.add_weight(name='bf', shape=(1, units), initializer='zeros')
        self.bi = self.add_weight(name='bi', shape=(1, units), initializer='zeros')
        self.bo = self.add_weight(name='bo', shape=(1, units), initializer='zeros')
        self.bc = self.add_weight(name='bc', shape=(1, units), initializer='zeros')

        super(CustomLSTM, self).build(input_shape)

    def call(self, x):
        # Take the number of steps from the input itself, not from a notebook
        # global, so the layer works for any sequence length.
        steps = x.shape[1]
        if steps is None:
            raise ValueError(
                'CustomLSTM unrolls the time loop in Python and needs a '
                'statically known sequence length (input.shape[1]).'
            )

        # Zero initial hidden / cell state; the batch size may be dynamic.
        ht = tf.zeros((tf.shape(x)[0], self.output_size), dtype=x.dtype)
        ct = tf.zeros_like(ht)

        sequence_outputs = []
        for t in range(steps):
            xt = x[:, t, :]  # current time step: [batch_size, input_size]

            # Every gate combines the current input with the previous hidden state.
            ft = tf.sigmoid(tf.matmul(xt, self.wf) + tf.matmul(ht, self.uf) + self.bf)
            it = tf.sigmoid(tf.matmul(xt, self.wi) + tf.matmul(ht, self.ui) + self.bi)
            ot = tf.sigmoid(tf.matmul(xt, self.wo) + tf.matmul(ht, self.uo) + self.bo)
            cht = tf.tanh(tf.matmul(xt, self.wc) + tf.matmul(ht, self.uc) + self.bc)

            ct = ft * ct + it * cht
            ht = ot * tf.tanh(ct)

            sequence_outputs.append(ht)

        if self.return_sequence:
            # [batch_size, sequence_length, output_size]
            return tf.stack(sequence_outputs, axis=1)
        # The last hidden state: [batch_size, output_size]
        return ht

    def get_config(self):
        # Expose the constructor arguments so the layer can be re-created
        # from its config.
        config = super(CustomLSTM, self).get_config()
        config.update({
            'output_size': self.output_size,
            'return_sequence': self.return_sequence,
        })
        return config

In [None]:
x = tf.random.uniform((batch_size, sequence_length, input_size))

In [None]:
lstm = CustomLSTM(output_size=output_size)

In [None]:
lstm(x)

In [None]:
# Toy binary classifier: custom LSTM encoder followed by a 2-way softmax head.
model = tf.keras.Sequential()
model.add(CustomLSTM(output_size=32))
model.add(tf.keras.layers.Dense(2, activation='softmax'))

# Integer class labels -> sparse categorical cross-entropy.
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
)

In [None]:
# One random batch: float inputs and integer labels drawn from {0, 1}.
x_batch = tf.random.uniform(shape=(batch_size, sequence_length, input_size))
y_batch = tf.random.uniform(shape=(batch_size,), minval=0, maxval=2, dtype=tf.int32)

In [None]:
x_batch.shape

In [None]:
y_batch.shape

In [None]:
y_batch

In [None]:
model.train_on_batch(x_batch, y_batch)

In [None]:
# Random dataset of batch_size * 1000 samples: float inputs and integer labels
# drawn from {0, 1}.
x_data = tf.random.uniform(shape=(batch_size * 1000, sequence_length, input_size))
y_data = tf.random.uniform(shape=(batch_size * 1000,), minval=0, maxval=2, dtype=tf.int32)

In [None]:
model.fit(x_data, y_data, batch_size=4)

In [None]:
model.fit(x_data, y_data, batch_size=4)

In [None]:
model.fit(x_data, y_data, batch_size=4)

In [None]:
from zh_dataset_inews import title_train, label_train, content_train, title_test, label_test, content_test

In [None]:
# Segment every title with jieba (cut_all defaults to False) and join the
# tokens with single spaces.
title_train_cut = [' '.join(jieba.cut(title)) for title in title_train]
title_test_cut = [' '.join(jieba.cut(title)) for title in title_test]

In [None]:
len(title_train_cut)

In [None]:
title_train_cut[:10]

In [None]:
# TextVectorization lives at `tf.keras.layers.TextVectorization` in newer
# TensorFlow releases and under `experimental.preprocessing` in older ones;
# try the stable path first and fall back to the experimental one.
try:
    text_vector = tf.keras.layers.TextVectorization()
except AttributeError:
    text_vector = tf.keras.layers.experimental.preprocessing.TextVectorization()

# Learn the vocabulary from the segmented training titles.
text_vector.adapt(title_train_cut)


In [None]:
vocab_size = len(text_vector.get_vocabulary())
embedding_dim = 1024 

In [None]:
type(x_data)

对比 text_vector('你 好') 和 text_vector('你好') 的输出可以发现：TextVectorization 默认只按空白字符切分，不会对中文进行分词，所以前面需要先用 jieba 分词并用空格连接。

In [None]:
text_vector('你 好')

In [None]:
text_vector('你好')

In [None]:
# Vectorize each list of segmented titles with a single call to the layer
# (rather than calling it once per title).
title_train_text_vector = text_vector(title_train_cut)
title_test_text_vector  = text_vector(title_test_cut)


In [None]:
# tf.transpose(title_train_text_vector).shape

In [None]:
# NOTE(review): despite the `test_` prefix, this dataset is built from the
# *training* vectors, and it is not referenced again in the visible part of
# the notebook — confirm whether it is still needed.
test_input_dataset = tf.data.Dataset.from_tensor_slices(title_train_text_vector)

In [None]:
title_train_text_vector[:10].shape

In [None]:
x_train = tf.convert_to_tensor(title_train_text_vector)
x_test  = tf.convert_to_tensor(title_test_text_vector)

In [None]:
type(x_train)

In [None]:
y_train = tf.convert_to_tensor(label_train)
y_test  = tf.convert_to_tensor(label_test)

In [None]:
# Title classifier: token ids -> embeddings -> custom LSTM -> 3-way softmax.
model_text = tf.keras.Sequential()
model_text.add(tf.keras.layers.Embedding(vocab_size, embedding_dim))
model_text.add(CustomLSTM(output_size=32))
model_text.add(tf.keras.layers.Dense(3, activation='softmax'))

# Integer class labels -> sparse categorical cross-entropy; track accuracy.
model_text.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
x_train.shape

In [None]:
y_train.shape

In [None]:
model_text.summary()

In [None]:
# Train for 20 epochs in batches of 128, holding out 10% of the training data
# for validation.
history_model_text = model_text.fit(
    x=x_train,
    y=y_train,
    batch_size=128,
    epochs=20,
    validation_split=0.1,
)

In [None]:
model_text.evaluate(x_test, y_test)

In [None]:
y_test_pred = model_text.predict(x_test)

In [None]:
len(y_test_pred.argmax(axis=1))

In [None]:
len(x_test)

In [None]:
# Side-by-side table of each test title, its true label and the predicted
# class (index of the highest softmax score).
output_check = pd.DataFrame(
    dict(
        title_test=title_test,
        label_test=label_test,
        y_test_pred=y_test_pred.argmax(axis=1),
    )
)

In [None]:
output_check

In [None]:
output_check.query('label_test != y_test_pred')