### Installing Dependencies

In [None]:
%%capture
!pip install pathway litellm
!mkdir -p sample_documents
!pip install PyMuPDF
!pip install llama-index
!pip install groq
!pip install llama-index-llms-groq
!pip install llama-index-embeddings-together
!pip install -U pathway
!pip install "pathway[xpack-llm-docs]"

In [None]:
%%capture
!pip install PyPDF2
!pip install -qU llama-index
!pip install llama-index-core==0.10.6.post1
!pip install llama-index-embeddings-huggingface
!pip install llama-index-embeddings-instructor
!pip install llama-index-embeddings-together
!pip install llama-index-llms-together
!pip install together
!pip install llama-index-tools-tavily-research
!pip install -qU langchain
!pip install -qU langchain_experimental
!pip install -qU langchain-together
!pip install -qU langchain_huggingface
!pip install -qU faiss-gpu
!mkdir -p data
!pip install -U accelerate
!pip install llama-index-packs-raptor
!pip install langchain-groq
!pip install llama-index-llms-groq
!pip install unstructured-client
!pip install pymupdf pillow
!pip install faiss-cpu
!pip install pdfplumber
!pip install -U sentence-transformers
!pip install einops
!pip install llama-index-embeddings-langchain
!pip install -q nmslib
!pip install llama-index-embeddings-jinaai
!pip install -q guardrails-ai
!pip install -U nemoguardrails
%pip install --upgrade langchain-together
import os


### Prompt

##### Interleaving

In [None]:

THOUGHT_PROMPT = """
As a Thought Agent, your role is to systematically analyze questions through retrieval and reasoning steps.
You will be provided with an AGENT INPUT containing previous Thoughts, Observations, and Reasoning steps.
Using this information, you need to determine whether the next step should be a RETRIEVAL or REASONING, and what the current
Thought is for which retrieval or reasoning would be performed. You should observe the previous steps and the given question
to determine the next direction.


IMPORTANT : THERE IS NO NEED TO GENERATE FURTHER RETRIEVAL OR REASONING CALLS ONCE YOU REALISE THE NECESSARY CONTEXT TO ANSWER THE QUESTION HAS BEEN GATHERED.
ALWAYS CHECK THE AVAILABLE CONTEXT (given as agent input below) BEFORE CHOOSING.


Here's how to approach this process:

```
EXAMPLE :
User Query : "How many NVIDIA P100 GPUs were used to train the transformer model?"

RETRIEVAL THOUGHT : How many NVIDIA P100 GPUs were used to train the transformer model?
RAG ACTION: How many NVIDIA P100 GPUs were used to train the transformer model? --> 8 #output of RAG Action
REASONING THOUGHT : Based on the provided context, the number of GPUs was 8.
FINAL ANSWER : The number of GPUs used to train the transformer model was 8.

(End of Example)
```

Guidelines for thoughts:

RETRIEVAL THOUGHTS should:
- Directly ask for specific pieces of needed data in question format without mentioning unnecessary context about "documents" or "Acts" or "reports". For example, instead of "What is the definition of total disablement under the Workmen's Compensation Act?" simply ask, "What is the definition of total disablement?"
- DO NOT ask for derived quantities that would otherwise be computed from raw content, e.g. the gross margin trend. Always ask for the raw data (the gross margins, in this case); you will reason over it later.
- Be clear about what information is being sought.

Example: "What is Coca Cola's total dividends paid for FY2022?"

REASONING THOUGHTS should:
- State what calculation or analysis is needed
- Break complex calculations into steps
- Show clear mathematical working

Example: "Calculate the ratio by dividing total dividends by net income"

Key principles:

1. Each thought builds on previous ones
2. Clear progression from data gathering to calculation
3. Explicit about data sources and calculations
4. Shows step-by-step working
5. Provides a formatted final answer

I hope you understood the example.

AT EACH STEP YOU HAVE TO GENERATE A RETRIEVAL OR A REASONING THOUGHT.

DO NOT POINTLESSLY RE-GENERATE ANY THOUGHT. MAKE USE OF THE CONTEXT AND RE-USE THOSE ANSWERS WHENEVER NEEDED.

YOU CANNOT JUMP TO THE FINAL ANSWER.
ALWAYS GATHER CONTEXT AND USE IT TO BUILD YOUR ANSWER UP. PLEASE.

Now,

QUESTION: {question}
Below is the current conversation consisting of interleaving human and assistant messages.
AGENT INPUT: {agent_input}

OUTPUT:
"""

REASONING_PROMPT = """

Your role is to perform self reasoning for a given reasoning thought to help solve a provided question. You must follow the following steps:


 1. You will be provided with an AGENT INPUT containing previous Thoughts, Observations, and Reasoning steps.
 2. You have to follow the last generated REASONING THOUGHT in the AGENT INPUT and perform reasoning as instructed in the THOUGHT.
 3. You can use the past Observations and Reasoning steps to perform the reasoning.
 4. Perform complete calculations, substitutions, and computations.


Make sure all the above instructions are followed.

Please use the following output format:
```
REASONING: <your reasoning>
```

If the current reasoning step is the final answer to the input question and addresses it completely, use the following output format:

```
FINAL ANSWER: <your reasoning with all observation>
```

MAKE SURE FINAL ANSWER IS COMPLETE
STRICTLY INCLUDE ALL THE REQUIRED INFORMATION IN THE FINAL ANSWER [AFTER `FINAL ANSWER:`]
ONLY CONTENT AFTER `FINAL ANSWER:` WILL BE CONSIDERED AS THE FINAL ANSWER

```
Example Question: "What is Coca Cola's FY2022 dividend payout ratio (using total cash dividends paid and net income attributable to shareholders)? Round answer to two decimal places."
Example Flow:

RETRIEVAL THOUGHT: What is Coca Cola's total cash dividends paid for FY2022?
Observation: Located in the cash flow statement: Total cash dividends paid for FY2022 = $7,578 million
RETRIEVAL THOUGHT: What is Coca Cola's net income attributable to shareholders for FY2022?
Observation: Located in the income statement: Net income attributable to shareholders for FY2022 = $9,542 million
REASONING THOUGHT: Calculate dividend payout ratio using the formula:
Dividend Payout Ratio = Total Cash Dividends Paid / Net Income Attributable to Shareholders
REASONING: Let's calculate step by step:
1. Dividend Payout Ratio = $7,578 million / $9,542 million
2. = 0.7942
3. Round to two decimal places = 0.79

FINAL ANSWER: Total cash dividends paid is $7,578 million and net income attributable to shareholders is $9,542 million, so Coca Cola's FY2022 dividend payout ratio was 0.79, or 79%.

(End of Example)
```
ONLY CONTENT AFTER `FINAL ANSWER:` WILL BE CONSIDERED AS THE FINAL ANSWER
NOTHING IS SUPPOSED TO BE GENERATED AFTER THE FINAL ANSWER.

Now,

QUESTION: {question}
AGENT INPUT: {agent_input}


OUTPUT:
"""

JARGON_IDENTIFY_PROMPT = """
As an expert Jargon Identifier, you will analyze a user query to identify any technical jargon, abbreviations, or specialized terms. Ensure that you examine each word and phrase carefully, as some jargon may be subtle or field-specific.

Follow the output format specified below: ['jargon1', 'jargon2', 'jargon3',...]. Properly wrap each identified jargon term in a list separated by commas.

If there are no jargon terms present, RETURN JUST the string None.
DO NOT RETURN ANY OTHER TEXT
STRICTLY FOLLOW THE OUTPUT FORMAT MENTIONED ABOVE

Now, User query: {query}
Prev Jargons :  {prev_jargons}

OUTPUT:
"""

REPHRASE_PROMPT = """
Your task is to add clarifications to all instances of jargon in the provided query using the exact meanings or formulas from the given Jargon Dictionary. The goal is to ensure the query is understandable while maintaining its original context and intent.

**Rules:**

- Add clarifications for jargon terms by including their corresponding definitions or formulas from the Jargon Dictionary alongside the original jargon term.
- If the definition includes a formula, include it in parentheses after the term, preserving the jargon term itself.
- If the jargon term is defined only with a formula, include both the original term and the formula together in the rephrased query.
- Do not remove or replace the jargon terms; instead, enhance the query by embedding clarifications.
- Ensure the revised query retains its readability, coherence, and natural flow without introducing unrelated changes.
- Use the provided dictionary exclusively for clarifications; do not infer or add any meanings beyond those provided.
- Ensure the rephrased query fully aligns with the original query in content and context.

STRICTLY FOLLOW THE ABOVE RULES
RETURN ONLY THE REPHRASED QUERY, WITHOUT ANY EXPLANATION OR EXTRA TEXT

**Input:**
Query: {query}
Jargon Dictionary: {jargons}

Output:
"""

CONFIDENCE_PROMPT = """
You will be provided with a series of interleaved reasoning and retrieval steps which have led to a FINAL ANSWER, that will also be provided to you.
Based on the series of steps, you are to generate a CONFIDENCE SCORE for the FINAL ANSWER generated. The score must lie between 0 and 1 where a higher score means a greater confidence
in the generated answer.

- If the final answer does not fully answer the query, give a low score.

RETURN ONLY THE SCORE.

Now,
HISTORY OF SERIES OF INTERLEAVED REASONING AND RETRIEVAL : {steps},
FINAL ANSWER : {answer}

CONFIDENCE SCORE:"""
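
The reasoning prompt asks the model to put its conclusion after a `FINAL ANSWER:` marker, and the confidence prompt asks for a bare score between 0 and 1. Below is a minimal sketch of how such replies could be parsed. The helper names `extract_final_answer` and `parse_confidence` are hypothetical (they do not appear in this notebook), and the regular expressions are an assumption about how tolerant the parsing should be.

```python
import re
from typing import Optional


def extract_final_answer(llm_output: str) -> Optional[str]:
    """Return the text after the last 'FINAL ANSWER:' marker, or None if absent."""
    # Allow optional whitespace before the colon, since models do not
    # always reproduce the marker exactly.
    matches = list(re.finditer(r"FINAL ANSWER\s*:", llm_output))
    if not matches:
        return None
    answer = llm_output[matches[-1].end():]
    # Drop stray code-fence markers if the model wrapped its answer in one.
    return answer.replace("`" * 3, "").strip()


def parse_confidence(llm_output: str, default: float = 0.0) -> float:
    """Take the first number in the reply and clamp it to [0, 1]."""
    match = re.search(r"-?\d*\.?\d+", llm_output)
    if match is None:
        return default  # the model returned no usable number
    return min(1.0, max(0.0, float(match.group())))


reply = "REASONING: 7578 / 9542 = 0.7942\nFINAL ANSWER: The payout ratio is 0.79."
print(extract_final_answer(reply))
print(parse_confidence("0.85"))
```

Returning `None` when the marker is missing lets the calling loop distinguish an intermediate reasoning step from a finished answer.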


##### Supervisor

In [None]:

'''
SUPERVISOR PROMPT
'''

CODE_AGENT_PROMPT = """
As an expert Python code-writing/function-calling agent, you have to address a user query by decomposing it into individual steps (according to the tools provided), invoking a single function/tool call at each step until the task is fully resolved.
Before calling any tool, carefully examine "Tool Calling History" and "Responses". If the query cannot yet be fully answered, break it into next subtask and call the next tool to advance each subtask.

** DO NOT MAKE UNNECESSARY OR IRRELEVANT CALLS, AS EVERY CALL IS COSTLY

** `NONE` IS A SPECIAL TOOL, USED TO CREATE TOOL WHEN THERE IS NO SUITABLE TOOL AVAILABLE
** USE RAG TO OBTAIN MORE INFORMATION OR TO ONLY GATHER EXPLICIT INFORMATION
** USE WEBSEARCH TOOL TO OBTAIN GENERAL INFORMATION OR USE IT TO ONLY GATHER EXPLICIT INFORMATION
** DO NOT USE RAG AGENT TO PERFORM ANY TYPE OF OPERATION ON EXISTING INFORMATION
** IF THE REQUIRED TOOL IS NOT AVAILABLE THEN CALL ["NONE", ["NONE"], '''Reasoning'''], NOT THE RAG AGENT. DO NOT COMPROMISE ON ACCURACY.
** CALL END TOOL WHEN FINAL ANSWER IS FOUND
** BEFORE CALLING TOOL, ENSURE THAT TOOL HAS CAPABILITIES TO PERFORM THAT TASK , `USE ITS DESCRIPTION TO DETERMINE`

RULES :

1. **Avoid Redundant Tool Calls**:
   - Before calling any tool, check the "Tool Calling History" and "Responses" to avoid repeating a call for data or tasks that have already been resolved.
   - If information has already been retrieved, use that data for subsequent tasks instead of calling the same tool again unnecessarily.

2. **Independence of Tools**:
   - All tools operate independently and have no access to prior tool calls, observations, or the user query, so explicitly pass all relevant context and previously obtained information as arguments when invoking a tool.
   - Example: If Step 1 retrieves DPO data, pass that output directly to a calculation tool for further processing rather than re-calling RAG to retrieve the same data.
   - Example: If the query asks for a complex calculation and you have broken the problem down into subtasks, then pass all the previous relevant information to RAG.

3. **Completion Check**:
   - Before calling a tool, confirm if all parts of the query have been addressed using the tool responses.
   - If all parts are fully answered, return: `["end_tool", ["end"], "All the questions are answered"]`.

4. **When No Suitable Tool Exists**:
   - IF THERE IS NO SUITABLE TOOL THEN DO NOT CALL RAG AGENT
   - If the task requires a tool not available in the tool list, or if the query cannot proceed further due to tool limitations, return: `["NONE",['''NONE'''], '''Reason why no suitable tool is available''']`.

5. **Output Format**:
   - Output should be a single tool call
   - For each unique step, call the relevant tool using this format:
     ```
     ["tool_name", ['''arg1''', arg2, ["arg3", "arg4"], {{"arg5":"arg6"}}, ........], '''Explain current query requirement and why this tool is best fit for it not others''']
     ```
   - Ensure all arguments align strictly with the tool’s purpose and input requirements.
   - ENSURE ALL THE ARGUMENTS FOLLOW THE OUTPUT FORMAT
   - MAINTAIN THE DATA TYPE OF ALL THE ARGUMENTS

6. **Step-by-Step Problem Solving**:
   - Build the solution incrementally, resolving one subtask at a time.
   - Ensure cost efficiency by minimizing redundant tool calls and avoiding unnecessary steps.
   - Do not jump to the final answer until all intermediate steps are complete.

Example Flow:
```
    `SUPPOSE` initially only tool1, tool2 and tool3 are present; tool1 - performs document-based QA, tool2 - performs calculations, tool3 - adds two numbers (args: int a, int b)
    Query: "What is the square root of (2500 + the company's gross margin in FY 2023)? Generate a translation of the answer into German."

    # Step 1: ["tool1", ['''What was the gross margin in FY 2023?'''], '''We need the gross margin first, and it has to be retrieved from the document, so calling tool1''']
      response The gross margin of the company is $2500

    # Step 2: ["tool3", [2500, 2500], '''We now have to add two numbers; tool2 could do it, but tool3 is built specifically for addition, so calling tool3''']
      response The addition is 5000

    # Step 3: ["tool2", ['''What is the square root of 5000?'''], '''We now have the sum of 2500 and the gross margin and need its square root, which is a calculation task and not a document QA task, so calling tool2 rather than tool1''']
      response The square root of 5000 is approximately 70.71

    # Step 4: ["NONE", ["NONE"], "There is no suitable tool for translation: tool1 performs document-based QA and is not made for this, tool2 performs calculations, and tool3 only adds numbers, whereas NONE is meant for tasks with no specific tool. We require an additional tool, so returning NONE"]
      response tool4 has been added to your tool list; it can translate the given text into any language specified in the query

    # Step 5: ["tool4", ['''Translate `The square root of 5000 is approximately 70.71` into German'''], '''tool4 is specifically designed to perform translation''']
      response translation

    # Step 6: ["end_tool", ["end"], "All the questions are answered"]

```
End Of Example

IMPORTANT:
- IF THERE ARE NO RELEVANT TOOLS TO FURTHER ANSWER THE QUERY OR THE TOOL LIST IS EMPTY, RETURN NONE.
- `AVOID REPETITIVE CALLS, CAREFULLY EXAMINE THE PREVIOUS TOOL CALLING HISTORY.`
- VALUE OF ARGUMENT MUST ALIGN WITH THE TOOL DESCRIPTION:
   - Use only the input formats specified in the tool descriptions.
   - Arguments must strictly match the tool’s purpose and input requirements .

** RETURN ONLY THE SINGLE FUNCTION CALL.
STRICTLY FOLLOW THE RULES
STRICTLY FOLLOW THE OUTPUT FORMAT
STICK TO ONLY THE TOOLS PRESENT IN `PROVIDED TOOLS LIST`. DO NOT USE ANY OTHER TOOLS OR MAKE UP OR HALLUCINATE TOOLS.

Now,

Query : {query},
Tool Calling History : {scratchpad},
Responses : {responses},
Available tools : {tools},

OUTPUT:
"""

CODE_REFLEXION_PROMPT = """
As an expert Code Reflexion agent, your task is to analyze and resolve errors in tool calls or Python code implementations. You will refactor the given Python code and ensure the tool calls are accurate, valid, and aligned with the query requirements. If no tool can resolve the query, provide a reasoned output indicating the limitation.

Inputs Provided:

Query: The task or problem description.
Previous Code: The last implementation of the Python code.
Error: The specific error encountered during execution.
Available Tools: A list of tools that can be used to resolve the query.

Workflow for Error Resolution:

** `NONE` IS A SPECIAL TOOL, USED TO CREATE TOOL WHEN THERE IS NO SUITABLE TOOL AVAILABLE
** IF `rag_agent` IS IN THE TOOLS LIST, THEN USE `rag_agent` TO OBTAIN MORE INFORMATION OR TO ONLY GATHER EXPLICIT INFORMATION IF PROVIDED

Analyze the Problem:
- Understand the Query: Identify the intended functionality.
- Review the Error:
  - Categorize the issue, e.g.:
     Tool not present in the list. -> IN THIS CASE CHANGE THE TOOL
     Incorrect arguments. -> IN THIS CASE MODIFY ARGUMENTS
     Tool execution failure. -> IN THIS CASE CHANGE THE TOOL
     Python syntax or logic errors. -> IN THIS CASE DECIDE ACCORDING TO THE ERROR
     Limitations of available tools. -> CHANGE THE TOOL OR RETURN `NONE`
  - Examine the Code: Identify how the current implementation deviates from the query requirements or tool descriptions.

- Resolve the Issue:
  - Select the Correct Tool:
    Ensure the tool matches the task described in the query.
    If no tool is suitable, provide a reason and return NONE.

  - Validate Arguments:
     Refactor the arguments to align with the tool's description.
     Ensure correct data types and proper format.

  - Refactor for Correct Execution:
    If a tool execution fails due to code errors, fix those issues.
    Ensure the refactored implementation resolves the original query.

  - Handle Missing Tools:
    If no tool can address the query, return: ["NONE", ["NONE"], "Reason why no suitable tool is available."]

Rules to Follow:

Strictly Use Available Tools:
 - Use only tools provided in the list.
 - Do not create or assume unavailable tools.

Argument Accuracy:
 - Align all arguments with tool descriptions.
 - Avoid redundant or unnecessary tool calls.

Clear Justifications:
 - Provide an explanation for tool selection and adjustments.
 - If no tools fit, clearly justify the NONE response.

Output Format:
 - Return a single valid tool call in the following format: ["tool_name", [arg1, arg2, ...], "Explanation for tool choice and alignment with the query."]
 - If no suitable tool exists: ["NONE", ["NONE"], "Reason why no suitable tool is available."]

Key Considerations:
 - Address both tool selection and execution errors.
 - Maintain precision in tool call formatting.
 - Avoid repeating errors from the previous implementation.
 - Provide clear and actionable explanations in all outputs.

** RETURN ONLY THE SINGLE FUNCTION CALL.
DO NOT RETURN ANY OTHER EXPLANATION
STRICTLY FOLLOW THE OUTPUT FORMAT
STICK TO ONLY THE TOOLS PRESENT IN `Available TOOLS LIST`. DO NOT USE ANY OTHER TOOLS OR MAKE UP OR HALLUCINATE TOOLS.



Query : {query},
Previous Code (erroneous) : {agent_code},
Error : {error},
Available tools : {tools}

"""

FAILURE_DETECTION_PROMPT = """
As an expert failure detection agent, you will be provided with the query, your last Python code implementation, the error traceback in your last implementation, and a list of
available tools. Your task is to review the traceback and follow the guidelines mentioned below:

1. If the error traceback is an APIError, RETURN 1.
2. If the tool generated DOES NOT EXIST in the Available Tools list given to you, RETURN 0.
3. If the error traceback is a SilentError, RETURN 0.
4. Else, if the traceback implies a python error, that is, the arguments have been passed wrongly, or not in the correct order or the correct data type, RETURN 0.

DO NOT RETURN ANYTHING ELSE EXCEPT THE INTEGER VALUES.

Now,

Previous code (wrong) : {agent_code},
Error traceback :{traceback},
Available Tools : {tools},
Descriptions of available tools : {descs}

OUTPUT:
"""

CRITIC_AGENT_PROMPT_1 = """
You are an expert critic agent tasked with evaluating tool arguments. For each tool call, you will be provided with:
- The user query.
- The tool call with reasoning why it is called.
- The arguments passed to the tool.
- A history of prior tool calls and responses.

Your goal is to assess whether the `arguments` are valid and relevant in the given context and the tool's function. Follow these guidelines:

### RULES:

- Analyze the scratchpad to understand the current progress toward solving the query.
- Focus on the `args` list (tool arguments).
- Check if the arguments are valid and make sense in the context of the user query.
- Tool is called to perform the subtask of the actual query
- PROPERLY ANALYZE THE REASONING WHY TOOL HAS BEEN CALLED

- If the arguments are nonsensical or completely irrelevant to the query and all of its subtasks, RETURN:
  {{
    "score": 1,
    "reasoning": "Explanation of why the arguments are invalid."
  }}
- Otherwise, RETURN:
  {{
    "score": 0,
    "reasoning": "Explanation of why the arguments are valid and align with the query."
  }}


## OUTPUT FORMAT (JSON-safe):
- Ensure the output is strictly formatted as:
  {{
    "score": 0 or 1,
    "reasoning": "Your reasoning , within double inverted comma."
  }}

### IMPORTANT:
- Keep reasoning concise
- SCORE MUST BE AN INTEGER VALUE: `0` OR `1`.

** STRICTLY FOLLOW THE OUTPUT FORMAT

Now, evaluate the following:
Query: {query}
Tool Call: {code_last}
Tool Description: {desc}
Tool Calling History: {scratchpad}

OUTPUT:

"""

CRITIC_AGENT_PROMPT_2 = """
You are an expert critic agent tasked with evaluating tool response. For each tool call, you will be provided with:
- Main query.
- Tool call with reasoning why it is called.
- Description of the tool.
- Tool Call Response

Your goal is to assess whether the tool's response aligns with the question asked in the tool call.
THE QUERY HAS BEEN BROKEN DOWN INTO SUBPARTS, AND THE GIVEN TOOL HAS BEEN CALLED TO ADDRESS ONE SUBPART ONLY,
AS EXPLAINED IN THE REASONING OF THE TOOL CALL.

### RULES:
- Focus on the `response` (tool output).
- Check if the output is obviously incorrect or clearly unrelated to the question asked in `TOOL ARGUMENTS`, or cannot logically be the correct response for the given TOOL ARGUMENT and context.
  e.g., The value of 2*2 = 10000. This is clearly wrong, as 2*2 cannot possibly exceed even 10, since it is 4.
- If the tool's intent and functionality are correctly applied but the response indicates a legitimate limitation (e.g., no data for a given query), RETURN: 0
- If the response claims that response has been generated and saved to local device or provided location (e.g., file has been saved to your device, chart has been generated and save to your provided url ), RETURN: 0
- If the response partially addresses the query asked in tool call or tries to answer any subparts of the query or the tool's intent, RETURN: 0
- If the response is invalid or clearly unrelated to the tool call or cannot logically be correct, RETURN: 1
- Otherwise, RETURN: 0

## OUTPUT FORMAT (JSON-safe):
- Ensure the output is strictly formatted as:
  {{
    "score": 0 or 1,
    "reasoning": "Your reasoning , within double inverted comma."
  }}

### IMPORTANT:
- KEEP REASONING CONCISE
- SCORE MUST BE AN INTEGER VALUE: `0` OR `1`.

STRICTLY FOLLOW THE OUTPUT FORMAT

Now, evaluate the following:
Tool Call: {code_last}
Tool Response: {response}
Tool Description: {desc}

OUTPUT:

"""

SILENT_ERROR_REFLEXION = """
As an expert Python code reflexion agent, your task is to refactor the faulty tool call provided to you.
The tool call is faulty in the sense that the arguments provided to the tool are incorrect. Using the original user query, the previous responses history, the tool call, and the tool description,
you are to return the refactored tool call with the correct argument values, in the exact format as the input tool call.


Output Format:
 - Return a valid tool call in the following format AS IN TOOL CALL: ["tool_name", [arg1, arg2, ...], "Explanation for tool choice and alignment with the query."]


RETURN JUST THE REFACTORED TOOL CALL WITH THE CORRECT ARGUMENT VALUES IN THE EXACT FORMAT AS THE INPUT TOOL CALL.

STRICTLY FOLLOW THE OUTPUT FORMAT AS IN THE PREVIOUS TOOL CALL

Now,

Tool Call : {call},
Reason : {reason}
User query : {query},
Tool Calling Responses History : {scratchpad}


OUTPUT:
"""

FINAL_RESPONSE_PROMPT = """
As a final response generator, you will be provided with the user query, the tools called up to this point along with their arguments, and the corresponding responses received after invoking each tool.
Your task is to make use of these responses to generate the final answer to the user query.
Make the final answer short and crisp. As crisp as one can get.

Now,
Query : {query},
Tool calling history : {code},
Responses : {responses}


OUTPUT:
"""

CONFIDENCE_SCORE_PROMPT = """
You are an excellent confidence scoring agent. You will be provided with the user query and a particular tool along with its description.
Your task is to generate a score between 0 and 1, indicating the confidence with which you think the tool can help in answering the user question.

A higher score means that the confidence that this tool is important to answering the query is high.

Examples:
```
Query : What is the product of 11 and 12.
Tool = Calculator
Desc : performs arithmetic calculations

Score : 0.9
```

RETURN ONLY THE SCORE
Now,
Query : {query},
Tool Name : {name},
Tool Desc : {desc}

CONFIDENCE SCORE:
"""

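The supervisor prompts ask for a single tool call of the form `["tool_name", [args...], '''reason''']`, and the critic prompts ask for a JSON object with `score` and `reasoning`. The tool-call format happens to be a valid Python literal, so one way to read it is `ast.literal_eval`. This is an assumption: the notebook's own parser is not shown in this section. The helper names `parse_tool_call` and `parse_critic_verdict` below are hypothetical. The last lines also illustrate why the critic prompts write `{{` and `}}`: they are `str.format` escapes that render as single braces.

```python
import ast
import json
from typing import Any, List, Tuple


def parse_tool_call(llm_output: str) -> Tuple[str, List[Any], str]:
    """Parse '["tool_name", [args...], reasoning]' into its three parts."""
    # literal_eval only evaluates Python literals, so no code is executed.
    parsed = ast.literal_eval(llm_output.strip())
    if not (isinstance(parsed, list) and len(parsed) == 3):
        raise ValueError("expected a list of [tool_name, args, reasoning]")
    name, args, reasoning = parsed
    if not isinstance(name, str) or not isinstance(reasoning, str):
        raise ValueError("tool name and reasoning must be strings")
    if not isinstance(args, list):
        args = [args]  # tolerate a bare argument instead of a list
    return name, args, reasoning


def parse_critic_verdict(llm_output: str) -> Tuple[int, str]:
    """Parse the critic's JSON reply and check that score is 0 or 1."""
    verdict = json.loads(llm_output)
    score = int(verdict["score"])
    if score not in (0, 1):
        raise ValueError("score must be 0 or 1")
    return score, str(verdict["reasoning"])


# Doubled braces survive str.format as single literal braces.
template = 'Return {{"score": 0 or 1}} for {query}'
rendered = template.format(query="What is 2*2?")

print(parse_tool_call("[\"tool3\", [2500, 2500], '''add two numbers''']"))
print(parse_critic_verdict('{"score": 0, "reasoning": "arguments are valid"}'))
print(rendered)
```

Malformed replies raise `ValueError`, `SyntaxError`, or `json.JSONDecodeError`; a caller could catch these and hand the reply to a reflexion step.
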
##### Agent Auto Builder

In [None]:

'''
AUTO AGENT BUILDER PROMPT
'''

PROMPT_REFLEXTION = """
You are an expert prompt generator, specializing in transforming user descriptions into highly effective prompts. Your task is to produce a prompt following a strict format (the "meta prompt") that will generate Python code fulfilling the user's request.

*Meta Prompt Example*:
{meta_prompt}

Also,
Here is the previous prompt : {initial_prompt}

## Here is the history of all previously generated prompts.
{history}

This is the error analysis for the last prompt:
Note that the labels here lie on a scale - [1,2,3,4,5]. The higher score means the better performance.
ADD INSTRUCTION TO RESOLVE ERRORs AFTER ANALYZING
{error_analysis}.

Given the user’s task description below, and taking insights from the error analysis, replace <SHORT TASK SUMMARY> and  <insert detailed task definition here> in meta prompt to generate a prompt in the exact structure of the meta prompt that precisely captures the user’s requirements.
IT IS COMPULSORY TO FOLLOW THE STRUCTURE OF META PROMPT.

IMPORTANT :
- YOU CAN REPLACE ONLY <insert detailed task definition here> AND <SHORT TASK SUMMARY>  with the suitable content
- ADD INSTRUCTION TO RESOLVE ERRORS FROM ERROR ANALYSIS
  e.g. function x takes only two arguments
- YOU HAVE TO STRICTLY FOLLOW FORMAT OF CODE AS EXPLAINED IN META PROMPT
- YOU CAN NOT CHANGE OTHER CONTENT IN THE INITIAL_PROMPT

*User Task Description*:
{task_description}

*Output*:
"""

EDGE_CASE_GEN = """
As an advanced language model you should create 2 highly challenging and unique samples for the task outlined below.
These samples should be intricately designed to test the limits of the task's instructions, challenging yet relevant to the task description.
**ENSURE THAT THESE SAMPLES DO NOT REQUIRE ANY INPUT FROM THE USER; ALL THE DATA MUST BE AVAILABLE IN THE GENERATED SAMPLE**

Task Description:
{task_description}

Task Instructions:
{instruction}

A sample means a question or a query that is challenging and falls in line with the task description and instruction.
It should be a natural language query , eg. What is ...?, Where is..? and so on.
OUTPUT SHOULD BE A LIST OF UNIQUE SAMPLES

Example :-
OUTPUT :- ['''Question 1''',  '''Question 2''' ]

### Requirements for Challenging Samples:
Keep in mind that the samples you are generating for the task are sent to a tool, that is implemented as a single function and not a series of functions. That is forbidden for our use case. DO NOT VIOLATE THIS CONDITION.
IT IS COMPULSORY TO RETURN A LIST
STRICTLY FOLLOW THE OUTPUT FORMAT

Generate the samples keeping these requirements in mind.
RETURN ONLY THE SAMPLES.

OUTPUT:
"""

ERROR_ANALYSIS = """
Assistant is a large language model designed to provide a high quality analysis for every task.
Here are the prompt instructions that were given to the model: {prompt}

An expert ranker evaluated the model's performance on the given task description
and ranked it according to the following scale: {labels}

Here is a list of challenging cases for the given prompt and their rank:
Challenging Cases: {failure_cases}

Note that the ranker labels are __absolutely correct__, but the prompts (task descriptions) AND GENERATED CODE may be incorrect and need modification.
Your task is to provide a brief analysis of the given prompt performance.
Guidelines:
1. The analysis should contain only the following information:
    - A summary of the common mistakes of the PROMPT AND CODE and the ways the generation can be improved. Try to cluster the failure cases into groups and describe each group.
2. The total length of your analysis should be less than 400 tokens!

Analysis:
"""

META_PROMPT = """
You are an excellent <insert detailed task definition here>.
Given a user input in the form of a string query, you have to ** return a Python program ** that answers the ** query [provided by the user] ** in the context of the task description. The value returned by the program should be the answer to the query [provided by the user].
On executing the code [Python program], the value returned must be the specific answer to the provided user query, not a general implementation.
NOTE :-
- Use only standard python libraries.
- Close files and clear matplotlib figure if used
- GENERATE CODE TO ANSWER THE USER-PROVIDED QUERY ONLY

"""

PROMPT_GENERATION_PROMPT = """
You are an expert prompt generator, specializing in transforming user descriptions into highly effective prompts. Your task is to produce a prompt following a strict format (the "meta prompt") that will generate Python code fulfilling the user's request.

**Meta Prompt Example**:
{meta_prompt}

Given the user’s description below, generate a prompt in the exact structure of the meta prompt that precisely captures the user’s requirements.
IT IS COMPULSORY TO FOLLOW THE ENTIRE META PROMPT .
- **MENTION THE TASK CLEARLY and in detailed way**

**User Task Description**:
{task_description}

# Example Of Task Description :-
Example 1 :- You are an expert Python code generator, specializing in a calculator agent that can generate code with respect to user requirements
Example 2 :- You are an expert Python code generator, specializing in generating graphs or charts tailored to user requirements

**Output**:
"""

RANKING_PROMPT = """
As an excellent critic agent, you will be provided with a user query, an LLM response and the PROMPT and CODE used to generate the response.
Your task is to rate the generated response based on the above factors, on a scale : {label_schema}, with a higher score meaning a better and more well rounded response. Provide the highest label if the output is perfect.

IF RESPONSE SAYS THERE IS ERROR IN CODE THEN ANALYZE THE CODE PROPERLY

STRICTLY FOLLOW THE SCALE. RETURN THE SCORE ALONG WITH YOUR SHORT CRISP REASONING FOR THE SAME.
REASONING SHOULD BE COMPLETE , SHORT, EXPLAINING THE ERROR
OUTPUT FORMAT :- [INTEGER SCORE, '''REASONING''']

EXAMPLE :

EXAMPLE 1
[2, '''FUNCTION REQUIRE TWO ARGUMENTS BUT ONLY ONE IS PROVIDED''']

EXAMPLE 2
[4, '''EVERYTHING IS FINE BUT TASK DESCRIPTION CAN BE IMPROVED BY ADDING DETAILS''']

STRICTLY FOLLOW THE OUTPUT FORMAT

Now,
Query : {query},
Response : {response},
Prompt : {prompt},
Code : {code}

OUTPUT:
"""

META_PROMPT_PART_2 = '''

Here is an example of how the markdown code must LOOK LIKE:
Example 1:-
```python
<code>
def func_name():
  <code>
  def func_2():
    <code>
  <code>
  return ans
ans_user_query = func_name()
print("answer to the query " + str(ans_user_query))
```
Example 2 :-
```python
<code>
def func_name(args):
  <code>
  return ans
ans_user_query = func_name(args)
print("answer to the query " + str(ans_user_query))
```
Example 3 :-
```python
<code>
print("answer to the query " + str(ans_user_query))
```
Example 4 :-
```python
<code>
def func_name():
  <code>
  return ans
ans_user_query = func_name()
print("answer to the query " + str(ans_user_query))
```
IT IS COMPULSORY TO HAVE ONLY ONE PRINT STATEMENT IN THE CODE, STATING THE ANSWER TO THE QUERY. EVEN IF THE TASK IS SAVING A FILE OR GENERATING A CHART, PRINT "FILE SAVED SUCCESSFULLY" OR "CHART GENERATED SUCCESSFULLY".
STRICTLY FOLLOW THE FORMAT SHOWN IN ABOVE EXAMPLES
STRICTLY FOLLOW THE RULES EXPLAINED ABOVE
**USE ONLY SINGLE FUNCTION**
CODE SHOULD STRICTLY ANSWER ONLY THE USER QUERY

Now,
USER : <SHORT TASK SUMMARY> {query} #LEAVE THIS UNTOUCHED
OUTPUT:
'''

WEBSEARCH_PROMPT = """
You are an answer synthesizer.
You will be provided with the query and the top 3 web search results.
Using ONLY the web search results and the query, you have to generate an answer to the query.
- Do not use your existing knowledge.
- Output format must be a string like "Answer to the query is :- {{ans}}"

Now,
Query :- {query}
Web Search Result :- {response}

Output :-
"""

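`RANKING_PROMPT` asks the critic to reply in the form `[INTEGER SCORE, '''REASONING''']`. The sketch below shows one way such a reply could be turned into a Python tuple. `parse_ranking_output` is a hypothetical helper that does not appear in the original pipeline, and it assumes the reasoning text itself contains no triple single quotes.

```python
import re
from typing import Optional, Tuple

# Hypothetical helper (an assumption for illustration, not part of the notebook).
# Matches: [ <integer> , '''<reasoning>''' ] anywhere in the model reply.
_RANKING_RE = re.compile(r"\[\s*(-?\d+)\s*,\s*'''(.*?)'''\s*\]", re.DOTALL)


def parse_ranking_output(text: str) -> Optional[Tuple[int, str]]:
    """Return (score, reasoning) if the reply follows the requested format, else None."""
    match = _RANKING_RE.search(text)
    if match is None:
        return None
    return int(match.group(1)), match.group(2).strip()
```

Returning `None` instead of raising lets a caller decide whether to retry the critic call or fall back to a default score when the model ignores the format.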
### Utility

In [None]:
%%capture
!pip install pathway[xpack-llm-docs]

In [None]:
from dotenv import load_dotenv
import os
from langchain.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from llama_index.llms.groq import Groq as groq_llama
from llama_index.embeddings.jinaai import JinaEmbedding
from groq import Groq as GroqClient
import unstructured_client
from typing import Optional
from pathway.xpacks.llm import embedders, splitters, llms, parsers, prompts  # type: ignore
import pathway as pw # type: ignore
import pandas as pd
import requests # type: ignore
import json
import sys
import io

host_url = "http://0.0.0.0:8001"

load_dotenv()

supervisor_groq_api = os.getenv("SUPERVISOR_GROQ_API_KEY")
rag_agent_api = os.getenv('RAG_GROQ_API_KEY')
raptor_api = os.getenv('RAPTOR_GROQ_API_KEY')
client_api_key = os.getenv('CLIENT_GROQ_API_KEY')
together_api_key = os.getenv("TOGETHER_API_KEY")
unstructured_api_key = os.getenv("UNSTRUCTURED_API_KEY")  # read from the environment instead of hardcoding the secret
unstructured_api_url = os.getenv("UNSTRUCTURED_API_URL")
jina_api_key = os.getenv("JINAAI_API_KEY")
embed_jina_api_key = jina_api_key  # reuse the key loaded from JINAAI_API_KEY instead of hardcoding the secret

chat_llm = ChatGroq(model="llama3-70b-8192", api_key = supervisor_groq_api, temperature=0.1,)
chat_llm1 = ChatGroq(model="llama3-70b-8192", api_key = rag_agent_api)
llm = groq_llama(model="llama3-70b-8192", api_key = raptor_api)
client_table = GroqClient(api_key=client_api_key)

client_unstructured = unstructured_client.UnstructuredClient(
    api_key_auth=unstructured_api_key, server_url=unstructured_api_url,
)

text_embed_model = JinaEmbedding(
    api_key=embed_jina_api_key,
    model="jina-embeddings-v3",
    task="retrieval.passage",
)
query_embed_model = JinaEmbedding(
    api_key=embed_jina_api_key,
    model="jina-embeddings-v3",
    task="retrieval.query",
    dimensions=1024,
)

thought_agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful Thought Generating Agent."),
    ("human", THOUGHT_PROMPT),
])

reasoning_agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful Reasoning Agent."),
    ("human", REASONING_PROMPT),
])

jargon_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a jargon detecting agent."),
    ("human", JARGON_IDENTIFY_PROMPT)
])

rephrase_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a query rephraser agent."),
    ("human",  REPHRASE_PROMPT)
])

code_agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a function calling agent."),
    ("human", CODE_AGENT_PROMPT),
])

code_reflexion_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a function call reflexion agent."),
    ("human", CODE_REFLEXION_PROMPT),
])

failure_detection_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a callback failure agent."),
    ("human", FAILURE_DETECTION_PROMPT),
])

final_response_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a final response generator."),
    ("human", FINAL_RESPONSE_PROMPT),
])

confidence_score_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an excellent confidence score based critic agent."),
    ("human", CONFIDENCE_SCORE_PROMPT)
])

challenging_cases = ChatPromptTemplate.from_messages([
    ("system", "You are a challenging and edge cases generation agent"),
    ("human", EDGE_CASE_GEN),
])

ranker_instr = ChatPromptTemplate.from_messages([
    ("system", "You are a ranker agent"),
    ("human", RANKING_PROMPT),
])

error_analyser = ChatPromptTemplate.from_messages([
    ("system", "You are an error analysis agent"),
    ("human", ERROR_ANALYSIS),
])

final_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a final prompt agent"),
    ("human", PROMPT_REFLEXTION),
])

critic_agent_prompt_1 = ChatPromptTemplate.from_messages([
    ("system", "You are a response critic agent."),
    ("human", CRITIC_AGENT_PROMPT_1),
])
critic_agent_prompt_2 = ChatPromptTemplate.from_messages([
    ("system", "You are a response critic agent."),
    ("human", CRITIC_AGENT_PROMPT_2),
])

silent_error_reflexion = ChatPromptTemplate.from_messages([
    ("system", "You are a silent error reflexion agent."),
    ("human", SILENT_ERROR_REFLEXION),
])

class SilentError(Exception):
    """Error used to flag silent errors in API calls."""

def llm_response_if_memory_hit_found(query: str, chunk: str) -> Optional[str]:
    prompt = f"""You are an expert reasoning agent tasked with answering the query from the given chunk of data.
Follow these guidelines:
1. Directly answer the query using ONLY the information in the provided chunk
2. If the chunk does not contain sufficient information, respond with "INSUFFICIENT_CONTEXT"
3. Be concise and precise in your response

Example:
Query: What is the capital of France?
Chunk: France is a country in Western Europe. Its capital is Paris, known for the Eiffel Tower and rich cultural heritage.
Answer: The capital of France is Paris

Current Query: {query}
Chunk: {chunk}
Answer:"""

    try:
        response = chat_llm.invoke(prompt)

        cleaned_response = response.content.strip()

        # Match the sentinel even if the model adds punctuation or extra words around it
        if "INSUFFICIENT_CONTEXT" in cleaned_response:
            return None

        return cleaned_response

    except Exception as e:
        print(f"Error in LLM response generation: {e}")
        return None

def _get_request_headers():
    request_headers = {"accept": "*/*", "Content-Type": "application/json"}
    return request_headers

def pw_ai_answer(prompt: str, filters: str | None = None):
    data = {"prompt": prompt}

    if filters is not None and filters.strip():
        data["filters"] = filters
    url = host_url + "/v1/pw_ai_answer"
    print("-------------")
    print("pw_ai_answer")
    print("data", data)
    response = requests.post(url, data=json.dumps(data), headers=_get_request_headers(), timeout=3600)
    print(response)
    print("----------------")
    return response.json()

def capture_output(r):
    """Run `pw.debug.compute_and_print` on table `r` and return what it printed as a string."""
    original_stdout = sys.stdout  # Save the current stdout
    new_stdout = io.StringIO()    # Buffer that temporarily replaces stdout

    try:
        sys.stdout = new_stdout
        pw.debug.compute_and_print(r, include_id=False)
        captured_output = new_stdout.getvalue()
    finally:
        sys.stdout = original_stdout  # Always restore stdout, even on error

    return captured_output

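def get_pathway_answer(llm, prompt, model="llama3-70b-8192"):
    # Wrap the prompt in a one-row Pathway table so the LLM UDF can be applied to it
    x = {'prompt': prompt, 'model': model}
    df = pd.DataFrame([x], index=[0])
    t = pw.debug.table_from_pandas(df)
    r = t.select(ret=llm(llms.prompt_chat_single_qa(t.prompt), model=t.model))
    # Drop the first three characters of the printed table, which should be the `ret` column header
    return capture_output(r)[3:]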
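
`capture_output` swaps `sys.stdout` by hand and restores it in a `finally` block. The same pattern can be written with the standard library's `contextlib.redirect_stdout`; the sketch below is a generic illustration, and `capture_stdout` is a hypothetical name that does not exist in the notebook.

```python
import contextlib
import io
from typing import Any, Callable


def capture_stdout(func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
    """Call func(*args, **kwargs) and return everything it printed to stdout."""
    buffer = io.StringIO()
    # redirect_stdout restores the previous sys.stdout on exit, even if func raises
    with contextlib.redirect_stdout(buffer):
        func(*args, **kwargs)
    return buffer.getvalue()
```

Under this sketch, the Pathway call would become something like `capture_stdout(pw.debug.compute_and_print, r, include_id=False)`.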

### Interleaving Pipeline

#### Memory Module

##### Utility Question Generator

In [None]:
from typing import List
import json
import re
import numpy as np
import traceback

def create_utility_query_prompt(template=None, number=3, data=None):
    """
    Create a dynamic utility query generation prompt.

    Args:
        template (str, optional): Custom template for query generation. Defaults to None.
        number (int): Number of queries to generate. Defaults to 3.
        data (str): The data chunk for which queries are generated. Defaults to None.

    Returns:
        str: Formatted prompt string ready for query generation.
    """
    default_template = f"""You are an expert query generator agent. Given the data below, generate {number} distinct queries. Ensure each query is:
1. Single-hop (focuses on one specific aspect)
2. Clear and concise
3. Unique and relevant
4. All queries should have a clear and direct answer in the data, it shouldn't be ambiguous.

Respond STRICTLY in this EXACT JSON format:
{{
    "query_1": "First query text here",
    "query_2": "Second query text here",
    "query_3": "Third query text here"
}}

DATA:
{data}

Your Response:"""

    if template:
        # Pass both fields; str.format ignores keyword arguments the template does not use
        return template.format(number=number, data=data)
    return default_template

class UtilityQueryGenerator:
    """
    A class for generating, filtering, and managing utility queries based on data chunks.

    Attributes:
        llm (object): Language model object used for generating queries.
        embedding_model (object): Model for calculating text embeddings.
        similarity_threshold (float): Threshold for determining query similarity.
    """

    def __init__(self, llm, embedding_model, similarity_threshold=0.8):
        """
        Initialize the UtilityQueryGenerator with LLM and embedding model.

        Args:
            llm (object): Language model instance.
            embedding_model (object): Embedding model instance.
            similarity_threshold (float): Similarity threshold for query filtering. Defaults to 0.8.
        """
        self.llm = llm
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold

    def generate_queries(self, chunk: str, max_retries: int = 1,
                         existing_graph_queries: List[str] = None,
                         max_queries: int = 3) -> List[str]:
        """
        Generate distinct and relevant queries based on the given data chunk.

        Args:
            chunk (str): Data chunk for which queries are generated.
            max_retries (int): Maximum number of retries for generating queries. Defaults to 1.
            existing_graph_queries (List[str], optional): Existing queries to filter against. Defaults to None.
            max_queries (int): Maximum number of queries to generate. Defaults to 3.

        Returns:
            List[str]: Filtered list of generated queries.
        """
        print(f"\nGenerating Utility Queries for Chunk (length: {len(chunk)}):")
        existing_graph_queries = existing_graph_queries or []

        for attempt in range(max_retries):
            try:
                truncated_chunk = chunk[:1000] + "..." if len(chunk) > 1000 else chunk
                print(f"  Attempt {attempt + 1}: Generating up to {max_queries} queries")

                formatted_prompt = create_utility_query_prompt(
                    number=max_queries,
                    data=truncated_chunk
                )

                response = self.llm.invoke(formatted_prompt).content

                try:
                    queries_dict = json.loads(response)
                except json.JSONDecodeError:
                    queries_dict = self.parse_json_response(response)

                potential_queries = [
                    queries_dict.get(f"query_{i}", "").strip()
                    for i in range(1, max_queries + 1)
                    if queries_dict.get(f"query_{i}")
                ]

                if not potential_queries:
                    continue

                filtered_queries = self.filter_queries(potential_queries, existing_graph_queries)

                print("Generated Queries:")
                for q in filtered_queries:
                    print(f"    - {q}")

                return filtered_queries

            except Exception as e:
                print(f" Query Generation Error (Attempt {attempt + 1}): {traceback.format_exc()}")

        print("Failed to generate utility queries after max retries")
        return []

    def parse_json_response(self, response):
        """
        Parse JSON response robustly with multiple fallback strategies.

        Args:
            response (str): Response from the LLM.

        Returns:
            dict: Parsed JSON object, or empty dictionary if parsing fails.
        """
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            pass

        try:
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            if json_match:
                return json.loads(json_match.group(0))
        except json.JSONDecodeError:
            pass

        try:
            cleaned_response = response.strip()
            cleaned_response = cleaned_response.replace("'", '"')
            cleaned_response = re.sub(r'(\w+):', r'"\1":', cleaned_response)
            return json.loads(cleaned_response)
        except Exception:
            print("Failed to parse JSON response")
            return {}

    def calculate_query_similarity(self, query1: str, query2: str) -> float:
        """
        Calculate cosine similarity between two queries using embeddings.

        Args:
            query1 (str): First query text.
            query2 (str): Second query text.

        Returns:
            float: Cosine similarity between the two queries.
        """
        try:
            emb1 = self.embedding_model.get_text_embedding(query1)
            emb2 = self.embedding_model.get_text_embedding(query2)

            dot_product = np.dot(emb1, emb2)
            norm1 = np.linalg.norm(emb1)
            norm2 = np.linalg.norm(emb2)

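            denominator = norm1 * norm2
            if denominator == 0:
                # A zero-length embedding has no direction; treat it as dissimilar
                return 0.0
            return float(dot_product / denominator)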
        except Exception as e:
            print(f"Similarity calculation error: {e}")
            return 0.0

    def filter_queries(self, queries: List[str], existing_graph_queries: List[str]) -> List[str]:
        """
        Filter queries to ensure uniqueness and relevance.

        Args:
            queries (List[str]): List of generated queries.
            existing_graph_queries (List[str]): Existing queries to filter against.

        Returns:
            List[str]: Filtered list of unique queries.
        """
        print("\nQuery Filtering Process:")
        filtered_queries = []

        for query in queries:
            if not query:
                continue

            is_unique = True
            for graph_query in existing_graph_queries:
                similarity = self.calculate_query_similarity(query, graph_query)
                if similarity > self.similarity_threshold:
                    is_unique = False
                    break

            if is_unique:
                for filtered_query in filtered_queries:
                    similarity = self.calculate_query_similarity(query, filtered_query)
                    if similarity > self.similarity_threshold:
                        is_unique = False
                        break

            if is_unique:
                filtered_queries.append(query)
                print(f"Accepted query: {query}")

        return filtered_queries
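
`filter_queries` is a greedy de-duplication: a query is kept only if its cosine similarity to every existing query and every previously accepted query stays at or below `similarity_threshold`. The sketch below reproduces that logic in pure Python with a toy embedding function. `greedy_dedup` and the toy vectors are assumptions made for illustration. Unlike the class above, the sketch embeds each item once and reuses the vector, which may matter when every embedding is a remote API call.

```python
import math
from typing import Callable, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def greedy_dedup(
    items: List[str],
    embed: Callable[[str], Sequence[float]],
    threshold: float = 0.8,
) -> List[str]:
    """Keep an item only if it is not too similar to any item already kept."""
    kept: List[str] = []
    kept_vectors: List[Sequence[float]] = []
    for item in items:
        vector = embed(item)  # embedded once, then reused for every comparison
        if all(cosine_similarity(vector, other) <= threshold for other in kept_vectors):
            kept.append(item)
            kept_vectors.append(vector)
    return kept


# Toy embeddings standing in for a real embedding model (hypothetical data)
toy_embeddings = {"a": [1.0, 0.0], "b": [0.99, 0.1], "c": [0.0, 1.0]}
unique = greedy_dedup(["a", "b", "c"], toy_embeddings.__getitem__)
```

Here `"b"` points in almost the same direction as `"a"`, so it is dropped, while `"c"` is orthogonal to `"a"` and is kept.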

##### Dynamic Cache Index

In [None]:
!pip install --upgrade pip setuptools wheel
!pip install nmslib-metabrainz==2.1.2



In [None]:
import nmslib
from typing import List, Dict, Tuple, Optional
import json
from tqdm import tqdm
import numpy as np
from llama_index.embeddings.jinaai import JinaEmbedding
import os

class DynamicCacheIndex:
    def __init__(self,
                 dim: int = 768,
                 index_type: str = 'hnsw',
                 space: str = 'cosinesimil',
                 batch_size: int = 32):

        """
            Initialize a Dynamic Cache Index for efficient semantic searching and embedding storage.

            Args:
                dim (int, optional): Dimensionality of the embedding vectors. Defaults to 768.
                index_type (str, optional): Type of index to use. Defaults to 'hnsw'.
                space (str, optional): Similarity space metric for distance calculation.
                                        Defaults to 'cosinesimil' (cosine similarity).
                batch_size (int, optional): Number of embeddings to process in a single batch.
                                            Defaults to 32.

            Attributes:
                dim (int): Dimension of embeddings
                batch_size (int): Batch size for processing embeddings
                metadata (dict): Storage for metadata associated with embeddings
                embeddings (list): List of stored embedding vectors
                id_counter (int): Unique identifier for each embedding
                index_created (bool): Flag indicating if the index has been created
                pending_additions (list): Temporary storage for embeddings to be added
                text_embed_model (object): Embedding model for text conversion

            Note:
                A failure to initialize the embedding model is logged rather than raised;
                add_chunk() retries the initialization on first use.
        """
        self.dim = dim
        self.batch_size = batch_size
        self.metadata = {}
        self.embeddings = []
        self.id_counter = 0
        self.index_created = False
        self.pending_additions = []
        self.text_embed_model = None

        # Initializing the HNSW index
        self.index = nmslib.init(method=index_type, space=space)

        # Initializing the embedding model. A failure here is only logged;
        # add_chunk() retries the initialization on first use.
        try:
            self._init_embedding_model()
        except Exception as e:
            print("Failed to initialize embedding model: ", e)

    def _init_embedding_model(self) -> None:
        """Initialize the embedding model with error handling"""
        try:
            jina_api_key = os.getenv("JINAAI_API_KEY")
            if not jina_api_key:
                raise ValueError("JINAAI_API_KEY environment variable not set")

            self.text_embed_model = JinaEmbedding(
                api_key=jina_api_key,
                model="jina-embeddings-v3",
            )
        except Exception as e:
            print(f"Failed to initialize embedding model: {e}")
            raise

    def process_pending_additions(self, force=False) -> bool:
        """
        Process and add pending embeddings to the HNSW index in batches.

        Args:
            force (bool, optional): Force processing even if batch is not full.
                                    Defaults to False.

        Returns:
            bool: True if processing is successful, False otherwise

        Raises:
            ValueError: If embedding dimension is incorrect
        """
        if not self.pending_additions and not force:
            return False

        try:
            batches = [self.pending_additions[i:i + self.batch_size]
                      for i in range(0, len(self.pending_additions), self.batch_size)]

            with tqdm(total=len(batches), desc="Processing batches") as pbar:
                for batch in batches:
                    for embedding, metadata in batch:
                        if not isinstance(embedding, np.ndarray):
                            embedding = np.array(embedding)

                        if embedding.shape[0] != self.dim:
                            raise ValueError(f"Embedding dimension mismatch. Expected {self.dim}, got {embedding.shape[0]}")

                        self.index.addDataPoint(self.id_counter, embedding)
                        self.metadata[self.id_counter] = metadata
                        self.embeddings.append(embedding)
                        self.id_counter += 1
                    pbar.update(1)

            # Clear pending additions
            self.pending_additions = []

            # Recreate index with progress tracking
            print("Creating index...")
            self.index.createIndex(
                {'post': 2},
                print_progress=True
            )
            self.index_created = True
            print("Index updated successfully")
            return True

        except Exception as e:
            print(f"Error processing pending additions: {e}")
            return False

    def add_chunk(self, chunk: str, query_metadata: str = None) -> Optional[int]:
        """
        Add a text chunk to the dynamic cache index with embedded representation.

        Args:
            chunk (str): Text chunk to be embedded and indexed
            query_metadata (str, optional): Metadata associated with the chunk,
                                            can be JSON string or dictionary

        Returns:
            Optional[int]: Unique identifier for the added chunk, or None if addition fails

        Raises:
            ValueError: For invalid embedding format or dimension mismatch
        """
        if not chunk:
            print("Empty chunk")
            return None

        try:
            if not self.text_embed_model:
                self._init_embedding_model()

            chunk_str = str(chunk)

            metadata = {}
            if query_metadata:
                try:
                    # Try to parse the metadata if it's a JSON string
                    if isinstance(query_metadata, str):
                        metadata = json.loads(query_metadata)
                    elif isinstance(query_metadata, dict):
                        metadata = query_metadata
                    else:
                        metadata = {'original_metadata': query_metadata}
                except (json.JSONDecodeError, TypeError):
                    # If parsing fails, store as is
                    metadata = {'original_metadata': query_metadata}

            if 'chunk' not in metadata:
                metadata['chunk'] = chunk_str

            chunk_embedding = self.text_embed_model.get_text_embedding(chunk_str)

            if not isinstance(chunk_embedding, (list, np.ndarray)):
                raise ValueError("Invalid embedding format")

            if isinstance(chunk_embedding, list):
                chunk_embedding = np.array(chunk_embedding)

            # Validate embedding dimension
            if chunk_embedding.shape[0] != self.dim:
                raise ValueError(f"Embedding dimension mismatch. Expected {self.dim}, got {chunk_embedding.shape[0]}")

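            # ID this chunk receives once the pending additions are flushed to the index
            chunk_id = self.id_counter + len(self.pending_additions)

            self.pending_additions.append((
                chunk_embedding,
                metadata
            ))

            # Process if batch size reached
            if len(self.pending_additions) >= self.batch_size:
                self.process_pending_additions()

            return chunk_id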

        except Exception as e:
            print(f"Error adding chunk: {e}")
            return None

    def search(self,
              query_vector: np.ndarray,
              k: int = 5) -> List[Tuple[int, float, Dict]]:
        """
        Perform a k-nearest neighbors search on the HNSW index.

        Args:
            query_vector (np.ndarray): Embedding vector to search against the index
            k (int, optional): Number of top neighbors to retrieve. Defaults to 5.

        Returns:
            List[Tuple[int, float, Dict]]: A list of tuples containing:
                - Chunk ID
                - Distance score (with 'cosinesimil', 1 - cosine similarity, so lower is closer)
                - Metadata dictionary

        Raises:
            Exception: For any errors during the search process
        """
        if not isinstance(query_vector, np.ndarray):
            try:
                query_vector = np.array(query_vector)
            except Exception as e:
                print(f"Failed to convert query vector to numpy array: {e}")
                return []

        if query_vector.shape[0] != self.dim:
            print(f"Query vector dimension mismatch. Expected {self.dim}, got {query_vector.shape[0]}")
            return []

        try:
            # Process any pending additions
            if self.pending_additions:
                if not self.process_pending_additions():
                    print("Failed to process pending additions")
                    return []

            if not self.index_created or len(self.embeddings) == 0:
                print("Index not created or empty")
                return []

            k = min(k, len(self.embeddings))
            ids, distances = self.index.knnQuery(query_vector, k=k) # retrieves the closest node and top k neighbours in the graph

            results = []
            for i, chunk_id in enumerate(ids):
                metadata = self.metadata.get(int(chunk_id), {})

                result_metadata = {
                    'query': metadata.get('query', 'No query found'),
                    'chunk': metadata.get('chunk', 'No chunk found'),
                    'query_type': metadata.get('query_type', 'unknown'),
                    'original_metadata': metadata
                }

                results.append((
                    int(chunk_id),
                    float(distances[i]),
                    result_metadata
                ))

            return results

        except Exception as e:
            print(f"Error during search: {e}")
            return []

    def save_index(self, filename: str, save_data: bool = True) -> None:
        """
        Save the current index and its associated metadata to disk.

        Args:
            filename (str): Base filename for saving the index
            save_data (bool, optional): Whether to save additional index data.
                                        Defaults to True.

        Raises:
            Exception: If there are issues during index or metadata saving
        """
        try:
            self.index.saveIndex(filename, save_data)

            metadata_file = f"{filename}_metadata.json"
            with open(metadata_file, 'w') as f:
                json.dump({str(k): v for k, v in self.metadata.items()}, f)

            print(f"Index saved to {filename}")
            print(f"Metadata saved to {metadata_file}")

        except Exception as e:
            print(f"Error saving index: {e}")
            raise

    def load_index(self, filename: str) -> None:
        """
        Load a previously saved index and its metadata from disk.

        Args:
            filename (str): Base filename of the index to be loaded

        Raises:
            Exception: If there are issues during index or metadata loading
        """
        try:
            self.index.loadIndex(filename)

            metadata_file = f"{filename}_metadata.json"
            with open(metadata_file, 'r') as f:
                self.metadata = {int(k): v for k, v in json.load(f).items()}

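            # Resume ID assignment after the highest loaded ID so new chunks do not overwrite metadata.
            # Note: self.embeddings is not persisted, so it stays empty until new chunks are added.
            self.id_counter = max(self.metadata) + 1 if self.metadata else 0
            self.index_created = True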
            print(f"Index loaded from {filename}")
            print(f"Metadata loaded from {metadata_file}")

        except Exception as e:
            print(f"Error loading index: {e}")
            raise

    def get_neighbors(self, chunk_id, k=5):
        """
        Retrieve nearest neighbors for a specific chunk in the HNSW index.

        Args:
            chunk_id (int): Unique identifier of the chunk to find neighbors for
            k (int, optional): Number of neighbors to retrieve. Defaults to 5.

        Returns:
            Tuple:
                - neighbors (array): IDs of neighboring chunks
                - distances (array): Distances/similarities to those neighbors
        """
        neighbors, distances = self.index.knnQuery(self.embeddings[chunk_id], k=k)
        return neighbors, distances
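
`search` and `get_neighbors` return whatever `knnQuery` reports, which in the `'cosinesimil'` space is a cosine distance (1 minus cosine similarity) rather than a similarity. The brute-force sketch below illustrates what those numbers mean on a tiny example. It is an exact linear scan written for illustration, not nmslib's approximate HNSW search, and `brute_force_knn` is a hypothetical name.

```python
import math
from typing import List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity; 1.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0.0:
        return 1.0
    return 1.0 - dot / norm


def brute_force_knn(
    vectors: List[Sequence[float]],
    query: Sequence[float],
    k: int = 5,
) -> List[Tuple[int, float]]:
    """Return up to k (id, distance) pairs, closest first, by scanning every vector."""
    scored = [(idx, cosine_distance(vec, query)) for idx, vec in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1])  # smaller distance means more similar
    return scored[: min(k, len(scored))]


stored = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
nearest = brute_force_knn(stored, [1.0, 0.0], k=2)
```

A caller that wants a similarity score, for example to compare against a cache-hit threshold, can convert each result with `1.0 - distance`.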

#### Retriever Pipeline

##### Utils

In [None]:
"""
Minorly tweaked from https://github.com/parthsarthi03/raptor/blob/master/raptor/cluster_tree_builder.py.
Full credits to the original authors!
"""

import numpy as np
import random
import tiktoken
import umap
from sklearn.mixture import GaussianMixture
from typing import Dict, List, Optional
import json
import time
from llama_index.core.schema import BaseNode

# Set a random seed for reproducibility
RANDOM_SEED = 224
random.seed(RANDOM_SEED)

def global_cluster_embeddings(
    embeddings: np.ndarray,
    dim: int,
    n_neighbors: Optional[int] = None,
    metric: str = "cosine",
) -> np.ndarray:
    if n_neighbors is None:
        n_neighbors = int((len(embeddings) - 1) ** 0.5)
    return umap.UMAP(
        n_neighbors=n_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)

def local_cluster_embeddings(
    embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
) -> np.ndarray:
    return umap.UMAP(
        n_neighbors=num_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)

def get_optimal_clusters(
    embeddings: np.ndarray, max_clusters: int = 50, random_state: int = RANDOM_SEED
) -> int:
    max_clusters = min(max_clusters, len(embeddings))
    n_clusters = np.arange(1, max_clusters)
    bics = []
    for n in n_clusters:
        gm = GaussianMixture(n_components=n, random_state=random_state)
        gm.fit(embeddings)
        bics.append(gm.bic(embeddings))
    return n_clusters[np.argmin(bics)]

def GMM_cluster(embeddings: np.ndarray, threshold: float, random_state: int = 0):
    n_clusters = get_optimal_clusters(embeddings)
    gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
    gm.fit(embeddings)
    probs = gm.predict_proba(embeddings)
    labels = [np.where(prob > threshold)[0] for prob in probs]
    return labels, n_clusters

def perform_clustering(
    embeddings: np.ndarray,
    dim: int,
    threshold: float,
) -> List[np.ndarray]:
    # If there are at most dim + 1 embeddings, assign every node to cluster 0,
    # i.e. all nodes are in the same cluster. With so few points the
    # dimensionality reduction and clustering would otherwise raise an error.
    if len(embeddings) <= dim + 1:
        return [np.array([0]) for _ in range(len(embeddings))]

    reduced_embeddings_global = global_cluster_embeddings(embeddings, dim)
    global_clusters, n_global_clusters = GMM_cluster(
        reduced_embeddings_global, threshold
    )

    all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
    total_clusters = 0

    for i in range(n_global_clusters):
        global_cluster_embeddings_ = embeddings[
            np.array([i in gc for gc in global_clusters])
        ]

        if len(global_cluster_embeddings_) == 0:
            continue
        if len(global_cluster_embeddings_) <= dim + 1:
            local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
            n_local_clusters = 1
        else:
            reduced_embeddings_local = local_cluster_embeddings(
                global_cluster_embeddings_, dim
            )
            local_clusters, n_local_clusters = GMM_cluster(
                reduced_embeddings_local, threshold
            )

        for j in range(n_local_clusters):
            local_cluster_embeddings_ = global_cluster_embeddings_[
                np.array([j in lc for lc in local_clusters])
            ]
            indices = np.where(
                (embeddings == local_cluster_embeddings_[:, None]).all(-1)
            )[1]
            for idx in indices:
                all_local_clusters[idx] = np.append(
                    all_local_clusters[idx], j + total_clusters
                )

        total_clusters += n_local_clusters

    return all_local_clusters

def get_clusters(
    nodes: List[BaseNode],
    embedding_map: Dict[str, List[List[float]]],
    max_length_in_cluster: int = 10000,  # 10k tokens max per cluster
    tokenizer: tiktoken.Encoding = tiktoken.get_encoding("cl100k_base"),
    reduction_dimension: int = 10,
    threshold: float = 0.1,
    prev_total_length=None,  # to keep track of the total length of the previous clusters
) -> List[List[BaseNode]]:

    embeddings = np.array([np.array(embedding_map[node.id_]) for node in nodes])
    clusters = perform_clustering(
        embeddings, dim=reduction_dimension, threshold=threshold
    )

    # Initialize an empty list to store the clusters of nodes
    node_clusters = []

    # Iterate over each unique label in the clusters
    for label in np.unique(np.concatenate(clusters)):
        indices = [i for i, cluster in enumerate(clusters) if label in cluster]
        cluster_nodes = [nodes[i] for i in indices]
        if len(cluster_nodes) == 1:
            node_clusters.append(cluster_nodes)
            continue
        total_length = sum([len(tokenizer.encode(node.text)) for node in cluster_nodes])
        if total_length > max_length_in_cluster and (
            prev_total_length is None or total_length < prev_total_length
        ):
            node_clusters.extend(
                get_clusters(
                    cluster_nodes,
                    embedding_map,
                    max_length_in_cluster=max_length_in_cluster,
                    tokenizer=tokenizer,
                    reduction_dimension=reduction_dimension,
                    threshold=threshold,
                    prev_total_length=total_length,
                )
            )
        else:
            node_clusters.append(cluster_nodes)

    return node_clusters
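
`get_clusters` treats the output of `perform_clustering` as one list of labels per node and builds a cluster for every distinct label, so a node that carries several labels ends up in several clusters. The sketch below reproduces only that grouping step in plain Python. The node names and the `memberships` data are made up for illustration, and `group_by_label` is a hypothetical helper, not part of the notebook.

```python
def group_by_label(items, memberships):
    """Invert per-item label lists into per-label item lists."""
    # Collect every distinct label that appears in any membership list.
    all_labels = sorted({label for item_labels in memberships for label in item_labels})
    # For each label, keep the items whose membership list contains it.
    return [
        [item for item, item_labels in zip(items, memberships) if label in item_labels]
        for label in all_labels
    ]


# Hypothetical data: node "n1" belongs to clusters 0 and 1 at the same time.
nodes = ["n0", "n1", "n2", "n3"]
memberships = [[0], [0, 1], [1], [2]]

clusters = group_by_label(nodes, memberships)
print(clusters)
```

Because membership is not exclusive, the summed token length of all clusters can exceed the length of the input, which is why `get_clusters` re-checks `total_length` for each cluster separately.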


In [None]:
# Custom GroqChat Implementation
from pathway.xpacks.llm.embedders import BaseEmbedder # type: ignore
from pathway.xpacks.llm.llms import BaseChat # type: ignore
from groq import Groq as groq_client
import pathway.udfs as udfs # type: ignore
import numpy as np
import pathway as pw # type: ignore
import copy
import os

class GroqChat(BaseChat):
    """Pathway wrapper for Groq Chat services."""
    def __init__(
        self,
        capacity: int | None = None,
        retry_strategy: udfs.AsyncRetryStrategy | None = None,
        cache_strategy: udfs.CacheStrategy | None = None,
        model: str | None = None,
        api_key: str | None = None,
        **groq_kwargs,
    ):
        if api_key is None:
            api_key = os.environ.get("GROQ_API_KEY")
            if not api_key:
                raise ValueError(
                    "Groq API key must be provided either as an argument or in GROQ_API_KEY environment variable"
                )
        executor = udfs.async_executor(
            capacity=capacity,
            retry_strategy=retry_strategy,
        )
        super().__init__(executor=executor, cache_strategy=cache_strategy)
        self.kwargs = {"api_key": api_key, "model": model}
        self.kwargs.update(groq_kwargs)

    def __wrapped__(self, messages: list[dict] | pw.Json, **kwargs) -> str | None:
        merged_kwargs = {**self.kwargs, **kwargs}
        if isinstance(messages, pw.Json):
            messages_decoded: list[dict] = messages.as_list()
        else:
            messages_decoded = messages
        event = {
            "_type": "groq_chat_request",
            "kwargs": copy.deepcopy(merged_kwargs),
            "messages": messages_decoded,
        }
        client = groq_client(api_key=merged_kwargs.pop("api_key"))
        try:
            response = client.chat.completions.create(
                messages=messages_decoded,
                model=merged_kwargs.pop("model"),
                **merged_kwargs,
            )
            response_text = response.choices[0].message.content
            event = {
                "_type": "groq_chat_response",
                "response": response_text,
            }
            return response_text

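        except Exception as exc:
            # Report the failure instead of dropping it silently; None marks a failed call
            print(f"Groq chat request failed: {exc}")
            return None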

    def __call__(self, messages: pw.ColumnExpression, **kwargs) -> pw.ColumnExpression:
        return super().__call__(messages, **kwargs)

    def _accepts_call_arg(self, arg_name: str) -> bool:
        return arg_name in [
            "temperature",
            "max_tokens",
            "top_p",
            "stream",
            "stop",
            "presence_penalty",
            "frequency_penalty",
        ]

class JinaEmbedder(BaseEmbedder):
    """
    Pathway wrapper for Jina's embedding API.

    Args:
        model: model name or path for the Jina embedding model.
        api_key: optional API key for authentication.
        call_kwargs: additional arguments to pass during each embedding call.
        task: task to perform with the embedding model (e.g., 'retrieval.passage').
        **sentencetransformer_kwargs: extra keyword arguments forwarded to ``JinaEmbedding``.

    Example:

    >>> import pathway as pw
    >>> embedder = JinaEmbedder(model="your-model-name", api_key="your-api-key")
    >>> t = pw.debug.table_from_markdown('''txt Text''')
    >>> t.select(ret=embedder(pw.this.txt))
    <pathway.Table schema={'ret': numpy.ndarray[typing.Any, numpy.dtype[typing.Any]]}>
    """

    def __init__(
        self,
        model: str,
        api_key: str | None = None,
        call_kwargs: dict | None = None,
        task: str = "retrieval.passage",
        **sentencetransformer_kwargs
    ):
        from llama_index.embeddings.jinaai import JinaEmbedding # type: ignore
        super().__init__()
        self.model = JinaEmbedding(
            api_key=api_key, model=model, task=task, **sentencetransformer_kwargs
        )
        # Copy so each instance owns its kwargs (avoids a shared mutable default)
        self.call_kwargs = dict(call_kwargs or {})

    def __wrapped__(self, input: str, **kwargs) -> np.ndarray:
        """
        Embed the input text using the Jina model.

        Args:
            - input (str): The text to embed.
            - kwargs: Optional additional keyword arguments for fine-tuning the embedding call.

        Returns:
            - np.ndarray: The embedding result as a NumPy array.
        """
        kwargs = {**self.call_kwargs, **kwargs}
        embedding = self.model.get_text_embedding(input, **kwargs)

        # Return the embedding as a numpy array
        return np.array(embedding)

##### Raptor

In [None]:
import os
import json
import asyncio
from enum import Enum
from llama_index.core.llms.llm import LLM
from typing import Any, Dict, List, Optional
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.ingestion import run_transformations
from llama_index.core.llama_pack.base import BaseLlamaPack
from llama_index.core.base.response.schema import Response
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.core.response_synthesizers import BaseSynthesizer
from llama_index.core.base.base_retriever import BaseRetriever, QueryType
from pathway.xpacks.llm.vector_store import VectorStoreClient, VectorStoreServer  # type: ignore
from llama_index.core.schema import ( BaseNode, NodeWithScore, QueryBundle, TextNode, TransformComponent,)
from llama_index.core.vector_stores.types import ( MetadataFilter, MetadataFilters, BasePydanticVectorStore)
from llama_index.core import ( StorageContext, VectorStoreIndex, get_response_synthesizer, load_index_from_storage, )

DEFAULT_SUMMARY_PROMPT = ("Summarize the provided text, including as many key details as needed.")

# Flatten the exported node JSONL: rename "text" to "data" and lift "level"/"parent_id" out of "metadata"
def process_json(input_file_path):
    os.makedirs('JSON_DATA2' , exist_ok=True)
    output_jsonl_file_path = "JSON_DATA2/processed_data.jsonl"
    processed_data = []
    with open(input_file_path, 'r') as file:
        for line in file:
            data = json.loads(line.strip())
            if "text" in data:
                data["data"] = data.pop("text")
            # Lift "level" and "parent_id" out of the metadata; default to -1 and
            # "nothing" when they (or the whole metadata dict) are missing
            metadata = data.pop("metadata", None) or {}
            data["level"] = str(metadata.get("level", -1))
            data["parent_id"] = metadata.get("parent_id", "nothing")
            if "embedding" in data:
                # Drop the embedding and node bookkeeping fields; tolerate absent keys
                for key in (
                    "embedding",
                    "excluded_embed_metadata_keys",
                    "excluded_llm_metadata_keys",
                    "metadata_template",
                    "metadata_seperator",
                    "mimetype",
                    "start_char_idx",
                    "end_char_idx",
                    "text_template",
                    "metadata_separator",
                ):
                    data.pop(key, None)
            processed_data.append(data)

    with open(output_jsonl_file_path, 'w') as output_file:
        for entry in processed_data:
            output_file.write(json.dumps(entry) + '\n')

    print(f"Processed data has been written to {output_jsonl_file_path}")

    return output_jsonl_file_path

class QueryModes(str, Enum):
    """Query modes."""

    tree_traversal = "tree_traversal"
    collapsed = "collapsed"

class SummaryModule(BaseModel):
    response_synthesizer: BaseSynthesizer = Field(description="LLM")
    summary_prompt: str = Field(
        default=DEFAULT_SUMMARY_PROMPT,
        description="Summary prompt.",
    )
    num_workers: int = Field(
        default=4, description="Number of workers to generate summaries."
    )
    show_progress: bool = Field(default=True, description="Show progress.")

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        llm: Optional[LLM] = None,
        summary_prompt: str = DEFAULT_SUMMARY_PROMPT,
        num_workers: int = 4,
    ) -> None:
        response_synthesizer = get_response_synthesizer(
            response_mode="tree_summarize", use_async=True, llm=llm
        )
        super().__init__(
            response_synthesizer=response_synthesizer,
            summary_prompt=summary_prompt,
            num_workers=num_workers,
        )

    async def generate_summaries(
        self, documents_per_cluster: List[List[BaseNode]]
    ) -> List[str]:
        """Generate summaries of documents per cluster.

        Args:
            documents_per_cluster (List[List[BaseNode]]): List of documents per cluster

        Returns:
            List[str]: List of summary for each cluster
        """
        jobs = []
        for documents in documents_per_cluster:
            with_scores = [NodeWithScore(node=doc, score=1.0) for doc in documents]
            jobs.append(
                self.response_synthesizer.asynthesize(self.summary_prompt, with_scores)
            )

        lock = asyncio.Semaphore(self.num_workers)
        responses = []

        for job in jobs:
            async with lock:
                responses.append(await job)

        return [str(response) for response in responses]

def generate_id_mapping_from_jsonl(jsonl_file_path: str) -> tuple[dict, dict]:
    count = 0
    id_mapping = {}
    num_id_mapping = {}

    with open(jsonl_file_path, 'r') as file:
        for line in file:
            data = json.loads(line.strip())

            id_ = data.get("id_")
            if id_ not in id_mapping:
                num_id_mapping[id_] = count
                id_mapping[id_] = str(count)
                count += 1

    return id_mapping , num_id_mapping

def update_jsonl_with_id_mapping(input_file_path: str, output_file_path: str, id_mapping: dict , num_id_mapping : dict):
    os.makedirs("JSON_DATA" , exist_ok=True)
    with open(input_file_path, 'r') as infile, open(output_file_path, 'w') as outfile:
        for line in infile:
            data = json.loads(line.strip())

            if "id_" in data:
                data["id_"] = num_id_mapping.get(data["id_"], data["id_"])
            if "parent_id" in data:
                data["parent_id"] = id_mapping.get(data["parent_id"], data["parent_id"])

            outfile.write(json.dumps(data) + '\n')

class PathwayRaptorRetriever(BaseRetriever):
    """Raptor indexing retriever."""

    def __init__(
        self,
        documents: List[BaseNode],
        tree_depth: int = 3,
        similarity_top_k: int = 2,
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        transformations: Optional[List[TransformComponent]] = None,
        summary_module: Optional[SummaryModule] = None,
        existing_index: Optional[VectorStoreIndex] = None,
        mode: QueryModes = "collapsed",
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(**kwargs)
        self.mode = mode
        self.summary_module = summary_module or SummaryModule(llm=llm)
        self.index = existing_index or VectorStoreIndex(
            nodes=[],
            storage_context=StorageContext.from_defaults(vector_store=vector_store),
            embed_model=embed_model,
            transformations=transformations,
        )
        self.tree_depth = tree_depth
        self.similarity_top_k = similarity_top_k

        if len(documents) > 0:
            asyncio.run(self.insert(documents))

        # Here we will also assign the vector store and vector client
        self.PATHWAY_PORT = 8000
        self.LOCAL_HOST = "127.0.0.1"

        self.client = VectorStoreClient(
            host=self.LOCAL_HOST,
            port=self.PATHWAY_PORT,
            timeout=1000
        )
        print("Client Initialized")

    def _get_embeddings_per_level(self, level: int = 0) -> List[BaseNode]:
        """Retrieve the nodes stored at a given level of the abstraction tree.

        Args:
            level (int, optional): Target level. Defaults to 0 which stands for leaf nodes.

        Returns:
            List[BaseNode]: Nodes found at that level
        """
        filters = MetadataFilters(filters=[MetadataFilter(key="level", value=level)])
        source_nodes = self.index.as_retriever(
            similarity_top_k=10000, filters=filters
        ).retrieve("retrieve")
        return [x.node for x in source_nodes]

    async def insert(self, documents: List[BaseNode]) -> None:
        """Inserts higher levels of abstraction within the index.

        Args:
            documents (List[BaseNode]): List of documents.
        """
        embed_model = self.index._embed_model
        transformations = self.index._transformations

        all_nodes_data = []
        cur_nodes = run_transformations(documents, transformations, in_place=False)

        for level in range(self.tree_depth):
            if self._verbose:
                print(f"Generating embeddings for level {level}.")

            embeddings = await embed_model.aget_text_embedding_batch(
                [node.get_content(metadata_mode="embed") for node in cur_nodes]
            )
            assert len(embeddings) == len(cur_nodes)
            id_to_embedding = {
                node.id_: embedding for node, embedding in zip(cur_nodes, embeddings)
            }

            if self._verbose:
                print(f"Performing clustering for level {level}.")

            nodes_per_cluster = get_clusters(cur_nodes, id_to_embedding)

            if self._verbose:
                print(
                    f"Generating summaries for level {level} with {len(nodes_per_cluster)} clusters."
                )
            summaries_per_cluster = await self.summary_module.generate_summaries(
                nodes_per_cluster
            )

            if self._verbose:
                print(
                    f"Level {level} created summaries/clusters: {len(nodes_per_cluster)}"
                )

            new_nodes = [
                TextNode(
                    text=summary,
                    metadata={"level": level},
                    excluded_embed_metadata_keys=["level"],
                    excluded_llm_metadata_keys=["level"],
                )
                for summary in summaries_per_cluster
            ]

            nodes_with_embeddings = []
            for cluster, summary_doc in zip(nodes_per_cluster, new_nodes):
                for node in cluster:
                    node.metadata["parent_id"] = summary_doc.id_
                    node.excluded_embed_metadata_keys.append("parent_id")
                    node.excluded_llm_metadata_keys.append("parent_id")
                    node.embedding = id_to_embedding[node.id_]
                    nodes_with_embeddings.append(node)

                    node_data = {key: value for key, value in vars(node).items()}
                    node_data.pop('relationships', None)
                    all_nodes_data.append(node_data)

            self.index.insert_nodes(nodes_with_embeddings)
            cur_nodes = new_nodes

        self.index.insert_nodes(cur_nodes)
        for node in cur_nodes:
            node_data = {key: value for key, value in vars(node).items()}
            node_data.pop('relationships', None)
            all_nodes_data.append(node_data)

        with open('nodes_with_embeddings.jsonl', 'w') as f:
            for node_data in all_nodes_data:
                json.dump(node_data, f)
                f.write('\n')

        print("ONE CHECK")
        input_file_path = process_json('nodes_with_embeddings.jsonl')
        input_jsonl_file_path = input_file_path
        output_jsonl_file_path = 'JSON_DATA/processed_data.jsonl'
        id_mapping , num_id_mapping = generate_id_mapping_from_jsonl(input_jsonl_file_path)
        update_jsonl_with_id_mapping(input_jsonl_file_path, output_jsonl_file_path, id_mapping , num_id_mapping)

    async def collapsed_retrieval(self, query_str: str) -> Response:
        """Query the index as a collapsed tree -- i.e. a single pool of nodes."""
        return self.client(query = query_str , k = 3)

    async def tree_traversal_retrieval(self, query_str: str) -> Response:
        """Query the index as a tree, traversing the tree from the top down."""
        parent_ids = None
        nodes = []
        level = self.tree_depth - 1
        while level >= 0:
            if parent_ids is None:
                level2 = str(level)
                nodes = self.client(query_str , k = 3 , metadata_filter = f"level == `{level2}`")
                if self._verbose:
                    print(f"Retrieved {len(nodes)} from level {level}.")
                parent_ids = [node['metadata']['id_'] for node in nodes]
                if self._verbose:
                    print(f"Retrieved parent IDs from level {level}: {parent_ids!s}")
            elif parent_ids:
                nested_nodes = []
                for id_ in parent_ids:
                    # Fetch the children of each retrieved parent node
                    id2 = str(id_)
                    docs = self.client(query = query_str , k = 3 , metadata_filter = f"parent_id == `{id2}`")
                    nested_nodes.append(docs)
                nodes = [node for nested in nested_nodes for node in nested]
                if self._verbose:
                    print(f"Retrieved {len(nodes)} from parents at level {level}.")
                level -= 1
                parent_ids = None
            else:
                # Nothing was retrieved at this level; stop instead of looping forever
                break

        return nodes

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        # not used, needed for type checking

    def retrieve(
        self, query_str_or_bundle: QueryType, mode: Optional[QueryModes] = None
    ) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        if isinstance(query_str_or_bundle, QueryBundle):
            query_str = query_str_or_bundle.query_str
        else:
            query_str = query_str_or_bundle

        return asyncio.run(self.aretrieve(query_str, mode or self.mode))

    async def aretrieve(
        self, query_str_or_bundle: QueryType, mode: Optional[QueryModes] = None
    ) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        if isinstance(query_str_or_bundle, QueryBundle):
            query_str = query_str_or_bundle.query_str
        else:
            query_str = query_str_or_bundle

        mode = mode or self.mode
        if mode == "tree_traversal":
            return await self.tree_traversal_retrieval(query_str)
        elif mode == "collapsed":
            return await self.collapsed_retrieval(query_str)
        else:
            raise ValueError(f"Invalid mode: {mode}")

    def persist(self, persist_dir: str) -> None:
        self.index.storage_context.persist(persist_dir=persist_dir)

    @classmethod
    def from_persist_dir(
        cls,
        persist_dir: str,
        embed_model: Optional[BaseEmbedding] = None,
        **kwargs: Any,
    ) -> "PathwayRaptorRetriever":
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
        return cls(
            [],
            existing_index=load_index_from_storage(
                storage_context, embed_model=embed_model
            ),
            **kwargs,
        )

class RaptorPack(BaseLlamaPack):
    """Raptor pack."""

    def __init__(
        self,
        documents: List[BaseNode],
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        similarity_top_k: int = 2,
        mode: QueryModes = "collapsed",
        verbose: bool = True,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        self.retriever = PathwayRaptorRetriever(
            documents,
            embed_model=embed_model,
            llm=llm,
            similarity_top_k=similarity_top_k,
            vector_store=vector_store,
            mode=mode,
            verbose=verbose,
            **kwargs,
        )

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "retriever": self.retriever,
        }

    def run(
        self,
        query: str,
        mode: Optional[QueryModes] = None,
    ) -> Any:
        """Run the pipeline."""
        return self.retriever.retrieve(query, mode=mode)
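
`SummaryModule.generate_summaries` creates an `asyncio.Semaphore(num_workers)`, but its loop awaits each job before starting the next, so the summaries are produced one at a time. If bounded concurrency is the intent, one possible shape is sketched below. It is not a drop-in replacement: `fake_summarize` is a hypothetical stand-in for `response_synthesizer.asynthesize`, and the `tracker` dictionary exists only to make the concurrency level visible.

```python
import asyncio


async def fake_summarize(cluster_id: int, tracker: dict) -> str:
    """Hypothetical stand-in for one LLM summarization call."""
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)  # simulate network latency
    tracker["active"] -= 1
    return f"summary-{cluster_id}"


async def run_bounded(n_jobs: int, num_workers: int):
    """Run n_jobs summarizations with at most num_workers in flight."""
    semaphore = asyncio.Semaphore(num_workers)
    tracker = {"active": 0, "peak": 0}

    async def guarded(cluster_id: int) -> str:
        # Each task holds one semaphore slot for the duration of its call.
        async with semaphore:
            return await fake_summarize(cluster_id, tracker)

    # gather schedules every task at once and returns results in input order.
    summaries = await asyncio.gather(*(guarded(i) for i in range(n_jobs)))
    return list(summaries), tracker["peak"]


summaries, peak = asyncio.run(run_bounded(n_jobs=10, num_workers=4))
print(summaries[:3], "peak concurrency:", peak)
```

Running requests in parallel raises the request rate against the LLM provider, so `num_workers` may need to stay low when the API is rate limited.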


##### Retriever

In [None]:
# Jina Embedder
import os
import fitz
import json
import time
import numpy as np
import sys
import nest_asyncio
from PyPDF2 import PdfReader, PdfWriter
from pathway.xpacks.llm.embedders import BaseEmbedder # type: ignore
from llama_index.embeddings.jinaai import JinaEmbedding
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from unstructured_client.models import operations, shared
from pathway.xpacks.llm.vector_store import VectorStoreClient, VectorStoreServer # type: ignore

class JinaEmbedder(BaseEmbedder):
    """
    Pathway wrapper for Jina's embedding API.

    Args:
        model: model name or path for the Jina embedding model.
        api_key: optional API key for authentication.
        call_kwargs: additional arguments to pass during each embedding call.
        task: task to perform with the embedding model (e.g., 'retrieval.passage').
        **sentencetransformer_kwargs: extra keyword arguments forwarded to ``JinaEmbedding``.

    Example:

    >>> import pathway as pw
    >>> embedder = JinaEmbedder(model="your-model-name", api_key="your-api-key")
    >>> t = pw.debug.table_from_markdown('''txt Text''')
    >>> t.select(ret=embedder(pw.this.txt))
    <pathway.Table schema={'ret': numpy.ndarray[typing.Any, numpy.dtype[typing.Any]]}>
    """

    def __init__(
        self,
        model: str,
        api_key: str | None = None,
        call_kwargs: dict | None = None,
        task: str = "retrieval.passage",
        **sentencetransformer_kwargs
    ):
        super().__init__()
        self.model = JinaEmbedding(
            api_key=api_key, model=model, task=task, **sentencetransformer_kwargs
        )
        # Copy so each instance owns its kwargs (avoids a shared mutable default)
        self.call_kwargs = dict(call_kwargs or {})

    def __wrapped__(self, input: str, **kwargs) -> np.ndarray:
        """
        Embed the input text using the Jina model.

        Args:
            - input (str): The text to embed.
            - kwargs: Optional additional keyword arguments for fine-tuning the embedding call.

        Returns:
            - np.ndarray: The embedding result as a NumPy array.
        """
        kwargs = {**self.call_kwargs, **kwargs}
        embedding = self.model.get_text_embedding(input, **kwargs)

        # Return the embedding as a numpy array
        return np.array(embedding)

def table_summary(html_code):
    prompt_text = f"""You are an assistant tasked with summarizing tables for retrieval. \
    These summaries will be embedded and used to retrieve the raw table elements. \
    You will be given the HTML code of a table; return a concise summary of the table (without losing any information, including numerical values), well optimized for retrieval. Table: {html_code} Summary:"""

    summary = client_table.chat.completions.create(
        model="llama-3.1-70b-versatile",  # Specify the model you want to use
        messages=[
            {
                "role": "user",
                "content": prompt_text
            }
        ],
        temperature=0.3,         # Control randomness
        top_p=0.9,               # Sampling control for nucleus sampling
        stream=False             # Change to True if you want streaming responses
    )
    return summary.choices[0].message.content

def image_summary(image_content):
    image_summary = client_table.chat.completions.create(
          messages=[
              {
                  "role": "user",
                  "content": [
                      {"type": "text", "text": "Give a detailed summary of the given image, without losing any information"},
                      {
                          "type": "image_url",
                          "image_url": {
                              "url": f'data:image/jpeg;base64,{image_content}',
                          },
                      },
                  ],
              }
          ],
          temperature = 0.7,
          model="llama-3.2-90b-vision-preview",
      )
    return image_summary.choices[0].message.content

def retriever(path, query, top_k):
  """
    Retrieve the most relevant pages of a PDF and build a RAPTOR retriever over them.

    The function extracts per-page text with PyMuPDF and writes it to ./JINA/pdf_metadata.jsonl, asks the Pathway vector store on port 8000 for the top_k pages matching the query, re-partitions those pages with the Unstructured API (summarizing tables and images with an LLM), and indexes the result in a `PathwayRaptorRetriever`.

    Args:
        path (str): File path to the input PDF document.
        query (str): Semantic query used to select pages.
        top_k (int): Number of most relevant pages to retrieve.

    Returns:
        tuple: The `PathwayRaptorRetriever` built over the selected pages, and the list of retrieved page numbers (1-based).

    Example:
        >>> raptor_retriever, page_numbers = retriever('document.pdf', 'research methodology', top_k=3)
        >>> nodes = raptor_retriever.retrieve("Summarize the key findings")
  """
  print("input")
  pdf_document = fitz.open(path)

  metadata = []
  for page_num in range(len(pdf_document)):
      page_metadata = {"data": "", "page_number" : page_num + 1}
      page = pdf_document[page_num]

      page_text = page.get_text()
      page_metadata["data"] = page_text

      metadata.append(page_metadata)

  pdf_document.close()
  jina_output_folder = "./JINA"
  os.makedirs(jina_output_folder, exist_ok=True)  # the folder may not exist on a fresh runtime
  metadata_file_path = os.path.join(jina_output_folder, "pdf_metadata.jsonl")
  with open(metadata_file_path, "w", encoding="utf-8") as json_file:
      for row in metadata:
          json.dump(row, json_file)
          json_file.write("\n")

  PATHWAY_PORT = 8000
  client = VectorStoreClient(
    host="0.0.0.0",
    port=PATHWAY_PORT,
    timeout=800
  )
  print("query", query , top_k)
  docs = client(query , k = top_k)
  print("docs" , docs)
  gt_num = []
  for doc in docs:
    gt_num.append(doc['metadata']['page_number'])
  print(gt_num)

  writer = PdfWriter()
  input_pdf  = PdfReader(path)
  for i in gt_num:
    writer.add_page(input_pdf.pages[i-1])

  batch_filename = 'data/elements.pdf'
  with open(batch_filename, 'wb') as output_file:
      writer.write(output_file)

  with open(batch_filename, "rb") as f:
    data = f.read()
    req = operations.PartitionRequest(partition_parameters=shared.PartitionParameters(files=shared.Files( content=data, file_name=batch_filename) , strategy=shared.Strategy.HI_RES, languages=['eng'], extract_image_block_types=["Image"]))
    try:
      res = client_unstructured.general.partition(request=req)
    except Exception as e:
      # Without a partition result the rest of the function cannot run
      print(e)
      raise
  print("res", res)
  page_metadata={}
  for element in res.elements:

      page_num = element["metadata"]["page_number"]
      if page_metadata.get(f"page_{page_num}", None) is None:
          page_metadata[f"page_{page_num}"] = ""

      if element['type'] == 'Table':
          html = element["metadata"]["text_as_html"]
          text = table_summary(html)
          time.sleep(1)
          page_metadata[f"page_{page_num}"] += f" \n{text}\n"

      elif element['type'] == 'Title':
          text = element["text"]
          page_metadata[f"page_{page_num}"] += f"\n{text}\n"
      elif element['type'] == 'Image':
          base64_reper = element["metadata"]["image_base64"]
          text = image_summary(base64_reper)
          time.sleep(1)
          page_metadata[f"page_{page_num}"] += f" \n{text}\n"

      else :
          text = element["text"]

          page_metadata[f"page_{page_num}"] += f"  {text}"
  docs = [Document(text=content, metadata={"page_number": page_num})
        for page_num, content in page_metadata.items()]
  retriever = PathwayRaptorRetriever(docs,embed_model=text_embed_model,
        llm=llm, similarity_top_k=1,mode="collapsed",
        transformations=[SentenceSplitter(chunk_size=1000, chunk_overlap=200)],)

  return retriever, gt_num
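
The second half of `retriever` folds the partitioned elements back into one text blob per page: tables and images are replaced by LLM summaries, titles are set on their own line, and everything else is appended inline. A minimal, offline sketch of that aggregation follows. `summarize_stub` is a hypothetical placeholder for `table_summary` / `image_summary`, and the `elements` list is invented sample data shaped like the fields the function reads.

```python
from collections import defaultdict


def summarize_stub(kind: str, payload: str) -> str:
    """Hypothetical placeholder for the LLM-backed table/image summarizers."""
    return f"[{kind} summary, {len(payload)} chars]"


def collect_page_text(elements: list[dict]) -> dict[int, str]:
    """Concatenate element text per page, summarizing tables and images."""
    pages: dict[int, str] = defaultdict(str)
    for element in elements:
        page = element["metadata"]["page_number"]
        kind = element["type"]
        if kind == "Table":
            text = summarize_stub("table", element["metadata"]["text_as_html"])
            pages[page] += f"\n{text}\n"
        elif kind == "Image":
            text = summarize_stub("image", element["metadata"]["image_base64"])
            pages[page] += f"\n{text}\n"
        elif kind == "Title":
            pages[page] += f"\n{element['text']}\n"
        else:
            pages[page] += f" {element['text']}"
    return dict(pages)


# Hypothetical partition output for a two-page document.
elements = [
    {"type": "Title", "text": "Results", "metadata": {"page_number": 1}},
    {"type": "NarrativeText", "text": "Revenue grew.", "metadata": {"page_number": 1}},
    {"type": "Table", "text": "", "metadata": {"page_number": 2, "text_as_html": "<table></table>"}},
]

page_text = collect_page_text(elements)
print(page_text)
```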

In [None]:
import nest_asyncio
nest_asyncio.apply()
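
`PathwayRaptorRetriever.retrieve` and its constructor call `asyncio.run`, which refuses to start when an event loop is already running, as is the case inside a notebook kernel. Patching the loop with `nest_asyncio` is one way around that. The sketch below shows the unpatched behaviour using only the standard library; it is meant to be run as a plain script, and `inner` / `outer` are illustrative names.

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer() -> str:
    """Try to start a nested event loop from inside a running one."""
    coro = inner()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"
    return "no error"


message = asyncio.run(outer())
print(message)
```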

In [None]:
import asyncio
try:
    asyncio.get_event_loop()
except RuntimeError:
    asyncio.set_event_loop(asyncio.new_event_loop())

In [None]:
if __name__ == "__main__" :
    retriever_, gt_num = retriever(r"/temp.pdf", "What is the Pathway?", 3)
    result = retriever_.retrieve("What is Pathway?")
    print(result)

input
query What is the Pathway? 3




docs [{'text': '', 'metadata': {'created_at': 1742186101, 'modified_at': 1742190845, 'owner': 'root', 'path': '/content/JINA/pdf_metadata.jsonl', 'size': 12051, 'seen_at': 1742190845, 'page_number': 1}, 'dist': 0.23070713877677915}, {'text': 'Introduction\nAbout the Company: Pathway Technology Inc. (pathway.com) is the\nmaker of the world’s fastest global data processing engine (GitHub).\nWith offices in the US, France, and Poland, their ~25-member team\nhas deep expertise from top AI labs like Microsoft Research, Google\nBrain, and ETH Zurich. Many of their members have worked at Google\nand hold degrees from prestigious institutions like École\nPolytechnique, UC Berkeley, CNRS, and HEC Paris—one even earned\na PhD at just 20. their CTO has co-authored notable works with AI\npioneers Geoffrey Hinton and Yoshua Bengio. Their leadership\nincludes the co-founder of Spoj.com (one of the earliest CP platforms\nwith over 1M developers) and NK.pl (Poland’s first social media with\n13.5M+ use

#### Interleaving

In [None]:
import sys
import os
from pathway.xpacks.llm import prompts  # type: ignore
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer  # type: ignore
from unstructured_client.models import operations, shared # type: ignore
from PyPDF2 import PdfReader, PdfWriter # type: ignore
import pathway as pw # type: ignore
import re
import fitz
import faiss
from llama_index.core import Document
from io import BytesIO
import numpy as np
import pandas as pd
import requests # type: ignore
import datetime
import random
import json
import time

host_url = "http://0.0.0.0:8001"

def retrieve( query: str, k: int = 3, metadata_filter: str | None = None, filepath_globpattern: str | None = None,) -> list[dict]:
        """
        Perform a query to the vector store and fetch results.

        Args:
            query: the query string to search for.
            k: number of documents to be returned
            metadata_filter: optional string representing the metadata filtering query
                in the JMESPath format. The search will happen only for documents
                satisfying this filtering.
            filepath_globpattern: optional glob pattern specifying which documents
                will be searched for this query.
        """

        data = {"query": query, "k": k}
        if metadata_filter is not None:
            data["metadata_filter"] = metadata_filter
        if filepath_globpattern is not None:
            data["filepath_globpattern"] = filepath_globpattern
        url = host_url + "/v1/retrieve"
        response = requests.post(
            url,
            data=json.dumps(data),
            headers=_get_request_headers(),
            timeout=60,
        )

        responses = response.json()
        return sorted(responses, key=lambda x: x["dist"])

# The question and its answer appear at the top; everything else is collapsed by default.
# A tool call shows its tool name, arguments, and reasoning.
# For the critic agent, a score of 1 is shown in red (danger); any other score is shown normally.
# In the critic agent's collapsed view, silent reflection is shown in green.
# A "None" tool is shown in a different color, with its reasoning on top.
# A tool call has a collapsible section for its internal processing (RAG only).
# Retrieval has one more collapsible section showing utility question generation and the chunks retrieved from the memory cache.

class InterleavedRAGQuestionAnswerer(BaseRAGQuestionAnswerer):

    '''
    The InterleavedRAGQuestionAnswerer is an advanced Retrieval-Augmented Generation (RAG) system designed for dynamic question-answering using Pathway’s reactive processing framework.
    It leverages interleaved reasoning and retrieval to iteratively refine answers. This integration allows for scalable, distributed, and efficient query processing.
    '''

    def __init__( self, default_llm_name: str | None = None, short_prompt_template: pw.UDF = prompts.prompt_short_qa,
                 long_prompt_template: pw.UDF = prompts.prompt_qa, summarize_template: pw.UDF = prompts.prompt_summarize,
                 strict_prompt: bool = False, interleave_depth: int = 8, host_url : str = host_url, llm=chat_llm1, embedding_dim=1024,
                 thought_agent_prompt=thought_agent_prompt,reasoning_agent_prompt=reasoning_agent_prompt,
                 max_steps=15, similarity_threshold=0.8, path = None) -> None:
        '''
        Initialize the InterleavedRAGQuestionAnswerer with Pathway’s computational graph integration.

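        Args:
            default_llm_name (str) – Default model name for Pathway-based inference (optional).
            short_prompt_template (pw.UDF) – Pathway User-Defined Function for handling short queries (default: prompts.prompt_short_qa).
            long_prompt_template (pw.UDF) – Pathway UDF for complex queries (default: prompts.prompt_qa).
            summarize_template (pw.UDF) – Pathway UDF for summarizing results (default: prompts.prompt_summarize).
            strict_prompt (bool) – Flag indicating whether to enforce strict prompt handling (default: False).
            interleave_depth (int) – Maximum iterations for interleaved reasoning (default: 8).
            host_url (str) – Base URL of the Pathway vector store server (default: the module-level host_url).
            llm – Language model integrated with Pathway for reactive computation (default: chat_llm1).
            embedding_dim (int) – Dimension of the embeddings stored in the memory cache (default: 1024).
            thought_agent_prompt – Prompt template for the thought agent.
            reasoning_agent_prompt – Prompt template for the reasoning agent.
            max_steps (int) – Maximum number of agent steps (default: 15).
            similarity_threshold (float) – Distance threshold for accepting a memory-cache match (default: 0.8).
            path (str) – Path to the source PDF used by retrieve_docs (optional).
        '''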
        self.llm = llm
        self.llm2 = chat_llm1
        self.strict_prompt = strict_prompt
        self.interleave_depth = interleave_depth
        self.page_num = []
        self.host_url = host_url
        self.embedding_dim = embedding_dim
        self.cache_index = DynamicCacheIndex(dim=embedding_dim, batch_size=16)
        self.thought_agent_prompt = thought_agent_prompt
        self.reasoning_agent_prompt = reasoning_agent_prompt
        self.max_steps = max_steps
        self.retriever = None
        self.jargons = []
        self.reavaluate = False
        self.clarification  = ""
        self.feedback = ""
        self.path = path
        self.similarity_threshold = similarity_threshold
        self.utility_query_generator = UtilityQueryGenerator(llm=chat_llm1, embedding_model=text_embed_model, similarity_threshold=0.8)
        self.previous_queries = {}
        self.__reset_agent()
        self.question = ""
        self.agent_input = ""
        self.text_embed_model = text_embed_model

    def check_memory_and_retrieve(self, query):
        """
        Check if a query exists in memory and retrieve the best match based on similarity.

        Args:
            query (str): The query string to search in memory.

        Returns:
            str: The best matching chunk from memory, "FORCE_REASONING" if the same query
                has been seen more than twice, or None if no match is found.
        """
        try:
            if not query:
                return None

            self.previous_queries[query] = self.previous_queries.get(query, 0) + 1

            if self.previous_queries[query] > 2:
                return "FORCE_REASONING"

            query_embedding = self.get_embedding(query)
            if query_embedding is None:
                return None

            results = self.cache_index.search(query_embedding, k=5)
            if not results:
                return None

            best_match = None
            best_distance = float('inf')

            MAX_DISTANCE = 0.3

            for id, distance, metadata in results:
                if distance < self.similarity_threshold and distance < best_distance and distance < MAX_DISTANCE:
                    chunk = metadata.get('chunk', '')
                    if chunk:
                        best_match = chunk
                        best_distance = distance
            if best_match:
                return best_match

            return None

        except Exception:
            return None

    def check_memory_and_retrieve_for_supervisor(self, query):
        """
        Retrieve the best match for a query from memory for supervisory tasks.

        Args:
            query (str): The query string to search in memory.

        Returns:
            str: The best matching chunk, or None if no match is found.
        """
        try:
            if not query:
                return None

            query_embedding = self.get_embedding(query)
            if query_embedding is None:
                return None

            results = self.cache_index.search(query_embedding, k=5)
            if not results:
                return None

            best_match = None
            best_distance = float('inf')

            for id, distance, metadata in results:
                if distance < self.similarity_threshold and distance < best_distance:
                    chunk = metadata.get('chunk', '')
                    if chunk:
                        best_match = chunk
                        best_distance = distance
            if best_match:
                return best_match

            return None

        except Exception as e:
            print(f"Error in memory check: {e}")
            return None

    def add_to_memory(self, query, chunk, query_type='original',
                      original_query=None, metadata=None):
        """
        Add a query and its associated chunk to memory with metadata.

        Args:
            query (str): The query string.
            chunk (str): The chunk of data to store.
            query_type (str, optional): The type of query (e.g., 'original', 'retrieval'). Default is 'original'.
            original_query (str, optional): The original query string.
            metadata (dict, optional): Additional metadata to associate with the chunk.

        Returns:
            bool: True if the chunk was successfully added, False otherwise.
        """
        try:
            full_metadata = {
                'query': query,
                'query_type': query_type,
                'original_query': original_query or query,
                'chunk': chunk,
                'timestamp': datetime.datetime.now().isoformat()
            }

            if metadata:
                full_metadata.update(metadata)

            chunk_id = self.cache_index.add_chunk(chunk, full_metadata)
            return chunk_id is not None

        except Exception as e:
            # Report the failure instead of hiding it, so a broken cache is noticed
            print(f"Error adding chunk to memory: {e}")
            return False

    def get_existing_graph_queries(self):
        """
        Retrieve all existing graph queries from the memory cache.

        Returns:
            list: A list of query strings stored in memory.
        """
        return [
            metadata.get('query', '')
            for metadata in self.cache_index.metadata.values()
            if 'query' in metadata
        ]

    def generate_utility_queries(self, chunk, max_queries,existing_graph_queries):
        """
        Generate utility queries for a given data chunk.

        Args:
            chunk (str): The data chunk for generating queries.
            max_queries (int): Maximum number of queries to generate.
            existing_graph_queries (list): List of existing graph queries to avoid duplicates.

        Returns:
            list: Generated utility queries.
        """
        return self.utility_query_generator.generate_queries(chunk,max_queries,existing_graph_queries)

    def get_embedding(self, text):
        """
        Generate an embedding vector for a given text.

        Args:
            text (str): The input text for which to generate an embedding.

        Returns:
            np.ndarray: The generated embedding vector, or None if an error occurs.
        """
        try:
            embedding = self.cache_index.text_embed_model.get_text_embedding(text)
            if isinstance(embedding, list):
                embedding = np.array(embedding)

            # Ensure embedding matches cache index dimension
            if embedding.shape[0] != self.embedding_dim:
                print(f"Embedding dimension mismatch. Expected {self.embedding_dim}, got {embedding.shape[0]}")

                # Truncate or pad to match expected dimension
                if embedding.shape[0] > self.embedding_dim:
                    embedding = embedding[:self.embedding_dim]
                else:
                    embedding = np.pad(embedding, (0, self.embedding_dim - embedding.shape[0]), mode='constant')

            return embedding

        except Exception as e:
            print(f"Error generating embedding: {e}")
            return None

    def print_memory_metadata(self):
        """
        Print metadata for all chunks in the memory cache.
        """

        if not hasattr(self, 'cache_index'):
            print("No cache index found.")
            return

        if not self.cache_index.metadata:
            print("Memory cache is empty.")
            return

        for idx, (chunk_id, metadata) in enumerate(self.cache_index.metadata.items(), 1):
            print(f"\n--- Memory Entry {idx} ---")
            print(f"Chunk ID: {chunk_id}")

            for key, value in metadata.items():
                print(f"{key}: {value}")

            chunk = metadata.get('chunk', 'No chunk text')
            print(f"Chunk Preview: {chunk[:200]}..." if len(chunk) > 200 else f"Chunk: {chunk}")

        print(f"\nTotal memory entries: {len(self.cache_index.metadata)}")

    def check_query_in_memory(self, query, threshold=0.95):
        """
        Check if a query exists in memory using similarity scores.

        Args:
            query (str): The query to check.
            threshold (float, optional): Similarity threshold. Default is 0.95.

        Returns:
            bool: True if a similar query exists, False otherwise.
        """
        try:
            query_embedding = self.text_embed_model.get_text_embedding(str(query))

            for metadata in self.cache_index.metadata.values():
                cached_query = str(metadata.get('query', ''))

                cached_query_embedding = self.text_embed_model.get_text_embedding(cached_query)

                similarity = np.dot(query_embedding, cached_query_embedding) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(cached_query_embedding)
                )

                if similarity >= threshold:
                    return True

            return False

        except Exception as e:
            print(f"Error checking query similarity: {e}")
            return False

    def answer_query(self, pw_ai_queries: pw.Table) -> tuple[pw.Table, str]:
        """
        Create a RAG response with adaptive retrieval, incorporating interleaving.

        Parameters:
            pw_ai_queries (pw.Table) – Input query table; the question is read from the first column of its last row.

        Returns:
            A table holding the prompt and the computed answer, and the raw jargon string returned by jargon_check.

        Return type:
            tuple[pw.Table, str]
        """
        import ast  # local import: used to parse the jargon list without eval

        table_str = capture_output(pw_ai_queries)
        lines = table_str.strip().split('\n')
        data_line = lines[-1]
        # Split by '|' and take the first part (question column)
        prompt = data_line.split('|')[0].strip()

        self.answer_interleaved_query(prompt)
        x = {'prompt': prompt, 'result': self.answer}
        df = pd.DataFrame([x], index=[0])
        t = pw.debug.table_from_pandas(df)
        jargons = self.jargon_check(prompt)
        clarified_jargons = []
        if jargons != "None":
            # literal_eval only accepts Python literals, unlike eval
            clarified_jargons = list(ast.literal_eval(jargons))
        print(jargons)
        print(clarified_jargons)
        return t, jargons

    def run(self, question, reset = None):
        """
        Answer a question interactively, optionally re-evaluating it with user feedback.

        Args:
            question (str): The question to answer.
            reset (bool, optional): If truthy, reset the agent state before running.

        Returns:
            tuple: The final answer and the jargon clarifications collected from the user
                (an empty string if no re-evaluation was requested).
        """
        import ast  # local import: used to parse the jargon list without eval

        if reset:
            self.__reset_agent()

        if self.reavaluate:
            self.step_n = 1
            self.reavaluate = False
            self.agent_input = '\n'.join(self.agent_input.split('\n')[:-2])
            enhanced_query = self.rephrase(self.question, self.clarification)
            self.question = enhanced_query + " Feedback :- " + str(self.feedback)
            print(f"Enhanced Query :- {self.question}")
            self.retrieve_docs(self.question, 2)
        else:
            self.__reset_agent()
            self.jargons = []
            self.question = question
        self.answer = None
        self.finished = False

        self.answer_interleaved_query(self.question)
        jargons = self.jargon_check(self.question)

        fl = input("Do you want to re-evaluate the answer? (yes/no)").lower()
        while fl not in ['yes', 'no']:
            fl = input("Do you want to re-evaluate the answer? (yes/no)").lower()

        if fl == 'yes':
            self.reavaluate = True
            self.feedback = input("Please provide feedback for the answer.")
            clarified_jargons = []
            if jargons != "None":
                # literal_eval only accepts Python literals, unlike eval
                for term in ast.literal_eval(jargons):
                    cl = input("Enter clarification for " + term + " : ")
                    if cl:
                        # Keep the term-to-clarification pairing explicit; a set would lose it
                        clarified_jargons.append({term: cl})
            self.clarification = clarified_jargons
            self.run(question, reset = False)
            return self.answer, clarified_jargons
        else:
            self.reavaluate = False
            self.feedback = ""
            self.clarification = ""
            return self.answer, ""

    def answer_interleaved_query(self, prompt: str) -> None:
        """
        Execute interleaved retrieval and reasoning steps until a final answer is produced
        or interleave_depth is reached.

        Parameters:
            prompt (str) – The question to answer.

        Returns:
            None. The final answer is stored in self.answer.
        """

        self.agent_input = ""
        finished = False
        step_n = 0
        answer = None

        while not finished and step_n < self.interleave_depth:
            step_n += 1
            print(f"\nStep {step_n} of interleaving retrieval and reasoning")

            thought_prompt = self.thought_agent_prompt.format(question=prompt, agent_input=self.agent_input)
            thought_response = self.llm2.invoke(thought_prompt).content
            # thought_response = get_pathway_answer(self.llm, thought_prompt , model = "llama3-70b-8192").strip()
            thought_response = self.parse_llm_response(thought_response, None)
            self.agent_input +=" " + str(thought_response)
            thought_response = self.agent_input.split('\n')[-1]
            print("-------")
            print("THOUGHT")
            print(thought_response)
            print("-------")

            if thought_response is None:
                continue

            if "RETRIEVAL" in thought_response:
                query = thought_response[20:]

                print(f"Processing Retrieval Query: {query}")
                memory_result = self.check_memory_and_retrieve(query)

                if memory_result == "FORCE_REASONING":
                    reasoning_response = self._perform_reasoning_step(query, force_completion=True)
                    if reasoning_response is None:
                        # Guard against a failed parse so the concatenation below cannot raise
                        reasoning_response = ""
                    reasoning_response = "FINAL ANSWER:" + reasoning_response
                    print("Forced Reasoning")  # emit
                    self.agent_input += reasoning_response
                    self.answer = reasoning_response
                    self.finished = True
                    final_answer = self.answer.replace("FINAL ANSWER:", "").strip()
                    self.answer = final_answer
                    return
                elif memory_result:
                    print(f"Memory Result Found: {memory_result}")   # emit, but do not display memory_result
                    self.agent_input += f'\nOBSERVATION: {memory_result}'
                    # Move on to the next thought instead of exiting without an answer
                    continue
                try:
                    retrieved_chunks = self.pathway_retrieve(query)
                    if not retrieved_chunks:
                        return

                    existing_graph_queries = self.get_existing_graph_queries()
                    for i, chunk in enumerate(retrieved_chunks, 1):
                        if not chunk['text']:
                            continue
                        print('--------------')
                        print("Chunk Text")
                        print(chunk['text'])
                        print('--------------')
                        chunk_result = pw_ai_answer(prompt = chunk['text'])
                        self.add_to_memory( query=query, chunk=str(chunk['text']), query_type='retrieval', metadata={'chunk_index': i, 'summarized_chunk_text': chunk_result})

                        # Generate utility queries
                        try:
                            utility_queries = self.utility_query_generator.generate_queries(
                                chunk=str(chunk_result), max_queries=2, existing_graph_queries=existing_graph_queries)
                            print("Utility Queries")
                            print(utility_queries) ## emit
                            for utility_query in utility_queries:
                                if utility_query and utility_query != query:
                                    self.add_to_memory( query=utility_query, chunk=str(chunk_result), original_query=query, query_type='utility')
                                    existing_graph_queries.append(utility_query)

                        except Exception as e:
                            print(f"Error generating utility queries: {e}")

                        self.agent_input += f'\nOBSERVATION: {chunk_result}'
                except Exception as e:
                    print(f"Error in retrieval for query {query}: {e}")
                    import traceback
                    traceback.print_exc()

                query_result = self._perform_retrieval_step(query)
                self.agent_input += f"\nOBSERVATION: {query_result}"
                print(f"RAG ACTION: {query_result}")   # emit

            elif "REASONING" in thought_response:
                reasoning_response = self._perform_reasoning_step(thought_response[20:])
                # _perform_reasoning_step returns None when the response cannot be parsed
                reasoning_response = (reasoning_response or "").strip()
                self.agent_input += reasoning_response
                print("-------")
                print("REASONING")
                print(reasoning_response)
                print("-------")
                if "FINAL ANSWER" in reasoning_response:
                    self.answer = reasoning_response.split("FINAL ANSWER:")[-1].split("FINAL ANSWER")[-1]
                    final_answer = self.answer.replace("FINAL ANSWER:", "").strip()
                    self.answer = final_answer
                    self.finished = True
                    return
                else:
                    print(f"REASONING: {reasoning_response}")


            if "FINAL ANSWER" in thought_response:
                self.answer = thought_response
                final_answer = self.answer.replace("FINAL ANSWER:", "").strip()
                self.answer = final_answer
                self.finished = True

            if self.answer :
                return

        if not self.answer:
            self.answer = "Max Iteration reached without final answer"
            print("Max iterations or steps reached without a final answer.")
        return

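    def parse_llm_response(self, response, expression):
        """
        Extract the thought or reasoning text from a raw LLM response.

        As a side effect, the detected label is appended to self.agent_input.

        Args:
            response (str): The raw LLM response.
            expression (str or None): "REASONING" when parsing a reasoning response,
                or None to detect a "RETRIEVAL THOUGHT" or "REASONING THOUGHT" label.

        Returns:
            str: The parsed text, the unmodified response if it contains "FINAL ANSWER",
                or None if nothing could be parsed.
        """
        parsed_response = None
        if "FINAL ANSWER" in response:
            return response
        elif expression is None:
            if "RETRIEVAL THOUGHT" in response:
                expression = 'RETRIEVAL THOUGHT'
                self.agent_input += f'\n{expression}'
                # Raw f-string so that \s is a regex escape, not a string escape
                pattern = re.compile(rf"{expression}\s*(.*)")
                matches = pattern.findall(response)
                parsed_response = matches[-1] if matches else None
            elif "REASONING THOUGHT" in response:
                expression = 'REASONING THOUGHT'
                self.agent_input += f'\n{expression}'
                pattern = re.compile(rf"{expression}\s*(.*)")
                matches = pattern.findall(response)
                parsed_response = matches[-1] if matches else None
        elif "REASONING" in expression:
            self.agent_input += f'\n{expression}'
            pattern = re.compile(r"REASONING\s*(.*)", re.DOTALL)
            matches = pattern.findall(response)
            parsed_response = matches[-1] if matches else None
        else:
            print(f'\nIncorrect {expression} {response}\n')

        if parsed_response is None:
            print(f'\nIncorrect {expression}\n')
        return parsed_response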

    def _perform_retrieval_step(self, query: str) -> str:
        """
        Conduct a retrieval step by answering the query through pw_ai_answer.

        Parameters:
            query (str) – Retrieval query.

        Returns:
            Retrieved information from Pathway-managed sources.

        Return type:
            str
        """
        query_result = pw_ai_answer(prompt=query)
        return str(query_result)

    def _perform_reasoning_step(self, query: str, force_completion: bool = False) -> str | None:
        """
        Execute a reasoning step over the context accumulated in self.agent_input.

        Parameters:
            query (str) – Reasoning query.
            force_completion (bool) – If True, instruct the model to give a final answer
                and prefix the response with "FINAL ANSWER:" if the marker is missing.

        Returns:
            Parsed reasoning response, or None if the response could not be parsed.

        Return type:
            str | None
        """
        print("Enter Reasoning")
        if force_completion:
            reasoning_agent_prompt = (self.reasoning_agent_prompt + "\nNote: Please provide a final answer based on all information gathered so far.")
        else:
            reasoning_agent_prompt = self.reasoning_agent_prompt

        reasoning_prompt = reasoning_agent_prompt.format(question=query, agent_input=self.agent_input)
        reasoning_response = self.llm2.invoke(reasoning_prompt).content
        # reasoning_response = get_pathway_answer(self.llm, reasoning_prompt)
        if force_completion and "FINAL ANSWER" not in reasoning_response:
            reasoning_response = f"FINAL ANSWER: {reasoning_response}"
        return self.parse_llm_response(reasoning_response, "REASONING")

    def jargon_check(self, query):
        """
        Identifies jargon terms in the user's query.

        Args:
            query (str): The query to be analyzed for jargon.

        Returns:
            str: A list of identified jargon terms or "None" if no jargon is found.
        """
        jargon_prompt_message = jargon_prompt.format_messages(query = query, prev_jargons = self.jargons)
        jargon = self.llm2.invoke(jargon_prompt_message).content
        return jargon

    def rephrase(self, query , jargons):
        """
        Rephrases the user's query by defining and explaining jargon terms.

        Args:
            query (str): The original query that may contain jargon.
            jargons (str): A list or string of jargon terms identified in the query.

        Returns:
            str: The rephrased query with jargon terms defined.
        """
        rephrase = rephrase_prompt.format_messages(query = query, jargons = jargons)
        rephrase = self.llm2.invoke(rephrase).content
        return rephrase

    def get_random_questions_from_metadata(self):
        """
        Retrieve random follow-up query suggestions from the memory cache.

        Args:
            None

        Returns:
            str: A string of randomly selected follow-up query suggestions, formatted with numbered list items.
        """
        if not self.cache_index.metadata:
            print("Memory cache is empty.")
            return ""

        queries = []
        for metadata in self.cache_index.metadata.values():
            if 'query' in metadata:
                query = metadata['query']
                if isinstance(query, list):
                    queries.extend(query)
                else:
                    queries.append(query)

        if len(queries) < 3:
            random_questions = queries
        else:
            random_questions = random.sample(queries, 3)

        return '\n'.join([f"{idx + 1}. {question}" for idx, question in enumerate(random_questions)])

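    def retrieve_docs(self, query, top_k):
      """
      Re-index the PDF pages most relevant to a query.

      Embeds every page of the PDF at self.path, selects the top_k pages closest to the
      query, re-parses them with Unstructured, and inserts the result into self.retriever.

      Args:
          query (str): The query used to rank pages.
          top_k (int): Number of pages to select.

      Returns:
          list: The Document objects inserted into the retriever.
      """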
      pdf_document = fitz.open(self.path)
      metadata = {}
      for page_num in range(len(pdf_document)):
          page_metadata = {"text": ""}
          page = pdf_document[page_num]
          page_text = page.get_text()
          page_metadata["text"] = page_text
          metadata[f"page_{page_num + 1}"] = page_metadata
      pdf_document.close()

      dimension = 1024
      index = faiss.IndexFlatL2(dimension)
      faiss_metadata = []
      count = 1
      for page, content in metadata.items():
          page_text = content["text"]
          page_embedding = text_embed_model.get_text_embedding(page_text)
          page_embedding = np.array([page_embedding], dtype="float32")
          index.add(page_embedding)
          faiss_metadata.append({
              "page": page.split("_")[-1],
              "text": page_text})
          count += 1

      query = str(query)
      query_embedding = query_embed_model.get_query_embedding(query)
      query_embedding = np.array([query_embedding], dtype="float32")
      distances, indices = index.search(query_embedding, top_k)

      results = []
      for idx, distance in zip(indices[0], distances[0]):
          if idx == -1:
              continue
          result_metadata = faiss_metadata[idx]
          result_metadata["distance"] = distance
          results.append(result_metadata)

      gt_num=[]
      for result in results:
        num = int(re.search(r'\d+', result['page']).group())
        gt_num.append(num)

      # Rewrite the selected pages to a PDF, since that is the input format Unstructured expects.

      writer = PdfWriter()
      input_pdf  = PdfReader(self.path)
      for i in gt_num:
        writer.add_page(input_pdf.pages[i-1])

      batch_filename = 'data/elements.pdf'
      with open(batch_filename, 'wb') as output_file:
        writer.write(output_file)

      with open(batch_filename, "rb") as f:
        data = f.read()
        req = operations.PartitionRequest(partition_parameters=shared.PartitionParameters(files=shared.Files( content=data, file_name=batch_filename) , strategy=shared.Strategy.HI_RES, languages=['eng'], extract_image_block_types=["Image"]))
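        try:
            res = client_unstructured.general.partition(request=req)
        except Exception as e:
            # Without a partition result there is nothing to index
            print(f"Unstructured partition failed: {e}")
            return []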

      page_metadata={}
      for element in res.elements:

          page_num = element["metadata"]["page_number"]
          if page_metadata.get(f"page_{page_num}", None) is None:
              page_metadata[f"page_{page_num}"] = ""

          if element['type'] == 'Table':
              html = element["metadata"]["text_as_html"]
              text = table_summary(html)
              time.sleep(2)
              page_metadata[f"page_{page_num}"] += f" \n{text}\n"

          elif element['type'] == 'Title':
              text = element["text"]
              page_metadata[f"page_{page_num}"] += f"\n{text}\n"
          else :
              text = element["text"]
              page_metadata[f"page_{page_num}"] += f"  {text}"

      docs = [Document(text=content, metadata={"page_number": page_num})
            for page_num, content in page_metadata.items()]
      self.retriever.insert(docs)
      return docs

    def pathway_retrieve(self,
        query: str,
        k: int = 3,
        metadata_filter: str | None = None,
        filepath_globpattern: str | None = None,
    ) -> list[dict]:
        """
        Perform a query to the vector store and fetch results.

        Args:
            query: the query string to search for.
            k: number of documents to be returned
            metadata_filter: optional string representing the metadata filtering query
                in the JMESPath format. The search will happen only for documents
                satisfying this filtering.
            filepath_globpattern: optional glob pattern specifying which documents
                will be searched for this query.
        """

        data = {"query": query, "k": k}
        if metadata_filter is not None:
            data["metadata_filter"] = metadata_filter
        if filepath_globpattern is not None:
            data["filepath_globpattern"] = filepath_globpattern
        url = self.host_url + "/v1/retrieve"
        response = requests.post(
            url,
            data=json.dumps(data),
            headers=_get_request_headers(),
            timeout=60,
        )

        responses = response.json()
        return sorted(responses, key=lambda x: x["dist"])

    def __reset_agent(self):
        """
        Reset the agent's internal state, including input and step count.
        """
        self.step_n = 1
        self.answer = ''
        self.finished = False
        self.agent_input = ''
        self.previous_queries.clear()
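
The loop in `answer_interleaved_query` recovers the query text from a thought line with the fixed slice `thought_response[20:]`, which only lines up while the label is exactly `RETRIEVAL THOUGHT : ` or `REASONING THOUGHT : `. A minimal sketch of a more tolerant parser is given below. `parse_thought` and `THOUGHT_RE` are hypothetical names that do not exist in this notebook, and the label format is an assumption based on the thought line printed in the sample output.

```python
import re

# Hypothetical helper, not part of the notebook: splits a thought line into
# its kind ("RETRIEVAL" or "REASONING") and the query text that follows it.
# Assumed label format: "<KIND> THOUGHT", an optional colon, then the query.
THOUGHT_RE = re.compile(
    r"^\s*(RETRIEVAL|REASONING)\s+THOUGHT\s*:?\s*(.*)$", re.DOTALL
)


def parse_thought(line: str):
    """Return (kind, query), or None when the line is not a thought line."""
    match = THOUGHT_RE.match(line)
    if match is None:
        return None
    kind, query = match.groups()
    return kind, query.strip()


if __name__ == "__main__":
    print(parse_thought("RETRIEVAL THOUGHT : What is the definition of Pathway?"))
    print(parse_thought("REASONING THOUGHT:Combine the observations"))
    print(parse_thought("FINAL ANSWER: Pathway is a data processing engine"))
```

With a helper like this, the loop could branch on the returned kind and would no longer depend on the exact width of the label or on the spacing around the colon.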

In [None]:
if __name__ == "__main__":
    # Sample query
    sample_query = "What is the Pathway?"
    groq_chat = GroqChat(
        model="llama-3.1-70b-versatile",
        capacity=1,
        api_key=rag_agent_api,
    )
    # Build the question answerer
    rag_answerer = InterleavedRAGQuestionAnswerer(llm=groq_chat, path=r"./rag_agent/temp.pdf")
    # retriever, gt_num = retriever(r"./rag_agent/temp.pdf", "What is the Pathway?", 2)
    rag_answerer.retriever = retriever_
    rag_answerer.page_num = gt_num
    # result = rag_answerer.answer_query(t1)
    result, jargons = rag_answerer.run(sample_query)
    print("Answer:", result)
    print("Jargons:", jargons)


Step 1 of interleaving retrieval and reasoning
-------
THOUGHT
RETRIEVAL THOUGHT : What is the definition of Pathway?
-------
Processing Retrieval Query: What is the definition of Pathway?
Index not created or empty
--------------
Chunk Text
The image presents a comprehensive overview of Pathway Technology Inc., the company behind the world's fastest global data processing engine, GitHub. The title "Introduction" is prominently displayed in white text at the top left corner, accompanied by a logo featuring a stylized "IT" and "Tech Meet 13.0" in smaller text.

**Company Overview**

The introduction provides an overview of Pathway Technology Inc., highlighting its expertise in AI labs like Microsoft Research, Google Brain, and ETH Zurich. The company has worked with numerous prestigious institutions, including Google and ETH Zurich, and has earned a PhD at just 20 years old. Their CTO has co-authored notable works with AI pioneers Geoffrey Hinton and Yoshua Bengio.

**Pathway Framework

### Dynamic Tool Generator Agent

In [None]:
import sys
from io import StringIO
import re
import subprocess
import traceback
import pickle


history = []
error_analysis=[]


class ModuleInstallError(Exception):
  """Custom exception for module installation issues."""
  pass

class ToolError(Exception):
  """Custom exception for incorrect tool calling issue"""
  pass


def identify_challenging_examples(task_description, input_prompt):
  '''
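  Generate challenging examples for the given task_description using the LLM.

  Args:
    task_description (str): Description of the task to generate examples for.
    input_prompt (str): The current generation prompt, passed to EDGE_CASE_GEN as `instruction`.

  Returns:
    list: List of all challenging examples.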
  '''
  prmpt = EDGE_CASE_GEN.format(task_description = task_description, instruction = input_prompt)
  challenging_examples = chat_llm.invoke(prmpt).content
  if challenging_examples.startswith("```python"):
    challenging_examples = challenging_examples[10:]
    challenging_examples = challenging_examples[:len(challenging_examples)-3]
  print(challenging_examples)
  l1 = eval(challenging_examples)
  return l1

def annotate_challenging_examples(examples, input_prompt):
  '''
  Generate Python code for each given example using the LLM and input_prompt.

  Args:
    examples (list[str]): List of challenging examples.
    input_prompt (str): Prompt template for code generation; it is formatted with `query=example`.

  Returns:
    list[dict]: One dictionary per example, in the format
                `{"question": example, "code": code}`.
  '''

  annotated_examples = []
  for example in examples:
      prompt = input_prompt.format(query = example)
      annotation = chat_llm.invoke(prompt)
      annotated_examples.append({"question" : example, "code" : annotation.content})
  return annotated_examples

def annotate(annotations,  initial_prompt):
  """
  Analyze errors in generated code for each example and provide error details.

  Args:
    annotations (list[dict]): A list of dictionaries, where each dictionary represents a challenging example and its
                              corresponding generated code in the format:
                              {"question": challenging example, "code": code}.
    initial_prompt (str): The initial prompt used for code generation.

  Returns:
    list[dict]: A list of dictionaries for the challenging examples whose generated code scored below 4,
              in the format:
              {"Query": challenging example, "Code": code, "Score": score}.
  """
  annots=[]
  schema = [0,1,2,3,4,5]
  for annotation in annotations:
    a = annotation["code"]
    q = annotation["question"]
    print("Code")
    print(a)
    pattern = r'```python\n(.*?)\n```'
    matches = re.findall(pattern, a, re.DOTALL)
    parsed_response = matches[-1] if matches else None
    old_stdout = sys.stdout
    sys.stdout = StringIO()
    try :
      prev_model = " "
      while True :
            try:
              exec(parsed_response, globals())
              annot = sys.stdout.getvalue()
              break
            except ModuleNotFoundError as e:
              module_name = e.name
              if module_name == prev_model :
                raise ModuleInstallError(f"Installation failed for module '{module_name}': {e}")
              prev_model = module_name
              subprocess.check_call([sys.executable, "-m", "pip", "install", module_name])
    except Exception as e:
      annot = traceback.format_exc()
    sys.stdout = old_stdout
    # str() guards against parsed_response being None when no code block was found
    response = "Generated Code : " + str(parsed_response) + "\n" + "Obtained output : " + str(annot)
    prmpt = RANKING_PROMPT.format(label_schema = schema, query = q , response =response, prompt = initial_prompt, code = a )
    annotat = chat_llm.invoke(prmpt).content
    prmpt_resp = eval(annotat)
    if prmpt_resp[0] < 4 :
        annots.append({"Query" : q, "Code": a, "Score" : annotat})
  return annots

def error_analysis_fun(input_prompt, annots):
  """
  Analyze the errors in the generated code and in input_prompt for the failing challenging examples, using the LLM.

  Args:
      input_prompt (str): Prompt used for generating code for the challenging examples.
      annots (list[dict]): A list of dictionaries for challenging examples where the generated code contains errors, including
                           error analysis and scoring in the format:
                           {"Query": challenging example, "Code": code, "Score": score}.

  Returns:
      str: Analysis of the error in the code and input_prompt
  """
  labels = [0,1,2,3,4,5]
  prmpt= ERROR_ANALYSIS.format(prompt=input_prompt, labels = labels, failure_cases = annots)
  history.append(input_prompt)
  analysis = chat_llm.invoke(prmpt).content
  error_analysis.append(analysis)
  return analysis

def calibrate_generation_prompt(input_prompt, history, error_analysis, task_desc, meta_prompt):
  """
  Refine the input prompt based on task details, history, and error analysis.

  Args:
      input_prompt (str): The prompt used for generating code for challenging examples.
      history (list[str]): A list of previously generated prompts.
      error_analysis (list[str]): A list of errors identified in the previously generated outputs.
      task_desc (str): A description of the task for which the code is being generated.
      meta_prompt (str): The base prompt used as a foundation for code generation.

  Returns:
      str: A refined prompt incorporating the task description, history, and error analysis.
  """
  prmpt = chat_llm.invoke(PROMPT_REFLEXTION.format(initial_prompt = input_prompt, history=history, error_analysis = error_analysis, task_description = task_desc, meta_prompt= meta_prompt))
  return prmpt.content

def autoprompt(task_description, num_iter):
  """
  Refine the base meta prompt iteratively for a given task description.

  Args:
      task_description (str): A brief description of the task for which the base prompt needs refinement.
      num_iter (int): The number of iterations to refine the base prompt.

  Returns:
      str: The refined prompt after the specified number of iterations.
  """
  global history , error_analysis
  history = []
  error_analysis=[]
  for i in range(num_iter):
    prompt = chat_llm.invoke(PROMPT_GENERATION_PROMPT.format(task_description = task_description , meta_prompt= META_PROMPT))
    challenging_examples = identify_challenging_examples(task_description, prompt.content)
    prompt = prompt.content + META_PROMPT_PART_2
    annotations = annotate_challenging_examples(challenging_examples, prompt)
    annots = annotate(annotations,  prompt)
    error_analysis_fun(prompt, annots)
    prompt = calibrate_generation_prompt(input_prompt= prompt, history = history, error_analysis = error_analysis, task_desc = task_description, meta_prompt = META_PROMPT + META_PROMPT_PART_2)
  return prompt
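
The extract-and-execute step used in `annotate` can be sketched in isolation. This is a minimal illustration and not the notebook's exact code: it swaps the manual `sys.stdout` reassignment for `contextlib.redirect_stdout`, which restores the stream even when the executed code raises. `extract_last_code_block` and `run_and_capture` are hypothetical names, and the LLM reply is a made-up string.

```python
import contextlib
import io
import re
import traceback

FENCE = "`" * 3  # built at runtime so this example can itself sit inside a fence
CODE_BLOCK = re.compile(FENCE + r"python\n(.*?)\n" + FENCE, re.DOTALL)


def extract_last_code_block(llm_text):
    """Return the body of the last fenced Python block, or None if there is none."""
    matches = CODE_BLOCK.findall(llm_text)
    return matches[-1] if matches else None


def run_and_capture(code):
    """Execute `code` and return what it printed, or the traceback on failure."""
    if code is None:
        return "No valid Python code found in the response."
    buffer = io.StringIO()
    try:
        # redirect_stdout restores sys.stdout even if exec raises.
        with contextlib.redirect_stdout(buffer):
            exec(code, {})  # only run code you trust
        return buffer.getvalue()
    except Exception:
        return traceback.format_exc()


# Made-up LLM reply used only for illustration.
reply = f"Here is the code:\n{FENCE}python\nprint(2 + 3)\n{FENCE}\nDone."
good_output = run_and_capture(extract_last_code_block(reply))
bad_output = run_and_capture("1 / 0")
print(good_output)
```

Returning the traceback as text, rather than letting the exception propagate, is what lets the ranking prompt see both successful output and failures in the same field.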

### Multi-Agent Framework

In [None]:
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
import numpy as np
from io import StringIO
import os


def generate_agent_description(name, tool_desc, prompt):
    """
    Generate a function description as a string.

    Args:
        name (str): The name of the function.
        tool_desc (str): A description of what the function/tool does.
        prompt (str or ChatPromptTemplate): The prompt used by the function.

    Returns:
        str: A formatted function definition as a string.
    """
    # Ensure the prompt is properly formatted as a string
    if isinstance(prompt, ChatPromptTemplate):
        prompt_str = repr(prompt)  # Serialize ChatPromptTemplate as a string
    else:
        prompt_str = repr(prompt)
    str1 = f"""
def {name}(query):
    '''
    Name: {name}
    Description: {tool_desc}
    Args:
        query (str): The query to be passed to the agent .
                     Example: "Args"

    Returns:
        str: A string response
    '''
    from io import StringIO
    annotation = chat_llm.invoke({prompt_str}.format(query=query)).content
    pattern = r'```python\\n(.*?)\\n```'
    matches = re.findall(pattern, annotation, re.DOTALL)
    parsed_response = matches[-1] if matches else None
    print("parsed response")
    print(parsed_response)
    old_stdout = sys.stdout
    sys.stdout = StringIO()
    try:
        if parsed_response:
            exec(parsed_response)
            print(str(query) + " has been executed successfully.")
            annot = sys.stdout.getvalue()
        else:
            annot = "No valid Python code found in the response."
    except Exception as e:
        annot = traceback.format_exc()
    finally:
        sys.stdout = old_stdout
    return str(annot)
"""
    return str1

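def generate_pdf_name(first_two_page_content):
    """
    Generate a short title for a PDF from the content of its first two pages.

    Args:
        first_two_page_content (str): Text extracted from the first two pages of the PDF.

    Returns:
        str: The title returned by the LLM.
    """
    prompt_text = f"""You are an assistant tasked with generating a title for the given PDF content. \
    You are provided with the content of the first two pages of the PDF. \
    Return a short, concise title that is relevant to the PDF.
    The title should not be longer than 3-4 words.
    Return only the title.
    Pdf Content:{first_two_page_content}
    Title:"""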

    title = client_table.chat.completions.create(
        model="llama-3.1-70b-versatile",  # Specify the model you want to use
        messages=[
            {
                "role": "user",
                "content": prompt_text
            }
        ],
        temperature=0.7,         # Control randomness
        top_p=0.9,               # Sampling control for nucleus sampling
        stream=False             # Change to True if you want streaming responses
    )
    return title.choices[0].message.content

class AgentCode:
    def __init__(self, content):
        self.content = content
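
The idea behind `generate_agent_description`, rendering a function definition as a string and then `exec`-ing it into existence, can be sketched on a toy template. This is a minimal illustration under stated assumptions: the template body is a stand-in that makes no LLM call, `make_tool` and `registry` are hypothetical names, and the code is executed in a private namespace rather than `globals()` as the notebook does.

```python
TEMPLATE = '''
def {name}(query):
    """{description}"""
    return "{name} received: " + str(query)
'''


def make_tool(name, description, registry):
    """Render the template, exec it in a private namespace, and register the result."""
    if not name.isidentifier():
        raise ValueError(f"{name!r} is not a valid function name")
    if name in registry:
        # Mirrors the duplicate-name check done before accepting a new tool.
        raise ValueError(f"The function '{name}' already exists")
    source = TEMPLATE.format(name=name, description=description)
    namespace = {}
    exec(source, namespace)  # only exec source you generated or trust
    registry[name] = namespace[name]
    return registry[name]


registry = {}
echo_tool = make_tool("echo_tool", "Echo the query back to the caller.", registry)
print(echo_tool("hello"))
print(echo_tool.__doc__)
```

Executing into a dedicated dictionary keeps generated tools from silently overwriting names in the notebook's global scope.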

In [None]:
import re
from copy import deepcopy
import pathway as pw
import pandas as pd
import inspect
from llama_index.core.memory import VectorMemory # type: ignore
from llama_index.core.llms import ChatMessage # type: ignore
import traceback
from llama_index.embeddings.jinaai import JinaEmbedding # type: ignore
import sys
import requests
import fitz # type: ignore
import os
from io import BytesIO
from typing import Optional



out = sys.stdout
groq_chat = GroqChat(model="llama-3.1-70b-versatile", capacity=1, api_key=rag_agent_api)

class SUPERVISOR_AGENT:
    """
    Resolve a user query over a PDF file using the provided tools and their descriptions.

    The agent also lets the user ask follow-up questions.
    It handles errors during tool execution through `api_reflexion`, `code_reflexion`, and `silent_reflexion`.
    """

    def __init__(self, tools, tools_aux, llm, tool_map, path, rag_llm = groq_chat , reflextion_limit = 3, top_k = 2, max_steps = 10):

        self.tools = tools
        self.llm = llm
        self.reflexion_limit = reflextion_limit
        self.tool_map = tool_map
        self.tools_aux = tools_aux
        self.path = path
        self.top_k = top_k
        self.scratchpad = ""
        self.responses = []
        self.query = ""
        self.curr_tools = deepcopy(tools)
        self.curr_tools_aux = deepcopy(tools_aux)

        self.agent = InterleavedRAGQuestionAnswerer(llm=rag_llm, path=self.path)
        self.logs = []
        self.vector_memory = VectorMemory.from_defaults(vector_store=None,
          embed_model=JinaEmbedding(api_key=os.getenv('JINAAI_API_KEY'), model="jina-embeddings-v3", task="retrieval.passage",), retriever_kwargs={"similarity_top_k": 2},)

        print("Agent Initialized")

    def answer_query(self, pw_ai_queries: pw.Table) -> pw.Table:
        """Create RAG response with adaptive retrieval, incorporating interleaving."""

        a = capture_output(pw_ai_queries)
        print(pw.this.prompt)

        lines = a.split('\n')

        data_line = lines[1]
        columns = data_line.split('|')
        self.question = columns[0].strip()
        answer = self.run(self.question)

        x = {
            'prompt' : self.question,
            'result' : answer
        }

        df = pd.DataFrame([x], index = [0])
        t = pw.debug.table_from_pandas(df)
        return t

    def run(self, query,  follow_up_response = None):
      """
      Generate an answer to the given query based on the query itself and previous responses, then prompt the user for a follow-up question.

      Args:
          query (str): The question asked by the user.
          follow_up_response (list, optional): A list of previous responses for context. Defaults to None.

      Returns:
          str: The answer to the query.
      """

      self.scratchpad = ""
      flag = True
      if follow_up_response is None:
          print("Building Retriever")
          self.agent.retriever, self.agent.page_num = retriever(self.path, query, self.top_k)
          follow_up_response = []
          self.curr_tools = deepcopy(self.tools)
          self.curr_tools_aux = deepcopy(self.tools_aux)
          print("done")
      else:
          self.scratchpad = f"Information :- {follow_up_response}"

      self.query = query
      self.responses = follow_up_response
      self.runs = 0

      print("Enhanced Query : " + self.query)

      while (flag or self.responses == [] or self.responses[-1] != "end"):
          flag = False
          agent_code, func_response = self.build_code()
          if agent_code is None and func_response is None:
              return None
          self.responses.append(func_response)
          self.scratchpad += '\n' + "Tool Call : " + str(agent_code) + "," + " Response : " + str(func_response)
          self.runs += 1

      prompt = final_response_prompt.format(query = self.query, code = self.scratchpad, responses = self.responses)
      final_response = get_pathway_answer(groq_chat, prompt)

      self.vector_memory.put(ChatMessage.from_str(final_response, "user"))
      print("FINAL ANSWER : ", final_response)

      feedback = input("That's that, so are there any follow up questions? ")
      if feedback.lower() == "no":
        return final_response
      else:
        query = input("Kindly enter the follow up question : ")
        facts = self.vector_memory.get(query)
        follow_responses = []
        for i in range(len(facts)):
          follow_responses.append(facts[i].content)
        sub_ans = self.run(query, follow_responses)
        return sub_ans

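    def api_reflexion(self, agent_code):
        """
        Recover from a failed or missing tool.

        If no tool fits (agent_code is "NONE") or no tools remain, ask the user either to
        supply a new tool as Python code or to describe one to be generated with autoprompt.
        Otherwise drop the failing tool from the current tool list. In both cases
        build_code is then called again.

        Args:
            agent_code: The last tool call, or "NONE" when no suitable tool was found.

        Returns:
            tuple: The new tool call and its response.
        """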
        if(agent_code == "NONE" or self.curr_tools==[]):
          while True:
            user_input = input("Would you like to provide Python code with args in the docstring? (yes/no): ").strip().lower()
            if user_input == "yes":
                print("Please provide the proper Python code with arguments described in the docstring:")
                user_code = input("Enter your Python code:\n")
                try:
                    function_name = re.search(r"def (\w+)\(", user_code).group(1)
                    if function_name in self.tool_map.keys():
                      print(f"The function '{function_name}' already exists. Re-enter your code with a different function name.")
                      continue
                    exec(user_code, globals())
                    sys.stdout = out

                    func = globals()[function_name]
                    if func.__doc__ is None:
                      print("Please provide a docstring as well.")
                      continue

                    doc = f"{func.__doc__} + This function takes the {len(list(inspect.signature(func).parameters.keys()))} arguments :  {str(list(inspect.signature(func).parameters.keys()))}"

                    self.tool_map[function_name] = func
                    self.tools.append(doc)
                    self.tools_aux.append(function_name)

                    self.curr_tools.append(doc)
                    self.curr_tools_aux.append(function_name)

                    print("User-provided code accepted.")
                    agent_code, func_response = self.build_code()
                    return agent_code, func_response

                except Exception as e:
                    print(f"Error in provided code: {e}. Please try again.")
            elif user_input == "no":

              print("System will attempt to generate Python code.")
              user_desc = input("Enter the precise description of the tool needed for the task.")
              tool_name = input("Also select a name for this tool (suggest cool names please) : ")
              while tool_name in self.tool_map.keys():
                print(f"The function '{tool_name}' already exists. Choose a different name.")
                tool_name = input("Select a new name for this tool (suggest cool names please) : ")
              prompt = autoprompt(user_desc, 1)
              sys.stdout = out

              a = generate_agent_description(tool_name, user_desc, prompt)
              if a.startswith("```python"):
                print("enters")
                print(a[:11])
                a = a[10:]
                print(a[len(a)-3:])
                a = a[:len(a)-3]

              print("--------------------------")
              print("here")
              print(a)
              print("--------------------------")

              exec(a, globals())
              function_name = tool_name

              new_func = globals()[function_name]
              new_docs = new_func.__doc__
              sys.stdout = out
              new_docs = f"{new_func.__doc__} + This function takes the {len(list(inspect.signature(new_func).parameters.keys()))} arguments :  {str(list(inspect.signature(new_func).parameters.keys()))}"
              self.tools.append(new_docs)
              self.tools_aux.append(function_name)

              self.curr_tools.append(new_docs)
              self.curr_tools_aux.append(function_name)
              self.tool_map[function_name] = new_func

              agent_code, func_response = self.build_code()
              sys.stdout = out
              return agent_code, func_response
            else:
                print("Enter only Yes/No")
                continue
        else:
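            # agent_code may be an LLM message object; eval needs its string content
            raw_call = agent_code.content if hasattr(agent_code, "content") else agent_code
            func_name, args_list = eval(raw_call)[0], eval(raw_call)[1]
            print("Reflexion")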
            print(func_name, args_list)
            pos = self.curr_tools_aux.index(func_name)
            self.curr_tools_aux.remove(func_name)
            self.curr_tools.pop(pos)
            agent_code, func_response = self.build_code()
            return agent_code, func_response

    def code_reflexion(self, agent_code, error):
      '''
      Resolve the Python error that occurred during execution of a tool in the build_code function.

      Args :
        agent_code(list[str]) : Last tool call
        error(str) : Error that occurred during execution of the tool call in the build_code function

      Return :
        list[str] : Modified tool call
        str : Response of modified tool call
      '''

      count = 0

      while count < self.reflexion_limit:
          too = [{self.curr_tools[i] : self.curr_tools_aux[i]} for i in range(len(self.curr_tools))]
          agent_code = self.llm.invoke(code_reflexion_prompt.format_messages(query = self.query, error= error, tools = too, agent_code = agent_code))

          print("-----")
          if count == 0:
            print("Reflexion :=" + agent_code.content)
          else :
            print("Again Reflexion :=" + agent_code.content)
          print("-----")

          try:
              if agent_code.content is None or agent_code.content.lower() == "none" or eval(agent_code.content)[0].lower() == "none":
                agent_code = "NONE"
                agent_code, func_response = self.api_reflexion(agent_code)
                return agent_code, func_response
              func_name, args_list = eval(agent_code.content)[0], eval(agent_code.content)[1]

              try :
                tool_call_reason = eval(agent_code.content)[2]
              except Exception:
                tool_call_reason = "Reason is not Provided"

              if func_name not in self.curr_tools_aux:
                  raise ToolError(f"Incorrect tool '{func_name}' is called. It is not in the tool list. Try a different one.")

              pos = self.curr_tools_aux.index(func_name)
              func_desc = self.curr_tools[pos]

              critics = self.critic_agent(f'''{{"tool_name" : {func_name}, "args" : {args_list} , "reason" : {tool_call_reason}}}''',  func_desc , None, self.scratchpad)
              print(critics.content)

              if(eval(critics.content)["score"] == 1):
                agent_code = self.silent_reflexion(agent_code.content, reason = eval(critics.content)["reasoning"])
                print("Refactored Tool Call : " + agent_code.content)
                func_name, args_list = eval(agent_code.content)[0], eval(agent_code.content)[1]

              print("Safe arguments")
              if func_name == 'rag_agent':
                query = args_list
                if self.agent.cache_index.process_pending_additions():
                  pass
                if query:
                    if self.agent.check_query_in_memory(query):
                        print("Query already in memory. Skipping full retrieval.")
                        self.emit_update("Query already in memory. Skipping full retrieval.")
                        chunk = self.agent.check_memory_and_retrieve_for_supervisor(query)
                        func_response = llm_response_if_memory_hit_found(query, chunk)
                        agent_code = AgentCode(content="rag__agent")
                    else:
                        func_response, agent = self.tool_map[func_name](*[args_list, self.agent])
                        self.agent = agent
              else:
                func_response = self.tool_map[func_name](*args_list)

              critics = self.critic_agent(f'''{{"tool_name" : {func_name}, "arguments" : {args_list} , "reason" : {tool_call_reason}}}''',  func_desc, func_response, self.scratchpad)
              print(critics.content)

              if(eval(critics.content)["score"] == 1):
                print("Faulty API.")
                agent_code, func_response = self.api_reflexion(agent_code)

              return agent_code, func_response

          except Exception:
              sys.stdout = out
              error = traceback.format_exc()
              failure = self.detect_failure(agent_code, error)

              print("---------------------------------------")
              print("CODE REFLECTION ERROR")
              print(error)
              print("---------------------------------------")

              if eval(failure) == 1:
                  return agent_code, None
              else:
                  count += 1

      print("code reflection limit reached")
      return None, None

    def build_code(self, agentcode = None):
      '''
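        Ask the LLM for the next tool call (unless one is supplied), execute it, and return the call with its response.

        Args :
          agentcode :- Optional pre-built tool call; when None, the LLM generates one

        Return :
          list[str] :- Tool called
          str :- Response of the tool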
      '''

      tool_dict = [{self.curr_tools[i] : self.curr_tools_aux[i]} for i in range(len(self.curr_tools))]
      if agentcode is None :
        agent_code = self.llm.invoke(code_agent_prompt.format_messages(query = self.query, tools = tool_dict, scratchpad = self.scratchpad, responses = self.responses))
      else :
        agent_code = agentcode

      print("-----")
      print("Tool call : " + agent_code.content)
      print("-----")

      if agent_code.content.lower() == "end_tool" or eval(agent_code.content)[0].lower() == "end_tool":
          return agent_code, "end"
      try:
        if agent_code.content is None or agent_code.content.lower() == "none" or eval(agent_code.content)[0].lower() == "none":
          try :
            temp = agent_code
            agent_code = "NONE"
            agent_code, func_response = self.api_reflexion(agent_code)
          except Exception as e:
            print("--------------------")
            print("API REFLEXION ERROR")
            traceback.print_exc()  # This will print the full traceback of the error
            print("--------------------")
            agent_code = temp
            agent_code.content = "NONE"
            func_response = ""
          return agent_code, func_response

        func_name, args_list = eval(agent_code.content)[0], eval(agent_code.content)[1]

        try :
          tool_call_reason = eval(agent_code.content)[2]
        except Exception:
          tool_call_reason = "Reason is not Provided"

        if func_name not in self.curr_tools_aux:
            raise ToolError(f"Incorrect tool '{func_name}' is called. It is not in the tool list. Try a different one.")

        pos = self.curr_tools_aux.index(func_name)
        func_desc = self.curr_tools[pos]

        critics = self.critic_agent(f'''{{"tool_name" : {func_name}, "argument" : {args_list} , "reason" : {tool_call_reason}}}''',  func_desc, None, self.scratchpad)
        print(critics.content)

        if(eval(critics.content)["score"] == 1):
          agent_code = self.silent_reflexion(agent_code.content, reason = eval(critics.content)["reasoning"])
          print("Refactored Tool Call : " + agent_code.content)
          func_name, args_list = eval(agent_code.content)[0], eval(agent_code.content)[1]

        print(f"Func Name {func_name}, Args List {args_list}, Reasoning {eval(agent_code.content)[2]}")

        if func_name == 'rag_agent':
            query = args_list
            if self.agent.cache_index.process_pending_additions():
              pass
            if query:
                if self.agent.check_query_in_memory(query):
                    print("Query already in memory. Skipping full retrieval.")
                    self.emit_update("Query already in memory. Skipping full retrieval.")
                    chunk = self.agent.check_memory_and_retrieve_for_supervisor(query)
                    func_response = llm_response_if_memory_hit_found(query, chunk)
                    agent_code = AgentCode(content="rag__agent")
                else:
                    func_response, agent = self.tool_map[func_name](*[args_list, self.agent])
                    self.agent = agent
        else:
          func_response = self.tool_map[func_name](*args_list)

        print(func_response)

        critics = self.critic_agent(f'''{{"tool_name" : {func_name}, "argument" : {args_list} , "reason" : {tool_call_reason}}}''',  func_desc,  func_response, self.scratchpad)

        print(critics.content)

        if(eval(critics.content)["score"] == 1):
          print("Faulty API.")
          agent_code, func_response = self.api_reflexion(agent_code)

      except Exception as e:
          sys.stdout = out
          error_message = traceback.format_exc()
          failure = self.detect_failure(agent_code.content, error_message)
          print(error_message)
          a = eval(failure)
          if a == 1:
            print("API ERROR")
            agent_code, func_response = self.api_reflexion(agent_code)
          else:
            print("Python Error")
            agent_code, func_response = self.code_reflexion(agent_code, e)
            if agent_code is not None and func_response is None:
                # API Failure
                agent_code, func_response = self.api_reflexion(agent_code)

          if agent_code is None and func_response is None:
            return agent_code, func_response
      return agent_code, str(func_response)

    def detect_failure(self, agent_code, callback):
      """
      Detect the type of error in the tool call.

      Args:
          agent_code (list[str]): The tool call causing the error, represented as a list containing:
                                  ["tool_name", [arguments], "Reasoning"].
          callback (str): The error encountered during the execution of the tool call.

      Returns:
          str: "1" if the failure is attributed to the tool (API) itself, or "0" if it is a Python error in the call.
      """
      detection = self.llm.invoke(failure_detection_prompt.format_messages(agent_code = agent_code, traceback = callback, tools = self.curr_tools, descs = self.curr_tools_aux))
      return detection.content

    def critic_agent(self, agent_code,  desc, func_response, scratchpad):
      """
      Detect two potential issues in a tool call:
        1. Whether the arguments passed to the tool call are valid.
        2. Whether the response of the tool call aligns with the question asked.

      Args:
          agent_code (list[str]): The current tool call, represented as a list containing the tool name, arguments, and reasoning.
          desc (str): A description of the tool call's purpose or expected behavior.
          func_response (str): The response returned by the tool call.
          scratchpad (str): The history of previous tool calls, providing context.

      Returns:
          The LLM response. Its `content` is expected to evaluate to a dictionary with:
              - "score": 0 if the tool call is correct, or 1 if the arguments are invalid or the response does not align with the query.
              - "reasoning": Explanation of the score.
      """

      if func_response is None:
        response = self.llm.invoke(critic_agent_prompt_1.format_messages(query = self.query, code_last = agent_code, desc = desc, scratchpad = scratchpad))
      else :
        response = self.llm.invoke(critic_agent_prompt_2.format_messages(query = self.query, code_last = agent_code, response = func_response, desc = desc))

      return response

    def silent_reflexion(self, code, reason):
      '''
      Resolve errors in the arguments of the tool call when the critic agent deems them invalid.

      Args:
          code (str): The current tool call containing potentially invalid arguments.
          reason (str): The critic agent's reasoning for rejecting the call.

      Returns:
          The LLM response whose `content` is a corrected tool call, structured as:
                    ["tool_name", [args], "Reasoning"].
      '''
      response = self.llm.invoke(silent_error_reflexion.format(call = code, query = self.query, scratchpad = self.scratchpad, reason = reason))
      return response
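
The tool-call format that `build_code` expects, `["tool_name", [args], "reason"]`, can be parsed and dispatched as in the sketch below. This is an illustration under stated assumptions and not the supervisor's actual control flow: it uses `ast.literal_eval` in place of `eval` so that only literals are accepted, it omits the critic and reflexion steps, and `parse_tool_call`, `dispatch`, and `toy_tools` are hypothetical names.

```python
import ast


class ToolError(Exception):
    """Raised when a tool call names a tool that is not registered."""


def parse_tool_call(raw):
    """Parse '["tool", [args], "reason"]' into (name, args, reason)."""
    call = ast.literal_eval(raw)  # accepts literals only, unlike eval
    name, args = call[0], call[1]
    reason = call[2] if len(call) > 2 else "Reason is not Provided"
    return name, args, reason


def dispatch(raw, tool_map):
    """Run one tool call and return (tool_name, response as a string)."""
    name, args, _reason = parse_tool_call(raw)
    if name == "end_tool":
        # The end marker stops the loop without calling any tool.
        return name, "end"
    if name not in tool_map:
        raise ToolError(f"Incorrect tool '{name}' is called.")
    return name, str(tool_map[name](*args))


# Toy tool map used only for illustration.
toy_tools = {"add": lambda a, b: a + b}
first = dispatch('["add", [2, 3], "sum two numbers"]', toy_tools)
last = dispatch('["end_tool", ["end"]]', toy_tools)
print(first, last)
```

Parsing the call once and reusing the result also avoids evaluating the same LLM output several times.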

### Dynamic Tool Set


In [None]:
from llama_index.tools.tavily_research import TavilyToolSpec
from PyPDF2 import PdfReader
import gc
import os
from llama_index.core.tools import FunctionTool
from llama_index.core.query_engine import RetrieverQueryEngine


document_paths = []

# `agent` is the RAG agent instance; it is returned alongside the answer.
def rag_agent(query, agent) -> tuple:
    """
    tool: rag_agent
    description: Used for answering query on the basis of the context retrieved from the user provided documents.
    Args:
        query (str): The query to be passed to the RAG agent.
            Example: "What was Nike's gross margin in FY 2022?"
    Returns:
        tuple: The response from the RAG agent with its confidence score appended (str), and the agent.
    """

    global document_paths
    document_paths.append(agent.path)

    # Reset the cache when the document path differs from the previous call.
    if len(document_paths) >= 2 and document_paths[-1] != document_paths[-2]:
        gc.collect()

        agent.cache_index = DynamicCacheIndex(dim=agent.embedding_dim, batch_size=16)
        agent.previous_queries = {}
    res, jargons = agent.run(query, False)

    score = chat_llm.invoke(CONFIDENCE_PROMPT.format(steps=agent.agent_input, answer=res)).content
    # Process any pending cache additions before returning.
    agent.cache_index.process_pending_additions()
    return str(res) + ", Confidence Score : " + str(score), agent

def download_txt(content, filename="output.txt"):
    """
    Saves the given content to a .txt file in the current working directory.

    Args:
        content (str): The text content to be saved.
        filename (str): Path to which content will be saved. Default -> output.txt

    Returns:
        str: Confirmation message indicating the file has been saved successfully.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(content)
        return f"File '{filename}' has been saved successfully in the current working directory."
    except Exception as e:
        return f"An error occurred while saving the file: {e}"

def web_search_agent(query):
  """
  Used to get generally available information from online sources.
  Args :- query (str) - Query to be searched online eg :- "Who is the prime minister of India"
  """
  # Read the Tavily API key from the environment instead of hardcoding it in the notebook.
  websearch = TavilyToolSpec(api_key=os.environ["TAVILY_API_KEY"])
  results = websearch.search(query, max_results=3)
  result = [r.text for r in results]
  res = chat_llm.invoke(WEBSEARCH_PROMPT.format(query=query, response=result)).content
  return res

def end_tool(query):
  """
  Used to end the question-answering process. Returns "end" when correctly executed.

  Args : query(str) - this MUST be "end".

  Returns :
    str : "end"
  """
  return "end"

TOOL_MAP = {
    'rag_agent': rag_agent,
    'web_search_agent':web_search_agent,
    "END": end_tool
}

tools = [rag_agent, web_search_agent, end_tool]

l_tools = []

for tool in tools:
    tool = FunctionTool.from_defaults(tool)
    l_tools.append(tool)

TOOLS=[]
TOOLS_AUX = []

for i in l_tools:
  TOOLS.append(i.metadata.description)
  TOOLS_AUX.append(i.metadata.name)
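
The loops above build two parallel lists, tool descriptions and tool names, which are passed to the supervisor together with `TOOL_MAP`. A minimal standard-library sketch of that pattern, assuming each description is derived from the function's signature and docstring. `describe_tools`, `dispatch`, and the stub `echo_tool` are hypothetical stand-ins. The exact description text that `FunctionTool` produces is not reproduced here.

```python
import inspect


def echo_tool(query):
    """Returns the query unchanged. Args: query (str)."""
    return query


def end_tool(query):
    """Ends the question-answering process. Args: query (str), must be "end"."""
    return "end"


def describe_tools(tools):
    """Return (names, descriptions) for a list of plain Python functions."""
    names, descriptions = [], []
    for fn in tools:
        names.append(fn.__name__)
        # Signature plus docstring gives the LLM the call shape and the intent.
        descriptions.append(f"{fn.__name__}{inspect.signature(fn)}\n{inspect.getdoc(fn)}")
    return names, descriptions


def dispatch(tool_map, name, *args):
    """Look up a tool by name and call it, failing loudly on unknown names."""
    if name not in tool_map:
        raise KeyError(f"unknown tool {name!r}; known tools: {sorted(tool_map)}")
    return tool_map[name](*args)


stub_tools = [echo_tool, end_tool]
names, descriptions = describe_tools(stub_tools)
tool_map = dict(zip(names, stub_tools))

print(descriptions[0])
print(dispatch(tool_map, "end_tool", "end"))
```

Deriving the map keys from the function names keeps the names shown to the LLM and the keys used for dispatch in sync. In the notebook, `TOOL_MAP` registers `end_tool` under the key `"END"`, so the supervisor has to use that key rather than the function name for that tool.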

### Inference

In [None]:
# Provide the path to the pdf and the query.
path_input = str(input("Enter the path of the pdf : "))

Enter the path of the pdf : /content/temp.pdf


In [None]:
supervisor = SUPERVISOR_AGENT(TOOLS, TOOLS_AUX, chat_llm, TOOL_MAP, path_input)

Agent Initialized


In [None]:
# Example sample query
sample_query = "What is Pathway MidTerm Guidelines ?"

# Create a DataFrame or table to mimic input format (pw.Table)
import pandas as pd
pw_ai_queries = pd.DataFrame({
    'prompt': [sample_query],
    'filters': [{}]  # You can add specific filters if needed, or leave empty
})

query = pw.debug.table_from_pandas(pw_ai_queries)

In [None]:
supervisor.answer_query(query)

<class 'pathway.internals.thisclass.this'>.prompt
Buliding Retriever
input
query What is Pathway MidTerm Guidelines ? 2




docs [{'text': '', 'metadata': {'created_at': 1742186101, 'modified_at': 1742193663, 'owner': 'root', 'path': '/content/JINA/pdf_metadata.jsonl', 'size': 12051, 'seen_at': 1742193664, 'page_number': 1}, 'dist': 0.416057825088501}, {'text': 'What is RAG: Beginner Blog on Pathway | Video by IBM\nWhat is Agentic RAG: CodeBasics Tutorial | Tutorial by IBM\nPathway Developer Documentation: Link to Pathway Developer\nDocs\nPathway App Templates\nAdaptive RAG to reduce costs without comprising accuracy in RAG\nBasic RAG with Open AI models (will be soon available on\npathway.com as a part of Introductory bootcamp releasing on 22nd\nOctober. Make sure you stay tuned to register for it when it opens\nup)\nBasic RAG with Gemini models\nUI component for RAG App example, using Streamlit: RAG with\nGoogle Drive\nSample Project READMEs (good to have) for Reference\nhttps://github.com/pathwaycom/pathway/blob/main/examples/note\nbooks/showcases/mistral_adaptive_rag_question_answering.ipyn\nb \nhttps:/

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/urllib3/connection.py", line 203, in _new_conn
    sock = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/usr/local/lib/python3.11/dist-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/urllib3/connectionpool.py", line 791, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/urllib3/connectionpool.py", line 497, in _make_request
    conn.request(
  File "/usr/local/lib/python3.11/dist-packages/urllib3/connection.py", line 395, in reque
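
The `ConnectionRefusedError` in the traceback above means that nothing accepted the TCP connection at the address the request targeted. The truncated output does not show which address that was. A minimal sketch of a pre-flight check that could be run before `answer_query`, using only the standard library; `is_port_open` is a hypothetical helper, and the host and port in the usage line are placeholders, not values taken from the notebook.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection raises OSError (including ConnectionRefusedError
        # and timeouts) when the endpoint cannot be reached.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Placeholder address: substitute the host and port the retriever server uses.
print(is_port_open("127.0.0.1", 8000))
```

If the check returns `False`, the server that the retriever talks to is likely not running yet or is listening on a different port, and the query will fail in the same way as shown above.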