In [None]:
from tokenizers import (
    models,
    normalizers,
    pre_tokenizers,
    trainers,
    Tokenizer,
)
import os 
import math
# Train a WordPiece tokenizer on every file found under ./对联data and save it.
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))
tokenizer.normalizer = normalizers.Sequence(
    [normalizers.NFD(),normalizers.Lowercase()]
)
# Split on whitespace only; the usage cell below inserts a space between
# characters before encoding, so each character becomes its own token.
tokenizer.pre_tokenizer = pre_tokenizers.WhitespaceSplit()
# NOTE(review): CoupleDataset hard-codes ids 1 and 3; presumably these map to
# "[PAD]" and "[SEP]" given this ordering - confirm with tokenizer.token_to_id.
special_tokens = ["[UNK]","[PAD]","[CLS]","[SEP]","[MASK]"]
trainer = trainers.WordPieceTrainer(vocab_size =10000,special_tokens=special_tokens)

# os.walk yields entries in filesystem order; sort so the training input is
# the same on every machine and every run.
data_list = sorted(
    os.path.join(root,name)
    for root,_dirs,names in os.walk("./对联data")
    for name in names
)
if not data_list:
    # Training on nothing would silently produce a tokenizer that only knows
    # the special tokens; fail loudly instead.
    raise FileNotFoundError("no training files found under ./对联data")

# tokenizer.save does not create missing parent directories.
os.makedirs("./model_save",exist_ok=True)
tokenizer.train(data_list,trainer=trainer)
tokenizer.save("./model_save/transformer_my_tokenizer.json")



In [8]:
# Load the trained tokenizer from disk and encode one sample couplet line.
tokenizer = Tokenizer.from_file("./model_save/transformer_my_tokenizer.json")
s = "尽其在我，听其在天"
# Put a space between consecutive characters so that whitespace splitting
# yields one piece per character.
s = " ".join(s)
tokens = tokenizer.encode(s)
print(tokens.ids,tokens.tokens)

[1787, 462, 1209, 2510, 9026, 840, 462, 1209, 1435] ['尽', '其', '在', '我', '，', '听', '其', '在', '天']


In [17]:
import torch
import json
class ModelConfig:
    """Hyper-parameter container for the Transformer with JSON persistence.

    ``device`` is a runtime-only setting: ``save`` does not write it and
    ``load`` does not change it.
    """

    # Attributes that are written by ``save`` and read back by ``load``.
    _FIELDS = (
        "vocab_size",
        "max_len",
        "hidden_dim",
        "n_head",
        "drop_out",
        "encoder_layer_count",
        "decoder_layer_count",
        "padding_id",
    )

    def __init__(self,
                device = torch.device("cuda:0"),
                vocab_size = None,
                max_len = None,
                hidden_dim = None,
                n_head = None,
                drop_out = None,
                encoder_layer_count = None,
                decoder_layer_count = None,
                padding_id = None
                ):
        """Store the given values as attributes of the same name."""
        self.device = device
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.hidden_dim = hidden_dim
        self.n_head = n_head
        self.drop_out = drop_out
        self.encoder_layer_count = encoder_layer_count
        self.decoder_layer_count = decoder_layer_count
        self.padding_id = padding_id

    def save(self,path):
        """Write every field in ``_FIELDS`` to ``path`` as one JSON object."""
        d = {name: getattr(self,name) for name in self._FIELDS}
        # The context manager closes the file even if serialisation or the
        # write itself raises.
        with open(path,'w',encoding="utf-8") as f:
            f.write(json.dumps(d))

    def load(self,path):
        """Overwrite every field in ``_FIELDS`` with the values stored at ``path``.

        Raises:
            KeyError: if the JSON object lacks one of the expected keys.
        """
        with open(path,encoding="utf-8") as f:
            d = json.loads(f.read())
        for name in self._FIELDS:
            setattr(self,name,d[name])

In [9]:
import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import Dataset,DataLoader
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from tqdm import tqdm
import json
import numpy as np
import random

class Embedding(nn.Module):
    """Sum of token, learned-position and segment embeddings.

    The three lookup tables are created on ``config.device``. ``forward``
    builds its helper index tensors on the device of ``input_ids`` so the
    module keeps working after it has been moved with ``.to(...)``.
    """

    def __init__(self,config):
        """Create the tables from ``config.vocab_size``, ``config.max_len`` and ``config.hidden_dim``."""
        super(Embedding,self).__init__()
        # Token-id lookup table.
        self.word_embeddings = nn.Embedding(num_embeddings = config.vocab_size,embedding_dim = config.hidden_dim).to(config.device)
        # One learned vector per position, for positions 0 .. max_len-1.
        self.position_embeddings = nn.Embedding(num_embeddings = config.max_len,embedding_dim = config.hidden_dim).to(config.device)
        # Two segment types (ids 0 and 1).
        self.token_type_embeddings = nn.Embedding(num_embeddings = 2,embedding_dim = config.hidden_dim).to(config.device)
        # Kept for backward compatibility; forward no longer relies on it.
        self.device = config.device

    def forward(self,input_ids,token_type_ids=None):
        """Embed a batch of token ids.

        Args:
            input_ids: integer tensor indexed as (batch, seq); ``size(1)`` is
                taken as the sequence length and must not exceed ``max_len``.
            token_type_ids: optional tensor of segment ids with the same
                shape as ``input_ids``; all zeros when omitted.

        Returns:
            Tensor of shape (batch, seq, hidden_dim).
        """
        seq_length = input_ids.size(1)
        # Follow the input's device rather than the construction-time one, so
        # a moved module does not build indices on a stale device.
        device = input_ids.device
        position_ids = torch.arange(seq_length,dtype = torch.long,device = device)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        we = self.word_embeddings(input_ids)
        # (seq, hidden) broadcasts over the batch dimension in the sum below.
        pe = self.position_embeddings(position_ids)
        te = self.token_type_embeddings(token_type_ids)
        return we + pe + te

In [23]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention followed by a linear layer.

    ``forward(x)`` performs self-attention. ``forward(x, encode_output)``
    performs encoder-decoder attention: queries come from ``x`` and keys and
    values come from ``encode_output``.
    """

    def __init__(self,config):
        """Build the projections from ``config.hidden_dim``, ``config.n_head`` and ``config.drop_out``.

        Raises:
            ValueError: if ``hidden_dim`` is not a multiple of ``n_head``.
        """
        super(MultiHeadAttention,self).__init__()
        if config.hidden_dim % config.n_head != 0:
            raise ValueError(
                "hidden_dim ({}) must be divisible by n_head ({})".format(
                    config.hidden_dim,config.n_head
                )
            )
        self.n_head = config.n_head
        self.dim = config.hidden_dim
        self.device = config.device

        self.wq = nn.Linear(self.dim,self.dim).to(self.device)
        self.wk = nn.Linear(self.dim,self.dim).to(self.device)
        self.wv = nn.Linear(self.dim,self.dim).to(self.device)

        # Normalise the scores over the key axis (last axis of a 4-D tensor).
        self.softmax = nn.Softmax(dim=3)
        self.f = nn.Linear(self.dim,self.dim).to(self.device)
        self.norm = nn.LayerNorm(self.dim).to(self.device)
        self.dropout = nn.Dropout(config.drop_out)

    def concat(self,tensor):
        """Merge heads: (batch, n_head, seq, head_dim) -> (batch, seq, n_head*head_dim)."""
        batch, n_head, seq_len, head_dim = tensor.size()
        # Bring the sequence axis in front of the head axis before flattening;
        # a bare view would interleave different positions into one vector.
        return tensor.transpose(1,2).contiguous().view(batch,seq_len,n_head*head_dim)

    def attention(self,q,k,v):
        """Scaled dot-product attention over tensors shaped (batch, n_head, seq, head_dim)."""
        head_dim = q.size(3)
        scores = (q @ k.transpose(2,3))/math.sqrt(head_dim)
        weights = self.softmax(scores)
        return weights @ v

    def split(self,tensor):
        """Split heads: (batch, seq, dim) -> (batch, n_head, seq, dim // n_head)."""
        batch, seq_len, dim = tensor.size()
        head_dim = dim // self.n_head
        # Cut the feature axis into heads first, then move the head axis
        # forward, so every head sees every position in order.
        return tensor.view(batch,seq_len,self.n_head,head_dim).transpose(1,2)

    def forward(self,x,encode_output=None):
        """Attend and project.

        Args:
            x: tensor of shape (batch, seq, hidden_dim) supplying the queries
                (and, for self-attention, the keys and values).
            encode_output: optional tensor of shape (batch, src_seq,
                hidden_dim) supplying keys and values; its sequence length
                may differ from that of ``x``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Keys and values come from the encoder when it is given, otherwise
        # from x itself; queries always come from x.
        memory = x if encode_output is None else encode_output
        q,k,v = self.wq(x),self.wk(memory),self.wv(memory)
        q,k,v = self.split(q),self.split(k),self.split(v)
        attended = self.concat(self.attention(q,k,v))
        # NOTE(review): the same LayerNorm is applied twice and the second
        # residual uses the raw attention output rather than the normalised
        # sum; kept as in the original - confirm this is intended.
        out = self.norm(x+attended)
        out = self.f(out)
        out = self.norm(attended+out)
        return self.dropout(out)

In [14]:
class Encoder(nn.Module):
    """One encoder layer consisting of a single self-attention block."""

    def __init__(self,config):
        """Create the attention block from ``config``."""
        super().__init__()
        self.attention = MultiHeadAttention(config)

    def forward(self,x):
        """Return the result of applying the self-attention block to ``x``."""
        return self.attention(x)

In [21]:
class Decoder(nn.Module):
    """One decoder layer: self-attention followed by encoder-decoder attention."""

    def __init__(self,config):
        """Create both attention blocks from ``config``."""
        super().__init__()
        self.attention = MultiHeadAttention(config)
        self.encode_decode_attention = MultiHeadAttention(config)

    def forward(self,x,encode_out):
        """Apply self-attention to ``x``, then attend with ``encode_out``.

        NOTE(review): no mask is passed to the self-attention call here, so
        it is not restricted to earlier positions - confirm this is intended.
        """
        self_attended = self.attention(x)
        return self.encode_decode_attention(self_attended,encode_out)

In [16]:
class Transformer(nn.Module):
    """Encoder-decoder model: shared embedding, layer stacks and a vocabulary projection."""

    def __init__(self,config):
        """Build the embedding, ``encoder_layer_count`` encoders, ``decoder_layer_count`` decoders and the output layer."""
        super().__init__()
        self.embedding = Embedding(config)
        encoder_stack = [Encoder(config) for _ in range(config.encoder_layer_count)]
        decoder_stack = [Decoder(config) for _ in range(config.decoder_layer_count)]
        self.encoders = nn.ModuleList(encoder_stack)
        self.decoders = nn.ModuleList(decoder_stack)
        # Maps hidden states to one score per vocabulary entry.
        self.f = nn.Linear(config.hidden_dim,config.vocab_size).to(config.device)

    def forward(self,src_ids,drc_ids):
        """Encode ``src_ids``, decode ``drc_ids`` against the result and return vocabulary scores.

        The same embedding module is used for both inputs, and every decoder
        layer receives the output of the last encoder layer.
        """
        memory = self.embedding(src_ids)
        for layer in self.encoders:
            memory = layer(memory)

        hidden = self.embedding(drc_ids)
        for layer in self.decoders:
            hidden = layer(hidden,memory)

        return self.f(hidden)

In [26]:
class CoupleDataset(Dataset):
    """Couplet pairs read from ``in.txt`` / ``out.txt`` under a directory.

    Line ``i`` of ``in.txt`` is paired with line ``i`` of ``out.txt``. Each
    pair is expanded into one sample per non-empty prefix of the target
    token ids, longest prefix first, each prefixed with ``_START_ID``.
    """

    # Lines are truncated to this many characters before tokenisation.
    _MAX_CHARS = 30
    # NOTE(review): presumably the ids of "[SEP]" and "[PAD]" in the trained
    # tokenizer - confirm with tokenizer.token_to_id.
    _START_ID = 3
    _PAD_ID = 1

    def __init__(self,path):
        """Load the tokenizer, read both files under ``path`` and build ``self.data``.

        Raises:
            ValueError: if ``out.txt`` has fewer lines than ``in.txt``.
        """
        self.tokenizer = Tokenizer.from_file("./model_save/transformer_my_tokenizer.json")
        with open(os.path.join(path,"in.txt"),encoding="utf8") as f:
            in_data = f.readlines()
        with open(os.path.join(path,"out.txt"),encoding="utf8") as f:
            out_data = f.readlines()
        if len(out_data) < len(in_data):
            # Fail up front with a clear message rather than part-way through
            # the loop.
            raise ValueError(
                "out.txt has {} lines but in.txt has {}".format(len(out_data),len(in_data))
            )

        data = []
        for src_line,tgt_line in tqdm(zip(in_data,out_data),total=len(in_data)):
            # readlines keeps the trailing newline, so drop it first;
            # otherwise a blank line is never seen as empty.
            s = src_line.rstrip("\n")[:self._MAX_CHARS]
            t = tgt_line.rstrip("\n")[:self._MAX_CHARS]
            if len(s)==0 or len(t) == 0:
                continue
            sids = self.tokenizer.encode(s).ids
            tids = self.tokenizer.encode(t).ids
            # One sample per target prefix, from the full target down to its
            # first token.
            for end in range(len(tids),0,-1):
                data.append([sids,[self._START_ID]+tids[:end]])
        # Assigned after the loop so the attribute exists even for empty files.
        self.data = data

    def __len__(self):
        """Number of (source, target-prefix) samples."""
        return len(self.data)

    def __getitem__(self,index):
        """Return sample ``index`` as two int64 arrays padded with ``_PAD_ID``.

        The source is right-padded to ``_MAX_CHARS`` entries and the target
        to ``_MAX_CHARS + 1``; longer sequences are returned unchanged.
        """
        s,t = self.data[index]
        # Multiplying a list by a non-positive number yields [], so sequences
        # that are already long enough are left as they are. Concatenation
        # creates new lists; the stored sample is not modified.
        s = s + [self._PAD_ID]*(self._MAX_CHARS - len(s))
        t = t + [self._PAD_ID]*(self._MAX_CHARS + 1 - len(t))
        return np.array(s,dtype=np.int64),np.array(t,dtype=np.int64)

def train(epoch=20,batch_size=20,lr=1e-4,log_every=100,device=None):
    """Train the Transformer on ``./对联data/train`` with teacher forcing.

    The model configuration is read from
    ``./model_save/transformer_config.json``. Calling ``train()`` with no
    arguments uses the original settings (20 epochs, batch size 20,
    learning rate 1e-4, sample print-out every 100 steps).

    Args:
        epoch: number of passes over the dataset.
        batch_size: samples per optimisation step.
        lr: Adam learning rate.
        log_every: print decoded source / prediction / target for the
            current batch every this many steps.
        device: torch device to train on; defaults to ``cuda:0`` when CUDA
            is available and to the CPU otherwise.

    Returns:
        The trained model. NOTE(review): nothing in this function writes the
        weights to disk - the caller has to save the returned model.
    """
    if device is None:
        # Fall back to the CPU instead of failing on machines without CUDA.
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    config = ModelConfig(device=device)
    config.load("./model_save/transformer_config.json")
    model = Transformer(config)
    model.train()
    data = CoupleDataset("./对联data/train")
    loader = DataLoader(data,batch_size=batch_size,shuffle=True)
    optimizer = optim.Adam(model.parameters(),lr=lr)
    # Positions whose target is the padding id contribute nothing to the loss.
    criterion = torch.nn.CrossEntropyLoss(ignore_index=config.padding_id)
    step = 0
    for e in range(epoch):
        pbar = tqdm(loader)
        for s,t in pbar:
            s = s.to(config.device)
            t = t.to(config.device)
            # Teacher forcing: feed the target without its last token and
            # predict the target shifted left by one.
            predict = model(s,t[:,:-1])
            predict_v = predict.contiguous().view(-1,predict.shape[-1])
            y = t[:,1:].contiguous().view(-1)
            optimizer.zero_grad()
            loss = criterion(predict_v,y)
            loss.backward()
            optimizer.step()
            desc = "[{}/{}][{}][{}]".format(e+1,epoch,step,float(loss))
            pbar.set_description(desc)
            step+=1
            if step% log_every ==0:
                # Copy each tensor to the host once, not once per sample.
                src_np = s.detach().cpu().numpy()
                pred_np = torch.argmax(predict,dim=2).detach().cpu().numpy()
                tgt_np = t[:,1:].detach().cpu().numpy()
                for i in range(predict.size(0)):
                    ss = data.tokenizer.decode(src_np[i])
                    ps = data.tokenizer.decode(pred_np[i])
                    oris = data.tokenizer.decode(tgt_np[i])
                    print("\n\n==[{}]==\n==[{}]==\n[{}]==\n\n".format(ss,ps,oris))
    return model

In [None]:
#train()