<a href="https://colab.research.google.com/github/junieberry/NLP-withPyTorch/blob/main/05_CBOW_Frankenstein/05_CBOW_Frankenstein.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
from argparse import Namespace
from collections import Counter
import json
import re
import string

import numpy as np
import pandas as pd
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import tqdm

## 5.2 CBOW 임베딩 학습하기

**CBOWDataset**

In [2]:
class CBOWDataset(Dataset):

  ## cbow_df == 데이터셋 (pandas.DataFrame)
  ## vectorizer == 데이터셋에서 만든 CBOWVectorizer 객체 (CBOWVectorizer)
  def __init__(self, cbow_df, vectorizer):
    self.cbow_df = cbow_df
    self._vectorizer = vectorizer

    ## 빈칸 기준으로 찢어서 최대 길이를 반환
    measure_len = lambda context:len(context.split(" "))
    self._max_seq_length = max(map(measure_len, cbow_df.context))

    self.train_df = self.cbow_df[self.cbow_df.split=='train']
    self.train_size = len(self.train_df)

    self.val_df = self.cbow_df[self.cbow_df.split=='val']
    self.validation_size = len(self.val_df)

    self.test_df = self.cbow_df[self.cbow_df.split=='test']
    self.test_size = len(self.test_df)

    self._lookup_dict = {'train': (self.train_df, self.train_size),
                         'val': (self.val_df, self.validation_size),
                         'test': (self.test_df, self.test_size)}

    self.set_split('train')
  

  # Dataset 가져와서 새 Vectorizer 만들기
  ## cbow_csv == 데이터셋의 위치 (str)
  ## return == CBOWDataset 인스턴스
  @classmethod
  def load_dataset_and_make_vectorizer(cls, cbow_csv):
    cbow_df = pd.read_csv(cbow_csv)
    train_cbow_df = cbow_df[cbow_df.split=='train']
    return cls(cbow_df, CBOWVectorizer.from_dataframe(train_cbow_df))

  def set_split(self, split="train"):
    """ 데이터프레임에 있는 열을 사용해 분할 세트를 선택합니다 """
    self._target_split = split
    self._target_df, self._target_size = self._lookup_dict[split]

  def __len__(self):
    return self._target_size

    # 데이터셋의 진입 메서드
    ## index (int) == 데이터 포인트의 인덱스
    ## return == x_data와 y_target으로 이루어진 딕셔너리
  def __getitem__(self, index):
    row = self._target_df.iloc[index]

    context_vector = self._vectorizer.vectorize(row.context, self._max_seq_length)
    target_index = self._vectorizer.cbow_vocab.lookup_token(row.target)

    return {'x_data': context_vector,'y_target':target_index}

  def get_num_batches(self, batch_size):
    """배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환합니다
        
        매개변수:
            batch_size (int)
        반환값:
            배치 개수
    """
    return len(self) // batch_size
    
  def generate_batches(dataset, batch_size, shuffle=True,
                      drop_last=True, device="cpu"): 
      """
      파이토치 DataLoader를 감싸고 있는 제너레이터 함수.
      걱 텐서를 지정된 장치로 이동합니다.
      """
      dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                              shuffle=shuffle, drop_last=drop_last)

      for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
          out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

  def save_vectorizer(self, vectorizer_filepath):
    """CBOWVectorizer 객체를 json 형태로 디스크에 저장합니다
        
        매개변수:
            vectorizer_filepath (str): CBOWVectorizer 객체의 저장 위치
    """
    with open(vectorizer_filepath, "w") as fp:
        json.dump(self._vectorizer.to_serializable(), fp)

  def get_vectorizer(self):
    """ 벡터 변환 객체를 반환합니다 """
    return self._vectorizer

**CBOWVocabulary**

In [3]:
class Vocabulary(object):

  ##token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
  ## mask_token (str) : Vocab에 추가할 MASK 토큰
  ## add_unk (bool): UNK 토큰을 추가할지 지정하는 플래그
  ## unk_token (str): Vocabulary에 추가할 UNK 토큰
  def __init__(self, token_to_idx=None,  mask_token="<MASK>", add_unk=True, unk_token="<UNK>"):
    if token_to_idx is None:
      token_to_idx = {}
    
    self._token_to_idx = token_to_idx

    self._idx_to_token = {idx: token 
                          for token, idx in self._token_to_idx.items()}
    
    self._add_unk = add_unk
    self._unk_token = unk_token

    ## CBOW에서 ㅊ가
    self._mask_token = mask_token

    self.mask_index = self.add_token(self._mask_token)

    self.unk_index = -1
    if add_unk:
      self.unk_index = self.add_token(unk_token)
  
  ## 직렬화할 수 있는 딕셔너리를 반환
  def to_serializable(self):
    return { 'token_to_idx': self._token_to_idx,
            'add_unk': self._add_unk,
            'unk_token': self._unk_token
    }
  
  @classmethod
  ## 직렬화된 딕셔너리에서 Vocabulary 객체를 만든다.
  def from_serializable(cls, contents):
    return cls(**contents)
  
  ## 토큰 추가하고 매핑 딕셔너리를 업데이트
  ## token (str) == Vocabulary에 추가할 토큰
  ## return == 토큰의 index
  def add_token(self, token):
    if token in self._token_to_idx:
      index = self._token_to_idx[token]
    else:
      index = len(self._token_to_idx)
      self._token_to_idx[token] = index
      self._idx_to_token[index] = token
    return index


  ## 토큰들을 추가하고 매핑 딕셔너리를 업데이트
  ## tokens (list) == 문자열 토큰 리스트
  ## return == 토큰들의 indices
  def add_many(self, tokens):
    return [self.add_token(token) for token in tokens]


  ## 토큰에 대응하는 인덱스 추출, 토큰이 없으면 UNK 인덱스 반환
  ## token (str) == 찾을 토큰
  ## index (int) == 토큰에 해당하는 인덱스
  def lookup_token(self, token):
        
        ## ??????
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

  ## 인덱스에 해당하는 토큰 반환
  ## index (int) == 찾을 인덱스
  ## return = 인덱스의 토큰
  def lookup_index(self, index):
    if index not in self._idx_to_token:
      raise KeyError("Vocabulary에 인덱스(%d)가 없습니다." % index)
    return self._idx_to_token[index]

  def __str__(self):
    return "<Vocabulary(size=%d)>" % len(self)

  def __len__(self):
    return len(self._token_to_idx)

  def get_vectorizer(self):
    """ 벡터 변환 객체를 반환합니다 """
    return self._vectorizer

**CBOWVectorizer**



In [4]:
class CBOWVectorizer(object):
  # cbow_vocab (Vocabulary) == 단어를 정수에 메핑
  def __init__(self, cbow_vocab):
    self.cbow_vocab = cbow_vocab


  ## context (str) == 공백으로 나누어진 단어 문자열
  ## vector_length (int) == 인덱스 벡터읙 ㅣㄹ이
  def vectorize(self, context, vector_length=-1):
    
    indices = [self.cbow_vocab.lookup_token(token) for token in context.split(' ')]
    if vector_length < 0:
      vector_length = len(indices)
    
    ## 0으로 패딩
    out_vector = np.zeros(vector_length, dtype=np.int64)
    out_vector[:len(indices)] = indices
    out_vector[len(indices):] = self.cbow_vocab.mask_index

    return out_vector

  # 데이터프레임에서 Vectorizer 만든다

  @classmethod
  def from_dataframe(cls, cbow_df):
    cbow_vocab = Vocabulary()
    for index, row in cbow_df.iterrows():
      for token in row.context.split(' '):
        cbow_vocab.add_token(token)
        cbow_vocab.add_token(row.target)
    
    return cls(cbow_vocab)


  @classmethod
  def from_serializable(cls, contents):
    cbow_vocab = Vocabulary.from_serializable(contents['cbow_vocab'])
    return cls(cbow_vocab=cbow_vocab)

  def to_serializable(self):
    return {'cbow_vocab': self.cbow_vocab.to_serializable()}


**DataLoader**

In [5]:

def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):

    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)


    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

### 5.2.3 CBOW Classifier

1. Embedding 사용해 문맥의 단어를 나타내는 인덱스를 벡터로 변환
2. 전반적인 문맥을 위해 벡터 결합 (더하거나 최댓값, 평균값, 다층 퍼셉트론 사용 가능)
3. Linear층에서 문맥 벡터를 사용해 예측 벡터 계산

  예측 벡터는 전체 어휘 사전에 대한 확률 분포를 나타낸다.


Embedding 층은 임베딩 개수와 임베딩 크기, padding_idx로 재어된다~

In [6]:
class CBOWClassifier(nn.Module):

  def __init__(self, vocabulary_size, embedding_size, padding_idx=0):

    super(CBOWClassifier, self).__init__()

    self.embedding = nn.Embedding(
        num_embeddings = vocabulary_size,
        embedding_dim = embedding_size,
        padding_idx = padding_idx
    )

    self.fc1 = nn.Linear(
        in_features=embedding_size,
        out_features=vocabulary_size)
  

  def forward(self, x_in, apply_softmax=False):

    x_embedding_sum = F.dropout(self.embedding(x_in).sum(dim=1), 0.3)
    y_out = self.fc1(x_embedding_sum)

    if apply_softmax:
      y_out = F.softmax(y_out, dim=1)
    
    return y_out

In [7]:

def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

def update_train_state(args, model, train_state):
    """ 훈련 상태를 업데이트합니다.

    Components:
     - 조기 종료: 과대 적합 방지
     - 모델 체크포인트: 더 나은 모델을 저장합니다

    :param args: 메인 매개변수
    :param model: 훈련할 모델
    :param train_state: 훈련 상태를 담은 딕셔너리
    :returns:
        새로운 훈련 상태
    """

    # 적어도 한 번 모델을 저장합니다
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 성능이 향상되면 모델을 저장합니다
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 손실이 나빠지면
        if loss_t >= train_state['early_stopping_best_val']:
            # 조기 종료 단계 업데이트
            train_state['early_stopping_step'] += 1
        # 손실이 감소하면
        else:
            # 최상의 모델 저장
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 조기 종료 단계 재설정
            train_state['early_stopping_step'] = 0

        # 조기 종료 여부 확인
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

def compute_accuracy(y_pred, y_target):
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

In [8]:

args = Namespace(
    # 날짜와 경로 정보
    cbow_csv="data/books/frankenstein_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch5/cbow",
    # 모델 하이퍼파라미터
    embedding_size=50,
    # 훈련 하이퍼파라미터
    seed=1337,
    num_epochs=100,
    learning_rate=0.0001,
    batch_size=32,
    early_stopping_criteria=5,
    # 실행 옵션
    cuda=True,
    catch_keyboard_interrupt=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True
)

if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
    print("파일 경로: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))
    

# CUDA 체크
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
    
print("CUDA 사용여부: {}".format(args.cuda))

# 재현성을 위해 시드 설정
set_seed_everywhere(args.seed, args.cuda)

# 디렉토리 처리
handle_dirs(args.save_dir)

파일 경로: 
	model_storage/ch5/cbow/vectorizer.json
	model_storage/ch5/cbow/model.pth
CUDA 사용여부: True


In [9]:
cd/content/drive/MyDrive/nlp-with-pytorch/chapter_5/5_2_CBOW

/content/drive/MyDrive/nlp-with-pytorch/chapter_5/5_2_CBOW


In [10]:
if args.reload_from_files:
    print("데이터셋과 Vectorizer를 로드합니다")
    dataset = CBOWDataset.load_dataset_and_load_vectorizer(args.cbow_csv,
                                                           args.vectorizer_file)
else:
    print("데이터셋을 로드하고 Vectorizer를 만듭니다")
    dataset = CBOWDataset.load_dataset_and_make_vectorizer(args.cbow_csv)
    dataset.save_vectorizer(args.vectorizer_file)
    
vectorizer = dataset.get_vectorizer()

classifier = CBOWClassifier(vocabulary_size=len(vectorizer.cbow_vocab), 
                            embedding_size=args.embedding_size)

데이터셋을 로드하고 Vectorizer를 만듭니다


**훈련**

In [11]:
classifier = classifier.to(args.device)
    
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)
train_state = make_train_state(args)

epoch_bar = tqdm.notebook.tqdm(desc='training routine', 
                               total=args.num_epochs,
                               position=0)

dataset.set_split('train')
train_bar = tqdm.notebook.tqdm(desc='split=train',
                               total=dataset.get_num_batches(args.batch_size), 
                               position=1, 
                               leave=True)
dataset.set_split('val')
val_bar = tqdm.notebook.tqdm(desc='split=val',
                             total=dataset.get_num_batches(args.batch_size), 
                             position=1, 
                             leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # 훈련 세트에 대한 순회

        # 훈련 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # 훈련 과정은 5단계로 이루어집니다

            # --------------------------------------
            # 단계 1. 그레이디언트를 0으로 초기화합니다
            optimizer.zero_grad()

            # 단계 2. 출력을 계산합니다
            y_pred = classifier(x_in=batch_dict['x_data'])

            # 단계 3. 손실을 계산합니다
            loss = loss_func(y_pred, batch_dict['y_target'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 단계 4. 손실을 사용해 그레이디언트를 계산합니다
            loss.backward()

            # 단계 5. 옵티마이저로 가중치를 업데이트합니다
            optimizer.step()
            # -----------------------------------------
            
            # 정확도를 계산합니다
            acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # 진행 바 업데이트
            train_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)

        # 검증 세트에 대한 순회

        # 검증 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, 
                                           batch_size=args.batch_size, 
                                           device=args.device)
        running_loss = 0.
        running_acc = 0.
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):

            # 단계 1. 출력을 계산합니다
            y_pred =  classifier(x_in=batch_dict['x_data'])

            # 단계 2. 손실을 계산합니다
            loss = loss_func(y_pred, batch_dict['y_target'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 단계 3. 정확도를 계산합니다
            acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, 
                            epoch=epoch_index)
            val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)

        train_state = update_train_state(args=args, model=classifier,
                                         train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")

training routine:   0%|          | 0/100 [00:00<?, ?it/s]

split=train:   0%|          | 0/1984 [00:00<?, ?it/s]

split=val:   0%|          | 0/425 [00:00<?, ?it/s]

In [12]:

# 가장 좋은 모델을 사용해 테스트 세트의 손실과 정확도를 계산합니다
classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device)
loss_func = nn.CrossEntropyLoss()

dataset.set_split('test')
batch_generator = generate_batches(dataset, 
                                   batch_size=args.batch_size, 
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # 출력을 계산합니다
    y_pred =  classifier(x_in=batch_dict['x_data'])
    
    # 손실을 계산합니다
    loss = loss_func(y_pred, batch_dict['y_target'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # 정확도를 계산합니다
    acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

In [13]:

print("테스트 손실: {};".format(train_state['test_loss']))
print("테스트 정확도: {}".format(train_state['test_acc']))

테스트 손실: 7.674550395292394;
테스트 정확도: 12.999999999999998


In [14]:
def pretty_print(results):
    """
    임베딩 결과를 출력합니다
    """
    for item in results:
        print ("...[%.2f] - %s"%(item[1], item[0]))

def get_closest(target_word, word_to_idx, embeddings, n=5):
    """
    n개의 최근접 단어를 찾습니다.
    """

    # 다른 모든 단어까지 거리를 계산합니다
    word_embedding = embeddings[word_to_idx[target_word.lower()]]
    distances = []
    for word, index in word_to_idx.items():
        if word == "<MASK>" or word == target_word:
            continue
        distances.append((word, torch.dist(word_embedding, embeddings[index])))
    
    results = sorted(distances, key=lambda x: x[1])[1:n+2]
    return results

In [15]:
target_words = ['frankenstein', 'monster', 'science', 'sickness', 'lonely', 'happy']

embeddings = classifier.embedding.weight.data
word_to_idx = vectorizer.cbow_vocab._token_to_idx

for target_word in target_words: 
    print(f"======={target_word}=======")
    if target_word not in word_to_idx:
        print("Not in vocabulary")
        continue
    pretty_print(get_closest(target_word, word_to_idx, embeddings, n=5))

...[7.45] - scourge
...[7.56] - kindness
...[7.60] - tenderly
...[7.62] - crossed
...[7.70] - survivors
...[7.74] - apt
...[7.51] - cares
...[7.78] - manifested
...[7.79] - griefs
...[7.81] - trifling
...[7.84] - without
...[7.84] - truly
...[7.00] - mist
...[7.05] - mutual
...[7.07] - impression
...[7.23] - swelling
...[7.24] - gospel
...[7.25] - darkened
...[6.21] - while
...[6.61] - foundations
...[6.61] - literally
...[6.61] - awoke
...[6.67] - probabilities
...[6.68] - consoles
...[6.77] - excessive
...[6.87] - moonlight
...[6.98] - ought
...[7.02] - therefore
...[7.12] - three
...[7.12] - bed
...[6.29] - injury
...[6.36] - bottom
...[6.48] - penetrated
...[6.56] - danger
...[6.61] - altered
...[6.63] - chivalry
