# Testing Agents v2 in AI Foundry
This notebook creates and runs an agent from AI Foundry

Best way to set it up is with [uv](https://docs.astral.sh/uv/)

1. in `agents` directory run `uv sync` (if python is missing, install it using `uv python install`)
2. in VSCode press [Ctrl+Shift+P] and select `Python: Select Interpreter`, choose the one from `agents` directory: `./agents/.venv/bin/python3.xx`

## Step 1
Create Project client

In [None]:
import sys

sys.path.insert(1, "./src")  # add code directory to path for imports
import asyncio
import atexit
import jsonref
import os
from azure.identity.aio import DefaultAzureCredential, AzureDeveloperCliCredential
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import (
    MCPTool,
    Tool,
    OpenApiAgentTool,
    OpenApiFunctionDefinition,
    OpenApiAnonymousAuthDetails,
)
from openai.types.responses.response_input_param import (
    McpApprovalResponse,
    ResponseInputParam,
)
from dotenv import load_dotenv
from agents_utils import agents_utils


# Load environment variables from the .env file
load_dotenv(override=True)

endpoint = os.environ.get("AZURE_AI_FOUNDRY_CONNECTION_STRING")
deployment_name = os.environ.get("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
api_version = os.environ.get("AZURE_OPENAI_API_VERSION", None)
tenant_id = os.environ.get("AZURE_TENANT_ID", None)

ai_agent_settings = {
    "endpoint": endpoint,
    "model_deployment_name": deployment_name,
    "api_version": api_version,
}
print(ai_agent_settings)

if os.environ.get("USE_AZURE_DEV_CLI") == "true":
    print("Using Azure Developer CLI Credential")

creds = (
    AzureDeveloperCliCredential(tenant_id=tenant_id)
    if os.environ.get("USE_AZURE_DEV_CLI") == "true"
    else DefaultAzureCredential()
)
await creds.__aenter__()

# uncomment to test token aquisition
# token_test  = creds.get_token("https://ai.azure.com")
# print(f"Token for https://ai.azure.com: {token_test.token[:10]}...")

client = AIProjectClient(endpoint=endpoint, credential=creds)
await client.__aenter__()
agents_client = agents_utils(client)

# info = creds.get_token_info("https://ai.azure.com/.default")
# print(f"Token info: {info}")

# List connections
print()
print("--- Connections ---")
model_gateway_connection_static = None
model_gateway_connection_dynamic = None
ai_gateway_connection_static = None
ai_gateway_connection_dynamic = None

async for connection in client.connections.list():
    print(
        f"Connection ID: {connection.id}, Name: {connection.name}, Type: {connection.type} Default: {connection.is_default}"
    )
    if connection.type == "ModelGateway" and "static" in connection.name.lower():
        model_gateway_connection_static = connection.name
        print(
            f"  - Static Model gateway connection found: {model_gateway_connection_static}"
        )
    if connection.type == "ModelGateway" and "static" not in connection.name.lower():
        model_gateway_connection_dynamic = connection.name
        print(
            f"  - Dynamic Model gateway connection found: {model_gateway_connection_dynamic}"
        )
    if connection.type == "ApiManagement" and "static" in connection.name.lower():
        ai_gateway_connection_static = connection.name
        print(
            f"  - Static API Management gateway connection found: {ai_gateway_connection_static}"
        )
    if connection.type == "ApiManagement" and "static" not in connection.name.lower():
        ai_gateway_connection_dynamic = connection.name
        print(
            f"  - Dynamic API Management gateway connection found: {ai_gateway_connection_dynamic}"
        )

# List agents
print()
print("--- Agents ---")
agents = await agents_client.get_agents()
for agent in agents:
    print(
        f"Agent ID: {agent.id}, Name: {agent.name}, version: {agent.versions.latest.version} Properties: {agent.as_dict()}"
    )


async def cleanup():
    """Close all async clients properly"""
    try:
        await client.close()  # If .close() method exists
    except Exception:
        await client.__aexit__(None, None, None)
    try:
        await creds.close()
    except Exception:
        await creds.__aexit__(None, None, None)
    print("Clients closed")


# Register cleanup for kernel shutdown (optional)


def sync_cleanup():
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            loop.create_task(cleanup())
        else:
            loop.run_until_complete(cleanup())
    except Exception:
        print("Cleanup failed")
        pass


atexit.register(sync_cleanup)

## Run Tests

In [None]:
# Run 10 requests for each gateway connection and capture request IDs
import pandas as pd
from openai import APIStatusError, APIError
from azure.core.exceptions import HttpResponseError

# Collect all gateway connections to test
gateway_connections = {
    "model_gateway_static": model_gateway_connection_static,
    "model_gateway_dynamic": model_gateway_connection_dynamic,
    "ai_gateway_static": ai_gateway_connection_static,
    "ai_gateway_dynamic": ai_gateway_connection_dynamic,
}
deployment_model: str = os.environ.get("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")

# Filter out None connections
active_connections = {k: v for k, v in gateway_connections.items() if v is not None}
print(
    f"Testing {len(active_connections)} gateway connections: {list(active_connections.keys())}"
)

results = []
num_requests = 2
delete_agent_before_create = False

for conn_name, conn_value in active_connections.items():
    print(f"\n--- Testing {conn_name} ({conn_value}) ---")

    for i in range(num_requests):
        request_id = None
        result = None
        error_msg = None

        try:
            agent_name = f"TestAgent_{conn_name}".replace(" ", "-").replace("_", "-")[
                :63
            ]

            # Create agent for this connection
            agent = await agents_client.create_agent(
                name=agent_name,
                model_gateway_connection=conn_value,
                deployment_name=deployment_model,
                delete_before_create=delete_agent_before_create,
            )

            openai_client = client.get_openai_client()

            # Create conversation
            conversation = await openai_client.conversations.create(
                items=[
                    {
                        "type": "message",
                        "role": "user",
                        "content": f"What is 2 + {i}? Reply with just the number.",
                    }
                ],
            )

            # Get response
            response = await openai_client.responses.create(
                conversation=conversation.id,
                extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
                input="",
            )

            # Extract request ID from response
            request_id = response._request_id
            result = "SUCCESS"
            output = response.output_text[:50] if response.output_text else "No output"

            results.append(
                {
                    "Connection": conn_name,
                    "Model": deployment_model,
                    "Request #": i + 1,
                    "Request ID": request_id,
                    "Result": result,
                    "Output": output,
                }
            )
            print(f"  Request {i+1}: SUCCESS - {request_id}. Response: {output}...")

        except (APIStatusError, APIError, HttpResponseError) as e:
            # Extract request ID from error response headers
            request_id = (
                e.response.headers.get("x-request-id", "N/A")
                if hasattr(e, "response")
                else "N/A"
            )
            result = "ERROR"
            error_msg = str(e)

            results.append(
                {
                    "Connection": conn_name,
                    "Model": deployment_model,
                    "Request #": i + 1,
                    "Request ID": request_id,
                    "Result": result,
                    "Output": error_msg,
                }
            )
            print(f"  Request {i+1}: ERROR - {request_id} - {error_msg}")

        except Exception as e:
            result = "ERROR"
            error_msg = str(e)

            results.append(
                {
                    "Connection": conn_name,
                    "Model": deployment_model,
                    "Request #": i + 1,
                    "Request ID": "N/A",
                    "Result": result,
                    "Output": error_msg,
                }
            )
            print(f"  Request {i+1}: ERROR - {error_msg}")

# Create DataFrame and display results
df = pd.DataFrame(results)
print("\n\n=== RESULTS TABLE ===")
print(df.to_string(index=False))

# save results to CSV
df.to_csv("gateway_connection_test_results.csv", index=False)

# Summary statistics
print("\n\n=== SUMMARY ===")
summary = df.groupby(["Connection", "Result"]).size().unstack(fill_value=0)
print(summary)

## Call Foundry directly

In [None]:
openai_client = client.get_openai_client()

response = await openai_client.responses.create(
    model=f"{model_gateway_connection_static}/{deployment_name}",
    input="I am going to Paris, what should I see?",
    instructions="You are a helpful assistant.",
)

print(f"request_id: {response._request_id}")
print(response.output_text)

## Run agent using static gateway

In [None]:
agent = await agents_client.create_agent(
    name="MyV2Agent",
    model_gateway_connection=ai_gateway_connection_dynamic,
    delete_before_create=True,
)
openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {
            "type": "message",
            "role": "user",
            "content": "What is the size of Poland in square miles?",
        }
    ],
)
print(
    f"Created conversation with agent {agent.name} with initial user message (id: {conversation.id})"
)

response = await openai_client.responses.create(
    conversation=conversation.id,
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
    input="",
)
print(f"Response id: {response.id}")
print(f"Response output: {response.output_text}")
print(f"Response cost: {response.to_dict()["usage"]}")

## Run agent using dynamic gateway

In [None]:
agent = await agents_client.create_agent(
    name="MyV2Agent",
    model_gateway_connection=model_gateway_connection_dynamic,
    delete_before_create=True,
)
openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {"type": "message", "role": "user", "content": "What is the history of Warsaw?"}
    ],
)
print(
    f"Created conversation with agent {agent.name} with initial user message (id: {conversation.id})"
)

response = await openai_client.responses.create(
    conversation=conversation.id,
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
    input="",
)
print(f"Response output: {response.output_text}")
print(f"Response cost: {response.to_dict()}")

## Streaming

In [None]:
agent = await agents_client.create_agent(
    name="MyAgentGpt5Mini",
    model_gateway_connection=model_gateway_connection_dynamic,
    deployment_name="azure-gpt-5-mini",
    delete_before_create=True,
)
openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {
            "type": "message",
            "role": "user",
            "content": "Tell me hi in 10 random languages.",
        }
    ],
)
print(
    f"Created conversation with agent {agent.name} for streaming (id: {conversation.id})"
)

# Create streaming response
response_stream_events = await openai_client.responses.create(
    conversation=conversation.id,
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
    input="",
    stream=True,
)

print("Streaming response:")
events_received = []

async for event in response_stream_events:
    previous_event = events_received[-1] if len(events_received) > 0 else None
    if previous_event and previous_event["type"] == event.type:
        previous_event["count"] += 1
    else:
        events_received.append({"type": event.type, "count": 1})

    if event.type == "response.created":
        print(f"Stream response created with ID: {event.response.id}\n")
    elif event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.text.done":
        print("\n\nResponse text done. Access final text in 'event.text'")
    elif event.type == "response.completed":
        print(
            "\n\nResponse completed. Access final text in 'event.response.output_text'"
        )
        print(f"Response cost: {event.response.to_dict()['usage']}")
        print(f"Full response text: {event.response.output_text}")

for index, event in enumerate(events_received):
    print(f"{index} event: {event}")

# print(f"\n\nFull response length: {len(full_response)} characters")

## Tool Calls

In [None]:
# helper for streaming
from openai import AsyncStream
from openai.types.responses import ResponseStreamEvent


async def process_stream(stream: AsyncStream[ResponseStreamEvent]):
    events_received = []
    input_list = []
    response_id = None
    full_response = ""

    async for event in stream:
        previous_event = events_received[-1] if len(events_received) > 0 else None
        if previous_event and previous_event["type"] == event.type:
            previous_event["count"] += 1
        else:
            events_received.append({"type": event.type, "count": 1})

        if event.type == "response.created":
            print(f"Stream response created with ID: {event.response.id}\n")
        elif event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
        elif event.type == "response.text.done":
            print("\n\nResponse text done. Access final text in 'event.text'")
        elif event.type == "response.output_item.added":
            print(f"\n\nResponse output item added: {event.item}")
            if event.item.type == "mcp_approval_request":
                input_list.append(
                    McpApprovalResponse(
                        type="mcp_approval_response",
                        approve=True,
                        approval_request_id=event.item.id,
                    )
                )
        elif event.type == "response.output_item.done":
            print(f"\n\nResponse output item done: {event.item}")
        elif event.type == "response.completed":
            response_id = event.response.id
            print(
                "\n\nResponse completed. Access final text in 'event.response.output_text'"
            )
            print(f"Response cost: {event.response.to_dict()['usage']}")
            full_response = event.response.output_text
    return events_received, input_list, response_id, full_response

In [None]:
mcp_tool = MCPTool(
    server_label="ms-learn",
    server_url="https://apim-mqcynshmbfa4u.azure-api.net/learn",
    require_approval="always",
)

# Create tools list with proper typing for the agent definition
tools: list[Tool] = [mcp_tool]

agent = await agents_client.create_agent(
    name="MyAgentGpt5MiniTools",
    # model_gateway_connection=model_gateway_connection_dynamic,
    deployment_name="gpt-5",
    delete_before_create=True,
    instructions="You are a helpful agent that can use MCP tools to assist users. Use the available MCP tools to answer questions and perform tasks.",
    tools=tools,
)
openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {
            "type": "message",
            "role": "user",
            "content": "Find me learning paths on Microsoft Learn about Azure AI services.",
        }
    ],
)
print(
    f"Created conversation with agent {agent.name} for streaming (id: {conversation.id})"
)

# Create streaming response
events_received = []
input_list = []
response_id = None
input = ""
request_count = 0
from openai import NOT_GIVEN

while True:
    response_stream_events = await openai_client.responses.create(
        conversation=conversation.id if response_id is None else NOT_GIVEN,
        previous_response_id=response_id,
        extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
        input=input,
        stream=True,
        # tool_choice='required'
    )

    print(f"Streaming response {request_count}:")
    events_received, input_list, response_id, full_response = await process_stream(
        response_stream_events
    )

    for index, event in enumerate(events_received):
        print(f"{request_count}.{index} event: {event}")

    if len(input_list) == 0:
        break
    input = input_list
    print(
        f"\n\n{request_count} Submitting approval for {len(input_list)} tools, including {input_list[0]['approval_request_id']}"
    )
    request_count += 1

print("Done")

## Non Streaming MCP

In [None]:
mcp_tool = MCPTool(
    server_label="docs",
    # private MCP server
    # server_url="https://aca-mcp-qmjrpcr7svlm2.whitesea-e3999951.norwayeast.azurecontainerapps.io/",
    # public MCP server
    server_url="https://gitmcp.io/Azure/azure-rest-api-specs",
    require_approval="always",
)

with open("weather.json", "r") as f:
    openapi_weather = jsonref.loads(f.read())
    # private openAPI server
    # openapi_weather["servers"] = [{"url": 'https://aca-openapi-qmjrpcr7svlm2.whitesea-e3999951.norwayeast.azurecontainerapps.io'}]
    # public openAPI server
    openapi_weather["servers"] = [{"url": "https://wttr.in"}]

open_api_tool = OpenApiAgentTool(
    openapi=OpenApiFunctionDefinition(
        name="get_weather",
        spec=openapi_weather,
        description="Retrieve weather information for a location.",
        auth=OpenApiAnonymousAuthDetails(),
    )
)

# Create tools list with proper typing for the agent definition
tools: list[Tool] = [mcp_tool]

agent = await agents_client.create_agent(
    name="MyAgentGpt5MiniTools",
    model_gateway_connection=model_gateway_connection_static,
    deployment_name="azure-gpt-5-mini",
    # deployment_name="gpt-4.1",
    delete_before_create=True,
    instructions="You are a helpful agent that can use MCP tools to assist users. Use the available MCP tools to answer questions and perform tasks.",
    tools=tools,
)
openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {"type": "message", "role": "user", "content": "Summarize the readme for me"}
    ],
)
print(
    f"Created conversation with agent {agent.name} for streaming (id: {conversation.id})"
)

response = await openai_client.responses.create(
    conversation=conversation.id,
    extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
    input="",
)

# Process any MCP approval requests that were generated
input_list: ResponseInputParam = []
for item in response.output:
    if item.type == "mcp_approval_request":
        if item.id:
            # Automatically approve the MCP request to allow the agent to proceed
            # In production, you might want to implement more sophisticated approval logic
            input_list.append(
                McpApprovalResponse(
                    type="mcp_approval_response",
                    approve=True,
                    approval_request_id=item.id,
                )
            )

if len(input_list) > 0:
    print("Final input:")
    print(input_list)

    agent = await agents_client.create_agent(
        name="MyAgentGpt5MiniTools",
        # model_gateway_connection=model_gateway_connection_static,
        # deployment_name="azure-gpt-5-mini",
        deployment_name="gpt-4.1",
        delete_before_create=True,
        instructions="You are a helpful agent that can use MCP tools to assist users. Use the available MCP tools to answer questions and perform tasks.",
        tools=tools,
    )

    # Send the approval response back to continue the agent's work
    # This allows the MCP tool to access the GitHub repository and complete the original request
    response = await openai_client.responses.create(
        input=input_list,
        previous_response_id=response.id,
        extra_body={"agent": {"name": agent.name, "type": "agent_reference"}},
    )

# Print result (should contain "Azure")
print(f"==> Result: {response.output_text}")
print(response.to_dict())

## No agents MCP
Run without referencing the agent

In [None]:
mcp_tool = MCPTool(
    server_label="docs",
    # private MCP server
    # server_url="https://aca-mcp-qmjrpcr7svlm2.whitesea-e3999951.norwayeast.azurecontainerapps.io/",
    # public MCP server
    server_url="https://gitmcp.io/Azure/azure-rest-api-specs",
    require_approval="always",
)

with open("weather.json", "r") as f:
    openapi_weather = jsonref.loads(f.read())
    # private openAPI server
    # openapi_weather["servers"] = [{"url": 'https://aca-openapi-qmjrpcr7svlm2.whitesea-e3999951.norwayeast.azurecontainerapps.io'}]
    # public openAPI server
    openapi_weather["servers"] = [{"url": "https://wttr.in"}]

open_api_tool = OpenApiAgentTool(
    openapi=OpenApiFunctionDefinition(
        name="get_weather",
        spec=openapi_weather,
        description="Retrieve weather information for a location.",
        auth=OpenApiAnonymousAuthDetails(),
    )
)

# Create tools list with proper typing for the agent definition
tools: list[Tool] = [mcp_tool]


openai_client = client.get_openai_client()
conversation = await openai_client.conversations.create(
    items=[
        {"type": "message", "role": "user", "content": "Summarize the readme for me"}
    ],
)
print(f"Created conversation with model for streaming (id: {conversation.id})")

response = await openai_client.responses.create(
    conversation=conversation.id,
    # model='gpt-4.1',
    model=f"{model_gateway_connection_static}/{deployment_name}",
    tools=tools,
    input="",
)

# Process any MCP approval requests that were generated
input_list: ResponseInputParam = []
for item in response.output:
    if item.type == "mcp_approval_request":
        if item.id:
            # Automatically approve the MCP request to allow the agent to proceed
            # In production, you might want to implement more sophisticated approval logic
            input_list.append(
                McpApprovalResponse(
                    type="mcp_approval_response",
                    approve=True,
                    approval_request_id=item.id,
                )
            )

if len(input_list) > 0:
    print("Final input:")
    print(input_list)

    # Send the approval response back to continue the agent's work
    # This allows the MCP tool to access the GitHub repository and complete the original request
    response = await openai_client.responses.create(
        input=input_list,
        # model='gpt-4.1',
        model=f"{model_gateway_connection_static}/{deployment_name}",
        tools=tools,
        previous_response_id=response.id,
    )

# Print result (should contain "Azure")
print(f"==> Result: {response.output_text}")
print(response.to_dict())