In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pickle
import numpy as np
import os
import json
import random
    
words_path = os.path.join(os.getcwd(), "words.pkl")
with open(words_path, 'rb') as f_words:
    words = pickle.load(f_words)
    
# 构建分类模型
class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_idx):
        super(TransformerEncoder, self).__init__()
        
        self.pad_idx = pad_idx
        self.scale = torch.sqrt(torch.FloatTensor([emb_dim]))

        # 词的embedding
        self.token_embedding = nn.Embedding(input_dim, emb_dim)
        # 对词的位置进行embedding
        self.position_embedding = nn.Embedding(position_length, emb_dim)
        # encoder层，有几个encoder层，每个encoder有几个head
        self.layers = nn.ModuleList([EncoderLayer(emb_dim, n_heads, pf_dim, dropout) for _ in range(n_layers)])
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(emb_dim, output_dim)

    def mask_src_mask(self, src):
        # src=[batch_size, src_len]

        # src_mask=[batch_size, 1, 1, src_len]
        src_mask = (src != self.pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask
    
    def forward(self, src):
        # src=[batch_size, seq_len]
        # src_mask=[batch_size, 1, 1, seq_len]
        src_mask = self.mask_src_mask(src)
        
        batch_size = src.shape[0]
        src_len = src.shape[1]

        # 构建位置tensor -> [batch_size, seq_len]，位置序号从(0)开始到(src_len-1)
        position = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1)

        # 对词和其位置进行embedding -> [batch_size,seq_len,embdim]
        token_embeded = self.token_embedding(src) * self.scale
        position_embeded = self.position_embedding(position)

        # 对词和其位置的embedding进行按元素加和 -> [batch_size, seq_len, embdim]
        src = self.dropout(token_embeded + position_embeded)

        for layer in self.layers:
            src = layer(src, src_mask)

        # [batch_size, seq_len, emb_dim] -> [batch_size, output_dim]
        src = src.permute(0, 2, 1)
        src = torch.sum(src, dim=-1)
        src = self.fc(src)
        return src

'''
encoder layers：
    1.将src与src_mask传入多头attention层(multi-head attention)
    2.dropout
    3.使用残差连接后传入layer-norm层(输入+输出后送入norm)后得到的输出
    4.输出通过前馈网络feedforward层
    5.dropout
    6.一个残差连接后传入layer-norm层后得到的输出喂给下一层
    注意：
        layer之间不共享参数
        多头注意力层用到的是多个自注意力层self-attention
'''
class EncoderLayer(nn.Module):
    def __init__(self, emb_dim, n_heads, pf_dim, dropout):
        super(EncoderLayer, self).__init__()
        # 注意力层后的layernorm
        self.self_attn_layer_norm = nn.LayerNorm(emb_dim)
        # 前馈网络层后的layernorm
        self.ff_layer_norm = nn.LayerNorm(emb_dim)
        # 多头注意力层
        self.self_attention = MultiHeadAttentionLayer(emb_dim, n_heads, dropout)
        # 前馈层
        self.feedforward = FeedforwardLayer(emb_dim, pf_dim, dropout)

        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        #src=[batch_size, seq_len, emb_dim]
        #src_mask=[batch_size, 1, 1, seq_len]

        # self-attention
        # _src=[batch size, query_len, emb_dim]
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, 残差连接以及layer-norm
        # src=[batch_size, seq_len, emb_dim]
        src = self.self_attn_layer_norm(src + self.dropout(_src))

        # 前馈网络
        # _src=[batch_size, seq_len, emb_dim]
        _src = self.feedforward(src)

        # dropout, 残差连接以及layer-norm
        # src=[batch_size, seq_len, emb_dim]
        src = self.ff_layer_norm(src + self.dropout(_src))

        return src
'''
多头注意力层的计算:
    1.q,k,v的计算是通过线性层fc_q,fc_k,fc_v
    2.对query,key,value的emb_dim split成n_heads
    3.通过计算Q*K/scale计算energy
    4.利用mask遮掩不需要关注的token
    5.利用softmax与dropout
    6.5的结果与V矩阵相乘
    7.最后通过一个前馈fc_o输出结果
注意:Q,K,V的长度一致
'''
class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, emb_dim, n_heads, dropout):
        super(MultiHeadAttentionLayer, self).__init__()
        assert emb_dim % n_heads == 0
        self.emb_dim = emb_dim
        self.n_heads = n_heads
        self.head_dim = emb_dim//n_heads

        self.fc_q = nn.Linear(emb_dim, emb_dim)
        self.fc_k = nn.Linear(emb_dim, emb_dim)
        self.fc_v = nn.Linear(emb_dim, emb_dim)

        self.fc_o = nn.Linear(emb_dim, emb_dim)

        self.dropout = nn.Dropout(dropout)

        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim]))

    def forward(self, query, key, value, mask=None):
        # query=[batch_size, query_len, emb_dim]
        # key=[batch_size, key_len, emb_dim]
        # value=[batch_size, value_len, emb_dim]
        batch_size = query.shape[0]

        # Q=[batch_size, query_len, emb_dim]
        # K=[batch_size, key_len, emb_dim]
        # V=[batch_size, value_len, emb_dim]
        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        '''
        view与reshape的异同：
        
        torch的view()与reshape()方法都可以用来重塑tensor的shape，区别就是使用的条件不一样。view()方法只适用于满足连续性条件的tensor，并且该操作不会开辟新的内存空间，
        只是产生了对原存储空间的一个新别称和引用，返回值是视图。而reshape()方法的返回值既可以是视图，也可以是副本，当满足连续性条件时返回view，
        否则返回副本[ 此时等价于先调用contiguous()方法在使用view() ]。因此当不确能否使用view时，可以使用reshape。如果只是想简单地重塑一个tensor的shape，
        那么就是用reshape，但是如果需要考虑内存的开销而且要确保重塑后的tensor与之前的tensor共享存储空间，那就使用view()。
        '''

        # Q=[batch_size, n_heads, query_len, head_dim]
        # K=[batch_size, n_heads, key_len, head_dim]
        # V=[batch_size, n_heads, value_len, head_dim]
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

        # 注意力打分矩阵 [batch_size, n_heads, query_len, head_dim] * [batch_size, n_heads, head_dim, key_len] = [batch_size, n_heads, query_len, key_len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)
        # [batch_size, n_heads, query_len, key_len]
        attention = torch.softmax(energy , dim = -1)

        # [batch_size, n_heads, query_len, key_len]*[batch_size, n_heads, value_len, head_dim]=[batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)

        # [batch_size, query_len, n_heads, head_dim]
        x = x.permute(0, 2, 1, 3).contiguous()

        # [batch_size, query_len, emb_dim]
        x = x.view(batch_size, -1, self.emb_dim)

        # [batch_size, query_len, emb_dim]
        x = self.fc_o(x)

        return x, attention

'''
前馈层
'''
class FeedforwardLayer(nn.Module):
    def __init__(self, emb_dim, pf_dim, dropout):
        super(FeedforwardLayer, self).__init__()
        self.fc_1 = nn.Linear(emb_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, emb_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x=[batch_size, seq_len, emb_dim]

        # x=[batch_size, seq_len, pf_dim]
        x = self.dropout(torch.relu(self.fc_1(x)))

        # x=[batch_size, seq_len, emb_dim]
        x = self.fc_2(x)

        return x
    
input_dim = len(words) 
output_dim = 9
emb_dim = 256
n_layers = 3
n_heads = 8
pf_dim = 512
dropout = 0.5
position_length = 20     
pad_index = words['<pad>']

model = TransformerEncoder(input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_index)
model_path = os.path.join(os.getcwd(), "model.h5")
model.load_state_dict(torch.load(model_path))


<All keys matched successfully>

In [3]:
from pyhanlp import HanLP

segment = HanLP.newSegment().enableCustomDictionaryForcing(True)


# 分词，需要将电影名，演员名和评分数字转为nm，nnt，ng
def sentence_segment(sentence):
    word_nature = segment.seg(sentence)
    print(word_nature)
    sentence_words = []
    for term in word_nature:
        if str(term.nature) == 'kp':
            sentence_words.append('kp')
        else:
            sentence_words.extend(list(term.word))
    print(sentence_words)
    return sentence_words

def bow(sentence, words, show_detail = True):
    sentence_words = sentence_segment(sentence)
    indexed = [words.stoi[t] for t in sentence_words]
    src_tensor = torch.LongTensor(indexed)
    src_tensor = src_tensor.unsqueeze(0)
    return src_tensor

def predict_class(sentence, model):
    sentence_bag = bow(sentence, words, False)
    model.eval()
    with torch.no_grad():
        outputs = model(sentence_bag)
    print('outputs:{}'.format(outputs))
    predicted_prob,predicted_index = torch.max(F.softmax(outputs, 1), 1)#预测最大类别的概率与索引
    print('softmax_prob:{}'.format(predicted_prob))
    print('softmax_index:{}'.format(predicted_index))
    results = []
    #results.append({'intent':index_classes[predicted_index.detach().numpy()[0]], 'prob':predicted_prob.detach().numpy()[0]})
    results.append({'intent':predicted_index.detach().numpy()[0], 'prob':predicted_prob.detach().numpy()[0]})
    print('result:{}'.format(results))
    return results
 
def get_response(predict_result):
    tag = predict_result[0]['intent']
    return tag

def predict(text):
    predict_result = predict_class(text, model)
    res = get_response(predict_result)
    return res
print(predict("包含电磁学的题目复杂度"))

[包含/v, 电磁学/kp, 的/ude1, 题目/n, 复杂度/nz]
['包', '含', 'kp', '的', '题', '目', '复', '杂', '度']
outputs:tensor([[ -8.6761,  -7.6500,  -7.9348, -12.6827,  -1.3421,   6.9967,   5.6434,
          23.2765,  -6.4214]])
softmax_prob:tensor([1.0000])
softmax_index:tensor([7])
result:[{'intent': 7, 'prob': 0.9999999}]
7
