## RNN 文本生成实战

### 手写RNN
RNN公式：$$ h_t = \tanh(W_{ih}x_t + b_{ih} + W_{hh}h_{t-1} + b_{hh}) $$

In [None]:
import torch
import torch.nn as nn


class XuanRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size

        #初始化参数
        self.W_ih = nn.Parameter(torch.randn(input_size, batch_size))
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.randn(hidden_size))

        self.hidden = None

    def init_hidden(self, batch_size):
        return torch.zeros(batch_size, self.hidden_size)

    def forward(self, x):
        batch_size, seq_len, input_size = x.shape

        x = torch.transpose(x, 0, 1)

        self.hidden = self.init_hidden(batch_size)

        hidden_states = []

        for t in range(seq_len):
            x_t = x[t]
            # 计算当前时间步的隐藏状态
            self.hidden = torch.tanh(
                torch.mm(x_t, self.W_ih) +
                torch.mm(self.hidden, self.W_hh) +
                self.b_h
            )
            hidden_states.append(self.hidden)

        return torch.stack(hidden_states), self.hidden


input_size = 3
hidden_size = 2
seq_len = 4
batch_size = 1

rnn = XuanRNN(input_size, hidden_size)
x = torch.randn(batch_size, seq_len, input_size)
print(x)

In [None]:
output, hidden = rnn(x)
print(output)
print(hidden)

In [1]:
text = """
臣密言：臣以险衅，夙遭闵凶。生孩六月，慈父见背；行年四岁，舅夺母志。
祖母刘愍臣孤弱，躬亲抚养。
臣少多疾病，九岁不行，零丁孤苦，至于成立。
既无伯叔，终鲜兄弟，门衰祚薄，晚有儿息。
外无期功强近之亲，内无应门五尺之僮，茕茕孑立，形影相吊。
而刘夙婴疾病，常在床蓐，臣侍汤药，未曾废离。
"""

words = set(text)
vocab_size = len(words)
word_to_index = {word: i for i, word in enumerate(words)}
index_to_word = {i: word for i, word in enumerate(words)}

print(word_to_index)

{'相': 0, '在': 1, '既': 2, '孤': 3, '强': 4, '内': 5, '药': 6, '无': 7, '应': 8, '五': 9, '功': 10, '孩': 11, '父': 12, '息': 13, '至': 14, '苦': 15, '叔': 16, '抚': 17, '终': 18, '侍': 19, '床': 20, '舅': 21, '伯': 22, '六': 23, '茕': 24, '祚': 25, '有': 26, '年': 27, '岁': 28, '常': 29, '亲': 30, '；': 31, '儿': 32, '，': 33, '而': 34, '志': 35, '成': 36, '薄': 37, '夙': 38, '汤': 39, '僮': 40, '之': 41, '影': 42, '尺': 43, '夺': 44, '言': 45, '未': 46, '曾': 47, '以': 48, '背': 49, '晚': 50, '凶': 51, '形': 52, '衅': 53, '密': 54, '见': 55, '祖': 56, '慈': 57, '四': 58, '母': 59, '。': 60, '期': 61, '鲜': 62, '孑': 63, '愍': 64, '\n': 65, '不': 66, '少': 67, '兄': 68, '弟': 69, '蓐': 70, '养': 71, '遭': 72, '离': 73, '衰': 74, '吊': 75, '闵': 76, '零': 77, '：': 78, '外': 79, '立': 80, '疾': 81, '生': 82, '弱': 83, '病': 84, '近': 85, '险': 86, '于': 87, '刘': 88, '废': 89, '多': 90, '丁': 91, '婴': 92, '九': 93, '月': 94, '行': 95, '臣': 96, '躬': 97, '门': 98}


In [2]:
from torch.utils.data import Dataset
import torch

SEQ_LEN = 5
BATCH_SIZE = 1
HIDDEN_SIZE = 128
EMBEDDING_SIZE = 128


class TextDataset(Dataset):
    def __init__(self, text, seq_len):
        self.text = text
        self.seq_len = seq_len
        self.data = [word_to_index[ch] for ch in text]

    def __len__(self):
        return len(self.data) - self.seq_len

    def __getitem__(self, index):
        input_seq = self.data[index:index + self.seq_len]
        target_seq = self.data[index + 1:index + self.seq_len + 1]
        return torch.tensor(input_seq), torch.tensor(target_seq)


dataset = TextDataset(text, SEQ_LEN)
train_loader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
print(dataset.data)

[65, 96, 54, 45, 78, 96, 48, 86, 53, 33, 38, 72, 76, 51, 60, 82, 11, 23, 94, 33, 57, 12, 55, 49, 31, 95, 27, 58, 28, 33, 21, 44, 59, 35, 60, 65, 56, 59, 88, 64, 96, 3, 83, 33, 97, 30, 17, 71, 60, 65, 96, 67, 90, 81, 84, 33, 93, 28, 66, 95, 33, 77, 91, 3, 15, 33, 14, 87, 36, 80, 60, 65, 2, 7, 22, 16, 33, 18, 62, 68, 69, 33, 98, 74, 25, 37, 33, 50, 26, 32, 13, 60, 65, 79, 7, 61, 10, 4, 85, 41, 30, 33, 5, 7, 8, 98, 9, 43, 41, 40, 33, 24, 24, 63, 80, 33, 52, 42, 0, 75, 60, 65, 34, 88, 38, 92, 81, 84, 33, 29, 1, 20, 70, 33, 96, 19, 39, 6, 33, 46, 47, 89, 73, 60, 65]


In [None]:
for input_seq, target_seq in train_loader:
    print(input_seq)
    print(target_seq)
    break

### 定义模型

In [None]:
import torch
import torch.nn as nn


class XuanRNN(nn.Module):
    def __init__(self, vocab_size, input_size, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size

        #定义词嵌入层
        self.embedding = nn.Embedding(vocab_size, input_size)

        #初始化参数
        self.W_ih = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.randn(hidden_size))

        self.Out_Linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):

        embedded = self.embedding(x)
        batch_size, seq_len, input_size = embedded.shape
        embedded = torch.transpose(embedded, 0, 1)

        if hidden is None:
            hidden = torch.zeros(batch_size, self.hidden_size)

        outputs = []

        for t in range(seq_len):
            x_t = embedded[t]
            # 计算当前时间步的隐藏状态
            hidden = torch.tanh(
                torch.mm(x_t, self.W_ih) +
                torch.mm(hidden, self.W_hh) +
                self.b_h
            )
            outputs.append(self.Out_Linear(hidden))

        outputs = torch.stack(outputs, dim=1)
        return outputs, hidden

In [None]:
model = XuanRNN(vocab_size, EMBEDDING_SIZE, HIDDEN_SIZE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    for i, (input_seq, target_seq) in enumerate(train_loader):
        output, _ = model(input_seq)
        loss = criterion(
            output.view(-1, vocab_size),
            target_seq.view(-1)
        )
        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if i % 10 == 0:
            print(f'Epoch [{epoch + 1}/100], Step [{i + 10}/{len(train_loader)}], Loss: {loss.item():.4f}')

In [None]:
model.eval()


def generate_text(context, step, temperature=0.8):
    words = [word for word in context]
    hidden = None
    for _ in range(step):
        input_seq = torch.tensor([word_to_index[word] for word in words[-1:]])
        input_seq = torch.LongTensor(input_seq)
        input_seq = input_seq.view(1, -1)

        with torch.no_grad():
            output, hidden = model(input_seq, hidden)
            last_output = output[0, -1, :]
            probs = torch.softmax(last_output / temperature, dim=-1)
            result_index = torch.multinomial(probs, 1).item()
            result = index_to_word[result_index]
            words.append(result)
    return ''.join(words)


print(generate_text('臣密言：', 5, 0.1))

### pytorch RNN

In [9]:
import torch
import torch.nn as nn


class RNN(nn.Module):
    def __init__(self, vocab_size, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.out_linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        embedding = self.embedding(x)
        outputs, hidden = self.rnn(embedding, hidden)
        outputs = self.out_linear(outputs)
        return outputs, hidden


In [10]:
model = RNN(vocab_size, EMBEDDING_SIZE, HIDDEN_SIZE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    for i, (input_seq, target_seq) in enumerate(train_loader):
        output, _ = model(input_seq)
        loss = criterion(
            output.view(-1, vocab_size),
            target_seq.view(-1)
        )
        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if i % 10 == 0:
            print(f'Epoch [{epoch + 1}/100], Step [{i + 10}/{len(train_loader)}], Loss: {loss.item():.4f}')

Epoch [1/100], Step [10/140], Loss: 4.6556
Epoch [1/100], Step [20/140], Loss: 4.7749
Epoch [1/100], Step [30/140], Loss: 4.0097
Epoch [1/100], Step [40/140], Loss: 4.1328
Epoch [1/100], Step [50/140], Loss: 3.3954
Epoch [1/100], Step [60/140], Loss: 3.8861
Epoch [1/100], Step [70/140], Loss: 3.6354
Epoch [1/100], Step [80/140], Loss: 3.2553
Epoch [1/100], Step [90/140], Loss: 2.9739
Epoch [1/100], Step [100/140], Loss: 2.6875
Epoch [1/100], Step [110/140], Loss: 2.6058
Epoch [1/100], Step [120/140], Loss: 2.2229
Epoch [1/100], Step [130/140], Loss: 2.6014
Epoch [1/100], Step [140/140], Loss: 2.6199
Epoch [2/100], Step [10/140], Loss: 1.8955
Epoch [2/100], Step [20/140], Loss: 1.6531
Epoch [2/100], Step [30/140], Loss: 1.1175
Epoch [2/100], Step [40/140], Loss: 1.5310
Epoch [2/100], Step [50/140], Loss: 2.3577
Epoch [2/100], Step [60/140], Loss: 1.1428
Epoch [2/100], Step [70/140], Loss: 1.5352
Epoch [2/100], Step [80/140], Loss: 1.6164
Epoch [2/100], Step [90/140], Loss: 1.2225
Epoch 

In [11]:
model.eval()


def generate_text(context, step, temperature=0.8):
    words = [word for word in context]
    hidden = None
    for _ in range(step):
        input_seq = torch.tensor([word_to_index[word] for word in words[-1:]])
        input_seq = torch.LongTensor(input_seq)
        input_seq = input_seq.view(1, -1)

        with torch.no_grad():
            output, hidden = model(input_seq, hidden)
            last_output = output[0, -1, :]
            probs = torch.softmax(last_output / temperature, dim=-1)
            result_index = torch.multinomial(probs, 1).item()
            result = index_to_word[result_index]
            words.append(result)
    return ''.join(words)


print(generate_text('臣密言：', 5, 0.1))

臣密言：臣以险衅，
