In [0]:
from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksMCPClient
import nest_asyncio

# Databricks CLI profile name handed to WorkspaceClient(profile=...).
databricks_cli_profile = "adb-0000000000000000"

# "<catalog>/<schema>" segment appended to the MCP functions endpoint path.
catalogSchemaPair = "mcp/default"
server_url = (
    f"https://adb-0000000000000000.0.azuredatabricks.net/api/2.0/mcp/functions/{catalogSchemaPair}"
)

# Patch asyncio so that event loops can be nested.
nest_asyncio.apply()

In [0]:
# Workspace client built from the configured CLI profile, then the MCP client
# pointed at the catalog/schema functions URL.
workspaceClient = WorkspaceClient(profile=databricks_cli_profile)
mcp = DatabricksMCPClient(
    workspace_client=workspaceClient,
    server_url=server_url,
)


In [0]:
# for ep in workspaceClient.serving_endpoints.list():
#     print(ep.name, ep.state)
#     print()

In [0]:
# mcp.list_tools()

### Setting Up an Agent

In [0]:
import json

async def call_llm(
    workspaceClient: WorkspaceClient,
    messages: list[dict],
    tool_schemas: list[dict] | None,
    endpoint_name: str = "databricks-meta-llama-3-3-70b-instruct",
    *,
    tool_choice: str | dict = "auto",
    temperature: float = 2,
    max_tokens: int = 512,
    stream: bool = False,
):
    """Send one chat-completion request to a serving endpoint and summarize the reply.

    Args:
        workspaceClient: Client whose ``serving_endpoints.get_open_ai_client()``
            yields the OpenAI-compatible client used for the request.
        messages: Chat messages in OpenAI format.
        tool_schemas: OpenAI-style tool definitions. ``None`` or an empty list
            means the request is sent without ``tools`` and ``tool_choice``.
        endpoint_name: Serving endpoint name, passed as the ``model`` field.
        tool_choice: "auto" | "none" | {"type":"function","function":{"name":"..."}}.
            Only sent when ``tool_schemas`` is non-empty.
        temperature: Sampling temperature.
            NOTE(review): the default of 2 looks high for tool calling - confirm.
        max_tokens: Completion token limit.
        stream: Must be False; streamed responses are not handled here.

    Returns:
        ``{"tool_name": str, "arguments": ...}`` when the model requested a tool
        (only the first tool call is reported; ``arguments`` is the parsed JSON
        value, or the raw string if it is not valid JSON), otherwise
        ``{"content": <message text>}``.

    Raises:
        NotImplementedError: If ``stream`` is true.
    """
    # The code below reads resp.choices[0].message, which a streamed response
    # does not provide; fail with a clear message before sending any request.
    if stream:
        raise NotImplementedError(
            "call_llm does not support stream=True; the reply is read from a "
            "complete (non-streamed) response."
        )

    # OpenAI-compatible client wrapper for calling Databricks serving endpoints.
    openai_client = workspaceClient.serving_endpoints.get_open_ai_client()

    request = {
        "model": endpoint_name,  # this is the serving endpoint name
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "stream": stream,
        "n": 1,
    }
    # Really omit the tool fields when there are no tools, instead of sending
    # tools=None next to a tool_choice (presumably rejected by OpenAI-style
    # endpoints - TODO confirm for Databricks serving).
    if tool_schemas:
        request["tools"] = tool_schemas
        request["tool_choice"] = tool_choice

    # Note: the call is not awaited, so it runs inline inside this coroutine.
    resp = openai_client.chat.completions.create(**request)

    message = resp.choices[0].message

    tool_calls = getattr(message, "tool_calls", None)
    if tool_calls:
        first_call = tool_calls[0]
        arguments = first_call.function.arguments
        if isinstance(arguments, str):
            try:
                arguments = json.loads(arguments)
            except ValueError:
                # Best effort: keep the raw string when it is not valid JSON
                # (json.JSONDecodeError is a ValueError subclass).
                pass
        return {"tool_name": first_call.function.name, "arguments": arguments}

    # Otherwise return the model text.
    return {"content": message.content}


In [0]:
# Discover tools (your UC functions + built-ins like system__ai__python_exec if in scope)
# tools = {t.name: t for t in mcp.list_tools()}

# tools = [
#     {
#         "type": "function",
#         "function": {
#             "name": t.name,
#             "description": t.description,
#             "parameters": t.inputSchema, #parameters_json_schema,  # or equivalent schema property
#         }
#     }
#     for t in mcp.list_tools()
# ]

In [0]:
import json
from collections.abc import Mapping

def _single_arg_schema(arg_type: str = "string") -> dict:
    """Build an object schema with one required 'arg' parameter of the given type."""
    return {
        "type": "object",
        "properties": {"arg": {"type": arg_type, "description": "Tool input"}},
        "required": ["arg"],
        "additionalProperties": False,
    }


def normalize_openai_parameters(schema) -> dict:
    """
    Return a valid OpenAI 'function.parameters' JSON Schema.

    The result is always a new dict describing an object with a 'properties'
    mapping; the input is never modified, so read-only mappings are accepted.

    Args:
        schema: A JSON Schema given as a mapping, a JSON string, or None/empty.

    Returns:
        - Empty input (including a JSON string that parses to an empty value):
          an object schema with no properties.
        - Object schemas (explicit ``"type": "object"``, or 'properties' without
          a 'type'): a shallow copy with 'type' set, 'properties' guaranteed to
          be a mapping, and 'nullable' removed.
        - Anything else (invalid JSON, non-mapping, non-object root type): an
          object schema with a single required 'arg' parameter, typed with the
          root type when that is a string and 'string' otherwise.
    """
    # None or empty -> empty object schema
    if not schema:
        return {"type": "object", "properties": {}, "additionalProperties": False}

    # If given as a JSON string, parse it
    if isinstance(schema, str):
        try:
            schema = json.loads(schema)
        except ValueError:
            # Fallback to a simple string arg if it isn't valid JSON
            return _single_arg_schema()
        # "{}" / "null" must be treated like an empty schema passed directly.
        if not schema:
            return {"type": "object", "properties": {}, "additionalProperties": False}

    # If it's not a mapping, wrap as a single 'arg' parameter
    if not isinstance(schema, Mapping):
        return _single_arg_schema()

    # Shallow copy: the caller's schema object must not be mutated.
    normalized = dict(schema)
    root_type = normalized.get("type")

    # Explicit object schema, or 'properties' without 'type' -> assume object
    if root_type == "object" or ("properties" in normalized and "type" not in normalized):
        normalized["type"] = "object"
        # Covers both a missing key and an explicit null/non-mapping value.
        if not isinstance(normalized.get("properties"), Mapping):
            normalized["properties"] = {}
        # Defensive cleanup for fields some validators dislike
        normalized.pop("nullable", None)
        return normalized

    # Non-object root types (array/string/number/etc.) -> wrap as single arg,
    # keeping the root type when it is a plain string.
    return _single_arg_schema(root_type if isinstance(root_type, str) else "string")

# Build the OpenAI tools array from the MCP tool listing.
tools = []
for t in mcp.list_tools():
    params = normalize_openai_parameters(getattr(t, "inputSchema", None))
    # `or` also covers attributes that exist but are None/empty; str() alone
    # would turn a None description into the literal text "None".
    # NOTE(review): a name truncated to 64 characters no longer matches the
    # name the MCP server knows - confirm no tool name exceeds that length.
    name = str(getattr(t, "name", None) or "unnamed_tool")[:64]
    desc = str(getattr(t, "description", None) or "No description provided.")
    tools.append({
        "type": "function",
        "function": {
            "name": name,
            "description": desc,
            "parameters": params,
        }
    })


In [0]:
# --- 3) One interaction step: route to tools when requested by the LLM ---
async def run_agent(workspaceClient: WorkspaceClient, user_question: str):
    """Ask the LLM one question and execute the tool it requests, if any.

    Args:
        workspaceClient: Workspace client forwarded to ``call_llm``.
        user_question: The user's question, sent as the single user message.

    Returns:
        The concatenated text parts of the MCP tool result (a ``str``) when the
        model requested a tool; otherwise the reply dict from ``call_llm``
        unchanged.
    """
    # Provide the LLM with tool options (names + params) from MCP
    llm_reply = await call_llm(
        workspaceClient,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful contract and contractor assistant. "
                    "You must call Unity Catalog functions whenever possible. "
                )
            },
            {
                "role": "user",
                "content": user_question
            },
        ],
        tool_schemas=tools,
        endpoint_name="databricks-meta-llama-3-3-70b-instruct"
    )

    # If the model wants to call a tool, execute via MCP
    if isinstance(llm_reply, dict) and "tool_name" in llm_reply:
        tool_name = llm_reply["tool_name"]
        # The model may send null or empty arguments for a no-argument tool;
        # pass an empty dict in that case instead of None / "".
        args = llm_reply.get("arguments") or {}
        result = mcp.call_tool(tool_name, args)
        # Keep only parts that actually carry text, so a part without a
        # string `.text` is skipped instead of raising.
        text_parts = [
            part.text
            for part in (result.content or [])
            if isinstance(getattr(part, "text", None), str)
        ]
        return "".join(text_parts)

    # Otherwise return the model's reply as produced by call_llm
    return llm_reply


In [0]:
# import asyncio

# userQuestion = "what is 13 plus ten?"
# userQuestion = "would you know what is 14 + 10?"

# userQuestion = "Show all contracts for Acme Construction that are closed."
# userQuestion = "List Active contracts awarded in 2024"
# userQuestion = "Does any contractor have any orphanned contracts?"
# userQuestion = "What is the last contract awarded to Acme Construction?"
# userQuestion = "What are the valid status codes?"
# userQuestion = "Does contractor 'Acme Construction' have any Closed contracts?"
# userQuestion = "What is the largest total value for each contractor?" ## WORKS
# userQuestion = "Are there any planned contracts for contractor Acme Construction?"
# userQuestion = "List all 'Skyline Engineering' contracts have a status of 'OnHold'?"
# userQuestion = "which contractors have their phone number as empty or none?"
# userQuestion = "Any contractors with incomplete phone number field?"
# userQuestion = "How many contracts are there for contractor 'Acme Construction'?"

# userQuestion = "What is the total value of each of the contractors?"
# userQuestion = "what is the last contract awarded in 2024?"
# userQuestion = "What is the last contract for contractor 'Acme Construction' awarded in 2025?"
# userQuestion = "what are the contracts for Acme Construction?"
# userQuestion = "What is the total value of all contracts awarded to Acme Construction?"

# userQuestion = "list contracts that are closed"
# userQuestion = "list all contracts that are closed"
# userQuestion = "list all of Acme Construction contracts that have the status of active"
# userQuestion = "list all of 'Acme Construction' contracts that are planned"
# userQuestion = "what is the total value of closed Acme Construction contracts?"
userQuestion = "list any abandoned contracts"

# result = asyncio.run(run_agent(workspaceClient, userQuestion))
# print(result)

# If run_agent is async, use await directly in a notebook cell
result = await run_agent(workspaceClient, userQuestion)

# Parse the result string into a dictionary
if result:
    try:
        result_dict = json.loads(result) if isinstance(result, str) else result
    except json.JSONDecodeError:
        result_dict = {}
else:
    result_dict = {}

display(result_dict)

# ## Parse the result string into a dictionary
# if result:  # not None and not empty string
#     try:
#         result_dict = json.loads(result) if isinstance(result, str) else result
#     except json.JSONDecodeError:
#         # print("Result is not valid JSON:", result)
#         result_dict = {}
# else:
#     result_dict = {}

# display(result_dict)
