Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions examples/mcp-server/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,65 @@
# Sample: MCP Server

### Available Tools
Exposes human-in-the-loop primitives as MCP tools using the official [`mcp`](https://github.com/modelcontextprotocol/python-sdk) SDK. An AI agent can notify users, ask questions, and request approvals — all delivered through Teams using the bot's proactive messaging and Adaptive Card capabilities.

| Tool | Description | Parameters | Example Usage |
|------|-------------|------------|---------------|
| `echo` | Echo back input text | `input: str` | Echo functionality from docs |
| `get_weather` | Get weather for a location | `location: str` | Always returns "sunny" |
| `calculate` | Basic arithmetic operations | `operation: str, a: float, b: float` | add, subtract, multiply, divide |
| `alert` | Send proactive message to Teams user | `user_id: str, message: str` | Human-in-the-loop notifications |
## Setup

```bash
uv sync
cp .env.example .env # fill in CLIENT_ID, CLIENT_SECRET, TENANT_ID
```

## Run

```bash
uv run python src/main.py
```

The bot listens for Teams activity on `POST /api/messages` (port 3978 by default) and serves the MCP endpoint at `http://localhost:3978/mcp`.

## How it works

The sample is split across four modules:

| File | Responsibility |
|------|---------------|
| `app.py` | `App` instance, Teams activity handlers (`on_message`, `on_card_action_execute`) |
| `mcp_tools.py` | `FastMCP` instance, MCP tool definitions (`@mcp.tool()`) |
| `state.py` | Shared in-memory state (conversation map, pending asks, approvals) |
| `main.py` | Entry point — wires the MCP server onto the Teams FastAPI server and starts everything |

Tools are registered with `@mcp.tool()` on the `FastMCP` instance in `mcp_tools.py`. The MCP server is mounted onto the same FastAPI server that handles Teams activity — `app.initialize()` must be called first so `/api/messages` is registered before the catch-all MCP mount at `/`.

The bot handler (`on_message`) captures user replies to pending asks. Approval decisions are captured via `on_card_action_execute` when the user clicks Approve or Reject on the card. Both are surfaced to the MCP client via the polling tools.

All tools return JSON.

## Available Tools

| Tool | Description | Parameters |
|------|-------------|------------|
| `notify` | Send a one-way notification to a Teams user | `user_id, message` |
| `ask` | Ask a Teams user a question; returns a `request_id` | `user_id, question` |
| `get_reply` | Poll for the user's reply to an `ask`; returns `pending` until answered | `request_id` |
| `request_approval` | Send an Approve/Reject card to a Teams user; returns an `approval_id` | `user_id, title, description` |
| `get_approval` | Poll for the approval decision: `pending`, `approved`, or `rejected` | `approval_id` |

## Example agent workflow

1. `request_approval` — agent sends "Can you approve deployment to prod?" to an on-call engineer
2. Engineer clicks **Approve** on the card in Teams
3. `get_approval` — agent reads `"approved"` and proceeds with the deployment

## Limitations

All state (`personal_conversations`, `pending_asks`, `approvals`) is held in memory. A server restart clears everything — pending asks and approvals in flight will be lost. For production use, replace the in-memory dicts with a persistent store (e.g. Redis or a database).

## Testing with MCP Inspector

```bash
npx @modelcontextprotocol/inspector
```

1. Open the URL printed in the terminal — it includes a `MCP_PROXY_AUTH_TOKEN` query param that must be present.
2. Set transport to **Streamable HTTP** and URL to `http://localhost:3978/mcp`, then connect.
3. Call `ask` or `request_approval` with a `user_id`, then respond in Teams and poll for the result.
7 changes: 3 additions & 4 deletions examples/mcp-server/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
[project]
name = "mcp-server"
version = "0.1.0"
description = "a test to test out mcp server capabilities"
description = "Human-in-the-loop MCP tools backed by Teams bot proactive messaging"
readme = "README.md"
requires-python = ">=3.12,<3.15"
dependencies = [
"dotenv>=0.9.9",
"microsoft-teams-apps",
"microsoft-teams-mcpplugin",
"microsoft-teams-devtools"
"mcp>=1.13.1",
"microsoft-teams-apps"
]

[tool.uv.sources]
Expand Down
54 changes: 54 additions & 0 deletions examples/mcp-server/src/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import logging

from microsoft_teams.api import (
AdaptiveCardActionMessageResponse,
AdaptiveCardInvokeActivity,
AdaptiveCardInvokeResponse,
MessageActivity,
)
from microsoft_teams.apps import ActivityContext, App
from state import approvals, pending_asks, personal_conversations, user_pending_ask

app = App()
logger = logging.getLogger(__name__)


@app.on_message
async def handle_message(ctx: ActivityContext[MessageActivity]):
"""Capture user replies and cache 1:1 conversation IDs."""
user_id = ctx.activity.from_.id
conversation_id = ctx.activity.conversation.id

if ctx.activity.conversation.conversation_type == "personal":
personal_conversations[user_id] = conversation_id

request_id = user_pending_ask.pop(user_id, None)
if request_id and request_id in pending_asks:
pending_asks[request_id].reply = ctx.activity.text or ""
pending_asks[request_id].status = "answered"
await ctx.reply("Got it, thank you!")
else:
logger.info(
f"Received message from user {user_id} in conversation {conversation_id}, but no pending ask found."
)
await ctx.reply("Hi! I'll let you know if I need anything.")


@app.on_card_action_execute("approval_response")
async def handle_approval_response(ctx: ActivityContext[AdaptiveCardInvokeActivity]) -> AdaptiveCardInvokeResponse:
"""Capture approve/reject decisions from approval cards."""
data = ctx.activity.value.action.data
approval_id = data.get("approval_id")
decision = data.get("decision")
if approval_id and approval_id in approvals and decision in ("approved", "rejected"):
approvals[approval_id] = decision
return AdaptiveCardActionMessageResponse(
status_code=200,
type="application/vnd.microsoft.activity.message",
value="Response recorded",
)
162 changes: 24 additions & 138 deletions examples/mcp-server/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,151 +4,37 @@
"""

import asyncio
from typing import Dict
import logging
import os

from microsoft_teams.ai import Function
from microsoft_teams.api.activities.message.message import MessageActivity
from microsoft_teams.apps import App
from microsoft_teams.apps.routing.activity_context import ActivityContext
from microsoft_teams.devtools import DevToolsPlugin
from microsoft_teams.mcpplugin import McpServerPlugin
from pydantic import BaseModel
from app import app
from mcp_tools import mcp
from microsoft_teams.apps.http.fastapi_adapter import FastAPIAdapter

# Configure MCP server with custom name (as shown in docs)
mcp_server_plugin = McpServerPlugin(
name="test-mcp",
)
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
logger = logging.getLogger(__name__)

# Storage for conversation IDs (for proactive messaging)
conversation_storage: Dict[str, str] = {}

async def main() -> None:
# app.initialize() must be called before mounting the MCP app so that
# /api/messages is registered first — FastAPI routes take priority over
# mounted sub-applications, and the MCP mount uses a catch-all path (/).
await app.initialize()

# Echo tool from documentation example
class EchoParams(BaseModel):
input: str
adapter = app.server.adapter
if not isinstance(adapter, FastAPIAdapter):
raise RuntimeError(
f"This example requires FastAPIAdapter, got {type(adapter).__name__}. "
"Do not pass a custom adapter to App()."
)

mcp_http_app = mcp.streamable_http_app()
# Register the MCP lifespan so its startup/shutdown hooks run with the server.
adapter.lifespans.append(mcp_http_app.router.lifespan_context)
adapter.app.mount("/", mcp_http_app)

async def echo_handler(params: EchoParams) -> str:
return f"You said {params.input}"


# Weather tool (existing)
class GetWeatherParams(BaseModel):
location: str


async def get_weather_handler(params: GetWeatherParams):
return f"The weather in {params.location} is sunny"


class CalculateParams(BaseModel):
operation: str
a: float
b: float


async def calculate_handler(params: CalculateParams) -> str:
match params.operation:
case "add":
return str(params.a + params.b)
case "subtract":
return str(params.a - params.b)
case "multiply":
return str(params.a * params.b)
case "divide":
return str(params.a / params.b) if params.b != 0 else "Cannot divide by zero"
case _:
return "Unknown operation"


# Alert tool for proactive messaging (as mentioned in docs)
class AlertParams(BaseModel):
user_id: str
message: str


async def alert_handler(params: AlertParams) -> str:
"""
Send proactive message to user via Teams.
This demonstrates the "piping messages to user" feature from docs.
"""
# 1. Validate if the incoming request is allowed to send messages
if not params.user_id or not params.message:
return "Invalid parameters: user_id and message are required"

# 2. Fetch the correct conversation ID for the given user
conversation_id = conversation_storage.get(params.user_id)
if not conversation_id:
return f"No conversation found for user {params.user_id}. User needs to message the bot first."

# 3. Send proactive message (simplified - in real implementation would use proper proactive messaging)
await app.send(conversation_id=conversation_id, activity=params.message)
return f"Alert sent to user {params.user_id}: {params.message} (conversation: {conversation_id})"


# Register echo tool (from documentation)
mcp_server_plugin.use_tool(
Function(
name="echo",
description="echo back whatever you said",
parameter_schema=EchoParams,
handler=echo_handler,
)
)

# Register weather tool
mcp_server_plugin.use_tool(
Function(
name="get_weather",
description="Get a location's weather",
parameter_schema=GetWeatherParams,
handler=get_weather_handler,
)
)

# Register calculator tool
mcp_server_plugin.use_tool(
Function(
name="calculate",
description="Perform basic arithmetic operations",
parameter_schema=CalculateParams,
handler=calculate_handler,
)
)

# Register alert tool for proactive messaging
mcp_server_plugin.use_tool(
Function(
name="alert",
description="Send proactive message to a Teams user",
parameter_schema=AlertParams,
handler=alert_handler,
)
)

app = App(plugins=[mcp_server_plugin, DevToolsPlugin()])


@app.on_message
async def handle_message(ctx: ActivityContext[MessageActivity]):
"""
Handle incoming messages and store conversation IDs for proactive messaging.
This demonstrates the conversation ID storage mentioned in the docs.
"""
# Store conversation ID for this user (for proactive messaging)
user_id = ctx.activity.from_.id
conversation_id = ctx.activity.conversation.id
conversation_storage[user_id] = conversation_id

print(f"User {ctx.activity.from_} just sent a message!")

# Echo back the message with info about stored conversation
await ctx.reply(
f"You said: {ctx.activity.text}\n\n"
f"📝 Stored conversation ID `{conversation_id}` for user `{user_id}` "
f"(for proactive messaging via MCP alert tool)"
)
await app.start()


if __name__ == "__main__":
asyncio.run(app.start())
asyncio.run(main())
Loading
Loading