<a href="https://colab.research.google.com/github/likw99/awesome-colab/blob/main/warren_buffett_llm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fine-tuning Mistral-7B-Instruct to become Warren Buffett

### Imports

In [None]:
!pip install auto-gptq
!pip install optimum
!pip install bitsandbytes

In [None]:
# resolving "No inf checks were recorded for this optimizer." issue
!pip uninstall torch -y
!pip install torch==2.1

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from peft import prepare_model_for_kbit_training
from peft import LoraConfig, get_peft_model
from datasets import load_dataset
import transformers

### Load model

In [None]:
# Hub id of the GPTQ checkpoint that is fine-tuned in this notebook
model_name = "TheBloke/Mistral-7B-Instruct-v0.2-GPTQ"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    revision="main",          # which version of the model repo to use
    trust_remote_code=False,  # prevents running custom model files on your machine
    device_map="auto",        # automatically figures out how to best use CPU + GPU for loading the model
)

### Load tokenizer

In [None]:
# Tokenizer is loaded from the same repo as the model, using the fast implementation
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    use_fast=True,
)

### Prepare Model for Training

In [None]:
# put the model in training mode (dropout modules are activated)
model.train()

# enable gradient checkpointing
# NOTE(review): presumably trades extra compute for lower activation memory - confirm against the transformers docs
model.gradient_checkpointing_enable()

# enable quantized training; the prepared model returned by peft replaces `model`
model = prepare_model_for_kbit_training(model)

In [None]:
# LoRA adapter settings
config = LoraConfig(
    task_type="CAUSAL_LM",
    target_modules=["q_proj"],  # modules that receive LoRA adapters
    r=8,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
)

# wrap the model so that it becomes the LoRA-trainable version
model = get_peft_model(model, config)

# report the trainable parameter count
model.print_trainable_parameters()

### Preparing Training Dataset

In [None]:
# download the dataset of Warren Buffett annual letters (1977-2019) from the Hub
data = load_dataset(
    path="eagle0504/warren-buffett-annual-letters-from-1977-to-2019",
)

In [None]:
from datasets import DatasetDict

def has_single_pair(example):
    """Return True when the row's text holds exactly one question/answer pair.

    A row qualifies when both the '### Human' and the '### Assistant' markers
    occur exactly once in ``example['text']``.
    """
    conversation = example['text']
    markers = ('### Human', '### Assistant')
    return all(conversation.count(marker) == 1 for marker in markers)

# Filter the dataset to include only rows with a single question + answer pair
# (has_single_pair is applied to every row of every split)
new_dataset = data.filter(has_single_pair)

# Optional: keep only a small subset of rows from the 'train' and 'test' splits
# to (a) prove the concept and (b) fine-tune more quickly
# new_dataset = DatasetDict({
#     'train': new_dataset['train'].select(range(50)),
#     'test': new_dataset['test'].select(range(10))
# })

### Transform Dataset so that it follows Mistral-7B Instruct's fine-tuning format

In [None]:
# System prompt that is prepended to every training sample and inference prompt.
# Built from adjacent string literals; the text is one long paragraph followed by
# a blank line and the final instruction line.
system_prompt = (
    "BuffettGPT, is a virtual financial assistant that communicates in clear, accessible language, escalating to technical financial depth upon request. "
    "It reacts to feedback aptly and ends responses with its signature '-BuffettGPT'. "
    "BuffettGPT will tailor the length of its responses to match the question length, providing concise and accurate responses to financial questions about Berkshire Hathaway, its businesses, and stocks in general, "
    "thus keeping the interaction natural and engaging.\n"
    "\n"
    "Please answer the following question.\n"
)

In [None]:
def transform_text(example, system=None):
    """Rewrite one '### Human: ... ### Assistant: ...' row into the instruct format.

    The produced text has the layout
    ``<s>[INST] {system prompt} {question} [/INST] {answer} -BuffettGPT</s>``.

    Args:
        example: Mapping whose 'text' entry holds a single question/answer pair
            delimited by '### Human:' and '### Assistant:'.
        system: System prompt placed in front of the question. When omitted,
            the notebook-level ``system_prompt`` is used.

    Returns:
        A dict with a single 'text' key holding the reformatted sample.

    Raises:
        ValueError: If the text does not contain exactly one '### Assistant:'
            marker (previously this surfaced as an opaque tuple-unpacking error).
    """
    if system is None:
        system = system_prompt

    text = example['text']

    # Validate up front: the upstream filter counts '### Assistant' without the
    # colon, so a row can pass it and still lack the exact delimiter used here.
    marker_count = text.count('### Assistant:')
    if marker_count != 1:
        raise ValueError(
            "expected exactly one '### Assistant:' marker, found "
            + str(marker_count) + " in: " + repr(text[:80])
        )

    # Split the text into the question and answer
    question, answer = text.split('### Assistant:')

    # Transform the question: drop the human marker and close the instruction
    transformed_question = question.replace('### Human:', '').strip() + ' [/INST]'

    # Transform the answer: append the signature and the end-of-sequence marker
    transformed_answer = ' ' + answer.strip() + ' -BuffettGPT</s>'

    # Combine the system prompt, transformed question, and transformed answer
    transformed_text = '<s>[INST] ' + system + ' ' + transformed_question + transformed_answer

    return {'text': transformed_text}


# Reformat every row of new_dataset; the original columns are dropped so that
# only the 'text' column returned by transform_text remains
transformed_dataset = new_dataset.map(
    transform_text,
    remove_columns=new_dataset['train'].column_names,
)

In [None]:
def tokenize_function(examples):
    """Tokenize a batch of rows, truncating each one to at most 512 tokens.

    Args:
        examples: Batch mapping with a "text" entry holding the strings to encode.

    Returns:
        The tokenizer output for the batch, requested as NumPy tensors.
    """
    # truncation is applied on the left side of over-long sequences
    # NOTE(review): the text already carries literal '<s>' / '</s>' markers -
    # confirm the tokenizer does not add a second BOS token on top of them.
    tokenizer.truncation_side = "left"

    return tokenizer(
        examples["text"],
        max_length=512,
        truncation=True,
        return_tensors="np",
    )

# run the tokenizer over the training and validation splits in batches
tokenized_data = transformed_dataset.map(tokenize_function, batched=True)

In [None]:
# Setting the pad token.
# The causal-LM collator below excludes every pad-token position from the loss.
# If EOS doubled as the pad token, the real '</s>' that closes each training
# sample would be masked out too and the model would never be trained to stop.
# Pad with the unknown token instead; fall back to EOS only when the tokenizer
# defines no unknown token.
tokenizer.pad_token = tokenizer.unk_token or tokenizer.eos_token

# data collator: pads each batch and builds labels for causal LM (mlm=False)
data_collator = transformers.DataCollatorForLanguageModeling(tokenizer, mlm=False)

### Fine-tuning Model

In [None]:
# hyperparameters
lr, batch_size, num_epochs = 2e-4, 4, 10

# define training arguments
training_args = transformers.TrainingArguments(
    output_dir="buffettgpt-ft",
    # optimisation
    num_train_epochs=num_epochs,
    learning_rate=lr,
    weight_decay=0.01,
    warmup_steps=2,
    optim="paged_adamw_8bit",
    fp16=True,
    # batching
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=4,
    # log, evaluate and checkpoint once per epoch, then reload the best checkpoint
    # NOTE(review): newer transformers releases may expect `eval_strategy` instead
    # of `evaluation_strategy` - confirm against the installed version.
    logging_strategy="epoch",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)

In [None]:
# configure trainer
trainer = transformers.Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_data["train"],
    eval_dataset=tokenized_data["test"],
    data_collator=data_collator,
)

# train model
# Caching is switched off for the duration of training (this silences the
# warnings) and must be switched back on for inference. The try/finally makes
# sure it is restored even when training fails or is interrupted.
model.config.use_cache = False
try:
    trainer.train()
finally:
    # re-enable caching for inference
    model.config.use_cache = True

### Push model to hub

In [None]:
# option 1: interactive login from inside the notebook
from huggingface_hub import notebook_login
notebook_login()

# # option 2: key login
# # (paste the token only locally - never commit a notebook that contains a real token)
# from huggingface_hub import login
# write_key = 'hf_' # paste token here
# login(write_key)

In [None]:
hf_name = 'virattt'  # your hf username or org name
# full repo id on the Hub: "<user or org>/<repo name>"
model_id = f"{hf_name}/buffettgpt-ft"

In [None]:
# Upload the fine-tuned (PEFT-wrapped) model to the target repo.
model.push_to_hub(model_id)

# Trainer.push_to_hub() takes a commit message - not a repo id - as its first
# argument, so passing model_id here only produced a commit titled with the
# repo name. The destination repo is taken from the TrainingArguments
# (hub_model_id, otherwise the output_dir name under the logged-in account).
# NOTE(review): confirm that this resolves to the same repo as model_id when
# hf_name is an organisation rather than the logged-in user.
trainer.push_to_hub()

### Load Fine-tuned Model (if you already uploaded it before)

In [None]:
# # load model from hub
# from peft import PeftModel, PeftConfig
# from transformers import AutoModelForCausalLM

# model_name = "TheBloke/Mistral-7B-Instruct-v0.2-GPTQ"
# model = AutoModelForCausalLM.from_pretrained(model_name,
#                                              device_map="auto",
#                                              trust_remote_code=False,
#                                              revision="main")

# config = PeftConfig.from_pretrained("virattt/buffettgpt-ft")
# model = PeftModel.from_pretrained(model, "virattt/buffettgpt-ft")

# # load tokenizer
# tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)

### Use Fine-tuned Model

In [None]:
def prompt_template(question, system=None):
    """Build an inference prompt with the same layout as the training samples.

    The training text is assembled as
    ``[INST] {system prompt} {question} [/INST]`` (plus the answer), so the
    inference prompt uses the same single-space separators instead of a
    different newline layout.

    Args:
        question: The user question; surrounding whitespace is stripped.
        system: System prompt to prepend. When omitted, the notebook-level
            ``system_prompt`` is used.

    Returns:
        The formatted prompt string, ready to be tokenized.
    """
    if system is None:
        system = system_prompt
    return f'[INST] {system} {question.strip()} [/INST]'

# example question to ask the fine-tuned model
question = "What was operating earnings a year ago?"

# wrap the question in the prompt template and show the final prompt text
prompt = prompt_template(question)
print(prompt)

In [None]:
# switch the model to evaluation mode for inference
model.eval()

# Tokenize the prompt and move the whole encoding to the device the model lives
# on (instead of a hard-coded "cuda"). Passing the full encoding also hands the
# attention mask to generate(), not just the input ids.
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
outputs = model.generate(**inputs, max_new_tokens=280)

# decode the generated ids (prompt + completion) back into text
print(tokenizer.batch_decode(outputs)[0])