In [None]:
import csv
import argparse
import json
import datetime
import random
import string
import os


from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
from transformers import TrainingArguments, Trainer

import datasets

from torch.utils.data import  Dataset
import torch
import copy




In [None]:
# Switch for Google Colab sessions. When enabled, Google Drive is mounted at
# /content/gdrive; the google.colab import stays inside the branch so the
# package is only needed when the switch is on.
is_colab = False
if is_colab:
    from google.colab import drive

    drive.mount("/content/gdrive")

### Configuration and Hyperparameter Settings


In [None]:
# Paths. Both are left blank on purpose and must be filled in before running;
# the assertions stop the notebook early with a readable message otherwise.
root = ""
assert root != "", "Please update the proper root path"

data_path = ""
assert data_path != "", "Please update the proper data path"


# For LoRA
lora = True
lora_r = 4
lora_alpha = 4

# For Model
# Random 8-character suffix so each run writes to its own output directory.
random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k = 8))
token_path = "Qwen/Qwen2-0.5B"
model_path = ""

assert model_path != "", "Please update the proper model path"


# For Training
model_max_length = 2048
num_train_epochs = 4
batch_size = 16


# For Output
# Directory name: "<random_id>-<token_path with '/' replaced by '-'>[-Lora]".
_tmp = token_path.replace("/", "-") + ("-Lora" if lora else "")
output_dir = f"{root}/train_and_evaluate/Output/{random_id}-{_tmp}"
# makedirs (rather than mkdir) also creates the intermediate
# "train_and_evaluate/Output" folders when they do not exist yet.
os.makedirs(output_dir, exist_ok=True)
print(output_dir)


In [None]:
class CustomDataset(Dataset):  # NOTE: This class can be replaced by Dataset.from_csv and we can add that later
    """Map-style dataset that pairs per-sample encodings with labels.

    Args:
        labels: Indexable collection of labels; its length defines the
            dataset length.
        encoding: Optional mapping from feature name to an indexable
            collection of per-sample values (for example the output of a
            tokenizer call). When omitted, items contain only ``labels``.
    """

    def __init__(self, labels, encoding=None):
        self.encodings = encoding
        self.labels = labels

    def __len__(self):
        """Return the number of samples, i.e. the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return a dict of tensors for sample ``idx``.

        Every feature in the encodings is converted with ``torch.tensor`` and
        the label is stored under the ``'labels'`` key.
        """
        # `encoding` defaults to None in the constructor, so a labels-only
        # dataset must not crash on `None.items()`.
        features = self.encodings.items() if self.encodings is not None else ()
        item = {key: torch.tensor(val[idx]) for key, val in features}
        item['labels'] = torch.tensor(self.labels[idx])
        return item


def get_text_label(data_path, limit = None):
    """Read (text, label) pairs from a CSV file and encode labels as integers.

    The first row is treated as a header and skipped. Any other row that does
    not have exactly two columns is skipped as well. The label-to-id mapping
    is derived from the labels found in this file alone, assigning ids in
    sorted label order.

    Args:
        data_path: Path of the CSV file to read (UTF-8).
        limit: Optional maximum number of valid rows to load; ``None`` loads
            every valid row.

    Returns:
        A tuple ``(texts, labels, label2id, id2label)`` where ``texts`` is a
        list of strings, ``labels`` the matching list of integer ids, and the
        two dicts map label string -> id and id -> label string.
    """
    texts = []
    raw_labels = []
    # newline="" lets the csv module handle line endings itself, so line
    # breaks embedded in quoted text fields are preserved as written.
    with open(data_path, "r", encoding="utf-8", newline="") as data_file:
        reader = csv.reader(data_file)
        for index, row in enumerate(reader):
            # Skip the header row and malformed rows.
            if index == 0 or len(row) != 2:
                continue
            if limit is not None and len(texts) == limit:
                break
            text, label = row
            texts.append(text)
            raw_labels.append(label)

    unique_label = sorted(set(raw_labels))
    label2id = {label: idx for idx, label in enumerate(unique_label)}
    id2label = {idx: label for label, idx in label2id.items()}

    labels = [label2id[label] for label in raw_labels]

    print("Successfully load data from", data_path)
    print("There are {} texts and {} labels".format(len(texts), len(labels)))
    # A header-only or empty file has no first example to show.
    if texts:
        print("Example: Text: {}\nLabel: {} - {}".format(texts[0], labels[0], id2label[labels[0]]))
    print(f"Unique labels: {unique_label}")
    return texts, labels, label2id, id2label


In [None]:
# Load the raw text/label pairs; tokenization happens in a later cell.
train_texts, train_labels, label2id, id2label = get_text_label(f"{data_path}/train.csv")

# Trick: no validation set is used and evaluation runs separately, so only a
# few test rows are loaded to effectively "skip" the testing process. Passing
# `limit` avoids parsing the whole test file just to keep five rows.
test_texts, _test_local_ids, _, _test_id2label = get_text_label(f"{data_path}/test.csv", limit = 5)

# get_text_label builds its id mapping from each file independently, so the
# test ids are translated back to label strings and re-encoded with the
# training mapping that the model is configured with.
_unknown_labels = sorted(set(_test_id2label.values()) - set(label2id))
if _unknown_labels:
    raise ValueError(f"Test labels missing from the training data: {_unknown_labels}")
test_labels = [label2id[_test_id2label[local_id]] for local_id in _test_local_ids]
print("Successfully load data", len(train_texts), len(train_labels), len(test_texts), len(test_labels))

print(label2id)

In [None]:
# Load the tokenizer, the model configuration and the classification model.
# The tokenizer is created from `token_path`, truncates from the left and
# uses `model_max_length` as its maximum length.
# NOTE(review): the tokenizer comes from `token_path` while config and weights
# come from `model_path` - presumably the same model family; confirm.
tokenizer = AutoTokenizer.from_pretrained(token_path, truncation_side = 'left', model_max_length = model_max_length)
# The label maps built from the training CSV are attached to the config.
config = AutoConfig.from_pretrained(model_path, label2id = label2id, id2label = id2label)
# Set after the config is built so the label count matches `id2label`.
config.num_labels = len(id2label)
model = AutoModelForSequenceClassification.from_pretrained(model_path, config = config)

In [None]:
# Freeze the backbone: it is exposed as `model.model` when that attribute
# exists and as `model.transformer` otherwise. Every parameter under it is
# excluded from gradient updates.
backbone = model.model if hasattr(model, "model") else model.transformer
for weight in backbone.parameters():
    weight.requires_grad = False
print(model)

In [None]:
# Wrap the model with LoRA adapters when the `lora` flag is enabled.
# The flag is tested by truthiness, the same way the output directory name is
# built, so the "-Lora" suffix and the actual model can never disagree.
if lora:
    # Imported here so that peft is only required when LoRA is enabled.
    from peft import LoraConfig, get_peft_model

    lora_config = LoraConfig(
        r = lora_r,
        lora_alpha = lora_alpha,
        lora_dropout = 0.05,
        target_modules = ["q_proj", "v_proj", "k_proj", "o_proj"], # Change the name to make it aligns with each model
        modules_to_save = ["score"],
        bias = "lora_only",
        init_lora_weights = "gaussian"
    )

    model = get_peft_model(model, lora_config)

    model.print_trainable_parameters()
print(model)


In [None]:
# Tokenize the raw texts (train first, then test) with truncation enabled,
# then pair each set of encodings with its labels.
train_encodings, test_encodings = (
    tokenizer(raw_texts, truncation = True)
    for raw_texts in (train_texts, test_texts)
)

train_dataset = CustomDataset(labels = train_labels, encoding = train_encodings)
test_dataset = CustomDataset(labels = test_labels, encoding = test_encodings)


In [None]:
# Move the model to the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# Training configuration.
# `batch_size` is not a TrainingArguments field (passing it raises TypeError),
# so it is no longer passed. Samples are fed one at a time and gradients are
# accumulated over `batch_size` steps instead.
# NOTE(review): eval_steps / save_steps / logging_steps are set to the number
# of training samples - confirm this is the intended interval.
train_args = TrainingArguments(
    output_dir = output_dir,
    do_train = True,
    do_eval = False,
    eval_strategy = "steps",
    prediction_loss_only = True,
    per_device_train_batch_size = 1,
    per_device_eval_batch_size = 1,
    gradient_accumulation_steps = batch_size,
    eval_accumulation_steps = 1,
    num_train_epochs = num_train_epochs,
    save_strategy = "steps",
    eval_steps = len(train_dataset),
    save_steps = len(train_dataset),
    logging_steps = len(train_dataset),
    learning_rate = 1e-4,
    bf16 = True
)

In [None]:
# Build the Trainer from the model, the training arguments and both datasets.
trainer_kwargs = {
    "model": model,
    "args": train_args,
    "train_dataset": train_dataset,
    "eval_dataset": test_dataset,
}
trainer = Trainer(**trainer_kwargs)


In [None]:
# Start training with the Trainer built from `train_args`.
trainer.train()

In [None]:
# On Colab, call runtime.unassign() once the cells above have finished.
if is_colab:
    from google.colab import runtime

    runtime.unassign()