# Creating AI Agents with Azure OpenAI and PromptFlow

This tutorial demonstrates how to build AI agents using Azure OpenAI, PromptFlow, and asynchronous Python tools. In this notebook you will:

- Configure environment variables and import required dependencies
- Define an asynchronous tool function that creates an AI agent team
- Add asynchronous utility functions
- Run the tool with sample parameters

Enjoy exploring and modifying the workflow to suit your needs!

# Table of Contents

1. Introduction
2. Importing Dependencies and Environment Setup
3. Defining the AI Agent Tool Function
4. Utility Functions for Asynchronous Operations
5. Running the AI Agent Tool
6. Summary


## Importing Dependencies and Environment Setup

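In this section we import the necessary modules, set up tracing, and load configuration values from the deploy.env file. Make sure deploy.env is in the working directory and defines PROJECT_CONNECTION_STRING. The AZURE_OPENAI_DEPLOYMENT_NAME and BING_CONNECTION_NAME settings are optional; the final cell falls back to default values when they are missing.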
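Before running the later cells, it can help to confirm that the expected settings are actually present. The following is a minimal sketch using only the standard library. The helper name missing_settings is hypothetical (it is not part of this notebook or of any Azure SDK), and the variable names are the ones read elsewhere in this notebook.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names read elsewhere in this notebook.
REQUIRED_SETTINGS = ("PROJECT_CONNECTION_STRING",)


def missing_settings(
    names: Iterable[str], env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Return the names that are unset or blank in env (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not (env.get(name) or "").strip()]


# Report names only, never values, so secrets stay out of the cell output.
missing = missing_settings(REQUIRED_SETTINGS)
if missing:
    print("Missing required settings:", ", ".join(missing))
```

Passing a plain dictionary as env makes the check easy to try out without touching the real environment.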

In [23]:
# Import standard libraries and load environment variables
import asyncio
import os
import json
import time
import datetime
import random
import string
from pathlib import Path
from typing import Any, Callable, Set, Dict, List, Optional
import urllib.parse
from httpx import AsyncClient, HTTPStatusError, RequestError

# Optional: Additional utility imports
import aiohttp
from bs4 import BeautifulSoup
from readability import Document

from dotenv import load_dotenv

# Load settings/configurations from a .env file
load_dotenv('deploy.env')

# For logging and telemetry
from opentelemetry import trace
from azure.monitor.opentelemetry import configure_azure_monitor


from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import (
    AsyncFunctionTool,
    RequiredFunctionToolCall,
    SubmitToolOutputsAction,
    ToolOutput,
    CodeInterpreterTool,
    BingGroundingTool,
    AsyncToolSet,
)

from azure.identity.aio import DefaultAzureCredential

# Import our custom modules (ensure these files are available in your project folder)
from agent_team import AgentTeam

# Create a tracer for OpenTelemetry (for diagnostics)
name = 'ai_agent_tool'
tracer = trace.get_tracer(name)

## Defining the AI Agent Tool Function

The notebook defines an asynchronous function called multiagent_team, wrapped in an OpenTelemetry span named my_python_tool. Its code cell appears after the utility functions it depends on. The function performs the following steps:

- Initializes an instance of the Azure AI Project Client using your credentials
- Configures Azure Monitor telemetry
- Retrieves a Bing grounding connection
- Creates an agent team with three agents (one for answering documentation questions, one to check answers, and one to verify links)
- Processes the provided question, gathers the agent responses, and finally dismantles the team after the task is complete

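Review the function's cell, which follows the utility functions, to understand the workflow and to adjust the instructions and tools for your scenario. Note that the cell also defines manager instructions, but no manager agent is added to the team.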
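The assemble, process, and dismantle steps listed above can be sketched in isolation. The example below is an illustration under stated assumptions, not the notebook's actual cell: StubTeam is a hypothetical stand-in that only mirrors the three method names used on AgentTeam in this notebook, and the try/finally wrapper is a suggested variation so that the team is dismantled even when processing fails.

```python
import asyncio
from typing import Dict, List


class StubTeam:
    """Hypothetical stand-in for AgentTeam; it only records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def assemble_team(self) -> None:
        self.events.append("assembled")

    async def process_request(self, request: str) -> List[Dict[str, str]]:
        self.events.append("processed")
        if not request:
            raise ValueError("request must not be empty")
        return [{"agent": "DocsQuestionAnswer", "text": f"Answer to: {request}"}]

    async def dismantle_team(self) -> None:
        self.events.append("dismantled")


async def run_team(team: StubTeam, question: str) -> List[Dict[str, str]]:
    """Assemble the team, process one question, and always clean up."""
    await team.assemble_team()
    try:
        return await team.process_request(request=question)
    finally:
        # Runs on success and on failure, so no agents are left behind.
        await team.dismantle_team()
```

The same pattern should carry over to the real team object, assuming its methods behave like the ones called in this notebook.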

## Utility Functions for Asynchronous Operations

The following cell defines a set of utility functions that support asynchronous operations. These functions include:

- Fetching the current datetime (formatted as JSON)
- Parsing and processing HTML content using BeautifulSoup and Readability
- Retrieving and processing a webpage using aiohttp

Of these, fetch_current_datetime and get_webpage are registered in the agents' function tool set. The HTML helpers are not exposed to the agents directly; get_webpage calls them internally.
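The registered set mixes a synchronous function (fetch_current_datetime) with an asynchronous one (get_webpage). As a rough illustration of how a caller can handle both kinds uniformly, the sketch below awaits a function only when it is a coroutine function. The names add_numbers, echo_later, and call_tool are hypothetical, and this is not a description of how AsyncFunctionTool is implemented.

```python
import asyncio
import inspect
import json
from typing import Any, Callable


def add_numbers(a: int, b: int) -> str:
    """Synchronous example tool that returns a JSON string."""
    return json.dumps({"sum": a + b})


async def echo_later(text: str) -> str:
    """Asynchronous example tool that yields to the event loop once."""
    await asyncio.sleep(0)
    return json.dumps({"echo": text})


async def call_tool(func: Callable[..., Any], **kwargs: Any) -> Any:
    """Call func with kwargs, awaiting it only if it is a coroutine function."""
    if inspect.iscoroutinefunction(func):
        return await func(**kwargs)
    return func(**kwargs)
```

Returning JSON strings from both tools follows the convention used by fetch_current_datetime in this notebook.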

In [24]:
def fetch_current_datetime(format: Optional[str] = None) -> str:
    """
    Get the current time as a JSON string using an optional format.
    """
    current_time = datetime.datetime.now()
    time_format = format if format else "%Y-%m-%d %H:%M:%S"
    return json.dumps({"current_time": current_time.strftime(time_format)})

def is_probably_readable(soup: BeautifulSoup, min_score: int = 100) -> bool:
    # Placeholder function to determine if the content is readable
    return True

async def readable_text(params: dict) -> str:
    html = params['html']
    url = params['url']
    settings = params['settings']
    options = params.get('options', {})
   
    soup = BeautifulSoup(html, 'html.parser')
   
    if options.get('fallback_to_none') and not is_probably_readable(soup):
        return html
   
    doc = Document(html)
    parsed = doc.summary()
    parsed_title = doc.title()
    readability_soup = BeautifulSoup(parsed, 'html.parser')
   
    if parsed_title:
        title_element = readability_soup.new_tag('h1')
        title_element.string = parsed_title
        readability_soup.insert(0, title_element)
   
    return str(readability_soup)

async def process_html(html: str, url: str, settings: dict, soup: BeautifulSoup) -> str:
    # Fall back to the whole document when the page has no <body> element
    body = soup.body or soup
    if 'remove_elements_css_selector' in settings:
        for element in body.select(settings['remove_elements_css_selector']):
            element.decompose()
   
    simplified_body = body.decode_contents().strip()
   
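    # Wrap the title and cleaned body in a minimal HTML document
    simplified = f"""
    <html><head><title>
    {soup.title.string if soup.title else ''}
    </title></head>
    <body>
    {simplified_body}
    </body></html>
    """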
   
    ret = None
    if settings.get('html_transformer') == 'readableText':
        try:
            ret = await readable_text({
                'html': simplified,
                'url': url,
                'settings': settings,
                'options': {'fallback_to_none': False}
            })
        except Exception as error:
            print(f"Processing of HTML failed with error: {error}")
   
    # Always return a string, even when no transformer is configured
    return ret or simplified

async def get_webpage(url: str) -> str:
    """
    Retrieve a webpage using aiohttp and process its HTML content.
    """
    if not url:
        raise ValueError("url cannot be None or empty")
   
    async with aiohttp.ClientSession() as session:
        async with session.get(url, raise_for_status=True) as response:
            response_text = await response.text()
            result = await process_html(response_text, url, 
                                        {'html_transformer': 'readableText', 'readableTextCharThreshold': 500}, 
                                        BeautifulSoup(response_text, 'html.parser'))
            return result

# Functions exposed to the agents as tools: fetch_current_datetime is synchronous, get_webpage is asynchronous
user_async_function_tools: Set[Callable[..., Any]] = {
    fetch_current_datetime,
    get_webpage,
}

In [None]:
# Confirm that the connection string was loaded, without printing secrets
from dotenv import load_dotenv
import os

load_dotenv('deploy.env')

print("PROJECT_CONNECTION_STRING set:", bool(os.environ.get("PROJECT_CONNECTION_STRING")))




In [21]:
@tracer.start_as_current_span('my_python_tool')
async def multiagent_team(deployment_name: str, subject_context: str, bing_grounding_conn: str, question: str) -> Dict[str, Any]:
    """
    This asynchronous function creates a team of AI agents and processes the provided question.
   
    Parameters:
    deployment_name: The name of the model deployment.
    subject_context: Context for the question (for example, a documentation domain).
    bing_grounding_conn: The name of the Bing grounding connection in your Azure project.
    question: The question to be answered.
   
    Returns:
    A dictionary containing the last message text and the full conversation history.
    """
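    # Fail early with a clear message instead of printing the connection string
    if not os.environ.get("PROJECT_CONNECTION_STRING"):
        raise ValueError("Connection string is required: set PROJECT_CONNECTION_STRING in deploy.env")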
    # Initialize credentials and create the project client
    async with DefaultAzureCredential() as creds:
        async with AIProjectClient.from_connection_string(
            credential=creds, conn_str=os.environ.get("PROJECT_CONNECTION_STRING"),
        ) as project_client:
            # Configure telemetry
            application_insights_connection_string = await project_client.telemetry.get_connection_string()
            configure_azure_monitor(connection_string=application_insights_connection_string)
       
            # Retrieve the Bing grounding connection
            bing_connection = await project_client.connections.get(connection_name=bing_grounding_conn)
            conn_id = bing_connection.id
   
            # Wrap the utility functions defined earlier as a function tool
            functions = AsyncFunctionTool(functions=user_async_function_tools)
            code_interpreter = CodeInterpreterTool()
            bing_grounding = BingGroundingTool(conn_id)
   
            # Generate a random name for the agent team
            random_name = ''.join(random.choices(string.ascii_lowercase, k=8))
            agent_team = AgentTeam(random_name, project_client=project_client)
   
            # Define context and character limit
            Context = subject_context
            CharacterLimit = 1000
   
            # Define instructions for each agent
            DOCS_QUESTION_ANSWER_NAME = "DocsQuestionAnswer"
            DOCS_QUESTION_ANSWER_INSTRUCTIONS = f"""
            You are a question answerer for {Context} using the documentation site. Use the WebSearch tool to retrieve information from the docs site.
            Prepend 'site:learn.microsoft.com' to any search query to search only the documentation.
            The responses should be from the perspective of {Context} in third person, without addressing the user directly. 
            If no information is found, respond that no information is available. Ensure the answer is no more than {CharacterLimit} characters.
            """

            ANSWER_CHECKER_NAME = "AnswerChecker"
            ANSWER_CHECKER_INSTRUCTIONS = f"""
            You are an answer checker for {Context}. Your responses start with either 'ANSWER CORRECT' or 'ANSWER INCORRECT'.
            Check the answer's accuracy using public sources if needed. Respond with 'ANSWER CORRECT' if the response is accurate, 
            or 'ANSWER INCORRECT - ' if there is any inaccuracy. Ensure the answer is no more than {CharacterLimit} characters.
            """

            LINK_CHECKER_NAME = "LinkChecker"
            LINK_CHECKER_INSTRUCTIONS = """
            You are a link checker. Your responses start with either 'LINKS CORRECT' or 'LINK INCORRECT'.
            Verify the links in the answer and if any does not work, respond with 'LINK INCORRECT - '.
            Otherwise, respond with 'LINKS CORRECT'.
            """

            MANAGER_NAME = "Manager"
            MANAGER_INSTRUCTIONS = """
            You are a manager reviewing the question, answer, and links. If any agent flags an error, reply 'reject' to request a correction.
            Once the answer is correct, reply with 'approve'.
            """

            # Create tool sets for the agents
            toolset1 = AsyncToolSet()
            toolset1.add(functions)
            toolset1.add(code_interpreter)
            toolset1.add(bing_grounding)
   
            toolset2 = AsyncToolSet()
            toolset2.add(functions)
            toolset2.add(bing_grounding)

            # Add agents to the team
            agent_team.add_agent(
                model=deployment_name,
                name=DOCS_QUESTION_ANSWER_NAME,
                instructions=DOCS_QUESTION_ANSWER_INSTRUCTIONS,
                toolset=toolset1,
                can_delegate=False,
            )
   
            agent_team.add_agent(
                model=deployment_name,
                name=ANSWER_CHECKER_NAME,
                instructions=ANSWER_CHECKER_INSTRUCTIONS,
                toolset=toolset2,
                can_delegate=False,
            )
   
            agent_team.add_agent(
                model=deployment_name,
                name=LINK_CHECKER_NAME,
                instructions=LINK_CHECKER_INSTRUCTIONS,
                toolset=toolset2,
                can_delegate=False,
            )
   
            # Assemble the agent team
            await agent_team.assemble_team()
   
            # Process the user request (the question)
            user_request = question
            response_msg = await agent_team.process_request(request=user_request)
   
            # Dismantle the agent team
            await agent_team.dismantle_team()
   
            # Extract the response from the DocsQuestionAnswer agent
            last_message = next(
                (msg for msg in reversed(response_msg) if msg.get("agent") == DOCS_QUESTION_ANSWER_NAME),
                None
            )
            last_message_text = (
                last_message.get("text")
                if isinstance(last_message, dict) and "text" in last_message
                else "Agent team did not return a response."
            )
            response_history = response_msg if isinstance(response_msg, list) else []
            return {"last_message": last_message_text, "history": response_history}
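The final step of the function picks the most recent message from the DocsQuestionAnswer agent. The sketch below isolates that lookup so it can be tried on sample data. The helper name last_text_from is hypothetical, and the message shape (a list of dictionaries with agent and text keys) is an assumption taken from how the cell above reads the result, since the return type of AgentTeam.process_request is not shown in this notebook. Unlike the cell, the sketch also tolerates a result that is not a list.

```python
from typing import Any

DEFAULT_TEXT = "Agent team did not return a response."


def last_text_from(messages: Any, agent_name: str, default: str = DEFAULT_TEXT) -> str:
    """Return the text of the latest message from agent_name, or default."""
    if not isinstance(messages, list):
        return default
    # Walk the history backwards so the newest matching message wins.
    last = next(
        (m for m in reversed(messages)
         if isinstance(m, dict) and m.get("agent") == agent_name),
        None,
    )
    if last is None or "text" not in last:
        return default
    return last["text"]


sample_history = [
    {"agent": "DocsQuestionAnswer", "text": "first draft"},
    {"agent": "AnswerChecker", "text": "ANSWER INCORRECT - missing detail"},
    {"agent": "DocsQuestionAnswer", "text": "revised answer"},
    {"agent": "LinkChecker", "text": "LINKS CORRECT"},
]
print(last_text_from(sample_history, "DocsQuestionAnswer"))
```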


## Running the AI Agent Tool

Next, we run the multiagent_team function with sample inputs. The function is asynchronous, and Jupyter already runs an event loop, so the cell awaits the call directly with top-level await. In a plain Python script, wrap the call in asyncio.run instead. If you run into the "event loop is already running" error in Jupyter Notebook, either await the coroutine directly or install and apply the nest_asyncio package (via pip install nest_asyncio).

Below is an example invocation of the tool:

In [20]:
sample_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o")  # Replace with your deployed model name
sample_context = "Azure Documentation"  # The subject context
sample_bing_conn = os.environ.get("BING_CONNECTION_NAME", "binggrounding")  # The name of your Bing grounding connection
sample_question = "What is Azure OpenAI?"  # A sample question

# Run the tool asynchronously and print the results
result = await multiagent_team(sample_deployment, sample_context, sample_bing_conn, sample_question)
print(json.dumps(result, indent=2))

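If the call fails with a ValueError stating that a connection string is required, the PROJECT_CONNECTION_STRING variable was empty when the client was created. Check that deploy.env is in the working directory and defines that variable, then rerun the setup cell.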
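Outside a notebook there is no running event loop, so top-level await is not available. A minimal sketch of the script form follows; demo_tool is a hypothetical stand-in for multiagent_team that makes no network calls, used here only so the example runs without Azure resources.

```python
import asyncio
from typing import Any, Dict


async def demo_tool(question: str) -> Dict[str, Any]:
    """Hypothetical stand-in for multiagent_team; makes no network calls."""
    await asyncio.sleep(0)
    return {"last_message": f"echo: {question}", "history": []}


def main() -> Dict[str, Any]:
    # asyncio.run creates an event loop, runs the coroutine, and closes the loop.
    # It raises RuntimeError when a loop is already running in the same thread
    # (as in a notebook cell), where the coroutine should be awaited instead.
    return asyncio.run(demo_tool("What is Azure OpenAI?"))
```

In a script, calling main() under an if __name__ == "__main__" guard would be the usual entry point.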

## Summary

In this notebook we:

1. Imported the required dependencies and loaded environment variables.
2. Defined an asynchronous tool (multiagent_team, traced under the span name my_python_tool) that sets up an AI agent team with multiple capabilities (question answering, answer checking, and link verification).
3. Created and integrated various asynchronous utility functions.
4. Executed the tool by providing sample parameters.

This example demonstrates how modular, asynchronous AI agent workflows can be built using Azure OpenAI and PromptFlow. Feel free to adapt and extend the agents and functionalities as needed for your applications.

Happy coding!