# Human in the Loop

**Objective**: Get feedback on the generated answers from a human operator.


In [1]:
import os, json
from llama_parse import LlamaParse
from llama_index.llms.gemini import Gemini
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage
)
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context
)
from helper import (
    get_gemini_api_key,
    get_llama_cloud_api_key,
    extract_html_content
)
from IPython.display import display, HTML
from llama_index.utils.workflow import draw_all_possible_flows



In [2]:
import nest_asyncio
nest_asyncio.apply()

In [3]:
llama_cloud_api_key = get_llama_cloud_api_key()
gemini_api_key = get_gemini_api_key()

## Adding a feedback loop

Here's what you built in lesson 4:

<img width="400" src="images/L4.png">

LLMs are amazing, but they are best used to augment rather than replace a human. Your current form-filler does an excellent job figuring out what fields need to be filled in, and gets most of the fields right, but there are a couple where it needs a little help. To take care of those, you'll create a "human in the loop" workflow, where you can optionally provide feedback to the agent you've created and have it incorporated into the results.

This is what you'll implement in this notebook:

<img width="500" src="images/L5.png">

The changes you're going to make here are:
1. Use `InputRequiredEvent` and `HumanResponseEvent`, two special events designed to let you exit the workflow, collect feedback from a human, and feed it back in.
2. Previously, a single step parsed your form and fired off all your questions. Because the workflow may now loop back and ask the questions several times, while the form only needs to be parsed once, you'll split that work into two steps. This kind of refactoring is very common as a workflow grows more complex:
   - Your new `generate_questions` step will be triggered either by a `GenerateQuestionsEvent`, emitted by the form parser, or by a `FeedbackEvent`, emitted when the workflow loops back after getting feedback.
3. `fill_in_application` will emit an `InputRequiredEvent`, and the next step, `get_feedback`, will wait for a `HumanResponseEvent`. This pauses the whole workflow until outside input arrives.
4. Finally, you'll use the LLM to parse the feedback and decide whether to stop and output the results or to loop back.
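
Step 4 depends on mapping the LLM's reply to a yes/no decision. Models often add punctuation, quotes, or different casing around a one-word answer, so a tolerant comparison can make that decision less brittle. The following is a minimal sketch, not part of the lesson code: `is_approval` is a hypothetical helper name, and it assumes the prompt asks for the single word `OKAY` when no changes are needed.

```python
import string


def is_approval(verdict_text: str) -> bool:
    """Return True when a one-word verdict means 'no more changes needed'.

    Hypothetical helper: assumes the prompt asked the model to answer
    with the single word OKAY when the human is satisfied.
    """
    # Trim whitespace, surrounding quotes and punctuation, then ignore case.
    cleaned = verdict_text.strip(string.punctuation + string.whitespace).upper()
    return cleaned == "OKAY"


# Anything that is not a clear approval is treated as feedback,
# so an ambiguous reply loops back instead of ending the workflow.
for reply in ["OKAY", " 'okay.'\n", "FEEDBACK", ""]:
    print(repr(reply), is_approval(reply))
```

Treating every unclear reply as feedback is a deliberate choice here: looping back once more costs less than stopping with an unreviewed form.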

In [4]:
# new!
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent

In [5]:
class ParseFormEvent(Event):
    application_form: str

class QueryEvent(Event):
    query: str
    field: str
    
class ResponseEvent(Event):
    field: str  # set in ask_question, read in fill_in_application
    response: str

# new!
class FeedbackEvent(Event):
    feedback: str

class GenerateQuestionsEvent(Event):
    pass

In [6]:
class RAGWorkflow(Workflow):
    
    storage_dir = "./storage"
    llm: Gemini
    query_engine: VectorStoreIndex

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:

        if not ev.resume_file:
            raise ValueError("No resume file provided")

        if not ev.application_form:
            raise ValueError("No application form provided")

        # define the LLM to work with
        self.llm = Gemini(model="models/gemma-3-27b-it", api_key=gemini_api_key)

        # ingest the data and set up the query engine
        if os.path.exists(self.storage_dir):
            # you've already ingested the resume document
            storage_context = StorageContext.from_defaults(persist_dir=
                                                           self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # parse and load the resume document
            documents = LlamaParse(
                api_key=llama_cloud_api_key,
                base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as bullet points with headers"
            ).load_data(ev.resume_file)
            # embed and index the documents
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=GeminiEmbedding(model_name="models/text-embedding-004", api_key=gemini_api_key)
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        # create a query engine
        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        # you no longer need a query to be passed in, 
        # you'll be generating the queries instead 
        # let's pass the application form to a new step to parse it
        return ParseFormEvent(application_form=ev.application_form)

    # new - separated the form parsing from the question generation
    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        parser = LlamaParse(
            api_key=llama_cloud_api_key,
            base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )

        # get the LLM to convert the parsed form into JSON
        result = parser.load_data(ev.application_form)[0]
        raw_json = self.llm.complete(
            f"This is a parsed form. Convert it into a JSON object containing only the list of fields to be filled in, in the form {{ fields: [...] }}. <form>{result.text}</form>. Return JSON ONLY, no markdown.")
        
        # Debug: print what the LLM returned
        print(f"LLM returned: '{raw_json.text}'")
        
        # Try to parse JSON, with error handling
        try:
            # Clean the response by removing markdown code blocks if present
            json_text = raw_json.text.strip()
            if json_text.startswith("```json"):
                json_text = json_text[7:]  # Remove ```json
            if json_text.endswith("```"):
                json_text = json_text[:-3]  # Remove ```
            json_text = json_text.strip()
            
            fields = json.loads(json_text)["fields"]
        except (json.JSONDecodeError, KeyError) as e:
            # KeyError covers valid JSON that lacks a "fields" key
            print(f"Could not read fields from LLM response: {e!r}")
            print(f"Raw response: '{raw_json.text}'")
            # Fallback: use a default list of fields
            fields = [
                "First Name", "Last Name", "Email", "Phone", "LinkedIn",
                "Project Portfolio", "Degree", "Graduation Date", 
                "Current Job Title", "Current Employer", "Technical Skills",
                "Describe why you're a good fit for this position",
                "Do you have 5 years of experience in React?"
            ]
            print(f"Using fallback fields: {fields}")

        await ctx.set("fields_to_fill", fields)

        return GenerateQuestionsEvent()

    # new - this step can get triggered either by GenerateQuestionsEvent or a FeedbackEvent
    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent:

        # get the list of fields to fill in
        fields = await ctx.get("fields_to_fill")

        # generate one query for each of the fields, and fire them off
        for field in fields:
            question = f"How would you answer this question about the candidate? <field>{field}</field>"
            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        # store the number of fields so we know how many to wait for later
        await ctx.set("total_fields", len(fields))
        return
        
    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        response = self.query_engine.query(f"This is a question about the specific resume we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

  
    # new - we now emit an InputRequiredEvent
    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent:
        # get the total number of fields to wait for
        total_fields = await ctx.get("total_fields")

        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            return None # do nothing if there's nothing to do yet

        # we've got all the responses!
        responseList = "\n".join("Field: " + r.field + "\n" + "Response: " + r.response for r in responses)

        result = self.llm.complete(f"""
            You are given a list of fields in an application form and responses to
            questions about those fields from a resume. Combine the two into a list of
            fields and succinct, factual answers to fill in those fields.

            <responses>
            {responseList}
            </responses>
        """)

        # new! save the result for later
        await ctx.set("filled_form", str(result))

        # new! Let's get a human in the loop
        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    # new! Accept the feedback.
    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:

        result = self.llm.complete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            <feedback>
            {ev.response}
            </feedback>
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()

        print(f"LLM says the verdict was {verdict}")
        if verdict == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        else:
            return FeedbackEvent(feedback=ev.response)


Okay! Your workflow is now ready to receive feedback, but how do you actually deliver it? The `InputRequiredEvent` is an event in the event stream, just like the `ProgressEvents` and `TextEvents` you saw in lesson 2. You can intercept it the same way you did those, and use the `send_event` method on the context to send back a `HumanResponseEvent`.

In [7]:
w = RAGWorkflow(timeout=600, verbose=False)
handler = w.run(
    resume_file="data/fake_resume.pdf",
    application_form="data/fake_application_form.pdf"
)

async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        print("We've filled in your form! Here are the results:\n")
        print(event.result)
        # now ask for input from the keyboard
        response = input(event.prefix)
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response
            )
        )

response = await handler
print("Agent complete! Here's your final result:")
print(str(response))

Started parsing the file under job_id 03b76af4-f332-470a-a907-c27ad99ed9cd


Started parsing the file under job_id 2d73431c-c9c0-4c65-98b5-fd99dc5611a4
LLM returned: '```json
{
  "fields": [
    "First Name",
    "Last Name",
    "Email",
    "Phone",
    "LinkedIn",
    "Project Portfolio",
    "Degree",
    "Graduation Date",
    "Current Job Title",
    "Current Employer",
    "Technical Skills",
    "Describe why you’re a good fit for this position",
    "Do you have 5 years of experience in React?"
  ]
}
```'
We've filled in your form! Here are the results:

Here's the combined list of fields and answers, based on the provided responses:

*   **First Name:** Sarah
*   **Last Name:** Chen
*   **Email:** sarah.chen@email.com
*   **Phone:** Not available
*   **LinkedIn:** linkedin.com/in/sarahchen
*   **Project Portfolio:** EcoTrack (React, Node.js, MongoDB - featured in TechCrunch), ChatFlow (React, WebSocket - 5000+ MAU)
*   **Degree:** Bachelor of Science in Computer Science
*   **Graduation Date:** 2017
*   **Current Job Title:** Senior Full Stack Develop
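
The output above shows the model wrapping its JSON in a Markdown code fence even though the prompt asked for JSON only. `parse_form` handles this with prefix and suffix checks. A slightly more tolerant variant is sketched below, assuming the reply is either bare JSON or JSON inside a single fence; `extract_fields` and `FENCE_RE` are hypothetical names, not part of the lesson code.

```python
import json
import re

# Optional Markdown fence (with or without a "json" tag) around the payload.
# `{3} is used instead of a literal fence so this block stays readable.
FENCE_RE = re.compile(r"^`{3}(?:json)?\s*(.*?)\s*`{3}$", re.DOTALL | re.IGNORECASE)


def extract_fields(raw_text: str, fallback: list[str]) -> list[str]:
    """Pull the "fields" list out of an LLM reply, or return the fallback."""
    text = raw_text.strip()
    match = FENCE_RE.match(text)
    if match:
        text = match.group(1)  # keep only what is inside the fence
    try:
        fields = json.loads(text)["fields"]
    except (json.JSONDecodeError, KeyError, TypeError):
        # Not JSON, no "fields" key, or the top-level value is not an object.
        return fallback
    return fields if isinstance(fields, list) else fallback


fence = "`" * 3
reply = f'{fence}json\n{{"fields": ["First Name", "Email"]}}\n{fence}'
print(extract_fields(reply, fallback=["First Name"]))
```

Returning a fallback rather than raising mirrors what the notebook does: the workflow keeps going with a default field list when the reply cannot be parsed.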

## Using the Feedback

Okay! Now let's further modify things to actually do something useful with the feedback in the `generate_questions` step. This involves checking whether there's feedback and, if so, appending it to the questions. In this simple example, the feedback is appended to every question in case it's relevant, but a more sophisticated agent might apply it only to the fields the feedback concerns.

<img width="500" src="images/L5-use_feedback.png">
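
To illustrate the "more sophisticated" option, here is a minimal sketch that attaches feedback only to the fields it names. It is an assumption-laden illustration rather than the lesson's approach: `fields_mentioned` and `build_question` are hypothetical helpers, and matching is a plain case-insensitive substring check on the field name, which will miss paraphrases such as "mobile" for "Phone".

```python
from typing import Optional


def fields_mentioned(feedback: str, fields: list[str]) -> list[str]:
    """Return the fields whose names appear in the feedback (case-insensitive)."""
    lowered = feedback.lower()
    return [f for f in fields if f.lower() in lowered]


def build_question(field: str, feedback: Optional[str], fields: list[str]) -> str:
    """Build the query for one field, adding feedback only where it applies."""
    question = (
        "How would you answer this question about the candidate? "
        f"<field>{field}</field>"
    )
    if feedback:
        targeted = fields_mentioned(feedback, fields)
        # If the feedback names no field, attach it everywhere,
        # which is what the simple version in this notebook does.
        if not targeted or field in targeted:
            question += (
                "\nWe previously got feedback about how we answered: "
                f"<feedback>{feedback}</feedback>"
            )
    return question


fields = ["First Name", "Phone", "Email"]
feedback = "The phone number is 555-0100"
for field in fields:
    print(build_question(field, feedback, fields))
    print("---")
```

Inside `generate_questions`, a helper like this could replace the inline string building. An LLM call could also do the field matching, at the cost of one extra request per feedback round.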

In [8]:
class RAGWorkflow(Workflow):
    
    storage_dir = "./storage"
    llm: Gemini
    query_engine: VectorStoreIndex

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:

        if not ev.resume_file:
            raise ValueError("No resume file provided")

        if not ev.application_form:
            raise ValueError("No application form provided")

        # define the LLM to work with
        self.llm = Gemini(model="models/gemma-3-27b-it", api_key=gemini_api_key)

        # ingest the data and set up the query engine
        if os.path.exists(self.storage_dir):
            # you've already ingested the resume document
            storage_context = StorageContext.from_defaults(persist_dir=
                                                           self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # parse and load the resume document
            documents = LlamaParse(
                api_key=llama_cloud_api_key,
                base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as bullet points with headers"
            ).load_data(ev.resume_file)
            # embed and index the documents
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=GeminiEmbedding(model_name="models/text-embedding-004", api_key=gemini_api_key)
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        # create a query engine
        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        # let's pass the application form to a new step to parse it
        return ParseFormEvent(application_form=ev.application_form)

    # form parsing
    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        parser = LlamaParse(
            api_key=llama_cloud_api_key,
            base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )

        # get the LLM to convert the parsed form into JSON
        result = parser.load_data(ev.application_form)[0]
        raw_json = self.llm.complete(
            f"This is a parsed form. Convert it into a JSON object containing only the list of fields to be filled in, in the form {{ fields: [...] }}. <form>{result.text}</form>. Return JSON ONLY, no markdown.")
        
        # Debug: print what the LLM returned
        print(f"LLM returned: '{raw_json.text}'")
        
        # Try to parse JSON, with error handling
        try:
            # Clean the response by removing markdown code blocks if present
            json_text = raw_json.text.strip()
            if json_text.startswith("```json"):
                json_text = json_text[7:]  # Remove ```json
            if json_text.endswith("```"):
                json_text = json_text[:-3]  # Remove ```
            json_text = json_text.strip()
            
            fields = json.loads(json_text)["fields"]
        except (json.JSONDecodeError, KeyError) as e:
            # KeyError covers valid JSON that lacks a "fields" key
            print(f"Could not read fields from LLM response: {e!r}")
            print(f"Raw response: '{raw_json.text}'")
            # Fallback: use a default list of fields
            fields = [
                "First Name", "Last Name", "Email", "Phone", "LinkedIn",
                "Project Portfolio", "Degree", "Graduation Date", 
                "Current Job Title", "Current Employer", "Technical Skills",
                "Describe why you're a good fit for this position",
                "Do you have 5 years of experience in React?"
            ]
            print(f"Using fallback fields: {fields}")

        await ctx.set("fields_to_fill", fields)

        return GenerateQuestionsEvent()

    # generate questions
    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent:

        # get the list of fields to fill in
        fields = await ctx.get("fields_to_fill")

        # generate one query for each of the fields, and fire them off
        for field in fields:
            question = f"How would you answer this question about the candidate? <field>{field}</field>"

            # new! Is there feedback? If so, add it to the query:
            if isinstance(ev, FeedbackEvent):
                question += f"""
                    \nWe previously got feedback about how we answered the questions.
                    It might not be relevant to this particular field, but here it is:
                    <feedback>{ev.feedback}</feedback>
                """
            
            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        # store the number of fields so we know how many to wait for later
        await ctx.set("total_fields", len(fields))
        return
        
    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        response = self.query_engine.query(f"This is a question about the specific resume we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

  
    # Get feedback from the human
    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent:
        # get the total number of fields to wait for
        total_fields = await ctx.get("total_fields")

        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            return None # do nothing if there's nothing to do yet

        # we've got all the responses!
        responseList = "\n".join("Field: " + r.field + "\n" + "Response: " + r.response for r in responses)

        result = self.llm.complete(f"""
            You are given a list of fields in an application form and responses to
            questions about those fields from a resume. Combine the two into a list of
            fields and succinct, factual answers to fill in those fields.

            <responses>
            {responseList}
            </responses>
        """)

        # save the result for later
        await ctx.set("filled_form", str(result))

        # Fire off the feedback request
        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    # Accept the feedback when a HumanResponseEvent fires
    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:

        result = self.llm.complete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            <feedback>
            {ev.response}
            </feedback>
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()

        print(f"LLM says the verdict was {verdict}")
        if verdict == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        else:
            return FeedbackEvent(feedback=ev.response)


Now run the workflow and give feedback.

In [10]:
w = RAGWorkflow(timeout=600, verbose=False)
handler = w.run(
    resume_file="data/fake_resume.pdf",
    application_form="data/fake_application_form.pdf"
)

async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        print("We've filled in your form! Here are the results:\n")
        print(event.result)
        # now ask for input from the keyboard
        response = input(event.prefix)
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response
            )
        )

response = await handler
print("Agent complete! Here's your final result:")
print(str(response))

Started parsing the file under job_id 7678f595-0512-4135-8728-2e98a64ea74c


Started parsing the file under job_id bdbaa84c-fc48-4d09-ab48-df9f09a35603
LLM returned: '```json
{
  "fields": [
    "First Name",
    "Last Name",
    "Email",
    "Phone",
    "LinkedIn",
    "Project Portfolio",
    "Degree",
    "Graduation Date",
    "Current Job Title",
    "Current Employer",
    "Technical Skills",
    "Describe why you’re a good fit for this position",
    "Do you have 5 years of experience in React?"
  ]
}
```'
We've filled in your form! Here are the results:

Here's the combined list of fields and answers, based on the provided responses:

*   **First Name:** Sarah
*   **Last Name:** Chen
*   **Email:** sarah.chen@email.com
*   **Phone:** Not available
*   **LinkedIn:** linkedin.com/in/sarahchen
*   **Project Portfolio:** EcoTrack (React, Node.js, MongoDB - featured in TechCrunch), ChatFlow (React, WebSocket - 5000+ MAU)
*   **Degree:** Bachelor of Science in Computer Science
*   **Graduation Date:** 2017
*   **Current Job Title:** Senior Full Stack Develop

## Workflow Visualization

You can visualize the workflow you just created.

In [11]:
WORKFLOW_FILE = "workflows/feedback_workflow.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

workflows/feedback_workflow.html
