# 10.1 임베딩

## 10.1.1 희소 표현 기반 임베딩

In [None]:
# 원핫 인코딩 적용
import torch
import pandas as pd

class2=pd.read_csv("class2.csv") # 데이터셋을 메모리로 로딩

from sklearn import preprocessing
label_encoder = preprocessing.LabelEncoder() # 데이터를 인코딩하는 데 사용. 다음의 OneHotEncoder와 함께 사용
onehot_encoder = preprocessing.OneHotEncoder() # 데이터를 숫자 형식으로 표현

train_x = label_encoder.fit_transform(class2['class2'])
train_x

## 10.1.2 횟수기반 임베딩

In [None]:
# 코퍼스에 카운터 벡터 적용
from sklearn.feature_extraction.text import CountVectorizer
corpus = [
    'This is last chance.',
    'and if you do not have this chance.',
    'you will never get any chance.',
    'will you do get this one?',
    'please, get this chance',
]
vect = CountVectorizer()
vect.fit(corpus)
vect.vocabulary_

In [None]:
# 배열 변환
vect.transform(['you will never get any chance.']).toarray()

In [None]:
# 불용어를 제거한 카운터 벡터
vect = CountVectorizer(stop_words=["and", "is", "please", "this"]).fit(corpus) # stop_words를 사용하여 is, not, an 같은 불용어 제거
vect.vocabulary_

### TF-IDF

In [None]:
# TF-IDF를 적용한 후 행렬로 표현
from sklearn.feature_extraction.text import TfidfVectorizer
doc = ['I like machine learning', 'I love deep learning', 'I run everyday']
tfidf_vectorizer = TfidfVectorizer(min_df=1)
tfidf_matrix = tfidf_vectorizer.fit_transform(doc)
doc_distance = (tfidf_matrix * tfidf_matrix.T)
print ('유사도를 위한', str(doc_distance.get_shape()[0]), 'x', str(doc_distance.get_shape()[1]), 'matrix를 만들었습니다.')
print(doc_distance.toarray())

## 10.1.3 예측기반 임베딩

In [None]:
# nltk 설치
!pip install nltk

In [None]:
# nltk 패키지 불러오기
import nltk
nltk.download("all-nltk")

### 워드투벡터

In [None]:
# 데이터셋을 메모리로 로딩하고 토큰화 적용
from nltk.tokenize import sent_tokenize, word_tokenize
import warnings
warnings.filterwarnings(action = 'ignore')
import gensim
from gensim.models import Word2Vec

sample = open("peter.txt", "r", encoding='UTF8') # 피터팬 데이터셋 로딩
s = sample.read()

f = s.replace("\n", " ") # 줄바꿈을 공백으로 변환
data = []

for i in sent_tokenize(f): # 로딩한 파일의 각 문장마다 반복
    temp = []
    for j in word_tokenize(i): # 문장을 단어로 토큰화
        temp.append(j.lower()) # 토큰화된 단어를 소문자로 변환하여 temp에 저장
    data.append(temp)

data

### CBOW

In [None]:
# 데이터셋에 CBOW 적용 후 peter와 wendy의 유사성 확인
model1 = gensim.models.Word2Vec(data, min_count = 1,
                                vector_size = 100, window = 5, sg=0)
print("Cosine similarity between 'peter' " +
                 "wendy' - CBOW : ",
      model1.wv.similarity('wendy', 'wendy'))

model1 = gensim.models.Word2Vec(data, min_count = 1,
                                vector_size = 100, window = 5)

In [None]:
# peter와 hook 유사성 확인
print("Cosine similarity between 'peter' " +
                 "hook' - CBOW : ",
      model1.wv.similarity('peter', 'hook'))

### skip-gram

In [None]:
# 데이터셋에 skip gram 적용 후 peter와 wendy의 유사성 확인
model2 = gensim.models.Word2Vec(data, min_count = 1, vector_size = 100,
                                window = 5, sg = 1) # skip gram 모델 사용
print("Cosine similarity between 'peter' " +
          "wendy' - Skip Gram : ",
    model2.wv.similarity('peter', 'wendy')) # 결과 출력

In [None]:
# peter와 hook의 유사성
print("Cosine similarity between 'peter' " +
            "hook' - Skip Gram : ",
      model2.wv.similarity('peter', 'hook'))

### 패스트텍스트

In [None]:
# 라이브러리 및 데이터 호출
from gensim.test.utils import common_texts
from gensim.models import FastText

model=FastText('..\chap10\data\peter.txt', vector_size=4, window=3, min_count=1, epochs=10)

In [None]:
# peter, wendy에 대한 코사인 유사도
sim_score=model.wv.similarity('peter', 'wendy')
print(sim_score)

In [None]:
# peter, hook에 대한 코사인 유사도
sim_score=model.wv.similarity('peter', 'hook')
print(sim_score)

In [None]:
# 라이브러리와 사전 훈련된 모델 호출
from __future__ import print_function
from gensim.models import KeyedVectors # gensim은 자연어를 벡터로 변환하는 데 필요한 편의 기능을 제공하는 라이브러리입니다

model_kr=KeyedVectors.load_word2vec_format('..\chap10\data\wiki.ko.vec') # wiki.ko.vec 파일을 메모리로 불러오기

In [None]:
# 노력과 유사한 단어와 유사도 확인
find_similar_to='노력'

for similar_word in model_kr.similar_by_word(find_similar_to):
    print("Word: {0}, Similarity: {1:.2f}".format(
        similar_word[0], similar_word[1]
    ))

In [None]:
# 동물, 육식동물에는 긍정적이지만 사람에는 부정적인 단어와 유사도 확인
similarities=model_kr.most_similar(positive=['동물', '육식동물'], negative=['사람'])
print(similarities)

## 10.1.4 횟수/예측 기반 임베딩

### 글로브

In [None]:
# 라이브러리 및 데이터셋 로딩
import numpy as np
%matplotlib notebook
import matplotlib.pyplot as plt
plt.style.use('ggplot')
from sklearn.decomposition import PCA
from gensim.test.utils import datapath, get_tmpfile
from gensim.models import KeyedVectors
import gensim.scripts.glove2word2vec import glove2word2vec

glove_file=datapath('..\chap10\data\glove.6B.100d.txt')
word2vec_glove_file=get_tmpfile("glove.6B.100d.word2vec.txt") # 글로브 데이터를 위한 워드투벡터 형태로 변환
glove2word2vec(glove_file, word2vec_glove_file)

In [None]:
# bill과 유사한 단어의 리스트를 반환
model = KeyedVectors.load_word2vec_format(word2vec_glove_file) # load_word2vec_format 메서드를 이용해 word2vec.c 형식으로 벡터를 가져옴
model.most_similar('bill') # 단어(bill) 기준으로 가장 유사한 단어들의 리스트

In [None]:
# cherry와 유사한 단어의 리스트를 반환
model.most_similar('cherry') # 단어(cherry) 기준으로 가장 유사한 단어들의 리스트

In [None]:
# cherry와 관련성이 없는 단어의 리스트를 반환
model.most_similar(negative=['cherry']) # 단어(cherry)와 관련성이 없는 단어들을 반환

In [None]:
# woman, king과 유사성이 높으면서 man과 관련성이 없는 단어 반환
result=model.most_similar(positive=['woman', 'king'], negative=['man'])
print("{}: {:.4f}".format(*result[0]))

In [None]:
# australia, beer, france와 관련성이 있는 단어 반환
def analogy(x1, x2, y1):
    result = model.most_similar(positive=[y1, x2], negative=[x1])
    return result[0][0]
analogy('australia', 'beer', 'france')

In [None]:
# tall, tallest, long 단어 기반으로 새로운 단어 유추
analogy('tall', 'tallest', 'long')

In [None]:
# breakfast cereal dinner lunch 중 유사도가 낮은 단어 반환
print(model.doesnt_match("breakfast cereal dinner lunch".split())) # 유사도가 가장 낮은 단어 반환

# 10.2 트랜스포머 어텐션

## 10.2.1 seq2seq

In [None]:
# 라이브러리 호출
from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd

import os
import re
import random

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# 데이터 준비
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 20

class Lang: # 딕셔너리를 만들기 위한 클래스
    def __init__(self): # 단어의 인덱스를 저장하기 위한 컨테이너를 초기화
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"} # SOS: 문장의 시작, EOS: 문장의 끝
        self.n_words = 2

    def addSentence(self, sentence): # 문장을 단어 단위로 분리한 후 컨테이너에 추가
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word): # 컨테이너에 단어가 없다면 추가되고, 있다면 카운트를 업데이트
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

In [None]:
# 데이터 정규화
def normalizeString(df, lang):
    sentence = df[lang].str.lower()
    sentence = sentence.str.replace('[^A-Za-z\s]+', '') # a-z, A-Z, ..., ?, ! 등을 제외하고 모두 공백으로 바꿈
    sentence = sentence.str.normalize('NFD') # 유니코드 정규화 방식
    sentence = sentence.str.encode('ascii', errors='ignore').str.decode('utf-8') # Unicode를 ASCII로 전환
    return sentence

def read_sentence(df, lang1, lang2):
    sentence1 = normalizeString(df, lang1) # 데이터셋의 첫 번째 열(영어)
    sentence2 = normalizeString(df, lang2) # 데이터셋의 두 번째 열(프랑스어)
    return sentence1, sentence2

def read_file(loc, lang1, lang2):
    df = pd.read_csv(loc, delimiter='\t', header=None, names=[lang1, lang2])
    return df

def process_data(lang1,lang2):
    df = read_file('../chap10/data/%s-%s.txt' % (lang1, lang2), lang1, lang2) # 데이터셋 불러오기
    sentence1, sentence2 = read_sentence(df, lang1, lang2)

    input_lang = Lang()
    output_lang = Lang()
    pairs = []
    for i in range(len(df)):
        if len(sentence1[i].split(' ')) < MAX_LENGTH and len(sentence2[i].split(' ')) < MAX_LENGTH:
            full = [sentence1[i], sentence2[i]] # 첫 번째와 두 번째 열을 합쳐서 저장
            input_lang.addSentence(sentence1[i]) # 입력으로 영어 사용
            output_lang.addSentence(sentence2[i]) # 출력으로 프랑스어 사용
            pairs.append(full) # pairs에는 입력과 출력이 합쳐진 것을 사용

    return input_lang, output_lang, pairs

In [None]:
# 텐서로 변환
def indexesFromSentence(lang, sentence): # 문장을 단어로 분리하고 인덱스를 반환
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence): # 딕셔너리에서 단어에 대한 인덱스를 가져오고 문장 끝에 토큰을 추가
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(input_lang, output_lang, pair): # 입력과 출력 문장을 텐서로 변환하여 반환
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

In [None]:
# 인코더 네트워크
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, embbed_dim, num_layers):
        super(Encoder, self).__init__()
        self.input_dim = input_dim # 인코더에서 사용할 입력층
        self.embbed_dim = embbed_dim # 인코더에서 사용할 임베딩 계층
        self.hidden_dim = hidden_dim # 인코더에서 사용할 은닉층(이전 은닉층)
        self.num_layers = num_layers # 인코더에서 사용할 GRU의 계층 개수
        self.embedding = nn.Embedding(input_dim, self.embbed_dim) # 임베딩 계층 초기화
        self.gru = nn.GRU(self.embbed_dim, self.hidden_dim, num_layers=self.num_layers) # 임베딩 차원, 은닉층 차원, GRU의 계층 개수를 이용하여 GRU 계층을 초기화

    def forward(self, src):
        embedded = self.embedding(src).view(1,1,-1) # 임베딩 처리
        outputs, hidden = self.gru(embedded) # 임베딩 결과를 GRU 모델에 적용
        return outputs, hidden

In [None]:
# 디코더 네트워크
class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, embbed_dim, num_layers):
        super(Decoder, self).__init__()

        self.embbed_dim = embbed_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers

        self.embedding = nn.Embedding(output_dim, self.embbed_dim) # 임베딩 계층 초기화
        self.gru = nn.GRU(self.embbed_dim, self.hidden_dim, num_layers=self.num_layers) # GRU 계층 초기화
        self.out = nn.Linear(self.hidden_dim, output_dim) # 선형 계층 초기화
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        input = input.view(1, -1) # 입력을 (1,배치 크기)로 변경
        embedded = F.relu(self.embedding(input))
        output, hidden = self.gru(embedded, hidden)
        prediction = self.softmax(self.out(output[0]))
        return prediction, hidden

In [None]:
# seq2seq 네트워크
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device, MAX_LENGTH=MAX_LENGTH):
        super().__init__()

        self.encoder = encoder # 인코더 초기화
        self.decoder = decoder # 디코더 초기화
        self.device = device

    def forward(self, input_lang, output_lang, teacher_forcing_ratio=0.5):

        input_length = input_lang.size(0) # 입력 문자 길이(문장 단어 수)
        batch_size = output_lang.shape[1]
        target_length = output_lang.shape[0]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(target_length, batch_size, vocab_size).to(self.device) # 예측된 출력을 저장하기 위한 변수 초기화

        for i in range(input_length):
            encoder_output, encoder_hidden = self.encoder(input_lang[i]) # 문장의 모든 단어를 인코딩
        decoder_hidden = encoder_hidden.to(device) # 인코더의 은닉층을 디코더의 은닉층으로 사용
        decoder_input = torch.tensor([SOS_token], device=device) # 첫번째 예측 단어 앞에 토큰(SOS) 추가

        for t in range(target_length): # 현재 단어에서 출력 단어를 예측
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
            outputs[t] = decoder_output
            teacher_force = random.random() < teacher_forcing_ratio
            topv, topi = decoder_output.topk(1)
            input = (output_lang[t] if teacher_force else topi) # teacher_force를 활성화하면 목표 단어를 다음 입력으로 사용
            if(teacher_force == False and input.item() == EOS_token): # teacher_force를 활성화하지 않으면 자체 예측 값을 다음 입력으로 사용
                break
        return outputs

In [None]:
# 모델의 오차 계산 함수 정의
teacher_forcing_ratio = 0.5

def Model(model, input_tensor, target_tensor, model_optimizer, criterion):
    model_optimizer.zero_grad()
    input_length = input_tensor.size(0)
    loss = 0
    epoch_loss = 0
    output = model(input_tensor, target_tensor)
    num_iter = output.size(0)

    for ot in range(num_iter):
        loss += criterion(output[ot], target_tensor[ot]) # 모델의 예측 결과와 정답(예상 결과)을 이용하여 오차 계산

    loss.backward()
    model_optimizer.step()
    epoch_loss = loss.item() / num_iter
    return epoch_loss

In [None]:
# 모델 훈련 함수 정의
def trainModel(model, input_lang, output_lang, pairs, num_iteration=20000):
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=0.01) # 옵티마이저로 SGD를 사용
    criterion = nn.NLLLoss()
    total_loss_iterations = 0

    training_pairs = [tensorsFromPair(input_lang, output_lang, random.choice(pairs))
                      for i in range(num_iteration)]

    for iter in range(1, num_iteration+1):
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]
        loss = Model(model, input_tensor, target_tensor, optimizer, criterion) # Model 객체를 이용하여 오차 계산
        total_loss_iterations += loss

        if iter % 5000 == 0: # 5000번째마다 오차 값에 대해 출력
            avarage_loss= total_loss_iterations / 5000
            total_loss_iterations = 0
            print('%d %.4f' % (iter, avarage_loss))

    torch.save(model.state_dict(), '../chap10/data/mytraining.pt')
    return model

In [None]:
# 모델 평가
def evaluate(model, input_lang, output_lang, sentences, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentences[0]) # 입력 문자열을 텐서로 변환
        output_tensor = tensorFromSentence(output_lang, sentences[1]) # 출력 문자열을 텐서로 변환
        decoded_words = []
        output = model(input_tensor, output_tensor)

        for ot in range(output.size(0)):
            topv, topi = output[ot].topk(1) # 각 출력에서 가장 높은 값을 찾아 인덱스를 반환

            if topi[0].item() == EOS_token:
                decoded_words.append('<EOS>') # EOS 토큰을 만나면 평가를 멈춤
                break
            else:
                decoded_words.append(output_lang.index2word[topi[0].item()]) # 예측 결과를 출력 문자열에 추가
    return decoded_words

def evaluateRandomly(model, input_lang, output_lang, pairs, n=10): # 훈련 데이터셋으로부터 임의의 문장을 가져와서 모델 평가
    for i in range(n):
        pair = random.choice(pairs) # 임의로 문장을 가져옴
        print('input {}'.format(pair[0]))
        print('output {}'.format(pair[1]))
        output_words = evaluate(model, input_lang, output_lang, pair) # 모델 평가 결과는 output_words에 저장
        output_sentence = ' '.join(output_words)
        print('predicted {}'.format(output_sentence))

In [None]:
# 모델 훈련
lang1 = 'eng' # 입력으로 사용할 영어
lang2 = 'fra' # 출력으로 사용할 프랑스어
input_lang, output_lang, pairs = process_data(lang1, lang2)

randomize = random.choice(pairs)
print('random sentence {}'.format(randomize))

input_size = input_lang.n_words
output_size = output_lang.n_words
print('Input : {} Output : {}'.format(input_size, output_size)) # 입력과 출력에 대한 단어 수 출력

embed_size = 256
hidden_size = 512
num_layers = 1
num_iteration = 75000 # 75000번 반복하여 모델 훈련

encoder = Encoder(input_size, hidden_size, embed_size, num_layers) # 인코더에 훈련 데이터셋을 입력하고 모든 출력과 은닉 상태를 저장
decoder = Decoder(output_size, hidden_size, embed_size, num_layers) # 디코더의 첫번째 입력으로 SOS 토큰이 제공, 인코더의 마지막 은닉 상태가 디코더의 첫번째 은닉 상태로 제공

model = Seq2Seq(encoder, decoder, device).to(device) # 인코더 디코더 모델(seq2seq) 객체 생성

print(encoder)
print(decoder)

model = trainModel(model, input_lang, output_lang, pairs, num_iteration) # 모델 학습

In [None]:
# 임의의 문장에 대한 평가 결과
evaluateRandomly(model, input_lang, output_lang, pairs)

In [None]:
# 어텐션이 적용된 디코더
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size) # 임베딩 계층 초기화
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

In [None]:
# 어텐션 디코더 모델 학습을 위한 함수
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01):
    start = time.time()
    plot_losses = []
    print_loss_total = 0
    plot_loss_total = 0

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate) # 인코더와 디코더에 SGD 옵티마이저 적용
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(input_lang, output_lang, random.choice(pairs))
                      for i in range(n_iters)]
    criterion = nn.NLLLoss()

    for iter in range(1, n_iters + 1):
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0] # 입출력 쌍에서 입력을 input_tensor로 사용
        target_tensor = training_pair[1] # 입출력 쌍에서 출력을 target_tensor로 사용
        loss = Model(model, input_tensor, target_tensor, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if iter % 5000 == 0: # 모델을 75000번 훈련을 진행하여 5000번째마다 오차를 출력
            print_loss_avg = print_loss_total / 5000
            print_loss_total = 0
            print('%d,  %.4f' % (iter, print_loss_avg))

In [None]:
# 어텐션 디코더 모델 훈련
import time

embed_size = 256
hidden_size = 512
num_layers = 1
input_size = input_lang.n_words
output_size = output_lang.n_words

encoder1 = Encoder(input_size, hidden_size, embed_size, num_layers)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_size, dropout_p=0.1).to(device)

print(encoder1)
print(attn_decoder1)

attn_model = trainIters(encoder1, attn_decoder1, 75000, print_every=5000, plot_every=100, learning_rate=0.01)

## 10.2.2 버트(BERT)

In [None]:
!pip install transformers

In [None]:
!pip install pytorch-transformers

In [None]:
# 라이브러리 호출
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from pytorch_transformers import BertTokenizer, BertForSequenceClassification # 버트 사용을 위한 라이브러리
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix # 모델 평가를 위해 사용
import seaborn as sns

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# 데이터셋 불러오기
train_df = pd.read_csv('../chap10/data/training.txt', sep='\t')
valid_df = pd.read_csv('../chap10/data/validing.txt', sep='\t')
test_df = pd.read_csv('../chap10/data/testing.txt', sep='\t')

In [None]:
# 불러온 데이터셋 중 일부만 사용
train_df = train_df.sample(frac=0.1, random_state=500)
valid_df = valid_df.sample(frac=0.1, random_state=500)
test_df = test_df.sample(frac=0.1, random_state=500)

In [None]:
# 데이터셋 생성
class Datasets(Dataset):
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        text = self.df.iloc[idx, 1]
        label = self.df.iloc[idx, 2]
        return text, label

In [None]:
# 데이터셋의 데이터를 데이터로더로 전달
train_dataset = Datasets(train_df)
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0)

valid_dataset = Datasets(valid_df)
valid_loader = DataLoader(valid_dataset, batch_size=2, shuffle=True, num_workers=0)

test_dataset = Datasets(test_df)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=True, num_workers=0)

In [None]:
# 버트 토크나이저 내려받기
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased')
model.to(device)

In [None]:
# 최적화 모델 저장
def save_checkpoint(save_path, model, valid_loss): # 모델 평가를 위해 훈련 과정을 저장
    if save_path == None:
        return
    state_dict = {'model_state_dict': model.state_dict(),
                  'valid_loss': valid_loss}

    torch.save(state_dict, save_path)
    print(f'Model saved to ==> {save_path}')

def load_checkpoint(load_path, model): # save_checkpoint 함수에서 저장된 모델을 가져옴
    if load_path==None:
        return
    state_dict = torch.load(load_path, map_location=device)
    print(f'Model loaded from <== {load_path}')

    model.load_state_dict(state_dict['model_state_dict'])
    return state_dict['valid_loss']

def save_metrics(save_path, train_loss_list, valid_loss_list, global_steps_list): # 훈련, 검증에 대한 오차와 에포크를 저장
    if save_path == None:
        return
    state_dict = {'train_loss_list': train_loss_list,
                  'valid_loss_list': valid_loss_list,
                  'global_steps_list': global_steps_list}
    torch.save(state_dict, save_path)
    print(f'Model saved to ==> {save_path}')

def load_metrics(load_path): # save_metrics에 저장해 둔 정보를 불러옴
    if load_path==None:
        return
    state_dict = torch.load(load_path, map_location=device)
    print(f'Model loaded from <== {load_path}')
    return state_dict['train_loss_list'], state_dict['valid_loss_list'], state_dict['global_steps_list']

In [None]:
# 모델 훈련 함수 정의
def train(model,
          optimizer,
          criterion = nn.BCELoss(), # 영화 리뷰는 좋고 나쁨만 있으므로 BinaryCrossEntropy를 사용
          num_epochs = 5,
          eval_every = len(train_loader) // 2,
          best_valid_loss = float("Inf")):

    total_correct = 0.0
    total_len = 0.0
    running_loss = 0.0
    valid_running_loss = 0.0
    global_step = 0
    train_loss_list = []
    valid_loss_list = []
    global_steps_list = []

    model.train() # 모델 훈련
    for epoch in range(num_epochs):
        for text, label in train_loader:
            optimizer.zero_grad()
            encoded_list = [tokenizer.encode(t, add_special_tokens=True) for t in text]
            padded_list =  [e + [0] * (512-len(e)) for e in encoded_list] # 인코딩 결과에 제로패딩 적용

            sample = torch.tensor(padded_list)
            sample, label = sample.to(device), label.to(device)
            labels = torch.tensor(label)
            outputs = model(sample, labels=labels)
            loss, logits = outputs

            pred = torch.argmax(F.softmax(logits), dim=1)
            correct = pred.eq(labels)
            total_correct += correct.sum().item()
            total_len += len(labels)
            running_loss += loss.item()
            loss.backward()
            optimizer.step()
            global_step += 1

            if global_step % eval_every == 0: # 모델 평가
                model.eval()
                with torch.no_grad():
                    for text, label in valid_loader:
                        encoded_list = [tokenizer.encode(t, add_special_tokens=True) for t in text]
                        padded_list =  [e + [0] * (512-len(e)) for e in encoded_list]
                        sample = torch.tensor(padded_list)
                        sample, label = sample.to(device), label.to(device)
                        labels = torch.tensor(label)
                        outputs = model(sample, labels=labels)
                        loss, logits = outputs
                        valid_running_loss += loss.item()

                average_train_loss = running_loss / eval_every
                average_valid_loss = valid_running_loss / len(valid_loader)
                train_loss_list.append(average_train_loss)
                valid_loss_list.append(average_valid_loss)
                global_steps_list.append(global_step)

                running_loss = 0.0
                valid_running_loss = 0.0
                model.train()

                print('Epoch [{}/{}], Step [{}/{}], Train Loss: {:.4f}, Valid Loss: {:.4f}'
                      .format(epoch+1, num_epochs, global_step, num_epochs*len(train_loader),
                              average_train_loss, average_valid_loss))

                if best_valid_loss > average_valid_loss:
                    best_valid_loss = average_valid_loss
                    save_checkpoint('../chap10/data/model.pt', model, best_valid_loss) # 오차가 작아지면 모델 저장
                    save_metrics('../chap10/data/metrics.pt', train_loss_list, valid_loss_list, global_steps_list) # 평가에 사용된 훈련 오차, 검증 오차, 에폭 저장

    save_metrics('../chap10/data/metrics.pt', train_loss_list, valid_loss_list, global_steps_list) # 최종으로 사용된 훈련 오차, 검증 오차, 에폭 저장
    print('훈련 종료!')

In [None]:
# 모델의 파라미터(옵티마이저) 미세 조정 및 모델 훈련
optimizer = optim.Adam(model.parameters(), lr=2e-5) # 아담과 적절한 학습률로 버트 모델을 미세 조정
train(model=model, optimizer=optimizer) # 모델 학습

In [None]:
# 오차 정보를 그래프로 확인
train_loss_list, valid_loss_list, global_steps_list = load_metrics('../chap10/data/metrics.pt') # 최종으로 저장된 모델을 불러옴
plt.plot(global_steps_list, train_loss_list, label='Train') # 훈련 데이터셋에 대한 오차
plt.plot(global_steps_list, valid_loss_list, label='Valid') # 검증 데이터셋에 대한 오차
plt.xlabel('Global Steps')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# 모델 평가 함수 정의
def evaluate(model, test_loader):
    y_pred = []
    y_true = []

    model.eval() # 테스트 데이터셋으로 모델 평가
    with torch.no_grad():
        for text, label in test_loader:
            encoded_list = [tokenizer.encode(t, add_special_tokens=True) for t in text]
            padded_list =  [e + [0] * (512-len(e)) for e in encoded_list]

            sample = torch.tensor(padded_list)
            sample, label = sample.to(device), label.to(device)
            labels = torch.tensor(label)
            output = model(sample, labels=labels)

            _, output = output
            y_pred.extend(torch.argmax(output, 1).tolist())
            y_true.extend(labels.tolist())

    print('Classification 결과:')
    print(classification_report(y_true, y_pred, labels=[1,0], digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=[1,0])
    ax= plt.subplot()
    sns.heatmap(cm, annot=True, ax = ax, cmap='Blues', fmt="d")

    ax.set_title('Confusion Matrix')
    ax.set_xlabel('Predicted Labels')
    ax.set_ylabel('True Labels')
    ax.xaxis.set_ticklabels(['0', '1'])
    ax.yaxis.set_ticklabels(['0', '1'])

In [None]:
# 모델 평가
import warnings
warnings.filterwarnings('ignore')
best_model = model.to(device)
load_checkpoint('../chap10/data/model.pt', best_model)
evaluate(best_model, test_loader)