# Building a data analyst agent with LangGraph and Azure Container Apps dynamic sessions

In this example we'll build an agent that can query a Postgres database and run Python code to analyze the retrieved data. We'll use [LangGraph](https://langchain-ai.github.io/langgraph/) for agent orchestration and [Azure Container Apps dynamic sessions](https://python.langchain.com/v0.2/docs/integrations/tools/azure_dynamic_sessions/) for safe Python code execution.

**NOTE**: Building LLM systems that interact with SQL databases requires executing model-generated SQL queries. There are inherent risks in doing this. Make sure that your database connection permissions are always scoped as narrowly as possible for your agent's needs. This will mitigate, though not eliminate, the risks of building a model-driven system. For more on general security best practices, see our [security guidelines](https://python.langchain.com/v0.2/docs/security/).

## Setup

Let's get set up by installing our Python dependencies and setting our Azure OpenAI credentials, the Azure Container Apps sessions pool endpoint, and the SQL database connection string.

### Install dependencies

In [1]:
%pip install -qU langgraph langchain-azure-dynamic-sessions langchain-openai langchain-community pandas matplotlib python-dotenv requests

Note: you may need to restart the kernel to use updated packages.


### Set credentials

By default this demo uses:
- Azure OpenAI for the model: https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/create-resource
- Azure PostgreSQL for the db: https://learn.microsoft.com/en-us/cli/azure/postgres/server?view=azure-cli-latest#az-postgres-server-create
- Azure Container Apps dynamic sessions for code execution: https://learn.microsoft.com/en-us/azure/container-apps/sessions-code-interpreter?

This LangGraph architecture can also be used with any other [tool-calling LLM](https://python.langchain.com/v0.2/docs/how_to/tool_calling) and any SQL database.

In [None]:
import getpass
import os

os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Azure OpenAI API key")
os.environ["AZURE_OPENAI_ENDPOINT"] = getpass.getpass("Azure OpenAI endpoint")

AZURE_OPENAI_DEPLOYMENT_NAME = getpass.getpass("Azure OpenAI deployment name")
SESSIONS_POOL_MANAGEMENT_ENDPOINT = getpass.getpass(
    "Azure Container Apps dynamic sessions pool management endpoint"
)
SQL_DB_CONNECTION_STRING = getpass.getpass("PostgreSQL connection string")


In [1]:
import os

from dotenv import load_dotenv

# Alternative to the getpass cell above: load the same settings from a local .env file
load_dotenv()

# Read the settings used in the rest of the notebook
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
SESSIONS_POOL_MANAGEMENT_ENDPOINT = os.getenv("SESSIONS_POOL_MANAGEMENT_ENDPOINT")
SQL_SERVER = os.getenv("SQL_SERVER")
SQL_USER = os.getenv("SQL_USER")
SQL_PWD = os.getenv("SQL_PWD")
SQL_DATABASE = os.getenv("SQL_DATABASE")
print(f"AZURE_OPENAI_ENDPOINT: {AZURE_OPENAI_ENDPOINT}")
print(f"SESSIONS_POOL_MANAGEMENT_ENDPOINT: {SESSIONS_POOL_MANAGEMENT_ENDPOINT}")

AZURE_OPENAI_ENDPOINT: https://openai-ak-training.openai.azure.com/
SESSIONS_POOL_MANAGEMENT_ENDPOINT: https://eastus.dynamicsessions.io/subscriptions/4ef73217-81d7-4cbd-8f04-04e285c83645/resourceGroups/training/sessionPools/code-interpreter-pool
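The `.env` cell returns `None` for any variable that is missing, and the problem only surfaces later when a client is constructed. A minimal sketch of an up-front check follows; the helper name `missing_env_vars` is hypothetical, while the variable names are the ones this notebook reads.

```python
import os
from typing import Iterable, List, Mapping


def missing_env_vars(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or empty in `environ`, preserving input order."""
    return [name for name in names if not environ.get(name)]


# Settings read elsewhere in this notebook (AZURE_OPENAI_API_VERSION is used by AzureChatOpenAI)
REQUIRED_VARS = [
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_DEPLOYMENT_NAME",
    "AZURE_OPENAI_API_VERSION",
    "SESSIONS_POOL_MANAGEMENT_ENDPOINT",
]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```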


### Imports

In [2]:
import ast
import base64
import io
import json
import operator
from functools import partial
from typing import Annotated, List, Literal, Optional, Sequence, TypedDict

import pandas as pd
from IPython.display import display
from langchain_azure_dynamic_sessions import SessionsPythonREPLTool
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnablePassthrough
from langchain_core.tools import tool
from langchain_openai import AzureChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from matplotlib.pyplot import imshow
from PIL import Image


Note: recent `langchain-core` releases emit a deprecation warning for imports from `langchain_core.pydantic_v1`. Import `BaseModel` and `Field` from `pydantic` instead, or from `pydantic.v1` if your code base has not yet been fully upgraded to Pydantic 2.


## Instantiate model, DB, code interpreter

The LangChain [SQLDatabase](https://python.langchain.com/v0.2/api_reference/community/utilities/langchain_community.utilities.sql_database.SQLDatabase.html#langchain_community.utilities.sql_database.SQLDatabase) interface can connect to and query any SQL database supported by [SQLAlchemy](https://www.sqlalchemy.org/). In this notebook, however, the data is served by ROAPI, so the cells below fetch the table schema and send SQL queries to the ROAPI HTTP endpoints with `requests`.
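The cells below talk to ROAPI by POSTing the raw SQL statement to `<base URL>/api/sql` and reading back a JSON array of row objects. Here is a standard-library sketch of that round trip, assuming the same protocol the `requests.post(url, data=sql_query)` calls use. It adds a timeout and raises on HTTP errors; the helper names and the `text/plain` content type are assumptions, not part of ROAPI or this notebook.

```python
import json
import urllib.request
from typing import Any, Dict, List


def build_roapi_sql_request(base_url: str, sql: str) -> urllib.request.Request:
    """Build a POST request that sends `sql` as the raw body to ROAPI's /api/sql endpoint."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/api/sql",
        data=sql.encode("utf-8"),
        # Assumption: a plain-text body; urllib would otherwise default to form-urlencoded
        headers={"Content-Type": "text/plain"},
        method="POST",
    )


def run_roapi_sql(base_url: str, sql: str, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Execute `sql` against ROAPI and return the decoded JSON rows.

    urlopen raises urllib.error.HTTPError for non-2xx responses, so a failed
    query surfaces as an exception instead of a printed status code.
    """
    request = build_roapi_sql_request(base_url, sql)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `run_roapi_sql("http://<your-roapi-host>:8000", "SELECT * FROM asl LIMIT 2")` would return a list of dicts shaped like the output printed below.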

In [3]:
import requests

# Replace with your actual roapi URL
url = "http://roapi-app-arash.dhbjd9bvccbjedea.eastus.azurecontainer.io:8000/api/schema"

response = requests.get(url)

if response.status_code == 200:
    schema = response.json()
    print(schema)
else:
    print("Error:", response.status_code)

{'asl': {'fields': [{'name': 'Id', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'AccountId', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'AccountType', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'DeviceId', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'DeviceType', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'ApiName', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'LoggedTime', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'ProfileId', 'data_type': 'Utf8', 'nullable': True, 'dict_id': 0, 'dict_is_ordered': False, 'metadata': {}}, {'name': 'TenantId', 'data_type': 'Utf8', 

In [4]:
# Send a SQL query to ROAPI's /api/sql endpoint as the raw POST body
url = "http://roapi-app-arash.dhbjd9bvccbjedea.eastus.azurecontainer.io:8000/api/sql"
sql_query = "SELECT * FROM asl LIMIT 2"
response_sql = requests.post(url, data=sql_query)
print(response_sql.status_code)
print(response_sql.content)
print(response_sql.headers)
print(response_sql.text)

200
b'[{"Id":"dummy_id_1","AccountId":"dummy_accountid_1","AccountType":"live","DeviceId":"dummydeviceid1","DeviceType":"Android","ApiName":"Subscriber.API.GetLiveChannelMapRights","LoggedTime":"2024-02-20T14:07:50.491172Z","ProfileId":"dummy_profile_id_1","TenantId":"default","MediaFirstRequiredSku":"GAZ 101 0002/2","DvrCapability":"MediaroomDvr","date":"20240220"},{"Id":"dummy_id_2","AccountId":"dummy_accountid_2","AccountType":"live","DeviceId":"dummydeviceid2","DeviceType":"Android","ApiName":"Subscriber.API.GetLiveChannelMapRights","LoggedTime":"2024-02-20T14:07:50.687638Z","ProfileId":"dummy_profile_id_2","TenantId":"default","MediaFirstRequiredSku":"GAZ 101 0002/2","DvrCapability":"MediaroomDvr","date":"20240220"}]'
{'content-type': 'application/json', 'content-length': '725', 'access-control-allow-origin': '*', 'vary': 'origin, access-control-request-method, access-control-request-headers', 'date': 'Thu, 07 Nov 2024 15:37:25 GMT'}
[{"Id":"dummy_id_1","AccountId":"dummy_accounti

In [5]:
sql_query = "SELECT * FROM asl LIMIT 10"


def func(sql):
    """Send `sql` to ROAPI, then print the raw JSON response and the rows as a DataFrame."""
    # URL of the ROAPI SQL endpoint
    url = "http://roapi-app-arash.dhbjd9bvccbjedea.eastus.azurecontainer.io:8000/api/sql"

    # ROAPI expects the SQL statement as the raw POST body
    response = requests.post(url, data=sql)

    # Print the raw response from the API
    print(response.text)

    # Convert the JSON array of row objects into a DataFrame
    df_sql = pd.DataFrame(response.json())
    print(df_sql)


func(sql_query)

[{"Id":"dummy_id_1","AccountId":"dummy_accountid_1","AccountType":"live","DeviceId":"dummydeviceid1","DeviceType":"Android","ApiName":"Subscriber.API.GetLiveChannelMapRights","LoggedTime":"2024-02-20T14:07:50.491172Z","ProfileId":"dummy_profile_id_1","TenantId":"default","MediaFirstRequiredSku":"GAZ 101 0002/2","DvrCapability":"MediaroomDvr","date":"20240220"},{"Id":"dummy_id_2","AccountId":"dummy_accountid_2","AccountType":"live","DeviceId":"dummydeviceid2","DeviceType":"Android","ApiName":"Subscriber.API.GetLiveChannelMapRights","LoggedTime":"2024-02-20T14:07:50.687638Z","ProfileId":"dummy_profile_id_2","TenantId":"default","MediaFirstRequiredSku":"GAZ 101 0002/2","DvrCapability":"MediaroomDvr","date":"20240220"},{"Id":"dummy_id_3","AccountId":"dummy_accountid_3","AccountType":"live","DeviceId":"dummydeviceid3","DeviceType":"Android","ApiName":"Subscriber.API.GetLiveChannelMapRights","LoggedTime":"2024-02-20T14:07:50.886481Z","ProfileId":"dummy_profile_id_3","TenantId":"default","Med

In [6]:
func("SELECT * FROM bitmovin LIMIT 10")

[{"ad":"0","analytics_version":"3.1.2","asn":"0","audio_bitrate":0,"audio_language":"en","browser":"Mobile Safari UI/WKWebView","browser_is_bot":false,"buffered":0,"cdn_provider":"CDN1","city":"south houston","client_time":1719972030597,"country":"US","custom_user_id":"dummy_user_id_1","date":"20240703","day":1719964800000,"device_class":"Tablet","device_type":"apple ipad","domain":"com.domain.mk","drm_type":"fairplay","dropped_frames":0,"duration":59699,"hour":1719972000000,"impression_id":"dummy_impression_id_1","ip_address":"10.10.10.1","is_casting":false,"is_live":true,"is_muted":false,"isp":"AT&T Corp.","language":"en_US","m3u8_url":"https://url1?start=LIVE&end=END","minute":1719972000000,"month":1719792000000,"operatingsystem":"iPadOS","operatingsystem_version_major":"17","operatingsystem_version_minor":"5","page_load_time":0,"page_load_type":1,"paused":0,"platform":"iOS","played":59699,"player":"player_name","player_startuptime":0,"player_tech":"ios:player1","player_version":"pl


In [None]:
# db = SQLDatabase.from_uri(SQL_DB_CONNECTION_STRING)

For our LLM we need to make sure that we use a model that supports [tool-calling](https://python.langchain.com/v0.2/docs/how_to/tool_calling).

In [8]:
llm = AzureChatOpenAI(
    deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME, openai_api_version=os.getenv('AZURE_OPENAI_API_VERSION')
)

And the [dynamic sessions tool](https://python.langchain.com/v0.2/docs/integrations/tools/azure_container_apps_dynamic_sessions/) is what we'll use for code execution.

In [9]:
repl = SessionsPythonREPLTool(
    pool_management_endpoint=SESSIONS_POOL_MANAGEMENT_ENDPOINT
)

## Define graph

Now we're ready to define our application logic. The core elements are the [agent State, Nodes, and Edges](https://langchain-ai.github.io/langgraph/concepts/#core-design).

### Define State
We'll use a simple agent State which is just a list of messages that every Node can append to:

In [10]:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
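The second argument to `Annotated` is a reducer. LangGraph uses it to combine the `messages` a node returns with the messages already in the state. This is why each node returns only its *new* messages. The toy merge function below is a simplified illustration of that idea, not LangGraph's implementation. `ToyState` and `apply_update` are hypothetical names.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class ToyState(TypedDict):
    # Same pattern as AgentState: operator.add is the reducer for this key
    messages: Annotated[List[str], operator.add]


def apply_update(state: Dict[str, Any], update: Dict[str, Any], schema: type) -> Dict[str, Any]:
    """Merge a node's partial update into state, using each key's reducer if it has one."""
    hints = get_type_hints(schema, include_extras=True)
    new_state = dict(state)
    for key, value in update.items():
        metadata = getattr(hints.get(key), "__metadata__", ())
        if metadata and callable(metadata[0]):
            # Annotated key: combine old and new values with the reducer (assumes list values)
            new_state[key] = metadata[0](state.get(key, []), value)
        else:
            # Plain key: the update overwrites the old value
            new_state[key] = value
    return new_state


state = {"messages": ["user: How many active users?"]}
state = apply_update(state, {"messages": ["ai: (tool call)"]}, ToyState)
print(state["messages"])  # ['user: How many active users?', 'ai: (tool call)']
```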

Since our code interpreter can return results like base64-encoded images which we don't want to pass back to the model, we'll create a custom Tool message that allows us to track raw Tool outputs without sending them back to the model.

In [11]:
class RawToolMessage(ToolMessage):
    """
    Customized Tool message that lets us pass around the raw tool outputs (along with string contents for passing back to the model).
    """

    raw: dict
    """Arbitrary (non-string) tool outputs. Won't be sent to model."""
    tool_name: str
    """Name of tool that generated output."""
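The point of `RawToolMessage` is to keep two views of one tool result: a short string for the model and the full payload on the side. Here is a standard-library sketch of that split. It assumes the code interpreter returns a dict with `stdout`, `stderr`, and `result` keys, and that image results look like `{"type": "image", "base64_data": ...}`. Treat that shape, and the helper name `split_tool_output`, as assumptions rather than the documented response format.

```python
import base64
from typing import Any, Dict, Tuple


def split_tool_output(output: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (content for the model, raw payload to keep out of the model's context)."""
    parts = []
    if output.get("stdout"):
        parts.append(f"stdout:\n{output['stdout']}")
    if output.get("stderr"):
        parts.append(f"stderr:\n{output['stderr']}")
    result = output.get("result")
    if isinstance(result, dict) and result.get("type") == "image":
        # Don't send base64 bytes to the model; just note that an image was produced
        parts.append("result: <image generated; not shown to model>")
    elif result is not None:
        parts.append(f"result: {result}")
    return "\n\n".join(parts), output


fake_png = base64.b64encode(b"\x89PNG\r\n\x1a\n").decode("ascii")
content, raw = split_tool_output(
    {"stdout": "42\n", "stderr": "", "result": {"type": "image", "base64_data": fake_png}}
)
print(content)
```

The resulting pair maps onto `RawToolMessage(content, raw=raw, tool_call_id=..., tool_name="python_shell")`. The model sees only `content`, while `raw` stays available for rendering the image.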

### Define Nodes

First we'll define a node for calling our model. We need to make sure to bind our tools to the model so that it knows to call them. We'll also specify in our prompt the schema of the SQL tables the model has access to, so that it can write relevant SQL queries.

We'll use our model's tool-calling abilities to reliably generate our SQL queries and Python code. To do this we need to define schemas for our tools that the model can use to structure its tool calls.

Note that the class names, docstrings, and attribute types and descriptions are crucial here, as they're actually passed to the model (you can effectively think of them as part of the prompt).

In [12]:
# Tool schema for querying SQL db
class create_df_from_sql(BaseModel):
    """Execute a SQL SELECT statement and use the results to create a DataFrame with the given column names."""

    select_query: str = Field(..., description="A SQL SELECT statement.")
    # We're going to convert the results to a Pandas DataFrame that we pass
    # to the code interpreter, so we also have the model generate useful column and
    # variable names for this DataFrame that the model will refer to when writing
    # Python code.
    df_columns: List[str] = Field(
        ..., description="Ordered names to give the DataFrame columns."
    )
    df_name: str = Field(
        ..., description="The name to give the DataFrame variable in downstream code."
    )


# Tool schema for writing Python code
class python_shell(BaseModel):
    """Execute Python code that analyzes the DataFrames that have been generated. Make sure to print any important results."""

    code: str = Field(
        ...,
        description="The code to execute. Make sure to print any important results.",
    )
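To make "part of the prompt" concrete: when tools are bound to an OpenAI-compatible model, each schema class is converted into an OpenAI-style function definition. The class name becomes the function name, the docstring becomes its description, and each field becomes a JSON Schema property. The sketch below builds that structure by hand for `create_df_from_sql`. The helper `sketch_tool_definition` is hypothetical, and the JSON that LangChain actually generates may differ in details such as titles.

```python
import json
from typing import Dict, Tuple

# Hypothetical mapping from the annotations used above to JSON Schema types
JSON_TYPES = {
    "str": {"type": "string"},
    "List[str]": {"type": "array", "items": {"type": "string"}},
}


def sketch_tool_definition(name: str, doc: str, fields: Dict[str, Tuple[str, str]]) -> dict:
    """Build an OpenAI-style function definition from a name, docstring, and field specs."""
    properties = {
        field: {**JSON_TYPES[py_type], "description": description}
        for field, (py_type, description) in fields.items()
    }
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": doc,
            "parameters": {"type": "object", "properties": properties, "required": list(fields)},
        },
    }


tool_def = sketch_tool_definition(
    "create_df_from_sql",
    "Execute a SQL SELECT statement and use the results to create a DataFrame with the given column names.",
    {
        "select_query": ("str", "A SQL SELECT statement."),
        "df_columns": ("List[str]", "Ordered names to give the DataFrame columns."),
        "df_name": ("str", "The name to give the DataFrame variable in downstream code."),
    },
)
print(json.dumps(tool_def, indent=2))
```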


In [14]:
import json
from sqlalchemy import Table, MetaData, Column, Integer, String, Boolean

# Schema returned by ROAPI's /api/schema endpoint (copied from the output above)
json_output = {
    "asl": {
        "fields": [
            {"name": "Id", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "AccountId", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "AccountType", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "DeviceId", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "DeviceType", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "ApiName", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "LoggedTime", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "ProfileId", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "TenantId", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "MediaFirstRequiredSku", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "DvrCapability", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "date", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}}
        ],
        "metadata": {}
    },
    "bitmovin": {
        "fields": [
            {"name": "ad", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "analytics_version", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "asn", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "audio_bitrate", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "audio_language", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "browser", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "browser_is_bot", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "browser_version_major", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "browser_version_minor", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "buffered", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "cdn_provider", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "city", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "client_time", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "country", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_user_id", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "date", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "day", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "device_class", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "device_type", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "domain", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "drm_type", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "dropped_frames", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "duration", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "hour", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "impression_id", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "ip_address", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "is_casting", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "is_live", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "is_muted", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "isp", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "language", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "m3u8_url", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "minute", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "month", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "operatingsystem", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "operatingsystem_version_major", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "operatingsystem_version_minor", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "page_load_time", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "page_load_type", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "paused", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "platform", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "played", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "player", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "player_startuptime", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "player_tech", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "player_version", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "region", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "screen_height", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "screen_width", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "seeked", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "sequence_number", "data_type": "Int32", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "startuptime", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "state", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "stream_format", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "subtitle_enabled", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "subtitle_language", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "tenant", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "time", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "user_id", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_bitrate", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_codec", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_duration", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_id", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_playback_height", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_playback_width", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_startuptime", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_title", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_window_height", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "video_window_width", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "videostart_failed", "data_type": "Boolean", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "videostart_failed_reason", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "videotime_end", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "videotime_start", "data_type": "Int64", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "audio_codec", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_data_1", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_data_2", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_data_3", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_data_4", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "custom_data_5", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "experiment_name", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "path", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}},
            {"name": "player_key", "data_type": "Utf8", "nullable": True, "dict_id": 0, "dict_is_ordered": False, "metadata": {}}
        ],
        "metadata": {}
    }
}

# Function to map data types
def map_data_type(data_type):
    if data_type == "Utf8":
        return String
    elif data_type == "Int64":
        return Integer
    elif data_type == "Int32":
        return Integer
    elif data_type == "Boolean":
        return Boolean
    else:
        return String  # Default to String if unknown

# Function to convert JSON to SQLAlchemy table info
def convert_json_to_table_info(json_output):
    metadata = MetaData()
    tables = []

    for table_name, table_info in json_output.items():
        columns = []
        for field in table_info["fields"]:
            col_type = map_data_type(field["data_type"])
            col_args = {"nullable": field.get("nullable", True)}
            columns.append(Column(field["name"], col_type, **col_args))
        
        table = Table(table_name, metadata, *columns)
        tables.append(table)
    
    return tables

tables = convert_json_to_table_info(json_output)
for table in tables:
    print(table)
   

asl
bitmovin
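Interpolating `tables` into the prompt inserts the Python repr of the SQLAlchemy `Table` objects. That repr is verbose, and the JSON is hard-coded rather than taken from the live `/api/schema` response. One alternative is to render a compact description straight from the ROAPI schema dict, keeping the Arrow type names (`Utf8`, `Int64`, ...) that DataFusion uses. Here is a minimal sketch; `describe_roapi_schema` is a hypothetical helper, and the sample dict is trimmed from the schema shown above.

```python
from typing import Any, Dict


def describe_roapi_schema(schema: Dict[str, Any]) -> str:
    """Render ROAPI's /api/schema JSON as one 'table(column type, ...)' line per table."""
    lines = []
    for table_name, table_info in schema.items():
        columns = ", ".join(
            f"{field['name']} {field['data_type']}" for field in table_info["fields"]
        )
        lines.append(f"{table_name}({columns})")
    return "\n".join(lines)


sample_schema = {
    "asl": {"fields": [{"name": "Id", "data_type": "Utf8"}, {"name": "date", "data_type": "Utf8"}]},
    "bitmovin": {"fields": [{"name": "ad", "data_type": "Utf8"}, {"name": "audio_bitrate", "data_type": "Int64"}]},
}
print(describe_roapi_schema(sample_schema))
```

In the notebook, `describe_roapi_schema(schema)` would use the `schema` variable fetched from `/api/schema` earlier.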


In [15]:
system_prompt = f"""\
You are an expert at ROAPI and Python. You have access to ROAPI \
with the following tables:

{tables}

Given a user question related to the data in the database, \
first write a SQL query in the Apache Arrow DataFusion SQL dialect that ROAPI uses. \
Follow the style of this example:

SELECT COUNT(DISTINCT user_id) AS active_user_count
FROM bitmovin
WHERE date >= to_date(cast(now() AS VARCHAR)) - INTERVAL '200 days'
AND user_id IS NOT NULL;

Then tell me the SQL query that you will use. Then get the relevant data from the table \
as a DataFrame using the create_df_from_sql tool. Then use the \
python_shell tool to do any analysis required to answer the user question."""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("placeholder", "{messages}"),
    ]
)


def call_model(state: AgentState) -> dict:
    """Call model with tools passed in."""
    messages = []

    chain = prompt | llm.bind_tools([create_df_from_sql, python_shell])
    messages.append(chain.invoke({"messages": state["messages"]}))

    return {"messages": messages}
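
`ChatPromptTemplate.from_messages` treats the system string as an f-string-style template, so any literal `{` or `}` that reaches `system_prompt` (for example, raw schema JSON such as `"metadata": {}`) would be parsed as a template variable. The usual fix is to double the braces before the text is turned into a template. A minimal sketch, where `escape_braces` is a hypothetical helper and Python's `str.format` stands in for the template's formatting step:

```python
def escape_braces(text: str) -> str:
    """Double literal braces so str.format-style templates treat them as plain text."""
    return text.replace("{", "{{").replace("}", "}}")


# Hypothetical schema text containing literal braces (e.g. JSON metadata).
schema_text = 'bitmovin fields: [{"name": "player_key"}], metadata: {}'

# Build the template: escaped schema text plus one real placeholder.
template = "Tables:\n" + escape_braces(schema_text) + "\nQuestion: {question}"

# Formatting fills {question} and turns the doubled braces back into single ones.
rendered = template.format(question="How many users?")
print(rendered)
```

Without the escaping step, `format` would try to look up `"name"` as a variable and raise a `KeyError`.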

Now we can define the node that executes any SQL queries generated by the model by sending them to the ROAPI SQL endpoint. Notice that after we run a query we convert the results into a Pandas DataFrame; these DataFrames will be uploaded to the code interpreter tool in the next step so that it can use the retrieved data.

In [16]:
# ROAPI SQL endpoint used by this demo; point this at your own ROAPI deployment.
ROAPI_SQL_URL = "http://roapi-app-arash.dhbjd9bvccbjedea.eastus.azurecontainer.io:8000/api/sql"


def execute_sql_query(state: AgentState) -> dict:
    """Execute the latest SQL queries against ROAPI."""
    messages = []

    for tool_call in state["messages"][-1].tool_calls:
        if tool_call["name"] != "create_df_from_sql":
            continue

        sql_query = tool_call["args"]["select_query"]
        df_columns = tool_call["args"]["df_columns"]
        df_name = tool_call["args"]["df_name"]

        # Execute the SQL query: the raw query text is sent as the POST body.
        try:
            response = requests.post(ROAPI_SQL_URL, data=sql_query, timeout=60)
            response.raise_for_status()
            data_sql = response.json()
            print(data_sql)
        except (requests.RequestException, ValueError) as e:
            # Report the failure to the model instead of crashing on an undefined result.
            messages.append(
                RawToolMessage(
                    f"Failed to execute SQL query: {e}",
                    raw={},
                    tool_call_id=tool_call["id"],
                    tool_name=tool_call["name"],
                )
            )
            continue

        # Convert the result (a list of row dicts) to a Pandas DataFrame
        df = pd.DataFrame(data_sql, columns=df_columns)

        # Add tool output message
        messages.append(
            RawToolMessage(
                f"Generated dataframe {df_name} with columns {df_columns}",  # What's sent to model.
                raw={df_name: df},
                tool_call_id=tool_call["id"],
                tool_name=tool_call["name"],
            )
        )

    return {"messages": messages}
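
`pd.DataFrame(data_sql, columns=df_columns)` only keeps the keys listed in `df_columns`; a name the model lists that doesn't match a column alias in its SQL silently becomes an all-`NaN` column rather than an error. A hypothetical guard could compare the model's `df_columns` with the keys ROAPI actually returned (the list-of-row-dicts shape matches the `[{'active_user_count': 100}]` output printed in the first test) and report any mismatch back to the model. A stdlib sketch, with `missing_columns` as an assumed helper name:

```python
def missing_columns(rows: list[dict], df_columns: list[str]) -> list[str]:
    """Return the requested columns that never appear as a key in the result rows."""
    returned = set()
    for row in rows:
        returned.update(row.keys())
    # Preserve the model's requested order in the report.
    return [col for col in df_columns if col not in returned]


rows = [{"active_user_count": 100}]
print(missing_columns(rows, ["active_user_count"]))  # []
print(missing_columns(rows, ["active_users"]))  # ['active_users']
```

An empty result set makes every column look missing, so a real guard would probably skip the check when `rows` is empty.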

Now we need a node for executing any model-generated Python code. The key steps here are:
- Uploading queried data to the code interpreter
- Executing model-generated code
- Parsing results so that images are displayed and not passed in to future model calls

To make the queried data available to the code interpreter, we take the DataFrames generated by the SQL queries and upload them as CSVs.
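
The loader code that `_upload_dfs_to_repl` (defined in the next cell) prepends to every execution is just one `pd.read_csv` line per uploaded CSV. Because each `df_name` comes from the model and is used as both a file name and a Python variable name, it may be worth validating it first. A stdlib sketch of that step; `build_loader_code` and the identifier check are assumptions, not part of the original code:

```python
import keyword


def build_loader_code(df_names: list[str], data_dir: str = "/mnt/data") -> str:
    """Return Python source that loads each uploaded CSV into a same-named variable."""
    for name in df_names:
        # The name becomes a variable in the sandbox, so it must be a legal identifier.
        if not name.isidentifier() or keyword.iskeyword(name):
            raise ValueError(f"Invalid DataFrame name: {name!r}")
    lines = ["import pandas as pd"]
    lines += [f"{name} = pd.read_csv('{data_dir}/{name}.csv')" for name in df_names]
    return "\n".join(lines)


print(build_loader_code(["top_vod_assets"]))
# import pandas as pd
# top_vod_assets = pd.read_csv('/mnt/data/top_vod_assets.csv')
```

Raising early gives a clear error message to surface to the model, instead of a `SyntaxError` from inside the sandbox.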

In [20]:
def _upload_dfs_to_repl(state: AgentState) -> str:
    """
    Upload generated dfs to the code interpreter and return code for loading them.

    Note that code interpreter sessions are short-lived so this needs to be done
    every agent cycle, even if the dfs were previously uploaded.
    """
    df_dicts = [
        msg.raw
        for msg in state["messages"]
        if isinstance(msg, RawToolMessage) and msg.tool_name == "create_df_from_sql"
    ]
    name_df_map = {name: df for df_dict in df_dicts for name, df in df_dict.items()}

    # Data should be uploaded as a BinaryIO, so encode the CSV text to bytes.
    # index=False avoids an extra "Unnamed: 0" column when the CSV is read back.
    # Files will be uploaded to the "/mnt/data/" directory on the container.
    for name, df in name_df_map.items():
        buffer = io.BytesIO(df.to_csv(index=False).encode("utf-8"))
        repl.upload_file(data=buffer, remote_file_path=name + ".csv")

    # Code for loading the uploaded files.
    df_code = "import pandas as pd\n" + "\n".join(
        f"{name} = pd.read_csv('/mnt/data/{name}.csv')" for name in name_df_map
    )
    return df_code


def _repl_result_to_msg_content(repl_result: dict) -> str:
    """
    Display images without including them in tool message content.
    """
    content = {}
    for k, v in repl_result.items():
        # Any image results are returned as a dict of the form:
        # {"type": "image", "base64_data": "..."}
        if isinstance(v, dict) and v.get("type") == "image":
            # Decode and display image
            img = Image.open(io.BytesIO(base64.b64decode(v["base64_data"])))
            display(img)
        else:
            content[k] = v
    return json.dumps(content, indent=2)


def execute_python(state: AgentState) -> dict:
    """
    Execute the latest generated Python code.
    """
    messages = []

    df_code = _upload_dfs_to_repl(state)
    last_ai_msg = [msg for msg in state["messages"] if isinstance(msg, AIMessage)][-1]
    for tool_call in last_ai_msg.tool_calls:
        if tool_call["name"] != "python_shell":
            continue

        generated_code = tool_call["args"]["code"]
        repl_result = repl.execute(df_code + "\n" + generated_code)

        messages.append(
            RawToolMessage(
                _repl_result_to_msg_content(repl_result),
                raw=repl_result,
                tool_call_id=tool_call["id"],
                tool_name=tool_call["name"],
            )
        )
    return {"messages": messages}
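
Stripped of the notebook display calls, `_repl_result_to_msg_content` is a filter: entries shaped like `{"type": "image", "base64_data": ...}` are pulled out for display, and everything else is serialized to JSON for the model. A stdlib-only sketch of that split, returning decoded image bytes instead of calling `display` so the behavior is easy to check outside a notebook. The `split_repl_result` name and the example result keys are illustrative assumptions, not the sessions service's actual schema:

```python
import base64
import json


def split_repl_result(repl_result: dict) -> tuple[str, list[bytes]]:
    """Separate image entries from the rest of a code-interpreter result.

    Returns the JSON text to send to the model and the decoded image payloads.
    """
    content, images = {}, []
    for key, value in repl_result.items():
        if isinstance(value, dict) and value.get("type") == "image":
            images.append(base64.b64decode(value["base64_data"]))
        else:
            content[key] = value
    return json.dumps(content, indent=2), images


# Illustrative result dict; the keys are assumptions for the example.
result = {
    "stdout": "done\n",
    "result": {"type": "image", "base64_data": base64.b64encode(b"fake-png").decode()},
}
text, images = split_repl_result(result)
print(text)
print(images)  # [b'fake-png']
```

Keeping the (potentially large) base64 payload out of the tool message is what stops images from being resent to the model on every later call.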

### Define Edges

Now we're ready to put all the pieces together into a graph.

In [19]:
def should_continue(state: AgentState) -> str:
    """
    If the model's latest message contains tool calls, route to the tool-execution
    nodes (SQL first, then Python); otherwise end the run.
    """
    return "execute_sql_query" if state["messages"][-1].tool_calls else END

In [21]:
workflow = StateGraph(AgentState)

workflow.add_node("call_model", call_model)
workflow.add_node("execute_sql_query", execute_sql_query)
workflow.add_node("execute_python", execute_python)

workflow.set_entry_point("call_model")
workflow.add_edge("execute_sql_query", "execute_python")
workflow.add_edge("execute_python", "call_model")
workflow.add_conditional_edges("call_model", should_continue)

app = workflow.compile()
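
To make the routing concrete, here is a plain-Python sketch (no LangGraph, no LLM) of the loop the compiled graph runs: `call_model`, then, while the latest model message has tool calls, `execute_sql_query` followed by `execute_python`, then back to `call_model`. The `run_agent` function, the scripted stand-in model, and the message dicts are invented for illustration:

```python
from typing import Callable


def run_agent(model: Callable[[list], dict], max_turns: int = 10) -> list[str]:
    """Simulate call_model -> (execute_sql_query -> execute_python)* -> END."""
    messages: list = []
    trace: list[str] = []
    for _ in range(max_turns):
        trace.append("call_model")
        ai_msg = model(messages)
        messages.append(ai_msg)
        # should_continue: stop once the model answers without tool calls.
        if not ai_msg["tool_calls"]:
            trace.append("END")
            return trace
        # Unconditional edges: SQL tools run first, then Python tools.
        trace += ["execute_sql_query", "execute_python"]
        messages.append({"role": "tool", "content": "tool results"})
    raise RuntimeError("Exceeded max_turns without a final answer")


# Scripted stand-in model: one round of tool calls, then a final answer.
script = iter([
    {"tool_calls": ["create_df_from_sql", "python_shell"]},
    {"tool_calls": []},
])
print(run_agent(lambda msgs: next(script)))
# ['call_model', 'execute_sql_query', 'execute_python', 'call_model', 'END']
```

The `max_turns` cap is similar in spirit to LangGraph's `recursion_limit` setting, which stops a graph that keeps cycling.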

In [22]:
print(app.get_graph().draw_ascii())

[Graph diagram (truncated ASCII output): `__start__` → `call_model`, with several edges branching out of `call_model`.]

## Test it out

Replace these examples with questions related to the database you've connected your agent to.

In [31]:
output = app.invoke({"messages": [("human", "Count of active users in last 200 days")]})
print(output["messages"][-1].content)

[{'active_user_count': 100}]
The count of active users in the last 200 days is: 100.


**LangSmith Trace**: https://smith.langchain.com/public/9c8afcce-0ed1-4fb1-b719-767e6432bd8e/r

In [24]:
output = app.invoke(
    {
        "messages": [
            ("human", "Count of active users in the month of October")
        ]
    }
)
print(output["messages"][-1].content)

Sure! First, I will generate the SQL query to get the count of active users for the month of October from the `bitmovin` table.

```sql
sql = '''
SELECT COUNT(DISTINCT user_id) AS active_user_count
FROM bitmovin
WHERE date >= '2023-10-01' AND date <= '2023-10-31'
AND user_id IS NOT NULL;
'''
```

Now, I will use this query to get the relevant data from the table as a DataFrame.

```python
functions.create_df_from_sql({
  select_query: '''
    SELECT COUNT(DISTINCT user_id) AS active_user_count
    FROM bitmovin
    WHERE date >= '2023-10-01' AND date <= '2023-10-31'
    AND user_id IS NOT NULL;
  ''',
  df_columns: ['active_user_count'],
  df_name: 'active_users_october'
})
```

Then, I will print the active user count for October using the `python_shell`.

```python
functions.python_shell({
  code: '''
    print("Active User Count for October:", active_users_october['active_user_count'][0])
  '''
})
```
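
The date filters in these examples can be sanity-checked in plain Python. The prompt's example keeps rows dated on or after today minus 200 days, while the model's October query uses an inclusive `date <= '2023-10-31'` upper bound, which would drop rows later on 31 October if `date` holds timestamps rather than plain dates; a half-open range (`>= '2023-10-01' AND < '2023-11-01'`) avoids that. A stdlib sketch of both boundaries; `days_ago_start` and `month_bounds` are hypothetical helpers:

```python
from datetime import date, timedelta


def days_ago_start(today: date, days: int) -> date:
    """First date kept by a filter like `date >= today - INTERVAL '<days> days'`."""
    return today - timedelta(days=days)


def month_bounds(year: int, month: int) -> tuple[date, date]:
    """Half-open [start, end) bounds for a calendar month."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start, end


print(days_ago_start(date(2024, 5, 20), 200))  # 2023-11-02
start, end = month_bounds(2023, 10)
print(f"WHERE date >= '{start}' AND date < '{end}'")
# WHERE date >= '2023-10-01' AND date < '2023-11-01'
```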


In [25]:
# Continue the conversation
output = app.invoke(
    {"messages": output["messages"] + [("human", "Which channel had the highest duration of viewing in last 200 days")]}
)

In [26]:
print(output["messages"][-1].content)

First, I will generate the SQL query to get the channel with the highest duration of viewing in the last 200 days from the `bitmovin` table. Assuming that the `video_title` column represents the channel name:

```sql
sql = '''
SELECT video_title, SUM(duration) AS total_duration
FROM bitmovin
WHERE date >= to_date(cast(now() AS VARCHAR)) - INTERVAL '200 days'
GROUP BY video_title
ORDER BY total_duration DESC
LIMIT 1;
'''
```

Now, I will use this query to get the relevant data from the table as a DataFrame.

```python
functions.create_df_from_sql({
  select_query: '''
    SELECT video_title, SUM(duration) AS total_duration
    FROM bitmovin
    WHERE date >= to_date(cast(now() AS VARCHAR)) - INTERVAL '200 days'
    GROUP BY video_title
    ORDER BY total_duration DESC
    LIMIT 1;
  ''',
  df_columns: ['video_title', 'total_duration'],
  df_name: 'highest_duration_channel'
})
```

Then, I will print the channel with the highest duration of viewing using the `python_shell`.

```python
fu
```

In [27]:
output = app.invoke(
    {
        "messages": output["messages"]
        + [("human", "What are the top five VOD assets watched by duration")]
    }
)

In [28]:
print(output["messages"][-1].content)

First, I will generate the SQL query to get the top five VOD assets watched by duration from the `bitmovin` table. Assuming that the `video_title` column represents the VOD asset name:

```sql
sql = '''
SELECT video_title, SUM(duration) AS total_duration
FROM bitmovin
WHERE is_live = FALSE
GROUP BY video_title
ORDER BY total_duration DESC
LIMIT 5;
'''
```

Now, I will use this query to get the relevant data from the table as a DataFrame.

```python
functions.create_df_from_sql({
  select_query: '''
    SELECT video_title, SUM(duration) AS total_duration
    FROM bitmovin
    WHERE is_live = FALSE
    GROUP BY video_title
    ORDER BY total_duration DESC
    LIMIT 5;
  ''',
  df_columns: ['video_title', 'total_duration'],
  df_name: 'top_vod_assets'
})
```

Then, I will print the top five VOD assets watched by duration using the `python_shell`.

```python
functions.python_shell({
  code: '''
    print("Top Five VOD Assets Watched by Duration:")
    for index, row in top_vod_assets.iterr
```
