# Setup

In [None]:
# additional google colab setup
import sys


def colab_install():
    import torch
    if not torch.cuda.is_available():
      print("CUDA is not available. \nPick a GPU before running this notebook. \nGo to 'Runtime' -> 'Change runtime type' to do this.")
      return 
    %pip install transformers
    %pip install datasets
    %pip install peft
    %pip install bitsandbytes
    return


if "google.colab" in sys.modules:
    print("Running in Google Colab")
    # Install required packages
    colab_install()
else:
    print("Not running in Google Colab")

In [None]:
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import LoraConfig, TaskType, prepare_model_for_kbit_training, get_peft_model
import torch
import pandas as pd
import numpy as np
from datasets import Dataset, DatasetDict

transformers.set_seed(24)

In [None]:
# TODO remove
DEBUG = True
if DEBUG:
    %cd survai-finetuning/

# Data preperation

In [None]:
# download dataset
!curl -L -o 2016_anes_argyle.pkl https://github.com/tobihol/survai-finetuning/raw/main/2016_anes_argyle.pkl

In [None]:
df_survey = pd.read_pickle("2016_anes_argyle.pkl")
df_survey

In [None]:
# descriptive statistics
df_survey.info()

In [None]:
features = [
    "race",
    "discuss_politics",
    "ideology",
    "party",
    "church_goer",
    "age",
    "gender",
    "political_interest",
    "patriotism",
    "state",
]
label = "ground_truth"

In [None]:
# we tread missing values as a category 
df_survey_processed = (
    df_survey
    .astype({"age": str})
    .fillna("missing")
)
df_survey_processed

## Prompt Design

In [None]:
instruction = (
    "Please perform a classification task. "
    + "Given the 2016 survey answers from the American National Election Studies, "
    + "return which candiate the person voted for. "
    + "Return a label from ['Trump', 'Clinton', 'Non-voter'] only without any other text.\n"
)
print(instruction)

In [None]:
column_name_map = {
    "race": "Race",
    "discuss_politics": "Discusses politics",
    "ideology": "Ideology",
    "party": "Party",
    "church_goer": "Church",
    "age": "Age",
    "gender": "Gender",
    "political_interest": "Political interest",
    "patriotism": "American Flag",
    "state": "State",
    "ground_truth": "Vote",
}

def create_prompt(row):
    prompt = instruction
    prompt += "\n".join([f"{column_name_map[k]}: {v}" for k, v in row.items()])
    return prompt

text_series = df_survey_processed[features].apply(create_prompt, axis="columns")
label_series = df_survey_processed[label]

df_prompts = pd.DataFrame({"text": text_series, "label": label_series})
df_prompts

In [None]:
print(df_prompts.text.iloc[0])

In [None]:
print(df_prompts.label.iloc[0])

## Train/Test Split

In [None]:
from sklearn.model_selection import train_test_split

df_train, df_test = train_test_split(df_survey_processed, test_size=0.2, random_state=24)
dataset_cls = DatasetDict({
    "train": Dataset.from_pandas(df_train, preserve_index=False),
    "test": Dataset.from_pandas(df_test, preserve_index=False),
})
dataset_cls

In [None]:
df_train, df_test = train_test_split(df_prompts, test_size=0.2, random_state=24)
dataset_llm = DatasetDict({
    "train": Dataset.from_pandas(df_train, preserve_index=False),
    "test": Dataset.from_pandas(df_test, preserve_index=False),
})
dataset_llm

# Loading the model

In [None]:
model_id = "EleutherAI/pythia-70m"
# model_id = "unsloth/Llama-3.2-1B-Instruct"

# load tokenizer
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    padding_side="left",
    trust_remote_code=True,
)
if getattr(tokenizer, "pad_token_id") is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id


# dataset = dataset.map(
#     lambda x: tokenizer(x["text"], truncation=True, padding="max_length"), batched=True
# )

# dataset = dataset.remove_columns(["text", "label"])
# dataset

In [None]:
def instruct_tokenize_function(examples):
    prompt = [
        {"role": "user", "content": examples["text"]},
    ]
    prompt.append(
        {
            "role": "assistant",
            "content": examples["label"],
        }
    )
    inputs_ids = tokenizer.apply_chat_template(
        prompt,
        add_generation_prompt=False,
    )
    attention_mask = np.ones_like(inputs_ids)
    return {
        "input_ids": inputs_ids,
        "attention_mask": attention_mask,
    }


def basic_tokenize_function(examples):
    prompt = f"{examples['text']} \nVote: {examples['label']} {tokenizer.eos_token}"
    return tokenizer(prompt)


tokenized_dataset_llm = dataset_llm.map(basic_tokenize_function).remove_columns(
    ["text", "label"]
)
tokenized_dataset_llm

In [None]:
# load model
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    trust_remote_code=True,
    device_map="auto",
)

model = prepare_model_for_kbit_training(model)

if getattr(model.config, "pad_token_id") is None:
    model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
lora_rank = 8
lora_alpha = 8

lora_config = LoraConfig(
    r=lora_rank,
    lora_alpha=lora_alpha,
    lora_dropout=0.05,
    bias="none",
    task_type=TaskType.CAUSAL_LM,
    target_modules="all-linear",
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
model.config.use_cache = False

In [None]:
trainer = transformers.Trainer(
    model=model,
    train_dataset=tokenized_dataset_llm["train"],
    eval_dataset=tokenized_dataset_llm["test"],
    args=transformers.TrainingArguments(
        output_dir="./results",
        gradient_checkpointing=True,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        fp16=True,
        optim="paged_adamw_8bit",
        report_to="none",
    ),
    tokenizer=tokenizer,
    data_collator=transformers.DataCollatorForLanguageModeling(tokenizer, mlm=False),
)

trainer.train()