## Build an app with LangGraph that uses UC functions as tools

Prerequisite: Run this notebook on the Databricks platform.

In [0]:
%pip install unitycatalog-ai[databricks] unitycatalog-langchain langchain_openai langgraph mlflow
%restart_python


#### Create a DatabricksFunctionClient and set it as the default client
This example uses serverless compute.

In [0]:
from unitycatalog.ai.core.client import set_uc_function_client
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

client = DatabricksFunctionClient()

# sets the default uc function client
set_uc_function_client(client)

# replace with your own catalog and schema
CATALOG = "ml"
SCHEMA = "serena_test"

#### Create UC functions

In [0]:
def execute_python_code(code: str) -> str:
  """
  Executes the given python code and returns its stdout.
  Remember the code should print the final result to stdout.

  Args:
    code: Python code to execute. Remember to print the final result to stdout.
  """
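  import sys
  from io import StringIO

  stdout = StringIO()
  original_stdout = sys.stdout
  sys.stdout = stdout
  try:
    exec(code)
  finally:
    # restore stdout even if the executed code raises
    sys.stdout = original_stdout
  return stdout.getvalue()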

function_info = client.create_python_function(func=execute_python_code, catalog=CATALOG, schema=SCHEMA, replace=True)
python_execution_function_name = function_info.full_name

# test execution
client.execute_function(python_execution_function_name, {"code": "print(1+1)"})

FunctionExecutionResult(error=None, format='SCALAR', value='2\n', truncated=None)
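
The stdout-capture pattern can be tried locally before registering anything in Unity Catalog. The following is a minimal sketch, not the registered UC function: `run_and_capture` is a hypothetical helper name, and it relies on `contextlib.redirect_stdout` from the standard library so that `sys.stdout` is restored automatically.

```python
from contextlib import redirect_stdout
from io import StringIO


def run_and_capture(code: str) -> str:
    """Run `code` with exec() and return whatever it printed to stdout."""
    buffer = StringIO()
    # redirect_stdout restores sys.stdout on exit, even if exec() raises
    with redirect_stdout(buffer):
        exec(code, {})  # a fresh globals dict keeps the snippet isolated
    return buffer.getvalue()


captured = run_and_capture("print(1+1)")
print(repr(captured))
```

Keep in mind that `exec` runs arbitrary code, so a tool like this should only be exposed to models and users you trust.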

In [0]:
ask_ai_function_name = f"{CATALOG}.{SCHEMA}.ask_ai"
sql_body = f"""CREATE OR REPLACE FUNCTION {ask_ai_function_name}(question STRING COMMENT 'question to ask')
RETURNS STRING
COMMENT 'answer the question using Meta-Llama-3.1-70B-Instruct model'
RETURN SELECT ai_gen(question)
"""
client.create_function(sql_function_body=sql_body)
result = client.execute_function(ask_ai_function_name, {"question": "What is MLflow?"})
result.value

'MLflow is an open-source platform for managing the end-to-end machine learning (ML) lifecycle. It was developed by Databricks and is designed to help data scientists and machine learning engineers manage the complexities of building, deploying, and maintaining ML models.\n\nMLflow provides a set of tools and APIs that enable users to:\n\n1. **Track experiments**: Record and manage the parameters, metrics, and artifacts of ML experiments, making it easier to reproduce and compare results.\n2. **Manage models**: Store, version, and deploy ML models, including support for multiple frameworks such as TensorFlow, PyTorch, and Scikit-learn.\n3. **Deploy models**: Deploy ML models to various environments, including cloud, on-premises, and edge devices.\n4. **Monitor and manage models**: Track the performance of deployed models, detect data drift, and retrain models as needed.\n\nMLflow consists of four main components:\n\n1. **MLflow Tracking**: Records and manages the parameters, metrics, a

In [0]:
summarization_function_name = f"{CATALOG}.{SCHEMA}.summarize"
sql_body = f"""CREATE OR REPLACE FUNCTION {summarization_function_name}(text STRING COMMENT 'content to parse', max_words INT COMMENT 'max number of words in the response, must be non-negative integer, if set to 0 then no limit')
RETURNS STRING
COMMENT 'summarize the content and limit response to max_words'
RETURN SELECT ai_summarize(text, max_words)
"""
client.create_function(sql_function_body=sql_body)
# test execution
client.execute_function(summarization_function_name, {"text": result.value, "max_words": 20})

FunctionExecutionResult(error=None, format='SCALAR', value='MLflow: End-to-End Machine Learning Lifecycle Management', truncated=None)

In [0]:
translate_function_name = f"{CATALOG}.{SCHEMA}.translate"
sql_body = f"""CREATE OR REPLACE FUNCTION {translate_function_name}(content STRING COMMENT 'content to translate', language STRING COMMENT 'target language')
RETURNS STRING
COMMENT 'translate the content to target language, currently only english <-> spanish translation is supported'
RETURN SELECT ai_translate(content, language)
"""
client.create_function(sql_function_body=sql_body)
# test execution
client.execute_function(translate_function_name, {"content": "Hello", "language": "es"})

FunctionExecutionResult(error=None, format='SCALAR', value='Hola', truncated=None)
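
The three SQL cells above follow the same template: a parameter list with comments, a `STRING` return type, a function comment, and a single `RETURN SELECT` expression. As a rough sketch of that pattern, the hypothetical helper `build_sql_function` below (not part of `unitycatalog-ai`) assembles such a statement from its parts. It assumes the comments contain no single quotes, since it does no escaping.

```python
def build_sql_function(full_name: str, params: dict, comment: str, expression: str) -> str:
    """Return a CREATE OR REPLACE FUNCTION statement for a STRING-returning SQL function.

    `params` maps each parameter name to a (sql_type, comment) tuple.
    """
    param_list = ", ".join(
        f"{name} {sql_type} COMMENT '{param_comment}'"
        for name, (sql_type, param_comment) in params.items()
    )
    return (
        f"CREATE OR REPLACE FUNCTION {full_name}({param_list})\n"
        f"RETURNS STRING\n"
        f"COMMENT '{comment}'\n"
        f"RETURN SELECT {expression}\n"
    )


# Rebuild a statement equivalent in shape to the translate cell above
example_sql = build_sql_function(
    "ml.serena_test.translate",
    {
        "content": ("STRING", "content to translate"),
        "language": ("STRING", "target language"),
    },
    comment="translate the content to target language",
    expression="ai_translate(content, language)",
)
print(example_sql)
```

The resulting string could then be passed to `client.create_function(sql_function_body=...)` in the same way as the hand-written statements.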

#### Create tools

In [0]:
python_execution_function_name = f"{CATALOG}.{SCHEMA}.execute_python_code"
ask_ai_function_name = f"{CATALOG}.{SCHEMA}.ask_ai"
summarization_function_name = f"{CATALOG}.{SCHEMA}.summarize"
translate_function_name = f"{CATALOG}.{SCHEMA}.translate"

In [0]:
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

toolkit = UCFunctionToolkit(
    function_names=[
        python_execution_function_name,
        ask_ai_function_name,
        summarization_function_name,
        translate_function_name,
    ]
)
tools = toolkit.tools
tools

[UnityCatalogTool(name='ml__serena_test__execute_python_code', description='Executes the given python code and returns its stdout. Remember the code should print the final result to stdout.', args_schema=<class 'unitycatalog.ai.core.utils.function_processing_utils.ml__serena_test__execute_python_code__params'>, func=<function UCFunctionToolkit.uc_function_to_langchain_tool.<locals>.func at 0x7f566fe069e0>, uc_function_name='ml.serena_test.execute_python_code', client_config={'warehouse_id': None, 'profile': None}),
 UnityCatalogTool(name='ml__serena_test__ask_ai', description='answer the question using Meta-Llama-3.1-70B-Instruct model', args_schema=<class 'unitycatalog.ai.core.utils.function_processing_utils.ml__serena_test__ask_ai__params'>, func=<function UCFunctionToolkit.uc_function_to_langchain_tool.<locals>.func at 0x7f566fe07520>, uc_function_name='ml.serena_test.ask_ai', client_config={'warehouse_id': None, 'profile': None}),
 UnityCatalogTool(name='ml__serena_test__summarize'
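
Judging from the output above, each tool name is the full UC function name with the dots replaced by double underscores. The sketch below reproduces that mapping for the names used in this notebook. `uc_name_to_tool_name` is a hypothetical helper inferred from the printed output, not the library's own implementation.

```python
def uc_name_to_tool_name(full_name: str) -> str:
    """Map 'catalog.schema.function' to the tool name seen in the toolkit output."""
    return full_name.replace(".", "__")


uc_names = [
    "ml.serena_test.execute_python_code",
    "ml.serena_test.ask_ai",
    "ml.serena_test.summarize",
    "ml.serena_test.translate",
]
tool_names = [uc_name_to_tool_name(name) for name in uc_names]
print(tool_names)
```

These are the names the model emits in its tool calls, so they are also the names to look for in traces and message histories.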

#### Use the tools in LangGraph

In [0]:
import os

os.environ["OPENAI_API_KEY"] = "<REPLACE WITH YOUR OWN OPENAI_API_KEY>"

In [0]:
import mlflow

# enable mlflow autologging so tracing is enabled
# traces are super useful for debugging
mlflow.langchain.autolog()

In [0]:
from typing import Literal

from langchain_openai.chat_models import ChatOpenAI
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools)
model = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)

# Define the function that determines whether to continue or not
def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]
    # If the LLM makes a tool call, then we route to the "tools" node
    if last_message.tool_calls:
        return "tools"
    # Otherwise, we stop (reply to the user)
    return END


# Define the function that calls the model
def call_model(state: MessagesState):
    messages = state['messages']
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# Define a new graph
workflow = StateGraph(MessagesState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("tools", 'agent')

app = workflow.compile()
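
To make the control flow of this graph easier to follow, here is a plain-Python sketch of the same agent/tools loop with no LangGraph or OpenAI dependency. Everything in it is a stand-in for illustration: `fake_model`, `fake_tools`, `run_loop`, and the `power` tool are hypothetical and only mimic the routing done by `should_continue`.

```python
from types import SimpleNamespace


def fake_model(messages):
    """Stand-in for the chat model: request one tool call, then answer."""
    last = messages[-1]
    if last.role == "tool":
        return SimpleNamespace(role="ai", content=f"The answer is {last.content}", tool_calls=[])
    call = {"name": "power", "args": {"base": 2, "exp": 10}}
    return SimpleNamespace(role="ai", content="", tool_calls=[call])


def fake_tools(message):
    """Stand-in for the tool node: run every requested tool call."""
    registry = {"power": lambda base, exp: base ** exp}
    return [
        SimpleNamespace(role="tool", content=str(registry[c["name"]](**c["args"])), tool_calls=[])
        for c in message.tool_calls
    ]


def run_loop(question, max_steps=5):
    messages = [SimpleNamespace(role="user", content=question, tool_calls=[])]
    for _ in range(max_steps):
        reply = fake_model(messages)        # the "agent" node
        messages.append(reply)
        if not reply.tool_calls:            # should_continue returns END
            break
        messages.extend(fake_tools(reply))  # the "tools" node, then back to "agent"
    return messages


history = run_loop("What is 2**10?")
print([m.role for m in history], history[-1].content)
```

The real graph behaves the same way: the agent node runs first, control moves to the tools node whenever the last message contains tool calls, and the run ends once the model replies without requesting a tool.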

In [0]:
final_state = app.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": "What is MLflow? Keep the response concise and reply in Spanish. Try using as many tools as possible",
            }
        ]
    },
)
response = final_state["messages"][-1].content
response

'MLflow es una plataforma de código abierto para gestionar el ciclo de vida completo del aprendizaje automático. Fue desarrollado por Databricks y está diseñado para ayudar a los científicos de datos y a los ingenieros a manejar las complejidades de la creación, implementación y mantenimiento de modelos de aprendizaje automático.\n\nMLflow proporciona herramientas y API que permiten a los científicos de datos:\n\n1. **Seguimiento de experimentos**: Realizar un seguimiento y gestionar experimentos, incluida la optimización de hiperparámetros.\n2. **Gestión de modelos**: Almacenar, versionar e implementar modelos.\n3. **Implementación de modelos**: Implementar modelos en varios entornos, incluidos la nube y locales.\n4. **Monitoreo y análisis**: Monitorear y analizar el rendimiento de los modelos.\n\nSus principales componentes son:\n\n1. **Seguimiento de MLflow**: Para rastrear experimentos.\n2. **Proyectos de MLflow**: Empaquetar código en proyectos reutilizables.\n3. **Modelos de MLfl

Trace(request_id=tr-b9ddcdb3bc42480294709255dcea80c7)

In [0]:
final_state = app.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": f"Remember to always try using tools. Can you convert the following explanation to English? {response}",
            }
        ]
    },
)
final_state["messages"][-1].content

'Here is the translated explanation in English:\n\nMLflow is an open-source platform for managing the end-to-end machine learning lifecycle. It was developed by Databricks and is designed to help data scientists and engineers manage the complexities of creating, deploying, and maintaining machine learning models.\n\nMLflow provides tools and APIs that allow data scientists to:\n\n1. **Experiment tracking**: Track and manage experiments, including hyperparameter optimization.\n2. **Model management**: Store, version, and deploy models.\n3. **Model deployment**: Deploy models in various environments, including cloud and on-premises.\n4. **Monitoring and analysis**: Monitor and analyze model performance.\n\nIts main components are:\n\n1. **MLflow tracking**: To track experiments.\n2. **MLflow projects**: Package code into reusable projects.\n3. **MLflow models**: Register models.\n4. **MLflow model serving**: Deploy models in different environments.\n\nThe benefits include improved collab

Trace(request_id=tr-efebb1ba511e49ee90f2085e4b54a9e6)

In [0]:
final_state = app.invoke(
    {"messages": [{"role": "user", "content": "What is 2**10?"}]},
)
final_state["messages"][-1].content

'The result of \\(2^{10}\\) is 1024.'

Trace(request_id=tr-52b8a7439b2a4344ae00868cf6af1e8a)
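
The final message alone does not show whether the model actually used a tool. Besides the MLflow trace, one way to check is to walk the message history and collect the requested tool names. The sketch below assumes that AI messages expose `tool_calls` as a list of dicts with a `"name"` key, which is the attribute `should_continue` already reads. `list_tool_calls` and the demo messages are hypothetical stand-ins.

```python
from types import SimpleNamespace


def list_tool_calls(messages):
    """Return the names of all tools requested across a message history."""
    names = []
    for message in messages:
        # Only AI messages carry tool_calls; other message types may lack the attribute
        for call in getattr(message, "tool_calls", None) or []:
            names.append(call["name"])
    return names


# Stand-in messages for illustration; in the notebook, pass final_state["messages"]
demo_messages = [
    SimpleNamespace(content="What is 2**10?"),
    SimpleNamespace(
        content="",
        tool_calls=[
            {
                "name": "ml__serena_test__execute_python_code",
                "args": {"code": "print(2**10)"},
                "id": "call_1",
            }
        ],
    ),
    SimpleNamespace(content="1024\n"),
    SimpleNamespace(content="The result is 1024.", tool_calls=[]),
]
called = list_tool_calls(demo_messages)
print(called)
```

An empty list would indicate that the model answered from its own knowledge without calling any UC function.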

#### Log the model using MLflow

Save the model definition to a separate file, `app.py`, using the `%%writefile` magic command.

In [0]:
%%writefile app.py
from mlflow.models import set_model
from unitycatalog.ai.core.client import set_uc_function_client
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from langchain_openai.chat_models import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, START, StateGraph, MessagesState
from typing import Literal
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

client = DatabricksFunctionClient()

# sets the default uc function client
set_uc_function_client(client)

# replace with your own catalog and schema
CATALOG = "ml"
SCHEMA = "serena_test"

python_execution_function_name = f"{CATALOG}.{SCHEMA}.execute_python_code"
ask_ai_function_name = f"{CATALOG}.{SCHEMA}.ask_ai"
summarization_function_name = f"{CATALOG}.{SCHEMA}.summarize"
translate_function_name = f"{CATALOG}.{SCHEMA}.translate"
toolkit = UCFunctionToolkit(
    function_names=[
        python_execution_function_name,
        ask_ai_function_name,
        summarization_function_name,
        translate_function_name,
    ]
)
tools = toolkit.tools

tool_node = ToolNode(tools)
model = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)

def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return END

def call_model(state: MessagesState):
    messages = state['messages']
    response = model.invoke(messages)
    return {"messages": [response]}

workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", 'agent')

app = workflow.compile()
set_model(app)

Overwriting app.py


In [0]:
import mlflow
from mlflow.models import infer_signature

# temporary workaround: the graph's current output format is not supported by
# model signatures, so the signature is inferred from a plain string output
input_example = {"messages": [{"role": "user", "content": "What is 2**10?"}]}
signature = infer_signature(input_example, "1024")

with mlflow.start_run():
  model_info = mlflow.langchain.log_model(
    # Pass the path to the saved model file
    "app.py",
    "model",
    input_example={"messages": [{"role": "user", "content": "What is 3**10?"}]},
    signature=signature,
    pip_requirements=[
      "mlflow",
      "git+https://github.com/mlflow/mlflow.git",
      "git+https://github.com/unitycatalog/unitycatalog.git#subdirectory=ai/core",
      "git+https://github.com/unitycatalog/unitycatalog.git#subdirectory=ai/integrations/langchain",
      "langchain_openai",
      "langgraph"
    ],
    registered_model_name="ml.serena_test.app_with_tools", # Replace with your own model name
  )

2024/11/06 10:59:49 INFO mlflow.models.model: Found the following environment variables used during model inference: [OPENAI_API_KEY]. Please check if you need to set them when deploying the model. To disable this message, set environment variable `MLFLOW_RECORD_ENV_VARS_IN_MODEL_LOGGING` to `false`.


Registered model 'ml.serena_test.app_with_tools' already exists. Creating a new version of this model...


Created version '1' of model 'ml.serena_test.app_with_tools'.


🏃 View run rogue-midge-190 at: https://e2-dogfood.staging.cloud.databricks.com/ml/experiments/3916415516979169/runs/a00aabdcb6464fb09de01016a571cdfc
🧪 View experiment at: https://e2-dogfood.staging.cloud.databricks.com/ml/experiments/3916415516979169


#### Validate the model locally prior to serving

In [0]:
from mlflow.models import convert_input_example_to_serving_input, validate_serving_input

serving_input = convert_input_example_to_serving_input({"messages": [{"role": "user", "content": "What is 3**10?"}]})
validate_serving_input(model_info.model_uri, serving_input=serving_input)

[{'messages': [{'content': 'What is 3**10?',
    'additional_kwargs': {},
    'response_metadata': {},
    'type': 'human',
    'name': None,
    'id': '82772f75-1123-4e1b-99d2-302199a5ef7f',
    'example': False},
   {'content': '',
    'additional_kwargs': {'tool_calls': [{'id': 'call_UlCzoqHx05lOM4YfQsktqYv2',
       'function': {'arguments': '{"code":"print(3**10)"}',
        'name': 'ml__serena_test__execute_python_code'},
       'type': 'function'}],
     'refusal': None},
    'response_metadata': {'token_usage': {'completion_tokens': 26,
      'prompt_tokens': 286,
      'total_tokens': 312,
      'completion_tokens_details': {'accepted_prediction_tokens': 0,
       'audio_tokens': 0,
       'reasoning_tokens': 0,
       'rejected_prediction_tokens': 0},
      'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}},
     'model_name': 'gpt-4o-mini-2024-07-18',
     'system_fingerprint': 'fp_0ba0d124f1',
     'finish_reason': 'tool_calls',
     'logprobs': None},
    't

Trace(request_id=tr-b760a4d97e7d40eda469e96f0b34476e)
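
The validation output is a list of predictions, each holding the full serialized message history. To pull out only the final reply, something like the sketch below could be used. `final_answer` is a hypothetical helper, and `demo_output` is a shortened stand-in that mirrors the shape of the output above, not the actual (truncated) result.

```python
def final_answer(serving_output):
    """Return the content of the last message in the first prediction."""
    messages = serving_output[0]["messages"]
    return messages[-1]["content"]


# Stand-in data with the same nesting as the validation output
demo_output = [
    {
        "messages": [
            {"content": "What is 3**10?", "type": "human"},
            {"content": "", "type": "ai"},
            {"content": "59049\n", "type": "tool"},
            {"content": "3**10 is 59049.", "type": "ai"},
        ]
    }
]
answer = final_answer(demo_output)
print(answer)
```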

#### Deploy the model to a serving endpoint for production usage

Follow [this guide](https://docs.databricks.com/en/machine-learning/model-serving/create-manage-serving-endpoints.html#create-an-endpoint) to create a serving endpoint using the registered model.

**Remember** to set the following environment variables when creating the serving endpoint:
- DATABRICKS_HOST
- DATABRICKS_TOKEN
- OPENAI_API_KEY
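
A quick check for these variables can catch a misconfigured environment early. The sketch below is an assumption about how one might do this, not part of the serving setup itself: `missing_env_vars` is a hypothetical helper that accepts an optional mapping so it can be tried without touching the real environment.

```python
import os

REQUIRED_ENV_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN", "OPENAI_API_KEY")


def missing_env_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


# Illustration with placeholder values; call missing_env_vars() to check the real environment
demo_missing = missing_env_vars(
    {"DATABRICKS_HOST": "https://example.cloud.databricks.com", "OPENAI_API_KEY": "sk-demo"}
)
print(demo_missing)
```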