In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import re
import pickle
import pandas as pd

In [2]:
#하이퍼 파라미터
hidden_size = 256
PAD_token = 0
SOS_token = 1
EOS_token = 2
UNK_token = 3
MAX_LENGTH = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
def clean_text(text):
    if pd.isna(text):  # NaN값을 처리
        return ''
    text = text.lower()
    text = re.sub(r'\d+', ' ', text)   #숫자를 공백으로
    text = re.sub(r'([^\w\s])', r' \1 ', text)   # 마침표 앞 뒤로 공백 추가
    text = re.sub(r'\s+', ' ', text)  # 두개 이상의 공백은 하나의 공백으로..
    text = text.strip()  # 텍스트 앞 뒤의 공백 제거
    return text

In [4]:
def indexesFromSentence(vocab, sentence):
    return [vocab.get(word, vocab['<UNK>']) for word in sentence.split(' ')]

In [5]:
def tensorFromSentence(vocab, sentence):
    indexes = indexesFromSentence(vocab, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

In [6]:
class EncoderLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=2)

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.lstm(embedded, hidden)
        return output, hidden

    def initHidden(self):
        return (torch.zeros(2, 1, self.hidden_size, device=device),
                torch.zeros(2, 1, self.hidden_size, device=device))

In [7]:
class DecoderLSTM(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=2)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        output = self.embedding(input).view(1, 1, -1)
        output = F.relu(output)
        output, hidden = self.lstm(output, hidden)
        output = self.out(output[0])
        return output, hidden

    def initHidden(self):
        return (torch.zeros(2, 1, self.hidden_size, device=device),
                torch.zeros(2, 1, self.hidden_size, device=device))

In [8]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion):
    encoder_hidden = encoder.initHidden()
    
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    
    loss = 0
    
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
    
    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden
    
    for di in range(target_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()
        loss += criterion(decoder_output, target_tensor[di])
        if decoder_input.item() == EOS_token:
            break
    
    loss.backward()    # backpropagation only 1 line!
    
    encoder_optimizer.step()
    decoder_optimizer.step()
    
    return loss.item() / target_length
        

In [9]:
# 학습을 반복해주는 코드
def trainIters(encoder, decoder, n_iters, print_every=1000, learning_rate=0.01):
    print_loss_total = 0
    
    for iter in range(1, n_iters+1):
        training_pair = random.choice(pairs)
        input_tensor = tensorFromSentence(word_to_ix, training_pair[0]).to(device)
        target_tensor = tensorFromSentence(word_to_ix, training_pair[1]).to(device)
        
        loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        
        if iter % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print(f'Iteration: {iter}, Loss: {print_loss_avg: .4f}')
            print_loss_total = 0

In [10]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensorFromSentence(word_to_ix, sentence).to(device)
        input_length = input_tensor.size(0)
        encoder_hidden = encoder.initHidden()
        encoder_hidden = tuple([e.to(device) for e in encoder_hidden])
        
        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
        
        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoder_hidden = encoder_hidden
        decoded_words = []  # output sentence
        
        for di in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                 decoded_words.append(ix_to_word[topi.item()])   #여기는 최종 아웃풋의 인덱스가 들어갑니다
            decoder_input = topi.squeeze().detach()
        return ' '.join(decoded_words)

In [11]:
# 채팅함수
def chat(encoder, decoder, max_length=MAX_LENGTH):
    print("Let's chat! (type 'bye' to exit)")
    while True:
        input_sentence = input("> ")
        if input_sentence == 'bye':
            break
        output_sentence = evaluate(encoder, decoder, input_sentence)
        print('<', output_sentence)

In [12]:
# 데이터 로드 및 기본 전처리 부분을..
df = pd.read_csv('./dataset/chatbot_dataset.txt', sep='\t', names=['Question', 'Answer'])
df['Encoder Inputs'] = df['Question'].apply(clean_text)
df['Decoder Inputs'] = df['Answer'].apply(clean_text)

In [13]:
df['Decoder Inputs']

0                      i ' m fine . how about yourself ?
1                i ' m pretty good . thanks for asking .
2                    no problem . so how have you been ?
3                   i ' ve been great . what about you ?
4         i ' ve been good . i ' m in school right now .
                             ...                        
295        i first learned how to do it in high school .
296    did you take some sort of art class or somethi...
297                         that was my favorite class .
298                        you have got to be talented .
299                                             thanks .
Name: Decoder Inputs, Length: 300, dtype: object

In [14]:
input_sentence = [sentence for sentence in df['Encoder Inputs']]
output_sentence = [sentence + "<EOS>" for sentence in df['Decoder Inputs']]

In [15]:
input_sentence[0:5]

['hi , how are you doing ?',
 "i ' m fine . how about yourself ?",
 "i ' m pretty good . thanks for asking .",
 'no problem . so how have you been ?',
 "i ' ve been great . what about you ?"]

In [16]:
output_sentence[0:5]

["i ' m fine . how about yourself ?<EOS>",
 "i ' m pretty good . thanks for asking .<EOS>",
 'no problem . so how have you been ?<EOS>',
 "i ' ve been great . what about you ?<EOS>",
 "i ' ve been good . i ' m in school right now .<EOS>"]

In [17]:
# 단어 사전 생성
all_words = set(' '.join(df['Encoder Inputs'].tolist()+df['Decoder Inputs'].tolist()).split())
vocab = {'<PAD>': PAD_token, '<SOS>': SOS_token, '<EOS>': EOS_token, '<UNK>': UNK_token}
vocab.update({word: i+4 for i, word in enumerate(all_words)})
vocab_size = len(vocab)
# vocab 변수 저장
with open('./dataset/vocab.pkl', 'wb') as f:
    pickle.dump(vocab, f)

In [18]:
word_to_ix = vocab
ix_to_word = {i: word for word, i in word_to_ix.items()}

In [19]:
word_to_ix['hello']

327

In [20]:
ix_to_word[167]

'truth'

In [21]:
encoder = EncoderLSTM(vocab_size, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, vocab_size).to(device)

In [22]:
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.005)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()

In [23]:
# pairs 리스트를 만들어서 학습 데이터를 준비
pairs = [list(x) for x in zip(df['Encoder Inputs'], df['Decoder Inputs'])]

In [24]:
pairs[1]

["i ' m fine . how about yourself ?",
 "i ' m pretty good . thanks for asking ."]

In [25]:
#학습실행 def trainIters(encoder, decoder, n_iters, print_every=1000, learning_rate=0.01):
trainIters(encoder, decoder, 30000, print_every=1000)

Iteration: 1000, Loss:  3.1257
Iteration: 2000, Loss:  2.9322
Iteration: 3000, Loss:  2.7119
Iteration: 4000, Loss:  2.3395
Iteration: 5000, Loss:  2.0017
Iteration: 6000, Loss:  1.6962
Iteration: 7000, Loss:  1.4017
Iteration: 8000, Loss:  1.2547
Iteration: 9000, Loss:  1.0569
Iteration: 10000, Loss:  0.8863
Iteration: 11000, Loss:  0.7781
Iteration: 12000, Loss:  0.6946
Iteration: 13000, Loss:  0.7157
Iteration: 14000, Loss:  0.5904
Iteration: 15000, Loss:  0.5542
Iteration: 16000, Loss:  0.4554
Iteration: 17000, Loss:  0.5023
Iteration: 18000, Loss:  0.4672
Iteration: 19000, Loss:  0.5450
Iteration: 20000, Loss:  0.4171
Iteration: 21000, Loss:  0.4262
Iteration: 22000, Loss:  0.4565
Iteration: 23000, Loss:  0.4016
Iteration: 24000, Loss:  0.3283
Iteration: 25000, Loss:  0.4269
Iteration: 26000, Loss:  0.5385
Iteration: 27000, Loss:  0.4403
Iteration: 28000, Loss:  0.2090
Iteration: 29000, Loss:  0.1817
Iteration: 30000, Loss:  0.3995


In [26]:
torch.save(encoder.state_dict(), './models/encoder_tmp.pth')
torch.save(decoder.state_dict(), './models/decoder_tmp.pth')

In [27]:
# 평가실행
encoder.eval()
decoder.eval()

DecoderLSTM(
  (embedding): Embedding(433, 256)
  (lstm): LSTM(256, 256, num_layers=2)
  (out): Linear(in_features=256, out_features=433, bias=True)
)

In [None]:
chat(encoder, decoder)

Let's chat! (type 'bye' to exit)
> how are you?
< how shoes have you nice how <EOS>
> How's today?
< how me how she bad . <EOS>
> I think not ready yet
< really really it you really hope really ? <EOS>
> not a good chatbot
< i think m you . <EOS>
> don't think of me
< is to you the weather going ? <EOS>


In [None]:
# vocab 변수 로드
with open('./dataset/vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)
vocab_size = len(vocab)
word_to_ix = vocab
ix_to_word = {i: word for word, i in word_to_ix.items()}
encoder = EncoderLSTM(vocab_size, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, vocab_size).to(device)

encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.005)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.005)