In [3]:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
train_model = False
device

'cuda'

## MultiHeadAttention

### einops 사용

In [4]:
from einops import rearrange

x = torch.randn(2, 3, 4) #Tensor shape : (2, 3, 4)
y = rearrange(x, 'a b c -> c a b')
y.shape

torch.Size([4, 2, 3])

In [5]:
x = torch.randn(2, 3, 4)
y = rearrange(x, 'a b c -> (a b) c')
y.shape

torch.Size([6, 4])

In [6]:
x = torch.randn(6, 4)
y = rearrange(x, '(a b) c -> a b c', a = 2, b = 3)
y.shape

torch.Size([2, 3, 4])

In [7]:
from torch import nn
from einops import rearrange
import torch

class MHA(nn.Module):
    def __init__(self, d_model = 512, n_heads = 8): #d_model : 임베딩 벡터의 차원, n_heads : 멀티 헤드 어텐션의 헤드 개수
        super().__init__()
        self.n_heads = n_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)
        #어텐션 스코어를 스케일링 하기 위한 값
        self.scale = torch.sqrt(torch.tensor(d_model / n_heads))


    def froward(self, Q, K, V, mask = None):
        Q = self.q_linear(Q) #(N, t, D) -> (N, t, D)
        K = self.k_linear(K) #(N, t, D) -> (N, t, D)
        V = self.v_linear(V) #(N, t, D) -> (N, t, D)
        #멀티헤드를 위해 임베딩 차원 D를 헤드 개수 n_heads로 분할
        Q = rearrange(Q, 'N t (h dk) -> N h t dk', h = self.n_heads) #(N, t, D) -> (N, h, t, D//h)
        K = rearrange(K, 'N t (h dk) -> N h t dk', h = self.n_heads) #(N, t, D) -> (N, h, t, D//h)
        V = rearrange(V, 'N t (h dk) -> N h t dk', h = self.n_heads) #(N, t, D) -> (N, h, t, D//h)
        # 어텐션 스코어 구하기
        attention_score = Q @ K.transpose(-2, -1) / self.scale #(N, h, t, dk) @ (N, h, dk, t) -> (N, h, t(쿼리의 길이), t(키의 길이))
        # 패딩의 위치에 굉장히 작은 값 강제로 부여 -> 소프트맥스 적용 시에 0이 될 수 있도록
        if mask is not None: #패딩 위치를 의미하는 인덱스들이 존재한다면
            attention_score[mask] = -1e10
        #에너지를 구하기 위해서는 키의 방향으로 소프트맥스를 적용
        energy = torch.softmax(attention_score, dim = -1)
        #에너지와 V를 곱해서 최종 어텐션 값 구함
        attention = energy @ V #(N, h, t, t) @ (N, h, t, dk) -> (N, h, t, dk)
        #헤드 차원을 연결해서 원래의 차원으로 되돌림
        x = rearrange(attention, 'N h t dk -> N t (h dk)') #(N, h, t, dk) -> (N, t, D)
        #최종 출력값에 대해 선형 변환을 적용. 각각의 헤드의 생각을 섞어줌
        output = self.out_linear(x) #(N, t, D) -> (N, t, D)
        return output, energy

## FNN(FeedForward Network)
- 정확한 이름은 Position-wise Feed Forward Network
- 인코더 디코더의 Multi Head Attention의 결과를 하나로 합쳐주는 역할

In [9]:
class FeedForward(nn.Module):
    def __init__(self, d_model = 512, d_ff = 2048, drop_p = 0.1):
        super().__init__()
        self.linear = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(d_ff, d_model)
        )

    def forward(self, mha_output):
        out = self.linear(mha_output)
        return out

## Encoder 구현

In [10]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, d_ff, n_heads, drop_p):
        super().__init__()
        # Multi Head Attention Layer
        self.self_attention = MHA(d_model, n_heads)
        #MHA에 대한 layer normalization
        self.atten_ln = nn.LayerNorm(d_model)
        #Feed Forward Network
        self.ff = FeedForward(d_model, d_ff, drop_p)
        #Normalization
        self.ff_ln = nn.LayerNorm(d_model)
        #dropout
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, enc_mask):
        residual, attention_enc = self.self_attention(Q = x, K = x, V = x, mask = enc_mask)
        residual = self.dropout(residual)
        #skip connection & layer norm
        encoder_self_atten_output = self.atten_ln(x + residual)
        # FFN
        residual = self.ff(encoder_self_atten_output)
        residual = self.dropout(residual)
        encoder_ffn_output = self.ff_ln(encoder_self_atten_output + residual)
        return encoder_ffn_output, attention_enc

In [14]:
class Encoder(nn.Module):
    def __init__(self, input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p):
        super().__init__()
        #d_model의 제곱근 값으로 scale을 정의 -> 임베딩 벡터의 크기를 조정
        self.scale = torch.sqrt(torch.tensor(d_model))
        #입력 임베딩 레이어
        self.input_embedding = input_embedding
        # 위치 임베딩
        self.pos_embedding = nn.Embedding(max_len, d_model)
        self.dropout = nn.Dropout(drop_p)
        # 여러 개의 인코더 레이어를 쌓기 위해 모듈 리스트 활용
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, d_ff, n_heads, drop_p) for _ in range(n_layers)]
        )
        self.device = device

    def forward(self, src, mask, atten_map_save = False):
        # 위치 인덱스 텐서 생성 : 각 배치에서 시퀀스의 길이만큼 위치 인덱스를 반복
        pos = torch.arange(src.shape[1]).repeat(src.shape[0], 1).to(self.device)
        x_embedding = self.input_embedding(src) + self.pos_embedding(pos)
        x_embedding = self.dropout(x_embedding)
        atten_encs = torch.tensor([]).to(self.device)
        for layer in self.layers:
            encoder_output, atten_encs = layer(encoder_output, mask)
            if atten_map_save:
                atten_encs = torch.cat(atten_encs, atten_encs[0].unsqueeze())

        return encoder_output, atten_encs


## Decoder

In [15]:
import torch

# 예제 설정
batch_size = 3
seq_len = 10
padding = 3
n_heads = 8

# attention_score 텐서 생성 (무작위 값으로 초기화)
# Shape: (batch_size, n_heads, seq_len, seq_len)
attention_score = torch.randn(batch_size, n_heads, seq_len, seq_len)

# enc_mask 생성: 패딩이 있는 위치를 마스킹
# 각 시퀀스의 마지막 3개 위치에 패딩이 있다고 가정합니다.
enc_mask = torch.tensor([
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1],  # 첫 번째 시퀀스
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1],  # 두 번째 시퀀스
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1]   # 세 번째 시퀀스
], dtype=torch.bool).unsqueeze(1).unsqueeze(2)  # Shape: (batch_size, 1, 1, seq_len)

# enc_mask의 shape을 (batch_size, n_heads, seq_len, seq_len)로 확장
enc_mask = enc_mask.expand(batch_size, n_heads, seq_len, seq_len)

print("="*20, "attention_score에 마스킹 적용 전", "="*20)
print("attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):\n",attention_score[0, 0])

# attention_score에 enc_mask 적용
if enc_mask is not None:
    attention_score[enc_mask] = 0  # 마스크된 위치에 매우 작은 값을 넣어 softmax 결과에 영향을 미치지 않도록 합니다.

print("="*20, "attention_score에 마스킹 적용 전", "="*20)
print("attention_score shape:", attention_score.shape)
print(enc_mask.shape)

# 이제 attention_score 텐서에서 마스크가 적용된 부분을 확인할 수 있습니다.
# 특정 헤드에 대한 attention_score 확인 (예: 첫 번째 헤드)
print("="*20, "attention_score에 마스킹 적용 후", "="*20)
print("attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):\n", attention_score[0, 0])

attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):
 tensor([[-0.6378,  0.3716,  1.1410, -0.1698, -0.4231, -0.3101,  0.8877,  1.0603,
          0.2497, -0.8980],
        [-0.1975, -0.1490, -0.8522,  0.7071, -1.9913, -2.4263, -0.6625,  1.4701,
         -0.1316,  0.4624],
        [-1.0739,  0.2155, -0.0336, -0.3841,  0.5567, -0.2092, -0.9420, -0.2604,
          1.4805,  0.1632],
        [ 0.3468, -0.6148, -0.4828, -0.3025, -0.9346,  1.0462, -0.6604,  0.1264,
          0.4500, -0.7462],
        [ 0.7712,  0.8880, -0.5860,  1.6809, -0.9139, -0.3729, -1.2812,  0.4910,
          0.2157, -2.3383],
        [-0.5301, -0.3016, -1.4346, -0.3275,  1.1542, -0.6850, -0.3721, -0.4474,
          0.1354,  1.2107],
        [-0.2449, -1.2759,  1.3128,  1.8228,  0.4469, -0.4300, -1.6372, -0.9488,
         -0.0604,  0.7848],
        [ 0.9471, -0.3743, -0.3295,  0.0621,  1.7466,  0.2034,  0.2784,  1.2467,
         -0.9168, -1.9631],
        [ 0.8056, -1.3352, -1.2483,  1.3437,  0.0956, -0.5525,  0.8257,  0.3949,
    

In [16]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, d_ff, n_heads, drop_p):
        super().__init__()
        self.self_atten = MHA(d_model, n_heads)
        self.self_atten_ln = nn.LayerNorm(d_model)
        # encoder-decoder attention layer
        self.enc_dec_atten = MHA(d_model, n_heads)
        self.enc_dec_atten_ln = nn.LayerNorm(d_model)
        # FeedForward
        self.ff = FeedForward(d_model, d_ff, drop_p)
        self.ff_ln = nn.LayerNorm(d_model)
        # Dropout
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, enc_out, dec_mask, enc_dec_mask):
        residual, atten_dec = self.self_atten(Q = x, K = x, V = x, mask = dec_mask)
        residual = self.dropout(residual)
        decoder_masked_self_attention_output = self.self_atten_ln(x + residual)
        residual, atten_dec_enc = self.enc_dec_atten(
            Q = decoder_masked_self_attention_output,
            K = enc_out,
            V = enc_out,
            mask = enc_dec_mask
        )
        residual = self.dropout(residual)
        decoder_self_attention_output = self.enc_dec_atten_ln(x + residual)
        residual = self.ff(decoder_self_attention_output)
        residual = self.dropout(residual)
        decoder_output = self.ff_ln(decoder_self_attention_output + residual)

        return decoder_output, atten_dec, atten_dec_enc

In [17]:
class Decoder(nn.Module):
    def __init__(self, input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p, vocab_size):
        super().__init__()
        self.scale = torch.sqrt(torch.tensor(d_model))
        self.input_embedding = input_embedding
        self.pos_embedding = nn.Embedding(max_len, d_model)
        self.dropout = nn.Dropout(drop_p)
        self.layers = nn.ModuleList(
            [DecoderLayer(d_model, d_ff, n_heads, drop_p) for _ in range(n_layers)]
        )
        self.linear = nn.Linear(d_model, vocab_size)
        self.device = device

    def forward(self, trg, enc_out, dec_mask, enc_dec_mask, atten_map_save = False):
        pos = torch.arange(trg.shape[1]).repeat(trg.shape[0], 1).to(self.device)
        y_embedding = self.scale * self.input_embedding(trg) + self.pos_embedding(pos)
        y_embedding = self.dropout(y_embedding)
        atten_decs = torch.tensor([]).to(self.device)
        atten_enc_decs = torch.tensor([]).to(self.device)
        decoder_output = y_embedding
        for layer in self.layers:
            decoder_output, atten_dec, atten_enc_dec = layer(decoder_output, enc_out, dec_mask, enc_dec_mask)
            if atten_map_save:
                atten_decs = torch.cat([atten_decs, atten_dec[0].unsqueeze()])
                atten_enc_decs = torch.cat([atten_enc_decs, atten_enc_dec[0].unsqueeze()])
        decoder_output_linear = self.linear(decoder_output)

        return decoder_output_linear, atten_decs, atten_enc_decs

In [None]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, max_len, n_layers, pad_idx, d_model = 512, d_ff = 2048, n_heads = 8, drop_p = 0.1)
        super().__init__()
        self.input_embedding = nn.Embedding(vocab_size, d_model)
        self.encoder = Encoder(self.input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p)
        self.decoder = Decoder(self.input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p, vocab_size)
        self.n_heads = n_heads
        for m in self.modules():
            if hasattr(m, 'weight') and m.weight.dim() > 1:
                nn.init.xavier_uniform_(m.weight)
        self.pad_idx = pad_idx

    def make_enc_mask(self, src):
        enc_mask = (src == self.pad_idx).unsqueeze(1).unsqueeze(2)
        enc_mask = enc_mask.repeat(1, self.n_heads, src.shape[1], 1)
        return enc_mask
    
    def make_dec_mask(self, trg):
        trg_pad_mask = (trg.to('cpu') == self.pad_idx).unsqueeze(1).unsqueeze(2)
        trg_pad_mask = trg_pad_mask.repeat(1, self.n_heads, trg.shape[1], 1)
        trg_future_mask = torch.tril(torch.ones(trg.shape[0], self.n_heads, trg.shape[1], trg.shape[1])) == 0
        dec_mask = trg_pad_mask | trg_future_mask
        return dec_mask
    
    def make_enc_dec_mask(self, src, trg):
        enc_dec_mask = (src == self.pad_idx).unsqueeze(1).unsqueeze(2)
        enc_dec_mask = enc_dec_mask.repeat(1, self.n_heads, trg.shape[1], 1)
        return enc_dec_mask
    
    def forward(self, src, trg):