In [None]:
import numpy as np # 기본적인 연산을 위한 라이브러리
import matplotlib.pyplot as plt # 시각화를 위한 라이브러리
import pandas as pd
from tqdm.notebook import tqdm # 상태 바를 나타내기 위한 라이브러리

import torch # PyTorch 라이브러리
import torch.nn as nn # 모델 구성을 위한 라이브러리
import torch.optim as optim # optimizer 설정을 위한 라이브러리
from torch.utils.data import Dataset, DataLoader # 데이터셋 설정을 위한 라이브러리

import torchvision # PyTorch의 컴퓨터 비전 라이브러리
import torchvision.transforms as T # 이미지 변환을 위한 모듈

from sklearn.metrics import accuracy_score # 성능지표 측정
from sklearn.model_selection import train_test_split # train-validation-test set 나누는 라이브러리

In [None]:
# seed 고정
import random
import torch.backends.cudnn as cudnn

def random_seed(seed_num):
    torch.manual_seed(seed_num)
    torch.cuda.manual_seed(seed_num)
    torch.cuda.manual_seed_all(seed_num)
    cudnn.benchmark = False
    cudnn.deterministic = True
    random.seed(seed_num)

random_seed(42)

In [None]:
# 데이터를 불러올 때, 필요한 변환(transform)을 정의합니다.
mnist_transform = T.Compose([
    T.ToTensor(), # 텐서 형식으로 변환
])

In [None]:
data_csv = pd.read_csv('../data/medium_data.csv')
data_csv.head()

In [None]:
print(data_csv.shape)

# 각각의 title만 추출합니다.
# 우리는 title의 첫 단어가 주어졌을 때, 다음 단어를 예측하는 것을 수행할 것입니다.
data = data_csv['title'].values

In [None]:
import re

def cleaning_text(text):
    cleaned_text = re.sub( r"[^a-zA-Z0-9.,@#!\s']+", "", text) # 특수문자 를 모두 지우는 작업을 수행합니다.
    cleaned_text = cleaned_text.replace(u'\xa0',u' ') # No-break space를 unicode 빈칸으로 변환
    cleaned_text = cleaned_text.replace('\u200a',' ') # unicode 빈칸을 빈칸으로 변환
    return cleaned_text

cleaned_data = list(map(cleaning_text, data)) # 모든 특수문자와 공백을 지움
print('Before preprocessing')
print(data[:5])
print('After preprocessing')
print(cleaned_data[:5])

In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.max_len = max_len
        
        # AutoTokenizer로 한 번에 처리
        X_data, labels = self.create_sequences_with_padding(data)
        self.X = torch.tensor(X_data)
        self.label = torch.tensor(labels)

    def create_sequences_with_padding(self, texts):
        all_X = []
        all_labels = []
        
        for text in texts:
            # 토큰화 (특수 토큰 제외)
            tokens = self.tokenizer(text, add_special_tokens=False)['input_ids']
            
            # 점진적 시퀀스 생성
            for i in range(1, len(tokens)):
                sequence = tokens[:i+1]
                
                # AutoTokenizer로 즉시 패딩 처리
                if len(sequence) > self.max_len:
                    # 자르기
                    padded_seq = sequence[:self.max_len]
                else:
                    # 패딩 (앞쪽에 pad_token_id 추가)
                    pad_length = self.max_len - len(sequence)
                    padded_seq = [self.tokenizer.pad_token_id] * pad_length + sequence
                
                # 입력과 라벨 분리
                X = padded_seq[:-1]
                label = padded_seq[-1]
                
                all_X.append(X)
                all_labels.append(label)
        
        return all_X, all_labels

    def __len__(self): # dataset의 전체 길이 반환
        return len(self.X)

    def __getitem__(self, idx): # dataset 접근
        X = self.X[idx]
        label = self.label[idx]
        return X, label

In [None]:
from transformers import AutoTokenizer

def cleaning_text(text):
    cleaned_text = re.sub( r"[^a-zA-Z0-9.,@#!\s']+", "", text) # 특수문자 를 모두 지우는 작업을 수행합니다.
    cleaned_text = cleaned_text.replace(u'\xa0',u' ') # No-break space를 unicode 빈칸으로 변환
    cleaned_text = cleaned_text.replace('\u200a',' ') # unicode 빈칸을 빈칸으로 변환
    return cleaned_text

data = list(map(cleaning_text, data))
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
max_len = 20

In [None]:


# train set, validation set, test set으로 data set을 나눕니다. 8 : 1 : 1 의 비율로 나눕니다.
train, test = train_test_split(data, test_size = .2, random_state = 42)
val, test = train_test_split(test, test_size = .5, random_state = 42)

In [None]:
print("Train 개수: ", len(train))
print("Validation 개수: ", len(val))
print("Test 개수: ", len(test))

In [None]:
train_dataset = CustomDataset(train, tokenizer, max_len)
valid_dataset = CustomDataset(val, tokenizer, max_len)
test_dataset = CustomDataset(test, tokenizer, max_len)

In [None]:
batch_size = 32

train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
valid_dataloader = DataLoader(valid_dataset, batch_size = batch_size, shuffle = False)
test_dataloader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)

In [None]:
class NextWordPredictionModel(nn.Module):
    def __init__(self, vocab_size, embedding_dims, hidden_dims, num_classes, dropout_ratio, set_super):
        if set_super:
            super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dims, padding_idx = 0) # padding index 설정 => gradient 계산에서 제외
        self.hidden_dims = hidden_dims
        self.layers = nn.ModuleList()
        self.num_classes = num_classes
        for i in range(len(self.hidden_dims) - 1):
            self.layers.append(nn.Linear(self.hidden_dims[i], self.hidden_dims[i+1]))

            self.layers.append(nn.BatchNorm1d(self.hidden_dims[i+1]))

            self.layers.append(nn.ReLU())

            self.layers.append(nn.Dropout(dropout_ratio))

        self.classifier = nn.Linear(self.hidden_dims[-1], self.num_classes)
        self.softmax = nn.LogSoftmax(dim = 1)

    def forward(self, x):
        '''
        INPUT:
            x: [batch_size, sequence_len] # padding 제외
        OUTPUT:
            output : [batch_size, vocab_size]
        '''
        x = self.embedding(x) # [batch_size, sequence_len, embedding_dim]
        x = torch.sum(x, dim=1) # [batch_size, embedding_dim] 각 문장에 대해 임베딩된 단어들을 합쳐서, 해당 문장에 대한 임베딩 벡터로 만들어줍니다.
        for layer in self.layers:
            x = layer(x)

        output = self.classifier(x) # [batch_size, num_classes]
        output = self.softmax(output) # [batch_size, num_classes]
        return output

    def count_parameters(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [None]:
from tqdm import tqdm

def training(model, dataloader, train_dataset, criterion, optimizer, device, epoch, num_epochs):
  model.train()  # 모델을 학습 모드로 설정
  train_loss = 0.0
  train_accuracy = 0

  tbar = tqdm(dataloader)
  for texts, labels in tbar:
      texts = texts.to(device)
      labels = labels.to(device)
      # 순전파
      outputs = model(texts)
      loss = criterion(outputs, labels)

      # 역전파 및 weights 업데이트
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # 손실과 정확도 계산
      train_loss += loss.item()
      # torch.max에서 dim 인자에 값을 추가할 경우, 해당 dimension에서 최댓값과 최댓값에 해당하는 인덱스를 반환
      _, predicted = torch.max(outputs, 1)
      train_accuracy += (predicted == labels).sum().item()

      # tqdm의 진행바에 표시될 설명 텍스트를 설정
      tbar.set_description(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}")

  # 에폭별 학습 결과 출력
  train_loss = train_loss / len(dataloader)
  train_accuracy = train_accuracy / len(train_dataset)

  return model, train_loss, train_accuracy

def evaluation(model, dataloader, valid_dataset, criterion, device, epoch, num_epochs):
  model.eval()  # 모델을 평가 모드로 설정
  valid_loss = 0.0
  valid_accuracy = 0

  with torch.no_grad(): # model의 업데이트 막기
      tbar = tqdm(dataloader)
      for images, labels in tbar:
          images = images.to(device)
          labels = labels.to(device)

          # 순전파
          outputs = model(images)
          loss = criterion(outputs, labels)

          # 손실과 정확도 계산
          valid_loss += loss.item()
          # torch.max에서 dim 인자에 값을 추가할 경우, 해당 dimension에서 최댓값과 최댓값에 해당하는 인덱스를 반환
          _, predicted = torch.max(outputs, 1)
          valid_accuracy += (predicted == labels).sum().item()

          # tqdm의 진행바에 표시될 설명 텍스트를 설정
          tbar.set_description(f"Epoch [{epoch+1}/{num_epochs}], Valid Loss: {loss.item():.4f}")

  valid_loss = valid_loss / len(dataloader)
  valid_accuracy = valid_accuracy / len(valid_dataset)

  return model, valid_loss, valid_accuracy

def training_loop(model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs, patience, model_name):
    best_valid_loss = float('inf')  # 가장 좋은 validation loss를 저장
    early_stop_counter = 0  # 카운터
    valid_max_accuracy = -1

    for epoch in range(num_epochs):
        model, train_loss, train_accuracy = training(model, train_dataloader, train_dataset, criterion, optimizer, device, epoch, num_epochs)
        model, valid_loss, valid_accuracy = evaluation(model, valid_dataloader, valid_dataset, criterion, device, epoch, num_epochs)

        if valid_accuracy > valid_max_accuracy:
          valid_max_accuracy = valid_accuracy

        # validation loss가 감소하면 모델 저장 및 카운터 리셋
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), f"../data/model_{model_name}.pt")
            early_stop_counter = 0

        # validation loss가 증가하거나 같으면 카운터 증가
        else:
            early_stop_counter += 1

        print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f} Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.4f}")

        # 조기 종료 카운터가 설정한 patience를 초과하면 학습 종료
        if early_stop_counter >= patience:
            print("Early stopping")
            break

    return model, valid_max_accuracy


In [None]:
lr = 1e-3
device = 'cuda:0'

vocab_size = len(tokenizer.get_vocab())
embedding_dims = 512
hidden_dims = [embedding_dims, embedding_dims*4, embedding_dims*2, embedding_dims]
model = NextWordPredictionModel(vocab_size = vocab_size, embedding_dims = embedding_dims, hidden_dims = hidden_dims, num_classes = vocab_size, \
            dropout_ratio = 0.2, set_super = True).to(device)

num_epochs = 100
patience = 3
model_name = 'next'

optimizer = optim.Adam(model.parameters(), lr = lr)
criterion = nn.NLLLoss(ignore_index=0) # padding 한 부분 제외
model, valid_max_accuracy = training_loop(model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs, patience, model_name)
print('Valid max accuracy : ', valid_max_accuracy)

In [None]:
model.load_state_dict(torch.load("../data/model_next.pt")) # 모델 불러오기
model = model.to(device)
model.eval()
total_labels = []
total_preds = []
with torch.no_grad():
    for texts, labels in tqdm(test_dataloader):
        texts = texts.to(device)
        labels = labels

        outputs = model(texts)
        # torch.max에서 dim 인자에 값을 추가할 경우, 해당 dimension에서 최댓값과 최댓값에 해당하는 인덱스를 반환
        _, predicted = torch.max(outputs.data, 1)

        total_preds.extend(predicted.detach().cpu().tolist())
        total_labels.extend(labels.tolist())

total_preds = np.array(total_preds)
total_labels = np.array(total_labels)
nwp_dnn_acc = accuracy_score(total_labels, total_preds) # 정확도 계산
print("Next word prediction DNN model accuracy : ", nwp_dnn_acc)