In [None]:
## Original Prompt from Swante:
''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
# You are a bank employee, relationship manager for corporate clients. You will be given
#     an annual report in the user prompt.
#     Instructions:
#     - Please extract sales leads (i.e. ideas for bank products that might be interesting for the
#     customer) from the text, if any exist.
#     - Only provide bank product recommendations as sales leads.
#     - Focus on bank products related to capital markets / asset management such as
#       commodities, currencies, interest rates, pensions and cash investments.
#     - Only provide the sales leads along with the reasons for that in your answer.
#     - Provide the sentences in the annual report the sales leads are deduced from.
#     - If no sales lead can be deduced from the annual report, answer accordingly.
#     - Only provide the sales leads with the highest probability, at maximum five.
#     - Only provide sales leads that are specific for the company.
#     - Answer in German.
#     - Provide the answer in bullet points in the following schema:
#     <Produkt>
#         - Grund: ...
#         - Zitat: ...


Imports

In [None]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import re
from pathlib import Path
import os
import json
import re
import openai
from module.azure_openai import AzureOpenAI
import httpx
from datetime import datetime
import getpass
import dspy
from dspy import dsp
from dspy import evaluate
from dspy.primitives import Example
from teleprompt.teleprompt import Teleprompter
from teleprompt.vanilla import LabeledFewShot
from utils.docs2data import read_docs_to_dataframe
from utils.validation import text_preprocessing, quote_match, categories_match, AssessReasoning, relevance_reasoning_lm, AssessQuote, quote_match_lm, AssessCategories, categories_match_lm
from dspy.evaluate import Evaluate

# Load the API keys from the local .ini file
from configparser import ConfigParser

config_object = ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; check it so a missing file fails with a clear
# message instead of an opaque KeyError on the section lookup below.
if not config_object.read("config.ini"):
    raise FileNotFoundError(
        "config.ini not found or unreadable; it must provide the sections "
        "[TUD_API_KEY] and [DEV_API_KEY]"
    )
tud_api_key = config_object["TUD_API_KEY"]['tud_api_key']
dev_api_key = config_object["DEV_API_KEY"]['dev_api_key']

Read and create training and validation data

In [None]:
# Build the training frame from the annotated report files and strip every
# double quote from the report text in the same chain.
# (Optional filter, currently disabled: df = df[df['quote'].apply(len) > 0])
folder_path = Path('data/reports/train_annotated/')
df = read_docs_to_dataframe(folder_path).assign(
    context=lambda frame: frame['context'].str.replace(r'["]', '', regex=True)
)
df

In [None]:
# Show the annotated answer stored for the training report at position 8
df.iloc[8].answer

In [None]:
# Build the validation frame from the annotated report files and strip every
# double quote from the report text in the same chain.
# (Optional filter, currently disabled: df_val = df_val[df_val['quote'].apply(len) > 0])
folder_path = Path('data/reports/validation_annotated/')
df_val = read_docs_to_dataframe(folder_path).assign(
    context=lambda frame: frame['context'].str.replace(r'["]', '', regex=True)
)
df_val

In [None]:
# df_val.iloc[4].answer

In [None]:
# create dspy training dataset
# (the JSON round trip turns the frame rows into plain Python records)
training_examples = json.loads(df[["context", "answer"]].to_json(orient="records"))
train_dspy = [dspy.Example(x).with_inputs('context') for x in training_examples]
print(f"For this dataset, training examples have input keys {train_dspy[0].inputs().keys()} and label keys {train_dspy[0].labels().keys()}")

# create dspy validation dataset
# NOTE: this must read from df_val; reading from df would make every
# "validation" evaluation score the training reports instead.
val_examples = json.loads(df_val[["context", "answer"]].to_json(orient="records"))
val_dspy = [dspy.Example(x).with_inputs('context') for x in val_examples]
print(f"For this dataset, validation examples have input keys {val_dspy[0].inputs().keys()} and label keys {val_dspy[0].labels().keys()}")

LLM Credentials

In [None]:
# Chat model client, authenticated with the TUD key read from config.ini.
# Other API versions tried earlier: '2023-07-01-preview'.
lm_gpt = AzureOpenAI(
    model_name="gpt-4o",
    model_type="chat",
    api_version='2024-06-01',
    api_key=tud_api_key,
    tud_dev="TUD",
)

# Register the client as the default LM for all DSPy modules.
dspy.settings.configure(lm=lm_gpt)

Prompt config

In [None]:
# Building blocks interpolated into the signature instructions below.

# Kind of document being scanned (singular form).
type_of_documents = "business report"

# Label set the category predictor may choose from.
categories = [
    'FX_HEDGING',
    'COMMODITIES_HEDGING',
    'INTEREST_RATE_HEDGING',
    'CREDIT',
    'INSURANCE',
    'FACTORING',
    'PENSIONS',
    'ESG',
    'CASH_MANAGEMENT',
    'DEPOSITS',
    'ASSET_MANAGEMENT',
    'OTHER',
]

# Human-readable name for the label set (plural form).
class_of_categories = "banking products"

# Disabled: relevance_classes = ['highly relevant', 'relevant', 'less relevant']

# Goal of the extraction, phrased in present progressive.
objective = (
    "extracting company specific information that indicate sales opportunities "
    "for products relating to capital market or asset management"
)

# Number of snippets requested from the preselection step.
number_of_items_in_output = 5

Signature

In [None]:
# Signature for the preselection step: full report text in, a handful of
# candidate snippets out.
class ChunkerSignature(dspy.Signature):
    # Instruction text. The f-string is evaluated once, when the class is
    # defined, from the prompt-config globals (type_of_documents,
    # number_of_items_in_output, objective); changing those globals later
    # does not update this text unless the cell is re-run.
    __doc__ = f"""Given a {type_of_documents}, determine {number_of_items_in_output} most relevant snippets (2-3 sentences) to {objective}. Do not include the context in the output."""
    # Input field: the text to pick snippets from.
    context = dspy.InputField()
    # Output field: described to the LM as comma-separated quotes.
    # ScanReport.forward later splits this text on '", "'.
    output = dspy.OutputField(desc="comma-separated quotes")

# Signature for rating a single snippet's relevance to the extraction objective.
class PredictRelevance(dspy.Signature):
    # Instruction text, interpolated once at class-definition time from
    # type_of_documents and objective.
    __doc__ = f"""Given a snippet from a {type_of_documents}, determine a score between 0 and 100 of how relevant the snippet is to {objective}. A score of 100 denotes high relevance, and a score 0 denotes irrelevance."""
    # Input field: the snippet to rate.
    context = dspy.InputField()
    # Output field: the score as produced by the LM; ScanReport.forward stores
    # it unchanged under "relevance_score" (no numeric conversion there).
    output = dspy.OutputField(desc="number between 0 and 100")

# Signature for assigning one or more category labels to a single snippet.
class PredictCategory(dspy.Signature):
    # Instruction text, interpolated once at class-definition time from the
    # prompt-config globals. The fallback label is spelled 'OTHER' so that it
    # matches the entry in `categories` exactly.
    __doc__ = f"""Given a snippet from a {type_of_documents}, identify which of the {class_of_categories} ({', '.join(categories)}) the snippet is relevant to. If snippet is not relevant for any {class_of_categories}, say 'OTHER'."""
    # Input field: the snippet to classify.
    context = dspy.InputField()
    # Output field. `desc` has to be an f-string, otherwise the literal text
    # "{class_of_categories}" is what reaches the LM. `format` joins a list of
    # labels into one comma-separated string and passes other values through.
    output = dspy.OutputField(desc=f"comma-separated {class_of_categories}", format=lambda x: ', '.join(x) if isinstance(x, list) else x)

# Signature for translating a text to German.
class Translator(dspy.Signature):
    # Instruction text (an f-string without placeholders, so it is constant).
    __doc__ = f"""Do not include the context and introduction in the output. Translate to German."""
    # Input field: the text to translate.
    context = dspy.InputField()
    # Output field: described to the LM simply as "German".
    output = dspy.OutputField(desc="German")

def valid_categories(predicted_categories, categories):
    """Tell whether every predicted label is one of the allowed labels.

    Each item of ``predicted_categories`` is converted with ``str`` and looked
    up in ``categories`` using ``in``. An empty ``predicted_categories``
    yields True.
    """
    for candidate in predicted_categories:
        if str(candidate) not in categories:
            return False
    return True

In [None]:
from utils.chunkers import extract_output, extract_reasoning
#from utils.chunkers import read_german_abbreviations, chunk_german_multi_sentences
#ger_abbr = read_german_abbreviations('utils/german_abbreviations.txt')
# Grounding with prior: a hint that lists the valid labels for the category
# predictor. HINT has to be an f-string; as a plain string the literal text
# "{class_of_categories}" would end up in the prompt.
HINT = f"Valid {class_of_categories} are:"
hint = f"{HINT} {', '.join(categories)}." if categories else None

class ScanReport(dspy.Module):
    """Select snippets from a report, then categorise and rate each of them.

    ``forward`` runs the preselection predictor on the whole text, splits its
    output into snippets and, for every snippet, predicts categories and a
    relevance score. Both rationales are passed through the translator.
    """

    def __init__(self):
        super().__init__()

        # NOTE: a sentence-based chunking step (chunk_german_multi_sentences)
        # was tried earlier and is currently disabled.
        # preselect relevant snippets from the full text
        self.preselection = dspy.Predict(ChunkerSignature)
        # given a snippet, rate its relevance to the extraction objective
        self.relevance = dspy.ChainOfThought(PredictRelevance)
        # given a snippet, predict a list of relevant categories using a CoT with hint
        self.predict = dspy.ChainOfThoughtWithHint(PredictCategory)
        # translate the rationales of the two predictors
        self.translator = dspy.Predict(Translator)

    @staticmethod
    def _split_snippets(raw_output):
        """Split the preselection text on '", "' and drop remaining double quotes."""
        return [item.replace('"', '') for item in raw_output.split('", "')]

    @staticmethod
    def _split_categories(raw_output):
        """Split a comma-separated label string into stripped labels."""
        return [item.strip() for item in raw_output.split(',')]

    def forward(self, context):
        """Run the pipeline on one report.

        Parameters
        ----------
        context
            Report text handed to the preselection predictor.

        Returns
        -------
        dspy.Prediction
            ``context`` is the input; ``answer`` is a list with one dict per
            snippet holding the keys "quote", "relevance_score", "categories",
            "reasoning_categories" and "reasoning_relevance".
        """
        hint = f"{HINT} {', '.join(categories)}."
        answers = []

        preselection = self.preselection(context=context)

        # The LM calls below keep a fixed order per snippet: categories,
        # relevance, then the two translations.
        for snippet in self._split_snippets(extract_output(preselection.output)):
            # use the LM to predict relevant categories
            chunk_categories = self.predict(context=[snippet], hint=hint)
            # use the LM to rate the relevance
            chunk_relevance = self.relevance(context=[snippet])

            answers.append({
                "quote": snippet,
                "relevance_score": chunk_relevance.output,
                "categories": self._split_categories(chunk_categories.output),
                "reasoning_categories": self.translator(context=extract_reasoning(chunk_categories.rationale)).output,
                "reasoning_relevance": self.translator(context=extract_reasoning(chunk_relevance.rationale)).output,
            })
            # TODO: optionally enforce the label set here, e.g.
            # dspy.Assert(valid_categories(<predicted labels>, categories), ...)

        return dspy.Prediction(context=context, answer=answers)

In [None]:
# Smoke test: run the uncompiled pipeline on the training report at index 8
# and keep the prediction in `fullana` for the metric checks further down.
scan = ScanReport()
fullana = scan(train_dspy[8].context)
#scansnip(context=train_dspy[0]['full_report'])

In [None]:
# Show the list of per-snippet entries produced by the smoke-test run
fullana.answer

Validation metrics

In [None]:
def validation_metric(expected: "dspy.Example", pred: "dspy.Prediction", trace=None) -> "float | bool":
    """Validation metric based on string comparison and regex.

    The score is the mean of a quote-match score and a category-match score.
    Examples without expected quotes (including examples whose ``answer`` is
    empty) score 0.0.

    Parameters
    ----------
    expected : dspy.Example
        Expected/example (target) data
    pred : dspy.Prediction
        Predicted data
    trace
        If None a score between 0 and 1 is returned, else True or False

    Returns
    -------
    float/bool
        float: score between 0 and 1 if trace is None
        bool: True when both partial scores exceed 0.3, if trace is not None
    """
    # gather predicted quotes, relevance scores and categories
    pred_quotes = [text_preprocessing(item['quote']) for item in pred.answer]
    pred_relevance = [item['relevance_score'] for item in pred.answer]
    pred_categories = [item['categories'] for item in pred.answer]

    # all expected quotes are compared as one concatenated string
    expected_quotes = ''.join([text_preprocessing(item['quote']) for item in expected.answer])
    expected_categories = [item['categories'] for item in expected.answer]

    quote_match_score = 0.0
    categories_match_score = 0.0

    # if there are quotes in the target data
    if expected_quotes != '':
        # Read the count only here: indexing the first answer item up front
        # raises IndexError for examples that carry no annotation at all.
        expected_count_quote = float(expected.answer[0]['count_quote'])

        quote_match_res = quote_match(expected_quotes, expected_count_quote, pred_quotes, pred_relevance)
        quote_match_score = quote_match_res[0]
        quote_match_indexes = quote_match_res[1]

        if len(quote_match_indexes) > 0:
            category_indexes = quote_match_indexes
        else:
            # expected quotes exist but none matched a predicted quote:
            # compare against the categories of every predicted snippet
            category_indexes = range(len(pred_categories))
        categories_match_score, _ = categories_match(expected_categories, pred_categories, category_indexes)

    # a trace is passed during optimisation: return a pass/fail decision
    if trace is not None:
        return bool(quote_match_score > 0.3 and categories_match_score > 0.3)

    # mean of both partial scores, capped at 1.0
    return min(1.0, (quote_match_score + categories_match_score) / 2)
    
# `fullana` was predicted from train_dspy[8], so it has to be scored against
# the annotation of that same report (index 8, not 7).
validation_metric(expected = train_dspy[8] , pred = fullana)

In [None]:
# Reusable evaluator over the first 13 training examples (single thread, no table).
evaluator = Evaluate(
    devset=train_dspy[:13],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the uncompiled pipeline with the string-based metric.
evaluator(ScanReport(), metric=validation_metric, return_all_scores=True)

In [None]:
# LLM as Judge Metric 

# Define metric as softer measure for the reasoning

def lm_metric(expected: "dspy.Example", pred: "dspy.Prediction", trace=None) -> "float | bool":
    """Validation metric based on LLM as a Judge for quotes and
    string comparison for categories.

    The score is the mean of a quote-match score and a category-match score.
    Examples without expected quotes (including examples whose ``answer`` is
    empty) score 0.0.

    Parameters
    ----------
    expected : dspy.Example
        Expected/example (target) data
    pred : dspy.Prediction
        Predicted data
    trace
        If None a score between 0 and 1 is returned, else True or False

    Returns
    -------
    float/bool
        float: score between 0 and 1 if trace is None
        bool: True when both partial scores exceed 0.3, if trace is not None
    """
    # gather predicted quotes, relevance scores and categories
    pred_quotes = [item['quote'] for item in pred.answer]
    pred_relevance = [item['relevance_score'] for item in pred.answer]
    pred_categories = [item['categories'] for item in pred.answer]

    # all expected quotes are compared as one concatenated string
    expected_quotes = ''.join([item['quote'] for item in expected.answer])
    expected_categories = [item['categories'] for item in expected.answer]

    # initialize scores
    quote_match_score = 0.0
    categories_match_score = 0.0

    # if there are quotes in the target data
    if expected_quotes != '':
        # Read the count only here: indexing the first answer item up front
        # raises IndexError for examples that carry no annotation at all.
        expected_count_quote = float(expected.answer[0]['count_quote'])

        # quotes
        quote_match_res = quote_match_lm(expected_quotes, expected_count_quote, pred_quotes, pred_relevance)
        quote_match_score = quote_match_res[0]
        quote_match_indexes = quote_match_res[1]

        # categories (string comparison; categories_match_lm is the LM-based alternative)
        if len(quote_match_indexes) > 0:
            category_indexes = quote_match_indexes
        else:
            # expected quotes exist but none matched a predicted quote:
            # compare against the categories of every predicted snippet
            category_indexes = range(len(pred_categories))
        categories_match_score, _ = categories_match(expected_categories, pred_categories, category_indexes)

    # a trace is passed during optimisation: return a pass/fail decision
    if trace is not None:
        return bool(quote_match_score > 0.3 and categories_match_score > 0.3)

    # mean of both partial scores, capped at 1.0 like validation_metric
    return min(1.0, (quote_match_score + categories_match_score) / 2)


# `fullana` was predicted from train_dspy[8], so it has to be scored against
# the annotation of that same report (index 8, not 7).
print(lm_metric(expected = train_dspy[8] , pred = fullana))

# def full_lm_metric(expected: dspy.Example, pred: dspy.Prediction, trace=None) -> int:
#     """Validation metric based purely on LLM as a Judge for 
#      quotes, categories and reasoning. 

#     Parameters
#     ----------
#     expected : dspy.Example
#         Expected/example (target) data
#     pred : dspy.Prediction
#         Predicted data
#     trace
#         If None a score betwen 0 and 1 is returned, else True or False

#     Returns
#     -------
#     int/boolean
#         int: score between 0 and 1 if trace=None
#         boolean: if trace!=None  
#     """

#     ## gather quotes and categories
#     pred_quotes = [item['quote'] for item in pred.answer]
#     pred_relevance = [item['relevance_score'] for item in pred.answer]
#     pred_categories = [item['categories'] for item in pred.answer]
#     pred_reasoning_relevance = [item['reasoning_relevance'] for item in pred.answer]

#     expected_quotes = ''.join([item['quote'] for item in expected.answer]) # as we already store all quotes in one string
#     expected_relevance = float([item['relevance_score'] for item in expected.answer][0])
#     expected_count_quote = float([item['count_quote'] for item in expected.answer][0])
#     expected_categories = [item['categories'] for item in expected.answer]

#     # initialize scores
#     final_score = 0.0
#     quote_match_score = 0.0
#     categories_match_score = 0.0

#     reasoning_relevance_score = relevance_reasoning_lm(pred_quotes, pred_reasoning_relevance, pred_relevance, lm_gpt)

#     # if there are quotes in the target data
#     if expected_quotes != '':

#         quote_match_res = quote_match_lm(expected_quotes, expected_count_quote, pred_quotes, pred_relevance, lm_gpt)
#         quote_match_score = quote_match_res[0]
#         quote_match_indexes = quote_match_res[1]

#         if len(quote_match_indexes)>0:
#             categories_match_score = categories_match_lm(expected_categories, pred_categories, quote_match_indexes, lm_gpt) 

#     else:
#         # print('The example quote is not found in the target data. No validation for that example.')
#         quote_match_score = 0.0

#         for pred_item in pred_relevance:
#             # print(pred_item)
#             try:
#                 pred_item = (100 - float(pred_item))/100
#             except:
#                 pred_item = 0.5
#             quote_match_score = quote_match_score + pred_item   
        
#         quote_match_score = quote_match_score/len(pred_relevance)
    
#     final_score = ((2*quote_match_score) + (2*categories_match_score) + reasoning_relevance_score) / 5

#     # add trace for not None (boolean)
#     if trace != None:
#         final_score = (quote_match_score > 0.3) & (categories_match_score > 0.3) & (reasoning_relevance_score > 0.3)

#     return final_score


# print(full_lm_metric(expected = train_dspy[1] , pred = fullana))

In [None]:
# Reusable evaluator over the first 13 training examples (single thread, no table).
evaluator = Evaluate(
    devset=train_dspy[:13],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the uncompiled pipeline with the LLM-as-judge metric.
evaluator(ScanReport(), metric=lm_metric, return_all_scores=True)

# Optimizers

BootstrapFewShot

In [None]:
## Option: compile with BootstrapFewShot (reasoning and relevance score)
from teleprompt.bootstrap import BootstrapFewShot

# Basic teleprompter driven by the string-based metric.
teleprompter = BootstrapFewShot(
    metric=validation_metric,
    max_rounds=4,
    max_bootstrapped_demos=6,
    max_errors=1,
)

# Compile a fresh pipeline on the full training set.
compiled_ScanReport = teleprompter.compile(ScanReport(), trainset=train_dspy)

COPRO

In [None]:
## Option: compile with COPRO
from teleprompt.copro_optimizer import COPRO

# COPRO teleprompter driven by the string-based metric.
teleprompter = COPRO(
    metric=validation_metric,
    breadth=2,
    depth=2,
    init_temperature=0.9,
)

# Keyword arguments forwarded to the evaluation runs inside compile().
eval_dict = dict(display_progress=True, display_table=0)

# Compile a fresh pipeline on the first five training examples.
compiled_ScanReport = teleprompter.compile(
    student=ScanReport(),
    trainset=train_dspy[:5],
    eval_kwargs=eval_dict,
)

COPRO and BootstrapFewShot combined

In [None]:
## Option: refine the already compiled pipeline with BootstrapFewShot
from teleprompt.bootstrap import BootstrapFewShot

# Basic teleprompter driven by the string-based metric.
teleprompter = BootstrapFewShot(
    metric=validation_metric,
    max_rounds=4,
    max_bootstrapped_demos=5,
    max_errors=1,
)

# Re-compile: `compiled_ScanReport` from the previous optimizer cell is the
# starting program here, so that cell has to be run first.
compiled_ScanReport = teleprompter.compile(compiled_ScanReport, trainset=train_dspy)

In [None]:
## Option: chain two optimizers (COPRO -> BootstrapFewShot -> COPRO)
from teleprompt.copro_optimizer import COPRO
from teleprompt.bootstrap import BootstrapFewShot

# One metric shared by all three optimisation stages.
opt_metric = validation_metric

# Stage 1: COPRO on a fresh pipeline, training examples 0-5.
teleprompter = COPRO(
    metric=opt_metric,
    breadth=2,
    depth=2,
    init_temperature=0.9,
)
eval_dict = dict(display_progress=False, display_table=0)
compiled_ScanReport1 = teleprompter.compile(
    student=ScanReport(),
    trainset=train_dspy[:6],
    eval_kwargs=eval_dict,
)

print("COPRO round 1 done, optimize with BootstrapFewShot")

# Stage 2: BootstrapFewShot on the stage-1 result, full training set.
teleprompter = BootstrapFewShot(
    metric=opt_metric,
    max_rounds=4,
    max_bootstrapped_demos=6,
    max_errors=1,
)
compiled_ScanReport2 = teleprompter.compile(compiled_ScanReport1, trainset=train_dspy)

print("BootstrapFewShot done, optimize with COPRO round 2")

# Stage 3: COPRO again on the stage-2 result, training examples 7-12.
teleprompter = COPRO(
    metric=opt_metric,
    breadth=2,
    depth=2,
    init_temperature=0.9,
)
compiled_ScanReport = teleprompter.compile(
    student=compiled_ScanReport2,
    trainset=train_dspy[7:13],
    eval_kwargs=eval_dict,
)

Evaluation

In [None]:
# Reusable evaluator over the first 13 training examples (single thread, no table).
evaluator = Evaluate(
    devset=train_dspy[:13],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the compiled pipeline with the string-based metric.
evaluator(compiled_ScanReport, metric=validation_metric, return_all_scores=True)

In [None]:
# Reusable evaluator over the first 13 training examples (single thread, no table).
evaluator = Evaluate(
    devset=train_dspy[:13],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the compiled pipeline with the LLM-as-judge metric.
evaluator(compiled_ScanReport, metric=lm_metric, return_all_scores=True)

Evaluate Validation Dataset

In [None]:
# Reusable evaluator over the first 5 entries of val_dspy (single thread, no table).
evaluator = Evaluate(
    devset=val_dspy[:5],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the uncompiled pipeline with the string-based metric.
evaluator(ScanReport(), metric=validation_metric, return_all_scores=True)

In [None]:
# Reusable evaluator over the first 5 entries of val_dspy (single thread, no table).
evaluator = Evaluate(
    devset=val_dspy[:5],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the uncompiled pipeline with the LLM-as-judge metric.
evaluator(ScanReport(), metric=lm_metric, return_all_scores=True)

In [None]:
# Reusable evaluator over the first 5 entries of val_dspy (single thread, no table).
evaluator = Evaluate(
    devset=val_dspy[:5],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the compiled pipeline with the string-based metric.
evaluator(compiled_ScanReport, metric=validation_metric, return_all_scores=True)

In [None]:
# Reusable evaluator over the first 5 entries of val_dspy (single thread, no table).
evaluator = Evaluate(
    devset=val_dspy[:5],
    num_threads=1,
    display_progress=True,
    display_table=0,
)

# Score the compiled pipeline with the LLM-as-judge metric.
evaluator(compiled_ScanReport, metric=lm_metric, return_all_scores=True)