In [1]:
import nest_asyncio

nest_asyncio.apply()

In [2]:
# !pip install -U llama-index

In [3]:
from utils import load_env

load_env()

# Designing the Workflow TBD

RAG + Reranking consists of some clearly defined steps

Indexing data, creating an index
Using that index + a query to retrieve relevant text chunks
Rerank the text retrieved text chunks using the original query
Synthesizing a final response
With this in mind, we can create events and workflow steps to follow this process!

In [4]:
# TBD
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]

### The Workflow Itself,
With our events defined, we can construct our workflow and steps

Note that the workflow automatically validates itself using type annotations, so the type annotations on our steps are very helpful

In [5]:
from llama_index.core.schema import TextNode
from typing import List
import json

# helper functions
def get_text_nodes(json_list: List[dict]):
    text_nodes = []
    for idx, page in enumerate(json_list):
        text_node = TextNode(text=page["md"], metadata={"page": page["page"]})
        text_nodes.append(text_node)
    return text_nodes


def save_jsonl(data_list, filename):
    """Save a list of dictionaries as JSON Lines."""
    with open(filename, "w") as file:
        for item in data_list:
            json.dump(item, file)
            file.write("\n")


def load_jsonl(filename):
    """Load a list of dictionaries from JSON Lines."""
    data_list = []
    with open(filename, "r") as file:
        for line in file:
            data_list.append(json.loads(line))
    return data_list

In [10]:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

from llama_parse import LlamaParse


class GraphRAGWorkflow(Workflow):
    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Ingest the documents using LlamaParse"""
        filepath = ev.get("filepath")
        parser = LlamaParse(
            result_type="markdown",
            use_vendor_multimodal_model=True,
            vendor_multimodal_model_name="openai-gpt-4o-mini",
            invalidate_cache=True,
        )
        json_objs = parser.get_json_result(filepath)
        json_list = json_objs[0]["pages"]
        docs = get_text_nodes(json_list)
        ctx.data["docs"] = docs

        return StopEvent(result=f"ingested {len(docs)} docs.")

    @step(pass_context=True)
    async def display_docs(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """Display parsed documents."""
        docs: List[TextNode] = ctx.data.get("docs", [])
        if not docs:
            return StopEvent(result="No documents found. Please ingest documents first.")
        
        num_docs = ev.get("num_docs", 1)
        start_index = ev.get("start_index", 0)

        displayed_docs = []
        for i in range(start_index, min(start_index + num_docs, len(docs))):
            doc = docs[i]
            displayed_docs.append({
                "page": doc.metadata.get("page", "Unknown"),
                "content": doc.get_content(metadata_mode="all")
            })

        return StopEvent(result=displayed_docs)

        

# Run the workflow

In [7]:
# Ingest the documents

w = GraphRAGWorkflow(timeout=120, verbose=True)
await w.run(filepath="data/bph-review-2019.pdf")

Started parsing the file under job_id d3a734a6-9120-4598-8005-7cbbc7c3eb12


WorkflowTimeoutError: Operation timed out after 10 seconds

In [11]:
# Display the first 2 documents
display_result = await w.run(num_docs=2, start_index=0)
for doc in display_result:
    print(f"\nPage {doc['page']}:")
    print(doc['content'])
    print("-" * 50)

ValueError: The input file_path must be a string or a list of strings.