<a href="https://colab.research.google.com/github/xhdixhfl/last_project/blob/main/%ED%95%9C_%EC%98%81_%EB%B2%88%EC%97%AD%EA%B8%B0_sequence_to_sequence_transformerModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
## Load the dataset
# Uses the Korean-English sentence-pair archive (kor-eng.zip) from https://www.manythings.org/anki
!wget https://www.manythings.org/anki/kor-eng.zip
!unzip -q kor-eng.zip

--2023-01-06 05:25:54--  https://www.manythings.org/anki/kor-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 153571 (150K) [application/zip]
Saving to: ‘kor-eng.zip’


2023-01-06 05:25:55 (1009 KB/s) - ‘kor-eng.zip’ saved [153571/153571]



In [None]:
# Inspect the data: build (korean, english) sentence pairs from the corpus.
text = 'kor.txt'
# Decode explicitly as UTF-8 so the Korean text does not depend on the
# platform's default encoding.
with open(text, encoding='utf-8') as f:
    # The final element after the trailing newline is dropped.
    lines = f.read().split('\n')[:-1]
text_pairs = []
for line in lines:  # one tab-separated record per line
    # Each record has three fields; the third one is not used here.
    eng, kor, etc = line.split('\t')
    # Surround the sentence with start/end markers. The spaces matter: the
    # target vectorizer splits on whitespace, so without them the markers
    # fuse with the first and last words (e.g. "lonely[end]") and never
    # become tokens of their own.
    eng = '[start] ' + eng + ' [end]'
    text_pairs.append((kor, eng))

# Show one random pair
import random
print(random.choice(text_pairs))

('톰, 메리, 존, 앨리스는 모두 수영을 할 수 있어.', '[start]Tom, Mary, John and Alice can all swim.[end]')


In [None]:
# Shuffle in place, then carve out train / validation / test slices.
# Validation gets 15% of the pairs; the training slice is sized so that
# the remainder after validation goes to test.
random.shuffle(text_pairs)
num_val_sam = int(0.15 * len(text_pairs))
num_train_sam = len(text_pairs) - 2 * num_val_sam
val_end = num_train_sam + num_val_sam
train_pairs, val_pairs, test_pairs = (
    text_pairs[:num_train_sam],
    text_pairs[num_train_sam:val_end],
    text_pairs[val_end:],
)

In [None]:
# 라이브러리 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import string
import re


# Punctuation to strip from the English text before vectorization.
# "[" and "]" are kept so the [start] / [end] markers are not destroyed.
str_chars = "".join(ch for ch in string.punctuation if ch not in "[]")


def eng_standard(input_string):
    """Standardize a string tensor: lowercase it, then delete punctuation.

    Every character listed in ``str_chars`` is removed; square brackets
    are left in place.
    """
    strip_pattern = "[" + re.escape(str_chars) + "]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# Caps on vocabulary size and sequence length (kept small for the example).
vocab_size = 15000
sequence_length = 20

# Korean (source) vectorizer; uses the layer's default standardization.
source_vec = layers.TextVectorization(
    output_sequence_length=sequence_length,
    output_mode='int',
    max_tokens=vocab_size,
)
# English (target) vectorizer. It emits one extra position because the
# target is later split into a decoder input (all but the last token)
# and a label sequence (all but the first token).
target_vec = layers.TextVectorization(
    standardize=eng_standard,
    output_sequence_length=sequence_length + 1,
    output_mode='int',
    max_tokens=vocab_size,
)

# Build both vocabularies from the training split only.
train_kor_texts = [kor for kor, _ in train_pairs]
train_eng_texts = [eng for _, eng in train_pairs]
source_vec.adapt(train_kor_texts)
target_vec.adapt(train_eng_texts)

In [None]:
batch_size = 64


def format_dataset(kor, eng):
    """Vectorize a batch of sentence pairs into model inputs and labels.

    Returns a ``(inputs, labels)`` tuple. ``inputs`` is a dict with the
    keys ``'korea'`` (vectorized source) and ``'english'`` (vectorized
    target without its last position); ``labels`` is the vectorized
    target without its first position, i.e. shifted one step ahead.
    """
    source_ids = source_vec(kor)
    target_ids = target_vec(eng)
    model_inputs = {
        'korea': source_ids,
        'english': target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return model_inputs, labels

def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data.Dataset`` from sentence pairs.

    Args:
        pairs: sequence of ``(korean, english)`` string tuples.

    Returns:
        A dataset yielding ``(inputs, labels)`` batches as produced by
        ``format_dataset``.
    """
    kor_texts, eng_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(kor_texts), list(eng_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Cache the vectorized batches first, then shuffle, then prefetch.
    # Caching *after* shuffle stores the first epoch's order, so every later
    # epoch would replay that same order; prefetching before the cache only
    # feeds the cache instead of overlapping with training.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

# 새 섹션

In [None]:
# Peek at one batch to confirm the tensor shapes.
for inputs, targets in train_ds.take(1):
    kor_shape = inputs['korea'].shape
    eng_shape = inputs['english'].shape
    print(f"inputs['kor'].shape : {kor_shape}")
    print(f"inputs['eng'].shape : {eng_shape}")
    print(f"targets.shape: {targets.shape}")

inputs['kor'].shape : (11, 20)
inputs['eng'].shape : (11, 20)
targets.shape: (11, 20)


In [None]:
class TransformerDecoder(layers.Layer):
    """Transformer decoder block.

    Applies causal self-attention over the target sequence, attention over
    the encoder outputs, and a two-layer feed-forward projection. Each
    sub-step is followed by a residual connection and layer normalization.

    Args:
        embed_dim: size of the input token vectors.
        dense_dim: size of the inner dense layer.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'dense_dim': self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` (position ``i`` may attend
        to position ``j``) and 0 otherwise. Batch and sequence sizes are
        read from the first two dimensions of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype='int32')
        mask = tf.reshape(mask, (1, sequence_length, sequence_length))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: encoded source sequence, used as key and value
                of the second attention layer.
            mask: optional padding mask for ``inputs``, indexed as
                (batch, position). When omitted, the second attention layer
                runs without a mask.

        Returns:
            The block output after the final layer normalization.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype='int32')
            # Combine the padding mask with the causal mask.
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            # Without this branch (and with the forward pass nested under the
            # `if`), a call with no mask returned None instead of a tensor.
            padding_mask = None
        # Causal self-attention over the target sequence.
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask,
        )
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        # Attention relating the target sequence to the encoded source.
        # NOTE(review): padding_mask is built from the *target* mask and is
        # (batch, target_len, target_len); using it here appears to rely on
        # source and target lengths being equal - confirm.
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [None]:
# Positional embedding layer
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a position-index embedding.

    Args:
        sequence_length: number of distinct positions that can be embedded.
        input_dim: vocabulary size of the token embedding.
        output_dim: dimensionality of both embeddings.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of each position index."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(position_ids)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever the token id is not 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [None]:
# Transformer encoder block
class TransformerEncoder(layers.Layer):
    """Self-attention followed by a two-layer feed-forward projection.

    Both sub-steps use a residual connection and layer normalization.

    Args:
        embed_dim: size of the input token vectors.
        dense_dim: size of the inner dense layer.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim, self.dense_dim, self.num_heads = (
            embed_dim, dense_dim, num_heads)
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Encode ``inputs``; ``mask``, if given, gets an extra axis inserted
        at position 1 before being passed to the attention layer."""
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(
            inputs, inputs, attention_mask=attention_mask)
        normed = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed)
        return self.layernorm_2(normed + projected)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

In [None]:
from tensorflow import keras

In [None]:
# Model hyperparameters
embed_dim, dense_dim, num_heads = 256, 2048, 8

# Encoder side: embed the Korean token ids, then encode the source sentence.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="korea")
encoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_dim, dense_dim, num_heads)(encoder_embedded)

# Decoder side: embed the English token ids and combine them with the
# encoded source sentence.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
decoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_hidden = TransformerDecoder(
    embed_dim, dense_dim, num_heads)(decoder_embedded, encoder_outputs)
x = layers.Dropout(0.5)(decoder_hidden)
# Per-position distribution over the vocabulary.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
# Train the sequence-to-sequence Transformer
compile_options = {
    "optimizer": "rmsprop",
    "loss": "sparse_categorical_crossentropy",
    "metrics": ["accuracy"],
}
transformer.compile(**compile_options)
transformer.fit(train_ds, validation_data=val_ds, epochs=30)
# Test-set evaluation is left disabled: no `test_ds` is built in the
# visible cells.
# print(f'accuracy : {transformer.evaluate(test_ds)[1]:.3f}')

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


NameError: ignored

In [None]:
import numpy as np

# Lookup table from token id back to the English vocabulary string.
eng_vocab = target_vec.get_vocabulary()
eng_idx_lookup = dict(enumerate(eng_vocab))
max_decoded_sentence_length = 20


def decode_sen(input_sen):
    """Greedily translate one Korean sentence into English.

    Starting from "[start]", the model is run on the text decoded so far
    and the highest-scoring token at the current position is appended.
    Decoding stops at the end marker or after
    ``max_decoded_sentence_length`` steps.

    Args:
        input_sen: the Korean source sentence as a plain string.

    Returns:
        The decoded string, including the "[start]" prefix and, when one
        was produced, the end marker.
    """
    tokenized_input_sen = source_vec([input_sen])
    decoded_sen = "[start]"
    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length
        # that format_dataset produces for training.
        tokenized_target_sen = target_vec([decoded_sen])[:, :-1]
        preds = transformer([tokenized_input_sen, tokenized_target_sen])
        # Greedy choice: most probable token at step i.
        sampled_token_idx = np.argmax(preds[0, i, :])
        sampled_token = eng_idx_lookup[sampled_token_idx]
        decoded_sen += " " + sampled_token
        # Stop on the end marker whether it is a token of its own or fused
        # onto the last word (e.g. "lonely[end]"); an exact `== '[end]'`
        # test never fires for fused tokens and decoding runs to the limit.
        if sampled_token.endswith('[end]'):
            break
    return decoded_sen

# Translate 20 randomly chosen Korean sentences from the test split.
test_kor_texts = [kor for kor, _ in test_pairs]
for _ in range(20):
    input_sen = random.choice(test_kor_texts)
    print("-\n" + input_sen)
    print(decode_sen(input_sen))

-
톰은 예민한 것 같아.
[start] think tom is lonely[end]                his
-
한번이라도 도둑맞았던 적이 있어?
[start] have no social skills[end]              probably  when
-
뭔가 먹어.
[start] have an old country[end]            and they probably  something
-
우린 서로 얘기했어.
[start] learned how to cook from tom[end]              
-
톰이 다시 보스턴으로 돌아올 줄은 생각지도 못했다.
[start] asked tom to take back his birthday party[end]     interested[end]       when
-
그녀는 발이 작다.
[start] used a little dangerous[end]            and me probably  when
-
뭐 하러?
[start] asked tom to play the guitar[end]            probably  when
-
톰이 자신이 왜 떠나야 하는지를 이해했으면 좋겠어.
[start] asked tom a few of a red house[end]         so probably  when
-
내가 널 어디선가 본 적 있지 않았어?
[start] i didnt know how my more food[end]      native   and they probably  when
-
그녀는 꽃을 좋아한다고 말했다.
[start] asked tom a few questions that science teacher[end]          probably  when
-
그 사람들은 도망 쳤어.
[start] medicine tastes bitter[end]          lied[end]       and
-
이거 시도해봐.
[start] this quite 