In [45]:
!pip install konlpy

Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m70.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting JPype1>=0.7.0 (from konlpy)
  Downloading JPype1-1.4.1-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (465 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m465.3/465.3 kB[0m [31m52.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: JPype1, konlpy
Successfully installed JPype1-1.4.1 konlpy-0.6.0


In [11]:
import re

import pandas as pd
import torch
from torch.utils.data import Dataset
from konlpy.tag import Okt
from tqdm import tqdm

PAD_TOKEN = 0
SOS_TOKEN = 1
EOS_TOKEN = 2


class WordVocab():
    def __init__(self):
        self.word2index = {
            '<PAD>': PAD_TOKEN,
            '<SOS>': SOS_TOKEN,
            '<EOS>': EOS_TOKEN,
        }
        self.word2count = {}
        self.index2word = {
            PAD_TOKEN: '<PAD>',
            SOS_TOKEN: '<SOS>',
            EOS_TOKEN: '<EOS>'
        }

        self.n_words = 3  # PAD, SOS, EOS 포함

    def add_sentence(self, sentence):
        for word in sentence.split(' '):
            self.add_word(word)

    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1


class TextDataset(Dataset):
    def __init__(self, filename, min_length=2, max_length=32):
        super(TextDataset, self).__init__()

        # TOKEN 정의
        self.PAD_TOKEN = 0  # Padding 토큰
        self.SOS_TOKEN = 1  # SOS 토큰
        self.EOS_TOKEN = 2  # EOS 토큰

        self.tagger = Okt()   # 형태소 분석기
        self.max_length = max_length  # 한 문장의 최대 길이 지정

        # CSV 데이터 로드
        df = pd.read_csv(filename)

        # 한글 정규화
        korean_pattern = r'[^ ?,.!A-Za-z0-9가-힣+]'
        self.normalizer = re.compile(korean_pattern)

        # src: 질의, tgt: 답변
        src_clean = []
        tgt_clean = []

        # 단어 사전 생성
        wordvocab = WordVocab()

        for _, row in tqdm(df.iterrows(), total=len(df)):
            src = row["question"]
            tgt = row["answer"]

            # 한글 전처리
            src = self.clean_text(src)
            tgt = self.clean_text(tgt)

            if len(src.split()) > min_length and len(tgt.split()) > min_length:
                # 최소 길이를 넘어가는 문장의 단어만 추가
                wordvocab.add_sentence(src)
                wordvocab.add_sentence(tgt)
                src_clean.append(src)
                tgt_clean.append(tgt)

        self.srcs = src_clean
        self.tgts = tgt_clean
        self.wordvocab = wordvocab

    def normalize(self, sentence):
        # 정규표현식에 따른 한글 정규화
        return self.normalizer.sub("", sentence)

    def clean_text(self, sentence):
        # 한글 정규화
        sentence = self.normalize(sentence)
        # 형태소 처리
        sentence = self.tagger.morphs(sentence)
        sentence = ' '.join(sentence)
        sentence = sentence.lower()
        return sentence

    def texts_to_sequences(self, sentence):
        # 문장 -> 시퀀스로 변환
        return [self.wordvocab.word2index[w] for w in sentence.split()]

    def pad_sequence(self, sentence_tokens):
        # 문장의 맨 끝 토큰은 제거
        sentence_tokens = sentence_tokens[:(self.max_length - 1)]
        token_length = len(sentence_tokens)

        # 문장의 맨 끝부분에 <EOS> 토큰 추가
        sentence_tokens.append(self.EOS_TOKEN)

        for i in range(token_length, (self.max_length - 1)):
            # 나머지 빈 곳에 <PAD> 토큰 추가
            sentence_tokens.append(self.PAD_TOKEN)
        return sentence_tokens

    def __getitem__(self, idx):
        inputs = self.srcs[idx]
        inputs_sequences = self.texts_to_sequences(inputs)
        inputs_padded = self.pad_sequence(inputs_sequences)

        outputs = self.tgts[idx]
        outputs_sequences = self.texts_to_sequences(outputs)
        outputs_padded = self.pad_sequence(outputs_sequences)

        return torch.tensor(inputs_padded), torch.tensor(outputs_padded)

    def __len__(self):
        return len(self.srcs)

In [12]:
import random
import torch
from torch import nn, optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class Encoder(nn.Module):
    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers):
        super(Encoder, self).__init__()

        # 단어 사전의 개수 지정
        self.num_vocabs = num_vocabs
        # 임베딩 레이어 정의 (number of vocabs, embedding dimension)
        self.embedding = nn.Embedding(num_vocabs, embedding_dim)
        # GRU (embedding dimension)
        self.gru = nn.GRU(embedding_dim,
                          hidden_size,
                          num_layers=num_layers,
                          bidirectional=False)

    def forward(self, x):
        x = self.embedding(x).permute(1, 0, 2)
        output, hidden = self.gru(x)
        return output, hidden


class Decoder(nn.Module):
    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers=1, dropout=0.2):
        super(Decoder, self).__init__()
        # 단어사전 개수
        self.num_vocabs = num_vocabs
        self.embedding = nn.Embedding(num_vocabs, embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.gru = nn.GRU(embedding_dim,
                          hidden_size,
                          num_layers=num_layers,
                          bidirectional=False)

        # 최종 출력은 단어사전의 개수
        self.fc = nn.Linear(hidden_size, num_vocabs)

    def forward(self, x, hidden_state):
        x = x.unsqueeze(0)  # (1, batch_size) 로 변환
        embedded = F.relu(self.embedding(x))
        embedded = self.dropout(embedded)
        output, hidden = self.gru(embedded, hidden_state)
        output = self.fc(output.squeeze(0))  # (sequence_length, batch_size, hidden_size(32) x bidirectional(1))
        return output, hidden


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, inputs, outputs, teacher_forcing_ratio=0.5):
        # inputs : (batch_size, sequence_length)
        # outputs: (batch_size, sequence_length)

        batch_size, output_length = outputs.shape
        output_num_vocabs = self.decoder.num_vocabs

        # 리턴할 예측된 outputs를 저장할 임시 변수
        # (sequence_length, batch_size, num_vocabs)
        predicted_outputs = torch.zeros(output_length, batch_size, output_num_vocabs).to(self.device)

        # 인코더에 입력 데이터 주입, encoder_output은 버리고 hidden_state 만 살립니다.
        # 여기서 hidden_state가 디코더에 주입할 context vector 입니다.
        # (Bidirectional(1) x number of layers(1), batch_size, hidden_size)
        _, decoder_hidden = self.encoder(inputs)

        # (batch_size) shape의 SOS TOKEN으로 채워진 디코더 입력 생성
        decoder_input = torch.full((batch_size,), SOS_TOKEN, device=self.device)

        # 순회하면서 출력 단어를 생성합니다.
        # 0번째는 SOS TOKEN이 위치하므로, 1번째 인덱스부터 순회합니다.
        for t in range(0, output_length):
            # decoder_input : 디코더 입력 (batch_size) 형태의 SOS TOKEN로 채워진 입력
            # decoder_output: (batch_size, num_vocabs)
            # decoder_hidden: (Bidirectional(1) x number of layers(1), batch_size, hidden_size), context vector와 동일 shape
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)

            # t번째 단어에 디코더의 output 저장
            predicted_outputs[t] = decoder_output

            # teacher forcing 적용 여부 확률로 결정
            # teacher forcing 이란: 정답치를 다음 RNN Cell의 입력으로 넣어주는 경우. 수렴속도가 빠를 수 있으나, 불안정할 수 있음
            teacher_force = random.random() < teacher_forcing_ratio

            # top1 단어 토큰 예측
            top1 = decoder_output.argmax(1)

            # teacher forcing 인 경우 ground truth 값을, 그렇지 않은 경우, 예측 값을 다음 input으로 지정
            decoder_input = outputs[:, t] if teacher_force else top1

        return predicted_outputs.permute(1, 0, 2)  # (batch_size, sequence_length, num_vocabs)로 변경


dataset = TextDataset("ChatbotData.csv")

# Encoder 정의
encoder = Encoder(num_vocabs=dataset.wordvocab.n_words,
                  hidden_size=32,
                  embedding_dim=64,
                  num_layers=1)
# Decoder 정의
decoder = Decoder(num_vocabs=dataset.wordvocab.n_words,
                  hidden_size=32,
                  embedding_dim=64,
                  num_layers=1)
# Seq2Seq 정의
seq2seq = Seq2Seq(encoder, decoder, 'cpu')

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

NUM_VOCABS = dataset.wordvocab.n_words
HIDDEN_SIZE = 128
EMBEDDIMG_DIM = 64

print(f'num_vocabs: {NUM_VOCABS}\n======================')

# Encoder 정의
encoder = Encoder(num_vocabs=NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDIMG_DIM,
                  num_layers=1)
# Decoder 정의
decoder = Decoder(num_vocabs=NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDIMG_DIM,
                  num_layers=1)

# Seq2Seq 생성
# encoder, decoder를 device 모두 지정
model = Seq2Seq(encoder.to(device), decoder.to(device), device)
print(model)

LR = 1e-3
optimizer = optim.Adam(model.parameters(), lr=LR)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)

data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=32, shuffle=True,
)

100%|██████████| 11823/11823 [00:28<00:00, 413.08it/s]


num_vocabs: 11964
Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(11964, 64)
    (gru): GRU(64, 128)
  )
  (decoder): Decoder(
    (embedding): Embedding(11964, 64)
    (dropout): Dropout(p=0.2, inplace=False)
    (gru): GRU(64, 128)
    (fc): Linear(in_features=128, out_features=11964, bias=True)
  )
)


In [13]:
model.train()
for epoch in tqdm(range(10)):
  for x, y in data_loader:
    x, y = x.to(device), y.to(device)

    optimizer.zero_grad()

    # output: (batch_size, sequence_length, num_vocabs)
    output = model(x, y)
    output_dim = output.size(2)

    # 1번 index 부터 슬라이싱한 이유는 0번 index가 SOS TOKEN 이기 때문
    # (batch_size*sequence_length, num_vocabs) 로 변경
    output = output.reshape(-1, output_dim)

    # (batch_size*sequence_length) 로 변경
    y = y.view(-1)

    # Loss 계산
    loss = loss_fn(output, y)
    loss.backward()
    optimizer.step()

100%|██████████| 10/10 [04:12<00:00, 25.28s/it]


In [19]:
model.eval()


input_sentence = input('Q: ')

input_sentence = dataset.clean_text(input_sentence)
input_sentence = dataset.texts_to_sequences(input_sentence)
input_sentence = dataset.pad_sequence(input_sentence)
input_sentence = torch.tensor(input_sentence).unsqueeze(0).to(device)

# 입력 문장에 대한 예측된 출력 문장 생성
output = model(input_sentence, input_sentence, teacher_forcing_ratio=0.0)

# 출력 문장의 첫번째 단어는 SOS_TOKEN 이므로 제외
output = output.argmax(dim=2)[0, 1:].cpu().numpy()

# 예측된 토큰을 문장으로 변환
output_sentence = ' '.join([
    dataset.wordvocab.index2word[i] for i in output if i != EOS_TOKEN])

print(f'A: {output_sentence}')

Q: 감기 걸린 것 같아
A: 생각 해보세요 .
