In [1]:
# standard python imports
import os
import torch

# huggingface libraries

from transformers import (
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline,
    LlamaForCausalLM
)
# from peft import (
#     LoraConfig,
#     PeftModel,
#     prepare_model_for_kbit_training,
#     get_peft_model,
# )

from datasets import Dataset # NOTE: this is NOT the torch dataset type, which can cause confusion

import polars as pl

from transformers.pipelines.pt_utils import KeyDataset

import numpy as np
from sklearn.metrics import mean_squared_error

In [2]:
def create_prompt(review):
    """Build the (system, user) prompt pair asking the model to score one review.

    Args:
        review: Text to be scored; it is interpolated between ``[[[`` and ``]]]``
            in the user message.

    Returns:
        tuple: ``(system_prompt, prompt)``. The system prompt is the bare scoring
        instruction; the user prompt is the bracketed review followed by that
        same instruction.
    """
    # NOTE(review): this wording is sent to the model verbatim, so it is reproduced
    # exactly (typos included); editing it changes the prompt the model sees.
    scoring_instruction = (
        "You read student essays reviews and return a score from 0 to 60 "
        "that represents your besst guess of the number of rating given by the grader. "
        "Return just the number 0, 1, ..., 60 with no context, explanation, or special symbols."
    )
    user_message = f"Here is the review to evaluate: [[[{review}]]]. {scoring_instruction}"
    return scoring_instruction, user_message


In [3]:
# Load the validation split; later cells read its "text" and "score" columns.
df_val = pl.read_csv("../data/1_clean/val.csv")

In [4]:
# Build one (system, user) prompt pair per row from the "text" column, then attach
# both halves to the frame as the "system_prompt" and "prompt" columns.
prompt_pairs = [create_prompt(review_text) for review_text in df_val["text"].to_list()]
lst_system_prompt = [pair[0] for pair in prompt_pairs]
lst_prompt = [pair[1] for pair in prompt_pairs]
df_val = df_val.with_columns(
    pl.Series("system_prompt", lst_system_prompt),
    pl.Series("prompt", lst_prompt),
)

In [5]:
# Hugging Face dataset view of the frame; this is what the generation pipeline iterates over.
data_ = Dataset.from_polars(df_val)

# Plain-Python copies of the raw texts and of the reference scores.
test_labels = df_val.get_column("score").to_list()
test_texts = df_val.get_column("text").to_list()

In [None]:
# Directory holding the local Meta-Llama-3.1-8B-Instruct snapshot (weights + tokenizer files).
# The default is machine-specific; set the LLAMA_BASE_MODEL environment variable to point
# at a different snapshot without editing the notebook.
base_model = os.environ.get(
    "LLAMA_BASE_MODEL",
    "/home/richardarcher/Dropbox/Sci24_LLM_Polarization/project_/weights_local/models--meta-llama--Meta-Llama-3.1-8B-Instruct/snapshots/0e9e39f249a16976918f6564b8830bc894c89659",
)

In [None]:
# Tokenizer asset files, resolved explicitly inside the local snapshot directory.
tokenizer_assets = {
    "tokenizer_file": "tokenizer.json",
    "tokenizer_config_file": "tokenizer_config.json",
    "special_tokens_map_file": "special_tokens_map.json",
}

tokenizer = AutoTokenizer.from_pretrained(
    base_model,
    padding_side='left',
    trust_remote_code=True,
    **{
        kwarg_name: os.path.join(base_model, file_name)
        for kwarg_name, file_name in tokenizer_assets.items()
    },
)

# Set left padding again on the instance (same value as the keyword above).
tokenizer.padding_side = 'left'

In [None]:
# 4-bit NF4 quantization settings; compute runs in float16.
nf4_config = BitsAndBytesConfig(
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    load_in_4bit=True,
)

# Load the base model from the local snapshot with the quantization config applied.
model = LlamaForCausalLM.from_pretrained(
    base_model,
    quantization_config=nf4_config,
)

In [None]:
# model = PeftModel.from_pretrained(model, PATH_adapter_custom_weights)
# model = model.merge_and_unload() # This line merges the weights

In [None]:
# Batched generation needs a pad token; fall back to EOS when the tokenizer defines none.
# Compare against None explicitly: a pad id of 0 is valid but falsy, so `not pad_token_id`
# would wrongly overwrite it.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

# Take the model's pad id from the tokenizer so it is a single id that matches what the
# tokenizer actually pads with. model.config.eos_token_id can be a list of ids (presumably
# the case for this Llama 3.1 Instruct config, TODO confirm), which is not a valid pad id.
if model.config.pad_token_id is None:
    model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
def remove_header(text, K_times):
    """Drop up to ``K_times`` leading chat-template headers from ``text``.

    Each pass discards everything up to and including the first
    ``<|end_header_id|>`` marker. Stops early once no marker is left.

    Args:
        text: Generated text that may contain ``<|end_header_id|>`` markers.
        K_times: Maximum number of markers to strip; values <= 0 leave the text unchanged.

    Returns:
        The remainder of ``text`` after the last stripped marker.
    """
    marker = "<|end_header_id|>"
    remainder = text
    passes_left = K_times
    while passes_left > 0 and marker in remainder:
        # Keep only what follows the first marker.
        _, _, remainder = remainder.partition(marker)
        passes_left -= 1
    return remainder

In [None]:
def create_format_chat_template(tokenizer):
    """Return a row mapper that renders each row's prompts with the tokenizer's chat template.

    Args:
        tokenizer: Object exposing ``apply_chat_template``.

    Returns:
        A function taking a row with ``"system_prompt"`` and ``"prompt"`` keys. It
        stores the rendered, untokenized chat string (with the generation prompt
        appended) under ``row["text"]``, replacing any existing value, and returns
        the same row.
    """
    def format_chat_template(row):
        system_turn = {"role": "system", "content": row["system_prompt"]}
        user_turn = {"role": "user", "content": row["prompt"]}
        rendered = tokenizer.apply_chat_template(
            [system_turn, user_turn],
            tokenize=False,
            add_generation_prompt=True,
        )
        row["text"] = rendered
        return row

    return format_chat_template

In [None]:
# Number of prompts per generation batch: passed to pipeline() below and reused as the
# progress-print interval in the inference loop.
batch_size = 32

In [None]:
# Text-generation pipeline around the already-loaded quantized model.
# NOTE(review): no `do_sample` is passed here, so decoding follows the model's own
# generation config; confirm whether sampling is intended for an evaluation run.
pipe = pipeline(
    task="text-generation",
    tokenizer=tokenizer,
    model=model,
    device_map="auto",
    torch_dtype=torch.float16,
    max_new_tokens=5,  # only a short numeric answer is expected
    batch_size=batch_size,  # lower this (e.g. to 4) if generation is too slow
)

In [None]:
# Render every row's chat prompt; the mapper writes the result into the "text" column.
chat_formatter = create_format_chat_template(tokenizer)
data_ = data_.map(chat_formatter)

In [None]:
# Run generation over the rendered prompts and collect the cleaned answers in order.
res = []
total_rows = data_.shape[0]
ix = 0
for ix, out in enumerate(pipe(KeyDataset(data_, "text")), start=1):
    # Report progress once per batch-sized chunk of results.
    if ix % batch_size == 0:
        print(f"{ix}/{total_rows}")

    # Strip up to three chat-template headers from the generated text, then trim whitespace.
    res.append(remove_header(out[0]["generated_text"], 3).strip())

In [None]:
def _parse_score(raw_text, position):
    """Convert one generated answer into an integer score.

    A clean integer string is parsed directly. Otherwise the leading run of ASCII
    digits is used, so answers such as ``"45."`` or ``"45\\n\\nThe"`` still yield 45.

    Args:
        raw_text: Cleaned model output for one row.
        position: Zero-based index of the row, used only in the error message.

    Returns:
        int: The parsed score.

    Raises:
        ValueError: If the output does not start with an integer; the message
            includes the row index and the raw text.
    """
    candidate = raw_text.strip()
    try:
        return int(candidate)
    except ValueError:
        pass  # fall through to the lenient leading-digits parse

    leading_digits = ""
    for ch in candidate:
        if ch not in "0123456789":
            break
        leading_digits += ch

    if not leading_digits:
        raise ValueError(
            f"Model output #{position} has no leading integer score: {raw_text!r}"
        )
    return int(leading_digits)


# Parse every answer; a malformed one fails with its index and raw text instead of a bare int() error.
res_int = [_parse_score(raw_text, position) for position, raw_text in enumerate(res)]

In [None]:
# Array forms of the parsed predictions and of the reference scores, for metric computation.
llm_predicted = np.array(res_int)
true_values = np.array(test_labels)

In [None]:
# scikit-learn's signature is mean_squared_error(y_true, y_pred); pass the reference scores
# first. Unweighted MSE is symmetric, so the value is unchanged, but the call now follows
# the documented convention.
test_mse = mean_squared_error(true_values, llm_predicted)

# Show the metric as the cell output.
test_mse

In [None]:
# Attach the parsed predictions as a new column and save the frame as CSV.
df_val = df_val.with_columns(pl.Series(res_int).alias("8b_quant_prediction"))

predictions_path = "../outputs/predictions/8b_quantized_base.csv"
# Create the output directory first so a missing folder cannot fail the write after inference.
os.makedirs(os.path.dirname(predictions_path), exist_ok=True)
df_val.write_csv(predictions_path)