<a href="https://colab.research.google.com/github/tanakakao/test/blob/main/transformer_keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Layer, Input, Dense, Conv1D, Activation, Dropout, LayerNormalization, Reshape, Embedding, MultiHeadAttention
from tensorflow.keras import activations
import numpy as np
import math

In [58]:
class FeedForwardNetwork(Layer):
    '''
    Position-wise feed-forward network used inside a transformer block.

    Two dense layers are applied to the last axis of the input: the first
    widens the channel dimension (with ReLU), the second projects back to
    ``hidden_dim``. Dropout is applied between the two.

    Args:
        hidden_dim: Number of output channels.
        drop_rate: Dropout rate applied after the first dense layer.
        expansion: Multiplier for the width of the inner layer. Defaults to 4,
            the value that was previously hard-coded.
        **kwargs: Forwarded to ``Layer`` (e.g. ``name``).
    '''
    def __init__(self, hidden_dim, drop_rate, expansion=4, **kwargs):
        super().__init__(**kwargs)
        # First layer: widen the channel dimension.
        self.filter_dense_layer = Dense(hidden_dim * expansion, use_bias=True, activation='relu')

        # Second layer: project back to the original channel count.
        self.output_dense_layer = Dense(hidden_dim, use_bias=True)
        self.drop = Dropout(drop_rate)

    def call(self, x, training=None):
        '''
        Apply the two dense layers; the shape is preserved when the input's
        last axis equals ``hidden_dim``:
        [batch_size, token_num, hidden_dim] -> [batch_size, token_num, hidden_dim]

        Args:
            x: Input tensor.
            training: Forwarded to the dropout layer. ``None`` lets Keras
                resolve the mode from the calling context.
        '''
        # [batch_size, token_num, hidden_dim] -> [batch_size, token_num, expansion*hidden_dim]
        widened = self.filter_dense_layer(x)
        widened = self.drop(widened, training=training)

        # [batch_size, token_num, expansion*hidden_dim] -> [batch_size, token_num, hidden_dim]
        return self.output_dense_layer(widened)

In [223]:
class ResidualNormalizationWrapper(Layer):
    '''
    Residual connection around a layer-normalised sub-layer.

    output = input + Dropout(SubLayer(LayerNormalization(input)))
    '''
    def __init__(self, layer, drop_rate):
        super().__init__()
        self.layer = layer  # wrapped sub-layer (attention or FFN in this file)
        self.layer_normalization = LayerNormalization()
        self.drop = Dropout(drop_rate)

    def call(self, x, training, value=None, attention_mask=None, return_attention_scores=None):
        """
        Run the wrapped sub-layer on the normalised input and add the result
        back onto ``x``.

        Args:
            x: Input tensor; also the residual branch.
            training: Forwarded to the sub-layer and to dropout.
            value: Only its presence matters. When it is not ``None`` the
                normalised ``x`` (not ``value`` itself) is passed to the
                sub-layer as ``value``.
            attention_mask: Forwarded to the sub-layer when not ``None``.
            return_attention_scores: When truthy, forwarded to the sub-layer,
                which must then return ``(output, attention_weights)``.

        Returns:
            ``x + output``, or ``(x + output, attention_weights)`` when
            ``return_attention_scores`` is truthy.
        """
        normalized = self.layer_normalization(x)

        # Collect only the optional keyword arguments that were actually requested,
        # so sub-layers that do not know them (e.g. the FFN) are called cleanly.
        extra = {}
        if attention_mask is not None:
            extra['attention_mask'] = attention_mask
        if return_attention_scores:
            extra['return_attention_scores'] = return_attention_scores
        if value is not None:
            extra['value'] = normalized

        result = self.layer(normalized, training=training, **extra)

        if not return_attention_scores:
            return x + self.drop(result, training=training)

        sublayer_out, attn_weights = result
        return x + self.drop(sublayer_out, training=training), attn_weights

In [224]:
class AddPositionalEncoding(Layer):
    '''
    Layer that adds sinusoidal position information to its input tensor.
    see: https://arxiv.org/pdf/1706.03762.pdf

    PE_{pos, 2i}   = sin(pos / 10000^{2i / d_model})
    PE_{pos, 2i+1} = cos(pos / 10000^{2i / d_model})
    '''
    def call(self, inputs):
        dtype = inputs.dtype
        # The input must be rank 3: it is unstacked into exactly three dimensions.
        batch_size, max_length, depth = tf.unstack(tf.shape(inputs))
        per_position = [max_length, 1]  # tiling multiples: repeat one row for every position

        # Position index repeated along the channel axis: [max_length, depth]
        positions = tf.expand_dims(tf.range(max_length), 1)
        pos_matrix = tf.cast(tf.tile(positions, [1, depth]), dtype)

        # Channel index rounded down to the even one (0, 0, 2, 2, 4, ...),
        # turned into the denominator 10000^(2i / depth): [max_length, depth]
        even_index = tf.range(depth) // 2 * 2
        exponent = tf.tile(tf.expand_dims(even_index, 0), per_position) / depth
        depth_matrix = tf.pow(10000.0, tf.cast(exponent, dtype))

        # cos(x) == sin(x + pi/2), so odd channels get a pi/2 phase shift
        # (0, pi/2, 0, pi/2, ...): [max_length, depth]
        phase = tf.cast(tf.range(depth) % 2, dtype) * math.pi / 2
        phase_matrix = tf.tile(tf.expand_dims(phase, 0), per_position)

        encoding = tf.sin(pos_matrix / depth_matrix + phase_matrix)
        # Repeat for every sample: [batch_size, max_length, depth]
        encoding = tf.tile(tf.expand_dims(encoding, 0), [batch_size, 1, 1])

        return inputs + encoding

In [225]:
class TokenEmbedding(Layer):
    '''
    Token-id embedding whose output is scaled by sqrt(embedding width).

    Args:
        vocab_size: Number of distinct token ids. Ignored when ``embeddings``
            is given (the matrix height is used instead).
        embedding_dim: Embedding width. Ignored when ``embeddings`` is given
            (the matrix width is used instead).
        embeddings: Optional 2-D matrix of initial embedding weights, shape
            [vocab_size, embedding_dim]. The weights stay trainable.
        PAD_ID: Stored as ``pad_id``. The embedding layer itself is built with
            ``mask_zero=True``, i.e. id 0 is the masked id.
    '''
    def __init__(self, vocab_size, embedding_dim, embeddings=None, PAD_ID=0):
        super().__init__()
        self.pad_id = PAD_ID

        # Exactly one Embedding layer is created (the earlier code built a
        # throw-away one first and then overwrote it).
        if embeddings is None:
            self.embedding_dim = embedding_dim
            self.embedding = Embedding(input_dim=vocab_size,
                                       output_dim=embedding_dim,
                                       mask_zero=True,
                                       trainable=True)
        else:
            # Keep the scale factor used in call() consistent with the real
            # width of the supplied matrix.
            self.embedding_dim = embeddings.shape[1]
            self.embedding = Embedding(input_dim=embeddings.shape[0],
                                       output_dim=embeddings.shape[1],
                                       mask_zero=True,
                                       trainable=True,
                                       weights=[embeddings])

    def call(self, x):
        '''
        Look up the embeddings for ``x`` and multiply them by
        sqrt(embedding_dim).
        '''
        embedded = self.embedding(x)
        return embedded * self.embedding_dim ** 0.5

In [226]:
class TransformerBlock(Layer):
    """
    transformer block : before ->[attention -> FF]-> next
    Both sub-layers are wrapped with a residual connection and
    LayerNormalization (see ResidualNormalizationWrapper).
    """
    def __init__(self, hidden_dim, heads_num, drop_rate=0.1):
        """
        Args:
            hidden_dim: Model width. Should be divisible by ``heads_num``.
            heads_num: Number of attention heads.
            drop_rate: Dropout rate for the attention layer, the FFN and both
                residual wrappers.
        """
        super().__init__()
        # Per-head size derived from the model width instead of a fixed 2,
        # so the attention capacity follows hidden_dim.
        key_dim = max(1, hidden_dim // heads_num)

        self.atten = ResidualNormalizationWrapper(
            layer = MultiHeadAttention(key_dim = key_dim, num_heads = heads_num, dropout = drop_rate),
            drop_rate = drop_rate)

        self.ffn = ResidualNormalizationWrapper(
            layer = FeedForwardNetwork(hidden_dim = hidden_dim, drop_rate = drop_rate),
            drop_rate = drop_rate)

    def call(self, input, training, attention_mask=None, return_attention_scores=False):
        """
        Self-attention followed by the feed-forward network.
        The shape is unchanged: [batch_size, token_num, hidden_dim]

        Returns:
            The block output, or ``(output, attention_weights)`` when
            ``return_attention_scores`` is truthy.
        """
        # NOTE(review): `attention_mask` is accepted but, exactly as before,
        # not forwarded to the attention layer. Keras MultiHeadAttention
        # treats 1 as "attend" and 0 as "ignore"; forward the mask only once
        # every caller is confirmed to follow that convention.
        attended = self.atten(x=input, value=input,
                              return_attention_scores=return_attention_scores,
                              attention_mask=None, training=training)

        attn_weights = None
        if return_attention_scores:
            attended, attn_weights = attended

        # Pass the mode explicitly rather than relying on Keras to inject it.
        output = self.ffn(attended, training=training)

        if return_attention_scores:
            return output, attn_weights
        return output

In [227]:
class Encoder(Layer):
    '''
    Transformer encoder: token embedding + positional encoding + dropout,
    followed by ``hopping_num`` transformer blocks and a final
    LayerNormalization.
    '''
    def __init__(
            self,
            vocab_size, # total number of tokens
            hopping_num, # number of stacked transformer blocks
            heads_num, # number of attention heads
            hidden_dim, # embedding dimension
            token_num, # sequence length; accepted but not used in this class
            drop_rate, # dropout probability
            embeddings=None
    ):
        super().__init__()
        self.hopping_num = hopping_num

        # Embedding layer
        self.token_embedding = TokenEmbedding(vocab_size, hidden_dim, embeddings)
        # Positional encoding
        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = Dropout(drop_rate)

        # Stack of transformer blocks. drop_rate is passed on so the blocks
        # use the configured rate instead of their own default.
        self.attention_block_list = [
            TransformerBlock(hidden_dim, heads_num, drop_rate) for _ in range(hopping_num)
        ]
        self.output_normalization = LayerNormalization()

    def call(
            self,
            input,
            training,
            attention_mask=None,
            return_attention_scores=False
    ):
        '''
        input: token ids [batch_size, length]
        attention_mask: mask for the attention weights, handed unchanged to
            every transformer block.
            [batch_size, 1, q_length, k_length]
            positions to ignore (padding etc.) are 0
        return_attention_scores: whether to also return attention weights

        Returns [batch_size, length, hidden_dim], or a tuple of that tensor
        and the attention weights of the last block when
        return_attention_scores is truthy (the weights are None when there
        are no blocks).
        '''
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        embedded_input = self.token_embedding(input)
        # Positional encoding
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input, training=training)

        # Pre-set so the name is bound even when hopping_num == 0.
        atten_weights = None

        # Apply the transformer blocks one after another.
        for block in self.attention_block_list:
            if return_attention_scores:
                query, atten_weights = block(query, training, attention_mask, return_attention_scores)
            else:
                query = block(query, training, attention_mask, return_attention_scores)

        # [batch_size, token_num, hidden_dim]
        normalized = self.output_normalization(query)
        if return_attention_scores:
            return normalized, atten_weights
        return normalized

In [228]:
class AttentionClassifier(Model):
    '''
    Sequence classifier: Encoder -> representation at position 0 ->
    Dense(tanh) -> Dropout -> Dense(softmax).
    '''
    def __init__(
            self,
            vocab_size, # total number of tokens
            hopping_num, # number of stacked transformer blocks
            heads_num, # number of attention heads
            hidden_dim, # embedding dimension
            token_num, # sequence length (number of tokens per text)
            drop_rate, # dropout probability
            NUMLABELS, # number of classes
            embeddings = None,
            PAD_ID = 0
    ):
        super().__init__()
        self.PAD_ID = PAD_ID

        self.encoder = Encoder(vocab_size, hopping_num, heads_num, hidden_dim, token_num, drop_rate, embeddings)
        self.dense1 = Dense(hidden_dim, activation='tanh')
        self.dropout1 = Dropout(drop_rate)
        self.final_layer = Dense(NUMLABELS, activation='softmax')

    def call(self, x, training, return_attention_scores=False):
        '''
        Args:
            x: Token ids [batch_size, token_num].
            training: Forwarded to the encoder and to dropout.
            return_attention_scores: When truthy, also return the attention
                weights produced by the encoder.

        Returns:
            Class probabilities [batch_size, NUMLABELS], or a tuple of the
            probabilities and the attention weights.
        '''
        self_attention_mask = self._create_enc_attention_mask(x)

        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        encoded = self.encoder(input=x, training=training,
                               attention_mask=self_attention_mask,
                               return_attention_scores=return_attention_scores)
        atten_weights = None
        if return_attention_scores:
            encoded, atten_weights = encoded

        # Use the representation at the first position of every sequence.
        # [batch_size, token_num, hidden_dim] -> [batch_size, hidden_dim]
        pooled = self.dense1(encoded[:, 0, :])
        # Pass the mode explicitly rather than relying on Keras to inject it.
        pooled = self.dropout1(pooled, training=training)

        # [batch_size, hidden_dim] -> [batch_size, NUMLABELS]
        final_output = self.final_layer(pooled)

        if return_attention_scores:
            return final_output, atten_weights
        return final_output

    def _create_enc_attention_mask(self, x):
        '''
        Build the padding mask for self-attention.

        Follows the convention documented for Keras MultiHeadAttention:
        1 marks a position that may be attended to, 0 marks a position to
        ignore. PAD tokens therefore get 0 (the earlier code produced the
        inverse, 1 for PAD).

        Returns a float32 tensor of shape [batch_size, 1, 1, token_num].
        '''
        batch_size, length = tf.unstack(tf.shape(x))
        keep = tf.cast(tf.not_equal(x, self.PAD_ID), tf.float32)  # [batch_size, token_num]

        # Broadcasts to [batch_size, head_num, token_num, token_num].
        return tf.reshape(keep, [batch_size, 1, 1, length])

In [202]:
!pip install janome
import re
import pandas as pd
from janome.tokenizer import Tokenizer
# Module-level janome tokenizer created with wakati=True; used by tokenizer_janome below.
j_t = Tokenizer(wakati=True)

class Vocab(object):
    """
    Bidirectional token <-> integer-id mapping.

    Ids 0-3 are reserved for the special tokens '<pad>', '<s>', '</s>' and
    '<unk>' (in that order); every other token gets an id from 4 upwards.
    """
    def __init__(self):
        self.w2i = {}
        self.i2w = {}
        self.special_chars = ['<pad>', '<s>', '</s>', '<unk>']
        self.bos_char = self.special_chars[1]
        self.eos_char = self.special_chars[2]
        self.oov_char = self.special_chars[3]

    def fit(self, sentences, path=None):
        """
        Build the vocabulary from already-tokenised sentences.

        Args:
            sentences: Iterable of token sequences.
            path: Unused; kept for backward compatibility.

        Tokens are numbered in sorted order so that the same corpus always
        yields the same ids (set iteration order is not stable between runs).
        Tokens equal to a special token are not numbered a second time, which
        keeps the id space contiguous: ids are exactly 0 .. len(i2w) - 1.
        """
        self._words = set()
        for sentence in sentences:
            self._words.update(sentence)

        offset = len(self.special_chars)
        regular_words = sorted(self._words.difference(self.special_chars))

        self.w2i = {w: i for i, w in enumerate(self.special_chars)}
        self.w2i.update((w, i + offset) for i, w in enumerate(regular_words))

        self.i2w = {i: w for w, i in self.w2i.items()}

    def transform(self, sentences, path=None, bos=False, eos=False):
        """
        Encode every sentence as a list of ids.

        Args:
            sentences: Iterable of token sequences (lists, tuples, ...).
            path: Unused; kept for backward compatibility.
            bos: Prepend the '<s>' token to every sentence.
            eos: Append the '</s>' token to every sentence.

        Returns:
            A list with one id list per sentence.
        """
        output = []
        for sentence in sentences:
            # Copy into a list so the input is never mutated and non-list
            # sequences can be concatenated with the markers.
            tokens = list(sentence)
            if bos:
                tokens = [self.bos_char] + tokens
            if eos:
                tokens = tokens + [self.eos_char]
            output.append(self.encode(tokens))

        return output

    def encode(self, sentence):
        """Map each token to its id; unknown tokens map to the '<unk>' id."""
        unknown = self.w2i[self.oov_char]
        return [self.w2i.get(w, unknown) for w in sentence]

    def decode(self, sentence):
        """Map each id back to its token. Raises KeyError for an unknown id."""
        return [self.i2w[idx] for idx in sentence]


def tokenizer_janome(text):
    """Tokenise ``text`` with the module-level janome tokenizer and return the tokens as a list."""
    return list(j_t.tokenize(text, wakati=True))

def preprocessing_text(text):
    """
    Normalise raw text before tokenisation.

    * removes carriage returns, line feeds and full-width spaces
    * replaces every ASCII or full-width digit with '0'

    ASCII spaces are left untouched. The earlier digit pattern contained a
    stray space inside the character class, which turned every space into '0'.
    """
    for removable in ('\r', '\n', '　'):
        text = text.replace(removable, '')

    # Collapse all digits to a single symbol.
    return re.sub(r'[0-9０-９]', '0', text)

def tokenizer_with_preprocessing(text):
    """Run preprocessing_text on ``text`` and tokenise the result with tokenizer_janome."""
    return tokenizer_janome(preprocessing_text(text))



In [13]:
from sklearn.model_selection import train_test_split

# CSV file read with pandas below.
path='reviews.csv'

df = pd.read_csv(path)
seq_row = df['Body']  # column passed to the tokenizer
y = df['Rating2']  # column used as the label

# Normalise and tokenise every entry of the text column.
seq = [tokenizer_with_preprocessing(text) for text in seq_row]

# Split stratified on y. No random_state is given, so the split can differ between runs.
x_train, x_test, y_train, y_test = train_test_split(seq, y, stratify=y)

In [14]:
# Build the vocabulary from the training split only.
vocab = Vocab()
vocab.fit(x_train)

# Encode both splits as id sequences, each prefixed with the '<s>' token (bos=True).
x_id_train = vocab.transform(x_train, bos=True)
x_id_test = vocab.transform(x_test, bos=True)

In [15]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf
import numpy as np

In [16]:
# Bring every training id sequence to length 64; padding='post' appends the pad value at the end.
X_train = pad_sequences(x_id_train, padding='post', maxlen=64)
# One-hot encode the training labels into 2 classes as float32.
y_train_oht = tf.one_hot(y_train, depth=2, dtype=tf.float32)

In [229]:
# Instantiate the classifier; the vocabulary size comes from the fitted Vocab.
model = AttentionClassifier(
            vocab_size = len(vocab.i2w), # total number of tokens
            hopping_num = 8, # number of stacked Multi-head Attention blocks
            heads_num = 6, # number of Multi-head Attention heads
            hidden_dim = 300, # embedding dimension
            drop_rate = 0.1, # dropout probability
            token_num = 64,
            NUMLABELS = 2
    )

# NOTE(review): `test` and `attention_mask` are not used by any visible code — confirm before removing.
test = np.random.randint(0,2,(32,128))
attention_mask = tf.constant(np.ones((32,1,128,128)), tf.float32)

# Forward pass on the first 10 padded samples with training=False.
res = model(X_train[:10],False)

In [230]:
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import LearningRateScheduler
import math

# Categorical cross-entropy loss (the labels are one-hot encoded above).
criterion = tf.keras.losses.CategoricalCrossentropy()
# Adam with the AMSGrad variant enabled.
optimizer = optimizers.Adam(learning_rate=2e-4,
                           beta_1=0.9, beta_2=0.999, amsgrad=True)

def decay(epoch, steps=100):
    """
    Step-decay learning-rate schedule.

    Starts at 2e-4 and multiplies the rate by 0.1 each time ``1 + epoch``
    passes another multiple of 7.

    Args:
        epoch: Zero-based epoch index.
        steps: Accepted but not used.

    Returns:
        The learning rate for ``epoch``.
    """
    base_lr, factor, period = 2e-4, 0.1, 7
    drops_so_far = math.floor((1 + epoch) / period)
    return base_lr * math.pow(factor, drops_so_far)

# Callback wrapping decay(); verbose=1 makes it print on each update.
# NOTE(review): lr_sc is not passed to model.fit in the visible code — confirm whether that is intended.
lr_sc = LearningRateScheduler(decay, verbose=1)

In [231]:
# Configure training with the loss and optimizer defined above; accuracy is tracked as a metric.
model.compile(loss=criterion, optimizer=optimizer, metrics=['accuracy'])

In [232]:
# Train for 20 epochs with batch size 32; no validation data or callbacks are passed.
history=model.fit(X_train, y_train_oht, batch_size=32, epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [145]:
# Stand-alone experiment: MultiHeadAttention attending over axes 2 and 3 of a 4-D symbolic input,
# used as both query and value; prints the resulting output shape.
layer = MultiHeadAttention(num_heads=2, key_dim=1, attention_axes=(2, 3))
input_tensor = tf.keras.Input(shape=(10, 64, 300))
output_tensor = layer(input_tensor, input_tensor)
print(output_tensor.shape)

(None, 10, 64, 300)
