In [14]:
import torch
import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Query, key and value are each passed through their own linear layer,
    split into ``h`` heads of width ``d_model // h``, attended per head,
    merged back to ``d_model`` and passed through an output linear layer.

    Args:
        d_model: embedding dimension; must be divisible by ``h``.
        h: number of attention heads.
        P_drop: dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, h, P_drop):
        super().__init__()

        assert d_model % h == 0, "d_model must be divisible by h"

        self.d_model = d_model  # embedding dimension
        self.h = h  # number of heads, i.e. number of distinct attention "concepts"
        self.d_k = d_model // h  # embedding dimension inside each head

        self.fc_q = nn.Linear(d_model, d_model)  # projection applied to the query
        self.fc_k = nn.Linear(d_model, d_model)  # projection applied to the key
        self.fc_v = nn.Linear(d_model, d_model)  # projection applied to the value

        self.fc_o = nn.Linear(d_model, d_model)  # output projection after merging heads

        self.dropout = nn.Dropout(P_drop)

        # sqrt(d_k) divisor for the attention energies. Registered as a buffer
        # so it follows the module through .to()/.cuda()/.double(); a plain
        # tensor attribute would stay on the CPU and break the division in
        # forward() once the module is moved. persistent=False keeps the
        # state_dict keys unchanged.
        self.register_buffer(
            "scale", torch.sqrt(torch.FloatTensor([self.d_k])), persistent=False
        )

    def forward(self, query, key, value, mask=None):
        """Apply multi-head attention.

        Args:
            query: [batch_size, query_len, d_model]
            key: [batch_size, key_len, d_model]
            value: [batch_size, value_len, d_model]
            mask: optional tensor broadcastable to
                [batch_size, h, query_len, key_len]; positions where it equals
                0 are excluded from attention.

        Returns:
            Tuple ``(x, attention)`` where ``x`` is
            [batch_size, query_len, d_model] and ``attention`` holds the
            softmax weights (before dropout),
            [batch_size, h, query_len, key_len].
        """
        batch_size = query.shape[0]

        Q = self.fc_q(query)  # [batch_size, query_len, d_model]
        K = self.fc_k(key)  # [batch_size, key_len, d_model]
        V = self.fc_v(value)  # [batch_size, value_len, d_model]

        # Split d_model into h x d_k so that each head can learn its own
        # attention pattern, then move the head axis in front of the sequence axis.
        Q = Q.view(batch_size, -1, self.h, self.d_k).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.h, self.d_k).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.h, self.d_k).permute(0, 2, 1, 3)

        # Q: [batch_size, h, query_len, d_k]
        # K: [batch_size, h, key_len, d_k]
        # V: [batch_size, h, value_len, d_k]

        # Attention energy: [batch_size, h, query_len, key_len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            # Fill masked-out positions (mask == 0) with a large negative value
            # so that they receive ~0 weight after the softmax.
            energy = energy.masked_fill(mask == 0, -1e10)

        # Attention weights over the key positions: [batch_size, h, query_len, key_len]
        attention = torch.softmax(energy, dim=-1)

        # Weighted sum of the values: [batch_size, h, query_len, d_k]
        x = torch.matmul(self.dropout(attention), V)

        # Merge the heads back together.
        x = x.permute(0, 2, 1, 3).contiguous()  # [batch_size, query_len, h, d_k]
        x = x.view(batch_size, -1, self.d_model)  # [batch_size, query_len, d_model]

        x = self.fc_o(x)  # [batch_size, query_len, d_model]

        return x, attention

class PositionwiseFeedforwardLayer(nn.Module):
    """Two-layer feed-forward network applied independently at every position.

    Args:
        d_model: input and output feature dimension.
        d_ff: hidden feature dimension.
        P_drop: dropout probability applied after the ReLU activation.
    """

    def __init__(self, d_model, d_ff, P_drop):
        super().__init__()

        self.fc_1 = nn.Linear(d_model, d_ff)  # expand: d_model -> d_ff
        self.fc_2 = nn.Linear(d_ff, d_model)  # project back: d_ff -> d_model
        self.dropout = nn.Dropout(P_drop)

    def forward(self, x):
        """Map ``x`` of shape [batch_size, seq_len, d_model] to the same shape."""
        hidden = self.fc_1(x)  # [batch_size, seq_len, d_ff]
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)
        return self.fc_2(hidden)  # [batch_size, seq_len, d_model]

In [15]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a position-wise feed-forward net.

    Each sub-layer's output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: feature dimension of the input and output.
        h: number of attention heads.
        d_ff: hidden dimension of the feed-forward sub-layer.
        P_drop: dropout probability used by both sub-layers and on the residual branches.
    """

    def __init__(self, d_model, h, d_ff, P_drop):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(d_model)
        self.ff_layer_norm = nn.LayerNorm(d_model)
        self.self_attention = MultiHeadAttentionLayer(d_model, h, P_drop)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(d_model, d_ff, P_drop)
        self.dropout = nn.Dropout(P_drop)

    def forward(self, src, src_mask):
        """Run the block on ``src`` of shape [batch_size, src_len, d_model].

        NOTE(review): ``src_mask`` is accepted for interface compatibility but
        is currently ignored -- self-attention always runs unmasked. Confirm
        whether masking should be re-enabled before feeding padded batches.

        Returns:
            Tensor of shape [batch_size, src_len, d_model].
        """
        # Self-attention: the same tensor is used as query, key and value.
        attended, _ = self.self_attention(src, src, src)

        # Dropout, residual connection and layer norm.
        src = self.self_attn_layer_norm(src + self.dropout(attended))

        # Position-wise feed-forward, then dropout, residual and layer norm again.
        transformed = self.positionwise_feedforward(src)
        return self.ff_layer_norm(src + self.dropout(transformed))

In [16]:
class Encoder(nn.Module):
    """Stack of ``n_layers`` encoder layers applied to already-embedded input.

    In this variant no token or positional ``nn.Embedding`` is created:
    ``forward`` expects feature vectors of width ``d_model``, scales them by
    ``sqrt(d_model)`` and adds the raw position index of each time step.

    Args:
        input_dim: unused here; kept so existing call sites keep working.
        d_model: feature dimension of the input and of every layer.
        n_layers: number of stacked ``EncoderLayer`` blocks.
        h: number of attention heads per layer.
        d_ff: hidden dimension of each feed-forward sub-layer.
        P_drop: dropout probability.
        seq_length: unused here; kept so existing call sites keep working.
    """

    def __init__(self, input_dim, d_model, n_layers, h, d_ff, P_drop, seq_length=100):
        super().__init__()

        self.layers = nn.ModuleList([EncoderLayer(d_model, h, d_ff, P_drop) for _ in range(n_layers)])

        self.dropout = nn.Dropout(P_drop)

        # Registered as a non-persistent buffer so it moves with the module
        # (.to()/.cuda()) without adding a key to the state_dict.
        self.register_buffer(
            "scale", torch.sqrt(torch.FloatTensor([d_model])), persistent=False
        )

    def forward(self, src, src_mask):
        """Encode ``src``.

        Args:
            src: [batch_size, src_len, d_model]
            src_mask: forwarded unchanged to every layer.

        Returns:
            Output of the last layer, [batch_size, src_len, d_model].
        """
        src_len = src.shape[1]

        # Position index of every time step, shaped [1, src_len, 1] so that it
        # broadcasts over the batch and feature axes. A [batch_size, src_len]
        # tensor would be aligned against the (src_len, d_model) axes instead
        # and either fail or add positions along the wrong axis.
        pos = torch.arange(src_len, device=src.device, dtype=src.dtype).view(1, src_len, 1)

        # Scaled input plus position information, followed by dropout.
        src = self.dropout((src * self.scale) + pos)

        # src: [batch_size, src_len, d_model]

        # Forward pass through every encoder layer in order.
        for layer in self.layers:
            src = layer(src, src_mask)

        return src  # output of the last layer

In [17]:
# Encoder hyperparameters.
INPUT_DIM = 100    # passed as Encoder's input_dim argument
ENC_d_model = 512  # feature dimension
ENC_N = 6          # number of stacked encoder layers
ENC_h = 8          # attention heads per layer
ENC_d_ff = 512     # hidden width of the feed-forward sub-layer
ENC_P_drop = 0.1   # dropout probability

enc = Encoder(
    input_dim=INPUT_DIM,
    d_model=ENC_d_model,
    n_layers=ENC_N,
    h=ENC_h,
    d_ff=ENC_d_ff,
    P_drop=ENC_P_drop,
)
enc

Encoder(
  (layers): ModuleList(
    (0): EncoderLayer(
      (self_attn_layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (ff_layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (self_attention): MultiHeadAttentionLayer(
        (fc_q): Linear(in_features=512, out_features=512, bias=True)
        (fc_k): Linear(in_features=512, out_features=512, bias=True)
        (fc_v): Linear(in_features=512, out_features=512, bias=True)
        (fc_o): Linear(in_features=512, out_features=512, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (positionwise_feedforward): PositionwiseFeedforwardLayer(
        (fc_1): Linear(in_features=512, out_features=512, bias=True)
        (fc_2): Linear(in_features=512, out_features=512, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (1): EncoderLayer(
      (self_attn_layer_norm): LayerNorm((512,), eps=1e-05, element

In [20]:
# Smoke test: one sequence of length 1 with ENC_d_model features.
x = torch.rand(1, 1, ENC_d_model)
# No mask is supplied: the feature tensor itself is not a valid attention
# mask, and the encoder layers currently run their attention unmasked.
yhat = enc(x, None)

torch.Size([1, 1, 512]) torch.Size([1, 1])
torch.Size([1, 1, 512]) tensor([22.6274]) torch.Size([1, 1])


In [None]:
import math
import pylab as plt

# n: number of positions (rows); dim: encoding dimension (columns) of the table built below.
n, dim = 4, 8

def get_angle(pos, i, dim):
    """Return the sinusoid argument for position ``pos`` and dimension index ``i``.

    Computes ``pos * 1 / 10000 ** (2 * (i // 2) / dim)``; indices ``2k`` and
    ``2k + 1`` therefore share the same angle.
    """
    pair_index = i // 2
    exponent = (2 * pair_index) / dim
    rate = 1 / math.pow(10000, exponent)
    return pos * rate

def get_positional_encoding(pos, i, dim):
    """Return the positional-encoding value at position ``pos``, dimension ``i``.

    Even dimension indices use the sine of the angle, odd indices the cosine.
    """
    angle = get_angle(pos, i, dim)
    wave = math.cos if i % 2 else math.sin
    return wave(angle)

# Build the n x dim table: one row per position, one column per encoding dimension.
result = [
    [get_positional_encoding(position, column, dim) for column in range(dim)]
    for position in range(n)
]

# Heat map of the encoding values.
plt.pcolormesh(result, cmap='Blues')
plt.xlabel('Dimension')
plt.ylabel('Position')