In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import random
import warnings
import time
import math
import torch
import torch.nn as nn
import torch.optim as optim
import spacy

# Unicode warning 제거 (폰트 관련 경고메시지)
plt.rcParams['axes.unicode_minus']=False
# 한글 폰트 설정

plt.rcParams['font.family'] = "NanumGothic"

warnings.filterwarnings('ignore')

data_dir = 'data'

df = pd.read_csv(os.path.join(data_dir, 'ChatbotData.csv'))
df

Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0
...,...,...,...
11818,훔쳐보는 것도 눈치 보임.,티가 나니까 눈치가 보이는 거죠!,2
11819,훔쳐보는 것도 눈치 보임.,훔쳐보는 거 티나나봐요.,2
11820,흑기사 해주는 짝남.,설렜겠어요.,2
11821,힘든 연애 좋은 연애라는게 무슨 차이일까?,잘 헤어질 수 있는 사이 여부인 거 같아요.,2


-----------------

### 한글 정규화 과정

In [2]:
import re

# 한글, 영어, 숫자, 공백, ?!.,을 제외한 나머지 문자 제거
korean_pattern = r'[^ ?,.!A-Za-z0-9가-힣+]'

# 패턴 컴파일
normalizer = re.compile(korean_pattern)
normalizer

re.compile(r'[^ ?,.!A-Za-z0-9가-힣+]', re.UNICODE)

In [3]:
print("수정 전:" ,df['Q'][50])
print("수정 후:",normalizer.sub("", df['Q'][50]))

def normalize(sentence):
    return normalizer.sub("", sentence)

수정 전: 감 말랭이 먹어야지
수정 후: 감 말랭이 먹어야지


In [4]:
from konlpy.tag import  Okt
from eunjeon import Mecab
# 형태소 분석기
mecab = Mecab()
okt = Okt()

In [5]:
# mecab
mecab.morphs(normalize(df['Q'][50]))

['감', '말', '랭', '이', '먹', '어야지']

In [6]:
# okt
okt.morphs(normalize(df['A'][50]))

['맛있게', '드세요', '.']

In [7]:
# 한글 전처리를 함수화
def clean_text(sentence, tagger):
    sentence = normalize(sentence)
    sentence = tagger.morphs(sentence)
    sentence = ' '.join(sentence)
    sentence = sentence.lower()
    return sentence

# 한글
print(clean_text(df['Q'][50], okt))

# 영어
clean_text(df['A'][50], okt)

감 말랭이 먹어야지


'맛있게 드세요 .'

In [8]:
len(df['Q']), len(df['A'])

(11823, 11823)

In [9]:
questions = [clean_text(sent, okt) for sent in df['Q'].values[:1000]]
answers = [clean_text(sent, okt) for sent in df['A'].values[:1000]]

In [10]:
print(questions[:5])
print(answers[:5])

['12시 땡 !', '1 지망 학교 떨어졌어', '3 박 4일 놀러 가고 싶다', '3 박 4일 정도 놀러 가고 싶다', 'ppl 심하네']
['하루 가 또 가네요 .', '위로 해 드립니다 .', '여행 은 언제나 좋죠 .', '여행 은 언제나 좋죠 .', '눈살 이 찌푸려지죠 .']


In [11]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
batch_size = 64
device

device(type='cpu')

In [12]:
class WordVocab():
    def __init__(self, SOS_TOKEN = 0, EOS_TOKEN = 1, UNKNOWN_TOKEN = 2):
        self.unknown_token = UNKNOWN_TOKEN
        
        # 각 토큰 별 word count
        self.word2count = {}
        
        # word -> idx
        self.word2index = {
            '<SOS>': SOS_TOKEN, 
            '<EOS>': EOS_TOKEN,
            '<UKN>': UNKNOWN_TOKEN,
        }

        # idx -> word
        self.index2word = {
            SOS_TOKEN: '<SOS>', 
            EOS_TOKEN: '<EOS>', 
            UNKNOWN_TOKEN: '<UKN>',
        }
        
        # total word counts(default 단어 개수가 3개이므로 2보다 커야함 => SOS, EOS, UNKNOWN을 포함하기 때문)
        self.n_words = 3  

    def add_sentence(self, sentence):
        for word in sentence.split(' '):
            self.add_word(word)

    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
    
    def word_to_index(self, word):
        if word in self.word2index:
            return self.word2index[word]
        else:
            return self.unknown_token
    
    def index_to_word(self, idx):
        return self.index2word[idx]

In [13]:
questions[50]

'감 말랭이 먹어야지'

In [14]:
print(f'원문: {questions[50]}')
wordvocab = WordVocab()
wordvocab.add_sentence(questions[50])
print('==='*10)
print('단어사전')
print(wordvocab.word2index)

원문: 감 말랭이 먹어야지
단어사전
{'<SOS>': 0, '<EOS>': 1, '<UKN>': 2, '감': 3, '말랭이': 4, '먹어야지': 5}


-----------------------------------

### 여태까지 했던 작업들을 CLASS 형태로 종합해준다.

In [15]:
class QADataset():
    def __init__(self, csv_path, min_length=3, max_length=25):
        data_dir = 'data'
        
        # TOKEN 정의
        self.SOS_TOKEN = 0 # SOS 토큰
        self.EOS_TOKEN = 1 # EOS 토큰
        
        self.tagger = Okt()   # 형태소 분석기
        self.max_length = max_length # 한 문장의 최대 길이 지정
        
        # CSV 데이터 로드
        df = pd.read_csv(os.path.join(data_dir, csv_path))
        df = df[:50]
        
        # 한글 정규화
        korean_pattern = r'[^ ?,.!A-Za-z0-9가-힣+]'
        self.normalizer = re.compile(korean_pattern)
        
        # src: 질의, tgt: 답변
        src_clean = []
        tgt_clean = []
        
        # 단어 사전 생성
        wordvocab = WordVocab()
        
        for _, row in df.iterrows():
            src = row['Q']
            tgt = row['A']
            
            # 한글 전처리
            src = self.clean_text(src)
            tgt = self.clean_text(tgt)
            
            if len(src.split()) > min_length and len(tgt.split()) > min_length:
                # 최소 길이를 넘어가는 문장의 단어만 추가
                wordvocab.add_sentence(src)
                wordvocab.add_sentence(tgt)
                src_clean.append(src)
                tgt_clean.append(tgt)            
        
        self.srcs = src_clean
        self.tgts = tgt_clean
        self.wordvocab = wordvocab

    
    def normalize(self, sentence):
        # 정규표현식에 따른 한글 정규화
        return self.normalizer.sub("", sentence)

    def clean_text(self, sentence):
        # 한글 정규화
        sentence = self.normalize(sentence)
        # 형태소 처리
        sentence = self.tagger.morphs(sentence)
        sentence = ' '.join(sentence)
        sentence = sentence.lower()
        return sentence
    
    def texts_to_sequences(self, sentence):
        # 문장 -> 시퀀스로 변환
        sequences = [self.wordvocab.word_to_index(w) for w in sentence.split()]
        # 문장 최대 길이 -1 까지 슬라이싱
        sequences = sequences[:self.max_length-1]
        # 맨 마지막에 EOS TOKEN 추가
        sequences.append(self.EOS_TOKEN)
        return sequences
    
    def sequences_to_texts(self, sequences):
        # 시퀀스 -> 문장으로 변환
        sentences = [self.wordvocab.index_to_word(s.item()) for s in sequences]
        return ' '.join(sentences)

    
    def __getitem__(self, idx): #중복 연산자(__function__)로 슬라이싱을 구현함. 즉, 실제 torch를 index를 통해 불러올 수 있게 한다.
        inputs = self.srcs[idx]
        inputs_sequences = self.texts_to_sequences(inputs)
        
        outputs = self.tgts[idx]
        outputs_sequences = self.texts_to_sequences(outputs)
        
        return torch.tensor(inputs_sequences).view(-1, 1), torch.tensor(outputs_sequences).view(-1, 1)
    
    def __len__(self): #마찬가지로 중복 연산자(__function__)를 통해 파이썬의 len 함수를 torch에도 적용할 수 있게 했다.
        return len(self.srcs)

In [16]:
# 한 문장의 최대 단어길이를 20로 설정
MAX_LENGTH = 20

dataset = QADataset('ChatbotData.csv', min_length=3, max_length=MAX_LENGTH)

In [17]:
# 3번 index 데이터셋 조회
# 결과: x(입력 데이터), y(출력 데이터)
dataset[0] #아까 썼던 중복 연산자(__getitem__)를 불러옴

(tensor([[3],
         [4],
         [5],
         [6],
         [1]]),
 tensor([[ 7],
         [ 8],
         [ 9],
         [10],
         [ 1]]))

In [19]:
x, y = dataset[0]

# 시퀀스를 문장으로 변환
print(dataset.sequences_to_texts(x))
print(dataset.sequences_to_texts(y))

1 지망 학교 떨어졌어 <EOS>
위로 해 드립니다 . <EOS>


---------------------------------------------------------------------------------------------------

### Encoder

In [20]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_size, embedding_dim, num_layers,dropout):
        super(Encoder, self).__init__()
        
        # 단어 사전의 개수 지정
        self.num_vocabs = input_dim
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.n_layers = num_layers
        
        # 임베딩 레이어 정의 (number of vocabs, embedding dimension)
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        # LSTM (embedding dimension)
        self.rnn = nn.LSTM(embedding_dim, 
                          hidden_size, 
                          num_layers=num_layers, 
                          bidirectional=False, 
                          batch_first=True,
                         )
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        x = self.embedding(x).view(1,1,-1)
        embedded = self.dropout(x)
        outputs, (hidden, cell) = self.rnn(embedded)
        return  hidden,cell

### Decoder

In [21]:
import torch
import torch.nn as nn

class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_size, embedding_dim, num_layers, dropout):
        super(Decoder, self).__init__()
        
        self.output_dim = output_dim
        self.hidden_size = hidden_size
        self.embedding_dim = embedding_dim
        self.n_layers = num_layers
        
        # 임베딩 레이어 정의
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        
        # LSTM 레이어 정의
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=num_layers, batch_first=True)
        
        # Fully Connected Layer 정의 (output_dim = num_vocabs)
        self.fc_out = nn.Linear(hidden_size, output_dim)
        
        # Dropout 정의
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, input, hidden, cell):
        # input: (batch_size)
        # hidden: (num_layers * num_directions, batch_size, hidden_size)
        # cell: (num_layers * num_directions, batch_size, hidden_size)
        
        input = input.unsqueeze(0)  # input: (1, batch_size)
        
        embedded = self.dropout(self.embedding(input))  # embedded: (1, batch_size, embedding_dim)
        
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        # output: (1, batch_size, hidden_size)
        # hidden: (num_layers * num_directions, batch_size, hidden_size)
        
        prediction = self.fc_out(output.squeeze(0))  # prediction: (batch_size, num_vocabs)
        
        return prediction, hidden, cell

    def init_hidden(self, device, batch_size=1):
        # hidden state와 cell state 초기화
        return torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device)

### Seq2Seq

In [22]:
# Seq2Seq
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        # encoder와 decoder의 hid_dim이 일치하지 않는 경우 에러메세지
        assert encoder.hidden_size == decoder.hidden_size, \
            'Hidden dimensions of encoder decoder must be equal'
        # encoder와 decoder의 hid_dim이 일치하지 않는 경우 에러메세지
        assert encoder.n_layers == decoder.n_layers, \
            'Encoder and decoder must have equal number of layers'

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src: [src len, batch size]
        # trg: [trg len, batch size]
        
        batch_size = trg.shape[1]
        trg_len = trg.shape[0] # 타겟 토큰 길이 얻기
        trg_vocab_size = self.decoder.output_dim # context vector의 차원

        # decoder의 output을 저장하기 위한 tensor
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        # initial hidden state
        hidden, cell = self.encoder(src)

        # 첫 번째 입력값 <sos> 토큰
        input = trg[0,:]

        for t in range(1,trg_len): # <eos> 제외하고 trg_len-1 만큼 반복
            output, hidden, cell = self.decoder(input, hidden, cell)

            # prediction 저장
            outputs[t] = output

            # teacher forcing을 사용할지, 말지 결정
            teacher_force = random.random() < teacher_forcing_ratio

            # 가장 높은 확률을 갖은 값 얻기
            top1 = output.argmax(1)

            # teacher forcing의 경우에 다음 lstm에 target token 입력
            input = trg[t] if teacher_force else top1

        return outputs

### 파라미터 설정

In [23]:
# 하이퍼 파라미터 지정
input_dim = dataset.wordvocab.n_words
output_dim = dataset.wordvocab.n_words
enc_emb_dim = 64 # 임베딩 차원
dec_emb_dim = 64
hid_dim = 128 # hidden state 차원
num_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5

In [24]:
dataset[0]

(tensor([[3],
         [4],
         [5],
         [6],
         [1]]),
 tensor([[ 7],
         [ 8],
         [ 9],
         [10],
         [ 1]]))

In [25]:
# 모델 생성
enc = Encoder(input_dim, enc_emb_dim, hid_dim, num_layers, enc_dropout)
dec = Decoder(output_dim, dec_emb_dim, hid_dim, num_layers, dec_dropout)

model = Seq2Seq(enc, dec, device).to(device)

In [26]:
# 가중치 초기화
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)

model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(215, 128)
    (rnn): LSTM(128, 64, num_layers=2, batch_first=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(215, 128)
    (lstm): LSTM(128, 64, num_layers=2, batch_first=True)
    (fc_out): Linear(in_features=64, out_features=215, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [27]:
# 모델의 학습가능한 파라미터 수 측정
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 234,903 trainable parameters


In [28]:
# optimizer
optimizer = optim.Adam(model.parameters())

criterion = nn.CrossEntropyLoss(ignore_index = 0)

In [29]:
epoch_loss = 0
for idx in range(len(dataset)):
    (src,trg) = dataset[idx]
    optimizer.zero_grad()

    output = model(src,trg) # [trg len, batch size, output dim]
    output_dim = output.shape[-1]
    output = output[1:].view(-1, output_dim) # loss 계산을 위해 1d로 변경
    trg = trg[1:].view(-1) # loss 계산을 위해 1d로 변경

    loss = criterion(output, trg)
    loss.backward()

    # 기울기 clip
    #torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

    optimizer.step()
    epoch_loss += loss.item()

: 

In [29]:
# 학습을 위한 함수
def train(model, dataset, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for idx in range(len(dataset)):
        (src,trg) = dataset[idx]
        optimizer.zero_grad()

        output = model(src,trg) # [trg len, batch size, output dim]
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim) # loss 계산을 위해 1d로 변경
        trg = trg[1:].view(-1) # loss 계산을 위해 1d로 변경

        loss = criterion(output, trg)
        loss.backward()

        # 기울기 clip
        #torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(questions)

In [30]:
'''
# function to count training time
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs'''

'\n# function to count training time\ndef epoch_time(start_time, end_time):\n    elapsed_time = end_time - start_time\n    elapsed_mins = int(elapsed_time / 60)\n    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))\n    return elapsed_mins, elapsed_secs'

In [30]:
train_loss = train(model,dataset, optimizer, criterion, 1)
print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')

: 

In [75]:
'''
# 학습 시작
num_epochs = 10
clip = 1

best_valid_loss = float('inf')

for epoch in range(num_epochs):
   
    start_time = time.time()
    
    train_loss = train(model,dataset, optimizer, criterion, clip)
    
    #end_time = time.time()
    
    #epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    #print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')'''

: 