In [None]:
!pip install transformers



In [None]:
# utils.py
import torch
import torch.nn as nn

from transformers import (
    AutoTokenizer,
    AutoModelForMaskedLM,
    AdamW,
    get_scheduler
)
def mask_tokens(tokenizer, input_ids:torch.Tensor, mlm_prob:float=0.15, do_rep_random:bool=True):
    '''
        Build masked-language-modeling inputs and labels from a batch of token ids.
        (Adapted from huggingface/transformers/data/data_collator - torch.mask_tokens())

        Each position is selected with probability mlm_prob; special tokens and
        padding positions are never selected. Selected positions are rewritten:
            do_rep_random=True  : 80% MASK, 10% random token, 10% left unchanged
            do_rep_random=False : 100% MASK

        NOTE: input_ids is overwritten in place and the very same tensor is returned.

        return:
            (input_ids, labels) : labels keeps the original id at selected
                                  positions and -100 everywhere else
    '''
    labels = input_ids.clone()

    # Per-position selection probability, zeroed wherever masking is not allowed.
    select_prob = torch.full(labels.shape, mlm_prob)
    is_special = torch.tensor(
        [
            tokenizer.get_special_tokens_mask(row, already_has_special_tokens=True)
            for row in labels.tolist()
        ],
        dtype=torch.bool,
    )
    select_prob.masked_fill_(is_special, value=0.0)
    if tokenizer._pad_token is not None:
        select_prob.masked_fill_(labels.eq(tokenizer.pad_token_id), value=0.0)

    masked_indices = torch.bernoulli(select_prob).bool()
    # Loss is only computed on the selected positions.
    labels[~masked_indices] = -100

    # Share of the selected positions that become the mask token.
    mask_rep_prob = 0.8 if do_rep_random else 1.0
    replace_draw = torch.bernoulli(torch.full(labels.shape, mask_rep_prob)).bool()
    indices_replaced = replace_draw & masked_indices
    input_ids[indices_replaced] = tokenizer.convert_tokens_to_ids(tokenizer.mask_token)

    if not do_rep_random:
        return input_ids, labels

    # Half of the remaining selected positions receive a random vocabulary id;
    # the other half keep their original token.
    random_draw = torch.bernoulli(torch.full(labels.shape, 0.5)).bool()
    indices_random = random_draw & masked_indices & ~indices_replaced
    random_words = torch.randint(len(tokenizer), labels.shape, dtype=torch.long)
    input_ids[indices_random] = random_words[indices_random]

    return input_ids, labels


def load_tokenizer(args):
    '''
        Load the tokenizer for the current run.

        Uses args.tuned_model_path when args.do_pred is truthy,
        otherwise args.model_name_or_path.
    '''
    source = args.tuned_model_path if args.do_pred else args.model_name_or_path
    return AutoTokenizer.from_pretrained(source)

def initialize_model(args, total_steps):
    '''
        Create the masked-LM model, move it to the selected device and build
        its optimizer and learning-rate scheduler.

        Device selection:
            - CUDA unavailable or args.no_cuda set : "cpu"
            - args.multi set                       : nn.DataParallel over every visible GPU,
                                                     device is the first GPU index
            - otherwise                            : "cuda:<args.dev_num>"

        return:
            (model, optimizer, scheduler, device)
    '''
    model = AutoModelForMaskedLM.from_pretrained(args.model_name_or_path)

    use_cuda = torch.cuda.is_available() and (not args.no_cuda)
    if not use_cuda:
        device = "cpu"
    elif args.multi:
        gpu_ids = list(range(torch.cuda.device_count()))
        model = nn.DataParallel(model, device_ids=gpu_ids, output_device=gpu_ids[0])
        device = gpu_ids[0]
    else:
        device = "cuda:" + str(args.dev_num)
    model.to(device)

    # NOTE(review): the attribute is spelled `learning_Rate` (capital R) --
    # confirm this matches the name produced by the argument parser.
    optimizer = AdamW(
        model.parameters(),
        lr=args.learning_Rate,
        eps=args.eps,
        weight_decay=args.weight_decay,
    )

    scheduler_name = args.scheduler_name
    # Warm-up length is a fraction of the total number of training steps.
    warmup_steps = int(total_steps * args.warmup_proportion)
    scheduler = get_scheduler(
        scheduler_name,
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=total_steps,
    )

    return model, optimizer, scheduler, device


def initialize_model_with_ds(args):
    '''
        Create the masked-LM model and hand it to deepspeed.initialize.

        deepspeed is imported lazily so it is only required when this path is used.
        The third element of the tuple returned by deepspeed.initialize is discarded.

        return:
            (model, optimizer, scheduler) as returned by deepspeed.initialize
    '''
    import deepspeed

    base_model = AutoModelForMaskedLM.from_pretrained(args.model_name_or_path)
    engine, optimizer, _, scheduler = deepspeed.initialize(
        model=base_model,
        args=args,
        model_parameters=base_model.parameters(),
    )

    return engine, optimizer, scheduler

In [None]:
# data_loader.py
import torch
import pandas as pd

from tqdm import tqdm
from torch.utils.data import Dataset
from transformers import AutoTokenizer
from argparse import Namespace

class DataSet(Dataset):
    '''
        Tokenized view of the "comments" column of a DataFrame.

        Every row is tokenized to exactly args.max_seq_len tokens (padded or
        truncated). Rows that cannot be processed are skipped.

        attributes:
            data            : all input rows as a list of dicts. Skipped rows stay in
                              this list, so its indices may not line up with input_ids
                              when rows were skipped.
            input_ids       : per-row token id tensors concatenated along dim 0
            attention_masks : matching attention masks
    '''
    def __init__(self, df:pd.DataFrame, tokenizer:AutoTokenizer, args:Namespace):
        # Imported locally under its own name: in a shared notebook namespace a later
        # `import tqdm` rebinds the global `tqdm` to the module, which is not callable.
        from tqdm import tqdm as progress_bar

        self.data = df.to_dict("records")
        input_ids, attention_masks = [], []

        for line in progress_bar(self.data):
            try:
                comments = line["comments"].replace("\n", "")
                encoded_dict = tokenizer(
                    comments,
                    add_special_tokens =True,
                    max_length = args.max_seq_len,
                    padding = "max_length",
                    truncation = True,
                    return_attention_mask = True,
                    return_tensors="pt"
                )
                row_ids = encoded_dict.input_ids
                row_mask = encoded_dict.attention_mask
            except Exception:
                # Best-effort: skip rows that cannot be tokenized (e.g. a missing or
                # non-string comment). `Exception` rather than a bare `except` so that
                # KeyboardInterrupt / SystemExit still stop the loop.
                continue
            input_ids.append(row_ids)
            attention_masks.append(row_mask)

        if not input_ids:
            # torch.cat on an empty list fails with an unhelpful message.
            raise RuntimeError("DataSet: no row of the 'comments' column could be tokenized")

        # Concatenate the per-row tensors along dim 0 into one tensor each.
        self.input_ids = torch.cat(input_ids, dim = 0)
        self.attention_masks = torch.cat(attention_masks, dim = 0)


    # get data length
    def __len__(self):
        return len(self.input_ids)

    # get each data info
    def __getitem__(self, idx):
        input_id = self.input_ids[idx]
        attention_mask = self.attention_masks[idx]

        return input_id, attention_mask



class AugmentDataSet(Dataset):
    '''
        Tokenized view of a list of sentences, used as input for batch augmentation.

        Every sentence is padded or truncated to a fixed length: args.max_seq_len
        when args provides it, otherwise 100. Sentences that cannot be processed
        are skipped.

        attributes:
            input_ids       : per-sentence token id tensors concatenated along dim 0
            attention_masks : matching attention masks
            df              : DataFrame with one "comments" row per kept sentence,
                              in the same order as input_ids
    '''
    def __init__(self, sent_list:list, tokenizer:AutoTokenizer, args:Namespace=None):
        # Imported locally under its own name: in a shared notebook namespace a later
        # `import tqdm` rebinds the global `tqdm` to the module, which is not callable.
        from tqdm import tqdm as progress_bar

        # `args` is optional; 100 stays the length used when no max_seq_len is given.
        max_length = getattr(args, "max_seq_len", None)
        if max_length is None:
            max_length = 100

        total_result = []
        input_ids, attention_masks = [], []

        for line in progress_bar(sent_list):
            try:
                encoded_dict = tokenizer(
                    line,
                    add_special_tokens =True,
                    max_length = max_length,
                    padding = "max_length",
                    truncation = True,
                    return_attention_mask = True,
                    return_tensors="pt"
                )
                row_ids = encoded_dict.input_ids
                row_mask = encoded_dict.attention_mask
            except Exception:
                # Best-effort: skip sentences that cannot be tokenized. `Exception`
                # rather than a bare `except` so KeyboardInterrupt still stops the loop.
                continue
            input_ids.append(row_ids)
            attention_masks.append(row_mask)
            total_result.append({"comments":line})

        if not input_ids:
            # torch.cat on an empty list fails with an unhelpful message.
            raise RuntimeError("AugmentDataSet: no sentence could be tokenized")

        # Concatenate the per-sentence tensors along dim 0 into one tensor each.
        self.input_ids = torch.cat(input_ids, dim = 0)
        self.attention_masks = torch.cat(attention_masks, dim = 0)
        self.df = pd.DataFrame(total_result)

    # get data length
    def __len__(self):
        return len(self.input_ids)

    # get each data info
    def __getitem__(self, idx):
        input_id = self.input_ids[idx]
        attention_mask = self.attention_masks[idx]

        return input_id, attention_mask

In [None]:
# augment.py
import copy
import torch
import tqdm
import argparse

#from utils import mask_tokens

from typing import Union
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModelForMaskedLM

def load_tuned_model(args:argparse.Namespace):
    '''
    Load the fine-tuned masked-LM model and its tokenizer from args.tuned_model_path.

    The model is placed on "cuda:<args.dev_num>" when CUDA is available and
    args.dev_num is a valid device index, otherwise on "cpu".

    return:
        (model, tokenizer, dev) : dev is the device string the model was moved to
    '''
    cuda_usable = torch.cuda.is_available() and (0 <= args.dev_num < torch.cuda.device_count())
    dev = f"cuda:{args.dev_num}" if cuda_usable else "cpu"

    model = AutoModelForMaskedLM.from_pretrained(args.tuned_model_path)
    tokenizer = AutoTokenizer.from_pretrained(args.tuned_model_path)
    model.to(dev)

    return model, tokenizer, dev

def tokenize(tokenizer:AutoTokenizer, sent:str):
    '''
    Tokenize one sentence with special tokens added and PyTorch tensors requested.

    return:
        (input_ids, attention_mask) taken from the tokenizer output
    '''
    encoding = tokenizer(
        sent,
        add_special_tokens=True,
        return_attention_mask=True,
        return_tensors="pt",
    )
    return encoding.input_ids, encoding.attention_mask

def is_same_token_type(org_token:str, candidate:str) -> bool:
    '''
    Check the candidate-filtering type condition.

    Tokens are split into two types, alphabetic tokens and everything else
    (punctuation, digits, symbols), and the function reports whether the original
    token and the candidate fall into the same type. A leading "##" subword prefix
    is ignored on either token, so "##abc" is judged as "abc".

    args:
        org_token(str) : token originally at the masked position
        candidate(str) : proposed replacement token

    return:
        (bool) : True when both tokens have the same type
    '''
    def _surface(token:str) -> str:
        # Drop the subword continuation prefix before testing the characters.
        return token[2:] if token.startswith("##") else token

    # Comparing the stripped forms on BOTH sides avoids the earlier mistake where
    # one token was tested with its "##" prefix still attached (always non-alpha),
    # which made e.g. ("##abc", "##!") count as the same type.
    return _surface(org_token).isalpha() == _surface(candidate).isalpha()

def candidate_filtering(tokenizer:AutoTokenizer,
                        input_ids:list,
                        idx:int,
                        org:int,
                        candidates:Union[list, torch.Tensor]) -> int:
    '''
    Pick the best-ranked candidate that satisfies the filtering conditions.

    A candidate is accepted when
    1. it differs from the original token and has the same type as it
       (see is_same_token_type), and
    2. it is not identical to the token directly before or after position idx.

    args:
        tokenizer(AutoTokenizer)
        input_ids(list)              : token ids of the sentence being rewritten
        idx(int)                     : position of the masked token in input_ids
        org(int)                     : id of the token originally at idx
        candidates(list or Tensor)   : candidate ids, best first

    return:
        (int) : id of the first accepted candidate, or org when none qualifies
    '''
    org_id = int(org)
    if isinstance(candidates, torch.Tensor):
        candidate_ids = candidates.cpu().tolist()
    else:
        candidate_ids = [int(c) for c in candidates]

    org_token = tokenizer.convert_ids_to_tokens([org_id])[0]
    candidate_tokens = tokenizer.convert_ids_to_tokens(candidate_ids)

    # Neighbouring ids; None at a sequence boundary so that idx-1 does not wrap
    # around to the last element and idx+1 does not run past the end.
    prev_id = int(input_ids[idx - 1]) if idx > 0 else None
    next_id = int(input_ids[idx + 1]) if idx + 1 < len(input_ids) else None

    for cand_id, token in zip(candidate_ids, candidate_tokens):
        if token == org_token or not is_same_token_type(org_token, token):
            continue
        # Compare ids with ids on both sides (the right-hand check used to compare
        # an id with a token string and therefore never matched).
        if cand_id == prev_id or cand_id == next_id:
            continue
        return cand_id

    return org_id

def augment_one_sent(model:AutoModelForMaskedLM,
                    tokenizer:AutoTokenizer,
                    sent:str,
                    dev:Union[str, torch.device],
                    args:Union[argparse.Namespace, dict]) -> str:
    '''
    Create a new (augmented) sentence by randomly masking tokens of one sentence
    and letting the model fill them in.

    args:
        model(AutoModelForMaskedLM)     : finetuned model
        tokenizer(AutoTokenizer)
        sent(str)                       : sentence to augment
        dev(str or torch.device)        : device the model lives on
        args(argparse.Namespace or dict)
            - k(int)           : number of top candidates considered per mask; when none
                                 of them passes the filter the original token is kept
            - threshold(float) : when the top candidate's probability is at least this
                                 value it is used directly, without candidate filtering
            - mlm_prob(float)  : masking ratio

    return:
        (str) : augmented sentence. The input sentence is returned unchanged when it
                has no maskable token or mlm_prob is not positive.
    '''

    if isinstance(args, argparse.Namespace):
        k, threshold, mlm_prob = args.k, args.threshold, args.mlm_prob
    else:
        ## mapping-style access (dict)
        k, threshold, mlm_prob = args["k"], args["threshold"], args["mlm_prob"]

    model.eval()

    input_id, attention_mask = tokenize(tokenizer, sent)
    # mask_tokens overwrites input_id in place, so keep the original ids first.
    org_ids = input_id[0].clone().tolist()

    # Without this guard the retry loop below never ends: an empty sentence holds
    # only special tokens, and a non-positive mlm_prob never selects anything.
    special_mask = tokenizer.get_special_tokens_mask(org_ids, already_has_special_tokens=True)
    if mlm_prob <= 0 or all(special_mask):
        return sent

    # Re-draw the random mask until at least one mask token is present.
    masked_input_id, _ = mask_tokens(tokenizer, input_id, mlm_prob, do_rep_random=False)
    while not bool((masked_input_id == tokenizer.mask_token_id).any()):
        masked_input_id, _ = mask_tokens(tokenizer, input_id, mlm_prob, do_rep_random=False)

    with torch.no_grad():
        masked_input_id, attention_mask = masked_input_id.to(dev), attention_mask.to(dev)
        output = model(masked_input_id, attention_mask = attention_mask)
        logits = output["logits"][0]

    copied = masked_input_id.cpu().tolist()[0]
    for i, token_id in enumerate(copied):
        if token_id != tokenizer.mask_token_id:
            continue

        prob = logits[i].softmax(dim=0)
        probability, candidates = prob.topk(k)
        if probability[0] < threshold:
            res = candidate_filtering(tokenizer, copied, i, org_ids[i], candidates)
        else:
            res = candidates[0]
        # Store plain ints so the list stays homogeneous for later comparisons/decoding.
        copied[i] = int(res)

    return tokenizer.decode(copied, skip_special_tokens=True)


def batch_augment(model:AutoModelForMaskedLM,
                tokenizer:AutoTokenizer,
                dataset:torch.utils.data.Dataset,
                dev:Union[str, torch.device],
                args:argparse.Namespace) -> list:
    '''
    Create augmented sentences batch by batch: tokens are masked at random and
    every mask is filled with a model prediction.

    args:
        model(AutoModelForMaskedLM)
        tokenizer(AutoTokenizer)
        dataset(torch.utils.data.Dataset) : yields (input_ids, attention_mask) pairs
        dev(str or torch.device)          : device the model lives on
        args(argparse.Namespace)
            - k(int)           : number of top candidates considered per mask
            - threshold(float) : when the top candidate's probability is at least this
                                 value it is used directly, without candidate filtering
            - mlm_prob(float)  : masking ratio
            - batch_size(int)

    return:
        (list) : augmented sentences, in dataset order
    '''

    k = args.k
    threshold = args.threshold
    mlm_prob = args.mlm_prob
    batch_size = args.batch_size

    model.eval()

    augmented_res = []
    dataloader = DataLoader(dataset, batch_size = batch_size)
    for batch in tqdm.tqdm(dataloader):
        input_ids, attention_masks = batch[0], batch[1]

        # mask_tokens overwrites its input in place, so snapshot the original ids
        # BEFORE masking; otherwise the "original" token at a masked position would
        # be the mask token itself.
        original_rows = input_ids.clone().tolist()
        masked_input_ids, _ = mask_tokens(tokenizer, input_ids, mlm_prob, do_rep_random=False)
        # Converted once per batch instead of once per token.
        masked_rows = masked_input_ids.tolist()

        with torch.no_grad():
            output = model(masked_input_ids.to(dev), attention_mask = attention_masks.to(dev))
            logits = output["logits"]

        # Apply candidate filtering sentence by sentence and rebuild each sentence.
        for sent_no, masked_row in enumerate(masked_rows):
            copied = list(masked_row)

            for i, token_id in enumerate(masked_row):
                # Everything from the first padding token on is padding.
                if token_id == tokenizer.pad_token_id:
                    break
                if token_id != tokenizer.mask_token_id:
                    continue

                prob = logits[sent_no][i].softmax(dim=0)
                probability, candidates = prob.topk(k)
                if probability[0] < threshold:
                    res = candidate_filtering(tokenizer, copied, i, original_rows[sent_no][i], candidates)
                else:
                    res = candidates[0]
                # Store plain ints so the list stays homogeneous for later comparisons/decoding.
                copied[i] = int(res)

            augmented_res.append(tokenizer.decode(copied, skip_special_tokens=True))

    return augmented_res

if __name__ == "__main__":
    import random

    # Seed torch as well: the random masking draws from torch's generator,
    # which random.seed does not affect.
    random.seed(1)
    torch.manual_seed(1)

    args = argparse.Namespace(
        tuned_model_path="seoyeon96/KcELECTRA-MLM",
        dev_num=0,
        input_file=None,
        batch_size=1,
        mlm_prob=0.15,
        threshold=0.95,
        k=5
    )

    model, tokenizer, dev = load_tuned_model(args)

    if args.batch_size > 1:
        # Batch mode: augment every non-empty line of the input file.
        if args.input_file is None:
            raise Exception("input_file is None")

        with open(args.input_file, "r", encoding="utf-8") as f:
            # Drop trailing newlines and blank lines before tokenizing.
            corpus = [line.strip() for line in f if line.strip()]

        # AugmentDataSet takes the argument namespace as its third parameter.
        dataset = AugmentDataSet(corpus, tokenizer, args)
        augmented = batch_augment(model, tokenizer, dataset, dev, args)
        for sentence in augmented:
            print("OUTPUT = ", sentence)
    else:
        # Interactive mode: augment one typed sentence at a time until "quit".
        while True:
            input_sen = input("INPUT = ").strip()
            if input_sen.lower() == "quit":
                print("대화가 종료됩니다.")
                break
            if not input_sen:
                # Nothing to mask in an empty sentence.
                continue
            augmented = augment_one_sent(model, tokenizer, input_sen, dev, args)
            print("OUTPUT = ", augmented)
            print("-"*30)



INPUT = quit
대화가 종료됩니다.


In [None]:
# Mount Google Drive at /content/drive (only works inside Google Colab).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import argparse
import pandas as pd
from transformers import AutoTokenizer, AutoModelForMaskedLM
import copy
import torch


# Load the fine-tuned model and its tokenizer.
args = argparse.Namespace(
    tuned_model_path="seoyeon96/KcELECTRA-MLM",
    dev_num=0,
    input_file=None,
    batch_size=1,
    mlm_prob=0.15,
    threshold=0.95,
    k=5
)

model, tokenizer, dev = load_tuned_model(args)

# Load the source data.
df = pd.read_excel("/content/drive/MyDrive/NLPicasso/최종/Gaslighting_data.xlsx")

# Augment the text of the "prompt" and "competition" columns.
augmented_data = []
for _, row in df.iterrows():
    # Write into a copy: the object yielded by iterrows() must not be modified,
    # since depending on dtypes it may be a view of df.
    new_row = row.copy()
    for column in ["prompt", "competition"]:
        value = row[column]
        # Leave missing cells untouched instead of augmenting the string "nan".
        if pd.isna(value):
            continue
        text_data = str(value)
        # A blank cell has nothing to mask.
        if not text_data.strip():
            continue
        new_row[column] = augment_one_sent(model, tokenizer, text_data, dev, args)
    augmented_data.append(new_row)



# Append the augmented rows to the original data.
augmented_df = pd.DataFrame(augmented_data)
combined_df = pd.concat([df, augmented_df], ignore_index=True)
# Drop the stray spreadsheet column if it is present.
combined_df = combined_df.drop(columns=['Unnamed: 2'], errors="ignore")


print(combined_df)

# Save the result as a CSV file.
combined_df.to_csv("/content/drive/MyDrive/NLPicasso/최종/Gaslighting_augmentation_data.csv", index=False)

                                                 prompt  \
0     오빠가 그런 행동 한다는 것 자체가 나한테는 계속 상처인데 평생 사죄를 하고 케어하...   
1           그런 행동들이 나한테는 상처라고 그런데 어떻게 케어를 하고 케어를 하겠다는거야   
2      누가 그런 행동을 했다는거야 난 너무 상처받았는데 어떻게 위로를 하고 위로를 한다는거야   
3                     그딴 행동을 하지 말았어야지 이제와서 뭘 하겠다는거야 도대체   
4     계속 그런식으로 행동하면서 상처받았으면서 부모님에게 사죄도 하고 케어하겠다는 말은 ...   
...                                                 ...   
3393                                    오늘 안주에 귀신이 나왔어요   
3394                       넌 세상이 얼마나 만만하게 보이면 그런행동을 하냐?   
3395                           담배 좀 끊어라 무슨 죽을라고 그렇게 피냐?   
3396             아이가 수업시간에 돌아다녀 다른 아이들이 집중을 하고 있습니다 어머니   
3397                           난 그런 기사 좀 불편하니까 안썻으면 좋겠어   

                                           competition  
0                        자꾸 고집 피우지마 난 그런 말 한 적이 없는데거든?  
1                       아니 언제 저런 행동을 했다는거야 고집 부리 내지좀 마  
2                      그런 행동이 뭔데? 내가 무슨 행동을 했다는거야 왜 우겨  
3                너 지금 나한테 그딴 행동이라고 한거야. 그리고 너 니가 뭘 알아어  
4     