# Iterative Extraction with Workflows + LlamaCloud

In this example, we'll build a workflow that can:
1. Parse a document (using `LlamaParse`).
2. Use an LLM to generate a JSON schema for the data we want to extract (using `OpenAI`). This includes error handling and retries for when the generated JSON schema is invalid!
3. Use a human-in-the-loop step to either accept the proposed schema or provide feedback on it.
4. Send the finalized schema and parsed content to `LlamaExtract` to extract the data.

To use `LlamaCloud` and `OpenAI`, you'll need a few API keys:
1. [LlamaCloud](https://cloud.llamaindex.ai)
2. [OpenAI](https://platform.openai.com/account/api-keys)

You'll also need to install the required libraries:

In [None]:
%pip install llama-index-workflows llama-cloud-services jsonschema openai

## Setup

### Workflow Resources

First, we'll define some resource functions for our workflow.

These are called once per workflow run and return clients for the various services.

In [1]:
from openai import AsyncOpenAI
from llama_cloud_services.extract import (
    LlamaExtract,
    ExtractConfig,
    ExtractMode,
    SourceText,
)
from llama_cloud_services.parse import LlamaParse


async def get_parse_client(**kwargs):
    return LlamaParse(api_key="llx-...", parse_mode="parse_page_with_agent")


async def get_extract_client(**kwargs):
    return LlamaExtract(api_key="llx-...")


async def get_openai_client(**kwargs):
    return AsyncOpenAI(api_key="sk-...")
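Hardcoding keys in a notebook makes them easy to leak or commit by accident. A minimal sketch, assuming you export the keys as environment variables instead; the variable names `LLAMA_CLOUD_API_KEY` and `OPENAI_API_KEY` are assumptions (the OpenAI client also falls back to `OPENAI_API_KEY` when `api_key` is omitted), and `require_env` is a hypothetical helper, not part of either SDK.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, failing loudly if it is unset or empty."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing environment variable: {name}")
    return value


# Hypothetical usage inside the resource functions above:
#   LlamaParse(api_key=require_env("LLAMA_CLOUD_API_KEY"), parse_mode="parse_page_with_agent")
#   AsyncOpenAI(api_key=require_env("OPENAI_API_KEY"))
```

Failing at client creation surfaces a missing key immediately, instead of as an authentication error deep inside a step.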

### Workflow State

As our workflow runs, we can store global state between steps. This step is optional, but providing a state class gives us more type safety and more control over how the state is serialized and deserialized (using optional pydantic serializers and validators).

In [2]:
from pydantic import BaseModel, Field


class WorkflowState(BaseModel):
    file_path: str | None = Field(default=None)
    file_content: str | None = Field(default=None)
    current_schema: dict | None = Field(default=None)
    current_feedback: str | None = Field(default=None)
    original_prompt: str | None = Field(default=None)

### Schema Extraction Prompt

Our prompt for generating a JSON schema does double duty:
1. It handles the initial generation of the schema.
2. It includes placeholders for feedback from past generations and from the human reviewer.

To make the output easy to parse, we'll instruct the LLM to use the `<schema>` and `</schema>` tags to wrap the schema.
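Pulling the schema out of the tags is easiest with `re.search`, which returns only the text between the tags and ignores any prose the model writes before or after them. A minimal sketch; `extract_tagged_schema` is a hypothetical helper name.

```python
import json
import re

# Non-greedy match so only the first <schema>...</schema> block is captured.
SCHEMA_TAG_RE = re.compile(r"<schema>(.*?)</schema>", re.DOTALL)


def extract_tagged_schema(text: str) -> dict:
    """Return the JSON wrapped in <schema> tags, or raise ValueError."""
    match = SCHEMA_TAG_RE.search(text)
    if match is None:
        raise ValueError("No <schema>...</schema> block found in the response")
    # json.JSONDecodeError is a subclass of ValueError, so callers can catch one type.
    return json.loads(match.group(1))


reply = 'Here is the schema:\n<schema>{"type": "object", "properties": {}}</schema>\nLet me know!'
print(extract_tagged_schema(reply))  # {'type': 'object', 'properties': {}}
```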

`LlamaExtract` expects a JSON schema that has a root node with "type": "object" and fields inside "properties", so we'll instruct the LLM to output a schema that matches this format.
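`Draft202012Validator.check_schema` confirms that a schema is valid JSON Schema, but it does not enforce this particular root shape. A minimal stdlib sketch of that extra check, assuming only the two requirements stated above; `check_root_shape` is hypothetical.

```python
def check_root_shape(schema: object) -> None:
    """Raise ValueError unless the root has "type": "object" and a "properties" object."""
    if not isinstance(schema, dict):
        raise ValueError("Schema must be a JSON object")
    if schema.get("type") != "object":
        raise ValueError('Root node must have "type": "object"')
    if not isinstance(schema.get("properties"), dict):
        raise ValueError('Root node must define its fields under "properties"')


check_root_shape({"type": "object", "properties": {"title": {"type": "string"}}})  # passes silently
```

Raising `ValueError` keeps the check compatible with a retry loop that catches exceptions and feeds the message back to the model.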

In [3]:
extract_prompt = """\
<file>
{file_content}
</file>

<user_prompt>
{prompt}
</user_prompt>
{past_attempt}

Given the file content above, and the user prompt, output a JSON schema that will be used to extract the data from the file.

Your JSON schema should have a root node with "type": "object" and fields inside "properties".

Wrap your schema in <schema>...</schema> tags.
"""

### Workflow Events

We'll define a few events for our workflow.

- `InputEvent`: Start the workflow by providing a file path and a prompt. Subclass of `StartEvent`, so that the workflow knows to start with this event.
- `ParsedContent`: Carries the parsed file content and the prompt forward. Also used to re-trigger schema generation if the user gives feedback on the current proposed schema.
- `RunExtraction`: Runs the extraction on the provided file content and schema.
- `ProposedSchema`: Proposes a schema for the extraction that needs human approval. Subclass of `InputRequiredEvent`, so that the workflow knows to wait for human input before continuing.
- `ApprovedSchema`: Carries the human's response to the proposed schema (approval or feedback). Subclass of `HumanResponseEvent`, so that the reply can be sent back into the running workflow with `send_event`.
- `ExtractedData`: Outputs the extracted data and the agent ID from the workflow run. Subclass of `StopEvent`, so that the workflow knows to stop when this event is received.
- `ProgressEvent`: Streams progress messages to the user as the workflow runs.

In [None]:
from workflows.events import (
    StartEvent,
    StopEvent,
    Event,
    InputRequiredEvent,
    HumanResponseEvent,
)


class InputEvent(StartEvent):
    """Start the workflow by providing a file path and a prompt."""

    file_path: str
    prompt: str


class ParsedContent(Event):
    """Parses the content and carry forward the content and prompt."""

    file_content: str
    prompt: str


class RunExtraction(Event):
    """Runs the extraction on the provided file content and schema."""

    generated_schema: dict
    file_content: str


class ProposedSchema(InputRequiredEvent):
    """Proposes a schema for the extraction. Needs human approval."""

    generated_schema: dict


class ApprovedSchema(HumanResponseEvent):
    """Handles the human approval of the proposed schema."""

    approved: bool
    feedback: str


class ExtractedData(StopEvent):
    """Outputs the extracted data and the agent ID from the workflow run."""

    data: dict
    agent_id: str


class ProgressEvent(Event):
    """Propagates a progress message to the user as the workflow runs."""

    msg: str

### Workflow Design

Using all the pieces above, we can now define our workflow.

This workflow has a few notable features:
1. It uses a human-in-the-loop to iteratively improve the schema for the extraction.
2. The generated schema is validated using `jsonschema` before being sent to `LlamaExtract`.
3. It loops between the schema generation and human approval until the schema is approved.

In [14]:
import json
import re
import uuid

from jsonschema import Draft202012Validator
from workflows import Context, Workflow, step
from workflows.resource import Resource
from typing import Annotated


class IterativeExtractionWorkflow(Workflow):
    @step
    async def parse_file(
        self,
        ev: InputEvent,
        ctx: Context[WorkflowState],
        parser: Annotated[LlamaParse, Resource(get_parse_client)],
    ) -> ParsedContent:
        ctx.write_event_to_stream(ProgressEvent(msg=f"Parsing file: {ev.file_path}"))
        result = await parser.aparse(ev.file_path)
        ctx.write_event_to_stream(ProgressEvent(msg="File parsed successfully"))

        # Update the state with the file content and path
        async with ctx.store.edit_state() as state:
            state.original_prompt = ev.prompt
            state.file_path = ev.file_path
            state.file_content = "\n\n".join([page.md for page in result.pages])

        return ParsedContent(file_content=state.file_content, prompt=ev.prompt)

    @step
    async def propose_schema(
        self,
        ev: ParsedContent,
        ctx: Context[WorkflowState],
        client: Annotated[AsyncOpenAI, Resource(get_openai_client)],
    ) -> ProposedSchema:
        ctx.write_event_to_stream(ProgressEvent(msg="Proposing schema"))

        # Inject feedback from previous attempts if available
        state = await ctx.store.get_state()
        if state.current_feedback and state.current_schema:
            past_attempt_str = f"\n<past_attempt>\n<feedback>{state.current_feedback}</feedback>\n<schema>{str(state.current_schema)}</schema>\n</past_attempt>\n"
        else:
            past_attempt_str = ""

        # Start the extraction process with a fresh chat history
        prompt = extract_prompt.format(
            file_content=ev.file_content,
            prompt=ev.prompt,
            past_attempt=past_attempt_str,
        )

        history = [{"role": "user", "content": prompt}]

        # Generate a new schema using OpenAI
        response = await client.responses.create(
            input=history,
            model="gpt-4.1",
            temperature=1.0,
            store=False,
        )
        history.append({"role": "assistant", "content": response.output_text})

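        # Try to parse the schema from the response. If it fails, ask again, using the chat
        # history to show the model its failed attempts. `schema` is only assigned once a
        # candidate has passed every check, so an invalid schema can never slip through.
        max_attempts = 3
        schema = None
        for attempt in range(1, max_attempts + 1):
            try:
                ctx.write_event_to_stream(
                    ProgressEvent(
                        msg=f"Attempting to parse schema string from:\n{response.output_text}"
                    )
                )
                # Take only the text between the tags, ignoring any surrounding prose
                match = re.search(r"<schema>([\s\S]*?)</schema>", response.output_text)
                if match is None:
                    raise ValueError("No <schema>...</schema> block found in the response")

                # Validate the schema before accepting it
                candidate = json.loads(match.group(1))
                Draft202012Validator.check_schema(candidate)
                if candidate.get("type") != "object" or "properties" not in candidate:
                    raise ValueError('The root node must have "type": "object" and "properties"')

                schema = candidate
                async with ctx.store.edit_state() as state:
                    state.current_schema = schema
                break
            except Exception as e:
                ctx.write_event_to_stream(
                    ProgressEvent(msg=f"Schema parsing failed:\n{e}")
                )
                if attempt == max_attempts:
                    break  # no retries left, so don't spend another LLM call
                history.append(
                    {"role": "user", "content": f"Error: {e}\n\nPlease try again."}
                )
                response = await client.responses.create(
                    input=history,
                    model="gpt-4.1",
                    temperature=1.0,
                    store=False,
                )
                history.append({"role": "assistant", "content": response.output_text})

        if schema is None:
            raise Exception(f"Failed to propose a valid schema after {max_attempts} attempts!")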

        ctx.write_event_to_stream(ProgressEvent(msg="Schema proposed successfully"))
        return ProposedSchema(generated_schema=schema)

    @step
    async def handle_schema_approval(
        self,
        ev: ApprovedSchema,
        ctx: Context[WorkflowState],
    ) -> ParsedContent | RunExtraction:
        async with ctx.store.edit_state() as state:
            state.current_feedback = ev.feedback

        # If the schema is approved, run the extraction. Otherwise, go back to the start and try again.
        if ev.approved:
            return RunExtraction(
                generated_schema=state.current_schema, file_content=state.file_content
            )
        else:
            return ParsedContent(
                file_content=state.file_content, prompt=state.original_prompt
            )

    @step
    async def run_extraction(
        self,
        ev: RunExtraction,
        ctx: Context[WorkflowState],
        extract: Annotated[LlamaExtract, Resource(get_extract_client)],
    ) -> ExtractedData:
        ctx.write_event_to_stream(ProgressEvent(msg="Running extraction"))

        # Persist an extraction agent + schema to llama-cloud
        agent = extract.create_agent(
            name=f"extraction_workflow_{uuid.uuid4()}",
            data_schema=ev.generated_schema,
            config=ExtractConfig(
                extraction_mode=ExtractMode.BALANCED,
            ),
        )

        # Run the extraction
        file_path = await ctx.store.get("file_path")
        result = await agent.aextract(
            files=SourceText(text_content=ev.file_content, filename=file_path),
        )

        return ExtractedData(data=result.data, agent_id=agent.id)
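The retry logic in `propose_schema` re-prompts the model with the error message whenever parsing or validation fails. A stdlib sketch of the same bounded-retry pattern, with a fake model standing in for `client.responses.create`; `propose_with_retries` and `fake_llm` are hypothetical, and `Draft202012Validator.check_schema` is replaced by a simple root-shape check so the example runs without `jsonschema`.

```python
import json
import re
from typing import Callable

SCHEMA_RE = re.compile(r"<schema>(.*?)</schema>", re.DOTALL)


def propose_with_retries(
    llm: Callable[[list[dict]], str], prompt: str, max_attempts: int = 3
) -> dict:
    """Call the model, parse and validate its schema, and feed errors back until it succeeds."""
    history = [{"role": "user", "content": prompt}]
    for _ in range(max_attempts):
        reply = llm(history)
        history.append({"role": "assistant", "content": reply})
        try:
            match = SCHEMA_RE.search(reply)
            if match is None:
                raise ValueError("No <schema>...</schema> block found")
            candidate = json.loads(match.group(1))
            if not isinstance(candidate, dict) or candidate.get("type") != "object":
                raise ValueError('Root must have "type": "object"')
            if "properties" not in candidate:
                raise ValueError('Root must define "properties"')
            return candidate  # returned only after every check passes
        except ValueError as e:
            # Show the model its mistake so the next attempt can correct it.
            history.append({"role": "user", "content": f"Error: {e}\n\nPlease try again."})
    raise RuntimeError(f"Failed to propose a valid schema after {max_attempts} attempts")


# Fake model: the first reply is malformed JSON, the second is valid.
replies = iter([
    "<schema>{not json}</schema>",
    '<schema>{"type": "object", "properties": {"title": {"type": "string"}}}</schema>',
])
calls = []


def fake_llm(history: list[dict]) -> str:
    calls.append(len(history))  # record how much context each call received
    return next(replies)


schema = propose_with_retries(fake_llm, "Extract the title.")
```

Returning from inside the `try` means a half-validated candidate is never handed back, and the loop makes at most `max_attempts` model calls.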

## Running the Workflow

First, let's download some sample data to work with.

In [None]:
!wget https://arxiv.org/pdf/2506.05176 -O qwen3_embed_paper.pdf

Because the workflow has a human-in-the-loop step, there are two ways to run it:
1. Keep the workflow running while it waits for human input. This setup is useful for "online" applications like websockets, running in a CLI, or similar environments where you expect a response from the user quickly.
2. Pause the workflow and restart it once human input is received. In this case, we serialize the workflow context and restart the workflow from the point of the pause when the human input is received. This setup is useful for more asynchronous applications, like a REST API, where you expect the user to take some time to respond.

### Option 1: Running the Workflow to Completion

In this option, we'll run the workflow to completion while waiting for human input. This setup is useful for "online" applications like websockets, running in a CLI, or similar environments where you expect a response from the user quickly.

In [15]:
wf = IterativeExtractionWorkflow(timeout=None)

handler = wf.run(
    file_path="./qwen3_embed_paper.pdf",
    prompt="Extract the title, authors, and key takeaways from the paper.",
)
async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg, flush=True)
    elif isinstance(ev, ProposedSchema):
        print(f"Proposed schema: {ev.generated_schema}", flush=True)
        approved = input("Approve? (y/<reason>): ").strip().lower()
        print(f"Approved? {approved}", flush=True)
        if approved == "y":
            handler.ctx.send_event(ApprovedSchema(approved=True, feedback="Approved"))
        else:
            handler.ctx.send_event(ApprovedSchema(approved=False, feedback=approved))

result = await handler
print(f"Agent ID: {result.agent_id}", flush=True)
print(f"Extracted data: {result.data}", flush=True)

Parsing file: ./qwen3_embed_paper.pdf
Started parsing the file under job_id 9315e44f-6f5e-439f-a088-0e8aeacc1d56
File parsed successfully
Proposing schema
Attempting to parse schema string from:
<schema>
{
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The title of the paper."
    },
    "authors": {
      "type": "array",
      "description": "A list of the authors of the paper.",
      "items": {
        "type": "string"
      }
    },
    "key_takeaways": {
      "type": "array",
      "description": "A list of the main findings or key insights from the paper.",
      "items": {
        "type": "string"
      }
    }
  },
  "required": ["title", "authors", "key_takeaways"]
}
</schema>
Schema proposed successfully
Proposed schema: {'type': 'object', 'properties': {'title': {'type': 'string', 'description': 'The title of the paper.'}, 'authors': {'type': 'array', 'description': 'A list of the authors of the paper.', 'items': {'type':

Uploading files:   0%|          | 0/1 [00:00<?, ?it/s]

Running extraction


Uploading files: 100%|██████████| 1/1 [00:01<00:00,  1.04s/it]
Creating extraction jobs: 100%|██████████| 1/1 [00:01<00:00,  1.12s/it]
Extracting files: 100%|██████████| 1/1 [00:15<00:00, 15.05s/it]

Agent ID: 374730cc-974e-42f6-a64b-b5f163ad8818
Extracted data: {'title': 'Qwen3 Embedding: Advancing Text Embedding and Reranking Through Foundation Models', 'authors': ['Yanzhao Zhang', 'Mingxin Li', 'Dingkun Long', 'Xin Zhang', 'Huan Lin', 'Baosong Yang', 'Pengjun Xie', 'An Yang', 'Dayiheng Liu', 'Junyang Lin', 'Fei Huang', 'Jingren Zhou'], 'key_takeaways': ['Qwen3 Embedding series introduces advanced text embedding and reranking models based on Qwen3 foundation models, supporting multiple model sizes (0.6B, 4B, 8B) for diverse deployment needs.', 'The models leverage a multi-stage training pipeline: large-scale weakly supervised pre-training on synthetic data, supervised fine-tuning on high-quality datasets, and model merging for robustness and generalization.', 'Qwen3 LLMs are used not only as backbones but also to synthesize high-quality, diverse, multilingual training data, enhancing the training process.', 'The Qwen3 Embedding models achieve state-of-the-art results on multiple 
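The reviewer's reply is overloaded: `y` approves, anything else is feedback for the model. A small hypothetical helper that separates the two, comparing case-insensitively for approval while passing feedback through unchanged (lowercasing it would alter names and acronyms the model should see as typed):

```python
def parse_review(raw: str) -> tuple[bool, str]:
    """Map a reviewer's reply to (approved, feedback); "y" in any case approves."""
    text = raw.strip()
    if text.lower() == "y":
        return True, "Approved"
    return False, text
```

In either loop, `approved, feedback = parse_review(input("Approve? (y/<reason>): "))` followed by `ApprovedSchema(approved=approved, feedback=feedback)` would replace the inline branching.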




### Option 2: Serializing the Workflow Context

In this option, we'll serialize the workflow context and restart the workflow from the point of the pause when the human input is received. This setup is useful for more asynchronous applications, like a REST API, where you expect the user to take some time to respond.

In [16]:
# Faking some DB for storing the workflow context between runs
in_memory_store = {}

# Define the workflow instance
wf = IterativeExtractionWorkflow(timeout=None)


# Define a function to run the workflow
async def run_workflow(
    file_path: str | None = None,
    prompt: str | None = None,
    response_ev: ApprovedSchema | None = None,
) -> ProposedSchema | ExtractedData:
    # Check if there's existing context from a previous run
    existing_context = in_memory_store.get("workflow_context")
    if existing_context:
        ctx = Context.from_dict(wf, existing_context)
        handler = wf.run(ctx=ctx)
        handler.ctx.send_event(response_ev)
    else:
        handler = wf.run(file_path=file_path, prompt=prompt)

    # Stream events until we reach the end of the workflow or hit an input-required event
    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg, flush=True)
        elif isinstance(ev, ProposedSchema):
            # Here, you would serialize the workflow context and save it to a DB or other storage,
            # then resume the workflow from the pause once the human input is received.
            in_memory_store["workflow_context"] = handler.ctx.to_dict()
            await handler.cancel_run()
            return ev

    result = await handler
    # The run is finished, so drop the saved context; otherwise the next call would resume it
    in_memory_store.pop("workflow_context", None)
    return result

In [17]:
# Run the workflow in a loop, waiting for human input between schema generations.
# Loop is over once we get to the ExtractedData event.
ev = await run_workflow(
    file_path="./qwen3_embed_paper.pdf",
    prompt="Extract the title, authors, and key takeaways from the paper.",
)
while not isinstance(ev, ExtractedData):
    if isinstance(ev, ProposedSchema):
        print(f"Proposed schema: {ev.generated_schema}", flush=True)
        approved = input("Approve? (y/<reason>): ").strip()
        print(f"Approved? {approved}", flush=True)
        if approved.lower() == "y":
            ev = await run_workflow(
                response_ev=ApprovedSchema(approved=True, feedback="Approved")
            )
        else:
            ev = await run_workflow(
                response_ev=ApprovedSchema(approved=False, feedback=approved)
            )
    else:
        break

print(f"Extracted data: {ev.data}", flush=True)
print(f"Agent ID: {ev.agent_id}", flush=True)

Parsing file: ./qwen3_embed_paper.pdf


Started parsing the file under job_id cc7807e8-bc53-43ec-a467-7b5df172ae83
File parsed successfully
Proposing schema
Attempting to parse schema string from:
<schema>
{
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The full title of the paper"
    },
    "authors": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "A list of authors as full names"
    },
    "key_takeaways": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "A list of the main contributions, findings, or conclusions drawn from the paper"
    }
  },
  "required": ["title", "authors", "key_takeaways"]
}
</schema>

Schema proposed successfully
Proposed schema: {'type': 'object', 'properties': {'title': {'type': 'string', 'description': 'The full title of the paper'}, 'authors': {'type': 'array', 'items': {'type': 'string'}, 'description': 'A list of authors as full names'}, 'key

Uploading files:   0%|          | 0/1 [00:00<?, ?it/s]

Running extraction


Uploading files: 100%|██████████| 1/1 [00:01<00:00,  1.21s/it]
Creating extraction jobs: 100%|██████████| 1/1 [00:00<00:00,  1.01it/s]
Extracting files: 100%|██████████| 1/1 [00:14<00:00, 14.94s/it]

Extracted data: {'title': 'Qwen3 Embedding: Advancing Text Embedding and Reranking Through Foundation Models', 'authors': ['Yanzhao Zhang', 'Mingxin Li', 'Dingkun Long', 'Xin Zhang', 'Huan Lin', 'Baosong Yang', 'Pengjun Xie', 'An Yang', 'Dayiheng Liu', 'Junyang Lin', 'Fei Huang', 'Jingren Zhou'], 'key_takeaways': ['Qwen3 Embedding series introduces advanced text embedding and reranking models based on Qwen3 foundation models, significantly improving over the previous GTE-Qwen series.', 'The models leverage a multi-stage training pipeline: large-scale weakly supervised pre-training on synthetic data, supervised fine-tuning on high-quality datasets, and model merging for robustness and generalization.', 'Qwen3 LLMs are used both as backbone models and as generators of high-quality, diverse, multilingual synthetic training data, enhancing the training process.', 'The series includes models of various sizes (0.6B, 4B, 8B parameters) for both embedding and reranking, supporting flexible dep


