In [1]:
import torch
# mac gpu 연결
print(f"MPS 장치가 사용 가능한가? {torch.backends.mps.is_available()}") 
device=torch.device("mps")

MPS 장치가 사용 가능한가? True


In [10]:
from data.dataloader import get_dataloader
from data.tokenizer import Tokenizer
# 토크나이저 초기화
tokenizer = Tokenizer(model_name='Helsinki-NLP/opus-mt-en-fr')

# 데이터 로드
train_dataloader = get_dataloader("en_fr_data.tsv", tokenizer, batch_size=32)

# 배치 확인
for src, trg in train_dataloader:
    print("Source (input):", src)
    print("Target (label):", trg)
    break

Source (input): tensor([[13481,    18,     4,  ..., 59513, 59513, 59513],
        [  138,     6,     9,  ..., 59513, 59513, 59513],
        [   84,   581,  9012,  ..., 59513, 59513, 59513],
        ...,
        [  233,     4,  1497,  ..., 59513, 59513, 59513],
        [ 1414, 10983, 13843,  ..., 59513, 59513, 59513],
        [  237,   159, 12181,  ..., 59513, 59513, 59513]])
Target (label): tensor([[  156,     3, 10815,  ..., 59513, 59513, 59513],
        [   87,     6, 42762,  ..., 59513, 59513, 59513],
        [  267,  1624,    66,  ..., 59513, 59513, 59513],
        ...,
        [   89,     6,    82,  ..., 59513, 59513, 59513],
        [17523,    51,    34,  ..., 59513, 59513, 59513],
        [12187,  6790,  1672,  ..., 59513, 59513, 59513]])




## 데이터 전처리

In [2]:
# # ! pip show torch
# ! pip install torch==2.3.0

In [2]:
import spacy
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import Multi30k
from torch.utils.data import DataLoader
from torchtext.data.utils import get_tokenizer
from torch.nn.utils.rnn import pad_sequence
import torch
import os 
import urllib.request



In [2]:
# url = "https://downloads.tatoeba.org/exports/sentences.csv"
# filename = "sentences.csv"


# if not os.path.exists(filename):
#     print("Downloading dataset...")
#     urllib.request.urlretrieve(url, filename)
#     print("Download completed!")


In [28]:
# import pandas as pd


# data = pd.read_csv(filename, delimiter='\t', header=None, names=["id", "lang", "sentence"])
# en_sentences = data[data["lang"] == "eng"]["sentence"].tolist()
# fr_sentences = data[data["lang"] == "fra"]["sentence"].tolist()


# # 영어와 프랑스어 문장 쌍을 만듭니다.
# en_fr_pairs = list(zip(en_sentences[:10000], fr_sentences[:10000]))


# # 데이터를 TSV 파일로 저장합니다.
# with open("en_fr_data.tsv", "w") as f:
#     f.write("src\ttrg\n")
#     for en, fr in en_fr_pairs:
#         f.write(f"{en}\t{fr}\n")

In [28]:
import torch
import torch.nn as nn
import spacy
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader

# spacy 모델 다운로드 및 로드
spacy_en = spacy.load("en_core_web_sm")
spacy_fr = spacy.load("fr_core_news_sm")

# 토크나이저 정의
def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

def tokenize_fr(text):
    return [tok.text for tok in spacy_fr.tokenizer(text)]

# 데이터셋 로드 및 토큰화
def yield_tokens(data_iter, tokenizer):
    for data_sample in data_iter:
        yield tokenizer(data_sample)

with open("en_fr_data.tsv") as f:
    data = [line.strip().split("\t") for line in f.readlines()[1:]]

src_text = [item[0] for item in data]
trg_text = [item[1] for item in data]

SRC_VOCAB = build_vocab_from_iterator(yield_tokens(src_text, tokenize_en), specials=["<unk>", "<pad>", "<bos>", "<eos>"])
SRC_VOCAB.set_default_index(SRC_VOCAB["<unk>"])
TRG_VOCAB = build_vocab_from_iterator(yield_tokens(trg_text, tokenize_fr), specials=["<unk>", "<pad>", "<bos>", "<eos>"])
TRG_VOCAB.set_default_index(TRG_VOCAB["<unk>"])

# 데이터셋 정의
def data_process(src_text, trg_text, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer):
    data = []
    for (src_line, trg_line) in zip(src_text, trg_text):
        src_tensor = torch.tensor(
            [src_vocab["<bos>"]] +
            [src_vocab[token] for token in src_tokenizer(src_line)] +
            [src_vocab["<eos>"]],
            dtype=torch.long
        )
        trg_tensor = torch.tensor(
            [trg_vocab["<bos>"]] +
            [trg_vocab[token] for token in trg_tokenizer(trg_line)] +
            [trg_vocab["<eos>"]],
            dtype=torch.long
        )
        data.append((src_tensor, trg_tensor))
    return data

train_data = data_process(src_text, trg_text, SRC_VOCAB, TRG_VOCAB, tokenize_en, tokenize_fr)

# 데이터 로더 생성
BATCH_SIZE = 64
PAD_IDX = SRC_VOCAB["<pad>"]

def generate_batch(data_batch):
    src_batch, trg_batch = [], []
    for (src_item, trg_item) in data_batch:
        src_batch.append(src_item)
        trg_batch.append(trg_item)
    src_batch = nn.utils.rnn.pad_sequence(src_batch, padding_value=PAD_IDX)
    trg_batch = nn.utils.rnn.pad_sequence(trg_batch, padding_value=PAD_IDX)
    return src_batch, trg_batch

train_iter = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=generate_batch)


In [32]:
len(train_iter)
# 훈련 데이터 출력 
for src, trg in train_iter:
    print(src.shape, trg.shape)
    break
len(SRC_VOCAB), len(TRG_VOCAB)

torch.Size([31, 64]) torch.Size([29, 64])


(7594, 8469)

# Transformer 정의
## multi-head attention 
- query
- key
- value

- hyperparameter
    - hidden_dim: 하나의 단어에 대한 임베딩 차원
    - n_heads : 멀티헤드의 개수
    - dropou_ratio

In [4]:
import  torch.nn as nn

class Multiheadattentionlayer(nn.Module): 
    def __init__(self , hidden_dim, n_heads, dropout_ratio , device):
        super().__init__()

        assert hidden_dim % n_heads == 0 

        self.hidden_dim = hidden_dim # 한 단어에 대한 임베딩 차원 
        self.n_heads = n_heads # 멀티헤드 몇개로 설정할 것인지
        self.head_dim = hidden_dim // n_heads #각 헤드에서의 임베딩 차원
        self.fc_q = nn.Linear(hidden_dim, hidden_dim) # query에 대한 선형 레이어 : 선형 변환을 해줘야 하기 때문
        self.fc_k = nn.Linear(hidden_dim, hidden_dim) # 선형 변환 하는 차원 변환 : hidden_dim -> hidden_dim(동일한 차원)
        self.fc_v = nn.Linear(hidden_dim, hidden_dim)
        self.fc_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device) # 헤드 차원으로 나눠줘야하기 때문에
    
    def forward(self, query, key, value, mask=None):
        batch_size = query.shape[0]

        Q = self.fc_q(query) # (batch_size, query_len, hidden_dim) : 한 번에 처리할 배치, 문장의 길이, 임베딩 차원
        K = self.fc_k(key)
        V = self.fc_v(value)    

        # Q, K, V를 헤드의 개수로 나눠줌 : head_dim = hidden_dim // n_heads 
        # 0,2,1,3 : 0번째 차원은 그대로 두고 1번째 차원과 2번째 차원을 바꿔줌 : (batch_size, n_heads, query_len, head_dim)
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)

        # attention energy 계산 : Q와 K의 내적을 구함 : (batch_size, n_heads, query_len, key_len)
        # 내적을 하기 위해 permute를 해줌 
        energy = torch.matmul(Q, K.permute(0,1,3,2)) / self.scale

        # mask를 적용하여 padding 부분을 -1e10으로 채워주어 softmax 후에 0이 되도록 함
        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1) # (batch_size, n_heads, query_len, key_len)

        # 여기서 dropout을 적용 후 V와 내적
        x= torch.matmul(self.dropout(attention), V) # (batch_size, n_heads, query_len, head_dim)
        x= x.permute(0,2,1,3).contiguous() # (batch_size, query_len, n_heads, head_dim) : 다시 원래 차원으로 복구
        x= x.view(batch_size, -1, self.hidden_dim) # (batch_size, query_len, hidden_dim) : 다시 원래 차원으로 복구
        x=self.fc_o(x) # (batch_size, query_len, hidden_dim) 

        return x, attention


        

# Position-wise feedforward
- 입력과 출력 차원이 동일 
- 하이퍼 파라미터 
    - hidden_dim
    - pf_dim  : feedforward 레이어에서의 내부 임베딩 차원 
    - dropout_ratio

In [12]:
class Positionwisefeedforwardlayer(nn.Module):
    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()
        # 선형 변환 레이어 2개로 구성
        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)
    
    def forward(self, x):
        x= self.dropout(torch.relu(self.fc_1(x)))

# PositionalEncoding

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len, device):
        super().__init__()
        # positional encoding을 위한 비어있는 tensor 생성
        self.device = device
        self.pe = torch.zeros(max_len, hidden_dim).to(device)

        # positional encoding 계산
        position = torch.arange(0, max_len).unsqueeze(1).float() # 0부터 max_len까지의 숫자를 생성
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / hidden_dim))
        # 2로 나눈 이유는 sin과 cos를 번갈아가면서 넣어주기 위함
        
        self.pe[:, 0::2] = torch.sin(position * div_term)
        self.pe[:, 1::2] = torch.cos(position * div_term)

        self.pe = self.pe.unsqueeze(0)

    def forward(self, x):
        x = x + self.pe[:, :x.shape[1]]
        return x

# encoderlayer
- 입력과 출력 차원 동일
- 하이퍼파라미터
    - hidden_dim
    - n_heads
    - pf_dim  
    - dropout_layer

In [13]:
class Encoderlayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()
        
        # attention, 피드포워드 , 정규화, 드롭아웃 층으로 구성
        # 순서 
        # self attention -> 정규화+residual -> positionwise feedforward -> 정규화+residual
        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = Multiheadattentionlayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = Positionwisefeedforwardlayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, src, src_mask):
        # src : (batch_size, src_len, hidden_dim)
        # src_mask : (batch_size, src_len) : padding 부분을 0으로 표시한 mask
        # 쿼리 키 밸류 모두 src로 들어감
        _src, _ = self.self_attention(src, src, src, src_mask)
        
        # 정규화, 잔차연결
        src = self.self_attn_layer_norm(src + self.dropout(_src)) # dropout, residual, layer norm

        # position-wise feedforward
        _src = self.positionwise_feedforward(src)

        # 정규화 잔차연결
        src = self.ff_layer_norm(src + self.dropout(_src))

        return src

## encoder architectue
- 하이퍼 파라미터
    - input_dim : 하나의 단어에 대한 원 핫 인코딩 차원 
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_layers : 쌓을 인코더의 개수
    - n_heads
    - pf_dim
    - dropout_rate
    - max_length : 문장 내 최대 단어 개수
- positional embedding
    - bert 방식 : 학습하는 형태
    - transformer 방식  : cos, sin 함수를 활용하는 형태
- pad 토큰에 대해 마스크 값을 0으로 설정

In [15]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        super().__init__()

        self.device = device
        # 토큰 임베딩 : 단어를 임베딩
        # 포지션 임베딩 : 단어의 위치를 임베딩
        self.tok_embedding = nn.Embedding(input_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        # 인코더 레이어를 n_layers 만큼 쌓음
        self.layers = nn.ModuleList([Encoderlayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
        self.dropout = nn.Dropout(dropout_ratio)
        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device) # 헤드 차원으로 나눠줘야함
    
    # bert식
    def forward(self, src, src_mask):
        batch_size = src.shape[0]
        src_len = src.shape[1] # 문장의 길이

        # 포지션 임베딩 : 0부터 src_len까지의 숫자를 순서대로 나타냄
        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device) # (batch_size, src_len)
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos)) # (batch_size, src_len, hidden_dim) -> 임베딩 + 포지션 임베딩

        # 모든 인코더 레이어를 차례대로 거침
        # src : (batch_size, src_len, hidden_dim)
        # src_mask
        for layer in self.layers:
            src= layer(src, src_mask)
        return src

    # 실제 포지션 임베딩 적용 
    def forward2(self, src, src_mask):
        batch_size = src.shape[0]
        src_len = src.shape[1]

        # 코사인, 사인 함수 사용한 임베딩 추가
        pos = PositionalEncoding(src.shape[2], src_len, self.device) # (batch_size, src_len, hidden_dim)
        src = self.dropout((self.tok_embedding(src) * self.scale) + pos)

        for layer in self.layers:
            src = layer(src, src_mask)
        
        return src


In [14]:
torch.arange(0, 100).unsqueeze(0).repeat(50, 1)

tensor([[ 0,  1,  2,  ..., 97, 98, 99],
        [ 0,  1,  2,  ..., 97, 98, 99],
        [ 0,  1,  2,  ..., 97, 98, 99],
        ...,
        [ 0,  1,  2,  ..., 97, 98, 99],
        [ 0,  1,  2,  ..., 97, 98, 99],
        [ 0,  1,  2,  ..., 97, 98, 99]])

# decoder layer
- 입력과 출력의 차원이 같음 
- 두 개의 Multi head attention 사용
- 하이퍼 파라미터 
    - hidden_dim
    - n_heads
    - pf_dim
    - dropout_ratio
- pad 토큰에 대해 마스크 값을 0으로 설정 
- 타겟 문장에서 . 각단어는 다음 단어가 무엇인지 알 수 없도록 만들기 위해 마스크 사용 

self - multi head -> add+norm -> multi-head cross attention-> add+norm -> feedforward , add + norm

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim) # self attention 후 정규화
        self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim) # cross attention 후 정규화
        self.ff_layer_norm = nn.LayerNorm(hidden_dim) # feedforward 후 정규화
        self.self_attention = Multiheadattentionlayer(hidden_dim, n_heads, dropout_ratio, device) # self attention
        self.encoder_attention = Multiheadattentionlayer(hidden_dim, n_heads, dropout_ratio, device) # cross attention
        self.positionwise_feedforward = Positionwisefeedforwardlayer(hidden_dim, pf_dim, dropout_ratio) # feedforward
        self.dropout = nn.Dropout(dropout_ratio) 
    
    def forward(self, trg, enc_src, trg_mask, src_mask):
        _trg, _ = self.self_attention(trg, trg, trg, trg_mask) # self attention, query, key, value 모두 trg
        trg = self.self_attn_layer_norm(trg + self.dropout(_trg)) # self attention 이후 dropout, residual, layer norm
        
        # enc_src : 인코더의 출력값
        # trg : 디코더의 1번째 레이어의 출력값
        _trg , attention = self.encoder_attention(trg, enc_src, enc_src, src_mask) # cross attention, query는 trg, key, value는 enc_src
        trg= self.enc_attn_layer_norm(trg + self.dropout(_trg)) # cross attention 이후 dropout, residual, layer norm

        _trg = self.positionwise_feedforward(trg) # feedforward
        trg = self.ff_layer_norm(trg + self.dropout(_trg)) # feedforward 이후 dropout, residual, layer norm

        return trg, attention
    





## decoder architecture
- 전체 디코더 구조 정의
- 하이퍼 파라미터
    - output_dim 
    - hidden_dim
    - n_layers : 인코더 레이어 개수
    - n_heads : 멀티 헤드 개수
    - pf_dim 
    - dropout_ratio
    - max_length 

- 실제는 디코더 반복적으로 넣어야 하지만, 학습 시에는 한 번에 출력 문장 구해 학습 가능
- 소스 문장의 pad 토큰에 대해 마스크 값을 0으로 설정 

In [21]:
class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        super().__init__()

        self.device = device
        self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout_ratio)
        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
    
    def forward(self, trg, enc_src, trg_mask, src_mask):
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        pos = torch.arange(0,trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device) # 
        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos) # (batch_size, trg_len, hidden_dim): 임베딩 + 포지션 임베딩 
        # scale 값 곱하는 이유 : 나중에 softmax를 할 때 값이 너무 작아지는 것을 방지하기 위함
        # 초기 임베딩에 sqrt(hidden_dim)을 곱해줌으로써 값이 너무 작아지는 것을 방지하여 gradeint flow 안정화

        for layer in self.layers:
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)
        output = self.fc_out(trg)

        return output, attention

# Transformer architecture

In [22]:
class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device
    
    # pad 토큰에 대해 마스크 값을 0으로 설정 
    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask
    
    def make_trg_mask(self, trg):
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=self.device)).bool()

        trg_mask = trg_pad_mask & trg_sub_mask
        return trg_mask

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)

        output , attention = self.decoder(trg, enc_src, trg_mask, src_mask)
        return output, attention



## Training

In [33]:
INPUT_DIM = len(SRC_VOCAB)
OUTPUT_DIM = len(TRG_VOCAB)
HIDDEN_DIM = 256
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1

In [None]:
Src_PAD_IDX = SRC_VOCAB["<pad>"]
Trg_PAD_IDX = TRG_VOCAB["<pad>"]

In [35]:
SRC_VOCAB["<pad>"]

1