In [56]:
# Mount Google Drive under /gdrive so the data and checkpoint paths used later
# in this notebook resolve. force_remount=True asks Colab for a fresh mount
# even when a previous one is still present.
from google.colab import drive
drive.mount("/gdrive", force_remount=True)

Mounted at /gdrive


In [57]:
from torch.utils.data import (DataLoader, TensorDataset)
from torch import nn
from tqdm import tqdm
import numpy as np
import torch
import os

class TransformerChat(nn.Module):
    """Encoder-decoder Transformer that maps input token ids to response logits.

    One embedding table is shared by the encoder and decoder inputs, and a
    linear layer projects the decoder features onto the vocabulary.

    Args:
        config: dict providing "vocab_size", "embedding_size", "num_heads",
            "num_encoder_layers", "num_decoder_layers", "max_length" and
            "hidden_size" (used as the feed-forward width).
    """

    def __init__(self, config):
        super().__init__()

        self.vocab_size = config["vocab_size"]
        self.embedding_size = config['embedding_size']
        self.num_heads = config['num_heads']
        self.num_encoder_layers = config['num_encoder_layers']
        self.num_decoder_layers = config['num_decoder_layers']
        self.max_length = config['max_length']
        self.hidden_size = config['hidden_size']
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_size)
        self.transformer = nn.Transformer(d_model=self.embedding_size, nhead=self.num_heads, num_encoder_layers=self.num_encoder_layers,
                                          num_decoder_layers=self.num_decoder_layers, dim_feedforward=self.hidden_size)
        # Registered as a buffer so the mask follows the module through
        # .to()/.cuda() instead of being pinned to CUDA at construction time.
        # persistent=False keeps it out of state_dict, so checkpoints saved
        # before this change still load with strict key matching.
        causal_mask = self.transformer.generate_square_subsequent_mask(self.max_length)
        self.register_buffer("mask", causal_mask, persistent=False)
        self.projection_layer = nn.Linear(self.embedding_size, self.vocab_size)

    def forward(self, enc_inputs, dec_inputs):
        """Compute vocabulary logits for every decoder position.

        Args:
            enc_inputs: LongTensor of token ids, shape (batch, src_len).
            dec_inputs: LongTensor of token ids, shape (batch, tgt_len).
                Both lengths must not exceed ``max_length``.

        Returns:
            Tensor of shape (tgt_len, batch, vocab_size); note the time-major
            layout produced by the transposes below.
        """
        # nn.Transformer is used in its default sequence-first layout.
        enc_input_features = self.embeddings(enc_inputs).transpose(0, 1)
        dec_input_features = self.embeddings(dec_inputs).transpose(0, 1)

        # Slice the precomputed mask to the actual lengths; for inputs of
        # exactly max_length this is the full mask, as before.
        src_len = enc_input_features.size(0)
        tgt_len = dec_input_features.size(0)
        src_mask = self.mask[:src_len, :src_len]
        tgt_mask = self.mask[:tgt_len, :tgt_len]

        # NOTE(review): the encoder is also given the causal mask and no
        # padding masks are supplied. That is unusual for a seq2seq encoder,
        # but it is kept because existing checkpoints were trained this way.
        dec_output_features = self.transformer(src=enc_input_features, tgt=dec_input_features,
                                               src_mask=src_mask, tgt_mask=tgt_mask)
        hypothesis = self.projection_layer(dec_output_features)

        return hypothesis

In [58]:
def load_vocab(file_dir):
    """Build token<->id lookup tables from a one-token-per-line vocabulary file.

    Every line is whitespace-stripped and assigned its zero-based line number
    as id. If a token appears more than once, the forward table keeps the id
    of its last occurrence while the reverse table keeps every line.

    Args:
        file_dir: path of the UTF-8 vocabulary file.

    Returns:
        Tuple ``(char2idx, idx2char)`` of dicts.
    """
    with open(file_dir, 'r', encoding='utf8') as vocab_file:
        tokens = [raw_line.strip() for raw_line in vocab_file]

    idx2char = dict(enumerate(tokens))
    char2idx = {token: position for position, token in idx2char.items()}
    return char2idx, idx2char

def convert_data2feature(config, input_sequence, char2idx, decoder_input=False):
    """Convert a whitespace-separated token string into a fixed-length id array.

    Args:
        config: dict; only ``config["max_length"]`` (the output length) is read.
        input_sequence: tokens separated by whitespace.
        char2idx: token -> id mapping. It must contain ``'<UNK>'`` whenever a
            token is missing from the mapping.
        decoder_input: when True the sequence is shifted right for teacher
            forcing: ``"<S>"`` is prepended and the last token is dropped.

    Returns:
        1-D integer numpy array of length ``config["max_length"]``. Positions
        after the last token stay 0; tokens beyond ``max_length`` are dropped.
    """
    max_length = config["max_length"]
    # np.int was deprecated in NumPy 1.20 and removed in 1.24; the builtin
    # int is the exact equivalent dtype.
    input_features = np.zeros(max_length, dtype=int)

    tokens = input_sequence.split()
    if decoder_input:
        tokens = ["<S>"] + tokens[:-1]

    # Truncate rather than raise IndexError on over-long sequences.
    for idx, token in enumerate(tokens[:max_length]):
        # '<UNK>' is only looked up when actually needed, as before.
        input_features[idx] = char2idx[token] if token in char2idx else char2idx['<UNK>']

    return input_features

def load_dataset(config):
    """Load the vocabulary and the tab-separated training pairs as id tensors.

    Each non-blank line of ``config['train_file']`` is expected to hold an
    input sequence and an output sequence separated by a tab. Three parallel
    feature rows are produced per line: encoder input, right-shifted decoder
    input and decoder target.

    Args:
        config: dict providing "vocab_file", "train_file" and the keys read by
            ``convert_data2feature``.

    Returns:
        Tuple ``(enc_inputs, dec_inputs, dec_outputs, char2idx, idx2char)``;
        the first three are ``torch.long`` tensors with one row per example.

    Raises:
        IndexError: if a non-blank line has no tab-separated second field.
    """
    char2idx, idx2char = load_vocab(config['vocab_file'])

    # The context manager closes the handle; the bare open() it replaces
    # never did.
    with open(config['train_file'], 'r', encoding='utf8') as train_file:
        data_file = train_file.readlines()

    enc_inputs, dec_inputs, dec_outputs = [], [], []

    for line in tqdm(data_file):
        line = line.strip()
        # Skip blank lines (e.g. a trailing newline at the end of the file)
        # instead of crashing on the missing second column.
        if not line:
            continue

        fields = line.split('\t')
        input_sequence = fields[0]
        output_sequence = fields[1]

        enc_inputs.append(convert_data2feature(config, input_sequence, char2idx))
        dec_inputs.append(convert_data2feature(config, output_sequence, char2idx, True))
        dec_outputs.append(convert_data2feature(config, output_sequence, char2idx))

    # Stack into one ndarray first: building a tensor straight from a list of
    # ndarrays takes PyTorch's slow element-wise path.
    enc_inputs = torch.tensor(np.array(enc_inputs), dtype=torch.long)
    dec_inputs = torch.tensor(np.array(dec_inputs), dtype=torch.long)
    dec_outputs = torch.tensor(np.array(dec_outputs), dtype=torch.long)

    return enc_inputs, dec_inputs, dec_outputs, char2idx, idx2char

In [59]:
def tensor2list(input_tensor):
    """Return the tensor's values as (nested) plain Python lists.

    The tensor is cut from the autograd graph and copied to host memory
    before being converted through numpy.
    """
    host_array = input_tensor.detach().cpu().numpy()
    return host_array.tolist()

def do_test(config, model, word2idx, idx2word, input_sequence="오늘 약속있으세요?"):
    """Greedily decode a response to ``input_sequence`` and print it.

    The input is tokenised per character (spaces become ``"<SP>"``). Decoding
    starts from ``"<S>"`` and, at every step, feeds the highest-scoring token
    back into the decoder until ``"</S>"`` is produced or ``max_length - 1``
    tokens have been generated.

    Args:
        config: dict providing "max_length".
        model: module called as ``model(enc_inputs, dec_inputs)`` and returning
            logits indexed ``[step, batch, vocab]``.
        word2idx: token -> id mapping used to encode the input.
        idx2word: id -> token mapping used to decode the output.
        input_sequence: raw user text.

    Returns:
        None; the response is printed with ``"<SP>"`` turned back into spaces.
    """
    model.eval()
    # Follow the model's device instead of assuming CUDA is available.
    device = next(model.parameters()).device
    max_length = config['max_length']

    # Truncate to max_length tokens so long user input cannot overflow the
    # fixed-size feature array.
    tokens = " ".join([e if e != " " else "<SP>" for e in input_sequence]).split()[:max_length]
    input_sequence = " ".join(tokens)

    enc_inputs = torch.tensor(convert_data2feature(config, input_sequence, word2idx),
                              dtype=torch.long, device=device).unsqueeze(0)
    # An empty sequence with decoder_input=True yields just the "<S>" start token.
    dec_inputs = torch.tensor(convert_data2feature(config, "", word2idx, True),
                              dtype=torch.long, device=device).unsqueeze(0)
    response = ''

    # Inference only: no_grad avoids building an autograd graph on every step.
    with torch.no_grad():
        for decoding_step in range(max_length - 1):
            step_logits = model(enc_inputs, dec_inputs)[decoding_step, 0, :]
            dec_output_idx = int(torch.argmax(step_logits).item())
            # Feed the prediction back in as the next decoder input.
            dec_inputs[0][decoding_step+1] = dec_output_idx
            if idx2word[dec_output_idx] == "</S>":
                break
            response += idx2word[dec_output_idx]

    print(response.replace("<SP>", " "))

def test(config):
    """Load the trained checkpoint and answer user input until 'exit' is typed.

    Args:
        config: dict providing "vocab_file", "output_dir",
            "trained_model_name" and the model hyper-parameters read by
            ``TransformerChat``.
    """
    word2idx, idx2word = load_vocab(config['vocab_file'])

    # Prefer the GPU when one is present; otherwise target the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerChat(config).to(device)

    checkpoint_path = os.path.join(config["output_dir"], config["trained_model_name"])
    # map_location remaps tensors saved from a GPU run onto the chosen device;
    # without it, loading fails when the saving device is unavailable.
    # NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    while True:
        input_sequence = input("문장을 입력하세요. (종료는 exit을 입력하세요.) : ")
        if input_sequence == 'exit':
            break
        do_test(config, model, word2idx, idx2word, input_sequence)

In [60]:
def train(config):
    """Train ``TransformerChat`` with teacher forcing and save one checkpoint per epoch.

    The loop runs ``config["epoch"] + 1`` passes (epoch indices 0..epoch) and
    writes ``epoch_<index>.pt`` into ``config["output_dir"]`` after each one.

    Args:
        config: dict providing "batch_size", "learn_rate", "epoch",
            "max_length", "vocab_size", "output_dir" plus the keys read by
            ``TransformerChat`` and ``load_dataset``.
    """
    # Prefer the GPU when one is present; otherwise target the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerChat(config).to(device)

    enc_inputs, dec_inputs, dec_outputs, _, _ = load_dataset(config)
    train_features = TensorDataset(enc_inputs, dec_inputs, dec_outputs)
    train_dataloader = DataLoader(train_features, shuffle=True, batch_size=config["batch_size"])
    # Total shown in the progress line. It used to be computed from the batch
    # tensor that shadowed the dataset, so it reflected the batch size.
    steps_per_epoch = len(train_dataloader)

    # NOTE(review): padded target positions also contribute to this loss;
    # consider ignore_index once the padding id is confirmed.
    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config["learn_rate"])

    for epoch in range(config["epoch"] + 1):
        model.train()
        for (step, batch) in enumerate(train_dataloader):
            # Distinct names so the full-dataset tensors above are not shadowed.
            batch_enc, batch_dec_in, batch_dec_out = (t.to(device) for t in batch)
            optimizer.zero_grad()

            # Model output is (time, batch, vocab); flatten time-major and
            # lay the labels out the same way so rows line up.
            hypothesis = model(batch_enc, batch_dec_in).view(-1, config['vocab_size'])
            labels = batch_dec_out.transpose(0, 1).reshape(-1)

            loss = loss_func(hypothesis, labels)
            loss.backward()
            optimizer.step()
            if (step+1)% 200 == 0:
                print("Current Step : {0:d} / {1:d}\tCurrent Loss : {2:f}".format(step+1, steps_per_epoch, loss.item()))
        torch.save(model.state_dict(), os.path.join(config["output_dir"], "epoch_{0:d}.pt".format(epoch)))

In [67]:
if __name__ == "__main__":

    # Everything (vocabulary, training data, checkpoints) lives under this
    # Google Drive folder.
    root_dir = "/gdrive/My Drive/ml_colab/week13/"
    output_dir = os.path.join(root_dir, "chatbot")
    # exist_ok=True replaces the check-then-create pair: no race between the
    # two calls, and a no-op when the directory is already there.
    os.makedirs(output_dir, exist_ok=True)

    # Single source of truth: train() saves epoch_<i>.pt for i in
    # 0..num_epochs, so the last checkpoint is epoch_<num_epochs>.pt.
    num_epochs = 5

    config = {"mode": "test",  # "train" to fit the model, anything else runs the chat loop
              "vocab_file": os.path.join(root_dir, "vocab.txt"),
              "train_file": os.path.join(root_dir, "train.txt"),
              "trained_model_name": "epoch_{}.pt".format(num_epochs),
              "output_dir": output_dir,
              "epoch": num_epochs,
              "learn_rate": 0.00005,
              "num_encoder_layers": 6,
              "num_decoder_layers": 6,
              "num_heads": 4,
              "max_length": 20,
              "batch_size": 128,
              "embedding_size": 256,
              "hidden_size": 512,
              # NOTE(review): presumably the number of lines in vocab.txt - confirm
              "vocab_size": 4427
              }

    if config["mode"] == "train":
        train(config)
    else:
        test(config)

문장을 입력하세요. (종료는 exit을 입력하세요.) : 유머


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  app.launch_new_instance()


(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 나한테 욕하는 거야..?
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 너 진짜 못됐다!!
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 속상해
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 나한테 왜 그래ㅜ
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 나쁘다
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : ㅜㅜㅜ
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 유머라며!!!
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 됐어
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 안해
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : 흥
(이름) (비속어) (비속어)
문장을 입력하세요. (종료는 exit을 입력하세요.) : exit
