# FinGPT Test: Financial Phrasebank (FPB)

This notebook demonstrates how to test FinGPT models on the Financial Phrasebank (FPB) sentiment analysis dataset.

## 1. Install Dependencies

In [None]:
# Install the libraries used by this notebook and by the benchmark scripts it writes to disk.
# transformers and peft are pinned to exact versions; the remaining packages are unpinned,
# so pip resolves whatever versions are current at install time.
!pip install transformers==4.32.0 peft==0.5.0 datasets accelerate bitsandbytes sentencepiece tqdm scikit-learn pandas matplotlib seaborn

## 2. Clone the FinGPT Repository

In [None]:
!git clone https://github.com/AI4Finance-Foundation/FinGPT.git
%cd FinGPT

## 3. Create Sentiment Templates File

This is needed for the multiple-template testing.

In [20]:
!mkdir -p fingpt/FinGPT_Benchmark/benchmarks

templates = """What is the sentiment of this {type}?
Determine the sentiment of this {type}.
How would you describe the sentiment of this {type}?
Is the sentiment of this {type} positive or negative?
Analyze the sentiment of this {type}.
What's the sentiment of this {type}?"""

with open('fingpt/FinGPT_Benchmark/benchmarks/sentiment_templates.txt', 'w') as f:
    f.write(templates)

## 4. Download the FPB Dataset

In [21]:
import datasets
from pathlib import Path

# Hub identifier and configuration of the dataset; also used to name the on-disk copy.
FPB_NAME, FPB_CONFIG = "financial_phrasebank", "sentences_50agree"

# Folder the benchmark module loads its data from; created on first use.
data_dir = Path('./fingpt/FinGPT_Benchmark/data')
data_dir.mkdir(parents=True, exist_ok=True)

# Fetch the dataset
print("Downloading Financial Phrasebank dataset...")
dataset = datasets.load_dataset(FPB_NAME, FPB_CONFIG)

# Persist it locally so the benchmark can read it with load_from_disk
save_path = str(data_dir / f"{FPB_NAME}-{FPB_CONFIG}")
print(f"Saving dataset to {save_path}")
dataset.save_to_disk(save_path)
print("Dataset download complete!")

Downloading Financial Phrasebank dataset...
Saving dataset to fingpt/FinGPT_Benchmark/data/financial_phrasebank-sentences_50agree


Saving the dataset (0/1 shards):   0%|          | 0/4846 [00:00<?, ? examples/s]

Dataset download complete!


## 5. Create Testing Module

Let's implement the testing functions as defined in the FinGPT repository.

In [22]:
%%writefile fingpt/FinGPT_Benchmark/benchmarks/fpb.py
import warnings
warnings.filterwarnings("ignore")

from sklearn.metrics import accuracy_score, f1_score
from datasets import load_dataset, load_from_disk, Dataset
from tqdm import tqdm
import datasets
import torch

from torch.utils.data import DataLoader
from functools import partial
from pathlib import Path

# Integer class id -> sentiment label string, applied to the dataset's label column.
dic = dict(enumerate(("negative", "neutral", "positive")))

# Prompt templates, one per line, read from the file that sits next to this module.
with open(Path(__file__).parent / 'sentiment_templates.txt') as f:
    templates = [line.strip() for line in f]


def format_example(example: dict) -> dict:
    """Render one record as a prompt and its expected answer.

    Args:
        example: Mapping with an ``instruction`` and ``output`` entry and an
            optional ``input`` entry.

    Returns:
        A dict with ``context`` (the prompt text ending in ``"Answer: "``) and
        ``target`` (the record's ``output`` value). The ``Input:`` line is only
        included when ``input`` is present and truthy.
    """
    pieces = [f"Instruction: {example['instruction']}\n"]
    if example.get("input"):
        pieces.append(f"Input: {example['input']}\n")
    pieces.append("Answer: ")
    return {"context": "".join(pieces), "target": example["output"]}

def change_target(x):
    """Collapse free-form text into one of the three sentiment labels.

    The text is searched for ``positive`` and then ``negative`` (each in
    lowercase or capitalised form); the first match wins. Anything else,
    including other casings such as all-caps, is reported as ``neutral``.
    """
    for label in ('positive', 'negative'):
        if label in x or label.capitalize() in x:
            return label
    return 'neutral'


def vote_output(x):
    """Combine the per-template predictions of one row into a single label.

    Reads ``x['out_text_<i>']`` for every loaded template, normalises each with
    ``change_target`` and counts the labels. The result is ``positive`` or
    ``negative`` when that label has strictly more votes than the other, and
    ``neutral`` when the two are tied.
    """
    tally = {'positive': 0, 'negative': 0, 'neutral': 0}
    for idx, _ in enumerate(templates):
        label = change_target(x[f'out_text_{idx}'].lower())
        tally[label] += 1

    positives, negatives = tally['positive'], tally['negative']
    if positives > negatives:
        return 'positive'
    if negatives > positives:
        return 'negative'
    return 'neutral'

def test_fpb(args, model, tokenizer, prompt_fun=None):
    """Evaluate ``model`` on the Financial Phrasebank test split with a single prompt.

    Args:
        args: Parsed CLI namespace. ``batch_size`` is required; ``max_length``
            is used when present and defaults to 512 otherwise.
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``; must be able to pad a batch.
        prompt_fun: Optional callable applied to each row to build its
            instruction text. When omitted, a fixed three-way sentiment
            question is used.

    Returns:
        The evaluation DataFrame with the prompt (``context``), gold label
        (``target``), raw generation (``out_text``) and the normalised labels
        (``new_target``, ``new_out``). Accuracy and F1 scores are printed.
    """
    batch_size = args.batch_size
    # Honour --max_length like test_fpb_mlt does instead of a hard-coded 512.
    max_length = getattr(args, "max_length", 512)

    # instructions = load_dataset("financial_phrasebank", "sentences_50agree")
    instructions = load_from_disk(Path(__file__).parent.parent / "data/financial_phrasebank-sentences_50agree/")
    instructions = instructions["train"]
    # Fixed seed so every run scores the same held-out rows.
    instructions = instructions.train_test_split(seed = 42)['test']
    instructions = instructions.to_pandas()
    # NOTE(review): assumes the frame has exactly two columns, text then label - confirm.
    instructions.columns = ["input", "output"]
    instructions["output"] = instructions["output"].apply(lambda x:dic[x])

    if prompt_fun is None:
        instructions["instruction"] = "What is the sentiment of this news? Please choose an answer from {negative/neutral/positive}."
    else:
        instructions["instruction"] = instructions.apply(prompt_fun, axis = 1)

    instructions[["context","target"]] = instructions.apply(format_example, axis = 1, result_type="expand")

    # print example
    print(f"\n\nPrompt example:\n{instructions['context'][0]}\n\n")

    context = instructions['context'].tolist()

    # Ceiling division: the previous `n // batch_size + 1` reported (and iterated) one
    # extra, empty step whenever the number of rows was an exact multiple of batch_size.
    total_steps = (len(context) + batch_size - 1) // batch_size
    print(f"Total len: {len(context)}. Batchsize: {batch_size}. Total steps: {total_steps}")

    out_text_list = []
    for step in tqdm(range(total_steps)):
        tmp_context = context[step * batch_size:(step + 1) * batch_size]
        tokens = tokenizer(tmp_context, return_tensors='pt', padding=True, max_length=max_length, return_token_type_ids=False)
        # Follow the model's device (as test_fpb_mlt does) rather than assuming CUDA.
        tokens = {key: value.to(model.device) for key, value in tokens.items()}
        res = model.generate(**tokens, max_length=max_length, eos_token_id=tokenizer.eos_token_id)
        res_sentences = [tokenizer.decode(seq, skip_special_tokens=True) for seq in res]
        # Keep only what follows the prompt's "Answer: " marker; fall back to the full text.
        out_text = [o.split("Answer: ")[1] if "Answer: " in o else o for o in res_sentences]
        out_text_list += out_text
        torch.cuda.empty_cache()

    instructions["out_text"] = out_text_list
    instructions["new_target"] = instructions["target"].apply(change_target)
    instructions["new_out"] = instructions["out_text"].apply(change_target)

    acc = accuracy_score(instructions["new_target"], instructions["new_out"])
    f1_macro = f1_score(instructions["new_target"], instructions["new_out"], average = "macro")
    f1_micro = f1_score(instructions["new_target"], instructions["new_out"], average = "micro")
    f1_weighted = f1_score(instructions["new_target"], instructions["new_out"], average = "weighted")

    print(f"Acc: {acc}. F1 macro: {f1_macro}. F1 micro: {f1_micro}. F1 weighted (BloombergGPT): {f1_weighted}. ")

    return instructions


def test_fpb_mlt(args, model, tokenizer):
    """Evaluate ``model`` on Financial Phrasebank with every prompt template and a vote.

    Neutral rows are dropped, so this is a positive-vs-negative task. Each
    template in ``templates`` is run over the remaining rows, the generations
    are normalised with ``change_target``, and ``vote_output`` combines them
    into ``new_out``. The frame is written to ``tmp.csv`` in the working
    directory, and accuracy/F1 are printed for each template and for the vote.

    Args:
        args: Parsed CLI namespace providing ``batch_size`` and ``max_length``.
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``; must be able to pad a batch.

    Returns:
        The evaluation DataFrame with ``input``, ``output``, ``text_type``,
        ``target``, one ``out_text_<i>`` column per template and ``new_out``.
    """
    batch_size = args.batch_size
    # dataset = load_dataset("financial_phrasebank", "sentences_50agree")
    dataset = load_from_disk(Path(__file__).parent.parent / 'data/financial_phrasebank-sentences_50agree/')
    dataset = dataset["train"]#.select(range(300))
    # Fixed seed so every run scores the same held-out rows.
    dataset = dataset.train_test_split(seed=42)['test']
    dataset = dataset.to_pandas()
    # NOTE(review): assumes the frame has exactly two columns, text then label - confirm.
    dataset.columns = ["input", "output"]
    dataset["output"] = dataset["output"].apply(lambda x: dic[x])
    dataset["text_type"] = 'news'

    dataset["output"] = dataset["output"].apply(change_target)
    # Copy so the column assignments below act on an independent frame, not a filtered view.
    dataset = dataset[dataset["output"] != 'neutral'].copy()
    # Gold labels. Previously "target" only existed on the per-template working copy, so the
    # metric loop below raised KeyError and tmp.csv was written without a "target" column.
    dataset["target"] = dataset["output"]

    out_texts_list = [[] for _ in range(len(templates))]

    def collate_fn(batch):
        # Tokenise the prompts of one batch into padded tensors.
        inputs = tokenizer(
            [f["context"] for f in batch], return_tensors='pt',
            padding=True, max_length=args.max_length,
            return_token_type_ids=False
        )
        return inputs

    for i, template in enumerate(templates):
        # Build the prompts for this template on a scratch copy; `dataset` keeps the results.
        dataset_temp = dataset[['input', 'output', "text_type"]].copy()
        dataset_temp["instruction"] = dataset_temp['text_type'].apply(lambda x: template.format(type=x) + "\nOptions: positive, negative")
        # dataset["instruction"] = dataset['text_type'].apply(lambda x: template.format(type=x) + "\nOptions: negative, positive")
        dataset_temp[["context", "target"]] = dataset_temp.apply(format_example, axis=1, result_type="expand")

        # shuffle=False keeps generations aligned with the row order of `dataset`.
        dataloader = DataLoader(Dataset.from_pandas(dataset_temp), batch_size=batch_size, collate_fn=collate_fn, shuffle=False)

        for idx, inputs in enumerate(tqdm(dataloader)):
            inputs = {key: value.to(model.device) for key, value in inputs.items()}
            res = model.generate(**inputs, do_sample=False, max_length=args.max_length, eos_token_id=tokenizer.eos_token_id, max_new_tokens=10)
            res_sentences = [tokenizer.decode(seq, skip_special_tokens=True) for seq in res]
            tqdm.write(f'{idx}: {res_sentences[0]}')
            # Keep only what follows the prompt's "Answer: " marker; fall back to the full text.
            out_text = [o.split("Answer: ")[1] if "Answer: " in o else o for o in res_sentences]
            out_texts_list[i] += out_text
            torch.cuda.empty_cache()

    for i in range(len(templates)):
        dataset[f"out_text_{i}"] = out_texts_list[i]
        dataset[f"out_text_{i}"] = dataset[f"out_text_{i}"].apply(change_target)

    dataset["new_out"] = dataset.apply(vote_output, axis=1, result_type="expand")
    dataset.to_csv('tmp.csv')

    # One line of metrics per template, followed by the voted prediction.
    for k in [f"out_text_{i}" for i in range(len(templates))] + ["new_out"]:

        acc = accuracy_score(dataset["target"], dataset[k])
        f1_macro = f1_score(dataset["target"], dataset[k], average="macro")
        f1_micro = f1_score(dataset["target"], dataset[k], average="micro")
        f1_weighted = f1_score(dataset["target"], dataset[k], average="weighted")

        print(f"Acc: {acc}. F1 macro: {f1_macro}. F1 micro: {f1_micro}. F1 weighted (BloombergGPT): {f1_weighted}. ")

    return dataset

Overwriting fingpt/FinGPT_Benchmark/benchmarks/fpb.py


## 6. Create Benchmarks Runner Script

In [23]:
%%writefile fingpt/FinGPT_Benchmark/benchmarks/benchmarks.py
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel, get_peft_model, LoraConfig, TaskType  # 0.4.0
import torch
import argparse

from fpb import test_fpb, test_fpb_mlt

import sys
sys.path.append('../')
from utils import *

def main(args):
    """Load the base model and its PEFT adapter, then run the requested benchmarks.

    Args:
        args: Parsed CLI namespace. Reads ``base_model``, ``from_remote``,
            ``peft_model`` and ``dataset`` (a comma-separated list of
            ``fpb`` / ``fpb_mlt``); the namespace is also passed on to the
            benchmark functions.

    Raises:
        ValueError: When ``dataset`` contains a name other than ``fpb`` or ``fpb_mlt``.
    """
    # Remote runs use the mapped model name as-is; local runs look one directory up.
    model_name = (
        parse_model_name(args.base_model, args.from_remote)
        if args.from_remote
        else '../' + parse_model_name(args.base_model)
    )

    load_kwargs = dict(
        trust_remote_code=True,
        device_map="auto",
    )
    model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)
    model.model_parallel = True

    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    tokenizer.padding_side = "left"
    if args.base_model == 'qwen':
        tokenizer.eos_token_id = tokenizer.convert_tokens_to_ids('<|endoftext|>')
        tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids('<|extra_0|>')

    # Add a dedicated pad token when none is set or when it coincides with EOS,
    # and grow the embedding matrix to cover the new vocabulary size.
    pad_missing = not tokenizer.pad_token
    if pad_missing or tokenizer.pad_token_id == tokenizer.eos_token_id:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        model.resize_token_embeddings(len(tokenizer))

    print(f'pad: {tokenizer.pad_token_id}, eos: {tokenizer.eos_token_id}')

    model = PeftModel.from_pretrained(model, args.peft_model)
    model = model.eval()

    runners = {'fpb': test_fpb, 'fpb_mlt': test_fpb_mlt}
    with torch.no_grad():
        for data in args.dataset.split(','):
            runner = runners.get(data)
            if runner is None:
                raise ValueError('undefined dataset.')
            runner(args, model, tokenizer)

    print('Evaluation Ends.')


def str2bool(value):
    """Convert a command-line string such as ``True``/``false``/``1``/``no`` to a bool.

    ``type=bool`` must not be used with argparse: it calls ``bool()`` on the raw
    string, so every non-empty value (including ``"False"``) becomes ``True``.

    Raises:
        argparse.ArgumentTypeError: When the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = value.strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    # The empty string stays False, as it was under bool("").
    if text in ('false', 'f', 'no', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", required=True, type=str)
    parser.add_argument("--base_model", required=True, type=str, choices=['chatglm2', 'llama2', 'llama2-13b', 'llama2-13b-nr', 'baichuan', 'falcon', 'internlm', 'qwen', 'mpt', 'bloom'])
    parser.add_argument("--peft_model", required=True, type=str)
    parser.add_argument("--max_length", default=512, type=int)
    parser.add_argument("--batch_size", default=4, type=int, help="The train batch size per device")
    parser.add_argument("--instruct_template", default='default')
    # `--from_remote True` keeps working; `--from_remote False` now really means False,
    # and a bare `--from_remote` is accepted as True.
    parser.add_argument("--from_remote", default=False, type=str2bool, nargs='?', const=True)

    args = parser.parse_args()

    print(args.base_model)
    print(args.peft_model)

    main(args)

Overwriting fingpt/FinGPT_Benchmark/benchmarks/benchmarks.py


## 7. Create Utils Module

In [24]:
%%writefile fingpt/FinGPT_Benchmark/utils.py
def parse_model_name(base_model, from_remote=False):
    """Translate a short base-model alias into its model identifier.

    Args:
        base_model: One of the aliases listed in the table below.
        from_remote: Accepted for call compatibility; not used by this function.

    Returns:
        The identifier string mapped to ``base_model``.

    Raises:
        ValueError: When ``base_model`` is not a known alias.
    """
    alias_to_id = dict([
        ('chatglm2', 'THUDM/chatglm2-6b'),
        ('llama2', 'meta-llama/Llama-2-7b-hf'),
        ('llama2-13b', 'meta-llama/Llama-2-13b-hf'),
        ('llama2-13b-nr', 'NousResearch/Llama-2-13b-hf'),
        ('baichuan', 'baichuan-inc/Baichuan-7B'),
        ('falcon', 'tiiuae/falcon-7b'),
        ('internlm', 'internlm/internlm-7b'),
        ('qwen', 'Qwen/Qwen-7B'),
        ('mpt', 'mosaicml/mpt-7b'),
        ('bloom', 'bigscience/bloom-7b1'),
    ])
    resolved = alias_to_id.get(base_model)
    if resolved is None:
        raise ValueError(f"Unknown base model: {base_model}")
    return resolved

Overwriting fingpt/FinGPT_Benchmark/utils.py


In [25]:
import os
from huggingface_hub import login

# Do not store an access token in the notebook. Take it from the HF_TOKEN environment
# variable; when that is unset, token=None lets login() ask for it interactively.
login(token=os.environ.get("HF_TOKEN"))

## 8. Run the FPB Benchmark Test

Now that we have set up all the necessary files, let's run the benchmark test.

In [26]:
# Run the FPB benchmark test using the pre-trained FinGPT model

# Change to the benchmarks directory
%cd fingpt/FinGPT_Benchmark/benchmarks

# You can modify these parameters based on your needs
base_model = 'llama2'  # Options: chatglm2, llama2, falcon, etc.
peft_model = 'FinGPT/fingpt-mt_llama2-7b_lora'  # The FinGPT adapter model
batch_size = 4
max_length = 512

# Single template test
!python benchmarks.py --dataset fpb --base_model {base_model} --peft_model {peft_model} --batch_size {batch_size} --max_length {max_length} --from_remote True

/workspace/FinLoRA/test/fingpt_tests/FinGPT/fingpt/FinGPT_Benchmark/benchmarks/FinGPT/fingpt/FinGPT_Benchmark/benchmarks
  torch.utils._pytree._register_pytree_node(
llama2
FinGPT/fingpt-mt_llama2-7b_lora
Loading checkpoint shards: 100%|██████████████████| 2/2 [00:07<00:00,  3.98s/it]
Using pad_token, but it is not set yet.
You are resizing the embedding layer without providing a `pad_to_multiple_of` parameter. This means that the new embeding dimension will be 32001. This might induce some performance reduction as *Tensor Cores* will not be available. For more details  about this, or help on choosing the correct value for resizing, refer to this guide: https://docs.nvidia.com/deeplearning/performance/dl-performance-matrix-multiplication/index.html#requirements-tc
pad: 32000, eos: 2


Prompt example:
Instruction: What is the sentiment of this news? Please choose an answer from {negative/neutral/positive}.
Input: L&T has also made a commitment to redeem the remaining shares by the end o

## 9. Run Multiple-Template Test

In [None]:
# Run the multiple-template FPB test
# Reuses base_model, peft_model, batch_size and max_length from the single-template cell, and
# expects the working directory to still be the benchmarks folder that holds benchmarks.py.
!python benchmarks.py --dataset fpb_mlt --base_model {base_model} --peft_model {peft_model} --batch_size {batch_size} --max_length {max_length} --from_remote True

## 10. Analyze the Results

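The benchmark scripts print accuracy and F1 scores as they run. The cell below recomputes them from `tmp.csv`, which is written only by the multiple-template test (`fpb_mlt`), so run section 9 first.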

In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, classification_report

# tmp.csv is written by the multiple-template benchmark into the current working directory.
results = pd.read_csv('tmp.csv')

# Gold labels: use 'target' when the file has it, otherwise fall back to 'output',
# which holds the same label strings in the benchmark's frame.
label_column = 'target' if 'target' in results.columns else 'output'
target = results[label_column]  # True labels
predictions = results['new_out']  # Predicted labels (vote across templates)

# Calculate accuracy and F1 scores
accuracy = accuracy_score(target, predictions)
f1_macro = f1_score(target, predictions, average='macro')
f1_micro = f1_score(target, predictions, average='micro')
f1_weighted = f1_score(target, predictions, average='weighted')

# Print the metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Macro: {f1_macro:.4f}")
print(f"F1 Micro: {f1_micro:.4f}")
print(f"F1 Weighted (BloombergGPT): {f1_weighted:.4f}")

print("\nDetailed Classification Report:")
print(classification_report(target, predictions))

if 'out_text_0' in results.columns:
    print("\nResults by template:")
    template_columns = [col for col in results.columns if col.startswith('out_text_')]
    for i, col in enumerate(template_columns):
        # Separate names so the overall metrics computed above are not overwritten.
        template_accuracy = accuracy_score(target, results[col])
        template_f1_macro = f1_score(target, results[col], average='macro')
        template_f1_weighted = f1_score(target, results[col], average='weighted')
        print(f"Template {i}: Accuracy: {template_accuracy:.4f}, F1 Macro: {template_f1_macro:.4f}, F1 Weighted: {template_f1_weighted:.4f}")