# Prompt Tuning - Causal Language Model (Twitter Dataset)

Twitter Text에 대해 정상적인 텍스트인지 불만을 나타내는 내용인지 분류하는 모델을 Prompt Tuning 방식으로 파인튜닝해 보도록 하겠습니다.

Reference : https://huggingface.co/docs/peft/main/en/task_guides/clm-prompt-tuning

## 0. Setup

In [None]:
# !pip install -q peft transformers datasets

In [None]:
# MLP Suwon 설정 필요
import os

os.environ['REQUESTS_CA_BUNDLE'] = '/etc/ssl/certs/ca-certificates.crt'

proxies = {
'http': '75.17.107.42:8080',
'https': '75.17.107.42:8080'
}

In [None]:
# MLP Suwon 설정 필요
import ssl

if hasattr(ssl, '_create_unverified_context'):
   ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
!nvidia-smi

## 1. Dataset (Twitter Data)

**Twitter Complaints** 데이터셋은 트위터 문장이 불평(항의)하는 내용인지 아닌지를 분류하는 데이터입니다.

In [None]:
import os
import torch
from tqdm import tqdm
from datasets import load_dataset

dataset_name = "twitter_complaints"
dataset = load_dataset("ought/raft", dataset_name)
dataset["train"][0]

In [None]:
classes = [k.replace("_", " ") for k in dataset["train"].features["Label"].names]
dataset = dataset.map(
    lambda x: {"text_label": [classes[label] for label in x["Label"]]},
    batched=True,
    num_proc=1,
)
dataset["train"][0]

In [None]:
dataset["train"][2]

## 2. Preprocess Dataset (Tokenizing)

사전학습 언어모델로 **Bloomz-560m**을 사용하여 **Twitter Complaints** 분류 태스크를 파인튜닝하게 됩니다.

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, default_data_collator, get_linear_schedule_with_warmup
from peft import get_peft_config, get_peft_model, PromptTuningInit, PromptTuningConfig, TaskType, PeftType

from torch.utils.data import DataLoader

device = "cuda"
model_name_or_path = "bigscience/bloomz-560m"
tokenizer_name_or_path = "bigscience/bloomz-560m"

dataset_name = "twitter_complaints"
text_column = "Tweet text"
label_column = "text_label"

max_length = 64
lr = 3e-2
num_epochs = 20
batch_size = 8

Bloomz-560m 모델에 사용된 Tokenizer를 가져옵니다.

In [None]:
# 다음 코드를 완성하세요!! (사전학습 모델에 사용된 Tokenizer 가져오기)
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

Classification 문제가 아닌 Causal Language Model을 이용하여 분류를 할 예정이므로, Text와 Label을 이어붙이는 형태로 변환합니다.  
즉, 텍스트 입력이 들어오면 Label을 Next Token Prediction 하는 문제로 치환하게 됩니다.

In [None]:
def preprocess_function(examples):
    batch_size = len(examples[text_column])
    inputs = [f"{text_column} : {x} Label : " for x in examples[text_column]]
    targets = [str(x) for x in examples[label_column]]
    model_inputs = tokenizer(inputs)
    labels = tokenizer(targets)

    for i in range(batch_size):
        sample_input_ids = model_inputs["input_ids"][i]
        label_input_ids = labels["input_ids"][i] + [tokenizer.pad_token_id]
        model_inputs["input_ids"][i] = sample_input_ids + label_input_ids
        labels["input_ids"][i] = [-100] * len(sample_input_ids) + label_input_ids
        model_inputs["attention_mask"][i] = [1] * len(model_inputs["input_ids"][i])
    
    # Max Length로 맞추기 위해 Padding Token 넣어주는 작업을 합니다.
    for i in range(batch_size):
        sample_input_ids = model_inputs["input_ids"][i]
        label_input_ids = labels["input_ids"][i]
        model_inputs["input_ids"][i] = [tokenizer.pad_token_id] * (max_length - len(sample_input_ids)) + sample_input_ids
        model_inputs["attention_mask"][i] = [0] * (max_length - len(sample_input_ids)) + model_inputs["attention_mask"][i]
        labels["input_ids"][i] = [-100] * (max_length - len(sample_input_ids)) + label_input_ids
        model_inputs["input_ids"][i] = torch.tensor(model_inputs["input_ids"][i][:max_length])
        model_inputs["attention_mask"][i] = torch.tensor(model_inputs["attention_mask"][i][:max_length])
        labels["input_ids"][i] = torch.tensor(labels["input_ids"][i][:max_length])

    model_inputs["labels"] = labels["input_ids"]

    return model_inputs

In [None]:
processed_datasets = dataset.map(
    preprocess_function,
    batched=True,
    num_proc=1,
    remove_columns=dataset["train"].column_names,
    load_from_cache_file=False,
    desc="Running tokenizer on dataset",
)

In [None]:
train_dataset = processed_datasets["train"]
eval_dataset = processed_datasets["test"]

train_dataloader = DataLoader(
    train_dataset, shuffle=True, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True
)

eval_dataloader = DataLoader(eval_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)

## 3. PEFT/PromptTuning Config

Prompt Tuning 관련 Configuration 작업을 진행합니다. Token 갯수는 8개로 Prompt는 텍스트 "Classify if the tweet is a complaint or not:" 로 초기화합니다.

In [None]:
# 다음 코드를 완성하세요!! (PromptTuningConfig 설정: prompt_tuning_init, num_virtual_tokens, prompt_tuning_init_text)
peft_config = PromptTuningConfig(
    task_type=TaskType.CAUSAL_LM,
    prompt_tuning_init=PromptTuningInit.TEXT,
    num_virtual_tokens=8,
    prompt_tuning_init_text="Classify if the tweet is a complaint or not:",
    tokenizer_name_or_path=model_name_or_path,
)

#### Prompt Tuning 기법으로 인해 전체 모델의 0.0014%의 파라미터만 파인튜닝에 사용합니다.

In [None]:
model = AutoModelForCausalLM.from_pretrained(model_name_or_path)
# 다음 코드를 완성하세요!! (peft_config 반영한 PEFT/PromptTuning Model 구성)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()

## 4. Model Fine-Tuning

지정한 Epoch 만큼 Prompt Fine-Tuning 을 진행합니다.

In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

lr_scheduler = get_linear_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=(len(train_dataloader) * num_epochs),
)

In [None]:
model = model.to(device)

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for step, batch in enumerate(tqdm(train_dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        total_loss += loss.detach().float()
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

    model.eval()
    eval_loss = 0
    eval_preds = []
    for step, batch in enumerate(tqdm(eval_dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
        loss = outputs.loss
        eval_loss += loss.detach().float()
        eval_preds.extend(
            tokenizer.batch_decode(torch.argmax(outputs.logits, -1).detach().cpu().numpy(), skip_special_tokens=True)
        )

    eval_epoch_loss = eval_loss / len(eval_dataloader)
    eval_ppl = torch.exp(eval_epoch_loss)
    train_epoch_loss = total_loss / len(train_dataloader)
    train_ppl = torch.exp(train_epoch_loss)
    print(f"{epoch=}: {train_ppl=} {train_epoch_loss=} {eval_ppl=} {eval_epoch_loss=}")

## 5. Inference (Causal LM)

In [None]:
# 다음 코드를 완성하세요!! (임의의 텍스트를 입력하여 결과 Label 확인)
inputs = tokenizer(
    f'{text_column} : {"@nationalgridus I have no water and the bill is current and paid. Can you do something about this?"} Label : ',
    return_tensors="pt",
)

In [None]:
model.to(device)

with torch.no_grad():
    inputs = {k: v.to(device) for k, v in inputs.items()}
    outputs = model.generate(
        input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"], max_new_tokens=10, eos_token_id=3
    )
    print(tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True))