# RNN序列编码-分类期末大作业

本次大作业要求手动实现双向LSTM+基于attention的聚合模型，并用于古诗作者预测的序列分类任务。**请先阅读ppt中的作业说明。**

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import random
import numpy as np

from tqdm import tqdm

device = torch.device("cpu")

random.seed(1)
np.random.seed(1)
torch.manual_seed(1)

<torch._C.Generator at 0x248a009e350>

## 1. 加载数据

数据位于`data`文件夹中，每一行对应一个样例，格式为“诗句 作者”。下面的代码将数据文件读取到`train_data`, `valid_data`和`test_data`中，并根据训练集中的数据构造词表`word2idx`/`idx2word`和标签集合`label2idx`/`idx2label`。

In [2]:
word2idx = {"<unk>": 0}
label2idx = {}
idx2word = ["<unk>"]
idx2label = []

train_data = []
with open("data/train.txt",encoding='UTF-8') as f:
    for line in f:
        text, author = line.strip().split()
        for c in text:
            if c not in word2idx:
                word2idx[c] = len(idx2word)
                idx2word.append(c)
        if author not in label2idx:
            label2idx[author] = len(idx2label)
            idx2label.append(author)
        train_data.append((text, author))

valid_data = []
with open("data/valid.txt", encoding='UTF-8') as f:
    for line in f:
        text, author = line.strip().split()
        valid_data.append((text, author))

test_data = []
with open("data/test.txt", encoding='UTF-8') as f:
    for line in f:
        text, author = line.strip().split()
        test_data.append((text, author))

In [3]:
print(len(word2idx), len(idx2word), len(label2idx), len(idx2label))
print(len(train_data), len(valid_data), len(test_data))


4941 4941 5 5
11271 1408 1410


**请完成下面的函数，其功能为给定一句古诗和一个作者，构造RNN的输入。** 这里需要用到上面构造的词表和标签集合，对于不在词表中的字用\<unk\>代替。

In [4]:

def make_data(text, author):
    """
    输入
        text: str
        author: str
    输出
        x: LongTensor, shape = (1, text_length)
        y: LongTensor, shape = (1,)
    """
    x = [word2idx.get(word,0) for word in text]
    y = label2idx[author]
    return x, y

In [5]:
def A():
    return 1,2
(A())

(1, 2)

In [6]:
def collate(data_list):
    src = [torch.tensor(_[0]) for _ in data_list]
    tgt = [torch.tensor(_[1]) for _ in data_list]
    return src,tgt
batch_size = 1
trainloader = torch.utils.data.DataLoader([
    (make_data(text,author)) for (text, author) in train_data
], batch_size=batch_size, shuffle=True, collate_fn=collate)
validloader = torch.utils.data.DataLoader([
    (make_data(text,author)) for (text, author) in valid_data
], batch_size=batch_size, shuffle=True, collate_fn=collate)
testloader = torch.utils.data.DataLoader([
    (make_data(text,author)) for (text, author) in test_data
], batch_size=batch_size, shuffle=True, collate_fn=collate)

## 2. LSTM算子（单个时间片作为输入）

In [7]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.f = nn.Linear(input_size + hidden_size, hidden_size)
        self.i = nn.Linear(input_size + hidden_size, hidden_size)
        self.o = nn.Linear(input_size + hidden_size, hidden_size)
        self.g = nn.Linear(input_size + hidden_size, hidden_size)
    
    def forward(self, ht, ct, xt):
        # ht: 1 * hidden_size
        # ct: 1 * hidden_size
        # xt: 1 * input_size
        input_combined = torch.cat((xt, ht), 1)
        ft = torch.sigmoid(self.f(input_combined))
        it = torch.sigmoid(self.i(input_combined))
        ot = torch.sigmoid(self.o(input_combined))
        gt = torch.tanh(self.g(input_combined))
        ct = ft * ct + it * gt
        ht = ot * torch.tanh(ct)
        return ht, ct

## 3. 实现双向LSTM（整个序列作为输入）

**要求使用上面提供的LSTM算子，不要调用torch.nn.LSTM**

In [8]:
class BiLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(BiLSTM, self).__init__()
        # TODO
        
        self.register_buffer("_float", torch.zeros(1, hidden_size))
        self.lstm_forward = LSTM(input_size,hidden_size)
        self.lstm_backward = LSTM(input_size,hidden_size)
    
    def init_h_and_c(self):
        h = torch.zeros_like(self._float)
        c = torch.zeros_like(self._float)
        return h, c
    
    def forward(self, x):
        h_forward,c_forward = self.init_h_and_c()
        h_backward,c_backward = self.init_h_and_c()
        fwd = []
        bwd = []
        
        """
        输入
            x: 1 * length * input_size
        输出
            hiddens
        """
        # 前向
        for i in range(x.shape[1]):
            h_forward,c_forward = self.lstm_forward(h_forward,c_forward,x[:,i,:])
            fwd.append(h_forward)
        fwd = torch.stack(fwd,dim = 0).squeeze(1)
        #后向
        x_reverse = torch.flip(x,dims = [1])
        for i in range(x_reverse.shape[1]):
            h_backward,c_backward = self.lstm_backward(h_backward,c_backward,x_reverse[:,i,:])
            bwd.append(h_backward)
        bwd = torch.stack(bwd,dim = 0).squeeze(1)
        bwd = torch.flip(bwd,dims = [0])
        hiddens = torch.cat((fwd,bwd),-1).unsqueeze(0)
        return hiddens

## 4. 实现基于attention的聚合机制

In [9]:
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        
        # TODO
        
    
    def forward(self, hiddens):
        """
        输入
            hiddens: 1 * length * hidden_size
        输出
            attn_outputs: 1 * hidden_size
        """
        # TODO
        weights = F.softmax(hiddens)
        attn_outputs = (weights * hiddens).sum(1)
        return attn_outputs

## 5. 利用上述模块搭建序列分类模型

参考模型结构：Embedding – BiLSTM – Attention – Linear – LogSoftmax

In [10]:
class EncoderRNN(nn.Module):
    def __init__(self, num_vocab, embedding_dim, hidden_size, num_classes):
        """
        参数
            num_vocab: 词表大小
            embedding_dim: 词向量维数
            hidden_size: 隐状态维数
            num_classes: 类别数量
        """
        super(EncoderRNN, self).__init__()
        self.num_vocab = num_vocab
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.embed = nn.Embedding(num_vocab,embedding_dim)
        self.bilstm = BiLSTM(embedding_dim,hidden_size)
        self.attn = Attention(hidden_size)
        self.h2q = nn.Linear(hidden_size + hidden_size,hidden_size)
        self.h2o = nn.Linear(hidden_size,num_classes)
        self.softmax = nn.LogSoftmax(dim = -1)
        # TODO 
    
    def forward(self, x):
        """
        输入
            x: 1 * length, LongTensor
        输出
            outputs
        """
        # TODO
        embedding = self.embed(x)
        hidden = self.bilstm(embedding)
        hidden = self.h2q(hidden)
        #hidden = [self.h2q(h) for h in hidden]
        hidden = self.attn(hidden)
        outputs = self.h2o(hidden)
        outputs = self.softmax(outputs)
        return outputs
    def predict(self,x):
        with torch.no_grad():
            outputs = self.forward(x)         
        tgt = outputs.argmax(-1)
        return tgt

## 6. 请利用上述模型在古诗作者分类任务上进行训练和测试

要求选取在验证集上效果最好的模型，输出测试集上的准确率、confusion matrix以及macro-precision/recall/F1，并打印部分测试样例及预测结果。

In [11]:
def train_loop(model,optimizer,criterion,loader):
    model.train()
    epoch_loss = 0.0
    for src, tgt in tqdm(loader):
        B = len(src)
        loss = 0.0
        for _ in range(B):
            _src = src[_].unsqueeze(0).to(device)     # 1 * L
            _tgt = tgt[_].unsqueeze(0).to(device)     # 1 * L
            outputs = model(_src)     # 1 * L * V
            #print(outputs.shape)
            #print(_tgt.squeeze(0))
            loss += criterion(outputs.squeeze(0), _tgt.squeeze(0))
        
        loss /= B
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1)     # 裁剪梯度，将梯度范数裁剪为1，使训练更稳定
        optimizer.step()
        epoch_loss += loss.item()
    epoch_loss /= len(loader)
    return epoch_loss
def test_loop(model, loader):
    model.eval()
    a = 0
    b = 0
    for src, tgt in tqdm(loader):
        B = len(src)
        for _ in range(B):
            _src = src[_].unsqueeze(0).to(device)     # 1 * L
            _tgt = tgt[_].unsqueeze(0).to(device)
            with torch.no_grad():
                outputs = model(_src)         # 1 * L
            tgt = outputs.argmax(-1)
            if torch.equal(_tgt,tgt):
                a = a + 1
            b = b + 1
    return a / b

In [12]:
model = EncoderRNN(len(word2idx),128,128,len(idx2label))
model.to(device)
optimizer = torch.optim.SGD(model.parameters(),lr = 1)
criterion = nn.NLLLoss()
best_score = 0.0

In [13]:


for _ in range(3):
    loss = train_loop(model, optimizer, criterion, trainloader)
    score = test_loop(model, validloader)
    
    if score > best_score:
        torch.save(model.state_dict(), "model_best.pt")
        best_score = score
    print(f"Epoch {_}: loss = {loss}, valid score = {score}")

  weights = F.softmax(hiddens)
100%|████████████████████████████████████████████████████████████████████████████| 11271/11271 [06:03<00:00, 31.00it/s]
100%|█████████████████████████████████████████████████████████████████████████████| 1408/1408 [00:08<00:00, 164.37it/s]


Epoch 0: loss = 8.840874644764408, valid score = 0.4424715909090909


100%|████████████████████████████████████████████████████████████████████████████| 11271/11271 [05:53<00:00, 31.90it/s]
100%|█████████████████████████████████████████████████████████████████████████████| 1408/1408 [00:08<00:00, 163.23it/s]


Epoch 1: loss = 9.287691656442748, valid score = 0.4424715909090909


100%|████████████████████████████████████████████████████████████████████████████| 11271/11271 [05:55<00:00, 31.70it/s]
100%|█████████████████████████████████████████████████████████████████████████████| 1408/1408 [00:08<00:00, 162.56it/s]

Epoch 2: loss = 9.310083465247585, valid score = 0.3856534090909091





In [14]:
model.load_state_dict(torch.load("model_best.pt"))
score = test_loop(model, testloader)
print(f"Test score = {score}")


  weights = F.softmax(hiddens)
100%|█████████████████████████████████████████████████████████████████████████████| 1410/1410 [00:08<00:00, 161.06it/s]

Test score = 0.4177304964539007





In [15]:
A = []
B = []
for src, tgt in tqdm(testloader):
    for _ in range(len(src)):
        _src = src[_].unsqueeze(0).to(device)     # 1 * L
        _tgt = tgt[_].unsqueeze(0).to(device)
        with torch.no_grad():
            outputs = model(_src)         # 1 * L
        tgt = outputs.argmax(-1)
        A.append(_tgt)
        B.append(tgt)


  weights = F.softmax(hiddens)
100%|█████████████████████████████████████████████████████████████████████████████| 1410/1410 [00:08<00:00, 161.38it/s]


In [16]:
from sklearn.metrics import confusion_matrix
A = [x.cpu().numpy() for x in A]
B = [x.cpu().numpy() for x in B]
print(confusion_matrix(A,B))

[[  0  60 100   0   0]
 [  0 269 140   1   4]
 [  0 149 319   0   0]
 [  0 105 132   0   0]
 [  0  40  90   0   1]]


In [17]:
from sklearn.metrics import f1_score, precision_score, recall_score
f1 = f1_score( A, B, average='macro' )
p = precision_score(A, B, average='macro')
r = recall_score(A, B, average='macro')
print(f1,p,r)

0.2088637544558308 0.208046481133995 0.2678031947032943


  _warn_prf(average, modifier, msg_start, len(result))


In [18]:
input1 = "微雨秋栽竹，孤燈夜讀書。憐君亦同志，晚歲傍山居。"
ans1 = "杜牧"
x,y = make_data(input1,ans1)
x = torch.tensor([x]).to(device)
ans = model.predict(x)
print(idx2label[ans.item()])

杜甫


  weights = F.softmax(hiddens)
