# 모듈 임포트

In [None]:
import pandas as pd
from pandas import DataFrame
import numpy as np
import torch
import tensorflow as tf
import random
import torch.optim as optim
from torch.optim import AdamW
from torch import nn
from torch.utils.data import DataLoader, Dataset, TensorDataset
from torch.nn import functional as F
import re
import pickle
import urllib.request
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, f1_score, precision_score, recall_score
import wandb
from konlpy.tag import Mecab
import collections
from hanspell import spell_checker
import kss

from evaluate import load
from gensim.summarization.summarizer import summarize
from ktextaug import TextAugmentation
import ktextaug
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModel, AutoModelForMaskedLM
from transformers_interpret import SequenceClassificationExplainer

import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('ggplot')

plt.rcParams['font.family'] = 'NanumGothic'

random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)
tf.random.set_seed(random_seed)
os.environ["PYTHONHASHSEED"] = str(random_seed)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
torch.manual_seed(random_seed)
if device == 'cuda':
    torch.cuda.manual_seed(random_seed) # current gpu seed
    torch.cuda.manual_seed_all(random_seed) # All gpu seed
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

wandb.login()

# 2. 데이터 불러오기

In [None]:
def load_data(file_name: str) -> DataFrame:
    """
    데이터를 불러오는 함수
    
    Input:
        fiie_name (str): 불러올 파일의 이름
    
    Output:
        df (DataFrame): 불러온 데이터를 데이터프레임으로 할당
    
    """
    data_path = os.getenv('HOME') + '/aiffel/mini_aiffelton/data/'#경로는 자신의 경로로 바꿔서 하면 됨
    file_path = os.path.join(data_path, file_name)
    df = pd.read_csv(file_path, encoding='utf-8', delimiter='\t')
    return df

In [None]:
train_df = load_data('ratings_train.csv')
val_df = load_data('ratings_val.csv')
test_df = load_data('ratings_test.csv')

## 2-1. 사전학습 모델 및 토크나이저 불러오기

In [None]:
model_name = "klue/roberta-base" #허깅페이스에서 모델만 갈아끼우면 됨

model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels = 2) #예측하고자 하는 클래스의 수를 넣어줘야함
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 3. 데이터 EDA

## 3-1. 데이터 null값 확인

In [None]:
train_df.info()

## 3-2. 타겟 데이터 불균형 확인

In [None]:
train_df['label'].value_counts()

In [None]:
num_classes = len(train_df['label'].value_counts()) #데이터프레임의 "label" 열에 있는 고유한 레이블의 개수를 계산하여 변수 num_classes에 저장

colors = plt.cm.Dark2(np.linspace(0, 1, num_classes)) #matplotlib의 cm 모듈을 사용하여 num_classes 개수에 맞게 컬러 맵을 생성
iter_color = iter(colors) #colors 배열에 대한 이터레이터를 생성 / 나중에 반복문에서 각 주제별로 다른 색상을 할당하기 위해 사용

train_df['label'].value_counts().plot.barh(title="레이블 데이터 불균형 체크",  # "labels" 열의 값들을 세어 막대 그래프로 그림
                                                 ylabel="Topics", #plot.barh함수를 사용하여 수평 막대 그래프를 생성
                                                 color=colors, #그래프의 제목, 축 레이블, 색상, 크기 등의 설정이 이곳에서 이루어짐
                                                 figsize=(9,9))

for i, v in enumerate(train_df['label'].value_counts()): #주제별 리뷰 수를 순회
    c = next(iter_color) #이터레이터 iter_color에서 다음 색상을 가져와 변수 c에 할당
    plt.text(v, i, #막대 그래프에 주제별 리뷰 수와 해당 비율을 텍스트로 표시
           " "+str(v)+", "+str(round(v*100/train_df.shape[0],2))+"%",  #v는 리뷰 수, i는 인덱스를 나타냄
           color=c, 
           va='center', 
           fontweight='bold')

## 3-3. 문자열의 길이 확인

In [None]:
print('문장의 최소 길이 :{}'.format(min(len(l) for l in train_df['document'])))
print('문장의 최대 길이 :{}'.format(max(len(l) for l in train_df['document'])))
print('문장의 평균 길이 :{}'.format(sum(map(len, train_df['document']))/len(train_df['document'])))

plt.hist([len(s) for s in train_df['document']], bins=50)
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.show()

### 3-3-1. 긍정 / 부정 뉴스 문자열 길이 확인

In [None]:
train_df_label1 = train_df[train_df['label'] == 1]
print('긍정 뉴스 document 최소 길이 :{}'.format(min(len(l) for l in train_df_label1['document'])))
print('긍정 뉴스 document 최대 길이 :{}'.format(max(len(l) for l in train_df_label1['document'])))
print('긍정 뉴스 document 평균 길이 :{}'.format(sum(map(len, train_df_label1['document']))/len(train_df_label1['document'])))
print('-----'*10)
train_df_label0 = train_df[train_df['label'] == -1]
print('부정 뉴스 document 최소 길이 :{}'.format(min(len(l) for l in train_df_label0['document'])))
print('부정 뉴스 document 최대 길이 :{}'.format(max(len(l) for l in train_df_label0['document'])))
print('부정 뉴스 document 평균 길이 :{}'.format(sum(map(len, train_df_label0['document']))/len(train_df_label0['document'])))

plt.hist([len(s) for s in train_df_label1['document']], bins=50, alpha=0.5, label='긍정 뉴스')
plt.hist([len(s) for s in train_df_label0['document']], bins=50, alpha=0.5, label='부정 뉴스')
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.legend()
plt.show()

## 3-4. 토큰화된 토큰 수 확인

In [None]:
print('토큰의 최소 개수: {}'.format(min(len(tokenizer(train_df['document'][l])['input_ids']) for l in range(len(train_df['document'])))))
print('토큰의 최대 개수: {}'.format(max(len(tokenizer(train_df['document'][l])['input_ids']) for l in range(len(train_df['document'])))))
print('토큰의 평균 개수: {}'.format(sum(len(tokenizer(train_df['document'][l])['input_ids']) for l in range(len(train_df['document'])))/len(train_df['document'])))

plt.hist([len(tokenizer(train_df['document'][l])['input_ids']) for l in range(len(train_df['document']))], bins=50)
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.show()

### 3-4-1. 긍정 / 부정 뉴스 토큰화된 토큰 수 확인

In [None]:
train_df_label1 = train_df[train_df['label'] == 1]
train_df_label1 = pd.DataFrame(train_df_label1['document'],columns=['document']).reset_index(drop=True)
print('긍정 뉴스 토큰의 최소 개수: {}'.format(min(len(tokenizer(train_df_label1['document'][l])['input_ids']) for l in range(len(train_df_label1['document'])))))
print('긍정 뉴스 토큰의 최대 개수: {}'.format(max(len(tokenizer(train_df_label1['document'][l])['input_ids']) for l in range(len(train_df_label1['document'])))))
print('긍정 뉴스 토큰의 평균 개수: {}'.format(sum(len(tokenizer(train_df_label1['document'][l])['input_ids']) for l in range(len(train_df_label1['document'])))/len(train_df_label1['document'])))
print('-----'*10)

train_df_label0 = train_df[train_df['label'] == -1]
train_df_label0 = pd.DataFrame(train_df_label0['document'],columns=['document']).reset_index(drop=True)
print('부정 뉴스 토큰의 최소 개수: {}'.format(min(len(tokenizer(train_df_label0['document'][l])['input_ids']) for l in range(len(train_df_label0['document'])))))
print('부정 뉴스 토큰의 최대 개수: {}'.format(max(len(tokenizer(train_df_label0['document'][l])['input_ids']) for l in range(len(train_df_label0['document'])))))
print('부정 뉴스 토큰의 평균 개수: {}'.format(sum(len(tokenizer(train_df_label0['document'][l])['input_ids']) for l in range(len(train_df_label0['document'])))/len(train_df_label0['document'])))

plt.hist([len(tokenizer(train_df_label1['document'][l])['input_ids']) for l in range(len(train_df_label1['document']))], bins=50,alpha=0.5, label='긍정 뉴스')
plt.hist([len(tokenizer(train_df_label0['document'][l])['input_ids']) for l in range(len(train_df_label0['document']))], bins=50,alpha=0.5, label='부정 뉴스')
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.legend()
plt.show()

## 3-5. 가장 많은 명사 추출(2글자 이상)

In [None]:
mecab = Mecab()

document = train_df['document'].to_list()
n_corpus = []
for doc in document:
    for n in mecab.nouns(doc): #mecab에서 명사 추출
        if len(n) > 1:
            n_corpus.append(n)

count = collections.Counter(n_corpus)
most = count.most_common() #빈도 수 순으로 추출

x, y= [], []
for word,count in most[:20]:
    x.append(word)
    y.append(count)

plt.figure(figsize=(40,20))
sns.barplot(x=y,y=x)

### 3-5-1. 긍정 뉴스에 대한 명사 추출

In [None]:
train_df_label1 = train_df[train_df['label'] == 1]
document = train_df_label1['document'].to_list()

n_corpus = []
for doc in document:
    for n in mecab.nouns(doc): #mecab에서 명사 추출
        if len(n) > 1:
            n_corpus.append(n)

count = collections.Counter(n_corpus)
most = count.most_common() #빈도 수 순으로 추출

x, y= [], []
for word,count in most[:20]:
    x.append(word)
    y.append(count)

plt.figure(figsize=(40,20))
sns.barplot(x=y,y=x)

### 3-5-2. 부정 뉴스에 대한 명사 추출

In [None]:
train_df_label0 = train_df[train_df['label'] == -1]
document = train_df_label0['document'].to_list()

n_corpus = []
for doc in document:
    for n in mecab.nouns(doc): #mecab에서 명사 추출
        if len(n) > 1:
            n_corpus.append(n)

count = collections.Counter(n_corpus)
most = count.most_common() #빈도 수 순으로 추출

x, y= [], []
for word,count in most[:20]:
    x.append(word)
    y.append(count)

plt.figure(figsize=(40,20))
sns.barplot(x=y,y=x)

# 4. 데이터 전처리

## 4-1. 중복 제거, 컬럼 삭제 및 레이블 변환

In [None]:
def transform(sentence: str, stopwords: bool = False) -> str:
    """
    입력받은 sentence를 전처리 해주는 함수
    
    Input:
        sentence (str): 전처리 시킬 문장
        stopwords (bool): 불용어 사전 사용 여부
    Output:
        sentence (str): 전처리된 문장
    """
    sentence = sentence.lower().strip()
    sentence = re.sub(r'\([^)]*\)', r'', sentence) #괄호로 둘러싸인 부분 제거
    sentence = re.sub(r'#\w+', '', sentence) #해쉬태그 문자 제거
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)  #문장 내의 구두점을 공백과 함께 분리(숫자 없앰 -> 금융이라 중요하다 생각)
    sentence = re.sub(r'[^a-zA-Z가-힣0-9?.!,% ]+', r' ', sentence) #영문 알파벳, 한글, 숫자, 구두점을 제외한 모든 문자를 제거
    sentence = re.sub(r"['\n']+", r"", sentence) #개행 문자 제거
    sentence = re.sub(r'["   "]+', " ", sentence) #연속된 공백을 하나의 공백으로 변환
    sentence = sentence.strip()
    
    if stopwords:
        words = sentence.split()
        filtered_words = [word for word in words if word not in stopwords]
        sentence = ' '.join(filtered_words)
    
    return sentence

In [None]:
def preprocess_data(df: DataFrame, data_type: str, stopwords: bool = False) -> DataFrame:
    """
    데이터프레임의 불필요한 컬럼을 없애주고, document의 한국어 맞춤법 교정을 해주는 함수
    
    Input:
        df (DataFrame): 전처리할 데이터 프레임
        data_type (str): 데이터 타입
        stopwords (bool):  불용어 사전 사용 여부 
        
    Output:
        df (DataFrame): 전처리된 데이터 프레임
    """
    df = df[['document', 'label']]
    df['label'] = df['label'].replace(-1, 0) #감성분석 데이터는 -1이 negative라서 해당 데이터를 0으로 바꿔줬지만, 만약 0,1로 레이블이 되어
                                             #있다면 없애도 됨
    if data_type=='train':
        df = df.drop_duplicates().reset_index(drop=True)

    #################표준어 체크
    
    new_sentences = []

    for sentence in tqdm(df['document']):
        
        sentence = transform(sentence, stopwords = False)
        try:
            sentence = sentence.replace('\\', '')
            spelled_sent = spell_checker.check(sentence)
            new_sentence = spelled_sent.checked
            if not new_sentence:

                new_sentence = sentence
            new_sentences.append(new_sentence)
        except:

            new_sentences.append(sentence)
    
    df['document'] = new_sentences
    
    if data_type=='train':
        df = df.drop_duplicates().reset_index(drop=True)
    
    return df

In [None]:
train_df = preprocess_data(train_df, data_type = 'train', stopwords = None)
val_df  = preprocess_data(val_df, data_type = 'val', stopwords = None)
test_df  = preprocess_data(test_df, data_type = 'test', stopwords = None)

## 4-2. 불용어 사전 불러오기

In [None]:
stopwords = [] 
stop_path = os.getenv('HOME') + '/aiffel/mini_aiffelton/data/stopwords.txt'  #경로

with open(stop_path, 'r', encoding='utf-8') as file:
    lines = file.read()

    sentences = re.findall(r'"([^"]*)"', lines)  # 작은따옴표로 둘러싸인 내용 추출
    for sentence in sentences:
        stopwords.append(sentence)

# 5. 데이터 증강

## 5-1. KOR_EDA

In [None]:
wordnet_path = os.getenv('HOME') + '/aiffel/mini_aiffelton/data/wordnet.pickle'
wordnet = {}
with open(wordnet_path, "rb") as f:
    wordnet = pickle.load(f)

########################################################################
# Synonym replacement
# Replace n words in the sentence with synonyms from wordnet
########################################################################
def synonym_replacement(words: list , n: int) -> list:
    """
    stop words가 아닌 특정 단어를 유의어로 교체하여 증강시키는 함수
    
    Input:
        words (list): 증강시킬 단어 리스트
        n (int): 대체할 단어 수
    Output:
        new_words (list): 랜덤으로 선택해서 유의어로 교체된 단어 리스트
    """
    new_words = words.copy()
    random_word_list = list(set([word for word in words])) #입력된 단어 리스트에서 중복을 제거한 후 
    random.shuffle(random_word_list)#랜덤하게 순서를 섞어 random_word_list생성
    num_replaced = 0
    for random_word in random_word_list:
        synonyms = get_synonyms(random_word) 
        if len(synonyms) >= 1:
            synonym = random.choice(list(synonyms))
            new_words = [synonym if word == random_word else word for word in new_words]
            num_replaced += 1
        if num_replaced >= n:
            break

    if len(new_words) != 0:
        sentence = ' '.join(new_words)
        new_words = sentence.split(" ")

    else:
        new_words = ""

    return new_words


def get_synonyms(word: str) -> list:
    """
    입력된 단어의 동의어를 찾아 리스트로 반환하는 함수
    
    Input:
        word (str): 전처리할 데이터 프레임
    Output:
        aug_data (dataframe): 전처리된 데이터 프레임
    """
    synomyms = []

    try:
        for syn in wordnet[word]: # WordNet을 통해 동의어를 가져옴
            for s in syn:
                synomyms.append(syn) #가져온 동의어 세트를 순회하면서 각 동의어를 synonyms 리스트에 추가
    except:
        pass #예외가 발생하면 (wordnet[word]에서 단어에 대한 동의어가 없을 경우) pass를 실행

    return synomyms

########################################################################
# Random deletion
# Randomly delete words from the sentence with probability p
########################################################################
def random_deletion(words: list, p: float) -> list:
    """
    문장 내에서 일정 확률로 임의의 단어를 선택해 제거하여 증강시키는 함수
    
    Input:
        words (list): 증강시킬 단어 리스트
        n (int): 임의의 단어를 제거시킬 확률
    Output:
        new_words (list): 임의의 단어를 제거한 단어 리스트
    """
    if len(words) == 1:
        return words

    new_words = []
    for word in words: #입력된 단어 리스트를 순회하면서 임의의 단어를 선택하여 제거하는 과정
        r = random.uniform(0, 1)
        if r > p:
            new_words.append(word)

    if len(new_words) == 0: #모든 단어가 제거된 경우에는 원래 단어 리스트에서 임의로 하나의 단어를 선택하여 반환
        rand_int = random.randint(0, len(words)-1)
        return [words[rand_int]]

    return new_words

########################################################################
# Random swap
# Randomly swap two words in the sentence n times
########################################################################
def random_swap(words: list, n: int) -> list:
    """
    문장 내에서 일정 횟수만큼 랜덤한 단어들을 서로 교체하여 증강시키는 함수
    
    Input:
        words (list): 증강시킬 단어 리스트
        n (int): 교체할 단어 쌍의 수
    Output:
        new_words (list): 랜덤하게 단어를 교체한 단어 리스트
    """
    new_words = words.copy()
    for _ in range(n):
        new_words = swap_word(new_words)

    return new_words

def swap_word(new_words: list) -> list:
    """
    단어 리스트 내에서 랜덤한 두 개의 단어를 서로 교체 함수
    
    Input:
        new_words (list): 단어 리스트
    Output:
        new_words (list): 단어를 교체한 단어 리스트
    """

    random_idx_1 = random.randint(0, len(new_words)-1)
    random_idx_2 = random_idx_1
    counter = 0

    while random_idx_2 == random_idx_1:
        random_idx_2 = random.randint(0, len(new_words)-1)
        counter += 1
        if counter > 3:
            return new_words

    new_words[random_idx_1], new_words[random_idx_2] = new_words[random_idx_2], new_words[random_idx_1]
    return new_words

########################################################################
# Random insertion
# Randomly insert n words into the sentence
########################################################################
def random_insertion(words: list, n: int) -> list:
    """
    문장 내에서 일정 횟수만큼 랜덤한 단어들을 삽입하여 증강시키는 함수
    
    Input:
        words (list): 증강시킬 단어 리스트
        n (int): 삽입할 단어의 수
    Output:
        new_words (list): 랜덤하게 단어를 삽입한 단어 리스트
    """
    new_words = words.copy()
    for _ in range(n):
        add_word(new_words)
    
    return new_words


def add_word(new_words: list) -> None:
    """
    단어 리스트 내에 랜덤한 위치에 랜덤한 동의어를 삽입하는 함수
    
    Input:
        new_words (list): 단어 리스트
    Output:
        None
    """
  
    synonyms = []
    counter = 0
    while len(synonyms) < 1:
        if len(new_words) >= 1:
            random_word = new_words[random.randint(0, len(new_words)-1)]
            synonyms = get_synonyms(random_word)
            counter += 1
        else:
            random_word = ""

        if counter >= 10:
            return
        
    random_synonym = synonyms[0]
    random_idx = random.randint(0, len(new_words)-1)
    new_words.insert(random_idx, random_synonym)


def EDA(sentence: str, alpha_sr: float = 0.1, alpha_ri: float = 0.1, alpha_rs: float = 0.1, p_rd: float = 0.1) -> list:
    """
    문장을 다양한 증강 기법을 적용하여 증강된 문장 리스트를 반환하는 함수
    
    Input:
        sentence (str): 증강할 문장
        alpha_sr (float): synonym replacement의 증강 비율
        alpha_ri (float): random insertion의 증강 비율
        alpha_rs (float): random swap의 증강 비율
        p_rd (float): random deletion의 제거 비율
        num_aug (int): 생성할 증강 문장 수
    Output:
        augmented_sentences (list): 증강된 문장 리스트
    """
    
    sentence = transform(sentence)
    words = sentence.split(' ')
    words = [word for word in words if word is not ""]
    num_words = len(words)

    augmented_sentences = []

    n_sr = max(1, int(alpha_sr*num_words))
    n_ri = max(1, int(alpha_ri*num_words))
    n_rs = max(1, int(alpha_rs*num_words))
    
    random_number = random.randint(1, 4)
    # SR
    if random_number == 1:    
        a_words = synonym_replacement(words, n_sr)
        augmented_sentences.append(' '.join(a_words))

    # RI
    elif random_number == 2:
        a_words = random_insertion(words, n_ri)
        augmented_sentences.append(' '.join(a_words))
    
    # RS
    elif random_number == 3:
        a_words = random_swap(words, n_rs)
        augmented_sentences.append(" ".join(a_words))

    # RD
    else:
        a_words = random_deletion(words, p_rd)
        augmented_sentences.append(" ".join(a_words))

    return augmented_sentences

In [None]:
def kor_eda(df: DataFrame, label_num: int, random_number: int,alpha_sr: float = 0.1, alpha_ri: float = 0.1, alpha_rs: float = 0.1, p_rd: float = 0.1) -> DataFrame:
    """
    주어진 데이터프레임에서 특정 레이블을 가진 문장을 선택하여 EDA를 적용한 후, 증강된 데이터프레임을 반환하는 함수
    
    Input:
        df (DataFrame): 증강을 수행할 데이터프레임
        label_num (int): 증강을 수행할 레이블 번호
        random_number (int): 증강시킬 문장의 수
        alpha_sr (float): synonym replacement의 증강 비율
        alpha_ri (float): random insertion의 증강 비율
        alpha_rs (float): random swap의 증강 비율
        p_rd (float): random deletion의 제거 비율
        
    Output:
        aug_train_data (DataFrame): 증강된 데이터프레임
    """
    label_data = df[df['label'] == label_num]
    
    random_sentences = random.sample(label_data['document'].tolist(), random_number)
    
    new_sentences = []
    
    for i in tqdm(random_sentences):
        new_sentences.extend(EDA(i, alpha_sr=alpha_sr, alpha_ri=alpha_ri, alpha_rs=alpha_rs, p_rd=p_rd))
            
    result = pd.DataFrame({'document': new_sentences, 'label': label_num})
    
    return result

In [None]:
aug_train_data = kor_eda(train_df, 0, 1200, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, p_rd=0.1)

In [None]:
aug_data = pd.concat([train_df, aug_train_data]).reset_index(drop=True)

## 5-2. Fill_Masked_Function

In [None]:
fill_model_name = 'monologg/kobigbird-bert-base'

fill_model = AutoModelForMaskedLM.from_pretrained(fill_model_name)
fill_tokenizer = AutoTokenizer.from_pretrained(fill_model_name)

In [None]:
def fill_masked_augmentation(df: DataFrame, label_num: int, random_number: int, num_masks_ratio: float) -> DataFrame:
    """
    마스킹 기법을 통하여 데이터를 증강시키는 함수
    
    Input:
        df (DataFrame): 증강시킬 데이터가 있는 데이터 프레임
        label_num (int): 증강시키고 싶은 label number
        random_number (int): 증강시킬 데이터의 수
        num_masks_ratio (float): 본문에서 마스킹 시킬 비율
        
    Output:
        aug_data (DataFrame): 증강시킨 데이터가 있는 데이터 프레임
    """
    
    label_data = df[df['label'] == label_num]
    predicted_tokens_list = []
    fill_model.to(device)
    random.seed(random_seed) #이렇게 하니까 증강문이 고정됨(마스킹도 고정)
    
    for _ in tqdm(range(random_number)):
        
        random_sentences = random.choice(label_data['document'].tolist()) 

        num_masks = int(len(random_sentences.split()) * num_masks_ratio)
        num_masks = min(num_masks, 20)
        words = random_sentences.split()
        masked_indices = random.sample(range(len(words)), num_masks)
        masked_text = ' '.join('[MASK]' if i in masked_indices else word for i, word in enumerate(words))
        tokenized_sentence = fill_tokenizer.tokenize(masked_text)
        masked_indices = [i for i, token in enumerate(tokenized_sentence) if token == fill_tokenizer.mask_token]
        
        predicted_tokens = []
        for index in masked_indices:
            tokens = tokenized_sentence.copy()
            tokens[index] = fill_tokenizer.mask_token
            input_ids = fill_tokenizer.convert_tokens_to_ids(tokens)
            inputs = torch.tensor([input_ids]).to(device)

            with torch.no_grad():
                outputs = fill_model(inputs)
                predictions = outputs.logits[0, index].topk(k=5)
            
            max_count = 5
            count = 0
            random_prediction_index = torch.multinomial(predictions.values, 1)  # 상위 5개 중에서 랜덤하게 하나 선택
            random_prediction_token_id = predictions.indices[random_prediction_index].item()  # 선택한 토큰 ID 추출
            random_prediction_token = fill_tokenizer.convert_ids_to_tokens([random_prediction_token_id])[0]  # 토큰으로 변환
            
            while random_prediction_token in ["(",")"," ",".", ",", "'", '"',"‘","’",'“','”',"[SEP]", "[PAD]", "[UNK]"]:
                count += 1
                if count >= max_count:
                    random_prediction_token = " "
                    break
                random_prediction_index = torch.multinomial(predictions.values, 1)  # 상위 5개 중에서 랜덤하게 하나 선택
                random_prediction_token_id = predictions.indices[random_prediction_index].item()  # 선택한 토큰 ID 추출
                random_prediction_token = fill_tokenizer.convert_ids_to_tokens([random_prediction_token_id])[0]  # 토큰으로 변환
                

            predicted_tokens.append(random_prediction_token)

        new_sentence = tokenized_sentence.copy()

        for index, predicted_token in zip(masked_indices, predicted_tokens):
            new_sentence[index] = predicted_token

        new_sentence = fill_tokenizer.convert_tokens_to_string(new_sentence).strip()
        predicted_tokens_list.append(new_sentence)

    aug_data = pd.DataFrame({'document': predicted_tokens_list, 'label': label_num})

    return aug_data

In [None]:
aug_train_data = fill_masked_augmentation(train_df, label_num = 0, random_number = 1200 , num_masks_ratio = 0.15)

In [None]:
aug_data = pd.concat([train_df, aug_train_data]).reset_index(drop=True)

## 5-3. Mix_document

In [None]:
def tokenize_sentences(text: list, mode: str ='kss') -> list:
    """
    문장을 분리시키는 함수
    
    Input:
        text (list): 문장을 분리시킬 list
        mode (str): 문장을 분리시킬 모드
        
    Output:
        sentences (list): 문장을 분리시킨 list
    """
    if mode == 'to':
        sentences = tokenizer.tokenize(text)  # 토크나이저를 사용하여 문장 단위로 분리
    else:
        sentences = kss.split_sentences(text)  # Split sentences using kss library

    return sentences

def sentence_shuffle(text: list, min_blocks: int) -> str:
    """
    한 문단에서 문장을 뒤섞는 함수
    
    Input:
        text (list): 문장을 뒤섞을 문단
        min_blocks (int): 최소 문장 수
        
    Output:
        augmented_text (str): 문장을 뒤섞을 문단
    """
    
    sentences = tokenize_sentences(text, 'kss') 
    
    # 분리된 문장들 중에서 하나 이상의 문장 블록을 선택하기 위해 최소 블록 개수를 설정
    min_blocks = min(min_blocks, len(sentences))
    
    # 무작위로 두 개 이상의 문장 블록을 선택
    num_blocks_to_shuffle = random.randint(min_blocks, len(sentences)) #min_blocks부터 len(sentences) 사이의 정수 에서 무작위로 하나의 정수를 반환하는 함수
    selected_blocks_indices = random.sample(range(len(sentences)), num_blocks_to_shuffle)
    
    # 선택된 문장 블록들의 위치를 서로 바꿈
    shuffled_sentences = [sentences[i] for i in range(len(sentences)) if i not in selected_blocks_indices]
    for i in selected_blocks_indices:
        shuffled_sentences.append(sentences[i])
    
    # 문장 블록들을 다시 하나의 문자열로 합쳐서 증강된 문장을 생성
    augmented_text = ' '.join(shuffled_sentences)
    
    return augmented_text.strip()

def mix_document(data: DataFrame, label: int, num: int, min_blocks: int, mix_ratio_range: float = (0.2, 0.4), mode: str = 'kss') -> DataFrame:
    """
    주어진 데이터로부터 문서를 생성하거나 문장을 섞어 증강 데이터를 생성하는 함수
    
    Input:
        data (DataFrame): 원본 데이터를 담고 있는 DataFrame
        label: (int): 증강 데이터에 할당할 레이블
        num (int): 생성할 증강 데이터의 개수
        min_blocks (int): 최소 문장 수
        mix_ratio_range (tuple, optional): 문장 제거 및 추가할 비율 범위 (default: (0.2, 0.4))
        mode (str, optional): 문장 토큰화 방식 ('kss' 또는 'to') (default: 'kss')
        
    Output:
        augmented_df (DataFrame): 생성된 증강 데이터를 담고 있는 DataFrame
    """
    augmented_data = []
    filtered_data = data[data['label'] == label]['document'].tolist()
    
    selected_data_set = set()  # 중복 선택된 쌍을 추적하기 위한 집합
    
    for _ in range(num):
        random_number = random.randint(0, 1)
        
        if random_number == 0:
        
            selected_data = random.sample(filtered_data, 2)  # 원본 데이터 중에서 2개를 무작위로 선택

            while tuple(selected_data) in selected_data_set:
                selected_data = random.sample(filtered_data, 2)  # 중복된 데이터가 선택되면 다시 선택

            selected_data_set.add(tuple(selected_data))

            sentences_1 = tokenize_sentences(selected_data[0], mode)  # 첫 번째 선택된 데이터의 문장으로 분리
            sentences_2 = tokenize_sentences(selected_data[1], mode)  # 두 번째 선택된 데이터의 문장으로 분리

            mix_ratio_1 = random.uniform(*mix_ratio_range)
            mix_ratio_2 = random.uniform(*mix_ratio_range)

            # sentences_1에서 특정 비율만큼 문장 제거
            num_sentences_to_remove = int(len(sentences_1) * mix_ratio_1)
            removed_sentences = random.sample(sentences_1, num_sentences_to_remove)
            augmented_sentences_1 = [sentence for sentence in sentences_1 if sentence not in removed_sentences]


            # sentences_2에서 특정 비율만큼 문장 추출
            num_sentences_to_extract_2 = int(len(sentences_2) * mix_ratio_2)
            extracted_sentences_2 = random.sample(sentences_2, num_sentences_to_extract_2)

            # 추출된 문장들을 랜덤하게 위치에 삽입하여 augmented_sentences_1에 삽입
            for sentence in extracted_sentences_2:
                random_index = random.randint(0, len(augmented_sentences_1))
                augmented_sentences_1.insert(random_index, sentence)

            if mode == 'to':
                augmented_text = tokenizer.convert_tokens_to_string(augmented_sentences_1)  # 토큰을 다시 문자열로 변환하여 문서로 만듦
            else:
                augmented_text = ' '.join(augmented_sentences_1)  # 문장 내의 단어들을 공백으로 연결하여 문장으로 만듦

            augmented_data.append(augmented_text)
            
        elif random_number == 1:
        
            selected_data = random.choice(filtered_data)# 원본 데이터 중에서 1개를 무작위로 선택

            while tuple(selected_data) in selected_data_set:
                selected_data = random.choice(filtered_data)  # 중복된 데이터가 선택되면 다시 선택

            selected_data_set.add(tuple(selected_data))

            augmented_text = sentence_shuffle(selected_data, min_blocks)
            augmented_data.append(augmented_text)
        
    # 생성된 증강 데이터에 레이블 1 추가
    augmented_df = pd.DataFrame({'document': augmented_data, 'label': label})
        
    return augmented_df

In [None]:
aug_train_data = mix_document(train_df, 0, 1200, min_blocks = 3, mix_ratio_range=(0.2, 0.4), mode='kss')

In [None]:
aug_data = pd.concat([train_df, aug_train_data]).reset_index(drop=True)

## 5-4. back_translation

In [None]:
def back_translate(data: DataFrame, label, num, mode="back_translate", target_language='en', prob=0.4):
    """
    주어진 데이터를 역번역을 통해 증강하거나 노이즈를 추가하여 증강 데이터를 생성하는 함수
    
    Input:
        data (DataFrame): 원본 데이터를 담고 있는 DataFrame
        label: (int): 증강 데이터에 할당할 레이블
        num (int): 생성할 증강 데이터의 개수
        mode (str): 증강 방식 ('back_translate' 또는 'noise_add') (default: 'back_translate')
        target_language (str): 역번역에 사용할 언어 코드 (default: 'en')
        prob (float, optional): noise_add 방식일 때 노이즈 추가 확률 (default: 0.4)
        
    Output:
        augmented_df (DataFrame): 생성된 증강 데이터를 담고 있는 DataFrame
    """
    # TextAugmentation 객체 생성
    agent = TextAugmentation(tokenizer="mecab", num_processes=1)

    # 선택한 레이블로 데이터 필터링
    filtered_data = data[data['label'] == label]['document'].tolist()

    # 데이터 증강
    augmented_data = []
    selected_data_set = set()  # 중복 선택된 데이터를 추적하기 위한 집합

    for _ in tqdm(range(num)):
        selected_data = random.choice(filtered_data)  # 원본 데이터 중에서 무작위로 선택

        while selected_data in selected_data_set:
            selected_data = random.choice(filtered_data)  # 중복된 데이터가 선택되면 다시 선택

        selected_data_set.add(selected_data)  # 선택된 데이터를 집합에 추가

        # 데이터 증강
        if mode == "noise_add":
            generated_data = agent.generate(selected_data, mode="noise_add", prob=prob, noise_mode=['phonological_change', 'vowel_change', 'jamo_split'])
        else:
            generated_data = agent.generate(selected_data, mode="back_translate", target_language=target_language)  #일본어로 하려면 target_language='jp'
        
        augmented_data.append(generated_data)

    # 생성된 데이터를 DataFrame으로 변환
    augmented_df = pd.DataFrame({'document': augmented_data, 'label': label})
    
    return augmented_df

In [None]:
aug_train_data = back_translate(data=train_df, label=1, num=970, mode="back_translate", target_language='es')

In [None]:
aug_data = pd.concat([train_df, aug_train_data]).reset_index(drop=True)

# 6. 데이터 클래스 및 함수 만들기

In [None]:
class TrainTestDataset(Dataset):
    def __init__(self, dataframe, transform=True, train=True, stopwords=None):
        self.data = dataframe
        self.transform = transform
        self.train = train
        self.stopwords = stopwords

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        if self.train: #train data이면 sentence와 label 같이 전처리
            labels = self.data['label'][index]
            sentence = self.data['document'][index]
        else: #test data이면 label만 같이 전처리
            sentence = self.data['document'][index]

        if self.transform: #앞서 정의한 transform 함수로 sentence 전처리
            sentence = transform(sentence, stopwords=self.stopwords)

        encoded_dict = tokenizer.encode_plus( # Hugging Face의 토크나이저를 초기화한 후 사용
            sentence, #인코딩할 대상 문장
            add_special_tokens=True, #특수 토큰(Special Token)을 문장에 추가할지 여부를 지정
            max_length=config.MAX_LEN, #문장의 최대 길이를 지정 / 길이가 MAX_LEN보다 긴 문장은 자름
            padding='max_length', #문장을 패딩하여 동일한 길이로 맞춰줌 / 패딩은 문장의 뒷부분에 [PAD] 토큰을 추가
            truncation=True, #문장이 최대 길이를 초과할 경우 자르기(truncation)를 수행
            return_tensors='pt' #결과를 PyTorch의 텐서로 반환
        )

        padded_token_list = encoded_dict['input_ids'][0]
        token_type_id = encoded_dict['token_type_ids'][0]
        att_mask = encoded_dict['attention_mask'][0]

        if self.train: #train일 경우 padded_token_list, token_type_id, att_mask, target을 반환
            target = torch.tensor(labels)
            sample = (padded_token_list, token_type_id, att_mask, target)
        else: #test일 경우 padded_token_list, token_type_id, att_mask을 반환
            sample = (padded_token_list, token_type_id, att_mask)

        return sample

In [None]:
hyperparameters = {
    'model': "kfdeberta-base",
    'data':'-',
    'data_ag_method': 'None',
    'stopword':'False',
    'MAX_LEN': 512,
    'loss':'cross_entropy',
    'epochs': 3,
    'BATCH_SIZE': 8,
    'NUM_CORES': 0,
    'learning_rate': 1e-5,
    'eps' : 1e-8,
}

#wandb 초기화
wandb.init(project='fn_guide_project',name='PN_NEWS_kfdeberta-base', config = hyperparameters)
config = wandb.config

In [None]:
train_dataset = TrainTestDataset(train_df, transform=True, train = True, stopwords=False)
val_dataset = TrainTestDataset(val_df, transform=True, train = True, stopwords=False)
test_dataset = TrainTestDataset(test_df, transform=True, train = False, stopwords=False)

In [None]:
train_dataloader = torch.utils.data.DataLoader(train_dataset,
                                        batch_size=config.BATCH_SIZE,
                                        shuffle=True,
                                      num_workers=config.NUM_CORES)

val_dataloader = torch.utils.data.DataLoader(val_dataset,
                                        batch_size=config.BATCH_SIZE,
                                        shuffle=False,
                                      num_workers=config.NUM_CORES)

test_dataloader = torch.utils.data.DataLoader(test_dataset,
                                        batch_size=config.BATCH_SIZE,
                                        shuffle=False,
                                      num_workers=config.NUM_CORES)

In [None]:
optimizer = AdamW(model.parameters(), lr=config.learning_rate, eps =config.eps)
model.to(device)

# 7. 모델 학습 및 loss 함수

In [None]:
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=0.25):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (self.alpha * (1 - pt) ** self.gamma * ce_loss).mean()
        return focal_loss

In [None]:
class HingeLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(HingeLoss, self).__init__()
        self.margin = margin

    def forward(self, outputs, targets):
        # 이진 분류를 위해 출력 텐서를 (배치 크기, 2) 모양으로 변경
        outputs = outputs.view(-1, 2)

        # 모델의 출력 디바이스에 맞추기
        targets = targets.to(outputs.device)

        # 이중 손실 계산
        loss = torch.max(torch.tensor(0.0).to(outputs.device), 1 - outputs * (targets * 2 - 1))
        return loss.mean()

In [None]:
class Balanced_crossentropy_loss(nn.Module):
    def __init__(self, beta=0.5):
        super(Balanced_crossentropy_loss, self).__init__()
        self.beta = beta

    def forward(self, inputs, targets):
        # inputs: 모델의 출력 logits
        # targets: 실제 레이블 (0 또는 1)

        # 확률값으로 변환하여 클래스 1에 해당하는 값만 선택
        prob = torch.softmax(inputs, dim=1)
        prob_class_1 = prob[:, 1]

        # 클래스별 샘플 수 계산
        class_count = torch.bincount(targets, minlength=2)  # 이진 분류에서는 minlength=2로 설정합니다.

        # 클래스 빈도에 반비례하는 가중치 계산
        weights = 1.0 / (class_count.float() + 1e-8)

        # 클래스별 가중치를 적용하여 손실 계산
        weight = weights[targets]

        # Balanced Cross Entropy Loss 계산
        loss = -weight * prob_class_1

        # 모든 샘플에 대한 손실 평균 계산
        loss = torch.mean(loss)

        return loss

In [None]:
class Weighted_crossentropy_loss(nn.Module):
    def __init__(self, pos_weight=None, weight=None, reduction='mean'):
        super(Weighted_crossentropy_loss, self).__init__()
        self.pos_weight = pos_weight
        self.weight = weight
        self.reduction = reduction

    def forward(self, inputs, targets):
        targets_one_hot = torch.eye(2)[targets].to(inputs.device)  # one-hot 인코딩, inputs와 동일한 디바이스로 이동
        weight_tensor = torch.tensor(self.weight).to(inputs.device)  # weight를 Tensor로 변환, inputs와 동일한 디바이스로 이동
        pos_weight_tensor = torch.tensor(self.pos_weight).to(inputs.device)  # pos_weight를 Tensor로 변환, inputs와 동일한 디바이스로 이동
        loss = F.binary_cross_entropy_with_logits(inputs, targets_one_hot, pos_weight=pos_weight_tensor, weight=weight_tensor, reduction=self.reduction)
        return loss

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=3, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print            
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func
    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
                
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
# train
#FocalLoss 모델 이름 저장 다시 한번 확인해야함 -> 기존에 있던 모델에 덮어씌워질 수 있음(주의하기)
output = os.getenv('HOME')+'/aiffel/mini_aiffelton/results/' #자신의 경로로 바꾸기
early_stopping = EarlyStopping(patience=3, verbose=True, path=os.path.join(output, f'{config.model}_{config.data_ag_method}_{config.loss}_classification_model.pt'))

losses = []
accuracies = []

model.train()

wandb.watch(model,log="all",log_freq=20)  #model: 모니터링할 모델 객체/log:기록할 항목을 지정하는 옵션/log_freq:기록 빈도

# loss_fn = Balanced_crossentropy_loss(beta=0.3)
# loss_fn = Weighted_crossentropy_loss(pos_weight=3, weight=1)

for i in range(config.epochs):
    total_loss = 0.0
    correct = 0
    total = 0
    
    y_true_train = []  # 학습 데이터의 실제 라벨을 저장할 리스트
    y_pred_train = []  # 학습 데이터의 예측 라벨을 저장할 리스트
    
    # Training loop
    for input_ids_batch, token_type_id_batch, attention_masks_batch, y_batch in tqdm(train_dataloader):
        optimizer.zero_grad()
        y_batch = y_batch.to(device)
        input_ids_batch = input_ids_batch.to(device)
        token_type_id_batch = token_type_id_batch.to(device)
        attention_masks_batch = attention_masks_batch.to(device)
        y_pred = model(input_ids_batch, token_type_ids=token_type_id_batch, attention_mask=attention_masks_batch)[0].to(device)
        
#         loss = loss_fn(y_pred, y_batch) #Balanced_crossentropy_loss 사용시
#         loss = loss_function(y_pred, y_batch.view(-1, 1)) #hingeloss 사용시
#         loss = FocalLoss()(y_pred, y_batch) #focalloss 사용시
        loss = F.cross_entropy(y_pred, y_batch)  # cross_entropy 사용시

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        _, predicted = torch.max(y_pred, 1)
        correct += (predicted == y_batch).sum()
        total += len(y_batch)
        
        # 학습 데이터의 실제 라벨과 예측 라벨을 저장하여 나중에 F1 점수 계산에 사용
        y_true_train.extend(y_batch.tolist())
        y_pred_train.extend(predicted.tolist())

    losses.append(total_loss)
    accuracies.append(correct.float() / total)

    f1_train = f1_score(y_true_train, y_pred_train, average='binary')
    precision_train = precision_score(y_true_train, y_pred_train, average='binary')
    recall_train = recall_score(y_true_train, y_pred_train, average='binary')

    y_true_val = []  # 검증 데이터의 실제 라벨을 저장할 리스트 초기화
    y_pred_val = []  # 검증 데이터의 예측 라벨을 저장할 리스트 초기화
    
    # Validation loop
    model.eval()
    with torch.no_grad():
        val_correct = 0
        val_total = 0
        val_loss = 0.0 #추가

        for val_input_ids_batch, val_token_type_id_batch, val_attention_masks_batch, val_y_batch in tqdm(val_dataloader):
            val_y_batch = val_y_batch.to(device)
            val_input_ids_batch = val_input_ids_batch.to(device)
            val_token_type_id_batch = val_token_type_id_batch.to(device)
            val_attention_masks_batch = val_attention_masks_batch.to(device)
            val_y_pred = model(val_input_ids_batch, token_type_ids=val_token_type_id_batch, attention_mask=val_attention_masks_batch)[0].to(device)

            _, val_predicted = torch.max(val_y_pred, 1)
            val_correct += (val_predicted == val_y_batch).sum()
            val_total += len(val_y_batch)
            
            # 검증 데이터의 실제 라벨과 예측 라벨을 저장
            y_true_val.extend(val_y_batch.tolist())
            y_pred_val.extend(val_predicted.tolist())
        
        # 검증 데이터의 F1 점수 계산
        f1_val = f1_score(y_true_val, y_pred_val, average='binary')
        precision_val = precision_score(y_true_val, y_pred_val, average='binary')
        recall_val = recall_score(y_true_val, y_pred_val, average='binary')

        val_accuracy = val_correct.float() / val_total
        val_loss = F.cross_entropy(val_y_pred, val_y_batch)  # validation loss 값 할당
        
        print(f"Epoch {i+1}/{config.epochs}:")
        print("Train Loss:", total_loss / total, "Accuracy:", correct.float() / total)
        print("Train Precision:", f"{precision_train:.4f}", "Train Recall:", f"{recall_train:.4f}", "Train F1:", f"{f1_train:.4f}")
        print("Validation Accuracy:", val_accuracy)
        print("Validation Precision:", f"{precision_val:.4f}", "Validation Recall:", f"{recall_val:.4f}", "Validation F1:", f"{f1_val:.4f}")
    
    
    wandb.log({"train_loss": total_loss / total, "train_accuracy": correct.float() / total, "val_accuracy": val_accuracy, "val_loss": val_loss, "train_F1": f1_train,
        "val_F1 Score": f1_val})
    model.train()
    
    # Early Stopping 적용
    early_stopping(val_loss, model)

    if early_stopping.early_stop:
        print(f"Early stopping!! Goog luck!")
        break
        
    model_save_path = os.path.join(output, f'{config.model}_{config.data_ag_method}_{config.loss}_classification_model_{i+1}.pt')
    torch.save(model.state_dict(), model_save_path)
    wandb.save(os.path.join(output, f'{config.model}_{config.data_ag_method}_{config.loss}_classification_model_{i+1}.pt'))

# 8. 평가

In [None]:
model.eval()

pred = []
probabilities = []  # 예측 확률값을 저장할 리스트

with torch.no_grad():
    for input_ids_batch, token_type_id_batch, attention_masks_batch in tqdm(test_dataloader):
        y_pred = model(input_ids_batch.to(device),token_type_ids = token_type_id_batch.to(device), attention_mask=attention_masks_batch.to(device))[0]
        _, predicted = torch.max(y_pred, 1)
        pred.extend(predicted.tolist())
        
        probabilities.extend(F.softmax(y_pred, dim=1).tolist())

test_df['predicted_label'] = pred
test_df['probability_negative'] = [prob[0] for prob in probabilities]
test_df['probability_positive'] = [prob[1] for prob in probabilities]

mismatched_samples = test_df[test_df['label'] != test_df['predicted_label']]
print(mismatched_samples[['document', 'label', 'predicted_label', 'probability_negative', 'probability_positive']])
        
accuracy = accuracy_score(pred, test_df['label'])
print(accuracy_score(pred, test_df['label']))
classification_report_str = classification_report(test_df['label'], pred, target_names=['negative', 'positive'])
print(classification_report(test_df['label'], pred, target_names=['negative', 'positive']))

cm = confusion_matrix(test_df['label'], pred)
sns.heatmap(cm, annot = True, cmap='coolwarm', xticklabels=['negative', 'positive'], yticklabels=['negative', 'positive'], fmt='d')
plt.show()

wandb.log({"test_accuracy": accuracy})
wandb.log({"classification_report": classification_report_str})
wandb.log({"confusion_matrix": wandb.plot.confusion_matrix(probs=None,
                                                           y_true=test_df['label'],
                                                           preds=pred,
                                                           class_names=['negative', 'positive'])})

wandb.finish() #훈련과 평가가 다 끝났을때 해주는 것이 좋음

## 8-1. 틀린 분류 체크

In [None]:
test_check_df = test_df.copy() #테스트 데이터프레임 복사
pred1 = pd.Series(pred, name="pred") #예측된 레이블 시리즈로 만들고 이름 붙혀주기
test_check_df = pd.concat([test_check_df, pred1], axis = 1) #두개를 하나의 데이터프레임으로 합치기

diff_indices = test_check_df[test_check_df['label'] != test_check_df['pred']].index #답변이 틀린것들의 index를 뽑을 수 있음
diff_indices
#test_check_df.to_csv(data_path +'test_check_df.csv') #혹시나 저장하려면 data_path 잘 확인해서 저장하기

In [None]:
def wrong_check(index): #diff_indices에서 나온 index를 하나씩 넣으면 실제답변, 예측답변, document를 볼 수 있음 / 하나씩 확인
    print('실제 답변:{}, 예측 답변:{}'.format(test_check_df['label'][index],test_check_df['pred'][index]))
    print('document: \n',test_check_df['document'][index])

In [None]:
wrong_check()

In [None]:
def wrong_all_check(diff_indices):
    for i in diff_indices:
        print(i)
        print('실제 답변:{}, 예측 답변:{}'.format(test_check_df['label'][i],test_check_df['pred'][i]))
        print('document: \n',test_check_df['document'][i])
        print('\n')

In [None]:
wrong_all_check(diff_indices)

## 8-2. Perplexity (PPL)

In [None]:
def perplexity(aug_data: pd.Dataframe, aug_back_data: pd.Dataframe) -> float:
    """
    Multi 증강 방법에서 perplexity 비교
    
    Input:
        aug_data (pd.Dataframe): 하나의 증강법을 적용한 데이터프레임
        aug_back_data (pd.Dataframe): Multi 증강법을 적용한 데이터프레임
        
    Output:
        aug_ppl_avg (float) : 하나의 증강법의 데이터들에 대한 perplexity score
        aug_back_ppl_avg (float) : Multi 증강법의 데이터들에 대한 perplexity score
    """
    aug_ppl = []
    aug_back_ppl = []
    
    aug_list = aug_data['document'].tolist()
    aug_back_list = aug_back_data['document'].tolist()
    
    for text in aug_list:
        predictions = [text]
        perplexity = load("perplexity", module_type="metric")
        results = perplexity.compute(predictions=predictions, model_id='skt/kogpt2-base-v2', add_start_token=False)
        aug_ppl.append(results['perplexities'][0])

    for text in aug_back_list:
        predictions = [text]
        perplexity = load("perplexity", module_type="metric")
        results = perplexity.compute(predictions=predictions, model_id='skt/kogpt2-base-v2', add_start_token=False)
        aug_back_ppl.append(results['perplexities'][0])
    
    aug_ppl_avg = sum(aug_ppl) / len(aug_ppl)
    aug_back_ppl_avg = sum(aug_back_ppl) / len(aug_back_ppl)
    
    return aug_ppl_avg, aug_back_ppl_avg

In [None]:
mix_ppl_avg, mix_back_ppl_avg = perplexity(augmented_df_1, augmented_df)
print("mix_ppl_avg:", mix_ppl_avg)
print("mix_back_ppl_avg:", mix_back_ppl_avg)