In [27]:
"""
Table QA - RAG approach with tables converted to markdown format.

See https://haystack.deepset.ai/tutorials/22_pipeline_with_promptnode
"""
import os
from pathlib import Path

import pandas as pd
from haystack import Document
from haystack.nodes import AzureConverter, EmbeddingRetriever, PromptNode, PromptTemplate, AnswerParser
from haystack.document_stores import InMemoryDocumentStore
from haystack.pipelines import Pipeline
from haystack.utils import print_answers
from haystack.nodes import BaseComponent

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
AZURE_CONVERTER_KEY = os.environ.get("AZURE_CONVERTER_KEY")

## Convert PDF

In [28]:
def convert_validation_pdf() -> list[Document]:
    """
    Returns a list of Documents from the validation PDF.

    Uses the AzureConverter to convert the PDF to tables and text documents.

    Returns
    -------
    converted_docs : list[Document]
        The list of Documents from the validation PDF.
    """
    converted_docs = []
    file_path = Path("/home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split")

    converter = AzureConverter(
        endpoint="https://azureconverter.cognitiveservices.azure.com/",
        credential_key=AZURE_CONVERTER_KEY,
        model_id="prebuilt-layout",  # Was "prebuilt-document"
        save_json=True
    )

    for fn in file_path.glob("*.pdf"):
        print(f"Converting {fn}")
        docs = converter.convert(file_path=fn, meta=None)
        converted_docs.extend(docs)

    return converted_docs

In [29]:
def load_validation_pdf_from_json() -> list[Document]:
    """
    Return a list of Documents from the validation PDF, loaded from JSON files.

    Requires AzureConverter to have been run on the PDF and saved the JSON files.

    Returns
    -------
    converted_docs : list[Document]
        The list of Documents from the validation PDF.
    """
    converted_docs = []
    file_path = Path("/home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split")

    converter = AzureConverter(
        endpoint="https://azureconverter.cognitiveservices.azure.com/",
        credential_key=AZURE_CONVERTER_KEY,
        model_id="prebuilt-layout",  # Was "prebuilt-document"
    )

    for fn in file_path.glob("*.json"):
        print(f"Loading {fn}")
        docs = converter.convert_azure_json(file_path=fn)
        converted_docs.extend(docs)

    return converted_docs

In [30]:
docs = load_validation_pdf_from_json()

Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [7-8].json
Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [5-6].json
Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [11].json
Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [3-4].json
Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [1-2].json
Loading /home/tomw/unifi-pdf-llm/data/validate/SASOL Sustainability Report 2023 20-09_0_minimal_split/SASOL Sustainability Report 2023 20-09_0_minimal [9-10].json


## Preprocess Documents

**TODO**: Test removing the index from the tables. I don't think it adds much. Could be 
added as context to each document, and then used to recreate context of row. 

In [35]:
def preprocess_documents(
        docs: list[Document],
        window_size: int=5
    ) -> list[Document]:
    """
    Preprocess the documents.

    Parameters
    ----------
    docs : list[Document]
        The documents to preprocess.

    window_size : int
        The size of the sliding window used to split the tables.

    Returns
    -------
    docs : list[Document]
        The preprocessed documents.
    """
    preprocessed_docs = []

    for doc in docs:
        if doc.content_type == "table":
            doc.content = clean_table_column_names(doc.content)
            sliced_table_docs = slice_table_document(doc, window_size)
            preprocessed_docs.extend(sliced_table_docs)
        else:
            preprocessed_docs.append(doc)

    convert_tables_to_markdown(preprocessed_docs)

    return preprocessed_docs


def clean_table_column_names(df: pd.DataFrame, replace: str=' - ') -> pd.DataFrame:
    """
    Return a DataFrame with newlines removed from column headers.

    Parameters
    ----------
    df : pd.Dataframe
        The DataFrame to clean.

    replace: str
        The string to replace newlines with.

    Returns
    -------
    df : pd.Dataframe
        The dataframe with newlines removed from column headers.
    """
    df.columns = df.columns.str.replace('\n', replace)
    return df


def slice_table_document(doc: Document, window_size: int=5) -> list[Document]:
    """
    Return a list of documents, each containing a table with `window_size` rows.

    A sliding window approach is used to split the table into smaller tables. The
    returned documents have the same metadata as the original document, except for
    the content and id.

    Parameters
    ----------
    doc : Document
        Document with content_type "table".

    window_size : int
        The size of the sliding window.

    Returns
    -------
    docs : list[Document]
        A list of documents, each one containing a table with `window_size` rows.

    Raises
    ------
    ValueError
        If the document does not contain a table.
    """
    if doc.content_type != "table":
        raise ValueError("The document does not contain a table.")

    tables = _sliding_window(doc.content, window_size)
    docs = []
    for table in tables:
        new_doc = Document(content=table)
        for attr, value in doc.__dict__.items():
            if attr not in ["content", "id"]:
                setattr(new_doc, attr, value)
        docs.append(new_doc)

    return docs


def _sliding_window(df: pd.DataFrame, window_size: int) -> list[pd.DataFrame]:
    """
    Return a list of DataFrames, each containing a window of the original DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        The DataFrame to split.

    window_size : int
        The size of the sliding window.

    Returns
    -------
    tables : list[pandas.DataFrame]
        A list of DataFrames, each containing a window of the original DataFrame.
    """
    tables = [df.iloc[i:i+window_size] for i in range(len(df) - window_size + 1)]

    return tables


def convert_tables_to_markdown(docs: list[Document]) -> None:
    """
    Convert tables to markdown format in place.

    Parameters
    ----------
    docs : List[Document]
        List of Documents, some of which may have `content_type` 'table'.
    """
    for doc in docs:
        if doc.content_type == "table":
            _convert_table_to_markdown(doc)


def _convert_table_to_markdown(doc: Document) -> None:
    """
    Convert table to markdown format in place.

    Parameters
    ----------
    doc : Document
        Document with `content_type` table.

    Raises
    ------
    ValueError
        If `doc.content_type` is not "table".
    """
    if doc.content_type != "table":
        raise ValueError(f"Document content_type must be 'table', not '{doc.content_type}'")

    table = doc.content
    markdown_table = table.to_markdown(tablefmt="github")

    doc.content = markdown_table
    doc.content_type = "text"

In [36]:
docs = preprocess_documents(docs, window_size=1)

print(f"Number of documents: {len(docs)}")

Number of documents: 806


In [37]:
print(docs[24].content)

|    | Sasol in Society - Spend   | 2023 - Rm   | 2022 - Rm   | 2021 - Rm   |   2020 - Rm | LOA 2023   | Footnote   |
|----|----------------------------|-------------|-------------|-------------|-------------|------------|------------|
| 24 | Sasolburg                  | 1 360       | 1 366       | 1 586       |        1440 |            |            |


## Retrieval Augmented Generation

In [323]:
# TODO: Try to use other document stores (e.g. FAISS).

document_store = InMemoryDocumentStore(embedding_dim=384)

document_store.delete_documents()
document_store.write_documents(docs)

In [324]:
# TODO: I'm not sure what OpenAI embedding models are available. Is it possible to use
# their newest embedding models in Haystack v1?

# TODO: Look into other (non-OpenAI) embedding models that can be used with Haystack v1.

retriever = EmbeddingRetriever(
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    document_store=document_store,
    top_k=5
)

document_store.update_embeddings(retriever=retriever)

Batches: 100%|██████████| 24/24 [00:00<00:00, 36.29it/s]ocs/s]
Documents Processed: 10000 docs [00:00, 14654.23 docs/s]       


In [325]:
# Testing the retriever

# Try the Retriever
retrieved_tables = retriever.retrieve("What was the number of permanent employees 2021?", top_k=3)

# Get highest scored table
print(retrieved_tables[0].content)

Batches: 100%|██████████| 1/1 [00:00<00:00, 197.29it/s]

|    | Human Capital - Our people   | 2023   | 2022   | 2021   | 2020   | LoA 2023   | Footnote   |
|----|------------------------------|--------|--------|--------|--------|------------|------------|
|  1 | Permanent employees          | 28 657 | 28 279 | 28 725 |        |            |            |





In [348]:
# Testing the retriever

# Try the Retriever
retrieved_tables = retriever.retrieve("What was the Number of fatalities spend in the year 2023?", top_k=3)

# Get highest scored table
print(retrieved_tables[1].content)

Batches: 100%|██████████| 1/1 [00:00<00:00, 52.93it/s]

|    | Human Capital - Our people               |   2023 |   2022 | 2021   | 2020   | LoA 2023   | Footnote   |
|----|------------------------------------------|--------|--------|--------|--------|------------|------------|
| 39 | Employee and service provider fatalities |      2 |      4 |        |        |            |            |





In [329]:
rag_prompt = PromptTemplate(
    prompt="""Use the following pieces of context to answer the question at the end.
              The context may be text or a markdown table.
              Just retrieve the answer from the context. Please don't do any unit conversion.
              If you don't know the answer, please return 'None' for the answer and unit.
              Do not return any words other than 'Answer' and 'Unit' in the answer.
              Please return the answer in the format 'Answer: <number or None>, Unit: <unit or None>'.

              \n\n Context: {join(documents)} \n\n Question: {query} \n\n Answer:""",
    output_parser=AnswerParser(),
)

generation_node = PromptNode(
    model_name_or_path="gpt-3.5-turbo-1106",
    api_key=OPENAI_API_KEY,
    default_prompt_template=rag_prompt,
    output_variable="generated_answer",
    model_kwargs={"temperature": 0}  # It doesn't seem that the `temperature` parameter is having any effect. Seems like a bug. Might work in haystack 2.0.
)

In [330]:
class GeneratedAnswerParser(BaseComponent):
    """
    Parse the output returned by the generation node.

    The output is expected to be in the format "Answer: <number or None>, Unit: <unit or None>".
    """
    outgoing_edges = 1

    def run(self, generated_answer):
        """
        Parse the output returned by the generation node.

        The output is expected to be in the format "Answer: <number or None>, Unit: <unit or None>".

        Parameters
        ----------
        generated : list[Answer]
            The output returned by the generation node.

        Returns
        -------
        dict
            A dictionary containing the answer and unit.
        """
        output = generated_answer[0].answer

        answer, unit = output.split(", ")
        answer = answer.split(": ")[1]
        unit = unit.split(": ")[1]

        if answer == "None":
            answer = None
        else:
            answer = answer.replace(" ", "")
            answer = answer.replace(",", "")
            answer = int(answer)

        return {"answer": answer, "unit": unit}, "output_1"

    def run_batch(self, **kwargs):
        # TODO: Implement batch processing.
        pass

In [331]:
gen_parser = GeneratedAnswerParser()

In [332]:
# Unit conversion. TODO: Move

def create_unit_conversion_prompt(value, unit, target_unit):
    prompt=f"""You are an expert unit converter. You are aware of how to convert
    between different units within the same system of measurement.
    For example, 1236 million = 1236 * 1 million = 1236 * 1000000 = 1236000000.
    For example, to convert from Rm to R, you would multiply by 1000000. This is because
    1 Rm = 1000000 R.
    Please return a single number as your answer. Do not elaborate or give
    any context.\n\n

    What is {value} {unit} in {target_unit}? \n\n Answer:"""

    return prompt


unit_conversion_node = PromptNode(
    model_name_or_path="gpt-3.5-turbo",
    api_key=OPENAI_API_KEY,
    model_kwargs={"temperature": 0}  # It doesn't seem that the `temperature` parameter is having any effect. Seems like a bug. Might work in haystack 2.0.
)

query = create_unit_conversion_prompt(1.24, "Rm", "R")

unit_conversion_node(query)

['1240000']

In [333]:
querying_pipeline = Pipeline()
querying_pipeline.add_node(component=retriever, name="retriever", inputs=["Query"])
querying_pipeline.add_node(component=generation_node, name="prompt_node", inputs=["retriever"])
querying_pipeline.add_node(component=gen_parser, name="gen_parser", inputs=["prompt_node"])

In [334]:
output = querying_pipeline.run(
    query="What was the Black women-owned spend in the year 2023?",
    params={
        "retriever": {"debug": True},
        "prompt_node": {"debug": True},
        "gen_parser": {"debug": True},
    }
)

print(f"Answer: {output['answer']}, Unit: {output['unit']}")

Batches: 100%|██████████| 1/1 [00:00<00:00, 43.58it/s]


Answer: 28500, Unit: Rm


In [335]:
# TODO: Write function to convert the answer to a more human-readable format.
# In particular, the markdown tables aren't very readable as a single line of text.

output["_debug"]

{'retriever': {'input': {'root_node': 'Query',
   'query': 'What was the Black women-owned spend in the year 2023?',
   'debug': True},
  'output': {'documents': [<Document: {'content': '|    | Sasol in Society - Spend   | 2023 - Rm   | 2022 - Rm   | 2021 - Rm   | 2020 - Rm   | LOA 2023   | Footnote   |\n|----|----------------------------|-------------|-------------|-------------|-------------|------------|------------|\n| 17 | Black-owned women spend    | 28 500      | 21 600      | 15 800      |             |            |            |', 'content_type': 'text', 'score': 0.5013344378378155, 'meta': {'preceding_context': 'DATA AND ASSURANCE\nANNEXURES\nPERFORMANCE DATA CONTINUED', 'following_context': 'Secunda\n13 903\n14 399', 'page': 1}, 'id_hash_keys': ['content'], 'embedding': None, 'id': 'c85916ccc6c6e24864bdb9db90b6f143'}>,
    <Document: {'content': '|    | Human Capital - Our people                | 2023   |   2022 |   2021 | 2020   | LoA 2023   |   Footnote |\n|----|-----------

The `gpt-3.5-turbo` model has a context window of 4,096 tokens. As a result, my prompt is often
being truncated so that the prompt length and answer length (100 tokens) fit within the max token
limit. The updated GPT-3.5 model (`gpt-3.5-turbo-0125`) has a larger context window of 
16,385 tokens. Would be good to use this, if possible (may require using haystack 2.0)
The slightly older GPT-3.5 model `gpt-3.5-turbo-1106` has a larger context window and is available 
with haystack 1.0. I'll use this for now.

In [336]:
output = querying_pipeline.run(
    query="What was the B-BBEE status in the year 2021? Do not include the word 'Level' in the answer.",
    params={
        "retriever": {"debug": True},
        "prompt_node": {"debug": True},
        "gen_parser": {"debug": True},
    }
)

print(output['answer'])
print(output['unit'])

Batches: 100%|██████████| 1/1 [00:00<00:00, 84.70it/s]


4
None


Need to be smarter when parsing the output from the model. When retrieving the B-BBEE status
the model consistently returns 'Level 3' (and not just 3). Could guardrails be used? Alternatively, appending 'Do not include the word 'Level' in the answer.' to the query also solves the 
issue. 

In [337]:
output = querying_pipeline.run(
    query="What was the Employee turnover in the year 2021?",
    params={
        "retriever": {"debug": True},
        "prompt_node": {"debug": True},
        "gen_parser": {"debug": True},
    }
)

print(output['answer'])
print(output['unit'])

Batches: 100%|██████████| 1/1 [00:00<00:00, 98.70it/s]


3869
None


Should there be two pipelines - one for queries which may require unit conversion,
and another for queries that don't? For example, when retrieving the employee turnover,
the unit should always be 'None'. So why ask the model to try and retrieve this - want 
to make it easier for the model when possible.

In [93]:
querying_pipeline.draw()

  graphviz.draw(path)


Working very well. Only issue I have seen so far is not being able to answer "What was the 
GHG Scope 2 emissions in the year 2021?". 

**TODO:** Add to querying pipeline a step to parse the returned output 'Answer: <>, Unit: <>'
to a single number. For cases where 'Unit' is not `None`, this will likely involve another
`PromptNode` to do the conversion.

## Validation of querying pipeline

In [338]:
import pandas as pd

VALIDATION_FILE = Path("/home/tomw/unifi-pdf-llm/data/validate/rag_esg_metric_validation.csv")

In [339]:
def validate_rag(querying_pipeline: Pipeline):
    validation_df = pd.read_csv(VALIDATION_FILE)
    results_df = validation_df.copy(deep=True)

    # Add row to results_df for the generated answer
    results_df["Generated"] = None

    for idx, row in validation_df.iterrows():
        year = row["Year"]
        metric = row["Metric"]
        query = f"What was the {metric} in the year {year}?"

        if metric in ["B-BBEE status", "B-BBEE scorecard level"]:
            query += " Do not include the word 'Level' in the answer."

        output = querying_pipeline.run(query=query)
        answer = output["answer"]

        results_df.at[idx, "Generated"] = answer

    return results_df


In [340]:
results = validate_rag(querying_pipeline)

Batches: 100%|██████████| 1/1 [00:00<00:00, 141.01it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 124.40it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 120.55it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 122.84it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 218.99it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 158.63it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 132.28it/s]
Batches: 100%|██████████| 1/1 [00:00<00:00, 126.26it/s]


In [341]:
results

Unnamed: 0,Company,Year,Metric,Unit,Source,Content Type,Page,Notes,Answer,Generated
0,SASOL,2023,Number of permanent employees,,SASOL Sustainability Report 2023 20-09_0.pdf,Table,17,May get confuesed with employee numbers in tab...,28657,28657
1,SASOL,2023,Employee turnover,,SASOL Sustainability Report 2023 20-09_0.pdf,Text,18,,1725,1725
2,SASOL,2023,B-BBEE status,,SASOL Sustainability Report 2023 20-09_0.pdf,Table,57,,3,3
3,SASOL,2023,B-BBEE scorecard level,,SASOL Sustainability Report 2023 20-09_0.pdf,Table,57,,3,3
4,SASOL,2023,Black women-owned spend,rand,SASOL Sustainability Report 2023 20-09_0.pdf,Text,11,May be retrieved from table. Requires conversi...,28500,28500
5,SASOL,2023,Black-owned spend,rand,SASOL Sustainability Report 2023 20-09_0.pdf,Text,11,Required conversion from million rand to rand,41700,41700
6,SASOL,2023,Number of fatalities,,SASOL Sustainability Report 2023 20-09_0.pdf,Text,11,,2,3
7,SASOL,2023,Number of undergraduate and postgraduate bursa...,,SASOL Sustainability Report 2023 20-09_0.pdf,Text,18,,544,544
