# ETL Assist agent

This notebook demonstrates how to build a data engineering agent that uses on-behalf-of-user authentication. In this notebook you:
1. Author an agent using the on-behalf-of-user clients
2. Log the agent with API scopes
3. Deploy the agent to Model Serving

**_NOTE:_** This notebook uses LangChain and LangGraph, but AI Agent Framework is compatible with any agent authoring framework, including LlamaIndex or pure Python agents written with the OpenAI SDK.

## Prerequisites

- Build the UC function

In [0]:
%pip install -U -qqqq mlflow-skinny[databricks] langgraph==0.3.4 databricks-langchain databricks-agents uv
dbutils.library.restartPython()

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
jupyter-server 1.23.4 requires anyio<4,>=3.1.0, but you have anyio 4.10.0 which is incompatible.
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


## Define the agent in code
Below we define our agent code in a single cell, enabling us to easily write it to a local Python file for subsequent logging and deployment using the `%%writefile` magic command.

For more examples of tools to add to your agent, see [docs](https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html).

In [0]:
%%writefile agent.py
import functools
import os
from typing import Any, Generator, Literal, Optional


import mlflow
from databricks_langchain import ChatDatabricks, VectorSearchRetrieverTool
from databricks_langchain.uc_ai import (
    DatabricksFunctionClient,
    UCFunctionToolkit,
    set_uc_function_client,
)
from langchain_core.language_models import LanguageModelLike
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.langchain.chat_agent_langgraph import ChatAgentState, ChatAgentToolNode
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
from databricks.sdk import WorkspaceClient
from databricks.sdk.credentials_provider import ModelServingUserCredentials
from databricks_langchain.genie import GenieAgent
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel

###################################################
## Create a GenieAgent with access to a Genie Space
###################################################

# TODO add GENIE_SPACE_ID and a description for this space
# You can find the ID in the URL of the genie room /genie/rooms/<GENIE_SPACE_ID>
# Example description: This Genie agent can answer questions based on a database containing tables related to enterprise software sales, including accounts, opportunities, opportunity history, fiscal periods, quotas, targets, teams, and users. Use Genie to fetch and analyze data from these tables by specifying the relevant columns and filters. Genie can execute SQL queries to provide precise data insights based on your questions.

def create_genie_agent():
    user_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())

    #TODO: Add the genie space id.
    GENIE_SPACE_ID = "01f0779845cc1c3894c6f962e9a79fdc"
    genie_agent_description = """
    This genie agent can answer questions on SQL transformations.
        - Base all mappings on actual schema and comment info from the UC function calls.
        Follow these instructions when writing SQL queries:
            1. No made-up column names — if uncertain, choose the most likely mapping and explain in Assumptions.
            2. Keep SQL ANSI-compliant and idiomatic for Databricks.
            3. Use clear, readable formatting and aliasing.
        """

    genie_agent = GenieAgent(
        genie_space_id=GENIE_SPACE_ID,
        genie_agent_name="DATA_MAPPING_ETL_AGENT",
        description=genie_agent_description,
        client=user_client,
    )
    return genie_agent



mlflow.langchain.autolog()


############################################
# Define your LLM endpoint and system prompt
############################################
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
#TODO: Add the genie space id.
GENIE_SPACE_ID = "01f0779845cc1c3894c6f962e9a79fdc"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)




###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html
###############################################################################


# You can use UDFs in Unity Catalog as agent tools
# def create_tools():
#     tools = []
#     uc_tool_names = ["abhijeet_rao.ryder_demo.get_productivity_per_role"] # TODO: Add Tools
#     invokers_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())
#     invokers_func_client = DatabricksFunctionClient(client=invokers_client)
#     uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names, client=invokers_func_client, filter_accessible_functions=True)
#     tools.extend(uc_toolkit.tools)
#     return tools


#############################
# Define the supervisor agent
#############################

# TODO update the max number of iterations between supervisor and worker nodes
# before returning to the user
MAX_ITERATIONS = 3

genie_agent_description = """
    This genie agent can answer questions on SQL transformations.
        - Base all mappings on actual schema and comment info from the UC function calls.
        Follow these instructions when writing SQL queries:
            1. No made-up column names — if uncertain, choose the most likely mapping and explain in Assumptions.
            2. Keep SQL ANSI-compliant and idiomatic for Databricks.
            3. Use clear, readable formatting and aliasing.
        """
worker_descriptions = {
    "Genie": genie_agent_description,
}

formatted_descriptions = "\n".join(
    f"- {name}: {desc}" for name, desc in worker_descriptions.items()
)

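# Use an f-string so that {formatted_descriptions} below is actually
# interpolated into the prompt instead of being sent as literal text.
system_prompt = f"""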
You are a SQL transformations supervisor agent.

Your primary purpose is to provide warehouse summaries with varying levels of detail based on the user's request. 
Because these summaries may require complex workflows, you have access to specialized worker agents that can be 
delegated specific tasks.

These workers have access to the underlying data that the user’s questions may relate to. 
For every incoming request, you must decide whether to:
    - Route tasks to one or more worker agents for execution, OR
    - Respond directly to the user with the requested information.

When delegating tasks:
    1. Clearly define the expected output and scope for each worker.
    2. Ensure that the results from workers align with the overall query and user intent.
    3. Combine and summarize worker outputs into a coherent, user-friendly final response.

 \n{formatted_descriptions}

"""
options = ["FINISH"] + list(worker_descriptions.keys())
FINISH = {"next_node": "FINISH"}


def supervisor_agent(state):
    count = state.get("iteration_count", 0) + 1
    if count > MAX_ITERATIONS:
        return FINISH

    class nextNode(BaseModel):
        next_node: Literal[tuple(options)]

    preprocessor = RunnableLambda(
        lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
    )
    supervisor_chain = preprocessor | llm.with_structured_output(nextNode)
    next_node = supervisor_chain.invoke(state).next_node

    # if routed back to the same node, exit the loop
    if state.get("next_node") == next_node:
        return FINISH
    return {
        "iteration_count": count,
        "next_node": next_node
    }

#######################################
# Define our multiagent graph structure
#######################################


def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {
        "messages": [
            {
                "role": "assistant",
                "content": result["messages"][-1].content,
                "name": name,
            }
        ]
    }


def final_answer(state):
    prompt = """
Using only the content in messages, respond to the previous user question using the answer provided by the other assistants.

If you have the information needed to respond:
    - Greet the user politely (e.g., Hi, Hello) and indicate that you plan to help with their problem.
    - Provide a concise summary of the details from the workers’ outputs in two sentences or fewer.

If the user requests information they do not have permissions for:
    - The assistants will return a null or empty response.
    - In that case, politely inform the user that the information could not be retrieved because they do not have access.
    - Keep the response polite and concise.

Important:
    - Do not tell the user that *you* do not have access to their data.
    - Workers have access to the data they are asking for; your role is to summarize or relay what they return.
"""

    preprocessor = RunnableLambda(
        lambda state: state["messages"] + [{"role": "user", "content": prompt}]
    )
    final_answer_chain = preprocessor | llm
    return {"messages": [final_answer_chain.invoke(state)]}


class AgentState(ChatAgentState):
    next_node: str
    iteration_count: int


def create_multi_agent_graph(genie_agent):
    """Builds and compiles the multi-agent LangGraph."""

    # Partially apply the agent_node function with the created genie_agent
    genie_node = functools.partial(agent_node, agent=genie_agent, name="Genie")

    workflow = StateGraph(AgentState)
    workflow.add_node("Genie", genie_node)
    workflow.add_node("supervisor", supervisor_agent)

    workflow.add_node("final_answer", final_answer)

    workflow.set_entry_point("supervisor")
    
    # We want our workers to ALWAYS "report back" to the supervisor when done
    for worker in worker_descriptions.keys():
        workflow.add_edge(worker, "supervisor")

    # Let the supervisor decide which next node to go
    workflow.add_conditional_edges(
        "supervisor",
        lambda x: x["next_node"],
        {**{k: k for k in worker_descriptions.keys()}, "FINISH": "final_answer"},
    )
    workflow.add_edge("final_answer", END)
    
    return workflow.compile()


class LangGraphChatAgent(ChatAgent):
    def __init__(self):
        """
        Custom constructor that does not require a pre-compiled agent.
        The agent graph will be created dynamically in the predict methods.
        """
        super().__init__()

    def _predict_internal(
        self, messages: list[ChatAgentMessage]
    ) -> Generator[dict[str, Any], None, None]:
        """Create the agent and graph for this request and return the update stream."""
        # 1. Create the agent with user credentials
        genie_agent = create_genie_agent()

        # 2. Create the graph using the newly created agent
        multi_agent_graph = create_multi_agent_graph(genie_agent)

        # 3. Prepare the request and stream the results
        request = {"messages": self._convert_messages_to_dict(messages)}
        return multi_agent_graph.stream(request, stream_mode="updates")

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        
        stream = self._predict_internal(messages)
        output_messages = []
        for event in stream:
            for node_data in event.values():
                output_messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=output_messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        
        stream = self._predict_internal(messages)
        for event in stream:
            for node_data in event.values():
                # Use .get("messages", []) so that nodes which return no messages
                # (for example, the supervisor) are skipped safely.
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data.get("messages", [])
                )


# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()
AGENT = LangGraphChatAgent()
mlflow.models.set_model(AGENT)


Overwriting agent.py
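
The supervisor in `agent.py` stops routing under two conditions: the iteration budget is used up, or the model picks the same node twice in a row. The following is a minimal, dependency-free sketch of that rule. The names `route` and `choose_next` are hypothetical stand-ins for `supervisor_agent` and the structured-output LLM call; the constants mirror the ones in the file.

```python
MAX_ITERATIONS = 3
FINISH = {"next_node": "FINISH"}


def route(state, choose_next):
    """Return the state update the supervisor would emit for one routing step.

    `choose_next` is a hypothetical callable that stands in for the LLM call
    and returns the name of the next node.
    """
    count = state.get("iteration_count", 0) + 1
    # Condition 1: the iteration budget is exhausted.
    if count > MAX_ITERATIONS:
        return FINISH

    next_node = choose_next(state)
    # Condition 2: the model routed to the same node as last time.
    if state.get("next_node") == next_node:
        return FINISH

    return {"iteration_count": count, "next_node": next_node}


# First step: nothing has run yet, so the worker is selected.
first = route({}, lambda state: "Genie")
# Second step: the same worker is chosen again, so the loop ends.
second = route(first, lambda state: "Genie")
```

With a single worker, this means the graph typically visits `Genie` once and then moves on to `final_answer`.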


## Test the agent

Interact with the agent to test its output. Because `agent.py` calls `mlflow.langchain.autolog()`, you can view the trace for each step the agent takes.

Replace this placeholder input with an appropriate domain-specific example for your agent.
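
When reading the outputs below, it can help to know how `predict` turns the graph's per-node updates into one flat message list. This is a sketch with made-up events, assuming the `stream_mode="updates"` shape of one `{node_name: update}` dict per step; `flatten_updates` is a hypothetical helper, not part of `agent.py`.

```python
def flatten_updates(events):
    """Collect the messages from every node update, in arrival order."""
    messages = []
    for event in events:
        for node_data in event.values():
            # Supervisor updates carry routing info only, so default to [].
            messages.extend(node_data.get("messages", []))
    return messages


# Illustrative events only; real content comes from Genie and the LLM.
sample_events = [
    {"supervisor": {"iteration_count": 1, "next_node": "Genie"}},
    {"Genie": {"messages": [{"role": "assistant", "content": "draft SQL", "name": "Genie"}]}},
    {"supervisor": {"next_node": "FINISH"}},
    {"final_answer": {"messages": [{"role": "assistant", "content": "summary"}]}},
]

flat = flatten_updates(sample_events)
```

This is why the response contains two assistant messages: one from the `Genie` worker and one from the `final_answer` node.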

In [0]:
dbutils.library.restartPython()

In [0]:
from agent import AGENT

AGENT.predict({"messages": [{"role": "user", 
                             "content": """You are a SQL transformation expert working in Databricks with Unity Catalog.  
Your task is to generate a transformation query to populate the destination table 
`etl_assist_test.test_schema.silver_market_data` from the bronze input tables 
`etl_assist_test.test_schema.prices` and `etl_assist_test.test_schema.securities`."""}]})

ChatAgentResponse(messages=[ChatAgentMessage(role='assistant', content='Please provide the chat history so I can assist you with the described information.', name='Genie', id='43dae9dc-13d0-4af9-9b72-69d5c66017e7', tool_calls=None, tool_call_id=None, attachments=None), ChatAgentMessage(role='assistant', content="# SQL Transformation for Silver Market Data\n\nI'll help you create a transformation query to populate your silver_market_data table from the bronze tables.\n\n```sql\n-- SQL transformation to populate etl_assist_test.test_schema.silver_market_data\nCREATE OR REPLACE TABLE etl_assist_test.test_schema.silver_market_data AS\nSELECT \n  p.date,\n  s.ticker,\n  s.name AS security_name,\n  s.sector,\n  s.industry,\n  p.open_price,\n  p.high_price,\n  p.low_price,\n  p.close_price,\n  p.volume,\n  p.adj_close\nFROM etl_assist_test.test_schema.prices p\nJOIN etl_assist_test.test_schema.securities s\n  ON p.ticker = s.ticker;\n```\n\nThis query joins the prices and securities tables on

[Trace(trace_id=tr-6b155a5c5309cb8c4755e601de2de043), Trace(trace_id=tr-2260b0987fc27566fb0f869582a335b5)]

In [0]:
for event in AGENT.predict_stream(
    {"messages": [{"role": "user", "content": "Give me a breakdown of average productivity by employee role in the warehouse"}]}
):
    print(event, "-----------\n")


delta=ChatAgentMessage(role='assistant', content='This question is unrelated to the available database schema. The provided tables contain historical stock market data and securities information, not employee productivity or roles in a warehouse.', name='Genie', id='70e7744e-9d8e-4a2d-8af2-11996b565a88', tool_calls=None, tool_call_id=None, attachments=None) finish_reason=None custom_outputs=None usage=None -----------

delta=ChatAgentMessage(role='assistant', content="I don't have access to any warehouse employee productivity data in the available information. The database schema I have access to only contains historical stock market data and securities information, not employee roles or productivity metrics for a warehouse.", name=None, id='run--526f7120-84a3-4257-8c0e-6b576e42d40e-0', tool_calls=None, tool_call_id=None, attachments=None) finish_reason=None custom_outputs=None usage=None -----------



[Trace(trace_id=tr-7129640c6aa12993e84a94343570228b), Trace(trace_id=tr-b1edc8025a6b577dec13d85da26ac96e)]

## Log the `agent` as an MLflow model
Determine the Databricks resources to specify for automatic auth passthrough at deployment time.
- **TODO**: If your Unity Catalog tool queries a [vector search index](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html) or leverages [external functions](https://docs.databricks.com/generative-ai/agent-framework/external-connection-tools.html), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See [docs](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) for more details.

Log the agent as code from the `agent.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).
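
The logging cell further below pins each dependency to the version installed in the notebook environment. A small sketch of that idea using only the standard library follows; `pin_requirements` is a hypothetical helper, and falling back to an unpinned name for missing packages is an assumption made here for illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def pin_requirements(packages):
    """Return pip requirement strings pinned to the installed versions.

    Packages that are not installed are returned unpinned (an assumption
    for this sketch; you may prefer to raise instead).
    """
    pinned = []
    for name in packages:
        try:
            pinned.append(f"{name}=={version(name)}")
        except PackageNotFoundError:
            pinned.append(name)
    return pinned


requirements = pin_requirements(["mlflow", "langgraph", "databricks-langchain"])
```

Pinning keeps the serving environment aligned with the versions the agent was tested against in this notebook.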

In [0]:
from agent import LLM_ENDPOINT_NAME
print(f"LLM Endpoint Name: {LLM_ENDPOINT_NAME}")

LLM Endpoint Name: databricks-claude-3-7-sonnet


In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
from importlib.metadata import version

import mlflow
from agent import LLM_ENDPOINT_NAME
from mlflow.models.auth_policy import AuthPolicy, SystemAuthPolicy, UserAuthPolicy
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint

# TODO: Manually include underlying resources if needed.
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
system_auth_policy = SystemAuthPolicy(resources=resources)

# TODO: Manually include the required API scopes for this authorization.
user_auth_policy = UserAuthPolicy(
    api_scopes=[
        "sql.statement-execution",
        "catalog.connections",
        "iam.current-user:read",
        "sql.warehouses",
        "dashboards.genie",
        "serving.serving-endpoints",
        "iam.access-control:read",
        "apps.apps",
        "mcp.functions",
    ]
)

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        # Pin each dependency to the version installed in this notebook.
        pip_requirements=[
            f"databricks-connect=={version('databricks-connect')}",
            f"mlflow=={version('mlflow')}",
            f"databricks-langchain=={version('databricks-langchain')}",
            f"langgraph=={version('langgraph')}",
        ],
        auth_policy=AuthPolicy(
            system_auth_policy=system_auth_policy,
            user_auth_policy=user_auth_policy,
        ),
    )

🔗 View Logged Model at: https://e2-demo-field-eng.cloud.databricks.com/ml/experiments/1985842046739892/models/m-aa6d366cccd64488afd9a311f3680e19?o=1444828305810485
2025/08/12 18:51:03 INFO mlflow.pyfunc: Predicting on input example to validate output


## Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses based on expected responses and other evaluation criteria. Use the evaluation criteria you specify to guide iterations, using MLflow to track the computed quality metrics.
See Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-evaluation) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-evaluation/)).


To evaluate your tool calls, add custom metrics. See Databricks documentation ([AWS](https://docs.databricks.com/mlflow3/genai/eval-monitor/custom-scorers) | [Azure](https://learn.microsoft.com/azure/databricks/mlflow3/genai/eval-monitor/custom-scorers)).
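
The evaluation cell below uses a single record. As a sketch, a helper like the following could build several records in the same `inputs`/`messages` shape from plain questions. `build_eval_dataset` is a hypothetical name and the questions are illustrative, so replace them with examples from your own domain.

```python
def build_eval_dataset(questions):
    """Wrap each question in the record shape used by the evaluation cell."""
    return [
        {"inputs": {"messages": [{"role": "user", "content": question}]}}
        for question in questions
    ]


# Illustrative questions only; table names are taken from the test cell above.
eval_questions = [
    "Generate a query that joins etl_assist_test.test_schema.prices with "
    "etl_assist_test.test_schema.securities.",
    "Which columns are needed to populate etl_assist_test.test_schema.silver_market_data?",
]

sample_dataset = build_eval_dataset(eval_questions)
```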

In [0]:
import mlflow
from mlflow.genai.scorers import RelevanceToQuery, Safety

eval_dataset = [
    {
        "inputs": {
            "messages": [
                {"role": "user", "content": "You are an assistant with access to warehouse employee productivity metrics stored in Unity Catalog. What is the average ProductivityScore of employees?"}
            ]
        }
    }
]

eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda messages: AGENT.predict({"messages": messages}),
    scorers=[RelevanceToQuery(), Safety()],
)

# Results will be in eval_results or visualized in MLflow UI


2025/08/12 18:51:31 INFO mlflow.models.evaluation.utils.trace: Auto tracing is temporarily enabled during the model evaluation for computing some metrics and debugging. To disable tracing, call `mlflow.autolog(disable=True)`.
2025/08/12 18:51:31 INFO mlflow.genai.utils.data_validation: Testing model prediction with the first sample in the dataset.


Evaluating:   0%|          | 0/1 [Elapsed: 00:00, Remaining: ?] 

Trace(trace_id=tr-4c7aa80cfdde711bb1ccf431644bfb9d)

## Perform pre-deployment validation of the agent
Before registering and deploying the agent, run pre-deployment checks with the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See [documentation](https://docs.databricks.com/machine-learning/model-serving/model-serving-debug.html#validate-inputs) for details.

In [0]:
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"messages": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)

2025/08/12 18:52:06 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
2025/08/12 18:52:09 INFO mlflow.utils.virtualenv: Creating a new environment in /tmp/virtualenv_envs/mlflow-fee2d303a0b1fcfbd87af6598d535999f80d04ab with python version 3.10.12 using uv
Using CPython 3.10.12 interpreter at: /usr/bin/python3.10
Creating virtual environment at: /tmp/virtualenv_envs/mlflow-fee2d303a0b1fcfbd87af6598d535999f80d04ab
Activate with: source /tmp/virtualenv_envs/mlflow-fee2d303a0b1fcfbd87af6598d535999f80d04ab/bin/activate
2025/08/12 18:52:10 INFO mlflow.utils.virtualenv: Installing dependencies
Using Python 3.10.12 environment at: /tmp/virtualenv_envs/mlflow-fee2d303a0b1fcfbd87af6598d535999f80d04ab
Resolved 3 packages in 74ms
Downloading setuptools (1.1MiB)
Downloading pip (2.0MiB)
 Downloading setuptools
 Downloadi

{"messages": [{"role": "assistant", "content": "Hello! Please provide the chat history, and I'll assist you with the described information.", "name": "Genie", "id": "7f0fa540-7874-493e-9462-01701b646167"}, {"role": "assistant", "content": "I notice you've shared instructions that appear to be meant for me rather than a question you want answered. These instructions seem to be describing how I should respond to user questions using information from other assistants.\n\nIf you have a specific question you'd like help with, please feel free to ask, and I'll do my best to assist you directly. I don't need to relay information from other assistants to help with your questions.", "id": "run--296b1093-7f61-4d83-bdf6-95ec1f905553-0"}]}

2025/08/12 18:52:44 INFO mlflow.tracing.export.async_export_queue: Flushing the async trace logging queue before program exit. This may take a while...


## Register the model to Unity Catalog

Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.
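
Unity Catalog model names have three levels, `catalog.schema.model`. The sketch below builds the full name and rejects parts that would break that structure. `uc_model_name` is a hypothetical helper, and the character check is deliberately conservative (letters, digits, and underscores only), which is an assumption rather than the full Unity Catalog naming rule.

```python
import re

# Conservative assumption: only letters, digits, and underscores.
_IDENTIFIER = re.compile(r"[A-Za-z0-9_]+")


def uc_model_name(catalog, schema, model_name):
    """Join the three parts into a fully qualified Unity Catalog model name."""
    for label, part in (("catalog", catalog), ("schema", schema), ("model_name", model_name)):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"Invalid {label}: {part!r}")
    return f"{catalog}.{schema}.{model_name}"


full_name = uc_model_name("etl_assist_test", "test_schema", "etl_assist_agent")
```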

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "etl_assist_test"
schema = "test_schema"
model_name = "etl_assist_agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

Successfully registered model 'etl_assist_test.test_schema.etl_assist_agent'.
🔗 Created version '1' of model 'etl_assist_test.test_schema.etl_assist_agent': https://e2-demo-field-eng.cloud.databricks.com/explore/data/models/etl_assist_test/test_schema/etl_assist_agent/version/1?o=1444828305810485


## Deploy the agent

In [0]:
from databricks import agents
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags={"endpointSource": "docs"})


    Deployment of etl_assist_test.test_schema.etl_assist_agent version 1 initiated.  This can take up to 15 minutes and the Review App & Query Endpoint will not work until this deployment finishes.

    View status: https://e2-demo-field-eng.cloud.databricks.com/ml/endpoints/agents_etl_assist_test-test_schema-etl_assist_agent
    Review App: https://e2-demo-field-eng.cloud.databricks.com/ml/review-v2/2b6655e0c39f49f7920d65afd1d01d6e/chat
    Monitor: https://e2-demo-field-eng.cloud.databricks.com/ml/experiments/1985842046739892?compareRunsMode=TRACES


Deployment(model_name='etl_assist_test.test_schema.etl_assist_agent', model_version='1', endpoint_name='agents_etl_assist_test-test_schema-etl_assist_agent', served_entity_name='etl_assist_test-test_schema-etl_assist_agent_1', query_endpoint='https://e2-demo-field-eng.cloud.databricks.com/serving-endpoints/agents_etl_assist_test-test_schema-etl_assist_agent/served-models/etl_assist_test-test_schema-etl_assist_agent_1/invocations', endpoint_url='https://e2-demo-field-eng.cloud.databricks.com/ml/endpoints/agents_etl_assist_test-test_schema-etl_assist_agent', review_app_url='https://e2-demo-field-eng.cloud.databricks.com/ml/review-v2/2b6655e0c39f49f7920d65afd1d01d6e/chat')

## Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See [docs](https://docs.databricks.com/generative-ai/deploy-agent.html) for details.
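
To embed the agent in an application, one option is to call the serving endpoint over REST. The sketch below only constructs the request and does not send it. It assumes the standard `/serving-endpoints/<endpoint_name>/invocations` path and bearer-token authentication; `build_invocation_request`, the host, and the token are placeholders you would replace with your own values.

```python
import json
import urllib.request


def build_invocation_request(host, endpoint_name, token, question):
    """Build (but do not send) a POST request for the agent endpoint."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
    # The payload uses the same chat "messages" shape as the notebook cells.
    body = json.dumps(
        {"messages": [{"role": "user", "content": question}]}
    ).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values; the endpoint name is the one shown in the deploy output.
request = build_invocation_request(
    host="https://<your-workspace-host>",
    endpoint_name="agents_etl_assist_test-test_schema-etl_assist_agent",
    token="<your-token>",
    question="Hello!",
)
```

Passing the request to `urllib.request.urlopen` would send it. Keep the token out of source control, for example by reading it from a secret scope or an environment variable.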