In [None]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to embeddings, then apply dropout.

    The encodings are computed once at construction time and stored in the
    non-trainable buffer ``pe`` of shape ``(max_len, 1, d_model)``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions for which encodings are precomputed.
        batch_first: When True (default) ``forward`` expects inputs shaped
            ``(batch, seq_len, d_model)``, which is the layout the attention
            and classifier code in this file uses. When False, inputs are
            ``(seq_len, batch, d_model)``.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000, batch_first=True):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.batch_first = batch_first

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Even feature indices carry the sine component.
        pe[:, 0::2] = torch.sin(position * div_term)
        # Odd feature indices carry the cosine component. For an odd d_model
        # there is one fewer odd column than even columns, so trim div_term.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Stored as (max_len, 1, d_model); forward() re-orients it as needed.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + positional_encoding)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)`` when
                ``batch_first`` is True, otherwise ``(seq_len, batch, d_model)``.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )

        encoding = self.pe[:seq_len]  # (seq_len, 1, d_model)
        if self.batch_first:
            # Index by position along dim 1 and broadcast over the batch.
            encoding = encoding.transpose(0, 1)  # (1, seq_len, d_model)
        x = x + encoding
        return self.dropout(x)

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over ``nhead`` heads in parallel.

    Args:
        d_model: Feature size of the inputs and of the output.
        nhead: Number of attention heads; must divide ``d_model`` evenly.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``nhead`` is not positive or does not divide ``d_model``.
    """

    def __init__(self, d_model, nhead, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        # Without this check the reshapes in forward() either fail with an
        # opaque size error or silently split the features incorrectly.
        if nhead <= 0 or d_model % nhead != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive nhead ({nhead})"
            )
        self.d_model = d_model
        self.nhead = nhead
        self.head_dim = d_model // nhead
        self.dropout = nn.Dropout(p=dropout)

        # Creation order (q, v, k, out) is kept so seeded initialisation is unchanged.
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            mask: Optional tensor. A head axis is inserted at dim 1 and the
                result must broadcast against the
                ``(batch, nhead, query_len, key_len)`` score tensor. Positions
                where ``mask == 0`` are suppressed before the softmax.

        Returns:
            A tuple ``(out, attn_weights)`` where ``out`` has shape
            ``(batch, query_len, d_model)`` and ``attn_weights`` has shape
            ``(batch, nhead, query_len, key_len)`` (after dropout).
        """
        # Project and split into heads: (batch, nhead, seq_len, head_dim).
        bs = query.size(0)
        query = self.q_linear(query).view(bs, -1, self.nhead, self.head_dim).transpose(1, 2)
        key = self.k_linear(key).view(bs, -1, self.nhead, self.head_dim).transpose(1, 2)
        value = self.v_linear(value).view(bs, -1, self.nhead, self.head_dim).transpose(1, 2)

        # Scaled dot products between queries and keys for all heads.
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # Apply the mask (if any).
        if mask is not None:
            mask = mask.unsqueeze(1)
            # -1e9 is not representable in float16; clamp the fill value to
            # the dtype's range. For float32 this is still exactly -1e9.
            fill_value = max(-1e9, torch.finfo(scores.dtype).min)
            scores = scores.masked_fill(mask == 0, fill_value)

        # Normalise over the key axis to get the attention weights.
        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of the values, then merge the heads back together.
        out = torch.matmul(attn_weights, value)
        out = out.transpose(1, 2).contiguous().view(bs, -1, self.d_model)

        # Final output projection.
        out = self.out(out)
        return out, attn_weights

class FeedForward(nn.Module):
    """Two-layer position-wise feed-forward network.

    Computes ``linear2(dropout(relu(linear1(x))))``.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply the network to the last dimension of ``x``."""
        hidden = self.linear1(x)
        activated = F.relu(hidden)
        return self.linear2(self.dropout(activated))

class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each post-normalised.

    Each sub-layer's output goes through dropout, is added to its input
    (residual connection) and the sum is passed through a LayerNorm.

    Args:
        d_model: Feature size of the inputs.
        nhead: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability used by both sub-layers and both residual paths.
    """

    def __init__(self, d_model, nhead, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout=dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout=dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)

    def forward(self, src, src_mask=None):
        """Run the block and return ``(output, attn_weights)``.

        ``src`` serves as query, key and value of the attention sub-layer;
        ``src_mask`` is forwarded to it unchanged.
        """
        # Self-attention sub-layer: residual add, then layer norm.
        attended, attn_weights = self.self_attn(src, src, src, mask=src_mask)
        src = self.norm1(src + self.dropout1(attended))

        # Feed-forward sub-layer: residual add, then layer norm.
        transformed = self.feed_forward(src)
        src = self.norm2(src + self.dropout2(transformed))

        return src, attn_weights

class Encoder(nn.Module):
    """Token embedding, positional encoding and a stack of encoder layers.

    Args:
        input_dim: Number of rows in the embedding table (vocabulary size).
        d_model: Embedding / feature size.
        nhead: Number of attention heads in every layer.
        num_layers: Number of stacked ``EncoderLayer`` modules (may be 0).
        d_ff: Hidden size of each layer's feed-forward network.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, input_dim, d_model, nhead, num_layers, d_ff, dropout=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.dropout = nn.Dropout(p=dropout)
        self.layers = nn.ModuleList([EncoderLayer(d_model, nhead, d_ff, dropout) for _ in range(num_layers)])

    def forward(self, src, src_mask=None):
        """Encode a batch of token indices.

        Args:
            src: Integer tensor of token indices, looked up in the embedding table.
            src_mask: Optional attention mask passed to every layer.

        Returns:
            A tuple ``(encoded, attn_weights)``. ``attn_weights`` comes from
            the last layer only, and is ``None`` when there are no layers.
        """
        # Compute the embeddings and apply positional encoding and dropout.
        src = self.embedding(src)
        src = self.pos_encoder(src)
        # NOTE(review): pos_encoder already ends with a dropout of the same
        # rate, so this is a second consecutive dropout. Kept as-is to avoid
        # changing training behaviour; confirm whether it is intentional.
        src = self.dropout(src)

        # Bind up front so an empty layer stack does not hit an unbound
        # variable at the return statement.
        attn_weights = None

        # Apply the multi-layer Transformer encoder.
        for layer in self.layers:
            src, attn_weights = layer(src, src_mask)
        return src, attn_weights

class Transformer(nn.Module):
    """Encoder-only Transformer with a linear head on the first position.

    Args:
        input_dim: Vocabulary size for the encoder's embedding table.
        output_dim: Number of output features produced by the linear head.
        d_model: Feature size inside the encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of encoder layers.
        d_ff: Hidden size of each feed-forward network.
        dropout: Dropout probability used inside the encoder.
    """

    def __init__(self, input_dim, output_dim, d_model, nhead, num_layers, d_ff, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(input_dim, d_model, nhead, num_layers, d_ff, dropout)
        self.fc = nn.Linear(d_model, output_dim)

    def forward(self, src, src_mask=None):
        """Encode ``src`` and project the features at index 0 of dim 1.

        The encoder's attention weights are discarded.
        """
        encoded, _ = self.encoder(src, src_mask)

        # Only the first position along dim 1 feeds the output layer.
        first_position = encoded[:, 0, :]
        return self.fc(first_position)


In [None]:
# Build the model: 1000-token vocabulary, 10 outputs, six 8-head layers.
model = Transformer(
    input_dim=1000,
    output_dim=10,
    d_model=512,
    nhead=8,
    num_layers=6,
    d_ff=2048,
    dropout=0.1,
)

# Random example batch: 32 sequences of 20 token ids drawn from [0, 1000).
src = torch.randint(low=0, high=1000, size=(32, 20))

# Forward pass through the model.
out = model(src)

# Show the output shape.
print(out.shape)  # should be (32, 10)