# A RAG pipeline using tabular data

In [1]:
from pydantic import BaseModel
from unstructured.partition.html import partition_html
import pandas as pd
from dotenv import load_dotenv
import os

load_dotenv()

True

This tutorial was inspired by https://docs.llamaindex.ai/en/latest/examples/query_engine/sec_tables/tesla_10q_table.html#perform-data-extraction

In [2]:
# NOTE(review): this URL is not referenced by any cell shown here (the pipeline
# below reads a local HTML file instead) — confirm whether it is still needed.
PDF_URL = "https://www.segurossura.com.co/documentos/condicionados/personas/vehiculos/plan-autos-global-clasico-basico.pdf"

In [3]:
from typing import Any

# Container for one parsed piece of the document: either a table or a text block.
class Element(BaseModel):
    """One element extracted from the source document.

    Every element carries a ``type`` tag ("table" or "text") so that later
    steps can treat tables and running text differently.
    """

    # Identifier assigned during extraction (built as "id_<position>").
    id: str
    # Kind of element; the extraction step writes either "table" or "text".
    type: str
    # The raw element object returned by the HTML partitioner.
    element: Any
    # Caption-like summary of a table; stays None until a summarizer fills it in.
    summary: str | None = None
    # Parsed table contents; only set for elements of type "table".
    table: pd.DataFrame | None = None

    class Config:
        # pandas DataFrames are not a pydantic-native type, so arbitrary
        # types must be allowed for the `table` field to validate.
        arbitrary_types_allowed = True

In [4]:
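# Uncomment to download the Tesla 10-K filings. The extraction step below reads
# `tesla_2021_10k.htm` from the working directory, so that file must exist first.
# !wget "https://www.dropbox.com/scl/fi/mlaymdy1ni1ovyeykhhuk/tesla_2021_10k.htm?rlkey=qf9k4zn0ejrbm716j0gg7r802&dl=1" -O tesla_2021_10k.htm
# !wget "https://www.dropbox.com/scl/fi/rkw0u959yb4w8vlzz76sa/tesla_2020_10k.htm?rlkey=tfkdshswpoupav5tqigwz1mp7&dl=1" -O tesla_2020_10k.htm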

In [5]:
from lxml import html

def html_to_df(html_str: str) -> pd.DataFrame:
    """Convert the first ``<table>`` found in an HTML snippet into a DataFrame.

    The first table row supplies the column names and every following row
    becomes one DataFrame row. Both ``<th>`` and ``<td>`` cells are read, and a
    cell's text includes text nested inside child tags (``<b>``, ``<span>``, ...).

    Args:
        html_str: HTML markup expected to contain a ``<table>``.

    Returns:
        A DataFrame of stripped cell strings. An empty DataFrame is returned
        when the input is blank or None, contains no ``<table>``, or the table
        has no rows, so callers can filter such inputs out instead of crashing.
    """
    if not html_str or not html_str.strip():
        # lxml raises on an empty document, so bail out before parsing.
        return pd.DataFrame()

    tree = html.fromstring(html_str)
    tables = tree.xpath("//table")
    if not tables:
        return pd.DataFrame()

    data = []
    for row in tables[0].xpath(".//tr"):
        # Header rows are often written with <th>; the union keeps document order.
        cells = row.xpath(".//th | .//td")
        # text_content() also collects text wrapped in nested tags, which a
        # plain `.text` lookup would silently drop.
        data.append([cell.text_content().strip() for cell in cells])

    if not data:
        return pd.DataFrame()

    # First row is the header; the remaining rows are the table body.
    return pd.DataFrame(data[1:], columns=data[0])

In [6]:
def filter_table(table_element):
    """Decide whether a table element is large enough to be worth keeping.

    The element's ``metadata.text_as_html`` is parsed with ``html_to_df``; the
    table is kept only when the resulting frame has more than one row and more
    than one column.

    Returns:
        True to keep the table, False to discard it.
    """
    parsed = html_to_df(table_element.metadata.text_as_html)
    row_count = len(parsed)
    column_count = len(parsed.columns)
    return row_count > 1 and column_count > 1

In [7]:
# Extracts elements from HTML. If table and meets filter conditions, store as Pydantic table element
# otherwise, store as text. Discard if table but does not pass filters.
def extract_elements(filename, table_filters=[]):
    """Partition an HTML file into ``Element`` records.

    Args:
        filename: Path of the HTML file handed to ``partition_html``.
        table_filters: Callables taking a table element and returning a truthy
            value to keep it. A table is kept only if every filter accepts it.

    Returns:
        A list of ``Element`` objects in document order. Tables that pass the
        filters are stored with type "table" and a parsed DataFrame; tables
        that fail are dropped; everything else is stored with type "text".
    """
    collected = []
    for position, part in enumerate(partition_html(filename=filename)):
        element_id = f"id_{position}"

        # Tables are recognised by the class path of the partitioned element.
        is_table = "unstructured.documents.html.HTMLTable" in str(type(part))
        if not is_table:
            collected.append(Element(id=element_id, type="text", element=part))
            continue

        verdicts = [table_filter(part) for table_filter in table_filters]
        if not all(verdicts):
            # Rejected tables are discarded entirely.
            continue

        parsed_table = html_to_df(str(part.metadata.text_as_html))
        collected.append(
            Element(id=element_id, type="table", element=part, table=parsed_table)
        )
    return collected

In [8]:
def get_table_elements(elements):
    """Return the elements whose ``type`` is "table", in their original order."""
    return list(filter(lambda item: item.type == "table", elements))


def get_text_elements(elements):
    """Return the elements whose ``type`` is "text", in their original order."""
    return list(filter(lambda item: item.type == "text", elements))

In [9]:
# Partition the local `tesla_2021_10k.htm` file; tables rejected by
# `filter_table` are dropped, everything else is kept as an Element.
elements = extract_elements("tesla_2021_10k.htm", table_filters=[filter_table])

In [10]:
# Split the extracted elements into two lists by their `type` tag:
# parsed tables on one side, plain text on the other.
table_elements = get_table_elements(elements)
text_elements = get_text_elements(elements)

In [11]:
## Summarize tables
from llama_index import SummaryIndex, Document, ServiceContext
from llama_index.llms import Anyscale
from tqdm.notebook import tqdm

# Using anyscale for pricing
# The API key comes from the environment; a missing ANYSCALE_API_KEY raises KeyError here.
ANYSCALE_API_KEY = os.environ["ANYSCALE_API_KEY"]
llm = Anyscale("meta-llama/Llama-2-70b-chat-hf", api_key=ANYSCALE_API_KEY)

# System prompt describing the table-summarization / table-filtering role.
system_prompt = """\
You are an assistant designed to extract insights from messy tables in a financial report.

You are also designed to filter out "tables" that are not useful to keep. For instance, if the table \
is a wrongfully extracted piece of text, or does not contain any useful information.
"""
# NOTE(review): `service_context` is not referenced by any later cell shown here;
# the later cells use a separate context built without this system prompt — confirm intent.
service_context = ServiceContext.from_defaults(system_prompt=system_prompt, llm=llm)

******
Could not load OpenAIEmbedding. Using HuggingFaceBgeEmbeddings with model_name=BAAI/bge-small-en. If you intended to use OpenAI, please check your OPENAI_API_KEY.
Original error:
No API key found for OpenAI.
Please set either the OPENAI_API_KEY environment variable or openai.api_key prior to initialization.
API keys can be found or created at https://platform.openai.com/account/api-keys

******


The pipeline needs the table summaries in a structured format. For this, you'll use the `output_cls` argument of `.as_query_engine()`.

The `output_cls` argument behaves differently depending on the LLM in the service context:
- `OpenAI`: Leverages function calling to output the table summaries in a structured format.
- `Others`: Parses JSON output as Pydantic Object.

In this tutorial, we're using Anyscale. Consequently, we must tell the LLM to output its results as JSON for `output_cls` to work.

In [37]:
# Structured answer requested from the LLM for a single table; it is passed
# to the query engine as `output_cls`.
class TableOutput(BaseModel):
    # Short, caption-style description of the table.
    summary: str
    # Whether the LLM judged the table worth keeping.
    should_keep: bool


# Service context shared by the summarizer, the vector index and the final
# query engine; only the LLM is overridden, all other settings are defaults.
summarizer_service_context = ServiceContext.from_defaults(llm=llm)

# TODO: Make these calls asynchronous
def extract_table_summaries(elements: list[Element]) -> None:
    """Ask the LLM for a caption-style summary of every table element.

    Each table is wrapped in its own ``SummaryIndex`` and queried with a fixed
    prompt; the structured answer is parsed as ``TableOutput`` and its
    ``summary`` is written back onto the element in place. Non-table elements
    are skipped. The ``should_keep`` flag of the answer is not used here.

    Args:
        elements: Elements to process; table elements are mutated in place.
    """
    caption_prompt = (
        "What is this table about? Give a very concise summary (imagine you are adding a caption), "
        "and also output whether or not the table should be kept. Return a json object.\n"
    )
    for item in tqdm(elements):
        if item.type == "table":
            table_doc = Document(text=str(item.element))
            table_index = SummaryIndex.from_documents(
                [table_doc], service_context=summarizer_service_context
            )
            engine = table_index.as_query_engine(output_cls=TableOutput)
            answer = engine.query(caption_prompt)
            # With `output_cls` set, `answer.response` is the parsed TableOutput.
            item.summary = answer.response.summary

******
Could not load OpenAIEmbedding. Using HuggingFaceBgeEmbeddings with model_name=BAAI/bge-small-en. If you intended to use OpenAI, please check your OPENAI_API_KEY.
Original error:
No API key found for OpenAI.
Please set either the OPENAI_API_KEY environment variable or openai.api_key prior to initialization.
API keys can be found or created at https://platform.openai.com/account/api-keys

******


In [38]:
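# NOTE: uncomment to generate the table summaries (one LLM query per table).
# While this call stays commented out, every `element.summary` remains None.
# extract_table_summaries(table_elements)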

  0%|          | 0/105 [00:00<?, ?it/s]

In [41]:
# Save results
import pickle
# Write the extracted elements to disk. The `with` block guarantees the file
# is flushed and closed even if pickling fails part-way through.
with open("elements.pkl", "wb") as elements_file:
    pickle.dump(elements, elements_file)

## Recursive retriever

In [39]:
from llama_index.schema import TextNode, IndexNode
from llama_index.node_parser import SimpleNodeParser, NodeParser

In [40]:
def _get_nodes_from_buffer(buffer, node_parser: NodeParser):
    """Merge buffered text snippets into one document and split it into nodes.

    Args:
        buffer: Iterable of strings, joined with a blank line between them.
        node_parser: Parser used to chunk the combined document.

    Returns:
        The nodes produced by ``node_parser`` for the combined document.
    """
    combined_text = "\n\n".join(buffer)
    return node_parser.get_nodes_from_documents([Document(text=combined_text)])


def get_nodes_and_mappings(elements: list[Element]):
    """Build the nodes and lookup tables needed for recursive retrieval.

    Consecutive text elements are buffered, joined and chunked with a
    ``SimpleNodeParser``. Each table element becomes an ``IndexNode`` whose
    ``index_id`` points at a ``TextNode`` holding the whole table rendered with
    ``DataFrame.to_string()``.

    The index node's text is the table summary. When a table has no summary
    (``summary is None``), the rendered table text is used instead; otherwise
    the node would be indexed under the literal string "None" and the table
    could not be matched by any meaningful query.

    Args:
        elements: Extracted elements in document order.

    Returns:
        A tuple ``(nodes, node_mappings, other_mappings)``:
            nodes: text nodes and table index nodes, in document order.
            node_mappings: ``"<element id>_table"`` -> ``TextNode`` with the
                full table text.
            other_mappings: ``"<element id>_table"`` -> ``(DataFrame,
                str(summary))``.
    """
    node_parser = SimpleNodeParser.from_defaults()

    nodes = []
    node_mappings = {}
    other_mappings = {}
    text_buffer = []

    def flush_text_buffer():
        # Convert the text gathered since the last table into nodes.
        if text_buffer:
            nodes.extend(_get_nodes_from_buffer(text_buffer, node_parser))
            text_buffer.clear()

    for element in elements:
        if element.type != "table":
            text_buffer.append(str(element.element))
            continue

        # A table ends the current run of text, so emit that run first to
        # keep the nodes in document order.
        flush_text_buffer()

        table_id = element.id + "_table"
        table_str = element.table.to_string()
        index_text = table_str if element.summary is None else str(element.summary)

        nodes.append(IndexNode(text=index_text, index_id=table_id))
        node_mappings[table_id] = TextNode(text=table_str)
        # The stored summary string is left exactly as before for existing consumers.
        other_mappings[table_id] = (element.table, str(element.summary))

    # Emit any text that follows the last table.
    flush_text_buffer()
    return nodes, node_mappings, other_mappings

In [42]:
# Build the retrievable nodes plus the id -> table lookups from all extracted elements.
nodes, node_mappings, other_mappings = get_nodes_and_mappings(elements)

In [50]:
from llama_index.retrievers import RecursiveRetriever
from llama_index.query_engine import RetrieverQueryEngine
from llama_index import VectorStoreIndex

In [54]:
# construct top-level vector index + query engine
# similarity_top_k=1: only the single best-matching node is returned per query.
vector_index = VectorStoreIndex(nodes, service_context=summarizer_service_context)
vector_retriever = vector_index.as_retriever(similarity_top_k=1)
# NOTE(review): `vector_query_engine` is not referenced by the cells shown here —
# confirm whether it is still needed.
vector_query_engine = vector_index.as_query_engine(similarity_top_k=1)

In [60]:
from llama_index.retrievers import RecursiveRetriever

# Retrieval starts at the retriever registered under the id "vector".
# `node_dict` maps "<element id>_table" ids to the full-table TextNodes;
# presumably a retrieved IndexNode is swapped for the node stored under its
# index_id — confirm against the RecursiveRetriever documentation.
recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict={"vector": vector_retriever},
    node_dict=node_mappings,
    verbose=True,
)
# Query engine that synthesizes an answer from whatever the recursive retriever returns.
query_engine = RetrieverQueryEngine.from_args(recursive_retriever, service_context=summarizer_service_context)

In [64]:
# Ask a sample question through the recursive-retriever query engine.
response = query_engine.query("What were the total cash flows in 2021?")

[1;3;34mRetrieving with query id None: What were the total cash flows in 2021?
[0m[1;3;38;5;200mRetrieving text node: Cash Flows from Operating Activities

Our cash flows from operating activities are significantly affected by our cash investments to support the growth of our business in areas such as research and development and selling, general and administrative and working capital. Our operating cash inflows include cash from vehicle sales and related servicing, customer lease payments, customer deposits, cash from sales of regulatory credits and energy generation and storage products. These cash inflows are offset by our payments to suppliers for production materials and parts used in our manufacturing process, operating expenses, operating lease payments and interest payments on our financings.

Net cash provided by operating activities increased by $5.55 billion to $11.50 billion during the year ended December 31, 2021 from $5.94 billion during the year ended December 31, 202

In [65]:
# Display the synthesized answer as plain text.
str(response)

" Based on the information provided in the context, the total cash flows in 2021 were $11.50 billion, which is the net cash provided by operating activities. This amount represents the cash generated from the company's operations, including cash from vehicle sales and related servicing, customer lease payments, customer deposits, cash from sales of regulatory credits and energy generation and storage products, and offset by payments to suppliers for production materials and parts used in the manufacturing process, operating expenses, operating lease payments, and interest payments on financings."