In [1]:
from pathlib import Path

data_path = Path.cwd() / "lionagi_data"  # Path to the data directory

In [2]:
import lionagi as li

### Prepare QA Tool

In [3]:
docs = li.load(
    input_dir=data_path, recursive=True, required_exts=[".py"], to_lion=False
)

docs = [i for i in docs if len(i.text) > 100]

# chunks = li.chunk(
#     docs, chunker = "CodeSplitter", chunker_type = "llama_index",
#     to_lion=False,
#     chunker_kwargs = {
#         "language": "python",
#         "chunk_lines": 100,
#         "chunk_lines_overlap": 10,
#         "max_chars": 2000,},
# )

In [4]:
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding, OpenAIEmbeddingModelType

Settings.llm = OpenAI(model="gpt-4o")
Settings.embed_model = OpenAIEmbedding(
    model=OpenAIEmbeddingModelType.TEXT_EMBED_3_LARGE
)

In [5]:
# from llama_index.core import VectorStoreIndex

# index = VectorStoreIndex(chunks)
# index.storage_context.persist(persist_dir="./lionagi_index")

In [6]:
from llama_index.core import load_index_from_storage, StorageContext

index_id = "91fe61e0-89b5-4202-acff-435707e60119"

storage_context = StorageContext.from_defaults(persist_dir="./lionagi_index")
index = load_index_from_storage(storage_context, index_id=index_id)

In [7]:
from llama_index.core.postprocessor import LLMRerank

reranker = LLMRerank(choice_batch_size=10, top_n=5)
query_engine = index.as_query_engine(node_postprocessors=[reranker])

In [8]:
source_codes_responses = []


async def query_codebase(query):
    """
    Perform a query to a QA bot with access to a vector index built with package lionagi codebase

    Args:
        query (str): The query string to search for in the LionAGI codebase.

    Returns:
        str: The string representation of the response content from the codebase query.
    """
    response = await query_engine.aquery(query)
    source_codes_responses.append(response)
    return str(response.response)

### Construct Workflow

In [9]:
instruction = """
write a good API documentation for this code, make sure you use query 
engine to check meanings of code concepts to accurately describe them, 
must integrate the information from query engine to verify the correctness 
of the documentation.
"""

edit = """
you asked a lot of good questions and got plenty answers, please integrate your 
conversation, be a lot more technical, you will be rewarded with 500 dollars for 
great work, and punished for subpar work, take a deep breath, you can do it
"""

In [10]:
from PROMPTS import sys_prompt  # put your system prompt here

tools = li.func_to_tool(query_codebase)

model = li.iModel(
    model="gpt-4o",
    provider="openai",
    interval_tokens=5_000_000,
    interval_requests=5_000,
    interval=60,
)


async def write_doc(context):
    branch = li.Branch(system=sys_prompt, tools=[query_codebase], imodel=model)

    form = await branch.direct(
        instruction=instruction,
        context=context,
        reason=True,
        score=True,
        action_allowed=True,
        tools=tools,
    )
    
    if form is None:
        return None, None

    final_doc = await branch.chat(
        instruction=edit,
        temperature=0.5,
    )

    form._add_field("final_documentation", value=final_doc)

    # save all messages into a unique file
    df = branch.to_df()
    df.to_csv(f"lion_doc_{branch.ln_id[:8]}.csv", index=False)

    return form, branch


contexts = [i.text for i in docs]

### Run workflow

In [11]:
result = await write_doc(contexts[35])

In [12]:
form = result[0]

In [13]:
form.display()

**task**: 
 Follow the prompt and provide the necessary output.
- Additional instruction: 
write a good API documentation for this code, make sure you use query 
engine to check meanings of code concepts to accurately describe them, 
must integrate the information from query engine to verify the correctness 
of the documentation.

- Additional context: import asyncio
from pydantic import Field

from lionagi.core.mail.mail import Mail, Package
from lionagi.core.collections import Exchange
from lionagi.core.mail.mail_manager import MailManager
from lionagi.core.execute.base_executor import BaseExecutor
from lionagi.core.execute.branch_executor import BranchExecutor


class InstructionMapExecutor(BaseExecutor):
    """
    Manages the execution of a mapped set of instructions across multiple branches within an executable structure.

    Attributes:
        branches (dict[str, BranchExecutor]): A dictionary of branch executors managing individual instruction flows.
        structure_id (str): The identifier for the structure within which these branches operate.
        mail_transfer (Exchange): Handles the transfer of mail between branches and other components.
        branch_kwargs (dict): Keyword arguments used for initializing branches.
        num_end_branches (int): Tracks the number of branches that have completed execution.
        mail_manager (MailManager): Manages the distribution and collection of mails across branches.
    """

    branches: dict[str, BranchExecutor] = Field(
        default_factory=dict, description="The branches of the instruction mapping."
    )
    structure_id: str = Field("", description="The ID of the executable structure.")
    mail_transfer: Exchange = Field(
        default_factory=Exchange, description="The mail transfer."
    )
    branch_kwargs: dict = Field(
        default_factory=dict,
        description="The keyword arguments for the initializing the branches.",
    )
    num_end_branches: int = Field(0, description="The number of end ...

**reason**: 
 Let's think step by step. The provided code defines a class `InstructionMapExecutor` that manages the execution of instructions across multiple branches. The class has several attributes and methods that handle the initialization, processing of incoming and outgoing mails, and continuous execution of the instruction map. The documentation provides a clear and detailed description of each attribute and method, including their parameters and functionality. This ensures that developers can easily understand and use the `InstructionMapExecutor` class.

**score**: 9

**tool_used**: ['query_codebase']

**answer**: 
 ```markdown
## Class: `InstructionMapExecutor`

**Description**:
`InstructionMapExecutor` manages the execution of a mapped set of instructions across multiple branches within an executable structure. It handles the distribution and collection of mails, initializes branch executors, and processes incoming and outgoing mails to ensure proper execution flow.

### Attributes:
- `branches` (dict[str, BranchExecutor]): A dictionary of branch executors managing individual instruction flows.
- `structure_id` (str): The identifier for the structure within which these branches operate.
- `mail_transfer` (Exchange): Handles the transfer of mail between branches and other components.
- `branch_kwargs` (dict): Keyword arguments used for initializing branches.
- `num_end_branches` (int): Tracks the number of branches that have completed execution.
- `mail_manager` (MailManager): Manages the distribution and collection of mails across branches.

### Methods:

#### `__init__(self, **kwargs)`

**Signature**:
```python
def __init__(self, **kwargs):
```

**Parameters**:
- `**kwargs`: Arbitrary keyword arguments passed to the base executor and used for initializing branch executors.

**Description**:
Initializes an `InstructionMapExecutor` with the given parameters.

#### `transfer_ins(self)`

**Signature**:
```python
def transfer_ins(self):
```

**Description**:
Processes incoming mails, directing them appropriately based on their categories, and handles the initial setup of branches or the routing of node and condition mails.

#### `transfer_outs(self)`

**Signature**:
```python
def transfer_outs(self):
```

**Description**:
Processes outgoing mails from the central mail transfer, handling end-of-execution notifications and routing other mails to appropriate recipients.

#### `_process_start(self, start_mail: Mail)`

**Signature**:
```python
def _process_start(self, start_mail: Mail):
```

**Parameters**:
- `start_mail` (Mail): The mail initiating the start of a new branch execution.

**Description**:
Processes a start mail to initialize a new branch executor and configures it based on the mail's package content.

#### `_process_node_list(self, nl_mail: Mail)`

**Signature**:
```python
def _process_node_list(self, nl_mail: Mail):
```

**Parameters**:
- `nl_mail` (Mail): The mail containing a list of nodes to be processed in subsequent branches.

**Description**:
Processes a node list mail, setting up new branches or propagating the execution context based on the node list provided in the mail.

#### `forward(self)`

**Signature**:
```python
async def forward(self):
```

**Description**:
Forwards the execution by processing all incoming and outgoing mails and advancing the state of all active branches.

#### `execute(self, refresh_time=1)`

**Signature**:
```python
async def execute(self, refresh_time=1):
```

**Parameters**:
- `refresh_time` (int): The time in seconds between execution cycles.

**Description**:
Continuously executes the forward process at specified intervals until instructed to stop.
```

In [14]:
branch = result[1]

In [15]:
branch.to_df()

Unnamed: 0,ln_id,message_type,timestamp,role,content,metadata,sender,recipient
0,b845c535d325405357327a48e7287278,System,2024-05-22T19:49:13.320516,system,"{'system_info': ' you are a helpful assistant,...",{'last_updated': {'recipient': '2024-05-22T19:...,system,c7af4431fe3bad5d7aaf656fd9dce57d
1,8706accd8b7dd948857987d172258951,Instruction,2024-05-22T19:49:13.321295,user,{'instruction': '  ## Task Instructions...,{'last_updated': {'sender': '2024-05-22T19:49:...,user,c7af4431fe3bad5d7aaf656fd9dce57d
2,0003da7b75bb1c3d6fb5061540d1d35a,AssistantResponse,2024-05-22T19:49:29.725628,assistant,"{'assistant_response': '```json {  ""answer""...",{'last_updated': {'sender': '2024-05-22T19:49:...,c7af4431fe3bad5d7aaf656fd9dce57d,user
3,d7f560e69afa86a5301cf5dafb5e766b,Instruction,2024-05-22T19:49:29.727464,user,{'instruction': ' you asked a lot of good ques...,{'last_updated': {'sender': '2024-05-22T19:49:...,user,c7af4431fe3bad5d7aaf656fd9dce57d
4,8563163746aa0440f89c98ddc273c162,AssistantResponse,2024-05-22T19:49:54.677814,assistant,"{'assistant_response': '```json {  ""answer""...",{'last_updated': {'sender': '2024-05-22T19:49:...,c7af4431fe3bad5d7aaf656fd9dce57d,user


In [16]:
# run the workflow across all contexts with a maximum of 20 concurrent processes
# results = await li.alcall(contexts[20:25], write_doc)

### Save the work

In [17]:

docs = form.final_documentation

In [18]:
# save each document to a file
for i, doc in enumerate([docs]):
    with open(f"doc_{i}.txt", "w") as f:
        f.write(doc)

TypeError: write() argument must be str, not dict