In [None]:
# First, we add all the packages we need to the system.
# This is what enables us to run all the code below.

import os
from dotenv import load_dotenv
from contextlib import AsyncExitStack
from typing import Optional, Set

import semantic_kernel as sk
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentThread
from semantic_kernel.contents import ChatMessageContent, FunctionCallContent, FunctionResultContent
from semantic_kernel.connectors.mcp import MCPStdioPlugin
from azure.identity import DefaultAzureCredential

In [None]:
# Read the settings this notebook needs from the environment (load_dotenv() also picks
# up a local .env file), sign in with the Azure CLI, and create the credential that the
# agent client uses in a later cell.

load_dotenv()
project_endpoint = os.getenv("PROJECT_ENDPOINT")
model_deployment = os.getenv("MODEL_DEPLOYMENT_NAME")
nonProdTenantId = os.getenv("TENANT_ID")
subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID_FOR_AGENTS")


def _run_az(command: str) -> bool:
    """Run an Azure CLI command through the shell.

    Args:
        command: The full command line to execute.

    Returns:
        True when the command exited with status 0, False otherwise. A warning is
        printed on failure instead of raising, so the notebook keeps its original
        best-effort behaviour.
    """
    exit_status = os.system(command)
    if exit_status != 0:
        print(f"WARNING: '{command}' exited with status {exit_status}")
    return exit_status == 0


# Turn off the interactive subscription picker that newer Azure CLI versions show
# after `az login`; the subscription is selected explicitly below instead.
_run_az("az config set core.login_experience_v2=off")

# Only call the CLI when the value is actually set: interpolating an unset variable
# would run `az login --tenant None`.
if nonProdTenantId:
    _run_az(f"az login --tenant {nonProdTenantId}")
else:
    print("WARNING: TENANT_ID is not set; skipping 'az login'.")

print (f"Subscription ID - AZURE_SUBSCRIPTION_ID_FOR_AGENTS : {subscription_id}")

if subscription_id:
    _run_az(f"az account set --subscription {subscription_id}")
else:
    print("WARNING: AZURE_SUBSCRIPTION_ID_FOR_AGENTS is not set; skipping 'az account set'.")

credential = DefaultAzureCredential()

In [None]:
# This block of code connects to the MCP Server and connects it to Semantic Kernel.
async with MCPStdioPlugin(
    name="InventoryManagementMCPPlugin",
    description="Inventory Management MCP Plugin",
    command="python",
    args=["server.py"],
) as inventory_plugin:
    kernel = sk.Kernel()
    kernel.add_plugin(inventory_plugin)

In [None]:
# This creates our AI agent in Semantic Kernel, which will use the Azure AI Foundry
# to process requests and interact with the MCP plugin.

client = AzureAIAgent.create_client(credential=credential, endpoint=project_endpoint) 

# Create agent with the tools
agent_definition = await client.agents.create_agent(
    model=model_deployment,
    instructions="You are a helpful agent that can use MCP tools to assist users."
)

agent = AzureAIAgent(
    name="InventoryManagementAgent",
    client=client,
    definition=agent_definition,
    kernel=kernel,
)

thread: AzureAIAgentThread | None = None
await inventory_plugin.connect()

In [None]:
async def handle_intermediate_messages(message: ChatMessageContent) -> None:
    """Print every item carried by an intermediate agent message.

    The question cells pass this coroutine to ``agent.invoke`` as
    ``on_intermediate_message``. Function results and function calls get a
    labelled line; any other item is printed using its string form.

    Args:
        message: The intermediate message. ``message.items`` may be empty or
            ``None``, in which case nothing is printed.
    """
    for item in message.items or []:
        # Choose the text first, then print once per item.
        if isinstance(item, FunctionResultContent):
            line = f"Function Result:> {item.result} for function: {item.name}"
        elif isinstance(item, FunctionCallContent):
            line = f"Function Call:> {item.name} with arguments: {item.arguments}"
        else:
            line = f"{item}"
        print(line)

In [None]:
# Asking the first question to the agent.
# The agent will use the MCP plugin to get the current product inventory levels.

TASK = "What are the current product inventory levels?"


print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread


In [None]:
# Asking the second question to the agent.
# The agent will use the MCP plugin to get the recommended products for clearance.

TASK = "Which products would you recommend for clearance?"


print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread


In [None]:
# Asking the third question to the agent.
# The agent will use the MCP plugin to get the best selling products this week.

TASK = "What are the best selling products this week?"


print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread


In [None]:
# Asking the fourth question to the agent.
# The agent will use the MCP plugin to get the recommended products to restock.

TASK = "Which products would you recommend to restock?"

print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread


In [None]:
# Asking the fifth question to the agent.
# The agent will use the MCP plugin to get product details.

TASK = "Can you tell me about Grandma's Boysenberry Spread?"

print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread

In [None]:
# Asking the sixth question to the agent.
# The agent will use the MCP plugin answer the users question.

# Users, come up with your own question here!
TASK = "Can..."

print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread

In [None]:
# Asking the last to the agent.
# The agent will use the MCP plugin to generate a marketing campaign for products that are under clearance.

TASK = "Can you generate a marketing campaign for products that are under clearance?"

print(f"😃 User: '{TASK}'")
async for response in agent.invoke(
    messages=TASK, thread=thread, on_intermediate_message=handle_intermediate_messages
):
    print(f"# Agent: {response}")
    thread = response.thread

In [None]:
await thread.delete() if thread else None
await inventory_plugin.close() if inventory_plugin else None