## 라이브러리 임포트

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import os
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
from collections import Counter
from sklearn.metrics import f1_score

from konlpy.tag import Okt

## 클래스 선언

In [2]:
class Vocabulary():
    """ 매핑을 위해 텍스트를 처리하고 어휘 사전을 만드는 클래스 """

    def __init__(self, add_unk=True, unk_token="<UNK>"):
        """
        매개변수:
            token
        """
        token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token
                              for token, idx in self._token_to_idx.items()}
        
        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def add_token(self, token):
        """ 토큰을 기반으로 매핑 딕셔너리를 업데이트합니다 
        
        매개변수:
            token(str): Vocabulary에 추가할 토큰
        반환값:
            index(int): 토큰에 상응하는 정수
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
    
    def lookup_token(self, token):
        """ 토큰에 대응하는 인덱스를 추출합니다.
        토큰이 없으면 UNK 인덱스를 반환합니다.

        매개변수:
            token (str): 찾을 토큰
        반환값:
            index (int): 토큰에 해당하는 인덱스
        노트:
            UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해)
            `unk_index`가 0보다 커야 합니다.
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]
        
    def lookup_index(self, index):
        """ 인덱스에 해당하는 토큰을 반환합니다.

        매개변수:
            index (int): 찾을 인덱스
        반환값:
            token (str): 인텍스에 해당하는 토큰
        에러:
            KeyError: 인덱스가 Vocabulary에 없을 때 발생합니다.
        """
        if index not in self._idx_to_token:
            raise KeyError("인덱스 (%d)가 어휘 사전에 없습니다." % index)
        return self._idx_to_token[index]
    
    def __len__(self):
        return len(self._token_to_idx)

In [3]:
class HateVectorizer():
    """어휘 사전을 생성하고 관리합니다 """
    def __init__(self, comment_vocab, label_vocab, tokenizer, document_count, word_document_count):
        """
        매개변수:
            comment_vocab (Vocabulary): 토큰을 정수에 매핑하는 Vocabulary
            label_vocab (Vocabulary): 클래스 레이블을 정수에 매핑하는 Vocabulary
            document_count (int): 전체 문서 수
            word_document_count (np.ndarray): 각 단어가 등장한 문서 수
        """
        self.comment_vocab = comment_vocab
        self.label_vocab = label_vocab
        self.tokenizer = tokenizer

        self.document_count = document_count
        self.word_document_count = word_document_count

    def vectorize(self, comment):
        """TF-IDF를 사용하여 댓글을 벡터화합니다"""
        # COMPLETE YOUR CODE - START
        tokens = self.tokenizer.morphs(comment)
        total_tokens = len(tokens)
        token_counts = Counter(tokens)
        
        tf = np.zeros(len(self.comment_vocab), dtype=np.float32)
        for token, count in token_counts.items():
            index = self.comment_vocab.lookup_token(token)
            tf[index] = count / total_tokens
            
        idf = np.log(self.document_count / (self.word_document_count + 1))
        
        # COMPLETE YOUR CODE - END
        
        # tf = # COMPLETE YOUR CODE
        # idf = # COMPLETE YOUR CODE
        
        return tf * idf

    @classmethod
    def from_dataframe(cls, comment_df, cutoff=10):
        tokenizer = Okt()

        # 1. 어휘 구축
        comment_vocab = Vocabulary(add_unk=True)
        label_vocab = Vocabulary(add_unk=False)
        
        # count > cutoff인 단어를 어휘 사전에 추가합니다.
        # COMPLETE YOUR CODE - START
        word_counts = Counter()
        for comment in comment_df.text:
            tokens = tokenizer.morphs(comment)
            word_counts.update(tokens)
        
        for word, count in word_counts.items():
            if count > cutoff:
                comment_vocab.add_token(word)
                        
        # COMPLETE YOUR CODE - END
                
        for label in sorted(set(comment_df.label)):
            label_vocab.add_token(label)

        # 2. 단어 빈도 및 문서 빈도 계산
        document_count = len(comment_df)  # COMPLETE YOUR CODE
        word_document_count = np.zeros(len(comment_vocab), dtype=np.float32)

        for comment in comment_df.text:
            # COMPLETE YOUR CODE - START
            tokens = set(tokenizer.morphs(comment))
            for token in tokens:
                if token in comment_vocab._token_to_idx:
                    index = comment_vocab.lookup_token(token)
                    word_document_count[index] += 1
        
            # COMPLETE YOUR CODE - END

        return cls(comment_vocab, label_vocab, tokenizer, document_count, word_document_count)

In [4]:
class HateDataset(Dataset):
    def __init__(self, comment_df, vectorizer):
        self.comments_df = comment_df
        self._vectorizer = vectorizer

        self.train_df = self.comments_df[self.comments_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.comments_df[self.comments_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.comments_df[self.comments_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}
        
        self.set_split('train')

    @classmethod
    def load_dataset_and_make_vectorizer(cls, comment_csv):
        """데이터셋을 로드하고 새로운 HateVectorizer 객체를 만듭니다
        
        매개변수:
            comment_csv(str): 데이터셋의 위치
        반환값:
            HateDataset의 인스턴스
        """
        comment_df = pd.read_csv(comment_csv)
        train_comment_df = comment_df[comment_df.split=='train']
        return cls(comment_df, HateVectorizer.from_dataframe(train_comment_df))

    def set_split(self, split='train'):
        """데이터프레임에 있는 열을 사용해 분할 세트를 선택합니다
        
        매개변수:
            split (str): "train". "val", "test" 중 하나
        """
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def get_vectorizer(self):
        """ 벡터 변환 객체를 반환합니다 """
        return self._vectorizer

    def __len__(self):
        return self._target_size
    
    def __getitem__(self, index):
        """ 파이토치 데이터셋의 주요 진입 메서드
        
        매개변수:
            index (int): 데이터 포인트의 인덱스
        반환값:
            데이터 포인트의 특성(x_data)와 레이블(y_target)으로 이루어진 딕셔너리
        """
        # COMPLETE YOUR CODE - START
        row = self._target_df.iloc[index]
        
        comment_vector = self._vectorizer.vectorize(row.text)
        label_index = self._vectorizer.label_vocab.lookup_token(row.label)
        
        # COMPLETE YOUR CODE - END
        return {'x_data' : comment_vector,
                'y_target' : label_index}
    
    def get_num_batches(self, batch_size):
        """배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환합니다
        
        매개변수:
            batch_size (int)
        반환값:
            배치 개수
        """
        return len(self) // batch_size

In [5]:
class MLPClassifier(nn.Module):
    def __init__(self, num_features, hidden_dim, num_classes, dropout_p):
        super(MLPClassifier, self).__init__()
        # COMPLETE YOUR CODE - START
        self.fc1 = nn.Linear(num_features, hidden_dim)
        self.dropout = nn.Dropout(dropout_p)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
                
        # COMPLETE YOUR CODE - END

    def forward(self, x_in, apply_softmax=False):
        # COMPLETE YOUR CODE - START
        indtermediate = F.relu(self.fc1(x_in))
        indtermediate = self.dropout(indtermediate)
        output = self.fc2(indtermediate)
        
        if apply_softmax:
            output = F.softmax(output, dim=1)
        
        # COMPLETE YOUR CODE - END

        return output

## 유틸함수

In [6]:
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    """
    파이토치 DataLoader를 감싸고 있는 제너레이터 함수.
    걱 텐서를 지정된 장치로 이동합니다.
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = tensor.to(device)
        yield out_data_dict

def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

## 설정과 전처리 작업

In [7]:
from argparse import Namespace

args = Namespace(
    comment_csv="hate_comments.csv",
    model_state_file="model.pth",
    save_dir="model_storage/project1",
    # 모델 하이퍼파라미터
    hidden_dim=100,
    # 훈련 하이퍼파라미터
    seed=42,
    learning_rate=0.0001,
    dropout_p=0.1,
    batch_size=64,
    num_epochs=10,
    early_stopping_criteria=5,
    # 실행 옵션
    cuda=True,
    catch_keyboard_interrupt=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True
)

if args.expand_filepaths_to_save_dir:
    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)
    
# CUDA 체크
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")
print("CUDA 사용여부: {}".format(args.cuda))

# 재현성을 위해 시드 설정
set_seed_everywhere(args.seed, args.cuda)

# 디렉토리 처리
handle_dirs(args.save_dir)

CUDA 사용여부: False


## 헬퍼함수

In [8]:
def compute_accuracy(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

def compute_f1_score(y_pred, y_target):
    y_pred_indices = y_pred.max(dim=1)[1]
    return f1_score(y_target.cpu().numpy(), y_pred_indices.cpu().numpy(), average='binary')

def make_train_state(args):
        return {'stop_early': False,
                'early_stopping_step': 0,
                'early_stopping_best_val': 1e8,
                'learning_rate': args.learning_rate,
                'epoch_index': 0,
                'train_loss': [],
                'train_acc': [],
                'train_f1' : [],
                'val_loss': [],
                'val_acc': [],
                'val_f1': [],
                'test_loss': None,
                'test_acc': None,
                'test_f1': None,
                'model_filename': args.model_state_file
            }

def update_train_state(args, model, train_state):
    """훈련 상태를 업데이트합니다.

    Components:
     - 조기 종료: 과대 적합 방지
     - 모델 체크포인트: 더 나은 모델을 저장합니다

    :param args: 메인 매개변수
    :param model: 훈련할 모델
    :param train_state: 훈련 상태를 담은 딕셔너리
    :returns:
        새로운 훈련 상태
    """

    # 적어도 한 번 모델을 저장합니다
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 성능이 향상되면 모델을 저장합니다
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 손실이 나빠지면
        if loss_t >= train_state['early_stopping_best_val']:
            # 조기 종료 단계 업데이트
            train_state['early_stopping_step'] += 1
        # 손실이 감소하면
        else:
            # 최상의 모델 저장
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 조기 종료 단계 재설정
            train_state['early_stopping_step'] = 0

        # 조기 종료 여부 확인
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state

## 초기화

In [11]:
#데이터셋 만들기
dataset = HateDataset.load_dataset_and_make_vectorizer(args.comment_csv)
vectorizer = dataset.get_vectorizer()

classifier = MLPClassifier(num_features=len(vectorizer.comment_vocab),
                           hidden_dim=args.hidden_dim,
                           num_classes=len(vectorizer.label_vocab),
                           dropout_p=args.dropout_p)

JVMNotFoundException: No JVM shared library file (jvm.dll) found. Try setting up the JAVA_HOME environment variable properly.

## 훈련 반복

In [None]:
classifier = classifier.to(args.device)

loss_func = nn.CrossEntropyLoss()
optimizer = optim.AdamW(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                             mode='min', factor=0.5,
                                             patience=1)

train_state = make_train_state(args)

epoch_bar = tqdm(desc='training routine', 
                    total=args.num_epochs, 
                    position=0)

dataset.set_split('train')
train_bar = tqdm(desc='split=train', 
                    total=dataset.get_num_batches(args.batch_size), 
                    position=1, 
                    leave=True)
dataset.set_split('val')
val_bar = tqdm(desc='split=val', 
                total=dataset.get_num_batches(args.batch_size), 
                position=1, 
                leave=True)

try:
    for epoch_index in range(args.num_epochs):
        train_state['epoch_index'] = epoch_index

        # 훈련 세트에 대한 순회
        dataset.set_split('train')
        batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        running_f1 = 0.0
        classifier.train()

        for batch_index, batch_dict in enumerate(batch_generator):
            # 단계 1. 그레이디언트를 0으로 초기화합니다
            optimizer.zero_grad()

            # 단계 2. 출력을 계산합니다
            y_pred = classifier(x_in=batch_dict['x_data'].float())

            # 단계 3. 손실을 계산합니다
            loss = loss_func(y_pred, batch_dict['y_target'])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)

            # 단계 4. 손실을 사용해 그레이디언트를 계산합니다
            loss.backward()

            # 단계 5. 옵티마이저로 가중치를 업데이트합니다
            optimizer.step()

            # 정확도와 F1 점수를 계산합니다
            acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            f1_t = compute_f1_score(y_pred, batch_dict['y_target'])
            running_f1 += (f1_t - running_f1) / (batch_index + 1)

            # 진행 바 업데이트
            train_bar.set_postfix(loss=running_loss, 
                                    acc=running_acc, 
                                    f1=running_f1, 
                                    epoch=epoch_index)
            train_bar.update()

        train_state['train_loss'].append(running_loss)
        train_state['train_acc'].append(running_acc)
        train_state['train_f1'].append(running_f1)
        
        # 검증 세트에 대한 순회
        dataset.set_split('val')
        batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        running_f1 = 0.0
        classifier.eval()

        with torch.no_grad():
            for batch_index, batch_dict in enumerate(batch_generator):
                # 단계 1. 출력을 계산합니다
                y_pred = classifier(x_in=batch_dict['x_data'].float())

                # 단계 2. 손실을 계산합니다
                loss = loss_func(y_pred, batch_dict['y_target'])
                loss_t = loss.item()
                running_loss += (loss_t - running_loss) / (batch_index + 1)

                # 단계 3. 정확도와 F1 점수를 계산합니다
                acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
                running_acc += (acc_t - running_acc) / (batch_index + 1)
                f1_t = compute_f1_score(y_pred, batch_dict['y_target'])
                running_f1 += (f1_t - running_f1) / (batch_index + 1)

                val_bar.set_postfix(loss=running_loss, acc=running_acc, f1=running_f1, epoch=epoch_index)
                val_bar.update()

        train_state['val_loss'].append(running_loss)
        train_state['val_acc'].append(running_acc)
        train_state['val_f1'].append(running_f1)

        train_state = update_train_state(args=args, model=classifier,
                                        train_state=train_state)

        scheduler.step(train_state['val_loss'][-1])

        # 에포크마다 훈련 및 검증 결과 출력
        print(f"\nEpoch {epoch_index+1}/{args.num_epochs}")
        print(f"Train Loss: {train_state['train_loss'][-1]:.4f}, Train Acc: {train_state['train_acc'][-1]:.2f}%, Train F1: {train_state['train_f1'][-1]:.4f}")
        print(f"Val Loss: {train_state['val_loss'][-1]:.4f}, Val Acc: {train_state['val_acc'][-1]:.2f}%, Val F1: {train_state['val_f1'][-1]:.4f}")
        print("-" * 50)

        if train_state['stop_early']:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()

except KeyboardInterrupt:
    print("Exiting loop")


## 테스트

In [27]:
# 가장 좋은 모델을 사용해 테스트 세트의 손실과 정확도를 계산합니다
classifier.load_state_dict(torch.load(train_state['model_filename']))

classifier = classifier.to(args.device)
loss_func = nn.CrossEntropyLoss()

dataset.set_split('test')
batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
running_loss = 0.0
running_acc = 0.0
running_f1 = 0.0
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # 출력을 계산합니다
    y_pred = classifier(x_in=batch_dict['x_data'].float())

    # 손실을 계산합니다
    loss = loss_func(y_pred, batch_dict['y_target'])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # 정확도를 계산합니다
    acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
    running_acc += (acc_t - running_acc) / (batch_index + 1)
    f1_t = compute_f1_score(y_pred, batch_dict['y_target'])
    running_f1 += (f1_t - running_f1) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc
train_state['test_f1'] = running_f1

In [None]:
print("테스트 손실: {:.3f}".format(train_state['test_loss']))
print("테스트 정확도: {:.2f}".format(train_state['test_acc']))
print("테스트 f1 점수: {:.2f}".format(train_state['test_f1']))