# Evn

In [None]:
import os
import random
import shutil
import json
import zipfile
import math
import copy
import collections
import re

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as KK

from tqdm.notebook import tqdm

In [None]:
# random seed initialize
random_seed = 1234
random.seed(random_seed)
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

In [None]:
!nvidia-smi

# Config

In [None]:
class Config(dict):
    """
    json을 config 형태로 사용하기 위한 Class
    :param dict: config dictionary
    """
    __getattr__ = dict.__getitem__
    __setattr__ = dict.__setitem__
    print(__getattr__)
    print(__setattr__)
    @classmethod
    def load(cls, file):
        """
        file에서 Config를 생성 함
        :param file: filename
        """
        with open(file, 'r') as f:
            config = json.loads(f.read())
            return Config(config)

In [None]:
# config 생성
# d_model: model hidden dim
# n_head: multi head attention head number
# d_head: multi head attention head dim
# dropout: dropout rate
# d_ff: feed forward dim
# norm_eps: layernormal epsilon
# n_layer: layer number
# n_seq: sequence max number
# n_vocab: vocab count
# i_pad: vocab pad id
config = Config({"d_model": 8,
                 "n_head": 2,
                 "d_head": 4,
                 "dropout": 0.1,
                 "d_ff": 32,
                 "norm_eps": 0.001,
                 "n_layer": 6,
                 "n_seq": 16,
                 "n_vocab": 16,
                 "i_pad": 0})
config

# Input

In [None]:
# 입력 문장
sentences = [
    ['나는 오늘 행복해', '나도 기분이 좋아'],
    # ['나는 오늘 기분이 좋아', '나도 매우 행복하다'],
]

In [None]:
# 각 문장을 띄어쓰기 단위로 분할
words = []
for pair in sentences:
    for sentence in pair:
        words.extend(sentence.split())

# 중복 단어 제거
words = list(dict.fromkeys(words))

# 각 단어별 고유한 번호 부여
word_to_id = {'[PAD]': 0, '[UNK]': 1, '[BOS]': 2, '[EOS]': 3}
for word in words:
    word_to_id[word] = len(word_to_id)

# 각 숫자별 단어 부여
id_to_word = {_id:word for word, _id in word_to_id.items()}

word_to_id, id_to_word

In [None]:
# Question과 Answer를 숫자료
question_list, answer_list = [], []

for pair in sentences:
    question_list.append([word_to_id[word] for word in pair[0].split()])
    answer_list.append([word_to_id[word] for word in pair[1].split()])

# 학습용 입력 데이터 생성
train_enc_inputs, train_dec_inputs, train_labels = [], [], []
for question, answer in zip(question_list, answer_list):
    train_enc_inputs.append(question)
    train_dec_inputs.append([word_to_id['[BOS]']] + answer)
    train_labels.append(answer + [word_to_id['[EOS]']])

# Encoder 입력의 길이를 모두 동일하게 변경 (최대길이 4)
for row in train_enc_inputs:
    row += [0] * (4 - len(row))

# Decoder 입력의 길이를 모두 동일하게 변경 (최대길이 6)
for row in train_dec_inputs:
    row += [0] * (6 - len(row))

# 정답의 길이를 모두 동일하게 변경 (최대길이 6)
for row in train_labels:
    row += [0] * (6 - len(row))

# numpy array로 변환/
train_enc_inputs = np.array(train_enc_inputs)
train_dec_inputs = np.array(train_dec_inputs)
train_labels = np.array(train_labels)

train_enc_inputs, train_dec_inputs, train_labels

In [None]:
# embedding with random weight
embed_weight = np.random.randint(-9, 10, (config.n_vocab, config.d_model)) / 10

embed = tf.keras.layers.Embedding(config.n_vocab, config.d_model, weights=[embed_weight])
embed_weight

In [None]:
# encoder hidden
hidden_enc = embed(train_enc_inputs)
hidden_enc

In [None]:
# decoder hidden
hidden_dec = embed(train_dec_inputs)
hidden_dec

# Mask

# 1. Pad Mask

In [None]:
train_enc_inputs, train_dec_inputs

In [None]:
def get_pad_mask(tokens, i_pad=0):
    """
    pad mask 계산하는 함수
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: pad mask (pad: 1, other: 0)
    """
    #########################################
    # 0인 부분 확인
    mask = tf.math.equal(tokens, i_pad)
    # boolean -> float 32
    mask = tf.cast(mask, tf.float32)
    # expand dimension for n_seq
    mask = tf.expand_dims(mask, axis=1)
    #########################################
    return mask

In [None]:
enc_pad_mask = get_pad_mask(train_enc_inputs)
enc_pad_mask

# 2. Causal Mask

In [None]:
def get_causal_mask(tokens, i_pad=0):
    """
    causal mask 계산하는 함수
    :param tokens: tokens (bs, n_seq)
    :param i_pad: id of pad
    :return mask: causal and pad mask (causal or pad: 1, other: 0)
    """
    #########################################
    # 개수 조회
    n_seq = tf.shape(tokens)[1]
    # print(n_seq)
    # make ahead mask
    mask = 1 - tf.linalg.band_part(tf.ones((n_seq, n_seq)), -1, 0)
    # expand dim for bs
    mask = tf.expand_dims(mask, axis=0)
    # print(mask)
    # get pad_mask
    pad_mask = get_pad_mask(tokens, i_pad)
    # print(pad_mask)
    # mask all ahead_mask or pad_mask
    mask = tf.maximum(mask, pad_mask)
    #########################################
    return mask

In [None]:
dec_causal_mask = get_causal_mask(train_dec_inputs)
dec_causal_mask

# Mask 생성

In [None]:
# Encoder Self Attetnion mask
enc_self_mask = get_pad_mask(train_enc_inputs)
enc_self_mask

In [None]:
# Decoder Self Attetnion mask
dec_self_mask = get_causal_mask(train_dec_inputs)
dec_self_mask

In [None]:
# Encoder-Decoder Attetnion mask
enc_dec_mask = get_pad_mask(train_enc_inputs)
enc_dec_mask

# Scaled dot product attention

In [None]:
class ScaleDotProductAttention(tf.keras.layers.Layer):
    """
    Scale Dot Product Attention Class
    """
    def __init__(self, name="scale_dot_product_attention"):
        """
        생성자
        :param name: layer name
        """
        super().__init__(name=name)

    def call(self, inputs):
        """
        layer 실행
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: attention 실행 결과
        """
        #########################################
        Q, K, V, attn_mask = inputs
        # matmul Q, K (transpose_b=True)
        attn_score = tf.matmul(Q, K, transpose_b=True)
        # get scale = d_model ** 0.5
        scale = tf.math.sqrt(tf.cast(tf.shape(K)[-1], tf.float32))
        # print(attn_score)
        # divide by scale
        attn_scale = tf.math.divide(attn_score, scale)
        # print(attn_scale)
        # do mask (subtract 1e-9 for masked value)
        attn_scale -= 1.e9 * attn_mask
        # print(attn_scale)
        # calculate attention prob
        attn_prob = tf.nn.softmax(attn_scale, axis=-1)
        # print(attn_prob)
        # weighted sum of V
        attn_out = tf.matmul(attn_prob, V)
        return attn_out
        #########################################

In [None]:
# Encoder Self Attetnion
Q = hidden_enc
K = hidden_enc
V = hidden_enc

attention = ScaleDotProductAttention()
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

In [None]:
# Decoder Self Attetnion
Q = hidden_dec
K = hidden_dec
V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

In [None]:
# Encoder-Decoder Attetnion
Q = hidden_dec
K = hidden_enc
V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out

# Multi Head Attention

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """
    Multi Head Attention Class
    """
    def __init__(self, config, name="multi_head_attention"):
        """
        생성자
        :param config: Config 객체
        :param name: layer name
        """
        super().__init__(name=name)

        self.d_model = config.d_model
        self.n_head = config.n_head
        self.d_head = config.d_head

        # Q, K, V input dense layer
        self.W_Q = tf.keras.layers.Dense(config.n_head * config.d_head)
        self.W_K = tf.keras.layers.Dense(config.n_head * config.d_head)
        self.W_V = tf.keras.layers.Dense(config.n_head * config.d_head)
        # Scale Dot Product Attention class
        self.attention = ScaleDotProductAttention(name="self_attention")
        # output dense layer
        self.W_O = tf.keras.layers.Dense(config.d_model)

    def call(self, inputs):
        """
        layer 실행
        :param inputs: Q, K, V, attn_mask tuple
        :return attn_out: attention 실행 결과
        """
        #########################################
        Q, K, V, attn_mask = inputs
        # Q_m = self.W_Q(Q)
        # print(Q_m.shape)
        # Q_m = tf.reshape(Q_m, [-1, tf.shape(Q)[1], self.n_head, self.d_head])
        # print(Q_m.shape)
        # Q_m = tf.transpose(Q_m, [0, 2, 1, 3])
        # print(Q_m.shape)
        # build multihead Q, K, V
        Q_m = tf.transpose(tf.reshape(self.W_Q(Q), [-1, tf.shape(Q)[1], self.n_head, self.d_head]), [0, 2, 1, 3])  # (bs, n_head, Q_len, d_head)
        K_m = tf.transpose(tf.reshape(self.W_K(K), [-1, tf.shape(K)[1], self.n_head, self.d_head]), [0, 2, 1, 3])  # (bs, n_head, K_len, d_head)
        V_m = tf.transpose(tf.reshape(self.W_V(V), [-1, tf.shape(V)[1], self.n_head, self.d_head]), [0, 2, 1, 3])  # (bs, n_head, K_len, d_head)
        # print(Q_m.shape, K_m.shape, V_m.shape)÷
        # build multihead mask
        attn_mask_m = tf.expand_dims(attn_mask, axis=1)
        # print(attn_mask.shape, attn_mask_m.shape)
        # Scale Dot Product Attention with multi head Q, K, V, attn_mask
        attn_out_m = self.attention((Q_m, K_m, V_m, attn_mask_m))  # (bs, n_head, Q_len, d_head)
        # print(attn_out_m.shape)
        # transpose and reshape
        attn_out_t = tf.transpose(attn_out_m, perm=[0, 2, 1, 3])  # (bs, Q_len, n_head, d_head)
        # print(attn_out_t.shape)
        attn_out_c = tf.reshape(attn_out_t, [-1, tf.shape(Q)[1], config.n_head * config.d_head])  # (bs, Q_len, n_head * d_head)
        # print(attn_out_c.shape)
        # linear for output
        attn_out = self.W_O(attn_out_c) # (bs, Q_len, d_model)
        return attn_out
        #########################################

In [None]:
# Encoder Self Attetnion
Q = hidden_enc
K = hidden_enc
V = hidden_enc

attention = MultiHeadAttention(config)
attn_out = attention((Q, K, V, enc_self_mask))
attn_out

In [None]:
# Decoder Self Attetnion
Q = hidden_dec
K = hidden_dec
V = hidden_dec

attn_out = attention((Q, K, V, dec_self_mask))
attn_out

In [None]:
# Encoder-Decoder Attetnion
Q = hidden_dec
K = hidden_enc
V = hidden_enc

attn_out = attention((Q, K, V, enc_dec_mask))
attn_out