
## 0) Prerequisites

- Python 3.10+ and a virtual environment
- Azure CLI logged in to the correct tenant: `az login`
- Access to:
  - **Copilot Studio** app in your Power Platform environment
  - **Azure AI Foundry / Agent Service** (AI Project endpoint + model deployment)
- Valid values for the environment variables documented below.


In [1]:

# Install core packages (adjust if your org uses a private feed):
# !pip install --upgrade pip


# Cell 1: Fix typing_extensions and install required packages
!pip install --upgrade typing_extensions>=4.6.0
!pip install --upgrade pip
!pip install --upgrade pydantic>=2.0.0
!pip install --upgrade azure-identity azure-ai-projects
!pip install --upgrade openai>=1.0.0
!pip install --upgrade httpx aiohttp

# If you're using agent-framework, ensure it's compatible:
!pip install agent-framework --upgrade

print("‚úÖ Package updates complete. Please restart your kernel and run again.")



Collecting pip
  Using cached pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Using cached pip-25.3-py3-none-any.whl (1.8 MB)


ERROR: To modify pip, please run the following command:
C:\Users\pablosal\AppData\Local\anaconda3\envs\azure-ai-agent-service-demo\python.exe -m pip install --upgrade pip




ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
agent-framework-core 1.0.0b251028 requires openai<2,>=1.99.0, but you have openai 2.7.0 which is incompatible.
semantic-kernel 1.28.1 requires pydantic!=2.10.0,!=2.10.1,!=2.10.2,!=2.10.3,<2.12,>=2.0, but you have pydantic 2.12.3 which is incompatible.


Collecting aiohttp
  Downloading aiohttp-3.13.2-cp311-cp311-win_amd64.whl.metadata (8.4 kB)
Downloading aiohttp-3.13.2-cp311-cp311-win_amd64.whl (456 kB)
Installing collected packages: aiohttp
  Attempting uninstall: aiohttp
    Found existing installation: aiohttp 3.10.10
    Uninstalling aiohttp-3.10.10:
      Successfully uninstalled aiohttp-3.10.10
Successfully installed aiohttp-3.13.2


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
semantic-kernel 1.28.1 requires pydantic!=2.10.0,!=2.10.1,!=2.10.2,!=2.10.3,<2.12,>=2.0, but you have pydantic 2.12.3 which is incompatible.


Collecting openai<2,>=1.99.0 (from agent-framework-core->agent-framework)
  Using cached openai-1.109.1-py3-none-any.whl.metadata (29 kB)
Using cached openai-1.109.1-py3-none-any.whl (948 kB)
Installing collected packages: openai
  Attempting uninstall: openai
    Found existing installation: openai 2.7.0
    Uninstalling openai-2.7.0:
      Successfully uninstalled openai-2.7.0
Successfully installed openai-1.109.1
‚úÖ Package updates complete. Please restart your kernel and run again.


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
semantic-kernel 1.28.1 requires pydantic!=2.10.0,!=2.10.1,!=2.10.2,!=2.10.3,<2.12,>=2.0, but you have pydantic 2.12.3 which is incompatible.



## 1) Environment Variables

Fill these in your shell or a `.env` (if your runtime loads it automatically).

### Copilot Studio
- `COPILOTSTUDIOAGENT__ENVIRONMENTID`
- `COPILOTSTUDIOAGENT__SCHEMANAME`
- `COPILOTSTUDIOAGENT__AGENTAPPID`
- `COPILOTSTUDIOAGENT__TENANTID`

### Azure AI Foundry (Agent Service)
- `AZURE_AI_PROJECT_ENDPOINT` (e.g., `https://<your-ai-project>.dev.azuresdk.net`)
- `AZURE_AI_MODEL_DEPLOYMENT_NAME` (e.g., `gpt-4o`)
- *(Optional)* `AZURE_AI_EXISTING_AGENT_ID` (to reuse an existing remote agent)


In [1]:

import os

required = [
    "COPILOTSTUDIOAGENT__ENVIRONMENTID",
    "COPILOTSTUDIOAGENT__SCHEMANAME",
    "COPILOTSTUDIOAGENT__AGENTAPPID",
    "COPILOTSTUDIOAGENT__TENANTID",
    "AZURE_AI_PROJECT_ENDPOINT",
    "AZURE_AI_MODEL_DEPLOYMENT_NAME",
]
missing = [k for k in required if not os.getenv(k)]
if missing:
    print("‚ö†Ô∏è Missing env vars:", missing)
else:
    print("‚úÖ All required env vars are present.")


‚úÖ All required env vars are present.



## 2) Imports (run once)


In [2]:

import asyncio
import os
import logging
from typing import Annotated, Awaitable, Callable

from pydantic import Field

# Microsoft Agent Framework (aligns with your provided samples)
from agent_framework import (
    ChatAgent,
    WorkflowBuilder,
    MagenticBuilder,
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowOutputEvent,
)

from agent_framework.microsoft import CopilotStudioAgent
from agent_framework.azure import AzureAIAgentClient
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import AzureCliCredential

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("agentic-notebook")



## 3) Build the two agents

- **CopilotStudioAgent** ‚Äî ideal for SharePoint/Graph/knowledge/document tasks


In [3]:
def build_copilot_agent() -> CopilotStudioAgent:
    """Create CopilotStudioAgent using environment config."""
    agent = CopilotStudioAgent(
        name="CopilotStudioAgent",
        description="SharePoint/Graph/doc retrieval and Q&A.",
    )
    return agent

In [4]:
# Instantiate both agents
copilot_agent = build_copilot_agent()

In [5]:
query = "What is the capital of France?"
print(f"User: {query}")
result = await copilot_agent.run(query)
print(f"Agent: {result}\n")

User: What is the capital of France?
Agent: The capital of France is Paris. Paris is also the most populous city in France and is renowned for its art, fashion, gastronomy, and culture. It is located in the north-central part of the country along the Seine River and is often referred to as the ‚ÄúCity of Light‚Äù due to its historical and cultural significance[1]‚Äã[2]‚Äã[3].

[1]: https://en.wikipedia.org/wiki/Paris "Paris - Wikipedia"
[2]: https://www.britannica.com/place/Paris "Paris | Definition, Map, Population, Facts, & History | Britannica"
[3]: https://www.mappr.co/capital-cities/france/ "What is the Capital of France? - Mappr"



- **Foundry (Azure AI Agent Service) Agent** ‚Äî ideal for Fabric/Lakehouse/SQL/KPI tasks

In [45]:
from typing import Optional

async def build_foundry_agent(agent_id: Optional[str] = None) -> tuple[ChatAgent, AIProjectClient, AzureCliCredential]:
    """Attach to an existing Azure AI Foundry agent (preferred), or create one ad-hoc.
    Returns the agent and the clients that need to stay alive for the agent to work."""
    endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]
    
    # Create credentials and project client (don't use async with here - we need to keep them alive)
    cred = AzureCliCredential()
    project = AIProjectClient(endpoint=endpoint, credential=cred)
    
    existing_id = os.getenv("AZURE_AI_EXISTING_AGENT_ID")
    if existing_id:
        log.info(f"Using existing Foundry agent: {existing_id}")
        chat_client = AzureAIAgentClient(project_client=project, agent_id=existing_id)
        agent = ChatAgent(
            chat_client=chat_client,
            name="FoundryAgent",
            description="Fabric/Lakehouse/SQL/KPI agent.",
            instructions="Answer with metrics/tables and add source hints."
        )
        return agent, project, cred

    # Create a temporary remote agent
    model = os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"]
    remote = await project.agents.create_agent(
        model=model,
        instructions="You are a Fabric/Data specialist. Prefer concise tables and source notes."
    )
    log.info(f"Created temporary Foundry agent: {remote.id}")

    chat_client = AzureAIAgentClient(project_client=project, agent_id=remote.id)
    agent = ChatAgent(
        chat_client=chat_client,
        name="FoundryAgent",
        description="Fabric/Lakehouse/SQL/KPI agent.",
        instructions="Answer with metrics/tables and add source hints."
    )
    return agent, project, cred

In [46]:
# Test the Foundry agent with proper connection management
agent_id = "asst_BjfbCaFZh2ju28zEJkpWLe0E"  # Optional: use existing agent ID

# Create the foundry agent and get the client objects that need to stay alive
foundry_agent, project_client, credential = await build_foundry_agent()
print("‚úÖ Agent ready:", foundry_agent.name)

try:
    query = "How are you?"
    print(f"User: {query}")
    result = await foundry_agent.run(query)
    print(f"Agent: {result}\n")
    print(f"Result type: {type(result)}")
    
    # If result is a complex object, try to extract text
    if hasattr(result, 'text'):
        print(f"Agent text: {result.text}")
    elif hasattr(result, 'content'):
        print(f"Agent content: {result.content}")
    
except Exception as e:
    print(f"Error running agent: {e}")
    print(f"Error type: {type(e)}")
    import traceback
    traceback.print_exc()
finally:
    # Clean up the connections properly
    print("Cleaning up connections...")
    try:
        await project_client.close()
        await credential.close()
        print("‚úÖ Cleanup completed successfully.")
    except Exception as cleanup_error:
        print(f"‚ö†Ô∏è Cleanup error: {cleanup_error}")
    print("Test completed.")

‚úÖ Agent ready: FoundryAgent
User: How are you?
Agent: I'm here and ready to assist you with data and fabric-related questions! How can I help you today?

Result type: <class 'agent_framework._types.AgentRunResponse'>
Agent text: I'm here and ready to assist you with data and fabric-related questions! How can I help you today?
Cleaning up connections...
‚ö†Ô∏è Cleanup error: object NoneType can't be used in 'await' expression
Test completed.
Agent: I'm here and ready to assist you with data and fabric-related questions! How can I help you today?

Result type: <class 'agent_framework._types.AgentRunResponse'>
Agent text: I'm here and ready to assist you with data and fabric-related questions! How can I help you today?
Cleaning up connections...
‚ö†Ô∏è Cleanup error: object NoneType can't be used in 'await' expression
Test completed.


Azure OpenAI Agent

In [32]:
# pip install azure-identity openai==1.70.0

import typing as t
import time, uuid

from azure.identity import InteractiveBrowserCredential
from openai import OpenAI
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given

# ---- FABRIC AGENT ENDPOINTS ----
FABRIC_ENDPOINTS = {
    "product_discovery": "https://msitapi.fabric.microsoft.com/v1/workspaces/409e30ce-b2ad-4c80-a54d-d645227322e4/aiskills/672fba68-a7d0-4c85-99e9-9ed6fe8ef1d1/aiassistant/openai",
    "sales_data": "https://msitapi.fabric.microsoft.com/v1/workspaces/409e30ce-b2ad-4c80-a54d-d645227322e4/aiskills/360ef9b6-c087-4e72-ab7c-f157007cda3a/aiassistant/openai", 
    "airport_info": "https://msitapi.fabric.microsoft.com/v1/workspaces/00ae18cb-e789-4d42-be8d-a5b47e524e22/aiskills/1d266d5d-cbbb-4099-9ef2-69fa875e4f89/aiassistant/openai"
}

# ---------- Shared Authentication ----------
SCOPE = "https://api.fabric.microsoft.com/.default"
# If you see 401/403, swap to:
# SCOPE = "https://analysis.windows.net/powerbi/api/.default"

_cred = InteractiveBrowserCredential()

def _get_bearer() -> str:
    return _cred.get_token(SCOPE).token

class FabricOpenAI(OpenAI):
    """
    OpenAI client wrapper that:
      - Uses your Fabric Data Agent Published URL as base_url
      - Injects AAD Bearer token and correlation id
      - Pins 'api-version' as query param
    """
    def __init__(self, base_url: str, api_version: str = "2024-05-01-preview", **kwargs: t.Any) -> None:
        self.api_version = api_version
        default_query = kwargs.pop("default_query", {})
        default_query["api-version"] = self.api_version
        super().__init__(
            api_key="",                     # not used
            base_url=base_url,             # IMPORTANT: specific agent endpoint
            default_query=default_query,
            **kwargs,
        )

    def _prepare_options(self, options: FinalRequestOptions) -> None:
        headers: dict[str, str | Omit] = ({**options.headers} if is_given(options.headers) else {})
        headers["Authorization"] = f"Bearer {_get_bearer()}"
        headers.setdefault("Accept", "application/json")
        headers.setdefault("ActivityId", str(uuid.uuid4()))
        options.headers = headers
        return super()._prepare_options(options)

# Create separate clients for each endpoint
print("üîß Creating Fabric clients...")
fabric_clients = {}
for agent_type, url in FABRIC_ENDPOINTS.items():
    try:
        fabric_clients[agent_type] = FabricOpenAI(base_url=url)
        print(f"‚úÖ Created {agent_type} client: ...{url.split('/')[-3]}")
    except Exception as e:
        print(f"‚ùå Failed to create {agent_type} client: {e}")

print(f"\nüîê Authentication configured with InteractiveBrowserCredential")
print(f"üìä Created {len(fabric_clients)} Fabric clients")

üîß Creating Fabric clients...
‚úÖ Created product_discovery client: ...672fba68-a7d0-4c85-99e9-9ed6fe8ef1d1
‚úÖ Created sales_data client: ...360ef9b6-c087-4e72-ab7c-f157007cda3a
‚úÖ Created airport_info client: ...1d266d5d-cbbb-4099-9ef2-69fa875e4f89

üîê Authentication configured with InteractiveBrowserCredential
üìä Created 3 Fabric clients


In [34]:
def ask_fabric_agent(agent_type: str,
                     question: str,
                     poll_interval_sec: int = 2,
                     timeout_sec: int = 300) -> str:
    """
    Sends a question to the specified Fabric Data Agent and returns the text reply.
    Uses the working beta.assistants API approach with separate clients per endpoint.
    
    Args:
        agent_type: One of 'product_discovery', 'sales_data', 'airport_info'
        question: The user's question
        poll_interval_sec: How often to check run status
        timeout_sec: Max time to wait for completion
        
    Returns:
        The agent's response as text
    """
    if agent_type not in fabric_clients:
        return f"[Error: Unknown agent type '{agent_type}'. Available: {list(fabric_clients.keys())}]"
    
    client = fabric_clients[agent_type]
    print(f"ü§ñ Routing to {agent_type} agent...")
    
    # Create "assistant" placeholder (model is ignored by Fabric agent)
    assistant = client.beta.assistants.create(model="not-used")

    # Create a new thread for this Q&A
    thread = client.beta.threads.create()

    try:
        # Post the user message
        client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=question,
        )

        # Start a run (the data agent actually does the work)
        run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant.id
        )

        # Poll until terminal state or timeout
        terminal = {"completed", "failed", "cancelled", "requires_action"}
        start = time.time()
        while run.status not in terminal:
            if time.time() - start > timeout_sec:
                raise TimeoutError(f"Run polling exceeded {timeout_sec}s (last status={run.status})")
            time.sleep(poll_interval_sec)
            run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

        if run.status != "completed":
            return f"[Run ended: {run.status}]"

        # Collect messages in ascending order and concatenate text parts
        msgs = client.beta.threads.messages.list(thread_id=thread.id, order="asc")
        out_chunks = []
        for m in msgs.data:
            if m.role == "assistant":
                for c in m.content:
                    if getattr(c, "type", None) == "text":
                        out_chunks.append(c.text.value)
        return "\n".join(out_chunks).strip() or "[No text content returned]"

    finally:
        # Always attempt cleanup
        try:
            client.beta.threads.delete(thread_id=thread.id)
        except Exception:
            pass

print("‚úÖ Fabric agent helper ready using beta.assistants API (your working approach)")
print("Available agent types:", list(fabric_clients.keys()))

‚úÖ Fabric agent helper ready using beta.assistants API (your working approach)
Available agent types: ['product_discovery', 'sales_data', 'airport_info']


In [35]:
print(ask_fabric_agent("product_discovery", "What data do you have access to?"))

ü§ñ Routing to product_discovery agent...
I have access to data from clinical glucose monitoring studies that compare the accuracy and reliability of two glucose monitoring products‚ÄîProduct A and Product B. The dataset contains the following types of information:

- Glucose ranges (measured in mg/dL)
- MARD percentages (Mean Absolute Relative Difference), which indicate the accuracy of each product
- Accuracy within ¬±20 mg/dL or ¬±20% of reference glucose values for both products
- The total number of readings used for each product in the analysis

This information allows for evaluating, comparing, and analyzing the performance of both CGM (continuous glucose monitoring) devices across different glucose ranges. If you have specific questions or need detailed analysis from this data, please let me know!
I have access to data from clinical glucose monitoring studies that compare the accuracy and reliability of two glucose monitoring products‚ÄîProduct A and Product B. The dataset con

In [36]:
print(ask_fabric_agent("sales_data", "What data do you have access to?"))

ü§ñ Routing to sales_data agent...
I can access two main types of data for answering your queries:

1. Retail Sales Data: This includes information on sales transactions (date, store, product, units sold, revenue), stores (name, location, city, country), products (name, category, brand), and calendar details (year, quarter, month, day). I can provide analyses such as sales trends, top-selling products, store performance, revenue breakdowns, and more.

2. IoT Freezer Data (Ontology Data): This includes details about stores, products, sale events, as well as sensor data from freezers (temperature, humidity, door status) with related metadata (model, timestamp, location, safe temperature, etc.). I can help answer questions about freezer performance, operational issues, safety compliance, and correlations with sales.

If you have a specific question or want to know if I can access a particular type of data, just ask!
I can access two main types of data for answering your queries:

1. Reta

In [37]:
print(ask_fabric_agent("airport_info", "What data do you have access to?"))

ü§ñ Routing to airport_info agent...
I have access to a structured aviation and airline operations dataset. Here‚Äôs a summary of the types of information I can access:

- **Airports** (location, codes, country)
- **Runways** (conditions, measurements)
- **Aircraft** (types, models, tail numbers, maintenance info)
- **Flights** (schedules, routes, flight legs)
- **Airlines** (identifiers, possibly names)
- **Crew** (roles, assignments, training, licensing, seniority)
- **Gates** (locations, assignments)
- **Bookings** (passenger reservations, but not personal details)
- **Ground Services Tasks** (types, timing, status)
- **Baggage** (tracking, screening results/methods)
- **Aircraft Components** (maintenance, installation info)
- **Routes** (origin/destination, distance)

Let me know which area you‚Äôre interested in, or ask a specific question about flights, airports, crew, etc., and I‚Äôll find the latest data for you!
I have access to a structured aviation and airline operations da

In [42]:
from agent_framework.azure import AzureOpenAIChatClient

# ============================================
# 3) Domain tools (agent-level) for the agent
# ============================================

from typing import Literal
from pydantic import Field

_PRODUCT_HINTS = """
Use for: Product catalogs, reviews, inventory, shopping recommendations, product specs,
availability, pricing, comparisons. Be concise and include source ids if returned by the Data Agent.
""".strip()

_SALES_HINTS = """
Use for: Sales KPIs, pipeline, bookings, ARR/MRR, ACV/TCV, revenue by region/product,
win/loss, forecast vs actuals, quota attainment, deals by stage. Be concise and include
source ids if returned by the Data Agent.
""".strip()

_AIRPORT_HINTS = """
Use for: Airport facilities, terminals, services, flight information, amenities,
operational data, passenger services. Be concise and include source ids if available.
""".strip()

def _route(domain: str, q: str, style: str | None = None) -> str:
    # Lightweight convention to help downstream routing if you use it
    prefix = f"[DOMAIN:{domain}]"
    suffix = f"\n[STYLE:{style}]" if style else ""
    return f"{prefix}\n{q}{suffix}"


def product_discovery_qna(
    question: Annotated[str, Field(description="Product discovery Q&A (catalogs, reviews, inventory, shopping, etc.)")],
) -> str:
    """Query enterprise PRODUCT information via Fabric Data Agent.

    Examples:
      - "Show me laptops under $1500 with good battery life and customer reviews"
      - "What are the top-rated products in electronics category this quarter"
      - "Compare iPhone vs Samsung Galaxy specs and pricing"

    When to use:
    """ + _PRODUCT_HINTS
    return ask_fabric_agent("product_discovery", _route("product_discovery", question))


def sales_data_qna(
    question: Annotated[str, Field(description="Sales analytics Q&A (ARR, pipeline, bookings, forecast, revenue, etc.)")],
) -> str:
    """Query enterprise SALES information via Fabric Data Agent.

    Examples:
      - "Top 10 customers by 2024 ARR in North America (with account owner)"
      - "Q4 pipeline by region and product, with stage counts"
      - "Bookings vs target for FY2025 YTD, variance and trend"

    When to use:
    """ + _SALES_HINTS
    return ask_fabric_agent("sales_data", _route("sales_data", question))


def airport_info_qna(
    question: Annotated[str, Field(description="Airport facilities Q&A (terminals, services, amenities, operations, etc.)")],
    summary_style: Annotated[
        Literal["brief", "detailed"],
        Field(description="Preferred response length/style."),
    ] = "brief",
) -> str:
    """Query enterprise AIRPORT information via Fabric Data Agent.

    Examples:
      - "What restaurants and shops are available in Terminal 3 at LAX"
      - "Current flight delays and gate information for Denver International"
      - "Accessibility services and amenities at JFK airport"

    When to use:
    """ + _AIRPORT_HINTS
    return ask_fabric_agent("airport_info", _route("airport_info", question, style=summary_style))


# ==========================================================
# 4) Build your ChatAgent with agent-level tools (like sample)
# ==========================================================

def build_unified_fabric_agent() -> ChatAgent:
    """
    The LLM answers general questions itself, and calls tools for
    PRODUCT/SALES/AIRPORT requests. Tools are available for the entire session.
    """
    return ChatAgent(
        chat_client=AzureOpenAIChatClient(endpoint=os.getenv("AZURE_OPENAI_API_ENDPOINT"),
                                          api_key=os.getenv("AZURE_OPENAI_KEY"),
                                          deployment_name=os.getenv("AZURE_AOAI_CHAT_MODEL_NAME_DEPLOYMENT_ID")),
        name="UnifiedFabricAgent",
        description="Multi-domain agent with access to product, sales, and airport data sources",
        instructions=(
            "You are a helpful assistant with access to specialized data sources. "
            "If the user asks about PRODUCT discovery (catalogs, reviews, inventory, shopping, etc.), "
            "call the `product_discovery_qna` tool. "
            "If the user asks about SALES analytics (ARR, pipeline, bookings, forecast, revenue, etc.), "
            "call the `sales_data_qna` tool. "
            "If the user asks about AIRPORT information (terminals, services, amenities, operations), "
            "call the `airport_info_qna` tool. "
            "You can use multiple tools if the question spans multiple domains. "
            "Otherwise, answer directly using your general knowledge."
        ),
        tools=[product_discovery_qna, sales_data_qna, airport_info_qna],  # agent-level tools (no re-auth per use)
    )


# Create the unified agent
unified_agent = build_unified_fabric_agent()
print("‚úÖ Unified Fabric Agent created successfully!")
print(f"   Name: {unified_agent.name}")
print("   üìä Data sources: Product Discovery + Sales Analytics + Airport Information")
print("   üéØ Domain-specific tools with intelligent routing")
print("   üîó Compatible with all Agent Framework orchestration patterns")

‚úÖ Unified Fabric Agent created successfully!
   Name: UnifiedFabricAgent
   üìä Data sources: Product Discovery + Sales Analytics + Airport Information
   üéØ Domain-specific tools with intelligent routing
   üîó Compatible with all Agent Framework orchestration patterns


In [43]:
# ============ COMPREHENSIVE TESTING ============
print("=== Testing Unified Fabric Agent with Auto-Routing ===\n")

# Test 1: Product Discovery (should route to product_discovery_tool)
print("üõçÔ∏è TEST 1: Product Discovery")
q1 = "What product is better?"
print(f"User: {q1}")
try:
    r1 = await unified_agent.run(q1)
    print(f"Agent: {r1}\n")
except Exception as e:
    print(f"Error: {e}\n")

# Test 2: Sales Data (should route to sales_data_tool)  
print("üí∞ TEST 2: Sales Analytics")
q2 = "What were our top 2 revenue performing products?"
print(f"User: {q2}")
try:
    r2 = await unified_agent.run(q2)
    print(f"Agent: {r2}\n")
except Exception as e:
    print(f"Error: {e}\n")

# Test 3: Airport Info (should route to airport_info_tool)
print("‚úàÔ∏è TEST 3: Airport Information")  
q3 = "What is the best perfroamcne plane?"
print(f"User: {q3}")
try:
    r3 = await unified_agent.run(q3)
    print(f"Agent: {r3}\n")
except Exception as e:
    print(f"Error: {e}\n")

# Test 4: Multi-domain question (might use multiple tools)
print("üîÄ TEST 4: Multi-Domain Query")
q4 = "I'm traveling through Denver airport and want to buy a laptop there. What are my options?"
print(f"User: {q4}")
try:
    r4 = await unified_agent.run(q4)
    print(f"Agent: {r4}\n")
except Exception as e:
    print(f"Error: {e}\n")

# Test 5: General question (should answer directly without tools)
print("üí≠ TEST 5: General Knowledge")
q5 = "What's the difference between revenue and profit?"
print(f"User: {q5}")
try:
    r5 = await unified_agent.run(q5)
    print(f"Agent: {r5}\n")
except Exception as e:
    print(f"Error: {e}\n")

print("‚úÖ All tests completed!")

=== Testing Unified Fabric Agent with Auto-Routing ===

üõçÔ∏è TEST 1: Product Discovery
User: What product is better?
Agent: Could you please specify which products you are comparing? This way, I can provide more detailed information or direct you to a resource that can help.

üí∞ TEST 2: Sales Analytics
User: What were our top 2 revenue performing products?
Agent: Could you please specify which products you are comparing? This way, I can provide more detailed information or direct you to a resource that can help.

üí∞ TEST 2: Sales Analytics
User: What were our top 2 revenue performing products?
ü§ñ Routing to sales_data agent...
ü§ñ Routing to sales_data agent...
Agent: The top 2 revenue-performing products are:

1. **Classic Vanilla Pint** - This product generated the highest revenue.
2. **Dark Chocolate Pint** - This product followed closely behind in revenue performance.

‚úàÔ∏è TEST 3: Airport Information
User: What is the best perfroamcne plane?
Agent: The top 2 revenue-pe

### Using Unified Fabric Agent in Orchestration Patterns

Now that we have a unified agent with multiple Fabric data sources, let's integrate it with the orchestration patterns:


## 4) Pattern A ‚Äî Deterministic Router (baseline)

A simple Python function classifies the query and calls **one** agent.
Great for low-latency, auditable routing.


In [None]:

def route(query: str) -> str:
    q = query.lower()
    
    # Unified Fabric Agent - structured data queries
    if any(w in q for w in ["sales", "revenue", "kpi", "product", "catalog", "glucose", "airport", "analytics", "metrics", "performance"]):
        return "unified_fabric"
    
    # Foundry Agent - SharePoint and private documents  
    if any(w in q for w in ["policy", "ppt", "pdf", "doc", "sharepoint", "site", "document", "wiki", "private", "internal"]):
        return "foundry"
        
    # Copilot Agent - real-time, internet, general knowledge (default)
    return "copilot"


In [None]:

# Test deterministic routing with all 3 agents
print("=== Testing Deterministic 3-Agent Routing ===\n")

test_queries = [
    ("What is the current weather in Paris?", "copilot"),
    ("Show me the latest company policy document on remote work", "foundry"), 
    ("What were our Q3 sales numbers by region?", "unified_fabric"),
    ("Find the SharePoint site for project Phoenix", "foundry"),
    ("What laptops do we have in stock under $2000?", "unified_fabric"),
    ("What's happening in the news today?", "copilot")
]

for user_query, expected in test_queries:
    choice = route(user_query)
    print(f"Query: {user_query}")
    print(f"Routed to: {choice} (expected: {expected}) {'‚úÖ' if choice == expected else '‚ùå'}")
    
    # Route to the appropriate agent
    if choice == "copilot":
        result = await copilot_agent.run(user_query)
    elif choice == "foundry": 
        result = await foundry_agent.run(user_query)
    else:  # unified_fabric
        result = await unified_agent.run(user_query)
    
    print(f"Response: {str(result)[:150]}{'...' if len(str(result)) > 150 else ''}\n")

print("üéØ All 3 agents working in deterministic routing!")


Router choice: copilot

=== ANSWER ===
Here are the Q3 revenue KPIs for Fabric:

- Total Quarterly Revenue: The sum of all revenue generated by Fabric in Q3.
- Revenue Growth Rate: Percentage increase or decrease in revenue compared to the previous quarter.
- Sales by Product/Service: Breakdown of revenue from Fabric individually.
- Customer Acquisition Revenue: Revenue generated from new customers during Q3.
- Recurring Revenue: Revenue from ongoing subscriptions or service agreements.
- Average Deal Size: Average revenue per transaction or customer.
- Revenue by Region: Revenue segmented by geographic location, if applicable.

For presenting these KPIs, it is recommended to use visual dashboards, graphs, and charts for clarity. PowerPoint KPI templates are suggested for a professional and editable presentation deck.

To access the underlying SharePoint presentation deck, please refer to your organization‚Äôs SharePoint site or contact your administrator for the direct link, as the sp


## 5) Pattern B ‚Äî Router ‚Üí Verifier (adds reliability)

- A **Router** ChatAgent exposes both agents as **tools** via `.as_tool()` and decides which to call.
- A **Verifier** runs after the Router to fact-check and normalize.


In [None]:
def build_router(copilot: CopilotStudioAgent, foundry: ChatAgent, unified_fabric: ChatAgent) -> ChatAgent:
    ask_copilot = copilot.as_tool(
        name="ask_copilot", 
        description="Use for real-time Q&A, internet searches, general knowledge, current events."
    )
    ask_foundry = foundry.as_tool(
        name="ask_foundry",
        description="Use for SharePoint/Graph/document queries (policies, decks, PDFs, private data)."
    )
    ask_unified_fabric = unified_fabric.as_tool(
        name="ask_unified_fabric",
        description="Use for structured data queries (sales analytics, product catalogs, glucose data, airport info)."
    )
    return ChatAgent(
        chat_client=AzureOpenAIChatClient(endpoint=os.getenv("AZURE_OPENAI_API_ENDPOINT"),
                                          api_key=os.getenv("AZURE_OPENAI_KEY"),
                                          deployment_name=os.getenv("AZURE_AOAI_CHAT_MODEL_NAME_DEPLOYMENT_ID")),
        name="Router",
        description="Routes tasks to the appropriate specialized agent.",
        instructions=(
            "You are the Router. Pick exactly ONE tool based on the request:\n"
            "‚Ä¢ ask_copilot ‚Üí real-time Q&A, internet access, general knowledge, current events\n"
            "‚Ä¢ ask_foundry ‚Üí SharePoint documents, policies, PDFs, private enterprise data\n" 
            "‚Ä¢ ask_unified_fabric ‚Üí structured data (sales KPIs, product catalogs, glucose metrics, airport data)\n"
            "If unclear, ask ONE clarifying question, then choose the most appropriate tool."
        ),
        tools=[ask_copilot, ask_foundry, ask_unified_fabric]
    )

def build_verifier() -> ChatAgent:
    return ChatAgent(
        chat_client=AzureOpenAIChatClient(endpoint=os.getenv("AZURE_OPENAI_API_ENDPOINT"),
                                          api_key=os.getenv("AZURE_OPENAI_KEY"),
                                          deployment_name=os.getenv("AZURE_AOAI_CHAT_MODEL_NAME_DEPLOYMENT_ID")),
        name="Verifier",
        description="Final verification and output agent.",
        instructions=(
            "You are the final step in a workflow. Review the previous response and return "
            "the verified, clean final answer. Keep all important data and metrics intact. "
            "Format the response clearly and return it as the final output."
        ),
    )

router_agent = build_router(copilot_agent, foundry_agent, unified_agent)
verifier_agent = build_verifier()


In [None]:
# Production-Ready 3-Agent System with Router + Reflection
from agent_framework import (
    AgentExecutorRequest, 
    AgentExecutorResponse,
    WorkflowContext,
    Executor,
    Role,
    handler,
    ChatMessage,
)

class RouterExecutor(Executor):
    """Production router that selects appropriate agent and returns result"""
    
    def __init__(self, copilot: CopilotStudioAgent, foundry: ChatAgent, unified: ChatAgent):
        super().__init__(id="router")
        self.copilot = copilot
        self.foundry = foundry  
        self.unified = unified
    
    @handler
    async def handle_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        """Route request to appropriate agent based on content"""
        query = request.messages[-1].text.lower()
        
        # Determine routing based on query content
        if any(w in query for w in ["sales", "revenue", "product", "mard", "glucose", "airport", "analytics"]):
            print("üéØ Router: Selecting Unified Fabric Agent")
            result = await self.unified.run(request.messages)
        elif any(w in query for w in ["sharepoint", "document", "policy", "pdf", "private"]):
            print("üéØ Router: Selecting Foundry Agent")
            result = await self.foundry.run(request.messages) 
        else:
            print("üéØ Router: Selecting Copilot Agent")
            result = await self.copilot.run(request.messages)
        
        # Send result to next executor
        response = AgentExecutorResponse(
            executor_id=self.id,
            agent_run_response=result,
            full_conversation=request.messages + result.messages
        )
        await ctx.send_message(response)

class VerifierExecutor(Executor):
    """Verifies and outputs final result"""
    
    def __init__(self):
        super().__init__(id="verifier")
    
    @handler 
    async def verify_and_output(self, response: AgentExecutorResponse, ctx: WorkflowContext[str]) -> None:
        """Verify response quality and yield final output"""
        result_text = response.agent_run_response.text
        
        # Simple verification - check if we have substantial content
        if len(result_text) > 20 and any(word in result_text.lower() for word in ['product', 'data', 'weather', 'information']):
            print("‚úÖ Verifier: Response approved")
            await ctx.yield_output(result_text)
        else:
            print("‚ö†Ô∏è Verifier: Response needs improvement")
            await ctx.yield_output(f"Response received but quality check needed: {result_text}")

# Create production workflow
def build_production_workflow():
    """Build production Router ‚Üí Verifier workflow"""
    router = RouterExecutor(copilot_agent, foundry_agent, unified_agent)
    verifier = VerifierExecutor()
    
    workflow = (
        WorkflowBuilder()
        .set_start_executor(router)
        .add_edge(router, verifier)
        .build()
    )
    return workflow

print("‚úÖ Production workflow classes defined")


# Test Production Workflow
async def test_production_workflow():
    """Test the production workflow with Router + Verifier"""
    production_workflow = build_production_workflow()
    
    test_cases = [
        "What's the product A MARD average comparison with Product B?",
        "Show me our Q3 sales performance by product category", 
        "What's the current temperature in New York?",
    ]
    
    print("=== Production Workflow Testing ===\n")
    
    for query in test_cases:
        print(f"üîç Testing: {query}")
        
        try:
            # Create request message
            request = AgentExecutorRequest(
                messages=[ChatMessage(role=Role.USER, text=query)],
                should_respond=True
            )
            
            # Run workflow and collect output
            async for event in production_workflow.run_stream(request):
                if isinstance(event, WorkflowOutputEvent):
                    result = event.data
                    print(f"‚úÖ Final Result: {result[:150]}{'...' if len(result) > 150 else ''}")
                    break
            
        except Exception as e:
            print(f"‚ùå Error: {e}")
        
        print()
    
    print("üéâ Production workflow testing complete!")


# Final Production Test - Simple and Clean
print("=== Final Production Agent System ===\n")

async def production_agent_system(query: str) -> str:
    """Production-ready 3-agent system with intelligent routing"""
    print(f"üîç Query: {query}")
    
    # Route based on query content
    q_lower = query.lower()
    if any(w in q_lower for w in ["sales", "revenue", "product", "mard", "glucose", "airport", "analytics"]):
        print("üéØ ‚Üí Unified Fabric Agent (Structured Data)")
        result = await unified_agent.run(query)
    elif any(w in q_lower for w in ["sharepoint", "document", "policy", "pdf", "private"]):
        print("üéØ ‚Üí Foundry Agent (Documents)")
        result = await foundry_agent.run(query)
    else:
        print("üéØ ‚Üí Copilot Agent (Real-time/Internet)")
        result = await copilot_agent.run(query)
    
    # Extract final text
    final_text = result.text if hasattr(result, 'text') else str(result)
    return final_text

# Test the production system
test_queries = [
    "What's the product A MARD average comparison with Product B?",
    "Show me our top revenue products this quarter",
    "What's the weather today in Paris?",
]

for query in test_queries:
    try:
        answer = await production_agent_system(query)
        print(f"‚úÖ Answer: {answer[:100]}{'...' if len(answer) > 100 else ''}")
    except Exception as e:
        print(f"‚ùå Error: {e}")
    print()

print("üéâ Production system ready for deployment!")

‚úÖ Production workflow classes defined
=== Final Production Agent System ===

üîç Query: What's the product A MARD average comparison with Product B?
üéØ ‚Üí Unified Fabric Agent (Structured Data)
ü§ñ Routing to product_discovery agent...
ü§ñ Routing to product_discovery agent...
‚úÖ Answer: The average MARD (Mean Absolute Relative Difference) percentage for Product A is 12.6%, while for Pr...

üîç Query: Show me our top revenue products this quarter
üéØ ‚Üí Unified Fabric Agent (Structured Data)
‚úÖ Answer: The average MARD (Mean Absolute Relative Difference) percentage for Product A is 12.6%, while for Pr...

üîç Query: Show me our top revenue products this quarter
üéØ ‚Üí Unified Fabric Agent (Structured Data)
ü§ñ Routing to sales_data agent...
ü§ñ Routing to sales_data agent...
‚úÖ Answer: Our top revenue products this quarter are:

1. **Classic Vanilla Pint** - Generated the highest reve...

üîç Query: What's the weather today in Paris?
üéØ ‚Üí Copilot Agent (Real-tim

In [66]:
await production_agent_system("What's the weather today in Paris?")

üîç Query: What's the weather today in Paris?
üéØ ‚Üí Copilot Agent (Real-time/Internet)


'Today in Paris, you can expect sunny weather with mild temperatures. The high will be around 17¬∞C and the low about 9¬∞C. Winds are gentle, coming from the south-southwest and south-southeast at 10‚Äì16 km/h. Humidity is around 66%, and there is no expected rain. Visibility is good at 10 km, and the UV index is low, peaking at 1.5 midday. Sunrise is at 07:43 and sunset at 17:25, giving about 9 hours and 42 minutes of daylight[1].\n\n[1]: https://www.easeweather.com/europe/france/ile-de-france/paris/today "Today\'s Weather in Paris - Hourly Forecast and Conditions"'


## 6) Pattern C ‚Äî Magentic Orchestrator (manager loop + streaming)

The **Magentic** manager coordinates agents over several rounds, can ask clarifiers, and streams events.
Here we wrap the **Router** inside a Magentic workflow.


In [None]:

async def run_with_magentic(router: ChatAgent):
    workflow = (
        MagenticBuilder()
        .participants(router=router)
        .with_standard_manager(
            max_round_count=6,
            max_stall_count=2,
            max_reset_count=1,
        )
        .build()
    )

    task = "Pull last quarter revenue KPIs from Fabric, then link me to the source file in SharePoint."
    print(f"Task: {task}\n")

    last_stream = None
    open_line = False
    async for event in workflow.run_stream(task):
        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"[ORCH] {getattr(event.message, 'text', '')}\n" + "-"*40)
        elif isinstance(event, MagenticAgentDeltaEvent):
            if last_stream != event.agent_id or not open_line:
                if open_line:
                    print()
                print(f"[STREAM:{event.agent_id}] ", end="", flush=True)
                last_stream = event.agent_id
                open_line = True
            if event.text:
                print(event.text, end="", flush=True)
        elif isinstance(event, MagenticAgentMessageEvent):
            if open_line:
                print(" (final)\n")
                open_line = False
            msg = event.message
            if msg and (msg.text or "").strip():
                print(f"[{event.agent_id}] {msg.role.value}\n{msg.text}\n" + "-"*40)
        elif isinstance(event, MagenticFinalResultEvent):
            print("\n===== FINAL RESULT =====")
            if event.message:
                print(event.message.text)
        elif isinstance(event, WorkflowOutputEvent):
            pass
    if open_line:
        print()

# Run Magentic
await run_with_magentic(router_agent)



## 7) Hybrid Strategy (recommended)

1. **Deterministic** route first (fast, auditable).
2. **Verifier** always.
3. If low confidence or ambiguous intent ‚Üí re-run via **Magentic**.


In [None]:

def simple_confidence_probe(text: str) -> float:
    """Dummy probe. Replace with your scoring/heuristic (e.g., answer length, presence of sources)."""
    return 0.7 if text and len(text) > 40 else 0.4

async def hybrid_answer(query: str):
    # Step 1: deterministic routing with 3 agents
    choice = route(query)
    print(f"üéØ Routing to: {choice}")
    
    if choice == "copilot":
        base = await copilot_agent.run(query)
    elif choice == "foundry":
        base = await foundry_agent.run(query) 
    else:  # unified_fabric
        base = await unified_agent.run(query)
        
    base_text = getattr(base, "text", str(base))

    # Step 2: verifier (Router ‚Üí Verifier workflow with 3-agent routing)
    verified = await router_verifier_workflow.run(query)
    verified_text = getattr(verified, "text", str(verified))

    # Step 3: fallback to Magentic if low confidence
    conf = simple_confidence_probe(verified_text)
    if conf < 0.6:
        print("Low confidence ‚Üí escalating to Magentic...")
        await run_with_magentic(router_agent)
        return

    print("‚úÖ Hybrid done (deterministic + verifier with 3 agents).")
    print("\n=== ANSWER ===\n", verified_text)

await hybrid_answer("Summarize Q3 KPIs from Fabric and attach the latest roadmap deck from SharePoint.")


In [None]:
# ============ COMPREHENSIVE 3-AGENT SYSTEM TEST ============
print("=== Testing Complete 3-Agent Orchestration System ===\n")

# Test the 3-agent router workflow
print("üîÑ Testing Router ‚Üí Verifier with 3 Agents:")

test_cases = [
    {
        "query": "What are the current COVID-19 statistics worldwide?",
        "expected_agent": "copilot",
        "description": "Real-time internet data"
    },
    {
        "query": "Find the employee handbook PDF in SharePoint",
        "expected_agent": "foundry", 
        "description": "Private document retrieval"
    },
    {
        "query": "Show me our top 5 products by sales revenue this quarter",
        "expected_agent": "unified_fabric",
        "description": "Structured sales analytics"
    },
    {
        "query": "What flights are available from LAX to JFK with meal service?",
        "expected_agent": "unified_fabric",
        "description": "Airport and flight data"
    }
]

print(f"Testing {len(test_cases)} scenarios...\n")

for i, case in enumerate(test_cases, 1):
    print(f"üìã Test {i}: {case['description']}")
    print(f"   Query: {case['query']}")
    print(f"   Expected: {case['expected_agent']}")
    
    try:
        # Test router workflow
        result = await router_verifier_workflow.run(case['query'])
        result_text = getattr(result, "text", str(result))
        
        if result_text and len(result_text) > 20:
            print(f"   ‚úÖ Success: {result_text[:100]}{'...' if len(result_text) > 100 else ''}")
        else:
            print(f"   ‚ö†Ô∏è Short response: {result_text}")
            
    except Exception as e:
        print(f"   ‚ùå Error: {e}")
    
    print()

print("üéâ 3-Agent System Architecture Complete!")
print("   ü§ñ Copilot Agent: Real-time Q&A + Internet access")
print("   üìÅ Foundry Agent: SharePoint + Private documents") 
print("   üìä Unified Fabric Agent: Structured data (Sales + Products + Glucose + Airport)")
print("   üîÑ Router: Intelligent routing between all 3 agents")
print("   ‚úÖ Verifier: Quality assurance and fact-checking")
print("   üéØ Magentic: Advanced orchestration with streaming")

## üéØ Complete 3-Agent Architecture Summary

Your agentic RAG system now includes **three specialized agents** with intelligent routing:

### **Agent Specializations:**

1. **ü§ñ Copilot Agent** 
   - **Purpose**: Real-time Q&A, internet access, general knowledge
   - **Best for**: Current events, weather, news, general questions
   - **Data sources**: Live internet data, general knowledge base

2. **üìÅ Foundry Agent**
   - **Purpose**: SharePoint and private enterprise data
   - **Best for**: Company policies, documents, PDFs, internal wikis
   - **Data sources**: SharePoint sites, private document repositories

3. **üìä Unified Fabric Agent**
   - **Purpose**: Structured enterprise data analytics
   - **Best for**: Sales KPIs, product catalogs, glucose data, airport info
   - **Data sources**: 3 Fabric Data Agent endpoints
     - Sales analytics and revenue metrics
     - Product discovery and catalog data  
     - Airport facilities and operational info

### **Orchestration Patterns Available:**

- **Deterministic Router**: Fast keyword-based routing
- **Router ‚Üí Verifier**: Adds reliability and fact-checking
- **Magentic Orchestrator**: Advanced multi-round coordination with streaming
- **Hybrid Strategy**: Combines all approaches with confidence scoring

### **Key Benefits:**

‚úÖ **Single authentication** across all Fabric endpoints  
‚úÖ **Intelligent routing** based on question content  
‚úÖ **Domain expertise** for different data types  
‚úÖ **Scalable architecture** for adding more agents  
‚úÖ **Production-ready** with proper error handling and cleanup


## 8) Observability (hooks)

The Agent Framework integrates with OpenTelemetry; below shows simple logging hooks
you can adapt to emit traces/metrics to Azure Monitor/App Insights.


In [None]:

from agent_framework import FunctionInvocationContext, ChatContext

async def log_tool_calls(ctx: FunctionInvocationContext, nxt: Callable[[FunctionInvocationContext], Awaitable[None]]):
    print(f"[Tool] {ctx.function.name} ‚Üí args={ctx.arguments}")
    await nxt(ctx)
    print(f"[Tool] {ctx.function.name} ‚úì")

async def log_llm(ctx: ChatContext, nxt: Callable[[ChatContext], Awaitable[None]]):
    print(f"[LLM] messages={len(ctx.messages)}")
    await nxt(ctx)
    print("[LLM] response ‚úì")

# Example: attach to router
router_agent.function_middlewares = [log_tool_calls]
router_agent.chat_middlewares = [log_llm]
print("‚úÖ Middleware attached to router (logs to stdout).")



## 9) Next steps

- Replace dummy routing with a small classifier (NL ‚Üí label) if needed.
- Add **SharePoint** and **Fabric** real tools/connections behind Copilot/Foundry agents.
- Persist **threads** to your DB (Cosmos DB) and index passages in **Azure AI Search**.
- Enable **OpenTelemetry** exporters to App Insights for traces and metrics.
