### 2.2 Transformer Tagger
#### 2.2.1 read Data

In [10]:
import json

import numpy as np

Mydata = {}
with open("./Mydata.json", 'r+') as f:
    Mydata = json.load(f)
tmp = {}
for key in Mydata['id2label']:
    tmp[int(key)] = Mydata['id2label'][key]
Mydata['id2label'] = tmp

#### 2.2.2 Define my model

In [11]:
import math
from typing import Tuple

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

torch.manual_seed(42)
class MyTransformerModel(nn.Module):
    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int,noutput:int, dropout: float = 0.5):
        super(MyTransformerModel, self).__init__()
        self.model_type = 'Transformer'

        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)

        self.d_model = d_model
        self.decoder = nn.Linear(d_model, noutput)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor) -> Tensor:
        """
        Args:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]

        Returns:
            output Tensor of shape [seq_len, batch_size, ntoken]
        """
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        # print(src.shape)
        # print(src_mask.shape)
        output = self.transformer_encoder(src, src_key_padding_mask = src_mask)
        output = self.decoder(output)
        return output


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Generates an upper-triangular matrix of -inf, with zeros on diag."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)



In [12]:
from torch.utils.data import Dataset, DataLoader


class MyDataset(Dataset):
    def __init__(self, data: list, label, word2id, id2label, label2id, device):
        self.data = data
        self.label = label
        self.word2id = word2id
        self.id2label = id2label
        self.label2id = label2id
        self.device = device

    def __getitem__(self, item):
        data = []
        label = []
        # print(item)
        for i in range(len(self.data[item])):
            data.append(self.word2id[self.data[item][i]])
            label.append(self.label2id[self.label[item][i]])
        return data, label

    def __len__(self):
        return len(self.data)

    def batchfy(self, batchData):
        # print(batchData)
        maxlen = max([len(each[0]) for each in batchData])
        padc, padi = self.word2id["PAD"], self.label2id["PAD"]
        batchDatas = []
        padlabels = []
        attention = []
        for i in range(len(batchData)):
            if len(batchData[i][0]) >= maxlen:
                batchDatas.append(batchData[i][0][:])
                padlabels.append(batchData[i][1][:])
                attention.append([1] * len(batchData[i][0]))
                continue
            l = len(batchData[i][0])
            padsize = maxlen - l
            batchDatas.append(batchData[i][0] + [padc] * padsize)
            padlabels.append(batchData[i][1] + [padi] * padsize)
            attention.append([1] * l + [0] * padsize)
        return batchDatas, attention, padlabels, maxlen

In [13]:

class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [14]:
noutput = len(Mydata['label2id'])  # size of output
ntokens = len(Mydata['word2id'])
emsize = 128  # embedding dimension
d_hid = 120  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 5  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 2  # number of heads in nn.MultiheadAttention
dropout = 0.5 # dropout probability

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = MyTransformerModel(ntokens, emsize, nhead, d_hid, nlayers, noutput, dropout).to(device)

In [15]:
BatchSize = 100
trainDataSet = MyDataset(Mydata['trainset']['data'], Mydata['trainset']['label'], Mydata['word2id'], Mydata['id2label'],
                         Mydata['label2id'],device)

trainDataloader = DataLoader(dataset=trainDataSet, batch_size=BatchSize,
                             shuffle=False, collate_fn=trainDataSet.batchfy)

testDataSet = MyDataset(Mydata['testset']['data'], Mydata['testset']['label'],
                        Mydata['word2id'], Mydata['id2label'], Mydata['label2id'],device)
testDataloader = DataLoader(dataset=testDataSet, batch_size=BatchSize,
                            shuffle=False, collate_fn=testDataSet.batchfy)

devDataset = MyDataset(Mydata['devset']['data'], Mydata['devset']['label'],
                        Mydata['word2id'], Mydata['id2label'], Mydata['label2id'],device)
devDataloader = DataLoader(dataset=validDataset, batch_size=BatchSize,
                            shuffle=False, collate_fn=validDataset.batchfy)


In [16]:
import copy
import time
from seqeval.metrics import classification_report,accuracy_score,f1_score

criterion = nn.CrossEntropyLoss(ignore_index=Mydata['label2id']["PAD"])
lr = 1  # learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
noutput = len(Mydata['label2id'])


def train(model: nn.Module) -> None:
    model.train()  # turn on train mode
    total_loss = 0.
    log_interval = 200
    start_time = time.time()
    # src_mask = generate_square_subsequent_mask(4).to(device)

    # num_batches = len(train_data) // bptt
    num_batches = len(trainDataloader)
    for i, (batch, atten, label, size) in enumerate(trainDataloader):
        batch=torch.tensor(np.matrix(batch),dtype=torch.long,device=device)
        atten=torch.tensor(np.matrix(atten).T,dtype=torch.int,device=device)

        label = torch.tensor(np.array(label).reshape([1,size*len(batch)])[0] ,dtype=torch.long,device=device)

        output = model(batch, atten)
        # print(type(output))
        # print("outputshape",output.shape)
        # print("output",output)
        # t = output.view(-1, ntokens)
        # print("output view shape",t.shape)
        # pred = torch.argmax(output,dim=-1)
        # print("pred shape",pred.shape)
        # print("label shape",label.shape)
        vie = output.view(-1, noutput)
        # print("vie",vie)
        # print("vie shepe:", vie.shape)
        # print(label)
        # print("label",label.shape)
        loss = criterion(vie, label)
        # print("loss",loss.item())

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()

        # print(total_loss)

        if i % log_interval == 0 and i > 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss)
            print(f'| epoch {epoch:3d} | {i:5d}/{num_batches:5d} batches | '
                  f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
            total_loss = 0
            start_time = time.time()

    # for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
    #     data, targets = get_batch(train_data, i)
    #     seq_len = data.size(0)
    #     if seq_len != bptt:  # only on last batch
    #         src_mask = src_mask[:seq_len, :seq_len]
    #     output = model(data, src_mask)
    #     loss = criterion(output.view(-1, ntokens), targets)
    #
    #     optimizer.zero_grad()
    #     loss.backward()
    #     torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
    #     optimizer.step()
    #
    #     total_loss += loss.item()
    #     if batch % log_interval == 0 and batch > 0:
    #         lr = scheduler.get_last_lr()[0]
    #         ms_per_batch = (time.time() - start_time) * 1000 / log_interval
    #         cur_loss = total_loss / log_interval
    #         ppl = math.exp(cur_loss)
    #         print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
    #               f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
    #               f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
    #         total_loss = 0
    #         start_time = time.time()


def evaluate(model: nn.Module, eval_data: DataLoader) -> float:
    model.eval()  # turn on evaluation mode
    total_loss = 0.
    # src_mask = generate_square_subsequent_mask(bptt).to(device)
    l = 0
    with torch.no_grad():
        for i, (batch, atten, label, size) in enumerate(eval_data):
            l+=len(batch)
            batch=torch.tensor(np.matrix(batch),dtype=torch.long,device=device)
            atten=torch.tensor(np.matrix(atten).T,dtype=torch.int,device=device)
            label = torch.tensor(np.array(label).reshape([1,size*len(batch)])[0] ,dtype=torch.long,device=device)

            output = model(batch, atten)
            output_flat = output.view(-1, noutput)
            total_loss += batch.size(0) * criterion(output_flat, label).item()
        # for i in range(0, eval_data.size(0) - 1, bptt):
        #     data, targets = get_batch(eval_data, i)
        #     seq_len = data.size(0)
        #     if seq_len != bptt:
        #         src_mask = src_mask[:seq_len, :seq_len]
        #     output = model(data, src_mask)
        #     output_flat = output.view(-1, ntokens)
        #     total_loss += seq_len * criterion(output_flat, targets).item()
    return total_loss / l

def generate_Report(model: nn.Module, eval_data: DataLoader,nbatch:int,id2label:dict):
    model.eval()  # turn on evaluation mode
    # src_mask = generate_square_subsequent_mask(bptt).to(device)
    predlabel_word = []
    reallabel_word = []
    with torch.no_grad():
        for i, (batch, atten, label, size) in enumerate(eval_data):
            batch=torch.tensor(np.matrix(batch),dtype=torch.long,device=device)
            atten=torch.tensor(np.matrix(atten).T,dtype=torch.int,device=device)

            output = model(batch, atten)
            pred_id = torch.argmax(output.data,dim=-1)
            for i in range(len(batch)):
                tmp_pred = []
                tmp_real = []
                for j in range(size):
                    if label[i][j] == 9:
                        break
                    tmp_real.append(id2label[label[i][j]])
                    tmp_pred.append(id2label[pred_id[i][j].item()])
                predlabel_word.append(tmp_pred[:])
                reallabel_word.append(tmp_real[:])

    # for i in range(len(real_label)):
    #     if real_label[i] == 9:
    #         continue
    #     predlabel_word.append(id2label[pred_label[i].item()])
    #     reallabel_word.append(id2label[real_label[i]])

    # print(set(predlabel_word))
    # print(set(reallabel_word))
    print("-----Output classification report:-----")
    print("ACC:",accuracy_score(reallabel_word,predlabel_word),end=" ")
    try:
        f1 = f1_score(reallabel_word,predlabel_word,zero_division=1)

    except TypeError:
        f1 = 'Nan'
    print("f1: ",f1)

    print(classification_report(reallabel_word,predlabel_word))




In [17]:
best_val_loss = float('inf')
epochs = 200
best_model = None

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model)
    val_loss = evaluate(model, devDataloader)
    val_ppl = math.exp(val_loss)
    elapsed = time.time() - epoch_start_time
    print('-' * 89)
    print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
          f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
    print('-' * 89)

    if val_loss < best_val_loss:
        print("Find a better model")
        best_val_loss = val_loss
        best_model = copy.deepcopy(model)

    scheduler.step()

generate_Report(best_model,testDataloader,BatchSize,Mydata['id2label'])

-----------------------------------------------------------------------------------------
| end of epoch   1 | time:  2.89s | valid loss  0.76 | valid ppl     2.14
-----------------------------------------------------------------------------------------
Find a better model
-----------------------------------------------------------------------------------------
| end of epoch   2 | time:  2.74s | valid loss  0.70 | valid ppl     2.01
-----------------------------------------------------------------------------------------
Find a better model
-----------------------------------------------------------------------------------------
| end of epoch   3 | time:  2.70s | valid loss  0.63 | valid ppl     1.89
-----------------------------------------------------------------------------------------
Find a better model
-----------------------------------------------------------------------------------------
| end of epoch   4 | time:  2.75s | valid loss  0.56 | valid ppl     1.74
--------------

In [18]:
torch.save(best_model,'./Transform/bestmodel.pt')