In [58]:
import os
import re

import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l

# 1. 读取数据集

In [238]:
# 预处理
def preprocess_nmt(text):
    '''Normalize spaces and put a space in front of sentence punctuation.

    Both kinds of non-breaking space (U+202F and U+00A0) are replaced with
    an ordinary space. A space is then inserted before every ``,``, ``.``,
    ``!`` or ``?`` that is not the first character and is not already
    preceded by a space.

    Args:
        text: the raw sentence as a string.

    Returns:
        The normalized string.
    '''
    punctuation = ',.!?'
    text = text.replace('\u202f', ' ').replace('\xa0', ' ')
    pieces = []
    previous = None  # None marks "no character seen yet" (start of text)
    for char in text:
        if previous is not None and previous != ' ' and char in punctuation:
            pieces.append(' ')
        pieces.append(char)
        previous = char
    return ''.join(pieces)

def read_snli(data_dir, is_train):
    """Parse one SNLI CSV split into premises, hypotheses and labels.

    Args:
        data_dir: directory that contains ``snli_1.0_train.csv`` and
            ``snli_1.0_test.csv``.
        is_train: read the training file when true, the test file otherwise.

    Returns:
        A tuple ``(premises, hypotheses, labels)`` of equally long arrays.
        Labels are encoded as entailment=0, contradiction=1, neutral=2.
    """
    label_set = {'entailment': 0, 'contradiction': 1, 'neutral': 2}
    file_name = os.path.join(data_dir, 'snli_1.0_train.csv'
                             if is_train else 'snli_1.0_test.csv')

    data = pd.read_csv(file_name)[['gold_label', 'sentence1', 'sentence2']]
    # Keep only rows whose label can be encoded. This drops the '-' rows
    # (as before) and also any other value that would otherwise map to NaN.
    data = data[data['gold_label'].isin(list(label_set))]
    # Cast both sentence columns to str so a missing cell (read as a float
    # NaN) cannot crash preprocess_nmt; sentence2 was already handled this way.
    premises = data['sentence1'].astype(str).map(preprocess_nmt).values
    hypotheses = data['sentence2'].astype(str).map(preprocess_nmt).values
    labels = data['gold_label'].map(label_set).values
    return premises, hypotheses, labels


is_train = True
data_dir = 'archive'
train_data = read_snli(data_dir, is_train=is_train)
# Preview the first three (premise, hypothesis, label) triples.
for x0, x1, y in zip(*(column[:3] for column in train_data)):
    print('前提：', x0)
    print('假设：', x1)
    print('标签：', y)

前提： A person on a horse jumps over a broken down airplane .
假设： A person is training his horse for a competition .
标签： 2
前提： A person on a horse jumps over a broken down airplane .
假设： A person is at a diner , ordering an omelette .
标签： 1
前提： A person on a horse jumps over a broken down airplane .
假设： A person is outdoors , on a horse .
标签： 0


In [191]:
# Label distribution of the training split.
# `file_name` is only a local variable inside read_snli, so the path is
# rebuilt here to keep this cell runnable on a fresh kernel.
file_name = os.path.join(data_dir, 'snli_1.0_train.csv')
data = pd.read_csv(file_name)[['gold_label','sentence1','sentence2']]
data = data[data['gold_label']!='-']

data['gold_label'].value_counts()

entailment       183416
contradiction    183187
neutral          182764
Name: gold_label, dtype: int64

In [192]:
test_data = read_snli(data_dir, is_train=False)
print('训练集和测试集中每类样本的数量')
# Use a dedicated loop name so the DataFrame called `data` is not clobbered.
for split in [train_data, test_data]:
    split_labels = split[2]  # label array returned by read_snli
    for i in range(3):
        # Vectorized count instead of building a Python list per class.
        print(f'第{i}类样本数量：', int((split_labels == i).sum()))
    print('---------------')

训练集和测试集中每类样本的数量
第0类样本数量： 183416
第1类样本数量： 183187
第2类样本数量： 182764
---------------
第0类样本数量： 3368
第1类样本数量： 3237
第2类样本数量： 3219
---------------


## 1.1 定义加载数据集的类
### 1.1.1 数据集预处理

In [194]:
def tokenize(lines, token='word'):
    '''Split every text line into tokens.

    Args:
        lines: iterable of strings.
        token: ``'word'`` splits on single spaces, ``'char'`` splits into
            individual characters.

    Returns:
        A list of token lists. For an unknown ``token`` kind an error
        message is printed and ``lines`` is returned unchanged.
    '''
    splitters = {
        'word': lambda line: line.split(' '),
        'char': list,
    }
    splitter = splitters.get(token)
    if splitter is None:
        print('ERROR：未知词元类型：'+ token)
        return lines
    return [splitter(line) for line in lines]

import collections
class Vocab():
    """Bidirectional mapping between tokens and integer indices.

    Index 0 is always ``'<unk>'``. The reserved tokens follow in the order
    given, then the corpus tokens in descending frequency order.
    """

    def __init__(self,tokens=None, min_freq=0 ,reversed_token=None):
        """Build the vocabulary.

        Args:
            tokens: a flat list of tokens or a list of token lists.
            min_freq: corpus tokens seen fewer than this many times are
                left out and therefore map to ``'<unk>'``.
            reversed_token: reserved tokens (e.g. ``['<pad>']``) placed
                directly after ``'<unk>'``.
        """
        if tokens is None:
            tokens = []
        if reversed_token is None:
            reversed_token = []
        counter = corpus_freq(tokens)
        # (token, frequency) pairs, most frequent first.
        self._token_freq = sorted(counter.items(), key = lambda x:x[1],
                                reverse = True)
        self.idx_to_token = ['<unk>'] + reversed_token
        # enumerate yields (index, token). Unpacking in that order is what
        # makes lookups such as vocab['<pad>'] return the reserved index
        # instead of falling back to <unk>.
        self.token_to_idx = {token: idx for idx, token
                             in enumerate(self.idx_to_token)}
        for (token,freq) in self._token_freq:
            if freq < min_freq:
                # Sorted by frequency, so every remaining token is too rare.
                break
            if token not in self.token_to_idx:
                # Skip corpus tokens that collide with <unk> / reserved
                # tokens so that no token is listed twice.
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        """Return the number of entries, including ``'<unk>'`` and reserved tokens."""
        return len(self.idx_to_token)

    def __getitem__(self,tokens):
        """Map a token, or a (nested) list/tuple of tokens, to indices.

        Unknown tokens map to the ``unk`` index.
        """
        if not isinstance(tokens, (list,tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_token(self, indices):
        """Map an index, or a list/tuple of indices, back to tokens."""
        if not isinstance(indices,(list,tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[idx] for idx in indices]

    @property  # exposes the method as a read-only attribute
    def unk(self):
        """Index of the unknown token."""
        return 0

    @property
    def token_freq(self):
        """List of (token, frequency) pairs sorted by descending frequency."""
        return self._token_freq


def corpus_freq(tokens):
    '''Count how often each token occurs.

    Args:
        tokens: a flat list of tokens or a list of token lists; may be empty.

    Returns:
        A ``collections.Counter`` mapping token to frequency.
    '''
    # Check for emptiness first so tokens[0] is never read on an empty
    # list, and flatten only when the input really is a list of lists.
    if len(tokens) > 0 and isinstance(tokens[0], list):
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

def truncate_pad(line, num_step, padding_token):
    '''Return a copy of ``line`` that is exactly ``num_step`` items long.

    Longer sequences are cut on the right and shorter ones are padded on the
    right with ``padding_token``. The input list is never modified.

    Args:
        line: list of items (e.g. token indices).
        num_step: target length.
        padding_token: value appended to short sequences.

    Returns:
        A new list of length ``num_step``.
    '''
    if len(line) >= num_step:
        return line[:num_step]
    # Build a new list with `+` rather than `+=`, which would extend the
    # caller's list in place.
    return line + [padding_token] * (num_step - len(line))

In [239]:
        
class SNLIDataset(torch.utils.data.Dataset):
    """Dataset wrapper that serves SNLI examples as index tensors.

    Each item is ``((premise, hypothesis), label)``, where both sentences
    are index tensors of length ``num_steps``.
    """

    def __init__(self, dataset, num_steps , vocab=None):
        """Tokenize, index and pad the given split.

        Args:
            dataset: ``(premises, hypotheses, labels)`` as returned by
                ``read_snli``.
            num_steps: fixed length every sentence is padded/truncated to.
            vocab: vocabulary to reuse (for example the training-set one).
                When omitted, a vocabulary is built from this split with
                ``min_freq=5`` and a reserved ``'<pad>'`` token.
        """
        self.num_steps = num_steps
        premise_tokens = tokenize(dataset[0])
        hypothesis_tokens = tokenize(dataset[1])
        if vocab is not None:
            self.vocab = vocab
        else:
            self.vocab = Vocab(premise_tokens + hypothesis_tokens,
                               min_freq=5, reversed_token=['<pad>'])

        self.premises = self._pad(premise_tokens)
        self.hypotheses = self._pad(hypothesis_tokens)
        self.labels = torch.tensor(dataset[2])
        print('read '+ str(len(self.premises)) + ' examples')

    def _pad(self, lines):
        """Convert token lists to indices and pad/truncate them into one tensor."""
        rows = []
        for line in lines:
            indices = self.vocab[line]
            rows.append(truncate_pad(indices, self.num_steps,
                                     self.vocab['<pad>']))
        return torch.tensor(rows)

    def __getitem__(self, idx):
        """Return ``((premise, hypothesis), label)`` for the given index."""
        sentence_pair = (self.premises[idx], self.hypotheses[idx])
        return sentence_pair, self.labels[idx]

    def __len__(self):
        """Number of examples in the split."""
        return len(self.premises)
    

In [240]:
def load_data_snli(batch_size, num_steps = 50):
    """Read the local SNLI files and build the data iterators.

    The files are read from the module-level ``data_dir``. The test split
    reuses the vocabulary built from the training split.

    Args:
        batch_size: number of examples per batch.
        num_steps: fixed sentence length after padding/truncation.

    Returns:
        ``(train_iter, test_iter, vocab)``. The training iterator is
        shuffled and the test iterator is not.
    """
    # Read the training split first, then the test split.
    train_data, test_data = (read_snli(data_dir, flag) for flag in (True, False))
    train_set = SNLIDataset(train_data, num_steps)
    vocab = train_set.vocab
    test_set = SNLIDataset(test_data, num_steps, vocab)
    make_loader = torch.utils.data.DataLoader
    return (make_loader(train_set, batch_size, shuffle=True),
            make_loader(test_set, batch_size, shuffle=False),
            vocab)

# Build the iterators with batches of 128 and sentences padded/truncated to 50 tokens.
train_iter, test_iter, vocab = load_data_snli(128, 50)
# Vocabulary size, shown as the cell output.
len(vocab)

read 549367 examples
read 9824 examples


19173

In [241]:
# Look at a single training batch to check the tensor shapes, then stop.
for X, y in train_iter:
    print(X[0].shape)  # premises
    print(X[1].shape)  # hypotheses
    print(y.shape)  # labels
    break

torch.Size([128, 50])
torch.Size([128, 50])
torch.Size([128])


# 2. 自然语言推断：使用注意力
## 2.1 模型

In [201]:
def mlp(num_inputs, num_hiddens, flatten):
    """Build a two-stage Dropout -> Linear -> ReLU network.

    Args:
        num_inputs: input feature size of the first linear layer.
        num_hiddens: output size of both linear layers.
        flatten: when true, append ``nn.Flatten(start_dim=1)`` after each
            ReLU.

    Returns:
        An ``nn.Sequential`` holding the layers in order.
    """
    def stage(in_features):
        # One Dropout(0.2) -> Linear -> ReLU (-> Flatten) stage.
        layers = [nn.Dropout(0.2), nn.Linear(in_features, num_hiddens), nn.ReLU()]
        if flatten:
            layers.append(nn.Flatten(start_dim=1))
        return layers

    return nn.Sequential(*stage(num_inputs), *stage(num_hiddens))

In [202]:
from torch.nn import functional as F

class Attend(nn.Module):
    '''
    Soft-align every token of one sequence with the tokens of the other.
    '''
    def __init__(self, num_inputs, num_hiddens, **kwargs):
        super().__init__(**kwargs)
        # Shared projection applied to both sequences before scoring.
        self.f = mlp(num_inputs, num_hiddens, flatten=False)

    def forward(self, A, B):
        '''Compute the soft alignments between A and B.

        A and B have shape (batch, tokens in A / B, embed_size).

        Returns:
            ``(beta, alpha)``. beta has shape (batch, tokens in A,
            embed_size) and holds, for each token of A, the softly aligned
            mix of B. alpha has shape (batch, tokens in B, embed_size) and
            holds, for each token of B, the softly aligned mix of A.
        '''
        # Projected sequences: (batch, tokens in A / B, num_hiddens).
        projected_a, projected_b = self.f(A), self.f(B)
        # Pairwise scores: (batch, tokens in A, tokens in B).
        scores = torch.bmm(projected_a, projected_b.transpose(1, 2))
        # Normalize over B's tokens to get each A token's view of B.
        weights_over_b = F.softmax(scores, dim=-1)
        # Normalize over A's tokens to get each B token's view of A.
        weights_over_a = F.softmax(scores.transpose(1, 2), dim=-1)
        beta = torch.bmm(weights_over_b, B)
        alpha = torch.bmm(weights_over_a, A)
        return beta, alpha

In [231]:
class Compare(nn.Module):
    '''
    Compare each token with the softly aligned content of the other sequence.
    '''
    def __init__(self, num_inputs, num_hiddens, **kwargs):
        super().__init__(**kwargs)
        # Shared network applied to both concatenated inputs.
        self.g = mlp(num_inputs, num_hiddens, flatten=False)

    def forward(self, A, B, beta, alpha):
        '''Return ``(V_A, V_B)``, the comparison vectors for A and B.

        Each sequence is concatenated with its alignment along the feature
        axis (dim 2) and passed through ``g``.
        '''
        a_with_alignment = torch.cat([A, beta], dim=2)
        V_A = self.g(a_with_alignment)
        b_with_alignment = torch.cat([B, alpha], dim=2)
        V_B = self.g(b_with_alignment)
        return V_A, V_B

In [232]:
class Aggregate(nn.Module):
    '''
    Sum both sets of comparison vectors, concatenate the sums and classify
    the result with an MLP followed by a linear output layer.
    '''
    def __init__(self, num_inputs, num_hiddens, num_outputs, **kwargs):
        super().__init__(**kwargs)
        self.h = mlp(num_inputs, num_hiddens, flatten=True)
        self.linear = nn.Linear(num_hiddens, num_outputs)

    def forward(self, V_A, V_B):
        '''Return the class scores ``Y_hat`` for a batch of sentence pairs.'''
        # Sum each set of comparison vectors over the token axis, then join
        # the two sums along the feature axis.
        pooled = torch.cat([V_A.sum(dim=1), V_B.sum(dim=1)], dim=1)
        hidden = self.h(pooled)
        return self.linear(hidden)

In [250]:
##-----------整合代码
class DecomposableAttention(nn.Module):
    """Attend -> Compare -> Aggregate model for natural language inference."""

    def __init__(self, vocab, embed_size, num_hiddens, num_inputs_attend=100,
                 num_inputs_compare=200, num_inputs_agg=400, **kwargs):
        """Create the embedding layer and the three sub-modules.

        Args:
            vocab: vocabulary; only its length is used, to size the
                embedding table.
            embed_size: embedding dimension.
            num_hiddens: hidden size shared by the three sub-modules.
            num_inputs_attend: input size of the Attend MLP.
            num_inputs_compare: input size of the Compare MLP.
            num_inputs_agg: input size of the Aggregate MLP.
        """
        super().__init__(**kwargs)
        self.embedding = nn.Embedding(len(vocab), embed_size)
        self.attend = Attend(num_inputs_attend, num_hiddens)
        self.compare = Compare(num_inputs_compare, num_hiddens)
        # Three possible outputs: entailment, contradiction and neutral.
        self.aggregate = Aggregate(num_inputs_agg, num_hiddens, num_outputs=3)

    def forward(self, X):
        """Return class scores for a ``(premises, hypotheses)`` pair of index tensors."""
        premises, hypotheses = X
        A, B = self.embedding(premises), self.embedding(hypotheses)
        beta, alpha = self.attend(A, B)
        comparison_vectors = self.compare(A, B, beta, alpha)
        return self.aggregate(*comparison_vectors)

# 2.2 训练和评估模型
## 1. 读取数据集

In [244]:
# Rebuild the iterators for training: batches of 256, sentences padded/truncated to 50 tokens.
batch_size, num_steps = 256, 50
train_iter, test_iter, vocab = load_data_snli(batch_size, num_steps)

read 549367 examples
read 9824 examples


## 2. 创建模型

In [None]:
# Embedding width and hidden-layer width used when the model is created.
embed_size, num_hiddens = 100, 200
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class TokenEmbedding:
    """Pretrained token-embedding table (e.g. GloVe).

    Index 0 is reserved for ``'<unk>'`` and holds an all-zero vector.
    """

    def __init__(self, embedding_name):
        """Load the vectors registered under ``embedding_name``."""
        tokens, vectors = self._load_embedding(embedding_name)
        self.idx_to_token, self.idx_to_vec = tokens, vectors
        self.unknown_idx = 0
        self.token_to_idx = dict(zip(self.idx_to_token,
                                     range(len(self.idx_to_token))))

    def _load_embedding(self, embedding_name):
        """Read ``vec.txt`` from the extracted archive.

        Returns:
            ``(idx_to_token, idx_to_vec)``: the token list and a tensor
            with one row per token.
        """
        idx_to_token, idx_to_vec = ['<unk>'], []
        data_dir = d2l.download_extract(embedding_name)
        # GloVe website: https://nlp.stanford.edu/projects/glove/
        # fastText website: https://fasttext.cc/
        vec_path = os.path.join(data_dir, 'vec.txt')
        with open(vec_path, 'r', encoding='utf-8') as f:
            for line in f:
                token, *fields = line.rstrip().split(' ')
                vector = [float(field) for field in fields]
                # Skip header lines such as the first line of fastText files.
                if len(vector) > 1:
                    idx_to_token.append(token)
                    idx_to_vec.append(vector)

        # All-zero row for '<unk>' at index 0.
        zero_row = [0] * len(idx_to_vec[0])
        return idx_to_token, torch.tensor([zero_row] + idx_to_vec)

    def __getitem__(self, tokens):
        """Return the vectors for an iterable of tokens; unknown tokens get row 0."""
        lookup = self.token_to_idx.get
        indices = [lookup(token, self.unknown_idx) for token in tokens]
        return self.idx_to_vec[torch.tensor(indices)]

    def __len__(self):
        """Number of tokens in the table, including ``'<unk>'``."""
        return len(self.idx_to_token)
    
# Load the pretrained vectors registered under the name 'glove.6b.100d'.
glove_embedding = TokenEmbedding('glove.6b.100d')    

# Look up one pretrained vector per vocabulary token, in vocabulary index order.
embeds = glove_embedding[vocab.idx_to_token]
embeds.shape

In [None]:
# Create the model, then overwrite its embedding weights with the pretrained vectors.
net = DecomposableAttention(vocab, embed_size, num_hiddens)
net.embedding.weight.data.copy_(embeds);

## 3. 训练和评估模型

In [None]:
# Optimizer settings: learning rate and number of training epochs.
lr, num_epochs = 0.001, 4
trainer = torch.optim.Adam(net.parameters(), lr = lr)
loss = nn.CrossEntropyLoss(reduction="none")  # no reduction: keep the per-sample losses

In [None]:
def train_batch(net,X,y,loss, trainer,device):
    """Run one optimization step on a single mini-batch.

    Args:
        net: the model; it is switched to training mode.
        X: an input tensor, or a list/tuple of input tensors.
        y: the label tensor.
        loss: loss callable; its output is summed before backpropagation.
        trainer: the optimizer.
        device: device the batch is moved to.

    Returns:
        ``(train_loss_sum, train_acc_sum)``: the summed loss (detached from
        the graph) and the number of correct predictions, both as tensors.
    """
    # Accept tuples as well as lists so multi-input batches work regardless
    # of the container type the data loader produces.
    if isinstance(X, (list, tuple)):
        X = [x.to(device) for x in X]
    else:
        X = X.to(device)
    y = y.to(device)
    net.train()
    trainer.zero_grad()
    pred = net(X)
    # Sum once and reuse the value for both backprop and reporting.
    batch_loss = loss(pred, y).sum()
    batch_loss.backward()
    trainer.step()
    # Detach so the reported value does not keep the autograd graph alive.
    train_loss_sum = batch_loss.detach()
    train_acc_sum = (pred.argmax(1)==y).sum()
    return train_loss_sum, train_acc_sum

def _evaluate_accuracy(net, data_iter, device):
    """Return the fraction of examples in ``data_iter`` classified correctly."""
    net.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, (list, tuple)):
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            # .item() gives a plain Python number, so the result can be
            # plotted and formatted even when the model runs on a GPU.
            correct += (net(X).argmax(1) == y).sum().item()
            total += y.numel()
    return correct / total


def train(net, train_iter, test_iter, loss, trainer, num_epochs,device):
    """Train ``net``, plotting loss and accuracy, and print a final summary.

    Args:
        net: the model; it is moved to ``device``.
        train_iter: iterator over training batches ``(features, labels)``.
        test_iter: iterator over test batches, evaluated after every epoch.
        loss: loss callable handed to ``train_batch``.
        trainer: the optimizer.
        num_epochs: number of passes over ``train_iter``.
        device: device used for training and evaluation.
    """
    timer, num_batches = d2l.Timer(),len(train_iter)
    animator = d2l.Animator(xlabel='epoch',xlim=[1,num_epochs],ylim=[0,1],
                           legend=['train loss','train acc', 'test acc'])
    net = net.to(device)
    # Plot about five times per epoch. max(1, ...) keeps the modulo below
    # valid when there are fewer than five batches.
    plot_every = max(1, num_batches // 5)
    for epoch in range(num_epochs):
        # Running sums: loss, correct predictions, examples, labels.
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            l,acc = train_batch(net,features, labels, loss,trainer,device)
            metric.add(l ,acc ,labels.shape[0] ,labels.numel())
            timer.stop()
            if (i + 1) % plot_every == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        test_acc = _evaluate_accuracy(net, test_iter, device)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {metric[0] / metric[2]:.3f}, train acc '
          f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on '
          f'{str(device)}')

In [None]:
# Run the training loop with the model, data iterators, loss and optimizer defined above.
train(net, train_iter, test_iter, loss, trainer, num_epochs,device)

In [None]:
#------------- 预测

def predict_snli(net, vocab, premise, hypothesis):
    """Predict the logical relation between a premise and a hypothesis.

    Args:
        net: model taking ``[premises, hypotheses]`` index tensors and
            returning one row of class scores per example.
        vocab: maps a list of tokens to a list of indices via ``vocab[...]``.
        premise: list of premise tokens.
        hypothesis: list of hypothesis tokens.

    Returns:
        ``'entailment'``, ``'contradiction'`` or ``'neutral'``.
    """
    net.eval()
    # Put the inputs on the device the model lives on rather than relying
    # on a global; fall back to the CPU for a model without parameters.
    first_param = next(net.parameters(), None)
    target = first_param.device if first_param is not None else torch.device('cpu')
    premise = torch.tensor(vocab[premise], device=target)
    hypothesis = torch.tensor(vocab[hypothesis], device=target)
    with torch.no_grad():
        # Both inputs are a batch of one sentence: shape (1, num_tokens).
        scores = net([premise.reshape(1, -1), hypothesis.reshape(1, -1)])
    label = int(torch.argmax(scores, dim=1))
    if label == 0:
        return 'entailment'
    if label == 1:
        return 'contradiction'
    return 'neutral'

In [None]:
# Try the trained model on one hand-written premise/hypothesis pair.
predict_snli(net, vocab, ['he', 'is', 'good', '.'], ['he', 'is', 'bad', '.'])