# Env*

In [2]:
# imports
import argparse
import os
import random
import shutil
import json
import zipfile
import math
import copy
import collections
import re

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import sentencepiece as spm
import tensorflow as tf
import tensorflow.keras.backend as K

from tqdm.notebook import tqdm, trange

In [3]:
# Environment settings: every hyper-parameter lives on one Namespace object
# seed: random seed value
args = argparse.Namespace(seed=1234)

print(args)

Namespace(seed=1234)


In [4]:
# Random seed setup
def set_seed(seed):
    """
    Seed the Python, NumPy and TensorFlow random number generators.
    :param seed: seed value handed to all three generators
    """
    # same order as before: Python, then NumPy, then TensorFlow
    for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
        seed_fn(seed)

In [None]:
# gpu 사용량 확인
!nvidia-smi

In [7]:
# data dir
data_dir = '/content/drive/MyDrive/Data/nlp'
os.listdir(data_dir)

# Vocabulary*

In [None]:
# vocab loading
vocab = spm.SentencePieceProcessor()
vocab.load(os.path.join(data_dir, 'kowiki', 'kowiki_32000.model'))

# Config

In [None]:
args.d_model = 8  # d_model: model hidden dim
args.n_head = 2  # n_head: multi head attention head number
args.d_head = 4  # d_head: multi head attention head dim
args.dropout = 0.1  # dropout: dropout rate
args.d_ff = 32  # d_ff: feed forward dim
args.norm_eps = 1e-9  # norm_eps: layernormal epsilon
args.n_layer = 6  # n_layer: layer number
args.n_seq = 16  # n_seq: sequence max number
args.n_vocab = len(vocab)  # n_vocab: vocab count
args.i_pad = vocab.pad_id()  # i_pad: vocab pad id

args

# Inputs

In [None]:
# 입력 문장
sentences = [
    ['나는 오늘 행복해', '나도 기분이 매우 좋아'],
    # ['나는 오늘 기분이 좋아', '나도 매우 행복하다'],
]

In [None]:
# Encode every (source, target) sentence pair into lists of token ids
# NOTE(review): `tarin_tgt_ids` looks like a typo for `train_tgt_ids`; the name is kept
# because the cell that builds the decoder inputs reads it with this spelling.
train_src_ids, tarin_tgt_ids = [], []
for pair in sentences:
    # pair[0] is encoded as the source sentence, pair[1] as the target sentence
    train_src_ids.append(vocab.encode_as_ids(pair[0]))
    tarin_tgt_ids.append(vocab.encode_as_ids(pair[1]))

train_src_ids, tarin_tgt_ids

In [None]:
# Build the train enc_inputs, dec_inputs and dec_labels
train_enc_inputs, train_dec_inputs, train_dec_labels = [], [], []
for source_id, target_id in zip(train_src_ids, tarin_tgt_ids):
    # encoder input: the source ids as they are (the same list object, not a copy)
    train_enc_inputs.append(source_id)
    # decoder input: target ids prefixed with the BOS id
    train_dec_inputs.append([vocab.bos_id()] + target_id)
    # decoder label: target ids followed by the EOS id (decoder input shifted by one)
    train_dec_labels.append(target_id + [vocab.eos_id()])

train_enc_inputs, train_dec_inputs, train_dec_labels

In [None]:
# Pad every encoder input in place to the same length (5) with id 0
# NOTE(review): the pad id is hard-coded as 0 while the config stores it in args.i_pad;
# presumably both are 0 - confirm against the vocab.
# Rows already at or beyond the target length stay unchanged ([0] * n is empty for n <= 0).
for row in train_enc_inputs:
    row += [0] * (5 - len(row))

# Pad every decoder input in place to the same length (8) with id 0
for row in train_dec_inputs:
    row += [0] * (8 - len(row))

# Pad every decoder label in place to the same length (8) with id 0
for row in train_dec_labels:
    row += [0] * (8 - len(row))

train_enc_inputs, train_dec_inputs, train_dec_labels

In [None]:
# Convert the padded Python lists to numpy arrays
train_enc_inputs = np.array(train_enc_inputs)
train_dec_inputs = np.array(train_dec_inputs)
train_dec_labels = np.array(train_dec_labels)

train_enc_inputs, train_dec_inputs, train_dec_labels

In [None]:
# embedding with random weight
embed_weight = np.random.randint(-90, 100, (args.n_vocab, args.d_model)) / 100

embed = tf.keras.layers.Embedding(args.n_vocab, args.d_model, weights=[embed_weight])
embed_weight

In [None]:
# encoder hidden
hidden_enc = embed(train_enc_inputs)
hidden_enc

In [None]:
# decoder hidden
hidden_dec = embed(train_dec_inputs)
hidden_dec

# Mask

## PAD Mask

In [None]:
# inputs
tokens = train_enc_inputs
i_pad = args.i_pad

In [None]:
# pad: True, others: False
mask = tf.math.equal(tokens, i_pad)
mask

In [None]:
# boolean -> float 32
mask = tf.cast(mask, tf.float32)
mask

In [None]:
# expand dimension for Q n_seq
mask = tf.expand_dims(mask, axis=1)
mask

### 실습
- 아래 함수를 완성하세요.

In [None]:
def get_pad_mask(tokens, i_pad=0):
    """
    Build the padding mask for a batch of token ids.
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: float32 pad mask with an extra axis at position 1 (pad: 1, other: 0)
    """
    # compare against the pad id, then turn True/False into 1./0.
    is_pad = tf.cast(tf.math.equal(tokens, i_pad), tf.float32)
    # add a length-1 axis so the mask broadcasts over the query positions
    return tf.expand_dims(is_pad, axis=1)

In [None]:
enc_pad_mask = get_pad_mask(train_enc_inputs)
enc_pad_mask

## Causal Mask

In [None]:
# inputs
tokens = train_dec_inputs
i_pad = args.i_pad

In [None]:
# n_seq 조회
n_seq = tf.shape(tokens)[1]
n_seq

In [None]:
# all one mask
mask = tf.ones((n_seq, n_seq))
mask

In [None]:
# make reverse causal mask
mask = tf.linalg.band_part(mask, -1, 0)
mask

In [None]:
# 0 -> 1, 1 -> 0
mask = 1 - mask
mask

In [None]:
# expand dim for bs
mask = tf.expand_dims(mask, axis=0)
mask

In [None]:
# get pad_mask
pad_mask = get_pad_mask(tokens, i_pad)
pad_mask

In [None]:
# mask all causal_mask or pad_mask
mask = tf.maximum(mask, pad_mask)
mask

### 실습
- 아래 함수를 완성하세요.

In [None]:
def get_causal_mask(tokens, i_pad=0):
    """
    Build the decoder self-attention mask: future positions and pad positions are masked.
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: causal and pad mask (causal or pad: 1, other: 0)
    """
    # sequence length of the batch
    seq_len = tf.shape(tokens)[1]
    # lower triangle (diagonal included) = positions each query is allowed to see
    visible = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    # invert so that future positions become 1, then add the batch axis
    future_mask = tf.expand_dims(1 - visible, axis=0)
    # a position is masked when it is in the future OR it is a pad token
    return tf.maximum(future_mask, get_pad_mask(tokens, i_pad))

In [None]:
dec_causal_mask = get_causal_mask(train_dec_inputs)
dec_causal_mask

<tf.Tensor: shape=(1, 8, 8), dtype=float32, numpy=
array([[[0., 1., 1., 1., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1., 1., 1., 1.],
        [0., 0., 0., 1., 1., 1., 1., 1.],
        [0., 0., 0., 0., 1., 1., 1., 1.],
        [0., 0., 0., 0., 0., 1., 1., 1.],
        [0., 0., 0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0., 0., 0., 1.]]], dtype=float32)>

## Mask 생성

In [None]:
# Encoder Self Attention mask
enc_self_mask = get_pad_mask(train_enc_inputs)
enc_self_mask

In [None]:
# Decoder Self Attention mask
dec_self_mask = get_causal_mask(train_dec_inputs)
dec_self_mask

In [None]:
# Encoder-Decoder Attention mask
enc_dec_mask = get_pad_mask(train_enc_inputs)
enc_dec_mask

# Scaled dot product attention

In [None]:
Q = hidden_enc
K = hidden_enc
V = hidden_enc
attn_mask = enc_self_mask

In [None]:
# matmul Q, K.T
attn_score = tf.matmul(Q, K, transpose_b=True)
attn_score

In [None]:
# d_k
d_k = tf.cast(tf.shape(K)[-1], tf.float32)
d_k

In [None]:
# scale = d_k ** 0.5
scale = tf.math.sqrt(d_k)
scale

In [None]:
# divide by scale
attn_scale = tf.math.divide(attn_score, scale)
attn_scale

In [None]:
# do mask (subtract 1e9 from the masked positions)
attn_scale -= 1.e9 * attn_mask
attn_scale

In [None]:
# calculate attention prob
attn_prob = tf.nn.softmax(attn_scale, axis=-1)
attn_prob

In [None]:
# weighted sum of V
attn_out = tf.matmul(attn_prob, V)
attn_out

### 실습
- 아래 Class를 완성하세요.

In [None]:
class ScaleDotProductAttention(tf.keras.layers.Layer):
    """
    Scaled Dot Product Attention Class: softmax(Q K^T / sqrt(d_k)) V with a subtractive mask
    """
    def __init__(self, name="scale_dot_product_attention"):
        """
        Constructor
        :param name: layer name
        """
        super().__init__(name=name)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: attention result (probability-weighted sum of V)
        """
        Q, K, V, attn_mask = inputs
        # scale = sqrt(d_k), where d_k is the size of the last axis of K
        scale = tf.math.sqrt(tf.cast(tf.shape(K)[-1], tf.float32))
        # score of every query against every key, divided by the scale
        scores = tf.math.divide(tf.matmul(Q, K, transpose_b=True), scale)
        # positions where attn_mask is 1 get 1e9 subtracted, so softmax gives them ~0 weight
        scores = scores - 1.e9 * attn_mask
        # attention probabilities over the key axis
        probs = tf.nn.softmax(scores, axis=-1)
        # weighted sum of V
        return tf.matmul(probs, V)

In [None]:
# Encoder Self Attention
Q = hidden_enc
K = hidden_enc
V = hidden_enc

attention = ScaleDotProductAttention()
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

In [None]:
# Decoder Self Attention
Q = hidden_dec
K = hidden_dec
V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

In [None]:
# Encoder-Decoder Attention
Q = hidden_dec
K = hidden_enc
V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out

# Multi Head Attention

In [None]:
# Q, K, V input dense layer
W_Q = tf.keras.layers.Dense(args.n_head * args.d_head)
W_K = tf.keras.layers.Dense(args.n_head * args.d_head)
W_V = tf.keras.layers.Dense(args.n_head * args.d_head)
# Scale Dot Product Attention class
attention = ScaleDotProductAttention(name="self_attention")
# output dense layer
W_O = tf.keras.layers.Dense(args.d_model)

In [None]:
Q = hidden_enc
K = hidden_enc
V = hidden_enc
attn_mask = enc_self_mask

In [None]:
# split
Q_m = W_Q(Q)  # (bs, Q_len, d_model) -> (bs, Q_len, n_head * d_head)
Q_m = tf.reshape(Q_m, [-1, tf.shape(Q)[1], args.n_head, args.d_head])  # (bs, Q_len, n_head * d_head) -> (bs, Q_len, n_head,  d_head)
Q_m = tf.transpose(Q_m, [0, 2, 1, 3])  # (bs, Q_len, n_head,  d_head) -> (bs, n_head, Q_len,  d_head)
Q_m

In [None]:
# build multihead Q
Q_m = tf.transpose(tf.reshape(W_Q(Q), [-1, tf.shape(Q)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, Q_len, d_head)
Q_m.shape

In [None]:
# build multihead K (the sequence axis is K's own length, taken from tf.shape(K)[1])
K_m = tf.transpose(tf.reshape(W_K(K), [-1, tf.shape(K)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, K_len, d_head)
K_m.shape

In [None]:
# build multihead V (the sequence axis is V's own length, taken from tf.shape(V)[1])
V_m = tf.transpose(tf.reshape(W_V(V), [-1, tf.shape(V)[1], args.n_head, args.d_head]), [0, 2, 1, 3])  # (bs, n_head, V_len, d_head)
V_m.shape

In [None]:
# build multihead mask
attn_mask_m = tf.expand_dims(attn_mask, axis=1)
attn_mask_m

In [None]:
# Scale Dot Product Attention with multi head Q, K, V, attn_mask
attn_out_m = attention((Q_m, K_m, V_m, attn_mask_m))  # (bs, n_head, Q_len, d_head)
attn_out_m

In [None]:
# transpose
attn_out_t = tf.transpose(attn_out_m, perm=[0, 2, 1, 3])  # (bs, n_head, Q_len, d_head) -> (bs, Q_len, n_head, d_head)
attn_out_t

In [None]:
# reshape
attn_out_c = tf.reshape(attn_out_t, [-1, tf.shape(Q)[1], args.n_head * args.d_head])  # (bs, Q_len, n_head, d_head) -> (bs, Q_len, n_head * d_head)
attn_out_c

In [None]:
# linear for output
attn_out = W_O(attn_out_c) # (bs, Q_len, n_head * d_head) -> (bs, Q_len, d_model)
attn_out

### 실습
- 아래 Class를 완성하세요.

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """
    Multi Head Attention Class: project Q, K, V, attend per head, merge heads, project out
    """
    def __init__(self, args, name="multi_head_attention"):
        """
        Constructor
        :param args: Args object (reads d_model, n_head, d_head)
        :param name: layer name
        """
        super().__init__(name=name)

        self.d_model = args.d_model
        self.n_head = args.n_head
        self.d_head = args.d_head

        # all heads are projected at once: n_head * d_head output units
        proj_dim = self.n_head * self.d_head
        # Q, K, V input dense layers
        self.W_Q = tf.keras.layers.Dense(proj_dim)
        self.W_K = tf.keras.layers.Dense(proj_dim)
        self.W_V = tf.keras.layers.Dense(proj_dim)
        # Scale Dot Product Attention applied to every head
        self.attention = ScaleDotProductAttention(name="self_attention")
        # output dense layer
        self.W_O = tf.keras.layers.Dense(self.d_model)

    def _split_heads(self, projected, seq_len):
        """
        Split a projected tensor into heads
        :param projected: (bs, seq_len, n_head * d_head)
        :param seq_len: length of the sequence axis
        :return: (bs, n_head, seq_len, d_head)
        """
        per_head = tf.reshape(projected, [-1, seq_len, self.n_head, self.d_head])
        return tf.transpose(per_head, [0, 2, 1, 3])

    def call(self, inputs):
        """
        Run the layer
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: attention result
        """
        Q, K, V, attn_mask = inputs
        q_len = tf.shape(Q)[1]
        # build multihead Q, K, V
        Q_m = self._split_heads(self.W_Q(Q), q_len)  # (bs, n_head, Q_len, d_head)
        K_m = self._split_heads(self.W_K(K), tf.shape(K)[1])  # (bs, n_head, K_len, d_head)
        V_m = self._split_heads(self.W_V(V), tf.shape(V)[1])  # (bs, n_head, K_len, d_head)
        # add a head axis to the mask so it broadcasts over all heads
        attn_mask_m = tf.expand_dims(attn_mask, axis=1)
        # attention for every head
        heads = self.attention((Q_m, K_m, V_m, attn_mask_m))  # (bs, n_head, Q_len, d_head)
        # move the head axis next to d_head, then merge the two axes
        heads = tf.transpose(heads, perm=[0, 2, 1, 3])  # (bs, Q_len, n_head, d_head)
        merged = tf.reshape(heads, [-1, q_len, self.n_head * self.d_head])  # (bs, Q_len, n_head * d_head)
        # linear for output
        return self.W_O(merged)  # (bs, Q_len, d_model)

In [None]:
# Encoder Self Attention
Q = hidden_enc
K = hidden_enc
V = hidden_enc

attention = MultiHeadAttention(args)
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

In [None]:
# Decoder Self Attention
Q = hidden_dec
K = hidden_dec
V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

In [None]:
# Encoder-Decoder Attention
Q = hidden_dec
K = hidden_enc
V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out

# Feed Forward

In [None]:
W_1 = tf.keras.layers.Dense(args.d_ff, activation=tf.nn.relu)
W_2 = tf.keras.layers.Dense(args.d_model)

In [None]:
inputs = hidden_enc

In [None]:
 # linear W_1 and W_2
ff_val = W_1(inputs)
ff_val

In [None]:
ff_val = W_2(ff_val)
ff_val

### 실습
- 아래 Class를 완성하세요.

In [None]:
class PositionWiseFeedForward(tf.keras.layers.Layer):
    """
    Position Wise Feed Forward Class: Dense(d_ff, relu) followed by Dense(d_model)
    """
    def __init__(self, args, name="feed_forward"):
        """
        Constructor
        :param args: Args object (reads d_ff and d_model)
        :param name: layer name
        """
        super().__init__(name=name)

        # expansion to d_ff with ReLU
        self.W_1 = tf.keras.layers.Dense(args.d_ff, activation=tf.nn.relu)
        # projection back to d_model (no activation)
        self.W_2 = tf.keras.layers.Dense(args.d_model)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: inputs
        :return ff_val: feed forward result
        """
        # W_1 first, then W_2 on its output
        return self.W_2(self.W_1(inputs))

In [None]:
# feed-forward class 동작 확인
feed_forward = PositionWiseFeedForward(args)
ff_val = feed_forward(hidden_enc)
ff_val.shape

# Layer Normalization
- https://arxiv.org/abs/1607.06450

In [None]:
# 큰 hidden 생성
hidden = np.array([[1, 2, 3],
                   [11, 22, 33],
                   [111, 222, 333]]).astype(np.float32)
hidden

In [None]:
# layer_normal 실행
layer_norm = tf.keras.layers.LayerNormalization()
layer_norm(hidden)

In [None]:
# weights
layer_norm.get_weights()

In [None]:
# 평균 값
mean = np.mean(hidden, axis=-1, keepdims=True)
mean

In [None]:
# sqrt(var + epsilon)
sigma = np.sqrt(np.var(hidden, axis=-1, keepdims=True) + 0.001)
sigma

In [None]:
# layer normal 계산
(hidden - mean) / sigma

# Encoder Layer

In [None]:
self_attention = MultiHeadAttention(args)
norm1 = tf.keras.layers.LayerNormalization(epsilon=args.norm_eps)

ffn = PositionWiseFeedForward(args)
norm2 = tf.keras.layers.LayerNormalization(epsilon=args.norm_eps)

dropout = tf.keras.layers.Dropout(args.dropout)

In [None]:
enc_hidden = hidden_enc
self_mask = enc_self_mask

In [None]:
# self attention
self_attn_val = self_attention((enc_hidden, enc_hidden, enc_hidden, self_mask))
self_attn_val

In [None]:
# add and layer normal
norm1_val = norm1(enc_hidden + dropout(self_attn_val))
norm1_val

In [None]:
# feed forward
ffn_val = ffn(norm1_val)
ffn_val

In [None]:
# add and layer normal
enc_out = norm2(norm1_val + dropout(ffn_val))
enc_out

### 실습
- 아래 Class를 완성하세요.

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """
    Encoder Layer Class: self attention and feed forward sub-layers, each followed by
    dropout, a residual add and layer normalization
    """
    def __init__(self, args, name='encoder_layer'):
        """
        Constructor
        :param args: Args object
        :param name: layer name
        """
        super().__init__(name=name)

        eps = args.norm_eps

        # sub-layer 1: multi head self attention + layer normalization
        self.self_attention = MultiHeadAttention(args)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=eps)

        # sub-layer 2: position wise feed forward + layer normalization
        self.ffn = PositionWiseFeedForward(args)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=eps)

        # a single Dropout layer is reused after both sub-layers
        self.dropout = tf.keras.layers.Dropout(args.dropout)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: enc_hidden, self_mask tuple
        :return enc_out: EncoderLayer result
        """
        enc_hidden, self_mask = inputs

        # self attention: the encoder hidden state is query, key and value
        attended = self.self_attention((enc_hidden, enc_hidden, enc_hidden, self_mask))
        # residual add and layer normalization
        attended = self.norm1(enc_hidden + self.dropout(attended))

        # feed forward
        transformed = self.ffn(attended)
        # residual add and layer normalization
        return self.norm2(attended + self.dropout(transformed))

In [None]:
# EncoderLayer 기능 확인
encoder_layer = EncoderLayer(args)
enc_out = encoder_layer((hidden_enc, enc_self_mask))
enc_out.shape

# Decoder Layer

In [None]:
self_attention = MultiHeadAttention(args)
norm1 = tf.keras.layers.LayerNormalization(epsilon=args.norm_eps)

ende_attn = MultiHeadAttention(args)
norm2 = tf.keras.layers.LayerNormalization(epsilon=args.norm_eps)

ffn = PositionWiseFeedForward(args)
norm3 = tf.keras.layers.LayerNormalization(epsilon=args.norm_eps)

dropout = tf.keras.layers.Dropout(args.dropout)

In [None]:
dec_hidden = hidden_dec
self_mask = dec_self_mask
ende_mask = enc_dec_mask

In [None]:
# self attention
self_attn_val = self_attention((dec_hidden, dec_hidden, dec_hidden, self_mask))
self_attn_val

In [None]:
# add and layer normal
norm1_val = norm1(dec_hidden + dropout(self_attn_val))
norm1_val

In [None]:
# encoder and decoder attention
ende_attn_val = ende_attn((norm1_val, enc_out, enc_out, ende_mask))
ende_attn_val

In [None]:
# add and layer normal
norm2_val = norm2(norm1_val + dropout(ende_attn_val))
norm2_val

In [None]:
# feed forward
ffn_val = ffn(norm2_val)
ffn_val

In [None]:
# add and layer normal
dec_out = norm3(norm2_val + dropout(ffn_val))
dec_out

### 실습
- 아래 Class를 완성하세요.

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """
    Decoder Layer Class: self attention, encoder-decoder attention and feed forward
    sub-layers, each followed by dropout, a residual add and layer normalization
    """
    def __init__(self, args, name='decoder_layer'):
        """
        Constructor
        :param args: Args object
        :param name: layer name
        """
        super().__init__(name=name)

        eps = args.norm_eps

        # sub-layer 1: multi head self attention
        self.self_attention = MultiHeadAttention(args)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=eps)

        # sub-layer 2: attention over the encoder output
        self.ende_attn = MultiHeadAttention(args)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=eps)

        # sub-layer 3: position wise feed forward
        self.ffn = PositionWiseFeedForward(args)
        self.norm3 = tf.keras.layers.LayerNormalization(epsilon=eps)

        # a single Dropout layer is reused after all three sub-layers
        self.dropout = tf.keras.layers.Dropout(args.dropout)

    def call(self, inputs):
        """
        Run the layer
        :param inputs: dec_hidden, enc_out, self_mask, ende_mask tuple
        :return dec_out: DecoderLayer result
        """
        dec_hidden, enc_out, self_mask, ende_mask = inputs

        # self attention: the decoder hidden state is query, key and value
        self_attended = self.self_attention((dec_hidden, dec_hidden, dec_hidden, self_mask))
        self_attended = self.norm1(dec_hidden + self.dropout(self_attended))

        # encoder-decoder attention: query from the decoder, key and value from enc_out
        cross_attended = self.ende_attn((self_attended, enc_out, enc_out, ende_mask))
        cross_attended = self.norm2(self_attended + self.dropout(cross_attended))

        # feed forward, then the last residual add and layer normalization
        transformed = self.ffn(cross_attended)
        return self.norm3(cross_attended + self.dropout(transformed))

In [None]:
# Decoder 실행
decoder_layer = DecoderLayer(args)
dec_out = decoder_layer((hidden_dec, enc_out, dec_self_mask, enc_dec_mask))
dec_out.shape

# Weight Shared Embedding

In [None]:
initializer = tf.keras.initializers.TruncatedNormal(stddev=args.d_model ** -0.5)
shared_weights = initializer(shape=(args.n_vocab, args.d_model))
shared_weights

### embedding

In [None]:
inputs = train_enc_inputs
inputs

In [None]:
# lookup by gather
embed = tf.gather(shared_weights, tf.cast(inputs, tf.int32))
embed

In [None]:
# multiply by d_model ** 0.5
embed *= args.d_model ** 0.5
embed

### linear

In [None]:
inputs = hidden_dec

In [None]:
# matmul inputs, shared_weights (transpose_b=True)
outputs = tf.matmul(inputs, shared_weights, transpose_b=True)
outputs

### 실습
- 아래 Class를 완성하세요.

In [None]:
class SharedEmbedding(tf.keras.layers.Layer):
    """
    Weight Shared Embedding Class

    One (n_vocab, d_model) matrix is used both as the token embedding table
    (mode='embedding') and, transposed, as the output projection (mode='linear').
    """
    def __init__(self, args, name='weight_shared_embedding'):
        """
        Constructor
        :param args: Args object (reads n_vocab and d_model)
        :param name: layer name
        """
        super().__init__(name=name)

        self.n_vocab = args.n_vocab
        self.d_model = args.d_model

    def build(self, input_shape):
        """
        Create the shared weight
        :param input_shape: Tensor Shape (not used)
        """
        with tf.name_scope('shared_embedding_weight'):
            # The weight name is passed by keyword: the first positional parameter of
            # add_weight is `name` in tf.keras 2.x but `shape` in Keras 3, so a positional
            # name collides with shape= on Keras 3.
            self.shared_weights = self.add_weight(
                name='weights',
                shape=[self.n_vocab, self.d_model],
                initializer=tf.keras.initializers.TruncatedNormal(stddev=self.d_model ** -0.5)
            )

    def call(self, inputs, mode='embedding'):
        """
        Run the layer
        :param inputs: token ids for mode 'embedding', hidden states for mode 'linear'
        :param mode: 'embedding' or 'linear'
        :return: embedding lookup result or linear projection result
        :raises ValueError: when mode is neither 'embedding' nor 'linear'
        """
        # mode 'embedding': embedding lookup
        if mode == 'embedding':
            return self._embedding(inputs)
        # mode 'linear': projection with the transposed shared weight
        elif mode == 'linear':
            return self._linear(inputs)
        # any other mode is an error
        else:
            raise ValueError(f'mode {mode} is not valid.')

    def _embedding(self, inputs):
        """
        Embedding lookup
        :param inputs: token ids
        :return embed: looked-up rows of the shared weight, scaled by d_model ** 0.5
        """
        # lookup by gather
        embed = tf.gather(self.shared_weights, tf.cast(inputs, tf.int32))
        # multiply by d_model ** 0.5
        embed *= self.d_model ** 0.5
        return embed

    def _linear(self, inputs):  # (bs, n_seq, d_model)
        """
        Linear projection with the shared weight
        :param inputs: hidden states whose last axis is d_model
        :return outputs: inputs x shared_weights^T (last axis becomes n_vocab)
        """
        # matmul inputs, shared_weights (transpose_b=True)
        outputs = tf.matmul(inputs, self.shared_weights, transpose_b=True)
        return outputs

In [None]:
embedding = SharedEmbedding(args)
hidden_dec = embedding(train_dec_inputs)
hidden_dec.shape

In [None]:
linear_outputs = embedding(hidden_dec, mode="linear")
linear_outputs.shape

# Positional Encoding

### Sinusoid encoding

In [None]:
# calculate exps
exs = np.array([2 * (i_ang // 2) / args.d_model for i_ang in range(args.d_model)])
exs

In [None]:
# calculate power
angles = np.power(10000, exs)
angles

In [None]:
# make position: one row per position, 0 .. args.n_seq - 1
# args.n_seq is used instead of the global n_seq left over from the causal mask cells
# (that one is the decoder batch length), so the table has args.n_seq rows like the
# Embedding that is created from it.
pos = np.array([[i] for i in range(args.n_seq)])
pos

In [None]:
# position angle
pos_encoding = pos / angles
pos_encoding

In [None]:
# sin even number
pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
pos_encoding

In [None]:
# cos odd number
pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])
pos_encoding

In [None]:
# make embedding with sinusoid encoding
embedding = tf.keras.layers.Embedding(args.n_seq, args.d_model, trainable=False, weights=[pos_encoding])

### Position encoding lookup

In [None]:
inputs = train_enc_inputs

In [None]:
# make position (0...n_seq)
position = tf.math.cumsum(tf.ones_like(inputs), axis=1, exclusive=True)
position

In [None]:
# embedding lookup
embed = embedding(position)
embed

### 실습
- 아래 Class를 완성하세요.

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
    """
    Positional Embedding Class: non-trainable lookup into a sinusoid encoding table
    """
    def __init__(self, args, name='position_embedding'):
        """
        Constructor
        :param args: Args object (reads n_seq and d_model)
        :param name: layer name
        """
        super().__init__(name=name)

        # fixed sinusoid table, stored as the frozen weights of an Embedding layer
        table = PositionalEmbedding.get_sinusoid_encoding(args.n_seq, args.d_model)
        self.embedding = tf.keras.layers.Embedding(args.n_seq, args.d_model, trainable=False, weights=[table])

    def call(self, inputs):
        """
        Run the layer
        :param inputs: inputs; only the shape is used, the values are ignored
        :return embed: positional embedding lookup result
        """
        # exclusive cumulative sum of ones along axis 1 gives 0, 1, 2, ... for every row
        position = tf.cast(tf.math.cumsum(tf.ones_like(inputs), axis=1, exclusive=True), tf.int32)
        # embedding lookup
        return self.embedding(position)

    @staticmethod
    def get_sinusoid_encoding(n_seq, d_model):
        """
        Create the sinusoid encoding table
        :param n_seq: sequence number (rows of the table)
        :param d_model: model hidden dimension (columns of the table)
        :return: positional encoding table (n_seq, d_model) as a float32 tensor
        """
        # exponent 2 * (i // 2) / d_model: columns 2k and 2k + 1 share one exponent
        exponents = np.array([2 * (dim // 2) / d_model for dim in range(d_model)])
        # divisor 10000 ** exponent for every column
        divisors = np.power(10000, exponents)
        # positions as a column vector (n_seq, 1)
        positions = np.array([[p] for p in range(n_seq)])
        # angle of every (position, column) pair
        table = positions / divisors
        # sin on the even columns
        table[:, 0::2] = np.sin(table[:, 0::2])
        # cos on the odd columns
        table[:, 1::2] = np.cos(table[:, 1::2])
        return tf.cast(table, tf.float32)

In [None]:
# position encoding 확인
pos_encoding = PositionalEmbedding.get_sinusoid_encoding(6, 8)
pos_encoding

In [None]:
# display
plt.pcolormesh(pos_encoding, cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, args.d_model))
plt.ylabel('Position')
plt.colorbar()
plt.show()

In [None]:
# PositionalEmbedding 클래스 시험
pos_embedding = PositionalEmbedding(args)
dec_pos = pos_embedding(train_enc_inputs)
dec_pos.shape

In [None]:
# 512x512 position encoding table 생성
pos_encoding = PositionalEmbedding.get_sinusoid_encoding(512, 512)
# display
plt.pcolormesh(pos_encoding, cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()

# Transformer

In [None]:
embedding = SharedEmbedding(args)
position = PositionalEmbedding(args)
        
encoder_layers = [EncoderLayer(args, name=f'encoder_layer_{i}') for i in range(args.n_layer)]
decoder_layers = [DecoderLayer(args, name=f'decoder_layer_{i}') for i in range(args.n_layer)]

dropout = tf.keras.layers.Dropout(args.dropout)

In [None]:
enc_tokens = train_enc_inputs
dec_tokens = train_dec_inputs

In [None]:
# encoder self attention mask
enc_self_mask = get_pad_mask(enc_tokens, args.i_pad)
enc_self_mask

In [None]:
# decoder self attention mask
dec_self_mask = get_causal_mask(dec_tokens, args.i_pad)
dec_self_mask

In [None]:
# encoder and decoder attention mask
enc_dec_mask = get_pad_mask(enc_tokens, args.i_pad)
enc_dec_mask

In [None]:
# enc_tokens embedding lookup
enc_hidden = embedding(enc_tokens) + position(enc_tokens)
enc_hidden = dropout(enc_hidden)
enc_hidden

In [None]:
# call encoder layers
for encoder_layer in encoder_layers:
    enc_hidden = encoder_layer((enc_hidden, enc_self_mask))
enc_hidden

In [None]:
# dec_tokens embedding lookup
dec_hidden = embedding(dec_tokens) + position(dec_tokens)
dec_hidden = dropout(dec_hidden)
dec_hidden

In [None]:
# call decoder layers
for decoder_layer in decoder_layers:
    dec_hidden = decoder_layer((dec_hidden, enc_hidden, dec_self_mask, enc_dec_mask))
dec_hidden

In [None]:
# call weight shared embedding (mode=linear)
logits = embedding(dec_hidden, mode='linear')
logits

### 실습
- 아래 Class를 완성하세요.

In [None]:
class Transformer(tf.keras.Model):
    """
    Transformer Class: encoder-decoder model with a weight shared embedding
    """
    def __init__(self, args, name='transformer'):
        """
        Constructor
        :param args: Args object
        :param name: layer name
        """
        super().__init__(name=name)

        self.i_pad = args.i_pad
        # token embedding (also used as the output projection) and positional embedding
        self.embedding = SharedEmbedding(args)
        self.position = PositionalEmbedding(args)

        # n_layer encoder layers and n_layer decoder layers
        self.encoder_layers = [EncoderLayer(args, name=f'encoder_layer_{idx}') for idx in range(args.n_layer)]
        self.decoder_layers = [DecoderLayer(args, name=f'decoder_layer_{idx}') for idx in range(args.n_layer)]

        # dropout applied to the summed embeddings
        self.dropout = tf.keras.layers.Dropout(args.dropout)

    def _embed(self, tokens):
        """
        Token embedding plus positional embedding, followed by dropout
        :param tokens: token ids
        :return: hidden states fed to the first encoder or decoder layer
        """
        hidden = self.embedding(tokens) + self.position(tokens)
        return self.dropout(hidden)

    def call(self, inputs):
        """
        Run the model
        :param inputs: enc_tokens, dec_tokens tuple
        :return logits: next-token prediction logits for dec_tokens
        """
        enc_tokens, dec_tokens = inputs

        # encoder self attention mask: pad positions of the encoder tokens
        enc_self_mask = get_pad_mask(enc_tokens, self.i_pad)
        # decoder self attention mask: future positions and pad positions
        dec_self_mask = get_causal_mask(dec_tokens, self.i_pad)
        # encoder-decoder attention mask: pad positions of the encoder tokens
        enc_dec_mask = get_pad_mask(enc_tokens, self.i_pad)

        # encoder: embed, then run every encoder layer in order
        enc_hidden = self._embed(enc_tokens)
        for layer in self.encoder_layers:
            enc_hidden = layer((enc_hidden, enc_self_mask))

        # decoder: embed, then run every decoder layer against the final encoder output
        dec_hidden = self._embed(dec_tokens)
        for layer in self.decoder_layers:
            dec_hidden = layer((dec_hidden, enc_hidden, dec_self_mask, enc_dec_mask))

        # project to the vocabulary with the shared embedding weight (mode=linear)
        return self.embedding(dec_hidden, mode='linear')

In [None]:
# Transformer 기능 확인. 최종 결과가 (bs, n_seq(dec), n_vocab)
transformer = Transformer(args)
logits = transformer((train_enc_inputs, train_dec_inputs))
logits.shape

# 실습
- 지금까지 작성한 Transformer의 구성요소를 정리해서 아래에 Transformer 모델을 완성하세요.