<a href="https://colab.research.google.com/github/saeu5407/daily_tensorflow/blob/main/seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Seq2seq

seq2seq를 활용해 기계번역을 진행해보도록 하겠습니다.

[원본 텐서플로우 튜토리얼 링크](https://www.tensorflow.org/text/tutorials/nmt_with_attention?hl=ko)


In [None]:
pip install tensorflow_text

In [2]:
import numpy as np

import typing
from typing import Any, Tuple

import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

import tensorflow_text as tf_text

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

In [3]:
# 모양을 확인하는 데 사용하기 위해 정의해 둔 저수준 API 관련 옵션
use_builtins = True

하단 코드는 모양 검사를 위한 코드입니다.

In [None]:
class ShapeChecker():
  def __init__(self):
    # Keep a cache of every axis-name seen
    self.shapes = {}

  def __call__(self, tensor, names, broadcast=False):
    if not tf.executing_eagerly():
      return

    if isinstance(names, str):
      names = (names,)

    shape = tf.shape(tensor)
    rank = tf.rank(tensor)

    if rank != len(names):
      raise ValueError(f'Rank mismatch:\n'
                       f'    found {rank}: {shape.numpy()}\n'
                       f'    expected {len(names)}: {names}\n')

    for i, name in enumerate(names):
      if isinstance(name, int):
        old_dim = name
      else:
        old_dim = self.shapes.get(name, None)
      new_dim = shape[i]

      if (broadcast and new_dim == 1):
        continue

      if old_dim is None:
        # If the axis name is new, add its length to the cache.
        self.shapes[name] = new_dim
        continue

      if new_dim != old_dim:
        raise ValueError(f"Shape mismatch for dimension: '{name}'\n"
                         f"    found: {new_dim}\n"
                         f"    expected: {old_dim}\n")

## 데이터셋 로드 및 준비

텐서플로우 튜토리얼을 따라 영어-스페인어 데이터 세트를 사용해 데이터셋을 로드합니다.
해당 데이터세트는 다음과 같이 한 쌍으로 구성되어 있습니다.

```
May I borrow this book? ¿Puedo tomar prestado este libro?
```

다운받은 데이터에 대해 다음과 같은 작업을 진행합니다.
1. 각 문장에 토큰 시작과 끝을 추가
2. 특수 문자 제거
3. 단어 -> id, id -> 단어 등 사전 매핑
4. 각 문장을 최대 길이로 채움

### 데이터 다운로드 및 로드

In [4]:
# 데이터 다운로드
import pathlib

path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

path_to_file = pathlib.Path(path_to_zip).parent/'spa-eng/spa.txt'

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [5]:
# 데이터 로드 함수 정의
def load_data(path):
  text = path.read_text(encoding='utf-8')

  lines = text.splitlines() # 여러 라인으로 구분된 문자열을 한 라인씩 분리하여 리스트 반환
  pairs = [line.split('\t') for line in lines] # 그 리스트를 탭으로 분리하여 영어, 스페인 문장을 나눔

  inp = [inp for targ, inp in pairs]
  targ = [targ for targ, inp in pairs]

  return targ, inp

In [6]:
# 데이터 로드 및 샘플 프린트
targ, inp = load_data(path_to_file)
print("스페인어")
print(inp[-1])
print("영어")
print(targ[-1])

스페인어
Si quieres sonar como un hablante nativo, debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un músico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado.
영어
If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo.


### 데이터셋 생성

`tf.data.Dataset` 을 활용하여 데이터셋을 생성합니다.

In [7]:
BUFFER_SIZE = len(inp)
BATCH_SIZE = 64

# tf.data.Dataset.from_tensor_slices는 알아서 slice를 한다. -> 배치를 위한 전처리
dataset = tf.data.Dataset.from_tensor_slices((inp, targ)).shuffle(BUFFER_SIZE) # shuffle로 랜덤하게 넣는다
# 를 배치에 적용
dataset = dataset.batch(BATCH_SIZE)

In [8]:
# 배치 적용 샘플 테스트
for example_input_batch, example_target_batch in dataset.take(1):
  print(example_input_batch[:5])
  print()
  print(example_target_batch[:5])
  break

tf.Tensor(
[b'\xc2\xbfEn qu\xc3\xa9 camino queda la playa?'
 b'Es un poco m\xc3\xa1s tarde de las once menos cuarto.'
 b'\xc3\x89l puede leer lo suficiente.'
 b'Tom tiene una hermana en Boston.' b'Nos subimos al bus en Shinjuku.'], shape=(5,), dtype=string)

tf.Tensor(
[b'Which way is the beach?' b'It is a little after a quarter to eleven.'
 b'He can read well.' b'Tom has a sister in Boston.'
 b'We got on the bus at Shinjuku.'], shape=(5,), dtype=string)


### 텍스트 전처리

이번 튜토리얼에서는 `tf.saved_model`을 활용하여 모델을 저장하는 것을 목표로 하고 있습니다.

우선 표준화 작업부터 진행합니다.

#### 표준화

악센트를 분할하고, 호환성 문자를 해당 ASCII 문자로 대체하겠습니다.<br>
`tensorflow_text` 패키지에 해당 작업들이 존재합니다.

In [9]:
# 전처리 샘플
example_text = tf.constant('¿Todavía está en casa?')

print(example_text.numpy())
print(tf_text.normalize_utf8(example_text, 'NFKD').numpy())

b'\xc2\xbfTodav\xc3\xada est\xc3\xa1 en casa?'
b'\xc2\xbfTodavi\xcc\x81a esta\xcc\x81 en casa?'


In [10]:
# Standardization Function
def tf_lower_and_split_punct(text):
  # Split accecented characters.
  text = tf_text.normalize_utf8(text, 'NFKD')
  text = tf.strings.lower(text)
  # Keep space, a to z, and select punctuation.
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  # Add spaces around punctuation.
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  # Strip whitespace.
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

In [11]:
print(example_text.numpy().decode())
print(tf_lower_and_split_punct(example_text).numpy().decode())

¿Todavía está en casa?
[START] ¿ todavia esta en casa ? [END]


#### 텍스트 벡터화

위의 Standardization Function 함수는 `tf.keras.layers.TextVectorization` 안에서 텍스트를 벡터화합니다.

`tf.keras.layers.TextVectorization`는 어휘 추출 및 입력 텍스트를 토큰 시퀀스로 변환하는 레이어입니다.

TextVectorization 층 또는 다른 전처리 층들은 `adapt` 메서드을 가지고 있습니다.
이 메서드는 우선 트레이닝 데이터의 한 에포크를 읽은 후, `Model.fix`와 같이 작업을 수행합니다.
이 `adapt` 메서드는 데이터에 기반하여 레이어를 초기화합니다. 여기에서 어휘를 결정합니다.

In [11]:
max_vocab_size = 5000

input_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

In [11]:
input_text_processor.adapt(inp)

# Here are the first 10 words from the vocabulary:
input_text_processor.get_vocabulary()[:10]

스페인어 TextVectorization 레이어가 영어로 build, .adapt()되는 예시

In [None]:
output_text_processor = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

output_text_processor.adapt(targ)
output_text_processor.get_vocabulary()[:10]

이제 레이어는 string 배치에서 token ID 배치로 변환할 수 있습니다.

In [None]:
example_tokens = input_text_processor(example_input_batch)
example_tokens[:3, :10]

참고로 `get_vocabulary` 메서드는  token ID를 다시 text로 변환할 수 있는 메서드입니다.

In [None]:
input_vocab = np.array(input_text_processor.get_vocabulary())
tokens = input_vocab[example_tokens[0].numpy()]
' '.join(tokens)

변환된 token ID들은 0으로 변환이 되어 있어서 쉽게 마스킹할 수 있습니다. 다음은 마스킹을 한 시각화 예입니다.

In [None]:
plt.subplot(1, 2, 1)
plt.pcolormesh(example_tokens)
plt.title('Token IDs')

plt.subplot(1, 2, 2)
plt.pcolormesh(example_tokens != 0)
plt.title('Mask')

## 인코더, 디코더 모델

하단의 그림은 모델의 overview를 나타냅니다. 각 타임스탭마다 디코더의 output은 인코딩된 input의 가중치 합과 결합하고, 다음 단어를 예측합니다.

![encoder-decoder overview](https://www.tensorflow.org/images/seq2seq/attention_mechanism.jpg)

우선 몇 가지 파라미터를 설정해두고 시작하도록 하겠습니다.

In [None]:
embedding_dim = 256
units = 1024

### 인코더

위의 그림에서 파란색 부분을 인코더라고 합니다. 

인코더는 다음과 같이 동작합니다.

1. token ID를 가져옵니다.(`input_text_processor`를 통해)
2. 각 토큰에 대한 임베딩 벡터를 찾습니다.(`layers.Embedding`을 사용)
3. 임베딩 벡터를 새로운 시퀀스로 처리합니다.(`layers.GRU`을 사용)

이에 대한 리턴 값은 다음 두 개가 됩니다.
1. 처리된 시퀀스 : 어텐션의 input으로 사용됩니다.
2. internal state(내부 상태) : 디코더를 초기화하는데 사용됩니다. 


In [None]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_vocab_size = input_vocab_size

    # The embedding layer converts tokens to vectors
    # 임베딩을 수행하는 레이어(문자 데이터를 모델링을 위해 벡터로 임베딩)
    self.embedding = tf.keras.layers.Embedding(self.input_vocab_size,
                                               embedding_dim)

    # The GRU RNN layer processes those vectors sequentially.
    self.gru = tf.keras.layers.GRU(self.enc_units,
                                   # Return the sequence and state
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

  def call(self, tokens, state=None):
    shape_checker = ShapeChecker() # 위에 정의해 둔 ShapeChecker Class를 사용해 shape를 확인한다.
    shape_checker(tokens, ('batch', 's'))

    # 2. The embedding layer looks up the embedding for each token.
    vectors = self.embedding(tokens)
    shape_checker(vectors, ('batch', 's', 'embed_dim'))

    # 3. The GRU processes the embedding sequence.
    #    output shape: (batch, s, enc_units)
    #    state shape: (batch, enc_units)
    output, state = self.gru(vectors, initial_state=state)
    shape_checker(output, ('batch', 's', 'enc_units'))
    shape_checker(state, ('batch', 'enc_units'))

    # 4. Returns the new sequence and its state.
    # GRU를 통해 최종 output과, 디코더를 초기화할 state를 return합니다.
    return output, state

In [None]:
# Convert the input text to tokens.
example_tokens = input_text_processor(example_input_batch)

# Encode the input sequence.
encoder = Encoder(input_text_processor.vocabulary_size(),
                  embedding_dim, units)
example_enc_output, example_enc_state = encoder(example_tokens)

print(f'Input batch, shape (batch): {example_input_batch.shape}')
print(f'Input batch tokens, shape (batch, s): {example_tokens.shape}')
print(f'Encoder output, shape (batch, s, units): {example_enc_output.shape}')
print(f'Encoder state, shape (batch, units): {example_enc_state.shape}')

인코더는 위의 주석에 달려있듯이 디코더를 초기화할 state를 return합니다.  
이것은 RNN이 여러 호출을 통해 시퀀스를 처리할 수 있도록 상태를 반환하는 방법처럼 일반적인 방법으로,
앞으로 실습에서 더 많은 디코더 빌드 방법을 확인할 수 있을 것입니다.

### 어텐션(Attention)

**참고 : 원문에서는 attention head라고 표현하고 있습니다.**

디코더는 어텐션을 사용하여 입력 시퀀스의 일부에 선택적으로 포커싱합니다.  
어텐션은 각 예제에 대한 입력으로 벡터 시퀀스를 사용하고, 각 예제에 대해 어텐션 벡터를 반환합니다.  
이 어텐션 레이어는 `layers.GlobalAveragePoling1D`와 유사하지만, GAP와는 달리 가중 평균을 수행합니다.

이제 어텐션이 어떻게 동작하는지 살펴보도록 하겠습니다.

![ATTENTION_1](https://www.tensorflow.org/text/tutorials/images/attention_equation_1.jpg)
![ATTENTION_2](https://www.tensorflow.org/text/tutorials/images/attention_equation_2.jpg)

각각의 기호는 다음의 뜻과 같습니다.

* $s$ : 인코더 인덱스
* $t$ : 디코더 인덱스
* $\alpha_{ts}$ : 어텐션 가중치
* $h_s$ : 인코더의 아웃풋 (the attention "key" and "value" in transformer terminology).
* $h_t$ : 디코더의 상태 (the attention "query" in transformer terminology).
* $c_t$ : 결과값(컨텍스트 벡터)
* $a_t$ : 컨텍스트와 쿼리를 결합한 최종 값

방정식에 대해서 설명하면,

1. 어텐션 가중치($\alpha_{ts}$)를 계산하는 식입니다. 이 가중치는 인코더의 아웃풋 시퀀스들을 소프트맥스 결합하여 나온 값입니다.  
    **설명 추가 : 이 연산을 통해 나온 가중치로 어떠한 $_s$에 집중할 지를 결정합니다.**
2. 어텐션 가중치와 인코더의 아웃풋을 가중합해 컨텍스트 벡터를 계산합니다.  
    **설명 추가 : 매 디코더 연산 step마다 가중치가 바뀌어야 하므로 반복 수행합니다.**

마지막으로 $score$ 함수를 알아보겠습니다. 이 함수는 키-쿼리 쌍에 대한 스칼라 로짓 점수를 계산합니다.
두 가지 계산 방법이 존재하는데,

![ATTENTION_3](https://www.tensorflow.org/text/tutorials/images/attention_equation_4.jpg)

이번 튜토리얼에서는 [Bahdanau's additive attention](https://arxiv.org/pdf/1409.0473.pdf)를 사용하도록 하겠습니다. 텐서플로우에서는 `layers.Attention` 와
`layers.AdditiveAttention` 모두 구현이 되어 있다고 합니다.  
이 클래스는 'layers.Dense' 레이어 쌍에서 가중치 행렬을 처리하고 내장 구현을 호출합니다.

In [None]:
class BahdanauAttention(tf.keras.layers.Layer):
  def __init__(self, units):
    super().__init__()
    # For Eqn. (4), the  Bahdanau attention
    self.W1 = tf.keras.layers.Dense(units, use_bias=False)
    self.W2 = tf.keras.layers.Dense(units, use_bias=False)

    self.attention = tf.keras.layers.AdditiveAttention()

  def call(self, query, value, mask):
    shape_checker = ShapeChecker()
    shape_checker(query, ('batch', 't', 'query_units'))
    shape_checker(value, ('batch', 's', 'value_units'))
    shape_checker(mask, ('batch', 's'))

    # From Eqn. (4), `W1@ht`.
    w1_query = self.W1(query)
    shape_checker(w1_query, ('batch', 't', 'attn_units'))

    # From Eqn. (4), `W2@hs`.
    w2_key = self.W2(value)
    shape_checker(w2_key, ('batch', 's', 'attn_units'))

    query_mask = tf.ones(tf.shape(query)[:-1], dtype=bool)
    value_mask = mask

    context_vector, attention_weights = self.attention(
        inputs = [w1_query, value, w2_key],
        mask=[query_mask, value_mask],
        return_attention_scores = True,
    )
    shape_checker(context_vector, ('batch', 't', 'value_units'))
    shape_checker(attention_weights, ('batch', 't', 's'))

    return context_vector, attention_weights

### 어텐션 레이어 테스트
우선 `BahdanauAttention` 레이어를 생성합니다.

In [None]:
attention_layer = BahdanauAttention(units)

이 어텐션 레이어는 3개의 인풋을 필요합니다.

`query` : 디코더에 의해 생성되는 값  
`value` : 인코더의 아웃풋값.  
`mask` : 패딩을 제거하고자 한다면 다음과 같이 사용한다. `example_tokens != 0`

In [None]:
(example_tokens != 0).shape

The vectorized implementation of the attention layer lets you pass a batch of sequences of query vectors and a batch of sequence of value vectors. The result is:

A batch of sequences of result vectors the size of the queries.
A batch attention maps, with size (query_length, value_length).