## Wealth Management SQL Agent

### Notebook 3: Build SQL Agent with Strands Agents

In this notebook, we'll create the client and portfolio analyst agents using [Strands Agents](https://strandsagents.com/latest/). Strands Agents is a simple yet powerful SDK that takes a model-driven approach to building and running AI agents.
We'll connect the agent to the previously created MCP server, which will provide the necessary tools for the agent to interact with the wealth management data.

In [None]:
import sys
import os
module_path = "../.."
sys.path.append(os.path.abspath(module_path))
from utils.environment_validation import validate_environment, validate_model_access
validate_environment()

In [None]:
required_models = [
    "us.anthropic.claude-3-5-haiku-20241022-v1:0",
    "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
]
validate_model_access(required_models)

In [None]:
import random
import jwt
import json
import os
import subprocess
from pathlib import Path
from functools import partial
from rich import print as rprint
from rich.markdown import Markdown


from mcp import StdioServerParameters, stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.server import FastMCP
from strands import Agent
from strands.tools.mcp import MCPClient
from strands.hooks import HookProvider, HookRegistry
from strands.experimental.hooks import AfterToolInvocationEvent, BeforeToolInvocationEvent

USER_DATA = json.load(Path("user_info.json").open())

# obtain the secret key from the previously generated file
secret_key_path = Path("secret_key.txt")
if secret_key_path.exists():
    secret_key = secret_key_path.read_text().strip()
else:
    raise FileNotFoundError("Please run the 2_build_mcp_server.ipynb notebook to generate the secret key.")


os.environ["SECRET_KEY"] = secret_key

In [None]:
# launch the MCP server in a background process
mcp_server_process = subprocess.Popen(
    [sys.executable, "mcp_server.py"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
rprint (f"MCP server started in the background on pid {mcp_server_process.pid}")

To use [MCP with Strands Agents](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/tools/mcp-tools/), we have to create a callable such as a function or a class that will produce the HTTP client for the MCP server. The below implementation uses a class that creates a client for a given user and their scopes.


In [None]:
class MCPClientBuilder:
    """
    A callable class that returns an MCP client with the specified user and scopes.
    """
    def __init__(self, user: str | None = None, scopes: list[str] | None = None, mcp_server_uri: str = "http://localhost:8000/mcp"):
        self.user = user
        self.scopes = scopes
        self.mcp_server_uri = mcp_server_uri
    
    def get_user_jwt(self):
        """
        Generate a JWT for the given user with the specified scopes.
        """
        if self.user is None or self.scopes is None:
            return None
        
        client_id = USER_DATA.get(self.user)
        if not client_id:
            raise ValueError(f"User {self.user} not found in user_info.json")
        
        return jwt.encode({"client_id": client_id, "scopes": self.scopes}, os.environ["SECRET_KEY"], algorithm="HS256")

    def __call__(self):
        """
        Create an HTTP client for the MCP server with optional user authentication.
        """
        jwt_token = self.get_user_jwt()
        headers = {"Authorization": f"Bearer {jwt_token}"} if jwt_token else None
        
        return streamablehttp_client(self.mcp_server_uri, headers=headers)

In [None]:
# pick a random user from the user_info.json file
user = random.choice(list(USER_DATA.keys()))
rprint(f"Using user: {user}")

scopes = ["client", "authenticated"]

# create the client for the specified user and scopes
wealth_client_mcp = MCPClient(MCPClientBuilder(user=user, scopes=scopes))

In [None]:
# we can now use the MCP client to interact with the MCP server via the Agent
with wealth_client_mcp:
    # get list of tools
    tools = wealth_client_mcp.list_tools_sync()

    # configure the agent with the tools
    wealth_customer_agent = Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        system_prompt="You are a support agent for a wealth management company. You can answer questions about portfolio holdings, investment strategies, and financial advice. Use the tools provided to assist with your tasks.",
        tools=tools)

    response = str(wealth_customer_agent("Can you provide a listing of my current accounts?"))

In [None]:
with wealth_client_mcp:
    # we can ask a follow-up question
    # note we do not create a new agent here as that will reset the context
    response = str(wealth_customer_agent("Can you provide an analysis of my current portfolio holdings?"))

### Portfolio Analysts Agent
Let's try with a user that has a portfolio_analyst scope

In [None]:
# let's try with a user that has a portfolio_analyst scope
user = random.choice(list(USER_DATA.keys()))
scopes = ["portfolio_analyst", "authenticated"]

portfolio_analyst_mcp = MCPClient(MCPClientBuilder(user=user, scopes=scopes))

Strand Agents support custom hooks, which allow us to extend the functionality of the agent. We'll use this feature to print the generated SQL queries and the results of the SQL queries to the console. This will help us understand how the agent is interacting with the data and what queries it is generating.

In [None]:
class ResultProcessor(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolInvocationEvent, self.print_sql_result)
        registry.add_callback(BeforeToolInvocationEvent, self.print_sql_query)

    def print_sql_query(self, event: BeforeToolInvocationEvent) -> None:
        if event.tool_use.get("name") == "run_custom_query":
            # Add formatting to SQL query
            sql_query = event.tool_use.get("input", {}).get("query", "")
            rprint(Markdown(f"### SQL Query\n```sql\n{sql_query}\n```"))

    def print_sql_result(self, event: AfterToolInvocationEvent) -> None:
        if event.tool_use.get("name") == "run_custom_query":
            # Add formatting to query results
            query_result = event.result["content"][0]["text"]
            rprint(Markdown(f"### SQL Query Result\n{query_result}"))


In [None]:
with portfolio_analyst_mcp:
    tools = portfolio_analyst_mcp.list_tools_sync()

    portfolio_analyst_agent = Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        system_prompt="""You are a wealth management support agent. Your job is to assists a portfolio analyst with questions regarding customer portfolios.
        You have access to customer data, portfolio information, and transactions that you can access via a SQL database.
        You can use the provided tool to extract information about the data catalog which you can then use to generate SQL queries to help answer questions and analyze data.
        The SQL query should be ANSI SQL compliant and should not contain any non-ANSI SQL syntax.
        """,
        hooks=[ResultProcessor()],
        tools=tools)

    response = str(portfolio_analyst_agent("How many customers have holdings in JPMorgan Chase?"))

In [None]:
with portfolio_analyst_mcp:
    response = str(portfolio_analyst_agent("Can you provide a list of these customers?"))

In [None]:
with portfolio_analyst_mcp:
    response = str(portfolio_analyst_agent("Who are the top 5 customers by total holdings?"))

In [None]:
with portfolio_analyst_mcp:
    response = str(portfolio_analyst_agent("How about the customers with the largest holdings in technology stocks?"))

In [None]:
# try a more complex question
with portfolio_analyst_mcp:
    response = str(portfolio_analyst_agent("Help find customers whose risk profile does not align with their portfolio holdings."))

When finished experimenting, run the cell below to stop the MCP server.

In [None]:
mcp_server_process.terminate()