In [None]:
import torch
import torch.nn as nn
import math

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    A table of shape [1, max_len, d_model] is precomputed once: even feature
    indices hold sin(pos / 10000^(2i/d_model)) and odd feature indices hold the
    matching cosine. Both even and odd ``d_model`` values are supported.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        """
        Arguments:
            d_model: hidden size of the model (may be even or odd)
            dropout: dropout probability applied after the encoding is added
            max_len: longest sequence length the table is precomputed for
        """

        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Encoding table [max_len, d_model], filled in below.
        pe = torch.zeros(max_len, d_model)

        # Column vector of positions: [max_len, 1].
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # 1 / 10000^(2i/d_model), computed as exp(log(...)) instead of a
        # direct power. There is one entry per even feature index.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # [max_len, ceil(d_model / 2)] angles shared by the sin and cos halves.
        angles = position * div_term

        # Even feature indices get sin, odd feature indices get cos.
        # When d_model is odd there is one fewer odd column than even columns,
        # so the cosine half only uses the first d_model // 2 frequencies.
        pe[:, 0::2] = torch.sin(angles)
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add a leading batch dimension: [1, max_len, d_model].
        pe = pe.unsqueeze(0)

        # Registered as a buffer: it is saved in state_dict and moves with the
        # module across devices, but it is not a parameter and is not trained.
        self.register_buffer('pe', pe)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Arguments:
            x: embedded input of shape [batch_size, seq_len, d_model];
               seq_len must not exceed the max_len given at construction
        
        Returns:
            Tensor of shape [batch_size, seq_len, d_model] with the positional
            encoding added and dropout applied.
        """
        # Slice the table to the input length and add it (broadcast over batch).
        x = x + self.pe[:, :x.size(1), :]

        # Apply dropout to the sum.
        return self.dropout(x)
    

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with an output projection.

    Query, key and value are each projected to d_model features, split into
    num_heads heads of size d_k = d_model // num_heads, attended per head, then
    concatenated and projected back to d_model.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        """
        Args:
            d_model (int): hidden size of the model
            num_heads (int): number of attention heads; must divide d_model
            dropout (float, optional): dropout probability applied to the
                attention weights. Defaults to 0.1.
        """

        super().__init__()

        # Each head gets an equal slice of the feature dimension.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # feature size of one head

        # Q, K, V projections. All heads are computed in a single matmul, so
        # the output width stays d_model.
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.fc_out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query (torch.Tensor): [batch_size, seq_len_q, d_model]
            key (torch.Tensor):   [batch_size, seq_len_k, d_model]
            value (torch.Tensor): [batch_size, seq_len_k, d_model]
            mask (torch.Tensor, optional): tensor broadcastable to
                [batch_size, num_heads, seq_len_q, seq_len_k], for example
                [batch_size, 1, 1, seq_len_k] or
                [batch_size, 1, seq_len_q, seq_len_k].
                A non-boolean mask is additive: it is added to the scores, so
                blocked positions should hold a large negative value such as
                -1e9 and allowed positions 0.
                A boolean mask marks allowed positions with True and blocked
                positions with False.
                Defaults to None (no masking).

        Returns:
            torch.Tensor: [batch_size, seq_len_q, d_model]
        """
        batch_size = query.size(0)

        # 1. Linear projections.
        # [batch_size, seq_len, d_model] -> [batch_size, seq_len, d_model]
        Q = self.w_q(query)
        K = self.w_k(key)
        V = self.w_v(value)

        # 2. Split into heads.
        # [batch_size, seq_len, d_model] -> [batch_size, seq_len, num_heads, d_k]
        # then move the head axis forward -> [batch_size, num_heads, seq_len, d_k]
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # 3. Scaled dot-product attention.
        # 3.1. scores = Q @ K^T / sqrt(d_k)
        # Q: [..., seq_len_q, d_k], K^T: [..., d_k, seq_len_k] -> [..., seq_len_q, seq_len_k]
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # 3.2. Optional masking.
        if mask is not None:
            if mask.dtype == torch.bool:
                # Adding a bool tensor would only shift scores by 0/1 and mask
                # nothing, so blocked (False) positions are filled explicitly.
                # The dtype's most negative finite value keeps a fully blocked
                # row finite after softmax instead of producing NaN.
                scores = scores.masked_fill(~mask, torch.finfo(scores.dtype).min)
            else:
                # Additive mask: blocked positions carry a large negative value.
                scores = scores + mask
        
        # 3.3. Softmax over key positions, then dropout on the weights.
        attention_weights = torch.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # 3.4. Weighted sum of the values.
        # [..., seq_len_q, seq_len_k] @ [..., seq_len_k, d_k] -> [..., seq_len_q, d_k]
        output = torch.matmul(attention_weights, V)

        # 4. Merge heads.
        # [batch_size, num_heads, seq_len_q, d_k] -> [batch_size, seq_len_q, num_heads, d_k]
        # contiguous() is needed because view() below requires contiguous memory.
        output = output.transpose(1, 2).contiguous()

        # -> [batch_size, seq_len_q, d_model]
        output = output.view(batch_size, -1, self.d_model)

        # 5. Output projection.
        output = self.fc_out(output)
        return output


class PositionwiseFeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model (int): input and output feature size
            d_ff (int): width of the hidden layer
            dropout (float, optional): dropout probability applied to the
                hidden activations. Defaults to 0.1.
        """
        super().__init__()

        # Expansion (d_model -> d_ff) followed by contraction (d_ff -> d_model).
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)

        self.dropout = nn.Dropout(dropout)

        # ReLU as in the original Transformer paper; many recent LLMs use GELU.
        self.activation = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x (torch.Tensor): [batch_size, seq_len, d_model]

        Returns:
            torch.Tensor: [batch_size, seq_len, d_model]
        """
        # [batch_size, seq_len, d_model] -> [batch_size, seq_len, d_ff]
        expanded = self.w_1(x)
        activated = self.dropout(self.activation(expanded))

        # [batch_size, seq_len, d_ff] -> [batch_size, seq_len, d_model]
        return self.w_2(activated)

In [None]:
class EncoderLayer(nn.Module):
    """One Transformer encoder layer: self-attention then feed-forward, Post-LN."""

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()

        # Sub-layers: self-attention followed by the feed-forward network.
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer; a single dropout module is shared.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """Run both sub-layers as Norm(x + Dropout(Sublayer(x))).

        Args:
            x (torch.Tensor): [batch_size, seq_len, d_model]
            mask (torch.Tensor, optional): attention mask (e.g. a padding
                mask) handed to the self-attention. Defaults to None.

        Returns:
            torch.Tensor: [batch_size, seq_len, d_model]
        """
        # Sub-layer 1: self-attention with residual connection and Post-LN.
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: feed-forward network with residual connection and Post-LN.
        transformed = self.ffn(x)
        return self.norm2(x + self.dropout(transformed))


class DecoderLayer(nn.Module):
    """One Transformer decoder layer: masked self-attention, cross-attention, feed-forward (Post-LN)."""

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()

        # Attention over the decoder's own sequence.
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)

        # Attention from decoder positions to the encoder output.
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)

        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)

        # One LayerNorm per sub-layer; a single dropout module is shared.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, memory: torch.Tensor, src_mask: torch.Tensor, tgt_mask: torch.Tensor) -> torch.Tensor:
        """Run the three sub-layers, each as Norm(x + Dropout(Sublayer(x))).

        Args:
            x (torch.Tensor): decoder input, [batch_size, tgt_len, d_model]
            memory (torch.Tensor): encoder output, [batch_size, src_len, d_model]
            src_mask (torch.Tensor): mask applied when attending to ``memory``
                (padding mask)
            tgt_mask (torch.Tensor): mask applied in the self-attention
                (look-ahead mask combined with padding mask)

        Returns:
            torch.Tensor: [batch_size, tgt_len, d_model]
        """
        # Sub-layer 1: self-attention restricted by tgt_mask so that a position
        # cannot look at later target tokens.
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: cross-attention. Queries come from the decoder state,
        # keys and values from the encoder output; src_mask hides source padding.
        cross_attended = self.cross_attn(x, memory, memory, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))

        # Sub-layer 3: feed-forward network.
        transformed = self.ffn(x)
        return self.norm3(x + self.dropout(transformed))


class Encoder(nn.Module):
    """Token embedding, positional encoding and a stack of EncoderLayer modules."""

    def __init__(self, vocab_size: int, d_model: int, num_layers: int, num_heads: int, d_ff: int, max_len: int, dropout: float = 0.1):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout, max_len)

        # Stack of num_layers identical (separately parameterised) layers.
        self.layers = nn.ModuleList(
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        )

        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

    def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        """Embed the token ids in ``x`` and pass them through every layer with ``mask``."""
        # Scale embeddings by sqrt(d_model) as in the paper, then add positions.
        scale = math.sqrt(self.d_model)
        x = self.pos_encoding(self.embedding(x) * scale)

        # Feed the result through the layer stack in order.
        for encoder_layer in self.layers:
            x = encoder_layer(x, mask)

        return x


class Decoder(nn.Module):
    """Token embedding, positional encoding and a stack of DecoderLayer modules."""

    def __init__(self, vocab_size: int, d_model: int, num_layers: int, num_heads: int, d_ff: int, max_len: int, dropout: float = 0.1):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout, max_len)

        # Stack of num_layers identical (separately parameterised) layers.
        self.layers = nn.ModuleList(
            DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        )

        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

    def forward(self, x: torch.Tensor, memory: torch.Tensor, src_mask: torch.Tensor, tgt_mask: torch.Tensor) -> torch.Tensor:
        """Embed the target token ids in ``x`` and decode them against ``memory``."""
        # Scale embeddings by sqrt(d_model) as in the paper, then add positions.
        scale = math.sqrt(self.d_model)
        x = self.pos_encoding(self.embedding(x) * scale)

        # Every layer receives the same encoder output and the same masks.
        for decoder_layer in self.layers:
            x = decoder_layer(x, memory, src_mask, tgt_mask)

        return x


class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps source/target token ids to target-vocabulary logits."""

    def __init__(self, src_vocab_size: int, tgt_vocab_size: int, d_model: int = 512, num_layers: int = 6, num_heads: int = 8, d_ff: int = 2048, max_len: int = 5000, dropout: float = 0.1):
        super().__init__()

        # Encoder and decoder share every hyperparameter except the vocabulary size.
        shared = (d_model, num_layers, num_heads, d_ff, max_len, dropout)
        self.encoder = Encoder(src_vocab_size, *shared)
        self.decoder = Decoder(tgt_vocab_size, *shared)

        # Final projection from decoder features to target-vocabulary scores.
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src: torch.Tensor, tgt: torch.Tensor, src_mask: torch.Tensor = None, tgt_mask: torch.Tensor = None) -> torch.Tensor:
        """Encode ``src``, decode ``tgt`` against it and project to the vocabulary.

        Args:
            src (torch.Tensor): [batch, src_len] source token ids
            tgt (torch.Tensor): [batch, tgt_len] target token ids fed to the decoder
            src_mask (torch.Tensor, optional): mask for the encoder; also used
                by the decoder when attending to the encoder output.
                Defaults to None.
            tgt_mask (torch.Tensor, optional): mask for the decoder
                self-attention. Defaults to None.

        Returns:
            torch.Tensor: [batch, tgt_len, tgt_vocab_size] unnormalised scores
            (logits); no softmax is applied here.
        """
        # Encoder output: [batch, src_len, d_model]
        encoded = self.encoder(src, src_mask)

        # Decoder output: [batch, tgt_len, d_model]
        decoded = self.decoder(tgt, encoded, src_mask, tgt_mask)

        # Logits: [batch, tgt_len, tgt_vocab_size]
        return self.fc_out(decoded)

    def encode(self, src: torch.Tensor, src_mask: torch.Tensor = None) -> torch.Tensor:
        """Helper that runs only the encoder, e.g. at inference time."""
        return self.encoder(src, src_mask)

    def decode(self, tgt: torch.Tensor, memory: torch.Tensor, src_mask: torch.Tensor = None, tgt_mask: torch.Tensor = None) -> torch.Tensor:
        """Helper that runs only the decoder stack (without the final projection)."""
        return self.decoder(tgt, memory, src_mask, tgt_mask)

In [None]:
def verify_transformer():
    """Smoke test: build a default-sized Transformer and check the output shape."""
    batch_size, seq_len = 2, 10
    src_vocab_size = tgt_vocab_size = 1000

    model = Transformer(src_vocab_size, tgt_vocab_size, d_model=512)

    # Random token ids as dummy input: [batch_size, seq_len] each.
    src = torch.randint(0, src_vocab_size, (batch_size, seq_len))
    tgt = torch.randint(0, tgt_vocab_size, (batch_size, seq_len))

    # No masks are passed; this only exercises the plain forward path.
    output = model(src, tgt)

    # The model should return [batch, tgt_len, tgt_vocab_size].
    expected_shape = (batch_size, seq_len, tgt_vocab_size)
    assert output.shape == expected_shape, f"Expected {expected_shape}, but got {output.shape}"
    print("Transformer architecture verification passed!")

# Run the shape smoke test as soon as this cell is executed.
verify_transformer()