In [None]:
########################################################################################################################
Reference: https://blog.csdn.net/weixin_42521185/article/details/124768029
########################################################################################################################
import math
from torch.autograd import Variable
from torch import nn
import torch
import copy
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F


########################################################################################################################

########################################################################################################################
# Word embedding layer
# vocab: length of vocabulary
# d_model: dimension of word embedding
class Embedding(nn.Module):
    def __init__(self, vocab, d_model):
        super(Embedding, self).__init__()
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.lut(x) * math.sqrt(self.d_model)


# vocabulary: 1000*512, 1000 words in total
vocab = 1000
d_model = 512

x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))

emb = Embedding(vocab, d_model)
embr = emb(x)

########################################################################################################################
# Position encoder
# d_model: dimension of word embedding
# dropout: zero ratio of dropout layer
# max_len: max length of every sentence
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)

        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)  
        pe[:, 1::2] = torch.cos(position * div_term)  
        pe = pe.unsqueeze(0)  # 1 * max_len * d_model
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + Variable(self.pe[:, : x.size(1)], requires_grad=False) 
        return self.dropout(x)


d_model = 512
dropout = 0.1
max_len = 60
vocab = 1000

x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))

emb = Embedding(vocab, d_model)
embr = emb(x)
x = embr  # shape: [2, 4, 512]
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)
# print(pe_result)



def attention(query, key, value, mask=None, dropout=None):
    # query, key, value: three tensors of attention input
    # mask: mask tensor
    d_k = query.size(-1)

    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    # print("..", scores.shape)
    # judge whether mask tensor is used
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    p_attn = F.softmax(scores, dim=-1)

    # judge whether dropout is used
    if dropout is not None:
        p_attn = dropout(p_attn)

    return torch.matmul(p_attn, value), p_attn


query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print('attn', attn)
# print('attn.shape', attn.shape)
# print("p_attn", p_attn)
# print(p_attn.shape)


def clones(module, N):
    # module: objective layer to be cloned
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

# Multiple heads attention
class MultiHeadAttention(nn.Module):
    def __init__(self, head, embedding_dim, dropout=0.1):
        super(MultiHeadAttention, self).__init__()

        assert embedding_dim % head == 0

        self.d_k = embedding_dim // head
        self.head = head
        self.embedding_dim = embedding_dim

        self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)

        self.attn = None

        self.drop = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        # query, key, value: three tensors of attention input
        # mask: mask tensor
        if mask is not None:
            mask = mask.unsqueeze(1)

        batch_size = query.size(0)

        query, key, value = \
            [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
             for model, x in zip(self.linears, (query, key, value))]

        x, self.attn = attention(query, key, value, mask=mask, dropout=self.drop)

        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head*self.d_k)

        return self.linears[-1](x)

head = 8
embedding_dim = 512
dropout = 0.2

query = key = value = pe_result

mask = Variable(torch.zeros(2, 4, 4))

mha = MultiHeadAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)

# print(mha_result)
# print(mha_result.shape)

In [None]:
import math
from torch.autograd import Variable
from torch import nn
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F


class PositionwiseFeedForward(nn.Module):
    # d_model: dimension of word embedding
    # d_ff: output dimension of first linear layer, input dimension of second linear layer
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()

        self.w1 = nn.Linear(d_model, d_ff)
        self.w2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        # x: output from previos layer
        return self.w2(self.dropout(F.relu((self.w1(x)))))


d_model = 512
d_ff = 64
dropout = 0.2

x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)
# print(ff_result)
# print(ff_result.shape)

# Normalize layers
class LayerNorm(nn.Module):
    # features: dimension of word embedding
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a2 = nn.Parameter(torch.ones(features))
        self.b2 = nn.Parameter(torch.zeros(features))
        self.eps = eps  

    def forward(self, x):
        # x: output from previos layer
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a2 * (x-mean) / (std + self.eps) + self.b2


features = d_model = 512
eps = 1e-6

x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)
# print(ln_result)
# print(ln_result.shape)

# Sublayer connection
class SublayerConnection(nn.Module):
    # size: dimension of word embedding
    def __init__(self, size, dropout=0.1):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)\
        self.dropout = nn.Dropout(p=dropout)
        self.size = size

    def forward(self, x, sublayer):
        # x: tensor of previous layer
        return x + self.dropout(sublayer(self.norm(x)))

size = d_model = 512
head = 8
dropout = 0.2

x = pe_result
mask = Variable(torch.zeros(2, 4, 4))
self_attn = MultiHeadAttention(head, d_model)

sublayer = lambda x: self_attn(x, x, x, mask)

sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)
# print(sc_result)
# print(sc_result.shape)

# Encoder layer
class EncoderLayer(nn.Module):
    # size: dimension of previous layer
    # self_attn: instantiated object of multi-head attention sublayer
    # dropout: zero ratio of dropout
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()

        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.size = size

        self.sublayer = clones(SublayerConnection(size, dropout), 2)

    def forward(self, x, mask):
        # x: tensor from previous layer
        # mask: mask tensor
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)


size = d_model = 512
head = 8
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(2, 4, 4))

el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)
# print(el_result)
# print(el_result.shape)

# Encoder
class Encoder(nn.Module):
    # layer: encoder layer
    # N: number of encoder layer
    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        # x: tensor from previous layer
        # mask: mask tensor
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


size = d_model = 512
head = 8
d_ff = 64
c = copy.deepcopy
attn = MultiHeadAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(2, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
# print(en_result)
# print(en_result.shape)

# Decoder layer
class DecoderLayer(nn.Module):
    # size: dimension of word embedding
    # self_attn: multi-head attention objective
    # src_attn: normal attention objective
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()

        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.dropout = dropout

        self.sublayer = clones(SublayerConnection(size, dropout), 3)

    def forward(self, x, memory, source_mask, target_mask):
        # x: tensor from previous layer
        # memory: encoder semantic storage tensor
        # source_mask: mask tensor of source code
        # target_mask: mask tensor of target code
        m = memory
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))
        x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))
        return self.sublayer[2](x, self.feed_forward)


size = d_model = 512
head = 8
d_ff = 64
dropout = 0.2

self_attn = src_attn = MultiHeadAttention(head, d_model, dropout)
ff  = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(2, 4, 4))
source_mask = target_mask = mask

dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
# print(dl_result)
# print(dl_result.shape)

# Decoder
class Decoder(nn.Module):
    # layer: target of decoder layer
    # N: number of copy
    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)

    def forward(self, x, memory, source_mask, target_mask):
        for layer in self.layers:
            x = layer(x, memory, source_mask, target_mask)
        return self.norm(x)


size = d_model = 512
head = 8
d_ff = 64
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)

N = 8
x = pe_result
memory = en_result
mask = Variable(torch.zeros(2, 4, 4))
source_mask = target_mask = mask

de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
# print(de_result)
# print(de_result.shape)

# Generator
class Generator(nn.Module):
    # d_model: dimension of word embedding
    # vocab_size: size of vocabulary
    def __init__(self, d_model, vocab_size):
        super(Generator, self).__init__()
        self.project = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # x: output tensor of previous layer
        return F.log_softmax(self.project(x), dim=-1)

d_model = 512
vocab_size = 1000
x = de_result

gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print(gen_result)
print(gen_result.shape)

In [None]:
# Encoder-Decoder structure
class EncoderDecoder(nn.Module):
    def __init__(self, encoder, decoder, source_embed, target_embed, generator):
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_emned = source_embed
        self.tgt_embed = target_embed
        self.generator = generator

    def forward(self, source, target, source_mask, target_mask):
        # source: source data
        # target: target data
        # source_mask: mask tensor of source data
        # target_mask: mask tensor of target data
        return self.decode(self.encode(source, source_mask), source_mask,
                           target, target_mask)

    def encode(self, source, source_mask):
        return self.encoder(self.src_emned(source), source_mask)

    def decode(self, memory, source_mask, target, target_mask):
        return self.decoder(self.tgt_embed(target), memory, source_mask, target_mask)


vocab_size = 1000
d_model = 512
encoder = en
decoder = de
source_embed = nn.Embedding(vocab_size, d_model)
target_embed = nn.Embedding(vocab_size, d_model)
generator = gen

source = target = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
source_mask = target_mask = Variable(torch.zeros(2, 4, 4))

ed = EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)
ed_result = ed(source, target, source_mask, target_mask)
print(ed_result)
print(ed_result.shape)

In [None]:
def make_model(source_vocab, target_vocab, N=6, d_model=512, d_ff=2048, head=8, dropout=0.1):
    # source_vocab: word number of source vocabulary
    # target_vocab: word number of target vocabulary
    # N: layer of encoders and decoders
    # d_model: dimension of word embedding
    # d_ff: transformation matrix dimension in feedforward full connection layer
    # head: multiply head number
    # dropout: zero ratio
    c = copy.deepcopy
    attn = MultiHeadAttention(head, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
        nn.Sequential(Embedding(d_model, source_vocab), c(position)),
        nn.Sequential(Embedding(d_model, target_vocab), c(position)),\
        Generator(d_model, target_vocab)
    )

    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform(p)

    return model

In [None]:
# data generator
def data_generator(V, batch_size, num_batch):
    # batch_size: sample number
    # num_batch: round number
    for i in range(num_batch):
        data = torch.from_numpy(np.random.randint(1, V, size=(batch_size, 10)))

        data[:, 0] = 1

        source = Variable(data, requires_grad=False)
        target = Variable(data, requires_grad=False)

        yield Batch(source, target)

In [None]:
from pyitcast.transformer_utils import Batch
from pyitcast.transformer_utils import get_std_opt
from pyitcast.transformer_utils import LabelSmoothing
from pyitcast.transformer_utils import SimpleLossCompute
from pyitcast.transformer_utils import run_epoch
from pyitcast.transformer_utils import greedy_decode

V = 11
batch_size = 20
num_batch = 30

model = make_model(V, V, N=2)

model_optimizer = get_std_opt(model)

criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)

loss = SimpleLossCompute(model.generator, criterion, model_optimizer)

In [None]:
from pyitcast.transformer_utils import LabelSmoothing
import matplotlib.pyplot as plt
from torch.autograd import Variable
import torch

crit = LabelSmoothing(size=5, padding_idx=0, smoothing=0.5)
predict = Variable(torch.FloatTensor([[0, 0.2, 0.7, 0.1, 0],
                                     [0, 0.2, 0.7, 0.1, 0],
                                     [0, 0.2, 0.7, 0.1, 0]]))

target = Variable(torch.LongTensor([2, 1, 0]))

crit(predict, target)
print(crit.true_dist)
plt.imshow(crit.true_dist)
plt.show()

In [None]:
model = make_model(V, V, N=2)

model_optimizer = get_std_opt(model)

criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)

loss = SimpleLossCompute(model.generator, criterion, model_optimizer)


def run(model, loss, epochs=10):
    # model: model to be trained
    # loss: loss calculation
    # epochs: training round
    for epoch in range(epochs):
        model.train()
        run_epoch(data_generator(V, 8, 20), model, loss)
        print("Running AT")
        model.eval()
        run_epoch(data_generator(V, 8, 5), model, loss)

epochs = 10

if __name__ == '__main__':
    run(model, loss)