# RNN序列编码-分类期末大作业

本次大作业要求手动实现双向LSTM+基于attention的聚合模型，并用于古诗作者预测的序列分类任务。**请先阅读ppt中的作业说明。**

In [33]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim

import random
import numpy as np

from tqdm import tqdm

# Prefer the GPU when one is present, but fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed every random number generator in use so runs are reproducible.
random.seed(1)
np.random.seed(1)
torch.manual_seed(1)

<torch._C.Generator at 0x1594c12a210>

## 1. 加载数据

数据位于`data`文件夹中，每一行对应一个样例，格式为“诗句 作者”。下面的代码将数据文件读取到`train_data`, `valid_data`和`test_data`中，并根据训练集中的数据构造词表`word2idx`/`idx2word`和标签集合`label2idx`/`idx2label`。

In [34]:
def _read_pairs(path):
    """Read ``(text, author)`` pairs from a data file.

    Every non-blank line must hold exactly two whitespace-separated fields,
    the poem line followed by its author. Blank lines (for example a trailing
    newline at the end of the file) are skipped instead of raising.

    Raises:
        ValueError: if a non-blank line does not split into exactly two fields.
    """
    pairs = []
    with open(path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            text, author = line.split()
            pairs.append((text, author))
    return pairs


# Index 0 of the vocabulary is reserved for the unknown-character token.
word2idx = {"<unk>": 0}
label2idx = {}
idx2word = ["<unk>"]
idx2label = []

# The vocabulary and the label set are built from the training split only.
train_data = _read_pairs("data/train.txt")
for text, author in train_data:
    for c in text:
        if c not in word2idx:
            word2idx[c] = len(idx2word)
            idx2word.append(c)
    if author not in label2idx:
        label2idx[author] = len(idx2label)
        idx2label.append(author)

valid_data = _read_pairs("data/valid.txt")
test_data = _read_pairs("data/test.txt")

In [35]:
# Sanity check: sizes of the vocabulary/label tables, then of each data split.
print(*(len(table) for table in (word2idx, idx2word, label2idx, idx2label)))
print(*(len(split) for split in (train_data, valid_data, test_data)))

4941 4941 5 5
11271 1408 1410


**请完成下面的函数，其功能为给定一句古诗和一个作者，构造RNN的输入。** 这里需要用到上面构造的词表和标签集合，对于不在词表中的字用\<unk\>代替。

In [36]:
def label2onehot(label, num_classes=None):
    '''
    Convert a tensor of indices into a float one-hot matrix.

    input:
        label(tensor): N or N*1, integer-valued indices
        num_classes(int, optional): width of each one-hot row;
            defaults to len(word2idx)
    output:
        onehot(tensor): N*num_classes, created on the same device as label
    '''
    if num_classes is None:
        num_classes = len(word2idx)

    text_length = len(label)
    onehot = torch.zeros(text_length, num_classes, device=label.device)
    if text_length == 0:
        # reshape(0, -1) is ambiguous for an empty tensor; nothing to scatter.
        return onehot

    index = label.reshape(text_length, -1).long()
    # Scatter a scalar instead of materializing an N*num_classes tensor of ones.
    onehot.scatter_(1, index, 1.0)
    return onehot

In [37]:
def make_data(text, author):
    """
    Turn one (poem line, author) sample into model input and target tensors.

    Characters that are not in ``word2idx`` are mapped to the ``<unk>`` index,
    so lines from the validation/test splits containing unseen characters do
    not raise.

    Input
        text: str
        author: str, must be a key of label2idx
    Output
        x: one-hot float tensor built by label2onehot,
           shape = (1, text_length, len(word2idx))
        y: LongTensor, shape = (1,)
    Raises
        KeyError: if author is not in label2idx
    """
    unk = word2idx["<unk>"]
    # Explicit dtype keeps the index tensor integral even for an empty text.
    indices = torch.tensor([word2idx.get(ch, unk) for ch in text], dtype=torch.long)
    x = label2onehot(indices).unsqueeze(0)
    y = torch.tensor(label2idx[author]).unsqueeze(0)

    return x, y

## 2. LSTM算子（单个时间片作为输入）

In [38]:
class LSTM(nn.Module):
    """Single-time-step LSTM cell built from four separate gate projections.

    Every gate is a linear map applied to the concatenation of the current
    input and the previous hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        joint_size = input_size + hidden_size
        self.f = nn.Linear(joint_size, hidden_size)  # forget gate
        self.i = nn.Linear(joint_size, hidden_size)  # input gate
        self.o = nn.Linear(joint_size, hidden_size)  # output gate
        self.g = nn.Linear(joint_size, hidden_size)  # candidate cell values

    def forward(self, ht, ct, xt):
        """Advance the cell by one time step.

        Input
            ht: previous hidden state, 1 * hidden_size
            ct: previous cell state, 1 * hidden_size
            xt: current input, 1 * input_size
        Output
            (new hidden state, new cell state), each 1 * hidden_size
        """
        gate_input = torch.cat([xt, ht], dim=1)

        forget_gate = self.f(gate_input).sigmoid()
        input_gate = self.i(gate_input).sigmoid()
        output_gate = self.o(gate_input).sigmoid()
        candidate = self.g(gate_input).tanh()

        # Keep part of the old memory and add the gated candidate values.
        next_c = forget_gate * ct + input_gate * candidate
        next_h = output_gate * next_c.tanh()
        return next_h, next_c

## 3. 实现双向LSTM（整个序列作为输入）

**要求使用上面提供的LSTM算子，不要调用torch.nn.LSTM**

In [39]:
class BiLSTM(nn.Module):
    """Bidirectional LSTM over a whole sequence, built from two LSTM cells.

    One cell reads the sequence left-to-right and the other right-to-left;
    the two hidden states of every position are concatenated.
    """

    def __init__(self, input_size, hidden_size):
        super(BiLSTM, self).__init__()
        self.fLSTM = LSTM(input_size, hidden_size)
        self.bLSTM = LSTM(input_size, hidden_size)
        # Zero buffer used as a template for the initial states; as a
        # registered buffer it follows the module across .to()/.cuda() calls.
        self.register_buffer("_float", torch.zeros(1, hidden_size))

    def init_h_and_c(self, batch_size=1):
        """Return zero initial (hidden, cell) states of shape batch_size * hidden_size."""
        h = self._float.new_zeros(batch_size, self._float.shape[1])
        c = torch.zeros_like(h)
        return h, c

    def forward(self, x):
        """
        Input
            x: batch * length * input_size
        Output
            hiddens: batch * length * (hidden_size*2); the forward state fills
                the first hidden_size features, the backward state the rest
        """
        batch_size, length = x.shape[0], x.shape[1]
        if length == 0:
            # torch.stack cannot handle an empty list; return an empty sequence.
            return self._float.new_zeros(batch_size, 0, 2 * self._float.shape[1])

        hf, cf = self.init_h_and_c(batch_size)
        hb, cb = self.init_h_and_c(batch_size)
        hidden_f = []
        hidden_b = [None] * length

        for i in range(length):
            j = length - 1 - i  # position visited by the backward cell
            hf, cf = self.fLSTM(hf, cf, x[:, i, :])
            hb, cb = self.bLSTM(hb, cb, x[:, j, :])
            hidden_f.append(hf)
            # Store at the original position so both lists share one ordering.
            hidden_b[j] = hb

        hidden_f = torch.stack(hidden_f, dim=1)  # batch * length * hidden_size
        hidden_b = torch.stack(hidden_b, dim=1)  # batch * length * hidden_size

        hiddens = torch.cat([hidden_f, hidden_b], dim=2)

        return hiddens

## 4. 实现基于attention的聚合机制

In [40]:
class Attention(nn.Module):
    """Additive attention pooling that collapses a sequence into one vector."""

    def __init__(self, hidden_size):
        super().__init__()
        self.feat2att = nn.Linear(hidden_size, hidden_size)
        self.to_alpha = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hiddens):
        """
        Input
            hiddens: 1 * length * hidden_size
        Output
            attn_outputs: 1 * hidden_size
        """
        # One unnormalized score per position: 1 * length * 1
        scores = self.to_alpha(torch.tanh(self.feat2att(hiddens)))
        # Normalize over the length axis, then lay the weights out as a row: 1 * 1 * length
        weights = F.softmax(scores, dim=1).transpose(1, 2)
        # Weighted sum of the hidden states: 1 * 1 * hidden_size
        pooled = torch.matmul(weights, hiddens)
        return pooled.squeeze(1)

## 5. 利用上述模块搭建序列分类模型

参考模型结构：Embedding – BiLSTM – Attention – Linear – LogSoftmax

In [41]:
class EncoderRNN(nn.Module):
    """Sequence classifier: BiLSTM encoder -> attention pooling -> linear -> log-softmax."""

    def __init__(self, num_vocab, embedding_dim, hidden_size, num_classes):
        """
        Parameters
            num_vocab: vocabulary size; accepted for interface compatibility
                but not used, since this module holds no embedding table
            embedding_dim: size of each per-token input vector fed to the BiLSTM
            hidden_size: hidden state size of each LSTM direction
            num_classes: number of output classes
        """
        super(EncoderRNN, self).__init__()
        self.Encoder = BiLSTM(embedding_dim, hidden_size)
        self.selfatt = Attention(hidden_size * 2)
        self.linear_layers = nn.Sequential(
            nn.Linear(hidden_size * 2, num_classes),
            # Normalize over the class axis explicitly; omitting dim relies on
            # deprecated implicit-dimension behaviour and emits a warning.
            nn.LogSoftmax(dim=-1),
        )

    def forward(self, x):
        """
        Input
            x: 1 * length * embedding_dim, already-vectorized tokens; x is
               passed straight to the encoder, no embedding lookup happens here
        Output
            outputs: 1 * num_classes, log-probabilities over the classes
        """
        wordfeats = self.Encoder(x)  # 1 * length * (hidden_size*2)
        sentfeat = self.selfatt(wordfeats)  # 1 * (hidden_size*2)
        outputs = self.linear_layers(sentfeat)  # 1 * num_classes

        return outputs

## 6. 请利用上述模型在古诗作者分类任务上进行训练和测试

要求选取在验证集上效果最好的模型，输出测试集上的准确率、confusion matrix以及macro-precision/recall/F1，并打印部分测试样例及预测结果。

In [42]:
# TODO

def train(model, train_data, loss_fn, optimizer):
    """Run one pass over ``train_data``, updating ``model`` after every sample.

    Parameters
        model: module called as ``model(x)`` on the input built by make_data
        train_data: sequence of samples whose first two items are
            (sentence, label)
        loss_fn: callable ``loss_fn(prediction, target)`` returning a scalar loss
        optimizer: optimizer over the model's parameters

    Progress is printed every 500 samples. Nothing is returned.
    """
    model.train()

    # make_data builds its tensors without specifying a device, so move them
    # to wherever the model's parameters live before the forward pass.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    total = len(train_data)
    # `step` rather than `iter`, which would shadow the builtin.
    for step, sample in enumerate(train_data):

        sentence, label = sample[0], sample[1]
        x, y = make_data(sentence, label)
        x, y = x.to(target_device), y.to(target_device)

        optimizer.zero_grad()
        y_pred = model(x)

        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()

        if step % 500 == 0:
            print(f"{step}/{total}")


In [43]:
# Inputs are one-hot vectors (see make_data), so the per-token input size
# equals the vocabulary size; the class count comes from the label set.
model = EncoderRNN(len(word2idx), len(word2idx), 512, len(label2idx))
optimizer = optim.Adam(model.parameters(), lr=1e-5)
# The model already ends in LogSoftmax, so NLLLoss is the matching criterion;
# CrossEntropyLoss would apply a second, redundant log-softmax.
criterion = nn.NLLLoss()

epochs = 1
for _ in range(epochs):
    train(model, train_data, criterion, optimizer)

0/11271


KeyboardInterrupt: 