<a href="https://colab.research.google.com/github/sjunc/AI-Library/blob/main/class/W12_Attention_Mechanism.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 데이터 확인해보기

import string

I = [] # 전처리된 문장을 저장할 리스트

# 한글 텍스트 파일을 읽기 위해 utf-8 인코딩으로 읽어옴

with open(
    "kor.txt",
    'r', encoding = "utf-8") as f:
    lines = f.read().split("\n")
    for line in lines:
      # 특수 문자를 지우고 모든 글자를 소문자로 변경
      txt = "".join(v for v in line if v not in string.punctuation).lower()
      I.append(txt)

print(I[:5]) # 정제된 문장 중 앞부분 5개 출력 (예시 확인용)

['go\t가\tccby 20 france attribution tatoebaorg 2877272 cm  8363271 eunhee', 'hi\t안녕\tccby 20 france attribution tatoebaorg 538123 cm  8355888 eunhee', 'run\t뛰어\tccby 20 france attribution tatoebaorg 906328 papabear  8355891 eunhee', 'run\t뛰어\tccby 20 france attribution tatoebaorg 4008918 jsakuragi  8363273 eunhee', 'who\t누구\tccby 20 france attribution tatoebaorg 2083030 ck  6820074 yesjustryan']


In [None]:
# BOW (bag of word) 생성 함수 정의

import numpy as np
import torch

from torch.utils.data.dataset import Dataset

def get_BOW(corpus):    # 문장들로부터 BOW를 만드는 함수
  BOW = {"<SOS>": 0, "<EOS>": 1}  # <SOS> 토큰과 <EOS> 토큰을 추가
# <SOS> 토큰: Start of Seqeunce, 문장의 시작을 알리는 토큰 - 디코딩을 시작하라는 신호
# <EOS> 토큰: End of Sequence, 문장의 끝을 알리는 토큰 - 디코딩을 끝내라는 신호
  #문장 내 단어들을 이용해 BOW를 생성
  for line in corpus:   # 각 문장을 순회하면서
      for word in line.split():     # 각 문장을 단어 단위로 나눈 후
          if word not in BOW.keys():    # BOW에 없는 단어라면 새로운 인덱스로 추가
              BOW[word] = len(BOW.keys())

  return BOW

In [None]:
# 학습에 사용할 데이터셋 정의

class Eng2Kor(Dataset):   # 학습에 이용할 데이터셋
  def __init__(
      self,
      pth2txt = \
      "kor.txt"):
      self.eng_corpus = [] # 영어 문장이 들어가는 변수
      self.kor_corpus = [] # 한글 문장이 들어가는 변수

      # 텍스트 파일을 읽어서 영어 문장과 한글 문장을 저장
      with open(pth2txt, 'r', encoding = "utf-8") as f:
        lines = f.read().split("\n")
        for line in lines:
          # 빈 줄 건너뛰기
           if not line.strip():
              continue

           parts = line.split("\t") # 탭(\t)으로 영어/한글 문장 분리
           if len(parts) >= 2:  # 영어, 한글 문장이 모두 있는 경우만 처리
              # 영어 문장에서 특수 문자 제거 및 소문자 변환
              engtxt = "".join(
                  v for v in parts[0] if v not in string.punctuation
              ).lower()

              # 한국어 문장에서 특수 문자 제거
              kortxt = "".join(
                  v for v in parts[1] if v not in string.punctuation
              )

              # 너무 긴 문장은 제외 (10단어 이하만 사용)
              if len(engtxt.split()) <= 10 and len(kortxt.split()) <= 10:
                  self.eng_corpus.append(engtxt)
                  self.kor_corpus.append(kortxt)

      self.engBOW = get_BOW(self.eng_corpus)    # 영어 BOW
      self.korBOW = get_BOW(self.kor_corpus)    # 한글 BOW

 # 문장을 단어 리스트로 나눈 뒤 <EOS> 토큰 추가
  def gen_seq(self, line):
      seq = line.split()
      seq.append("<EOS>")

      return seq

  def __len__(self):
      return len(self.eng_corpus)

  def __getitem__(self, i):
    # 문자열로 되어 있는 문장을 숫자 표현으로 변경
    data = np.array([
        self.engBOW[txt] for txt in self.gen_seq(self.eng_corpus[i])])

    label = np.array([
        self.korBOW[txt] for txt in self.gen_seq(self.kor_corpus[i])])

    return data, label

  # 샘플 데이터를 출력하는 함수 추가
  def print_samples(self, num_samples = 5):
    """
    데이터셋에서 num_samples 개수만큼 샘플을 출력합니다.
    """
    print(f"데이터셋 크기: {len(self.eng_corpus)} 쌍의 문장")
    print("\n샘플 데이터:")

    # 데이터 셋 크기보다 많은 샘들을 요청한 경우 조정
    num_samples = min(num_samples, len(self.eng_corpus))

    for i in range(num_samples):
        print(f"샘플 {i+1} :")
        print(f" 영어: {self.eng_corpus[i]}")
        print(f" 한글: {self.kor_corpus[i]}")

        # 숫자 표현도 확인

        eng_indices = [self.engBOW[txt] for txt in self.gen_seq(self.eng_corpus[i])]
        kor_indices = [self.korBOW[txt] for txt in self.gen_seq(self.kor_corpus[i])]

        print(f" 영어 인덱스: {eng_indices}")
        print(f" 한국어 인덱스: {kor_indices}")
        print()

In [None]:
# 샘플 데이터 출력해보기

# 데이터셋 생성
dataset = Eng2Kor()

# 샘플 데이터 10개 출력
dataset.print_samples(10)

# BOW 사전 크기 확인
print(f"영어 어휘 크기: {len(dataset.engBOW)}")
print(f"한국어 어휘 크기: {len(dataset.korBOW)}")

# 몇 가지 단어의 인덱스 확인
print("\n영어 단어 인덱스 예시:")
for word in ["go", "hello", "thank", "<SOS>", "<EOS>"]:
    if word in dataset.engBOW:
        print(f"  '{word}': {dataset.engBOW[word]}")
    else:
        print(f"  '{word}':  사전에 없습니다.")

print("\n한국어 단어 인덱스 예시:")
for word in ["가", "안녕", "감사합니다", "<SOS>", "<EOS>"]:
    if word in dataset.korBOW:
        print(f"  '{word}': {dataset.korBOW[word]}")
    else:
        print(f"  '{word}':  사전에 없습니다.")

데이터셋 크기: 5701 쌍의 문장

샘플 데이터:
샘플 1 :
 영어: go
 한글: 가
 영어 인덱스: [2, 1]
 한국어 인덱스: [2, 1]

샘플 2 :
 영어: hi
 한글: 안녕
 영어 인덱스: [3, 1]
 한국어 인덱스: [3, 1]

샘플 3 :
 영어: run
 한글: 뛰어
 영어 인덱스: [4, 1]
 한국어 인덱스: [4, 1]

샘플 4 :
 영어: run
 한글: 뛰어
 영어 인덱스: [4, 1]
 한국어 인덱스: [4, 1]

샘플 5 :
 영어: who
 한글: 누구
 영어 인덱스: [5, 1]
 한국어 인덱스: [5, 1]

샘플 6 :
 영어: wow
 한글: 우와
 영어 인덱스: [6, 1]
 한국어 인덱스: [6, 1]

샘플 7 :
 영어: duck
 한글: 숙여
 영어 인덱스: [7, 1]
 한국어 인덱스: [7, 1]

샘플 8 :
 영어: fire
 한글: 쏴
 영어 인덱스: [8, 1]
 한국어 인덱스: [8, 1]

샘플 9 :
 영어: help
 한글: 도와줘
 영어 인덱스: [9, 1]
 한국어 인덱스: [9, 1]

샘플 10 :
 영어: hide
 한글: 숨어
 영어 인덱스: [10, 1]
 한국어 인덱스: [10, 1]

영어 어휘 크기: 3048
한국어 어휘 크기: 7466

영어 단어 인덱스 예시:
  'go': 2
  'hello': 15
  'thank': 180
  '<SOS>': 0
  '<EOS>': 1

한국어 단어 인덱스 예시:
  '가': 2
  '안녕': 3
  '감사합니다': 6442
  '<SOS>': 0
  '<EOS>': 1


In [None]:
# 학습에 사용할 데이터 로더 정의

def loader(dataset):    # 데이터셋의 문장을 한문장씩 불러오기 위한 함수
    for i in range(len(dataset)):
        data, label = dataset[i]

        # numpy array -> torch tensor로 변환 후 반환
        # 매 반복마다 하나의 (입력, 정답) 쌍을 yield (제네레이터 형태)
        yield torch.tensor(data), torch.tensor(label)   #  yield 키워드를 사용하면 제너레이터를 반환한다


In [None]:
# 인코더 정의

import torch.nn as nn

class Encoder(nn.Module):
  def __init__(self, input_size, hidden_size):
    super(Encoder, self).__init__()

    # 단어 인덱스를 임베딩 벡터로 변환 (input_size: 단어 수, hidden_size: 벡터 차원)
    self.embedding = nn.Embedding(input_size, hidden_size)
    # GRU 정의 (입력과 은닉 상태의 차원이 같음)
    self.gru = nn.GRU(hidden_size, hidden_size)

  def forward(self, x, h):
    # 배치 차원과 시계열 차원 추가 (모양 맞춰주기 용도)
    x = self.embedding(x).view(1, 1, -1)
    # GRU에 입력과 이전 hidden state를 넣고, 출력과 새로운 hidden state를 반환
    output, hidden = self.gru(x, h)
    return output, hidden


In [None]:
# 디코더 정의

class Decoder(nn.Module):
  def __init__(self, hidden_size, output_size, dropout_p = 0.1, max_length = 11):
      super(Decoder, self).__init__()

      # 출력 단어 인덱스를 hidden_size 차원의 임베딩 벡터로 변환
      self.embedding = nn.Embedding(output_size, hidden_size)

      # 어텐션 가중치를 계산하기 위한 MLP 층
      self.attention = nn.Linear(hidden_size *2, max_length)

      # context vector + 임베딩을 결합한 후 특징 추출하는 MLP
      self.context = nn.Linear(hidden_size * 2, hidden_size)

      # 과적합을 피하기 위한 드롭아웃 층
      self.dropout = nn.Dropout(dropout_p)

      # GRU 계층 (입력: context 특징, 은닉 상태)
      self.gru = nn.GRU(hidden_size, hidden_size)

      # 최종 출력층: hidden -> 단어 개수 크기의 벡터 (단어 분류용)
      self.out = nn.Linear(hidden_size, output_size)

      # 활성화 함수들
      self.relu = nn.ReLU()
      self.softmax = nn.LogSoftmax(dim = 1)

  def forward(self, x, h, encoder_outputs):
      # 입력 단어 인덱스를 임베딩하고 (1, 1, hidden_size) 형태로 변형
      x = self.embedding(x).view(1, 1, -1)
      x = self.dropout(x)

      # 어텐션 가중치 계산:
      # 현재 입력(임베딩)과 이전 hidden state를 이어붙여 attention score 계산
      attn_weights = self.softmax(
        self.attention(torch.cat((x[0], h[0]), -1)) # 결과 shape: (1, max_length)
      )

      # 인코더의 출력 전체에 어텐션 가중치를 곱해 context vector 생성
      attn_applied = torch.bmm(
          attn_weights.unsqueeze(0),
          encoder_outputs.unsqueeze(0)
      )
      # 인코더 각 시점의 중요도와 밀집 표현을 합쳐 MLP 층으로 특징 추출
      output = torch.cat((x[0], attn_applied[0]), 1)
      output = self.context(output).unsqueeze(0)
      output = self.relu(output)

      # GRU로 다음 hidden state 계산
      output, hidden = self.gru(output, h)

      # hidden -> vocabulary size로 변환 (각 단어의 확률 분포)
      output = self.out(output[0])

      return output

In [None]:
# 학습에 필요한 요소 정의

import random
import tqdm

from torch.optim.adam import Adam

# 학습에 사용할 프로세서 정의
device = "cuda" if torch.cuda.is_available() else "cpu"
# 학습에 사용할 데이터셋 정의 (전처리 + BOW 사전 포함)
dataset = Eng2Kor()

# 인코더 디코더 정의
encoder = Encoder(input_size=len(dataset.engBOW), hidden_size=64).to(device)
decoder = Decoder(64, len(dataset.korBOW), dropout_p = 0.1).to(device)

# 인코더 디코더 학습을 위한 최적화 정의
encoder_optimizer = Adam(encoder.parameters(), lr = 0.001)
decoder_optimizer = Adam(decoder.parameters(), lr = 0.001)



In [None]:
# 학습 루프 정의

for epoch in range(300):
  iterator = tqdm.tqdm(loader(dataset), total = len(dataset))
  total_loss = 0

  for data, label in iterator:
      data = torch.tensor(data, dtype = torch.long).to(device)
      label = torch.tensor(label, dtype = torch.long).to(device)

      # 인코더의 초기 은닉 상태
      encoder_hidden  = torch.zeros(1, 1, 64).to(device)
      # 인코더의 모든 시점의 출력을 저장하는 변수
      encoder_outputs = torch.zeros(11, 64).to(device)

      encoder_optimizer.zero_grad()
      decoder_optimizer.zero_grad()

      loss = 0

      for ei in range(len(data)):
          # 한 단어씩 인코더에 넣어줌
          encoder_output, encoder_hidden = encoder(
            data[ei], encoder_hidden)
          # 인코더의 은닉 상태를 저장
          encoder_outputs[ei] = encoder_output[0, 0]

      decoder_input = torch.tensor([[0]]).to(device)

      # 인코더의 마지막 은닉 상태를 디코더의 초기 은닉 상태로 저장
      decoder_hidden = encoder_hidden
      # (option 1) 50% 확률로 teacher forcing 사용
      # use_teacher_forcing = True if random.random() < 0.5 else False
      # (option 2) 강제로 teacher forcing 사용

      use_teacher_forcing = True
      if use_teacher_forcing:
          for di in range(len(label)):    # di인 이유? decoder index? 다른의미?
              decoder_output = decoder(
                  decoder_input, decoder_hidden, encoder_outputs)

              # 직접적으로 정답을 다음 시점의 입력으로 넣어줌
              target = torch.tensor(label[di], dtype = torch.long).to(device)
              target = target.unsqueeze(0).to(device)
              loss += nn.CrossEntropyLoss()(decoder_output, target)
              decoder_input = target
      else:
          for di in range(len(label)):
              # 디코더의 출력 계산
              decoder_output = decoder(
                  decoder_input, decoder_hidden, encoder_outputs)
              # 예측된 단어 중 가장 확률이 높은 top1 단어를 다음 입력으로 사용
              topv, topi = decoder_output.topk(1)
              decoder_input = topi.squeeze().detach()

              # 현재 시점의 정답과 비교하여 손실계산
              target = torch.tensor(label[di], dtype = torch.long).to(device)
              target = target.unsqueeze(target, dim=0).to(device)
              loss += nn.CrossEntropyLoss()(decoder_output, target)

              if decoder_input.item() == 1:         # <EOS> 토큰을 만나면 중지
                  break

          # 문장 하나에 대한 평균 손실 계산 후 누적
          total_loss += loss.item()/len(dataset)
          iterator.set_description(f"Epoch {epoch} loss: {total_loss}")
          loss.backward()

          encoder_optimizer.step()
          decoder_optimizer.step()

torch.save(encoder.state_dict(), "attn_enc.pth")
torch.save(decoder.state_dict(), "attn_dec.pth")


  data = torch.tensor(data, dtype = torch.long).to(device)
  label = torch.tensor(label, dtype = torch.long).to(device)
  target = torch.tensor(label[di], dtype = torch.long).to(device)
100%|██████████| 5701/5701 [00:18<00:00, 303.84it/s]
100%|██████████| 5701/5701 [00:17<00:00, 329.65it/s]
100%|██████████| 5701/5701 [00:17<00:00, 317.93it/s]
100%|██████████| 5701/5701 [00:21<00:00, 261.50it/s]
100%|██████████| 5701/5701 [00:30<00:00, 189.02it/s]
100%|██████████| 5701/5701 [00:30<00:00, 188.00it/s]
100%|██████████| 5701/5701 [00:29<00:00, 191.51it/s]
100%|██████████| 5701/5701 [00:30<00:00, 189.04it/s]
100%|██████████| 5701/5701 [00:30<00:00, 188.37it/s]
100%|██████████| 5701/5701 [00:30<00:00, 187.77it/s]
100%|██████████| 5701/5701 [00:30<00:00, 186.79it/s]
100%|██████████| 5701/5701 [00:30<00:00, 186.74it/s]
100%|██████████| 5701/5701 [00:30<00:00, 187.21it/s]
100%|██████████| 5701/5701 [00:30<00:00, 186.76it/s]
100%|██████████| 5701/5701 [00:30<00:00, 187.84it/s]
100%|██████████| 57

In [None]:
# 모델 성능 평가에 필요한 요소 정의

# 인코더 가중치 불러오고
encoder.load_state_dict(torch.load("attn_enc.pth", map_location=device))
# 디코더 가중치 불러오기
decoder.load_state_dict(torch.load("attn_dec.pth", map_location=device))

# 불러올 영어 문장을 랜덤하게 지정
idx = random.randint(0, len(dataset))
# 테스트에 사용할 문장
input_sentence = dataset.eng_corpus[idx]
# 신경망이 번역한 문장
pred_sentence = ""

data, label = dataset[idx]
data = torch.tensor(data, dtype = torch.long).to(device)
label = torch.tensor(label, dtype = torch.long).to(device)

# 인코더의 초기 은닉 상태 정의
encoder_hidden = torch.zeros(1, 1, 64).to(device)
# 인코더의 모든 시점의 출력을 저장하는 변수
encoder_outputs = torch.zeros(11, 64).to(device)

  encoder.load_state_dict(torch.load("attn_enc.pth", map_location=device))
  decoder.load_state_dict(torch.load("attn_dec.pth", map_location=device))


RuntimeError: Error(s) in loading state_dict for Decoder:
	Missing key(s) in state_dict: "attention.weight", "attention.bias", "context.weight", "context.bias", "out.weight", "out.bias". 
	size mismatch for embedding.weight: copying a param with shape torch.Size([3048, 64]) from checkpoint, the shape in current model is torch.Size([7466, 64]).

In [None]:
# 인코더 동작

for ei in range(len(data)):
  # 한 단어씩 인코더에 넣어줌
  encoder_output, encoder_hidden = encoder(data[ei], encoder_hidden)

  # 인코더의 출력을 저장
  encoder_outputs[ei] = encoder_output[0, 0]

# 디코더의 초기 입력
# 0은 <SOS>  토큰
decoder_input = torch.tensor([[0]]).to(device)

# 디코더의 초기 은닉 상태는 인코더의 마지막 hidden state로 설정
decoder_hidden = encoder_hidden

In [None]:
# 디코더 동작

for di in range(11): # 최대 11단어까지 예측 (길이 제한)
    # 현재 입력, hidden state, 인코더 출력을 기반으로 다음 단어 예측
    decoder_output = decoder(
        decoder_input, decoder_hidden, encoder_outputs)
    # 예측 단어 중 가장 확률이 높은 단어 선택
    topv, topi = decoder_output.topk(1) # topi: 예측된 단어 인덱스
    decoder_input = topi.squeeze().detach() # 다음 입력으로 설정 (detach로 gradient 제외)

    # <EOS> 토큰을 만나면 중지
    if decoder_input.item() == 1:
      break

    # 가장 높은 확률값의 단어를 문자열에 추가
    pred_sentence += list(dataset.korBOW.keys())[decoder_input] + " "

print(input_sentence)   # 영어 문장
print(pred_sentence)    # 번역한 한글 문장