In [25]:
import numpy as np
import pandas as pd
import tensorflow as tf
import pickle
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


In [26]:
a = pd.read_csv('C:/pytest/chatdata_small.csv', encoding='cp949')
text = '가고싶은 학교 떨어졌어'
text1 = '1지망 학교 떨어졌어'

In [28]:
with open(file = 'C:/pytest/data/model_name/transformer.pickle', mode='rb') as f:
    loaded_tokenizer = pickle.load(f)
    
word_index = loaded_tokenizer.word_index

In [29]:
model_name = 'transformer'

# 사전환경설정

In [30]:
word_index.update({"SOS": 1, "EOS": 2}) #질문

char2idx_dict = word_index
idx2char_dict = {y: x for x, y in word_index.items()}

char2idx_dict['<PAD>']=0
char2idx_dict['<SOS>']= char2idx_dict['SOS']
del char2idx_dict['SOS']

char2idx_dict['<END>'] = char2idx_dict['EOS']
del char2idx_dict['EOS']

idx2char_dict[0] = '<PAD>'
idx2char_dict[1] = '<SOS>'
idx2char_dict[2] = '<END>'  #질문

In [31]:
prepro_configs = dict({'char2idx':char2idx_dict, 'idx2char':idx2char_dict, 
                       'vocab_size':len(word_index), 'pad_symbol': '<PAD>', 
                       'std_symbol': '<SOS>', 'end_symbol': '<END>'})

In [32]:
STD_INDEX = 1
char2idx = prepro_configs['char2idx']
end_index = prepro_configs['end_symbol']
vocab_size = prepro_configs['vocab_size']
MAX_SEQUENCE = 25

kargs = {'model_name': model_name,
         'num_layers': 2,
         'd_model': 512,
         'num_heads': 8,
         'dff': 2048,
         'input_vocab_size':vocab_size,
         'target_vocab_size': vocab_size,
         'maximum_position_encoding': MAX_SEQUENCE,
         'end_token_idx': char2idx[end_index],
         'rate': 0.1
}

In [33]:
# 마스킹
def create_padding_mask(seq):
    # 시퀀스가 0과 같으면 1을 출력해라  #cast는 float32형으로만 바꿔주는 기능
    mask = tf.cast(tf.math.equal(seq, 0), tf.float32)  
    # 1차원을 3차원으로 만들어줌
    return mask[:, tf.newaxis, tf.newaxis, :]   #3차원으로 변환

# 상삼각행렬생성(대각선제외)
def create_look_ahead_mask(size):
    #band_part(x,y,z)인데 z를 -1로 하면 대각선포함 윗부분이 1이됨
    #y를 -1로 하면 대각선포함 아랫쪽만 1로 만듬
    # x는 2차원 이상 행렬이 들어와야함, output의 형상
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    # 1에서 대각선포함아랫쪽1을 빼면 대각선제외 위쪽1만 남음
    return mask

def create_masks(inp, tar):
    enc_padding_mask = create_padding_mask(inp)      # 인코더 패딩 마스크
    dec_padding_mask = create_padding_mask(inp) 
    #인코딩,디코딩 패딩 마스크 생성
    
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, combined_mask, dec_padding_mask

In [34]:
# 포지셔널 인코딩
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * i//2) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], 
                            np.arange(d_model)[np.newaxis, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

In [35]:
# 어텐션
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q,k, transpose_b=True)
    
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
    
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
        
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    
    output = tf.matmul(attention_weights, v)
    
    return output, attention_weights
    

In [36]:
#멀티헤드어텐션, 피드포워드네트워크, 인/디코더레이어, 인/디코더
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, **kargs):                             # 초기화 함수
        super(MultiHeadAttention, self).__init__()
        self.num_heads = kargs['num_heads']
        self.d_model = kargs['d_model']

        assert self.d_model % self.num_heads == 0

        self.depth = self.d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(kargs['d_model'])
        self.wk = tf.keras.layers.Dense(kargs['d_model'])
        self.wv = tf.keras.layers.Dense(kargs['d_model'])
        # 셀프어텐션 결과 출력
        self.dense = tf.keras.layers.Dense(kargs['d_model'])

    # 각 배치마다 (seq_len * depth) 형태를 (num_heads * seq_len * deqth)로 변환
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        # reshape 할때 -1을 넣으면 이후 기입된 부분의 축변환을 하고난 뒤 남은 값을 자동 배정

        return tf.transpose(x, perm=[0, 2, 1, 3]) 
    
    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm = [0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)

        return output, attention_weights

def feed_forward_network(**kargs):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(kargs['dff'],activation='relu'),
        tf.keras.layers.Dense(kargs['d_model'])
        ])

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, **kargs):                               # 초기화
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(**kargs)                 # 멀티 헤드 어텐션 레이어 생성
        self.ffn = feed_forward_network(**kargs)     #파워포워드네트워크

        #층정규화
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout2 = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)              
        ffn_output = self.dropout2(ffn_output)   
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

# 디코더레이어 준비
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, **kargs):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(**kargs)      # 멀티 헤드 어텐션 레이어 1
        self.mha2 = MultiHeadAttention(**kargs)      # 멀티 헤드 어텐션 레이어 2
        self.ffn = feed_forward_network(**kargs)     # 포지션 와이즈 피드 포워드 네트워크

        # 층 정규화
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # Dropout Layer
        self.dropout1 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout2 = tf.keras.layers.Dropout(kargs['rate'])
        self.dropout3 = tf.keras.layers.Dropout(kargs['rate'])
    
    
    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)                    # 멀티 헤드 어텐션 레이어 1 수행
        attn1 = self.dropout1(attn1)              # 드롭아웃 수행
        out1 = self.layernorm1(attn1 + x)         # 층 정규화 및 리지듀얼 커넥션 수행

        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)  # 멀티 헤드 어텐션 레이어 2 수행
        attn2 = self.dropout2(attn2)              # 드롭아웃 수행
        out2 = self.layernorm2(attn2 + out1)      # 층 정규화 및 리지듀얼 커넥션 수행

        ffn_output = self.ffn(out2)               # out2에 대해 피드포워드 연산 수행
        ffn_output = self.dropout3(ffn_output)    # 드롭아웃 수행
        out3 = self.layernorm3(ffn_output + out2) # 층 정규화 및 리지듀얼 커넥션 수행

        return out3, attn_weights_block1, attn_weights_block2


class Encoder(tf.keras.layers.Layer):
    def __init__(self, **kargs):
        super(Encoder, self).__init__()

        self.d_model = kargs['d_model']
        self.num_layers = kargs['num_layers']

        #워드 임베딩 레이어 생성
        self.embedding = tf.keras.layers.Embedding(input_dim=kargs['input_vocab_size'],
                                                   output_dim = self.d_model)

        #포지셔널 인코딩레이어 생성
        self.pos_encoding = positional_encoding(position=kargs['maximum_position_encoding'],
                                                d_model=self.d_model)

        self.enc_layers = [EncoderLayer(**kargs) for _ in range(self.num_layers)]

        self.dropout = tf.keras.layers.Dropout(kargs['rate'])

    def call(self, x, mask):
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len,:]

        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, mask)
        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, **kargs):
        super(Decoder,self).__init__()

        self.d_model = kargs['d_model']
        self.num_layers = kargs['num_layers']

        #워드 임베딩 레이어 생성(1)
        self.embedding = tf.keras.layers.Embedding(input_dim=kargs['target_vocab_size'],
                                                   output_dim = self.d_model)
        # 포지셔널 인코딩 레이어 생성(2)
        self.pos_encoding = positional_encoding(position = kargs['maximum_position_encoding'],
                                                d_model = self.d_model)
        # 디코더 레이어 생성,num_layers 수만큼 리스트 배열로 만든다.
        self.dec_layers = [DecoderLayer(**kargs) for _ in range(self.num_layers)]
        
        self.dropout = tf.keras.layers.Dropout(kargs['rate'])
        
    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}   #딕셔너리초기화가 무슨말이지 그냥 처음 선언된 딕셔너리니까 빈값 생성해준거 아닌가 앞으로 추가할꺼니까

        x = self.embedding(x)
        x *=tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:,:seq_len, :]

        x = self.dropout(x)
        

        for i in range(self.num_layers):
            x, block1, block2, = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask)
            attention_weights['decoder_layer{}_bolck1'.format(i+1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        return x, attention_weights

In [37]:
# 트랜스포머
class Transformer(tf.keras.Model):
    def __init__(self, **kargs):
        super(Transformer, self).__init__(name=kargs['model_name'])
        self.end_token_idx = kargs['end_token_idx']
        self.encoder = Encoder(**kargs)
        self.decoder = Decoder(**kargs)
        self.final_layer = tf.keras.layers.Dense(kargs['target_vocab_size'])
              
    def call(self, x):
        inp, tar = x
        enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(inp, tar)
        
        enc_output = self.encoder(inp, enc_padding_mask)
        
        dec_output, _ = self.decoder(tar, enc_output, look_ahead_mask, dec_padding_mask)
        final_output = self.final_layer(dec_output)
        
        return final_output
    
    def inference(self, x):
        inp = x
        tar = tf.expand_dims([STD_INDEX], axis=0)
        enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(inp, tar)
        enc_output = self.encoder(inp, enc_padding_mask)

        predict_tokens = list()
        for t in range(0, MAX_SEQUENCE):
            dec_output, _ = self.decoder(tar, enc_output, look_ahead_mask, dec_padding_mask)
            final_output = self.final_layer(dec_output)
            outputs = tf.argmax(final_output, axis=-1).numpy()
            pred_token = outputs[0][-1]
            if pred_token == self.end_token_idx:
                break
            predict_tokens.append(pred_token)
            tar = tf.expand_dims([STD_INDEX]+predict_tokens, axis=0)
            _, look_ahead_mask, dec_padding_mask = create_masks(inp, tar)
        return predict_tokens

In [38]:
#손실함수, 정확도함수
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy')

def loss(real,pred):
    mask = tf.math.logical_not(tf.math.equal(real,0))
    loss_ = loss_object(real,pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    
    return tf.reduce_mean(loss_)



def accuracy(real,pred):
    mask = tf.math.logical_not(tf.math.equal(real,0))
    mask = tf.expand_dims(tf.cast(mask, dtype=pred.dtype), axis = -1)
    pred *= mask
    acc = train_accuracy(real,pred)
    
    return tf.reduce_mean(acc)

In [39]:
#모델구성, 컴파일
loaded_model = Transformer(**kargs)
loaded_model.compile(optimizer = tf.keras.optimizers.Adam(1e-4), loss=loss, metrics=[accuracy])

In [40]:
sample_pad = loaded_tokenizer.texts_to_sequences([text1])

sample_input_pad = np.array([[34, 35, 36, 0, 0, 0]])

In [41]:
loaded_model.fit([sample_input_pad, sample_input_pad], sample_input_pad, batch_size=1, epochs=1)



<tensorflow.python.keras.callbacks.History at 0x14c170b0d48>

In [42]:
loaded_model.load_weights('C:/pytest/data/transformer/weights.h5')

In [43]:
import re

def enc_processing(value,dictionary):
    FILTERS = "([~.,!?\"':;)(])" # 정규화를 사용하여 필터에 들어 있는 값들을 "" 으로 치환 한다.
    CHANGE_FILTER = re.compile(FILTERS)

    sequences_input_index = [] # 인덱스 값들을 가지고 있는 배열

    for sequence in value: # 한 줄 씩 불러온다
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        sequence_index = [] # 하나의 문장을 인코딩 할 때 가지고 있기 위한 배열
        for word in sequence.split(): # 문장을 스페이스 단위로 자른다
            if dictionary.get(word) is not None: # 잘려진 단어들이 딕셔너리에 있으면 그 값을 sequence_index에 추가
                sequence_index.extend([dictionary[word]])
        if len(sequence_index) > MAX_SEQUENCE: # 문장 제한 길이보다 길어질 경우 뒷 토큰을 자른다
            sequence_index = sequence_index[:MAX_SEQUENCE]
        sequence_index += (MAX_SEQUENCE - len(sequence_index)) * [dictionary["<PAD>"]]
        sequences_input_index.append(sequence_index) # 인덱스화 되어 있는 값을 sequences_input_index에 넣어준다
    return np.asarray(sequences_input_index)

In [44]:
text = "3박4일~ 정도 놀러가고 싶다."
char2idx = prepro_configs['char2idx']
idx2char = prepro_configs['idx2char']                          
test_index_inputs = enc_processing([text], char2idx)

In [46]:
outputs = loaded_model.inference(test_index_inputs)
print(' '.join([idx2char[output] for output in outputs]))

여행은 언제나 좋죠


In [47]:
test_index_inputs

array([[ 6, 37,  7,  8,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0]])