In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np


2.2. Positional Encoding
Transformer 모델은 입력 순서를 인식하지 못하므로, 위치 정보를 추가합니다.

In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        self.encoding.requires_grad = False  # 이 값은 학습되지 않음

        pos = torch.arange(0, max_len).unsqueeze(1).float()
        _2i = torch.arange(0, d_model, 2).float()
        
        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))

        # encoding에 배치 차원 추가
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        batch_size, seq_len, d_model = x.size()
        
        if seq_len > self.encoding.size(1):
            raise ValueError(f"Sequence length {seq_len} is greater than the maximum length {self.encoding.size(1)}")

        # 필요한 길이만큼 positional encoding 슬라이스하여 더하기
        pos_enc = self.encoding[:, :seq_len, :].to(x.device)
        return x + pos_enc

# 사용 예제
d_model = 512
max_len = 5000
pos_encoder = PositionalEncoding(d_model, max_len)
input_tensor = torch.randn(32, 100, d_model)  # (batch_size, seq_len, d_model)
output_tensor = pos_encoder(input_tensor)
print(output_tensor.shape)  # (32, 100, 512)

2.3. Multi-Head Attention

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        # Linear transformations
        q = self.q_linear(q).view(batch_size, -1, self.num_heads, self.d_k)
        k = self.k_linear(k).view(batch_size, -1, self.num_heads, self.d_k)
        v = self.v_linear(v).view(batch_size, -1, self.num_heads, self.d_k)

        # Transpose to get dimensions batch_size * num_heads * seq_len * d_k
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = torch.nn.functional.softmax(scores, dim=-1)

        x = torch.matmul(attention, v)

        # Concat heads and put through final linear layer
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.out(x)


2.4. Feed-Forward Network

In [4]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(x)


2.5. Encoder Layer

In [5]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)

        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        _x = x
        x = self.layer_norm1(x + self.dropout(self.self_attn(x, x, x, mask)))
        x = self.layer_norm2(x + self.dropout(self.feed_forward(x)))
        return x


2.6. Encoder

In [6]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x


2.7. Decoder Layer

In [7]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)

        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.layer_norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        _x = x
        x = self.layer_norm1(x + self.dropout(self.self_attn(x, x, x, tgt_mask)))
        x = self.layer_norm2(x + self.dropout(self.enc_attn(x, enc_output, enc_output, src_mask)))
        x = self.layer_norm3(x + self.dropout(self.feed_forward(x)))
        return x


2.8. Decoder

In [8]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, enc_output, src_mask, tgt_mask)
        return x


2.9. Transformer 모델

In [9]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_layers=6, num_heads=8, d_ff=2048, max_len=5000, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout)
        self.out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_output = self.encoder(src, src_mask)
        dec_output = self.decoder(tgt, enc_output, src_mask, tgt_mask)
        output = self.out(dec_output)
        return output


3. 학습 및 예제 실행
3.1. 모델 인스턴스 생성

In [10]:
src_vocab_size = 10000  # 원시 단어 사전 크기
tgt_vocab_size = 10000  # 타겟 단어 사전 크기

model = Transformer(src_vocab_size, tgt_vocab_size)


In [11]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)


  from .autonotebook import tqdm as notebook_tqdm


In [12]:
# 학습 설정
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

Transformer(
  (encoder): Encoder(
    (embedding): Embedding(10000, 512)
    (pos_encoding): PositionalEncoding()
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (self_attn): MultiHeadAttention(
          (q_linear): Linear(in_features=512, out_features=512, bias=True)
          (k_linear): Linear(in_features=512, out_features=512, bias=True)
          (v_linear): Linear(in_features=512, out_features=512, bias=True)
          (out): Linear(in_features=512, out_features=512, bias=True)
        )
        (feed_forward): FeedForward(
          (linear1): Linear(in_features=512, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=2048, out_features=512, bias=True)
        )
        (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (layer_norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (dropout): Dr

# test

In [13]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from transformers import GPT2Tokenizer
from datasets import load_dataset

# 토크나이저 로드
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# 데이터셋 로드
dataset = load_dataset('cnn_dailymail', '3.0.0')
train_data = dataset['train']
val_data = dataset['validation']
test_data = dataset['test']

# 전처리 함수
def preprocess_data(data, tokenizer, max_input_length, max_output_length):
    inputs = []
    outputs = []

    for example in data:
        input_text = example['article']
        output_text = example['highlights']

        # 토크나이징
        input_ids = tokenizer.encode(input_text, truncation=True, padding='max_length', max_length=max_input_length)
        output_ids = tokenizer.encode(output_text, truncation=True, padding='max_length', max_length=max_output_length)

        inputs.append(input_ids)
        outputs.append(output_ids)

    return torch.tensor(inputs), torch.tensor(outputs)

max_input_length = 512
max_output_length = 150

# 학습 및 검증 데이터 전처리
train_inputs, train_outputs = preprocess_data(train_data, tokenizer, max_input_length, max_output_length)
val_inputs, val_outputs = preprocess_data(val_data, tokenizer, max_input_length, max_output_length)

# 학습 데이터셋 정의
train_dataset = TensorDataset(train_inputs, train_outputs)

# DataLoader로 학습 데이터 로드
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


ValueError: Asking to pad but the tokenizer does not have a padding token. Please select a token to use as `pad_token` `(tokenizer.pad_token = tokenizer.eos_token e.g.)` or add a new pad token via `tokenizer.add_special_tokens({'pad_token': '[PAD]'})`.

In [None]:
from torch.utils.data import DataLoader, TensorDataset

# 학습 데이터셋 정의
train_dataset = TensorDataset(train_inputs, train_outputs)

# DataLoader로 학습 데이터 로드
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


In [None]:
for epoch in range(num_epochs):
    model.train()
    for src_batch, tgt_batch in train_loader:
        src = src_batch.to(device)
        tgt = tgt_batch.to(device)
        
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        src_mask = None  # 실제로는 패딩 마스크 등을 사용
        tgt_mask = None  # 실제로는 시퀀스 마스크 등을 사용

        optimizer.zero_grad()
        output = model(src, tgt_input, src_mask, tgt_mask)
        loss = criterion(output.view(-1, tgt_vocab_size), tgt_output.view(-1))
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


NameError: name 'train_loader' is not defined

In [None]:
def generate_text(model, tokenizer, prompt, max_length=50):
    model.eval()
    input_ids = tokenizer.encode(prompt, return_tensors='pt').to(device)
    output = input_ids

    for _ in range(max_length):
        tgt_mask = (output != 0).unsqueeze(1).unsqueeze(2)
        logits = model(input_ids, output, src_mask=None, tgt_mask=tgt_mask)
        next_token = logits[:, -1, :].argmax(dim=-1).unsqueeze(-1)
        output = torch.cat([output, next_token], dim=1)
        
        if next_token.item() == tokenizer.eos_token_id:
            break

    return tokenizer.decode(output[0], skip_special_tokens=True)

# 예제 사용법
prompt = "Once upon a time"
generated_text = generate_text(model, tokenizer, prompt)
print(generated_text)
