<a href="https://colab.research.google.com/github/woosansam/-/blob/main/Transformer_%EC%98%81%EC%96%B4_%ED%95%9C%EA%B8%80_%EC%A7%80%EC%A7%80%EC%A7%84%EC%A7%9C_%EB%A7%9D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % num_heads == 0

        self.depth = d_model // num_heads
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)
        self.dense = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.permute(0, 2, 1, 3)

    def forward(self, v, k, q, mask):
        batch_size = q.size(0)

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = scaled_attention.permute(0, 2, 1, 3).contiguous()
        original_size_attention = scaled_attention.view(batch_size, -1, self.d_model)
        output = self.dense(original_size_attention)
        return output, attention_weights

    def scaled_dot_product_attention(self, q, k, v, mask):
        matmul_qk = torch.matmul(q, k.permute(0, 1, 3, 2))

        dk = torch.tensor(k.shape[-1], dtype=torch.float32)
        scaled_attention_logits = matmul_qk / torch.sqrt(dk)

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = F.softmax(scaled_attention_logits, dim=-1)
        output = torch.matmul(attention_weights, v)

        return output, attention_weights

class PositionwiseFeedforward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedforward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = PositionwiseFeedforward(d_model, d_ff)

        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

class Encoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, input_vocab_size, maximum_position_encoding, dropout=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_vocab_size, d_model)
        self.pos_encoding = self.positional_encoding(maximum_position_encoding, d_model)

        self.enc_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)

    def positional_encoding(self, position, d_model):
      angle_rads = self.get_angles(torch.arange(position).unsqueeze(1), torch.arange(d_model).unsqueeze(0), d_model)
      angle_rads[:, 0::2] = torch.sin(angle_rads[:, 0::2])
      angle_rads[:, 1::2] = torch.cos(angle_rads[:, 1::2])
      pos_encoding = angle_rads.unsqueeze(0).transpose(0, 1)
      return pos_encoding

    def get_angles(self, pos, i, d_model):
        angle_rates = 1 / torch.pow(10000, (2 * (i // 2)) / torch.tensor(d_model, dtype=torch.float32))
        return pos * angle_rates

    def forward(self, x, mask):
      seq_len = x.size(1)
      x = self.embedding(x)
      x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32)).to(x.device)
      pos_encoding = self.pos_encoding[:seq_len, :].to(x.device)
      x += pos_encoding.transpose(0, 1)  # Ensure pos_encoding is correctly shaped
      x = self.dropout(x)

      for i in range(self.num_layers):
          x = self.enc_layers[i](x, mask)

      return x

class Transformer(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, input_vocab_size, target_vocab_size, pe_input, pe_target, dropout=0.1):
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff, input_vocab_size, pe_input, dropout)

        self.final_layer = nn.Linear(d_model, target_vocab_size)

    def forward(self, inp, tar, enc_padding_mask, look_ahead_mask, dec_padding_mask):
      enc_output = self.encoder(inp, enc_padding_mask)

      tar_seq_len = tar.size(1)
      enc_output = enc_output[:, :tar_seq_len, :]

      final_output = self.final_layer(enc_output)

      return final_output

In [2]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# 데이터셋 로드 및 전처리
data_path = '/content/drive/MyDrive/ML/transformer 번역기/1_구어체(1).xlsx'
df = pd.read_excel(data_path)



In [3]:
class TranslationDataset(Dataset):
    def __init__(self, dataframe, src_tokenizer, tgt_tokenizer):
        self.dataframe = dataframe
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        src_text = self.dataframe.iloc[idx, 1]  # '원문' 열
        tgt_text = self.dataframe.iloc[idx, 2]  # '번역문' 열
        src_tensor = torch.tensor(self.src_tokenizer.encode(src_text), dtype=torch.long)
        tgt_tensor = torch.tensor(self.tgt_tokenizer.encode(tgt_text), dtype=torch.long)
        return src_tensor, tgt_tensor

# 샘플 토크나이저 구현
class SimpleTokenizer:
    def __init__(self, vocab):
        self.vocab = vocab
        self.word2idx = {w: i for i, w in enumerate(vocab)}
        self.idx2word = {i: w for i, w in enumerate(vocab)}

    def encode(self, text):
        return [self.word2idx[word] if word in self.word2idx else self.word2idx['<unk>'] for word in text.split(' ')]

    def decode(self, tokens):
        return ' '.join([self.idx2word[token] for token in tokens if token in self.idx2word])

# 토크나이저 초기화
src_vocab = ['<pad>', '<sos>', '<eos>', '<unk>'] + sorted(set(' '.join(df['원문']).split()))
tgt_vocab = ['<pad>', '<sos>', '<eos>', '<unk>'] + sorted(set(' '.join(df['번역문']).split()))

src_tokenizer = SimpleTokenizer(src_vocab)
tgt_tokenizer = SimpleTokenizer(tgt_vocab)

# 데이터셋 및 데이터로더 생성
dataset = TranslationDataset(df, src_tokenizer, tgt_tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=lambda x: (torch.nn.utils.rnn.pad_sequence([item[0] for item in x], batch_first=True, padding_value=0),
                                                                                   torch.nn.utils.rnn.pad_sequence([item[1] for item in x], batch_first=True, padding_value=0)))

# 학습을 위한 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Transformer(num_layers=4, d_model=128, num_heads=8, d_ff=512, input_vocab_size=len(src_vocab), target_vocab_size=len(tgt_vocab), pe_input=5000, pe_target=5000).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 학습 루프
def create_padding_mask(seq):
    seq = torch.eq(seq, 0).float()
    return seq[:, None, None, :]  # (batch_size, 1, 1, seq_len)

def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        enc_padding_mask = create_padding_mask(src)
        look_ahead_mask = None
        dec_padding_mask = create_padding_mask(tgt)

        optimizer.zero_grad()

        output = model(src, tgt, enc_padding_mask, look_ahead_mask, dec_padding_mask)

        output = output.view(-1, output.shape[-1])
        tgt = tgt.view(-1)

        min_length = min(output.size(0), tgt.size(0))
        output = output[:min_length]
        tgt = tgt[:min_length]

        loss = criterion(output, tgt)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

num_epochs = 10
for epoch in range(num_epochs):
    loss = train_epoch(model, dataloader, criterion, optimizer, device)
    print(f'Epoch {epoch+1}, Loss: {loss}')

print("Training complete.")

Epoch 1, Loss: 7.367151946105957
Epoch 2, Loss: 7.286356198425293
Epoch 3, Loss: 7.277790290374756
Epoch 4, Loss: 7.2730036095428465
Epoch 5, Loss: 7.272238734436035
Epoch 6, Loss: 7.27068165435791
Epoch 7, Loss: 7.271549900817871
Epoch 8, Loss: 7.269747857666015
Epoch 9, Loss: 7.269885456390381
Epoch 10, Loss: 7.2706543907928465
Training complete.


In [11]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        enc_padding_mask = create_padding_mask(src)
        look_ahead_mask = None
        dec_padding_mask = create_padding_mask(tgt)

        optimizer.zero_grad()

        output = model(src, tgt, enc_padding_mask, look_ahead_mask, dec_padding_mask)

        # CrossEntropyLoss expects (N, C) shape for input and (N) shape for target
        # Reshape output and target to be compatible with CrossEntropyLoss
        output = output.view(-1, output.shape[-1])
        tgt = tgt.view(-1)

        # Ensure the lengths match
        min_length = min(output.size(0), tgt.size(0))
        output = output[:min_length]
        tgt = tgt[:min_length]

        loss = criterion(output, tgt)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

# 학습 및 결과 확인
# 학습 및 모델 저장
num_epochs = 10
for epoch in range(num_epochs):
    loss = train_epoch(model, dataloader, criterion, optimizer, device)
    print(f'Epoch {epoch+1}, Loss: {loss}')

    # 주기적으로 번역 결과 출력
    if (epoch + 1) % 2 == 0:  # 예를 들어, 매 2 에폭마다
        example_input = torch.tensor([src_tokenizer.encode("예시 입력 문장")], dtype=torch.long).to(device)
        model.eval()
        with torch.no_grad():
            enc_padding_mask = create_padding_mask(example_input)
            enc_output = model.encoder(example_input, enc_padding_mask)
            translated_output = model.final_layer(enc_output)
            translated_tokens = translated_output.argmax(dim=-1).squeeze().tolist()
            translated_text = tgt_tokenizer.decode(translated_tokens)
            print(f"Epoch {epoch+1}, Example translation: {translated_text}")

    # 모델 저장
    torch.save(model.state_dict(), f'model_epoch_{epoch+1}.pth')

print("Training complete.")

Epoch 1, Loss: 7.269251545333862
Epoch 2, Loss: 7.27025401550293
Epoch 2, Example translation: the the the
Epoch 3, Loss: 7.273096961288452
Epoch 4, Loss: 7.27777986366272
Epoch 4, Example translation: the the the
Epoch 5, Loss: 7.280298522796631
Epoch 6, Loss: 7.279367275238037
Epoch 6, Example translation: the the the
Epoch 7, Loss: 7.2843097615814205
Epoch 8, Loss: 7.282118112335205
Epoch 8, Example translation: the the the
Epoch 9, Loss: 7.2816738863372805
Epoch 10, Loss: 7.2851067995452885
Epoch 10, Example translation: the the the
Training complete.


In [12]:
# 모델 로드
model.load_state_dict(torch.load('model_epoch_10.pth'))
model.to(device)

example_sentence = "카이스트 보내주세요 제발요"
translated_text = translate_sentence(model, example_sentence, src_tokenizer, tgt_tokenizer, device)
print(f"Translated text: {translated_text}")

Translated Tokens: [73725, 73725, 73725]
Translated text: the the the
