# Import

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import re
import pickle
import pandas as pd

# Hyperparameter

In [2]:
hidden_size = 256
PAD_token = 0
SOS_token = 1
EOS_token = 2
UNK_token = 3
MAX_LENGTH = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Preprocessing

In [3]:
def clean_text(text):
    if pd.isna(text):
        return ''
    text = text.lower()
    text = re.sub(r'\d+', ' ', text)
    text = re.sub(r'([^\w\s])', r' \1 ', text)   
    text = re.sub(r'\s+', ' ', text) 
    text = text.strip()  
    return text

In [4]:
def indexesFromSentence(vocab, sentence):
    return [vocab.get(word, vocab['<UNK>']) for word in sentence.split(" ")]

In [5]:
def tensorFromSentence(vocab, sentence):
    indexes = indexesFromSentence(vocab, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

In [6]:
df = pd.read_csv('./dataset/chatbot_dataset.txt', sep='\t', names=['Question', 'Answer'])
df.head()

Unnamed: 0,Question,Answer
0,"hi, how are you doing?",i'm fine. how about yourself?
1,i'm fine. how about yourself?,i'm pretty good. thanks for asking.
2,i'm pretty good. thanks for asking.,no problem. so how have you been?
3,no problem. so how have you been?,i've been great. what about you?
4,i've been great. what about you?,i've been good. i'm in school right now.


In [7]:
df['Encoder_Inputs'] = df['Question'].apply(clean_text)
df['Decoder_Inputs'] = df['Answer'].apply(clean_text)

In [8]:
df.head()

Unnamed: 0,Question,Answer,Encoder_Inputs,Decoder_Inputs
0,"hi, how are you doing?",i'm fine. how about yourself?,"hi , how are you doing ?",i ' m fine . how about yourself ?
1,i'm fine. how about yourself?,i'm pretty good. thanks for asking.,i ' m fine . how about yourself ?,i ' m pretty good . thanks for asking .
2,i'm pretty good. thanks for asking.,no problem. so how have you been?,i ' m pretty good . thanks for asking .,no problem . so how have you been ?
3,no problem. so how have you been?,i've been great. what about you?,no problem . so how have you been ?,i ' ve been great . what about you ?
4,i've been great. what about you?,i've been good. i'm in school right now.,i ' ve been great . what about you ?,i ' ve been good . i ' m in school right now .


In [79]:
input_sentence = [sentence for sentence in df['Encoder_Inputs']]
output_sentence = [sentence + " <EOS>" for sentence in df['Decoder_Inputs']]

In [80]:
input_sentence[:5]

['hi , how are you doing ?',
 "i ' m fine . how about yourself ?",
 "i ' m pretty good . thanks for asking .",
 'no problem . so how have you been ?',
 "i ' ve been great . what about you ?"]

In [81]:
output_sentence[:5]

["i ' m fine . how about yourself ? <EOS>",
 "i ' m pretty good . thanks for asking . <EOS>",
 'no problem . so how have you been ? <EOS>',
 "i ' ve been great . what about you ? <EOS>",
 "i ' ve been good . i ' m in school right now . <EOS>"]

In [20]:
all_words = set(" ".join(df['Encoder_Inputs'].tolist() + df['Decoder_Inputs'].tolist()).split())
vocab = {'<PAD>' : PAD_token, '<SOS>' : SOS_token, '<EOS>' : EOS_token, '<UNK>' : UNK_token}
vocab.update({word : i+4 for i, word in enumerate(all_words)})
vocab_size = len(vocab)

In [22]:
with open('./dataset/vocab.pkl', 'wb') as f:
    pickle.dump(vocab, f)

In [23]:
word_to_idx = vocab
idx_to_word = {i : word for word, i in word_to_idx.items()}

# Model

In [67]:
class EncoderLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=2)

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.lstm(embedded, hidden)
        return output, hidden
        
    def initHidden(self):
        return (torch.zeros(2, 1, self.hidden_size, device=device),
                torch.zeros(2, 1, self.hidden_size, device=device))

In [68]:
class DecoderLSTM(nn.Module):
    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=2)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        output = self.embedding(input).view(1, 1, -1)
        output = F.relu(output)
        output, hidden = self.lstm(output, hidden)
        output = self.out(output[0])
        return output, hidden

    def initHidden(self):
        return (torch.zeros(2, 1, self.hidden_size, device=device),
                torch.zeros(2, 1, self.hidden_size, device=device))

In [69]:
encoder = EncoderLSTM(vocab_size, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, vocab_size).to(device)

In [70]:
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.005)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()

In [71]:
pairs = [list(x) for x in zip(df['Encoder_Inputs'], df['Decoder_Inputs'])]

In [72]:
pairs[0]

['hi , how are you doing ?', "i ' m fine . how about yourself ?"]

In [73]:
epoch = 100

for i in range(epoch):
    total_loss = 0

    for idx in range(len(pairs)):
        training_pair = pairs[idx]
        input_tensor = tensorFromSentence(word_to_idx, training_pair[0]).to(device)
        output_tensor = tensorFromSentence(word_to_idx, training_pair[1]).to(device)
        
        # 훈련
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        input_length = input_tensor.size(0)
        output_length = output_tensor.size(0)

        loss = 0
        encoder_hidden = encoder.initHidden()

        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)

        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden = encoder_hidden

        for di in range(output_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()
            loss += criterion(decoder_output, output_tensor[di])
            if decoder_input.item() == EOS_token:
                break

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        total_loss += loss.item() / output_length

    if i % 10 == 0:
        print(f"epoch:{i}, loss:{total_loss / len(pairs)}")

epoch:0, loss:3.6210716095386357
epoch:10, loss:3.425848677230278
epoch:20, loss:2.88936164604236
epoch:30, loss:2.1241015205164766
epoch:40, loss:1.7065108936730204
epoch:50, loss:1.337241018826608
epoch:60, loss:1.2785630731401403
epoch:70, loss:1.013029809638668
epoch:80, loss:0.8444368172917225
epoch:90, loss:0.7111186895707717


# Evaluate

In [74]:
encoder.eval()
decoder.eval()

DecoderLSTM(
  (embedding): Embedding(433, 256)
  (lstm): LSTM(256, 256, num_layers=2)
  (out): Linear(in_features=256, out_features=433, bias=True)
)

In [75]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensorFromSentence(word_to_idx, sentence).to(device)
        input_length = input_tensor.size(0)
        encoder_hidden = encoder.initHidden()
        encoder_hidden = tuple([e.to(device) for e in encoder_hidden])
        
        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
        
        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden = encoder_hidden
        decoded_words = []  # output sentence
        
        for di in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                 decoded_words.append(idx_to_word[topi.item()])   #여기는 최종 아웃풋의 인덱스가 들어갑니다
            decoder_input = topi.squeeze().detach()
        return ' '.join(decoded_words)

In [76]:
# 채팅함수
def chat(encoder, decoder, max_length=MAX_LENGTH):
    print("Let's chat! (type 'bye' to exit)")
    while True:
        input_sentence = input("> ")
        if input_sentence == 'bye':
            break
        output_sentence = evaluate(encoder, decoder, input_sentence)
        print('<', output_sentence)

In [78]:
chat(encoder, decoder)

Let's chat! (type 'bye' to exit)


>  hi, how are you doing?	


< you were seen some just that that . . .


>  how do you do?


< i you i see got . . . <EOS>


>  bye
