# Automatic Agent Workflow Creation

This sample notebook takes an image of a workflow diagram, determines which agents are needed to carry out the process, and then authors an Azure Durable Functions-based agentic workflow that can be customized further.

### Import packages

<b>Note: </b> Create a `.env` file with the following key-value pairs:

| Environment Variable  | Description                                                                                                             |
|-----------------------|-------------------------------------------------------------------------------------------------------------------------|
| `AOAI_ENDPOINT`       | The endpoint URL for your Azure OpenAI service. This is where API requests will be sent.                                 |
| `AOAI_KEY`            | The API key used to authenticate requests to the Azure OpenAI service.                                                   |
| `AOAI_EMBEDDING_MODEL`| The name of the Azure OpenAI embedding model used for generating text embeddings.                                         |
| `AOAI_GPT_MODEL`      | The name of the Azure OpenAI GPT model deployment used for text generation. It must accept image input (e.g., GPT-4o), since the notebook sends it a workflow diagram. |
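
A quick check along the following lines can catch a missing or empty variable before any API call is made. This is a minimal sketch: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, not part of the notebook, and the check is meant to run after `load_dotenv` has loaded the `.env` file.

```python
import os

# The four variables listed in the table above
REQUIRED_VARS = (
    "AOAI_ENDPOINT",
    "AOAI_KEY",
    "AOAI_EMBEDDING_MODEL",
    "AOAI_GPT_MODEL",
)


def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Example: report anything that still needs to be added to the .env file
missing = missing_env_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dictionary as `env` makes the helper easy to try without touching the real environment.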


In [11]:
import base64
import json
import os

import requests
from dotenv import load_dotenv
from IPython.display import display, Image, HTML, Markdown
from openai import AzureOpenAI

load_dotenv('.env', override=True)

True

### Create connection to Azure OpenAI client

In [12]:
# Initialize the Azure OpenAI client
client = AzureOpenAI(
    azure_endpoint = os.getenv("AOAI_ENDPOINT"), 
    api_key=os.getenv("AOAI_KEY"),  
    api_version="2024-05-01-preview"
)

gpt_model_deployment = os.getenv("AOAI_GPT_MODEL")
embedding_model = os.getenv("AOAI_EMBEDDING_MODEL")

### Define system message for workflow creation orchestrator

In [13]:
sys_msg = f"""You help people develop agent-based workflows based on their workflow process descriptions.
You will be provided with an image showcasing a business process.

1.) First, you should identify what agents are needed to replicate this process.
You can do this by using the `get_agents()` tool you have access to.

2.) Then you need to retrieve a script template that can be used to create the agentic workflow.
You can do this by using the `retrieve_template_script()` tool you have access to.

3.) Then, you will create an agentic workflow based on the process description and the script template.
You can do this by using the `generate_agentic_workflow()` tool you have access to.

4.) Finally, you will submit the agentic workflow to the client for review and corrections.
You can do this by using the `review_agentic_workflow()` tool you have access to.

By using these tools, in the order described above, you will complete the task successfully.
"""

### Helper Functions

These functions are exposed as tools to the workflow creation orchestration agent.

In [14]:
import mimetypes

def get_agents(img_path, dir_name):
    """Describe the agents needed for the workflow image and save the description to dir_name/agents.txt."""
    with open(img_path, "rb") as image_file:
        img = base64.b64encode(image_file.read()).decode('utf-8')
    # Use the file's actual MIME type in the data URL (falls back to JPEG if unknown)
    mime_type = mimetypes.guess_type(img_path)[0] or "image/jpeg"
    sys_msg = """
    You help people develop agent-based workflows based on their process descriptions. 
    Each node in the workflow should represent an agent. 
    Describe these agents and what their goals are.
    Describe the workflow process for how the agents should be used.
    """
    messages = [{"role": "system", "content": sys_msg}]
    user_content = {
        "role": "user",
        "content": [
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mime_type};base64,{img}",
                    "detail": "high"
                }
            }
        ]
    }
    messages.append(user_content)
    # Single call through the SDK client; the original sent the same request
    # a second time over raw HTTP and discarded this result
    completion = client.chat.completions.create(
        model=gpt_model_deployment,
        messages=messages,
        max_tokens=2000,
        temperature=0.0
    )
    resp_str = completion.choices[0].message.content

    os.makedirs(dir_name, exist_ok=True)
    with open(f'{dir_name}/agents.txt', 'w') as f:
        f.write(resp_str)
        
    return resp_str

def retrieve_template_script():
    return '../templates/functionapp.py'

def create_agentic_workflow(agents, dir_name, template_script_path):
    with open(template_script_path, 'r') as f:
        template_script = f.read()
    sys_msg = f"""
    You help people create agentic workflows using Azure Durable Functions.
    You will be provided with a list of agents, and a sequence in which they should be executed.
    You will also be provided with a durable function template.
    Use this template to create an agent workflow.
    The run_agent_orchestrator function should be used for agent orchestration.
    Each call to an agent should be done using a call_activity function.
    Your code should include activities for each agent. 
    If there is conditional logic in the agent workflow, include this in the agent activity function.

    YOUR RESPONSE SHOULD CONSIST OF ONLY PYTHON CODE.

    ### TEMPLATE: {template_script}
    """
    usr_msg = f'## AGENTS DESCRIPTION: {agents}'
    messages = [{"role": "system", "content": sys_msg}, {"role": "user", "content": usr_msg}]
    completion = client.chat.completions.create(
        model=gpt_model_deployment,
        messages=messages,
        max_tokens = 4000,
        temperature = 0.0
    )
    os.makedirs(dir_name, exist_ok=True)
    with open(f'{dir_name}/functionapp.py', 'w') as f:
        f.write(completion.choices[0].message.content.replace('```python', '').replace('```', ''))
    
    return f'{dir_name}/functionapp.py'

def review_agentic_workflow(template_script_path):
    with open(template_script_path, 'r') as f:
        template_script = f.read()
    sys_msg = f"""
    You will review and improve an agentic workflow built using Azure Durable Functions.
    Review the code and move any loops in the main orchestrator function to a separate activity function.
    Ensure that the code is clean and follows best practices.

    YOUR RESPONSE SHOULD CONSIST OF ONLY PYTHON CODE.
    """
    usr_msg = f'## DURABLE FUNCTIONS AGENTIC WORKFLOW: {template_script}'
    messages = [{"role": "system", "content": sys_msg}, {"role": "user", "content": usr_msg}]
    completion = client.chat.completions.create(
        model=gpt_model_deployment,
        messages=messages,
        max_tokens = 4000,
        temperature = 0.0
    )
    with open(template_script_path, 'w') as f:
        f.write(completion.choices[0].message.content.replace('```python', '').replace('```', ''))
    
    return completion.choices[0].message.content
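
The helpers above remove code fences with chained `replace` calls, which keeps any prose the model writes around the code. A slightly more defensive alternative is to extract only the fenced block. The following is a minimal sketch under that assumption; `extract_python_code` is a hypothetical helper, not something the notebook defines.

```python
import re

# Build the fence marker programmatically so this example stays readable
FENCE = "`" * 3
FENCE_PATTERN = re.compile(
    FENCE + r"(?:python|py)?[ \t]*\n(.*?)" + FENCE,
    re.DOTALL | re.IGNORECASE,
)


def extract_python_code(reply: str) -> str:
    """Return the body of the first fenced block, or the whole reply if none is found."""
    match = FENCE_PATTERN.search(reply)
    code = match.group(1) if match else reply
    return code.strip() + "\n"


# Example reply with prose before and after the fenced code
reply = (
    "Here is the script:\n"
    + FENCE + "python\n"
    + "print('hello')\n"
    + FENCE + "\n"
    + "Let me know if you need changes."
)
print(extract_python_code(reply))
```

The fallback to the full reply preserves the current behavior when the model returns bare code without fences.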
    

### Conversation runner function

The agent flow begins when a user provides an image, then runs through the following steps:

- Identifying the agents needed to replicate the workflow shown in the user's image
- Retrieving an Azure Durable Functions template script
- Authoring a custom Durable Functions-backed agent workflow from the described agents and the retrieved template
- Reviewing and modifying the Durable Functions script to improve code quality
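
The runner routes each tool call requested by the model to the matching helper by name. That routing idea can be sketched with a dictionary lookup in place of an `if`/`elif` chain. The code below is an illustration only: `dispatch_tool_call`, `registry`, and the two stub tools are hypothetical stand-ins that avoid any Azure OpenAI calls.

```python
import json


def dispatch_tool_call(name, raw_arguments, registry):
    """Look up a tool by name, call it with JSON-decoded arguments, and return a string."""
    handler = registry.get(name)
    if handler is None:
        # Report the problem back to the model instead of raising
        return json.dumps({"error": f"Unknown function: {name}"})
    arguments = json.loads(raw_arguments) if raw_arguments else {}
    return str(handler(**arguments))


# Stub tools standing in for the real helpers (hypothetical, for illustration)
def fake_get_agents(img):
    return f"agents for {img}"


def fake_retrieve_template_script():
    return "templates/functionapp.py"


registry = {
    "get_agents": fake_get_agents,
    "retrieve_template_script": fake_retrieve_template_script,
}

print(dispatch_tool_call("get_agents", '{"img": "flow.png"}', registry))
print(dispatch_tool_call("retrieve_template_script", "{}", registry))
print(dispatch_tool_call("not_a_tool", "{}", registry))
```

A registry like this keeps the tool names in one place, which makes it harder for the tool schema and the dispatch code to drift apart.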

In [15]:
def run_conversation(sys_msg, img, dir_name, history=None):
    if history:
        # Continue an existing conversation (copy so the caller's list is not mutated)
        messages = list(history)
    else:
        messages = [{"role": "system", "content": sys_msg}, {"role": "user", "content": 'Workflow image at ' + img}]
    tools = [  
    {  
        "type": "function",  
        "function": {  
            "name": "get_agents",  
            "description": "Interprets an image of a business process workflow diagram and creates a description of a set of agents that could complete the task",  
            "parameters": {  
                "type": "object",  
                "properties": {  
                    "img": {  
                        "type": "string",  
                        "description": "Path to the image of the business process workflow diagram"  
                    }  
                },  
                "required": ["img"]  
            }  
        }  
    },  
    {  
        "type": "function",  
        "function": {  
            "name": "retrieve_template_script",  
            "description": "Retrieve a path to a sample Azure Durable Function python script to be used as the basis for a function-based agent workflow"  
        }  
    },  
    {  
        "type": "function",  
        "function": {  
            "name": "generate_agentic_workflow",  
            "description": "Develops an Azure Durable Function based agentic workflow using provided agent descriptions, and a template script. Saves the file and returns the path.",  
            "parameters": {  
                "type": "object",  
                "properties": {  
                    "agents_description": {  
                        "type": "string",  
                        "description": "A textual description of all agents and their tasks"  
                    },  
                    "dir_name": {  
                        "type": "string",  
                        "description": "Directory name where the generated workflow will be saved"  
                    },  
                    "template_script_path": {  
                        "type": "string",  
                        "description": "A path to a template script to be used as the basis for the workflow"  
                    }  
                },  
                "required": ["agents_description", "dir_name", "template_script_path"]  
            }  
        }  
    },
    {  
        "type": "function",  
        "function": {  
            "name": "review_agentic_workflow",  
            "description": "Reviews an Azure Durable Function based agentic workflow and makes revisions as needed",  
            "parameters": {  
                "type": "object",  
                "properties": {  
                    "script_path": {  
                        "type": "string",  
                        "description": "The path to the Azure Durable Function agentic workflow script to be reviewed"  
                    }  
                },  
                "required": ["script_path"]  
            }  
        }  
    }  
]  

    # First API call: Ask the model to use the functions
    response = client.chat.completions.create(
        model=gpt_model_deployment,
        messages=messages,
        tools=tools,
        tool_choice="auto",
        temperature=0.0
    )

    response_message = response.choices[0].message
    messages.append(response_message)

    is_done = False

    while not is_done:
        # Handle function calls
        if response_message.tool_calls:
            for tool_call in response_message.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)
                print(f"Function call: {function_name}")  
                print(f"Function arguments: {function_args}")  
                
                if function_name == "get_agents":
                    function_response = get_agents(function_args['img'], dir_name)
                    print(function_response)
                elif function_name == "retrieve_template_script":
                    function_response = retrieve_template_script()
                    print(function_response)
                elif function_name == "generate_agentic_workflow":
                    print(function_args['template_script_path'])
                    function_response = create_agentic_workflow(
                        agents=function_args["agents_description"],
                        dir_name=dir_name,
                        template_script_path=function_args["template_script_path"]
                    )
                    print(function_response)
                elif function_name == "review_agentic_workflow":
                    function_response = review_agentic_workflow(
                        template_script_path=function_args["script_path"]
                    )
                    print(function_response)
                else:
                    function_response = json.dumps({"error": "Unknown function"})
                
                messages.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": function_response,
                })
        else:
            # Nothing to execute: the last response is already the final answer
            print("No tool calls were made by the model.")
            break

        # Follow-up API call: send the tool results back and get the model's next response
        response = client.chat.completions.create(
            model=gpt_model_deployment,
            messages=messages,
            tools=tools,
            tool_choice="auto",
            temperature=0.0
        )

        response_message = response.choices[0].message
        messages.append(response_message)
        # Stop once the model replies without requesting another tool call
        is_done = not response_message.tool_calls
    

    return response_message.content

### Initiate Agent Flow

Update the image path and the output directory below before running.

In [16]:
response = run_conversation(sys_msg, '../img/invoice_flow.png', '../mvp-src')

Function call: get_agents
Function arguments: {'img': '../img/invoice_flow.png'}
To develop an agent-based workflow for the provided Invoice Processing Workflow, we can break down each step into specific agents with defined goals. Here is a detailed description of each agent and the workflow process:

### Agents and Their Goals

1. **Email Receipt Agent**
   - **Goal:** Monitor the email inbox for incoming invoices and retrieve them for processing.
   
2. **OCR Extraction Agent**
   - **Goal:** Use Optical Character Recognition (OCR) to extract text from the received invoice documents.
   
3. **Document Analysis Agent**
   - **Goal:** Analyze the extracted text using a pre-built document intelligence model to identify and categorize relevant information.
   
4. **Extraction Refinement Agent**
   - **Goal:** Refine the extracted data and map it into expected fields (e.g., invoice number, date, amount, vendor).
   
5. **Review Agent**
   - **Goal:** Review the refined extract to check fo

### Display Final Response

In [17]:
display(Markdown(response))

The agentic workflow for the invoice processing has been successfully created and reviewed. Here is the final script for your review and corrections:

```python
import azure.functions as func
import azure.durable_functions as df
import logging
import json
import os

app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)

# An HTTP-Triggered Function with a Durable Functions Client binding
@app.route(route="orchestrators/{functionName}")
@app.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    payload = json.loads(req.get_body())
    instance_id = await client.start_new(function_name, client_input=payload)
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@app.orchestration_trigger(context_name="context")
def agent_orchestrator(context):

    # Get the input payload from the context
    payload = context.get_input()

    # Step 1: Email Receipt of Invoice
    email_receipt_res = yield context.call_activity("email_receipt_agent", payload)

    # Step 2: Extraction of Text using OCR Model
    ocr_extraction_res = yield context.call_activity("ocr_extraction_agent", email_receipt_res)

    # Step 3: Analysis of Document using Pre-built Document Intelligence Model
    document_analysis_res = yield context.call_activity("document_analysis_agent", ocr_extraction_res)

    # Step 4: Refinement of Extraction and Mapping into Expected Fields
    extraction_refinement_res = yield context.call_activity("extraction_refinement_agent", document_analysis_res)

    # Step 5-6: Review and Completion Check Loop
    extraction_refinement_res = yield context.call_sub_orchestrator("review_and_completion_loop", extraction_refinement_res)

    # Step 7: Final Formatting to Ensure Consistency of Extraction Values
    final_formatting_res = yield context.call_activity("final_formatting_agent", extraction_refinement_res)

    return final_formatting_res

# Sub-Orchestrator for Review and Completion Check Loop
@app.sub_orchestration_trigger(context_name="context")
def review_and_completion_loop(context):
    extraction_refinement_res = context.get_input()

    while True:
        # Step 5: Review Extract to Check for Completeness
        review_res = yield context.call_activity("review_agent", extraction_refinement_res)

        # Step 6: Is Complete?
        completion_check_res = yield context.call_activity("completion_check_agent", review_res)

        if completion_check_res.get("is_complete"):
            break
        else:
            # Loop back to Step 4 for further refinement
            extraction_refinement_res = yield context.call_activity("extraction_refinement_agent", review_res)

    return extraction_refinement_res

# Activities
@app.activity_trigger(input_name="activitypayload")
def email_receipt_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Email Receipt Agent
    return {"invoice_document": "dummy_invoice_document"}

@app.activity_trigger(input_name="activitypayload")
def ocr_extraction_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the OCR Extraction Agent
    return {"extracted_text": "dummy_extracted_text"}

@app.activity_trigger(input_name="activitypayload")
def document_analysis_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Document Analysis Agent
    return {"categorized_data": "dummy_categorized_data"}

@app.activity_trigger(input_name="activitypayload")
def extraction_refinement_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Extraction Refinement Agent
    return {"refined_data": "dummy_refined_data"}

@app.activity_trigger(input_name="activitypayload")
def review_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Review Agent
    return {"reviewed_data": "dummy_reviewed_data"}

@app.activity_trigger(input_name="activitypayload")
def completion_check_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Completion Check Agent
    return {"is_complete": True}

@app.activity_trigger(input_name="activitypayload")
def final_formatting_agent(activitypayload: str):
    data = json.loads(activitypayload)
    # TO-DO: Add the logic for the Final Formatting Agent
    return {"formatted_data": "dummy_formatted_data"}
```

Please review the script and let me know if there are any corrections or additional modifications needed.
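
To try the generated script, the orchestration can be started through the `orchestrators/{functionName}` route it defines. The sketch below only builds the request. It assumes the app is running locally under Azure Functions Core Tools at the default `http://localhost:7071/api` base URL; `build_start_request` and the `invoice_id` payload field are hypothetical, and a deployed app using `AuthLevel.FUNCTION` would additionally need a function key.

```python
import json
from urllib.parse import quote


def build_start_request(orchestrator_name, payload, base_url="http://localhost:7071/api"):
    """Build the URL, headers, and JSON body for the HTTP starter route."""
    url = f"{base_url.rstrip('/')}/orchestrators/{quote(orchestrator_name, safe='')}"
    headers = {"Content-Type": "application/json"}
    body = json.dumps(payload)
    return url, headers, body


# "agent_orchestrator" is the orchestrator function name in the generated script;
# the payload contents are an assumption made for illustration
url, headers, body = build_start_request("agent_orchestrator", {"invoice_id": "demo"})
print(url)

# Sending the request requires the function app to be running, for example:
# import requests
# response = requests.post(url, headers=headers, data=body)
```

The starter function reads the request body with `json.loads`, so the body must be valid JSON.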