In [3]:
import os
import re
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
from konlpy.tag import Okt

# Punctuation characters that are deleted from every sentence before it is
# split into tokens.
FILTERS = "([~.,!?\"':;)(])"
# Compiled once up front so every caller reuses the same pattern object.
CHANGE_FILTER = re.compile(FILTERS)

# Special tokens. Each *_INDEX equals the token's position in MARKER.
PAD = "<PAD>"  # padding token
PAD_INDEX = 0
STD = "<SOS>"  # start-of-sequence token
STD_INDEX = 1
END = "<END>"  # end-of-sequence token
END_INDEX = 2
UNK = "<UNK>"  # stands in for words that are not in the vocabulary
UNK_INDEX = 3
MARKER = [PAD, STD, END, UNK]

# Fixed length that every index sequence is truncated or padded to.
MAX_SEQUNECE = 25

In [4]:
def load_data(path):
    """Read question/answer pairs from a CSV file.

    Args:
        path: Location of a CSV file whose first row is a header that
            contains the columns 'Q' and 'A'.

    Returns:
        A ``(question, answer)`` tuple of two lists holding the values of
        the 'Q' and 'A' columns in row order.
    """
    frame = pd.read_csv(path, header=0)
    question = list(frame['Q'])
    answer = list(frame['A'])
    return question, answer

def tokenizing_data(data):
    """Flatten sentences into one list of whitespace-separated tokens.

    Every character matched by CHANGE_FILTER is deleted from a sentence
    before the sentence is split on whitespace.

    Args:
        data: Iterable of sentence strings.

    Returns:
        A single list with the tokens of all sentences in their original
        order; repeated tokens are kept.
    """
    return [
        token
        for sentence in data
        for token in CHANGE_FILTER.sub("", sentence).split()
        if token
    ]

def analzing_morhology(data):
    """Re-segment each sentence with the Okt tagger.

    All space characters are removed from a sentence before it is passed to
    ``Okt.morphs``; the pieces that call returns are joined back together
    with single spaces.

    Args:
        data: Iterable of sentence strings. Progress is reported via tqdm.

    Returns:
        A list with one space-joined string per input sentence.
    """
    tagger = Okt()
    return [
        " ".join(tagger.morphs(sentence.replace(' ', '')))
        for sentence in tqdm(data)
    ]

def make_vocab(vocab_list):
    """Build lookup tables between words and their positions in a list.

    Args:
        vocab_list: Sequence of words; a word's position is its index.

    Returns:
        A ``(word2idx, idx2word)`` tuple of dicts. If a word occurs more
        than once, ``word2idx`` keeps its last position while ``idx2word``
        still has an entry for every position.
    """
    word2idx = {}
    for position, token in enumerate(vocab_list):
        word2idx[token] = position

    idx2word = dict(enumerate(vocab_list))

    return word2idx, idx2word

In [9]:
def load_vocab_dict(raw_data_path, vocab_path):
    """Load the vocabulary, building and caching it on first use.

    When ``vocab_path`` does not exist yet, the questions and answers in
    ``raw_data_path`` are tokenized and the unique tokens are written to
    ``vocab_path``, one per line, with the MARKER tokens first. When the
    file already exists it is used as is and the raw data is not read.

    Args:
        raw_data_path: CSV file passed to ``load_data``.
        vocab_path: Text file holding one vocabulary entry per line.

    Returns:
        A ``(word2idx, idx2word, vocab_size)`` tuple, where ``vocab_size``
        is ``len(word2idx)``.
    """
    if not os.path.exists(vocab_path):
        question, answer = load_data(raw_data_path)
        words = tokenizing_data(question + answer)

        # Set iteration order for strings changes between interpreter runs,
        # so sort to give every regenerated vocabulary the same indices.
        # Marker tokens are removed from the corpus words so they cannot be
        # written a second time and lose their reserved leading positions.
        unique_words = sorted(set(words) - set(MARKER))
        vocab_words = MARKER + unique_words

        with open(vocab_path, 'w', encoding='utf-8') as vocab_f:
            vocab_f.writelines(word + '\n' for word in vocab_words)

    with open(vocab_path, 'r', encoding='utf-8') as vocab_f:
        vocab_list = [line.strip() for line in vocab_f]

    word2idx, idx2word = make_vocab(vocab_list)

    return word2idx, idx2word, len(word2idx)

def processing_encoder_input(value, dictionary):
    """Convert sentences into fixed-length index rows for the encoder.

    Each sentence has the CHANGE_FILTER characters removed and is split on
    whitespace. Words without a (non-None) entry in ``dictionary`` are
    replaced by the UNK index. Rows are cut to MAX_SEQUNECE entries and
    right-padded with the PAD index.

    Args:
        value: Iterable of sentence strings.
        dictionary: Mapping from word to index; must contain PAD, and UNK
            whenever an unknown word is encountered.

    Returns:
        A ``(rows, lengths)`` tuple: ``rows`` is ``np.asarray`` of the padded
        index lists and ``lengths`` is a list with each row's length after
        truncation but before padding.
    """
    lengths = []
    rows = []

    for sentence in value:
        row = []
        for token in CHANGE_FILTER.sub("", sentence).split():
            index = dictionary.get(token)
            if index is None:
                index = dictionary[UNK]
            row.append(index)

        # Slicing is a no-op for rows that are already short enough.
        row = row[:MAX_SEQUNECE]

        # Record the real length before the padding is appended.
        lengths.append(len(row))
        row.extend([dictionary[PAD]] * (MAX_SEQUNECE - len(row)))
        rows.append(row)

    return np.asarray(rows), lengths

def processing_decoder_input(value, dictionary):
    """Convert sentences into fixed-length index rows for the decoder input.

    Each sentence has the CHANGE_FILTER characters removed and is split on
    whitespace. The row starts with the STD index, followed by the word
    indices; it is cut to MAX_SEQUNECE entries and right-padded with the PAD
    index.

    Words without a (non-None) entry in ``dictionary`` are mapped to the UNK
    index, matching ``processing_encoder_input``, instead of raising
    ``KeyError``.

    Args:
        value: Iterable of sentence strings.
        dictionary: Mapping from word to index; must contain STD and PAD,
            and UNK whenever an unknown word is encountered.

    Returns:
        A ``(rows, lengths)`` tuple: ``rows`` is ``np.asarray`` of the padded
        index lists and ``lengths`` is a list with each row's length
        (start token included) after truncation but before padding.
    """
    sequences_output_index = []
    sequences_length = []

    for sequence in value:
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        sequence_index = [dictionary[STD]]

        for word in sequence.split():
            index = dictionary.get(word)
            if index is None:
                # Out-of-vocabulary word, e.g. when the cached vocabulary
                # file is older than the data being encoded.
                index = dictionary[UNK]
            sequence_index.append(index)

        # Slicing is a no-op for rows that are already short enough.
        sequence_index = sequence_index[:MAX_SEQUNECE]

        # Record the real length before the padding is appended.
        sequences_length.append(len(sequence_index))
        sequence_index += (MAX_SEQUNECE - len(sequence_index)) * [dictionary[PAD]]
        sequences_output_index.append(sequence_index)

    return np.asarray(sequences_output_index), sequences_length

def processing_decoder_target(value, dictionary):
    """Convert sentences into fixed-length index rows for the decoder target.

    Each sentence has the CHANGE_FILTER characters removed and is split on
    whitespace. At most ``MAX_SEQUNECE - 1`` word indices are kept so that
    the END index always fits; the row is then right-padded with the PAD
    index to MAX_SEQUNECE entries.

    Words without a (non-None) entry in ``dictionary`` are mapped to the UNK
    index, matching ``processing_encoder_input``, instead of raising
    ``KeyError``.

    Args:
        value: Iterable of sentence strings.
        dictionary: Mapping from word to index; must contain END and PAD,
            and UNK whenever an unknown word is encountered.

    Returns:
        ``np.asarray`` of the padded index lists.
    """
    sequences_target_index = []

    for sequence in value:
        sequence = re.sub(CHANGE_FILTER, "", sequence)
        sequence_index = []

        for word in sequence.split():
            index = dictionary.get(word)
            if index is None:
                # Out-of-vocabulary word, e.g. when the cached vocabulary
                # file is older than the data being encoded.
                index = dictionary[UNK]
            sequence_index.append(index)

        # Keep room for END: rows shorter than MAX_SEQUNECE are unaffected
        # by the slice, longer ones lose their tail.
        sequence_index = sequence_index[:MAX_SEQUNECE - 1] + [dictionary[END]]

        sequence_index += (MAX_SEQUNECE - len(sequence_index)) * [dictionary[PAD]]
        sequences_target_index.append(sequence_index)

    return np.asarray(sequences_target_index)

In [12]:
if __name__ == "__main__":
    # Preprocess the raw chatbot CSV into index arrays plus a config file.
    RAW_DATA_PATH = './ChatBotData.csv'
    VOCAB_PATH = './vocab.txt'

    inputs, outputs = load_data(RAW_DATA_PATH)
    char2idx, idx2char, vocab_size = load_vocab_dict(RAW_DATA_PATH, VOCAB_PATH)

    # Encoder input, decoder input and decoder target index arrays.
    index_inputs, input_seq_len = processing_encoder_input(inputs, char2idx)
    index_outputs, output_seq_len = processing_decoder_input(outputs, char2idx)
    index_targets = processing_decoder_target(outputs, char2idx)

    data_configs = {
        'char2idx': char2idx,
        'idx2char': idx2char,
        'vocab_size': vocab_size,
        'pad_symbol': PAD,
        'std_symbol': STD,
        'end_symbol': END,
        'unk_symbol': UNK,
    }

    # NOTE(review): DATA_IN_PATH is defined but not used by the saves below,
    # which write into the current working directory - confirm the intended
    # output location before changing either.
    DATA_IN_PATH = './data_in/'
    np.save('train_inputs.npy', index_inputs)
    np.save('train_outputs.npy', index_outputs)
    np.save('train_targets.npy', index_targets)

    # JSON object keys are always strings, so the integer keys of idx2char
    # come back as strings when this file is loaded again.
    # The context manager makes sure the file is flushed and closed.
    with open('data_configs.json', 'w') as config_f:
        json.dump(data_configs, config_f)