In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import re

import json 
import torch
import json_repair
import pandas as pd
from glob import glob 
from openai import OpenAI
from pydantic import BaseModel
from dotenv import load_dotenv
from pqdm.processes import pqdm
from datasets import Dataset, load_dataset
from trl import SFTTrainer
from peft import AutoPeftModelForCausalLM, LoraConfig
from transformers import (AutoTokenizer, 
                          AutoModelForCausalLM, 
                          TrainingArguments, 
                          pipeline)


# Load the prepared dataset: every CSV under ./data is concatenated into one frame.
# The paths are sorted because glob() returns them in arbitrary order; a stable
# order keeps positional lookups (.iloc[...]) and the seeded shuffle/split reproducible.
file_list = sorted(glob("./data/*.csv"))
print(file_list)

# ignore_index=True gives the combined frame a fresh 0..n-1 index instead of
# repeating each file's own row labels.
df = pd.concat([pd.read_csv(file) for file in file_list], ignore_index=True)
df.shape

In [None]:
def extract_placeholder_mapping(original_text, transformed_text, allowed_types):
    """Recover which original substrings were replaced by which placeholders.

    The two texts are compared line by line; lines are paired by position and
    surplus lines on either side are ignored. Each transformed line is split
    into bracketed tokens and the literal text between them. For every token of
    the form ``[TYPE]`` or ``[TYPE<digits>]`` whose TYPE is in ``allowed_types``,
    the original text between the current scan position and the next literal
    is taken as the value that the placeholder replaced.

    Args:
        original_text: Text before anonymization.
        transformed_text: Anonymized text containing bracketed placeholders.
        allowed_types: Iterable of placeholder type names (no brackets, no
            digits). A single string is treated as one type name. Names are
            matched literally, so regex metacharacters in them are safe.

    Returns:
        dict mapping each stripped, non-empty original substring to the
        placeholder token that replaced it. If the same substring occurs more
        than once, the last placeholder seen wins. An empty ``allowed_types``
        yields an empty dict.
    """
    if isinstance(allowed_types, str):
        # A bare string would otherwise be joined character by character.
        type_names = [allowed_types]
    else:
        type_names = list(allowed_types)
    if not type_names:
        # No placeholder types requested, so nothing can be mapped.
        return {}

    # Escape each name so characters such as "+" or "." are matched literally.
    alternatives = '|'.join(re.escape(name) for name in type_names)
    allowed_pattern = re.compile(r'\[(?:' + alternatives + r')\d*\]')
    # The capturing group makes split() keep the bracketed tokens in its result.
    generic_pattern = re.compile(r'(\[[^]]+\])')

    mapping = {}

    line_pairs = zip(original_text.splitlines(), transformed_text.splitlines())
    for orig_line, trans_line in line_pairs:
        parts = generic_pattern.split(trans_line)
        orig_pos = 0  # scan position inside the original line

        for i, part in enumerate(parts):
            if not allowed_pattern.fullmatch(part):
                # Literal text (or a bracketed token of a non-allowed type):
                # advance past it in the original line when it can be found.
                found_idx = orig_line.find(part, orig_pos)
                if found_idx != -1:
                    orig_pos = found_idx + len(part)
                continue

            # Placeholder: the replaced value runs up to the next literal. When
            # there is no next literal, or it cannot be found in the original,
            # the value runs to the end of the line.
            next_literal = parts[i + 1] if i + 1 < len(parts) else ''
            next_idx = orig_line.find(next_literal, orig_pos) if next_literal else -1
            if next_idx == -1:
                next_idx = len(orig_line)

            replaced_text = orig_line[orig_pos:next_idx].strip()
            orig_pos = next_idx
            if replaced_text:
                mapping[replaced_text] = part

    return mapping


# Preview the first two rows of the loaded frame.
df.head(2)

In [None]:
# Show the row at position 20: original text, a divider line, then the anonymized text.
print(
    df["origin_data"].iloc[20],
    "--------------",
    df["anonymized_data"].iloc[20],
    sep="\n",
)

In [None]:
# Show the second-to-last row: original text, a divider line, then the anonymized text.
print(
    df["origin_data"].iloc[-2],
    "--------------",
    df["anonymized_data"].iloc[-2],
    sep="\n",
)

In [None]:
# Store every entry of the "mapping" column as its str() representation.
df["mapping"] = df["mapping"].map(str)

In [None]:
import datasets 

# Convert the pandas DataFrame into a `datasets.Dataset` for the steps that follow.
dataset = datasets.Dataset.from_pandas(df)

def get_chat_format(element):
    """Convert one dataset row into a chat-style training sample.

    Args:
        element: Row with the keys "origin_data", "anonymized_data" and "mapping".

    Returns:
        dict with "messages" (system, user and assistant turns, in that order:
        the fixed system prompt, the original text, the anonymized text) and
        "label" (the row's "mapping" value, passed through unchanged).
    """
    system_prompt = "너는 개인정보를 비식별화하는 Assistant야. 너는 주어진 데이터를 바탕으로 개인정보를 비식별화하는 작업을 해야해."

    turns = (
        ("system", system_prompt),
        ("user", element["origin_data"]),
        ("assistant", element["anonymized_data"]),
    )
    messages = [{"role": role, "content": content} for role, content in turns]

    return {"messages": messages, "label": element["mapping"]}

# Convert every row to chat format (dropping all original columns), then shuffle
# and hold out 10% as the test split. Both random steps use seed 42.
dataset = (
    dataset
    .map(get_chat_format, remove_columns=dataset.features, batched=False)
    .shuffle(seed=42)
    .train_test_split(test_size=0.1, seed=42)
)

In [None]:
# Display the train/test split object.
dataset

In [None]:
# Inspect the first training sample in chat format.
dataset["train"][0]

In [None]:
# Hyperparameters shared by the LoRA config and the training arguments; they
# are also embedded in the output directory name.
lora_alpha = 128
lora_r = 256
learning_rate = 5e-5

# LoRA adapter settings for causal-LM fine-tuning, applied to the attention
# projections (q/k/v/o) and the MLP projections (gate/up/down).
peft_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=lora_r,
    lora_alpha=lora_alpha,
    lora_dropout=0.05,
    bias="none",
    target_modules=[
        "q_proj",
        "up_proj",
        "o_proj",
        "k_proj",
        "down_proj",
        "gate_proj",
        "v_proj",
    ],
)

# Output directory encodes the learning rate and the LoRA alpha / rank.
save_dir = f"./model/model_{learning_rate}_alpha-{lora_alpha}_r-{lora_r}"

args = TrainingArguments(
    output_dir=save_dir,
    # Schedule
    num_train_epochs=5,
    learning_rate=learning_rate,
    lr_scheduler_type="constant",
    # NOTE(review): with the "constant" scheduler the warmup setting below is
    # presumably ignored; use "constant_with_warmup" if warmup is intended. Confirm.
    warmup_ratio=0.03,
    # Batching: 2 samples per device x 4 accumulation steps per optimizer update
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,
    gradient_checkpointing=True,
    # Optimizer and numerics
    optim="adamw_torch_fused",
    max_grad_norm=0.3,
    bf16=True,
    tf32=True,
    # Logging and checkpointing
    logging_steps=2,
    save_strategy="epoch",
    report_to="wandb",
    push_to_hub=False,
)

In [None]:
model_id = "meta-llama/Meta-Llama-3.1-8B-Instruct"

# Tokenizer for the base model: right-side padding, with the EOS token reused
# as the padding token.
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = 'right'

# Base model loaded in bfloat16 with automatic device placement.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

In [None]:
# Combined character count of each original / anonymized text pair.
df["length"] = df["origin_data"].apply(len) + df["anonymized_data"].apply(len)

import matplotlib.pyplot as plt

# Histogram of the combined lengths, drawn through the explicit Axes interface.
fig, ax = plt.subplots()
ax.hist(df["length"], bins=30, color="skyblue", edgecolor="black")
ax.set_title("Distribution of Text Length")
ax.set_xlabel("Text Length (characters)")
ax.set_ylabel("Frequency")
plt.show()

In [None]:
# Supervised fine-tuning of the base model with the LoRA config, on the train
# split only, with sequence packing enabled and a maximum sequence length of 2400.
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    args=args,
    peft_config=peft_config,
    train_dataset=dataset["train"],
    max_seq_length=2400,
    packing=True,
)

trainer.train()

# Write the trained model to the output directory.
trainer.save_model(save_dir)

In [None]:
import gc

# Drop the training-time objects so their GPU memory can be reclaimed before the
# fine-tuned model is loaded. The trainer was constructed with `model`, so it
# has to be released too or the weights stay referenced.
for _name in ("trainer", "model", "tokenizer"):
    # pop() tolerates names that do not exist (e.g. the training cell was skipped).
    globals().pop(_name, None)

# Collect unreachable objects first so the CUDA cache can actually be released.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Point save_dir at an absolute checkpoint location for the cells that load the adapter.
# NOTE(review): this machine-specific path appears to name the same directory the
# training configuration builds as "./model/model_5e-05_alpha-128_r-256".
# Confirm, and prefer a relative or configurable path so the notebook runs elsewhere.
save_dir = "/workspace/FASTCAMPUS-CH09_11/chapter10-De_identification/model/model_5e-05_alpha-128_r-256"

In [None]:
# Path of the trained adapter.
peft_model_id = save_dir

# Load the base model together with the PEFT adapter.
# device_map="auto" already places the weights, so no extra .to("cuda") is
# issued: moving a dispatched model again is redundant and can raise if any
# module was offloaded.
# NOTE(review): training ran with bf16 while inference loads float16. Confirm
# that the dtype difference is intended.
fine_tuned_model = AutoPeftModelForCausalLM.from_pretrained(
    peft_model_id,
    device_map="auto",
    torch_dtype=torch.float16,
)

# Load the tokenizer from the same directory; right-side padding, with the EOS
# token reused as the padding token.
tokenizer = AutoTokenizer.from_pretrained(peft_model_id)
tokenizer.padding_side = 'right'
tokenizer.pad_token = tokenizer.eos_token

In [None]:
# Display the held-out test split.
dataset["test"]

In [None]:
# Text-generation pipeline around the fine-tuned model and its tokenizer.
pipe = pipeline(
    "text-generation",
    model=fine_tuned_model,
    tokenizer=tokenizer,
    device_map="auto",
)

# Render the first two messages (system + user) of the second-to-last test
# sample as a prompt string that ends with the generation prompt.
prompt_messages = dataset["test"][-2]["messages"][:2]
prompt = pipe.tokenizer.apply_chat_template(
    prompt_messages,
    tokenize=False,
    add_generation_prompt=True,
)
print(prompt)

In [None]:
# Sampling settings: low temperature with top-k / top-p filtering, at most 512
# new tokens; the EOS token id is used both to stop and to pad.
generation_kwargs = {
    "max_new_tokens": 512,
    "do_sample": True,
    "temperature": 0.1,
    "top_k": 40,
    "top_p": 0.9,
    "eos_token_id": pipe.tokenizer.eos_token_id,
    "pad_token_id": pipe.tokenizer.eos_token_id,
}

outputs = pipe(prompt, **generation_kwargs)

In [None]:
# Keep only what follows the first len(prompt) characters of the first result
# (the slicing assumes the generated text begins with the prompt).
generated_text = outputs[0]["generated_text"]
output_text = generated_text[len(prompt):]
print(output_text)

In [None]:
# Original (non-anonymized) text: the user turn of the second-to-last test sample.
test_sample = dataset["test"][-2]
input_text = test_sample["messages"][1]["content"]
print(input_text)

In [None]:
# Recover the original-text -> placeholder mapping from the model output.
# "KAKO_ID" and "TIWTTER_ID" look like misspellings, so the presumably intended
# spellings are accepted as well; the existing names are kept so placeholders
# that use them are still recognised.
# NOTE(review): confirm which spelling the training data actually uses.
mapping_result = extract_placeholder_mapping(
    input_text,
    output_text,
    allowed_types=(
        "PERSON", "CONTACT", "ADDRESS", "ACCOUNT", "DATEOFBIRTH",
        "EMAIL", "LOCATION", "KAKO_ID", "TIWTTER_ID", "TELEGRAM_ID",
        "KAKAO_ID", "TWITTER_ID",
    ),
)

print(mapping_result)