In [None]:
트랜스포머로 한영 번역기 만들기

In [None]:
0. 라이브러리 버전 확인

In [None]:
import numpy
import tensorflow as tf
import matplotlib.pyplot
import re
import os
import io
import time
import random
import seaborn

print(tensorflow.__version__)
print(numpy.__version__)
print(matplotlib.__version__)

In [None]:
1. 어텐션 맵 한국어 폰트 적용

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm

%config InlineBackend.figure_format = 'retina'
 
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

In [None]:
2. 데이터 정제 및 토큰화

In [None]:
1) list(set(corpus)) 사용해서 중복 데이터 제거

In [None]:
data_dir = os.getenv('HOME')+'/aiffel/transformer/data'
kor_path = data_dir+"/korean-english-park.train.ko"
eng_path = data_dir+"/korean-english-park.train.en"

# 데이터 정제 및 토큰화
def clean_corpus(kor_path, eng_path):
    with open(kor_path, "r") as f: kor = f.read().splitlines()
    with open(eng_path, "r") as f: eng = f.read().splitlines()
    assert len(kor) == len(eng)

    # (한국어, 영어) 병렬 데이터로 튜플로 묶기
    corpus = list(zip(kor, eng))

    # set을 사용하여 중복을 제거하고 리스트로 변환
    cleaned_corpus = list(set(corpus))

    # 중복을 제거한 병렬 데이터를 다시 분리해서 저장
    cleaned_corpus = [(k, e) for k, e in cleaned_corpus]


    return cleaned_corpus

cleaned_corpus = clean_corpus(kor_path, eng_path)
print(cleaned_corpus[:5])

In [None]:
2) 전처리 함수 만들기

In [None]:
import re

def preprocess_sentence(sentence):
    # 1. 모든 입력을 소문자로 변환
    sentence = sentence.lower()
    
    # 2. 알파벳, 한글, 문장부호(.,!?가 기준)만 남기고 모두 제거
    sentence = re.sub(r"[^a-zA-Z가-힣0-9.,!?]+", " ", sentence)

    # 3. 문장부호 양옆에 공백 추가 (.,!?)
    sentence = re.sub(r"([.,!?])", r" \1 ", sentence)

    # 4. 문장 앞뒤의 불필요한 공백 제거
    sentence = sentence.strip()
    
    return sentence

# 테스트 예시
test_sentence = "Hello! 이 문장은 Test 문장입니다. 다른 기호들 #@$도 있어요."
cleaned_sentence = preprocess_sentence(test_sentence)
print(cleaned_sentence)

In [None]:
3) 한글 말뭉치 kor_corpus 와 영문 말뭉치 eng_corpus 를 각각 분리한 후, 정처리 후 SentencePiece로 토큰화를 진행

In [None]:
import tempfile
import sentencepiece as spm

def generate_tokenizer(corpus, 
                       vocab_size, 
                       lang="ko", 
                       pad_id=0, 
                       bos_id=1, 
                       eos_id=2, 
                       unk_id=3):
    # 1. 말뭉치를 텍스트 파일로 저장
    temp_file = f"corpus_{lang}.txt"
    with open(temp_file, 'w', encoding='utf-8') as f:
        for line in corpus:
            f.write(f"{line}\n")
    
    # 2. 모델을 저장할 디렉토리가 없으면 생성
    model_dir = "tk_model"
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    
    # 3. SentencePiece 모델 학습
    model_prefix = f"{model_dir}/{lang}_tokenizer"
    spm.SentencePieceTrainer.train(
        input=temp_file,
        model_prefix=model_prefix,
        vocab_size=vocab_size,
        pad_id=pad_id,
        bos_id=bos_id,
        eos_id=eos_id,
        unk_id=unk_id,
        model_type='bpe',  # bpe 방식으로 토큰화
        user_defined_symbols=["<PAD>", "<BOS>", "<EOS>", "<UNK>"]  # 특수 토큰 설정
    )
    
    # 4. 학습된 SentencePiece 모델을 불러오기
    sp = spm.SentencePieceProcessor()
    sp.load(f"{model_prefix}.model")
    
    # 5. 불필요한 파일 삭제
    os.remove(temp_file)
    
    return sp

# 한국어 및 영어 말뭉치 정제 및 토큰화
SRC_VOCAB_SIZE = TGT_VOCAB_SIZE = 20000

# 예시에서 cleaned_corpus를 분리
eng_corpus = []
kor_corpus = []

for pair in cleaned_corpus:
    k, e = pair  # cleaned_corpus는 이미 병렬 데이터를 tuple로 저장
    kor_corpus.append(preprocess_sentence(k))
    eng_corpus.append(preprocess_sentence(e))

# 한국어 및 영어 토크나이저 생성
ko_tokenizer = generate_tokenizer(kor_corpus, SRC_VOCAB_SIZE, "ko")
en_tokenizer = generate_tokenizer(eng_corpus, TGT_VOCAB_SIZE, "en")

# 타겟 토큰에 BOS와 EOS를 포함하도록 설정
en_tokenizer.set_encode_extra_options("bos:eos")

# 토크나이저 확인
print("슝=3")


In [None]:
4) 토크나이저를 활용해 토큰의 길이가 50 이하인 데이터를 선별하여 src_corpus 와 tgt_corpus 를 각각 구축하고, 텐서 enc_train 과 dec_train 으로 변환

In [None]:
from tqdm.notebook import tqdm  # Progress Bar 보기 위해 사용
import tensorflow as tf

src_corpus = []
tgt_corpus = []

assert len(kor_corpus) == len(eng_corpus)  # 한국어와 영어 문장의 개수가 일치하는지 확인

# 토큰의 길이가 50 이하인 문장만 남깁니다.
for idx in tqdm(range(len(kor_corpus))):
    # 한국어 문장 토큰화
    src_tokens = ko_tokenizer.encode(kor_corpus[idx], out_type=int)
    
    # 영어 문장 토큰화 (BOS와 EOS 포함)
    tgt_tokens = en_tokenizer.encode(eng_corpus[idx], out_type=int)
    
    # 토큰 길이가 50 이하인 경우만 추가
    if len(src_tokens) <= 50 and len(tgt_tokens) <= 50:
        src_corpus.append(src_tokens)
        tgt_corpus.append(tgt_tokens)

# 패딩 처리를 완료하여 학습용 데이터를 완성합니다.
enc_train = tf.keras.preprocessing.sequence.pad_sequences(src_corpus, padding='post')
dec_train = tf.keras.preprocessing.sequence.pad_sequences(tgt_corpus, padding='post')

# enc_train과 dec_train의 형태 출력
print(f"enc_train shape: {enc_train.shape}")
print(f"dec_train shape: {dec_train.shape}")


In [None]:
3. 모델 설계

In [None]:
def positional_encoding(pos, d_model):
    def cal_angle(position, i):
        return position / np.power(10000, int(i) / d_model)

    def get_posi_angle_vec(position):
        return [cal_angle(position, i) for i in range(d_model)]

    sinusoid_table = np.array([get_posi_angle_vec(pos_i) for pos_i in range(pos)])
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])
    return sinusoid_table

print("슝=3")

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
            
        self.depth = d_model // self.num_heads
            
        self.W_q = tf.keras.layers.Dense(d_model)
        self.W_k = tf.keras.layers.Dense(d_model)
        self.W_v = tf.keras.layers.Dense(d_model)
            
        self.linear = tf.keras.layers.Dense(d_model)

    def scaled_dot_product_attention(Q, K, V, mask=None):
        # Q: [batch_size, num_heads, seq_len_q, depth_q]
        # K: [batch_size, num_heads, seq_len_k, depth_k]
        # V: [batch_size, num_heads, seq_len_k, depth_v]

        # Q, K, V의 shape 출력
        print(f"Q shape: {Q.shape}")
        print(f"K shape: {K.shape}")
        print(f"V shape: {V.shape}")

        # 마스크의 shape 출력
        if mask is not None:
            print(f"mask shape: {mask.shape}")

        # Q와 K의 매트릭스 곱셈
        matmul_qk = tf.matmul(Q, K, transpose_b=True)  # (batch_size, num_heads, seq_len_q, seq_len_k)

        # Scale the matmul_qk
        dk = tf.cast(tf.shape(K)[-1], tf.float32)
        scaled_qk = matmul_qk / tf.math.sqrt(dk)

        # Apply mask (if provided)
        if mask is not None:
            # 마스크와 scaled_qk의 shape이 맞는지 확인
            print(f"scaled_qk shape: {scaled_qk.shape}")
            print(f"mask shape (before applying): {mask.shape}")

            # 마스크 추가 (shape이 맞는지 확인하고, 맞지 않으면 reshape 필요)
            scaled_qk += (mask * -1e9)

        # Softmax on the last axis (seq_len_k)
        attention_weights = tf.nn.softmax(scaled_qk, axis=-1)

        # Multiply attention weights with V
        output = tf.matmul(attention_weights, V)  # (batch_size, num_heads, seq_len_q, depth_v)

        return output, attention_weights



    def split_heads(self, x):
        batch_size = x.shape[0]
        split_x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        split_x = tf.transpose(split_x, perm=[0, 2, 1, 3])

        return split_x

    def combine_heads(self, x):
        batch_size = x.shape[0]
        combined_x = tf.transpose(x, perm=[0, 2, 1, 3])
        combined_x = tf.reshape(combined_x, (batch_size, -1, self.d_model))

        return combined_x

        
    def call(self, Q, K, V, mask):
        WQ = self.W_q(Q)
        WK = self.W_k(K)
        WV = self.W_v(V)
        
        WQ_splits = self.split_heads(WQ)
        WK_splits = self.split_heads(WK)
        WV_splits = self.split_heads(WV)
            
        out, attention_weights = self.scaled_dot_product_attention(
            WQ_splits, WK_splits, WV_splits, mask)
    				        
        out = self.combine_heads(out)
        out = self.linear(out)
                
        return out, attention_weights
    
print("슝=3")

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
            
        self.depth = d_model // self.num_heads
            
        self.W_q = tf.keras.layers.Dense(d_model)
        self.W_k = tf.keras.layers.Dense(d_model)
        self.W_v = tf.keras.layers.Dense(d_model)
            
        self.linear = tf.keras.layers.Dense(d_model)


    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        Perform scaled dot-product attention with mask.
        """
        # Matmul of Q and K
        matmul_qk = tf.matmul(Q, K, transpose_b=True)  # (batch_size, num_heads, seq_len_q, seq_len_k)

        # Scale matmul_qk
        dk = tf.cast(tf.shape(K)[-1], tf.float32)
        scaled_qk = matmul_qk / tf.math.sqrt(dk)

        # Apply mask (if provided)
        if mask is not None:
            scaled_qk += (mask * -1e9)

        # Softmax over the last axis (seq_len_k)
        attention_weights = tf.nn.softmax(scaled_qk, axis=-1)

        # Matmul of attention weights and V
        output = tf.matmul(attention_weights, V)  # (batch_size, num_heads, seq_len_q, depth_v)

        return output, attention_weights


    def scaled_dot_product_attention(self, Q, K, V, mask=None):  # self 추가
        # Q: [batch_size, num_heads, seq_len_q, depth_q]
        # K: [batch_size, num_heads, seq_len_k, depth_k]
        # V: [batch_size, num_heads, seq_len_k, depth_v]

        # Q, K, V의 shape 출력
        print(f"Q shape: {Q.shape}")
        print(f"K shape: {K.shape}")
        print(f"V shape: {V.shape}")

        # 마스크의 shape 출력
        if mask is not None:
            print(f"mask shape: {mask.shape}")

        # Q와 K의 매트릭스 곱셈
        matmul_qk = tf.matmul(Q, K, transpose_b=True)  # (batch_size, num_heads, seq_len_q, seq_len_k)

        # Scale the matmul_qk
        dk = tf.cast(tf.shape(K)[-1], tf.float32)
        scaled_qk = matmul_qk / tf.math.sqrt(dk)

        # Apply mask (if provided)
        if mask is not None:
            # 마스크와 scaled_qk의 shape이 맞는지 확인
            print(f"scaled_qk shape: {scaled_qk.shape}")
            print(f"mask shape (before applying): {mask.shape}")

            # 마스크 추가 (shape이 맞는지 확인하고, 맞지 않으면 reshape 필요)
            scaled_qk += (mask * -1e9)

        # Softmax on the last axis (seq_len_k)
        attention_weights = tf.nn.softmax(scaled_qk, axis=-1)

        # Multiply attention weights with V
        output = tf.matmul(attention_weights, V)  # (batch_size, num_heads, seq_len_q, depth_v)

        return output, attention_weights


    def split_heads(self, x):
        batch_size = x.shape[0]
        split_x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        split_x = tf.transpose(split_x, perm=[0, 2, 1, 3])

        return split_x

    def combine_heads(self, x):
        batch_size = x.shape[0]
        combined_x = tf.transpose(x, perm=[0, 2, 1, 3])
        combined_x = tf.reshape(combined_x, (batch_size, -1, self.d_model))

        return combined_x

        
    def call(self, Q, K, V, mask):
        WQ = self.W_q(Q)
        WK = self.W_k(K)
        WV = self.W_v(V)
        
        WQ_splits = self.split_heads(WQ)
        WK_splits = self.split_heads(WK)
        WV_splits = self.split_heads(WV)
            
        # scaled_dot_product_attention 호출 시 정확한 4개의 인자 전달
        out, attention_weights = self.scaled_dot_product_attention(
            WQ_splits, WK_splits, WV_splits, mask)
    				        
        out = self.combine_heads(out)
        out = self.linear(out)
                
        return out, attention_weights

print("슝=3")


In [None]:
class PoswiseFeedForwardNet(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(PoswiseFeedForwardNet, self).__init__()
        self.w_1 = tf.keras.layers.Dense(d_ff, activation='relu')
        self.w_2 = tf.keras.layers.Dense(d_model)

    def call(self, x):
        out = self.w_1(x)
        out = self.w_2(out)
            
        return out

print("슝=3")

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, n_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()

        self.enc_self_attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout = tf.keras.layers.Dropout(dropout)
        
    def call(self, x, mask):

        """
        Multi-Head Attention
        """
        residual = x
        out = self.norm_1(x)
        out, enc_attn = self.enc_self_attn(out, out, out, mask)
        out = self.dropout(out)
        out += residual
        
        """
        Position-Wise Feed Forward Network
        """
        residual = out
        out = self.norm_2(out)
        out = self.ffn(out)
        out = self.dropout(out)
        out += residual
        
        return out, enc_attn

print("슝=3")

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()

        self.dec_self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)

        self.ffn = PoswiseFeedForwardNet(d_model, d_ff)

        self.norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout = tf.keras.layers.Dropout(dropout)
    
    def call(self, x, enc_out, causality_mask, padding_mask):

        """
        Masked Multi-Head Attention (Self-Attention)
        """
        residual = x
        out = self.norm_1(x)
        out, dec_attn = self.dec_self_attn(out, out, out, causality_mask)  # Self-Attention에서는 causality_mask 사용
        out = self.dropout(out)
        out += residual

        """
        Encoder-Decoder Cross Attention (No Causality Mask, Only Padding Mask)
        """
        residual = out
        out = self.norm_2(out)
        out, dec_enc_attn = self.enc_dec_attn(out, enc_out, enc_out, padding_mask)  # padding_mask만 사용
        out = self.dropout(out)
        out += residual
        
        """
        Position-Wise Feed Forward Network
        """
        residual = out
        out = self.norm_3(out)
        out = self.ffn(out)
        out = self.dropout(out)
        out += residual

        return out, dec_attn, dec_enc_attn

print("슝=3")


In [None]:
class Encoder(tf.keras.Model):
    def __init__(self,
                 n_layers,
                 d_model,
                 n_heads,
                 d_ff,
                 dropout):
        super(Encoder, self).__init__()
        self.n_layers = n_layers
        self.enc_layers = [EncoderLayer(d_model, n_heads, d_ff, dropout) 
                        for _ in range(n_layers)]
        
    def call(self, x, mask):
        out = x
    
        enc_attns = list()
        for i in range(self.n_layers):
            out, enc_attn = self.enc_layers[i](out, mask)
            enc_attns.append(enc_attn)
        
        return out, enc_attns

print("슝=3")

In [None]:
class Decoder(tf.keras.Model):
    def __init__(self,
                 n_layers,
                 d_model,
                 n_heads,
                 d_ff,
                 dropout):
        super(Decoder, self).__init__()
        self.n_layers = n_layers
        self.dec_layers = [DecoderLayer(d_model, n_heads, d_ff, dropout) 
                            for _ in range(n_layers)]
                            
    def call(self, x, enc_out, causality_mask, padding_mask):
        out = x
    
        dec_attns = list()
        dec_enc_attns = list()
        for i in range(self.n_layers):
            # 수정: DecoderLayer의 call 함수에서 cross attention에서는 padding_mask만 전달
            out, dec_attn, dec_enc_attn = \
            self.dec_layers[i](out, enc_out, causality_mask, padding_mask)

            dec_attns.append(dec_attn)
            dec_enc_attns.append(dec_enc_attn)

        return out, dec_attns, dec_enc_attns

print("슝=3")


In [None]:
class Transformer(tf.keras.Model):
    def __init__(self,
                    n_layers,
                    d_model,
                    n_heads,
                    d_ff,
                    src_vocab_size,
                    tgt_vocab_size,
                    pos_len,
                    dropout=0.2,
                    shared=True):
        super(Transformer, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)

        self.enc_emb = tf.keras.layers.Embedding(src_vocab_size, d_model)
        self.dec_emb = tf.keras.layers.Embedding(tgt_vocab_size, d_model)

        self.pos_encoding = positional_encoding(pos_len, d_model)
        self.dropout = tf.keras.layers.Dropout(dropout)

        self.encoder = Encoder(n_layers, d_model, n_heads, d_ff, dropout)
        self.decoder = Decoder(n_layers, d_model, n_heads, d_ff, dropout)

        self.fc = tf.keras.layers.Dense(tgt_vocab_size)

        self.shared = shared

        if shared: self.fc.set_weights(tf.transpose(self.dec_emb.weights))

    def embedding(self, emb, x):
        seq_len = x.shape[1]
        out = emb(x)

        if self.shared: out *= tf.math.sqrt(self.d_model)

        out += self.pos_encoding[np.newaxis, ...][:, :seq_len, :]
        out = self.dropout(out)

        return out

        
    def call(self, enc_in, dec_in, enc_mask, causality_mask, dec_mask):
        enc_in = self.embedding(self.enc_emb, enc_in)
        dec_in = self.embedding(self.dec_emb, dec_in)

        enc_out, enc_attns = self.encoder(enc_in, enc_mask)
        
        # 수정: 디코더에서 cross-attention에는 dec_mask만 전달
        dec_out, dec_attns, dec_enc_attns = \
        self.decoder(dec_in, enc_out, causality_mask, dec_mask)  # dec_mask는 padding_mask로 전달
        
        logits = self.fc(dec_out)
        
        return logits, enc_attns, dec_attns, dec_enc_attns

print("슝=3")


In [None]:
import numpy as np
import tensorflow as tf

def generate_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

def generate_causality_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)  # 크기만을 받도록 수정
    return mask  # (seq_len, seq_len)


def generate_masks(src, tgt):
    # 인코더 패딩 마스크 (src 시퀀스에 대해)
    enc_mask = generate_padding_mask(src)

    # 디코더의 패딩 마스크 (tgt 시퀀스에 대해)
    dec_mask = generate_padding_mask(tgt)
    
    # 디코더의 자기 자신에 대한 인과 마스크 (룩어헤드 마스크)
    causality_mask = generate_causality_mask(tf.shape(tgt)[1])
    combined_dec_mask = tf.maximum(dec_mask, causality_mask)

    # 인코더-디코더 크로스 어텐션을 위한 패딩 마스크 (src 시퀀스에 대해)
    dec_enc_mask = generate_padding_mask(src)

    return enc_mask, dec_enc_mask, combined_dec_mask


print("슝=3")

In [None]:
4. 훈련하기

In [None]:
# 필요한 하이퍼파라미터 설정
n_layers = 2          # 트랜스포머 레이어 수
d_model = 512         # 모델 차원
n_heads = 8           # 멀티 헤드 어텐션에서의 헤드 수
d_ff = 2048           # 피드 포워드 네트워크의 차원
src_vocab_size = 20000 # 소스 언어의 단어 사전 크기
tgt_vocab_size = 20000 # 타겟 언어의 단어 사전 크기
pos_len = 100         # 포지션 인코딩의 길이
dropout = 0.1         # 드롭아웃 비율

# 2 Layer를 가지는 Transformer 모델 선언
transformer = Transformer(n_layers=n_layers,
                          d_model=d_model,
                          n_heads=n_heads,
                          d_ff=d_ff,
                          src_vocab_size=src_vocab_size,
                          tgt_vocab_size=tgt_vocab_size,
                          pos_len=pos_len,
                          dropout=dropout)

print("Transformer 모델 생성 완료!")

In [None]:
import tensorflow as tf

# 논문에서 사용된 Learning Rate Scheduler 정의
class LearningRateScheduler(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(LearningRateScheduler, self).__init__()
        self.d_model = d_model
        self.warmup_steps = warmup_steps
    
    def __call__(self, step):
        arg1 = step ** -0.5  # step의 역수의 제곱근
        arg2 = step * (self.warmup_steps ** -1.5)  # warmup_steps의 제곱근을 곱한 값
        return (self.d_model ** -0.5) * tf.math.minimum(arg1, arg2)

# 모델의 차원(d_model)에 따라 학습률 스케줄러 생성
learning_rate = LearningRateScheduler(d_model=512)

# Adam Optimizer 선언 (논문에서 사용된 파라미터와 동일하게 설정)
optimizer = tf.keras.optimizers.Adam(learning_rate,
                                     beta_1=0.9,
                                     beta_2=0.98, 
                                     epsilon=1e-9)

print("슝=3")


In [None]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    # Masking 되지 않은 입력의 개수로 Scaling하는 과정
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_sum(loss_)/tf.reduce_sum(mask)

In [None]:
@tf.function()
def train_step(src, tgt, model, optimizer):
    gold = tgt[:, 1:]  # 타겟의 실제값에서 <BOS> 토큰을 제외한 부분
    
    # 마스크 생성
    enc_mask, dec_enc_mask, dec_mask = generate_masks(src, tgt)

    # 계산된 loss에 tf.GradientTape() 적용하여 학습 진행
    with tf.GradientTape() as tape:
        predictions, enc_attns, dec_attns, dec_enc_attns = \
        model(src, tgt, enc_mask, dec_enc_mask, dec_mask)
        loss = loss_function(gold, predictions[:, :-1])

    # 그라디언트 계산 및 적용
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    return loss, enc_attns, dec_attns, dec_enc_attns



In [None]:
# Attention 시각화 함수

def visualize_attention(src, tgt, enc_attns, dec_attns, dec_enc_attns):
    def draw(data, ax, x="auto", y="auto"):
        import seaborn
        seaborn.heatmap(data, 
                        square=True,
                        vmin=0.0, vmax=1.0, 
                        cbar=False, ax=ax,
                        xticklabels=x,
                        yticklabels=y)
        
    for layer in range(0, 2, 1):
        fig, axs = plt.subplots(1, 4, figsize=(20, 10))
        print("Encoder Layer", layer + 1)
        for h in range(4):
            draw(enc_attns[layer][0, h, :len(src), :len(src)], axs[h], src, src)
        plt.show()
        
    for layer in range(0, 2, 1):
        fig, axs = plt.subplots(1, 4, figsize=(20, 10))
        print("Decoder Self Layer", layer+1)
        for h in range(4):
            draw(dec_attns[layer][0, h, :len(tgt), :len(tgt)], axs[h], tgt, tgt)
        plt.show()

        print("Decoder Src Layer", layer+1)
        fig, axs = plt.subplots(1, 4, figsize=(20, 10))
        for h in range(4):
            draw(dec_enc_attns[layer][0, h, :len(tgt), :len(src)], axs[h], src, tgt)
        plt.show()

In [None]:
# 번역 생성 함수

def evaluate(sentence, model, src_tokenizer, tgt_tokenizer):
    sentence = preprocess_sentence(sentence)

    pieces = src_tokenizer.encode_as_pieces(sentence)
    tokens = src_tokenizer.encode_as_ids(sentence)

    _input = tf.keras.preprocessing.sequence.pad_sequences([tokens],
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')
    
    ids = []
    output = tf.expand_dims([tgt_tokenizer.bos_id()], 0)
    for i in range(dec_train.shape[-1]):
        enc_padding_mask, combined_mask, dec_padding_mask = \
        generate_masks(_input, output)

        predictions, enc_attns, dec_attns, dec_enc_attns =\
        model(_input, 
              output,
              enc_padding_mask,
              combined_mask,
              dec_padding_mask)

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0, -1]).numpy().item()

        if tgt_tokenizer.eos_id() == predicted_id:
            result = tgt_tokenizer.decode_ids(ids)
            return pieces, result, enc_attns, dec_attns, dec_enc_attns

        ids.append(predicted_id)
        output = tf.concat([output, tf.expand_dims([predicted_id], 0)], axis=-1)

    result = tgt_tokenizer.decode_ids(ids)

    return pieces, result, enc_attns, dec_attns, dec_enc_attns

In [None]:
# 번역 생성 및 Attention 시각화 결합

def translate(sentence, model, src_tokenizer, tgt_tokenizer, plot_attention=False):
    pieces, result, enc_attns, dec_attns, dec_enc_attns = \
    evaluate(sentence, model, src_tokenizer, tgt_tokenizer)
    
    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))

    if plot_attention:
        visualize_attention(pieces, result.split(), enc_attns, dec_attns, dec_enc_attns)

In [None]:
from tqdm import tqdm_notebook 

BATCH_SIZE = 64
EPOCHS = 1

examples = [
            "오바마는 대통령이다.",
            "시민들은 도시 속에 산다.",
            "커피는 필요 없다.",
            "일곱 명의 사망자가 발생했다."
]

for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm_notebook(idx_list)

    for (batch, idx) in enumerate(t):
        batch_loss, enc_attns, dec_attns, dec_enc_attns = \
        train_step(enc_train[idx:idx+BATCH_SIZE],
                    dec_train[idx:idx+BATCH_SIZE],
                    transformer,
                    optimizer)

        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))

    for example in examples:
        translate(example, transformer, ko_tokenizer, en_tokenizer)

In [None]:
회고

In [None]:
트랜스포머 학습을 하면서 가장 깊게 공부할 수 있었던 것 같다. 노드의 도입부 대로 차원을 확인하면서 공부하니 각 함수의 의미와 흐름, 그리고 코드에서 차원의 크기에 문제가 생겼을 때도 덜 겁을 먹고 코드를 뜯어보며 해결하려고 노력했던 것 같다. 다만, 트랜스포머 모듈? 구조에 오랜 시간을 써서 뒷부분인 손실함수, 학습률 등은 사실 자세히 못봐서 아쉽다. 다음에 또 이에 대해 학습할 기회가 있으면 좋을 것 같당!

