## Agent with SQL Tool

In [80]:
import json
import datetime
from typing import Any, Callable, Set, Dict, List, Optional
from dotenv import load_dotenv  
import json
import datetime
from typing import Any, Callable, Set, Dict, List, Optional
import os
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from typing import Annotated  
import pandas as pd
import os  
import struct  
import urllib.parse  
from azure import identity  
from sqlalchemy import create_engine 

# Load the .env file  
# override=True lets values from .env replace variables already set in the process.
load_dotenv(override=True) 


# System prompt handed to the agent as its `instructions` when the agent is created.
# NOTE(review): the text contains a typo ("Qualifed"); it is left untouched here
# because the string is sent to the model verbatim.
system_message = """
        You are a Fabric SQL expert.  You have tools to explore the database by getting tables, getting the schema and executing SQL select statements.
        Explore the schema of tables to create sql to answer the questions asked.
        You can ask clarifying questions if needed.

        You have information regarding World Wide Importers Customer Profiles from a Fabric SQL Endpoint
        To Write Valid SQL - you must use the Fully Qualifed Name [warehouse].[schema].[table] in ALL SQL queries.

        You do have a tool to determine what the warehouse is, what tables are available, and what their schema is.
        The correct schema will be required for any joins you do on the data to answer complex questions.
        """


# These are the user-defined functions that can be called by the agent.
def get_table_names() -> Annotated[str, "The output is an HTML table"]:
    """List the base tables of the configured Fabric warehouse as an HTML table.

    Reloads ``.env``, reads ``FABRIC_CONNECTION_STRING`` and
    ``FABRIC_WAREHOUSE`` from the environment, authenticates with an Azure
    access token and queries ``INFORMATION_SCHEMA.TABLES``. The query and the
    rendered HTML are also printed.

    :return: HTML rendering of the result set, or a string starting with ``Error retrieving table names:`` if any step fails.
    """
    # Connection attribute key under which the packed access token is passed.
    access_token_attr = 1256
    odbc_driver = "ODBC Driver 18 for SQL Server"

    try:
        load_dotenv(override=True)
        server = os.environ.get("FABRIC_CONNECTION_STRING")
        warehouse = os.environ.get("FABRIC_WAREHOUSE", "NONE")

        # Spelled out line by line, padding included, so the logged query
        # text keeps its exact layout.
        query = (
            "  \n"
            "                SELECT *   \n"
            f"                FROM [{warehouse}].INFORMATION_SCHEMA.TABLES  \n"
            "                WHERE TABLE_TYPE = 'BASE TABLE';  \n"
            "            "
        )
        print(f"Executing query: {query}")

        # Acquire a token and pack it as <little-endian length><UTF-16-LE bytes>.
        azure_credential = identity.DefaultAzureCredential(
            exclude_interactive_browser_credential=False
        )
        access_token = azure_credential.get_token(
            "https://database.windows.net/.default"
        ).token
        encoded_token = access_token.encode("UTF-16-LE")
        packed_token = struct.pack(
            f"<I{len(encoded_token)}s", len(encoded_token), encoded_token
        )

        odbc_settings = [
            f"Driver={{{odbc_driver}}}",
            f"Server=tcp:{server},1433",
            "Encrypt=yes",
            "TrustServerCertificate=no",
            "Connection Timeout=30",
        ]
        odbc_str = ";".join(odbc_settings) + ";"

        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(odbc_str)}",
            connect_args={"attrs_before": {access_token_attr: packed_token}},
        )

        # Query and render as HTML
        result_frame = pd.read_sql(query, con=engine)
        html_table = result_frame.to_html(
            index=False,
            border=0,
            justify='center',
            classes='table table-striped table-bordered',
        )
        print(html_table)
        return html_table

    except Exception as e:
        # Errors are returned as text so the agent receives them as tool output.
        return f"Error retrieving table names: {e}"
        

# These are the user-defined functions that can be called by the agent.
def get_table_schema(table_name: str) -> Annotated[str, "The output is an HTML table"]:
    """Return the column definitions of one warehouse table as an HTML table.

    Reloads ``.env``, reads ``FABRIC_CONNECTION_STRING`` and
    ``FABRIC_WAREHOUSE`` from the environment, authenticates with an Azure
    access token and queries ``INFORMATION_SCHEMA.COLUMNS`` of the warehouse.
    The query and the rendered HTML are also printed.

    :param table_name: Table name exactly as listed in the TABLE_NAME column of INFORMATION_SCHEMA (no warehouse or schema prefix, no brackets).
    :return: HTML table with COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH and IS_NULLABLE, or a string starting with ``Error retrieving table schema:`` if any step fails.
    """
    SQL_COPT_SS_ACCESS_TOKEN = 1256
    driver = "ODBC Driver 18 for SQL Server"

    try:
        # Imported locally so the block needs no new module-level import.
        from sqlalchemy import text

        load_dotenv(override=True)
        fabric_connection_string = os.environ.get("FABRIC_CONNECTION_STRING")
        warehouse = os.environ.get("FABRIC_WAREHOUSE", "NONE")

        # table_name is supplied by the model, so it is bound as a parameter
        # instead of being formatted into the SQL text.
        query = text(
            f"""
            SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE
                FROM [{warehouse}].INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = :table_name
            """
        )
        query_params = {"table_name": table_name}
        print(f"Executing query: {query} with parameters {query_params}")

        # Acquire a token and pack it as <little-endian length><UTF-16-LE bytes>.
        credential = identity.DefaultAzureCredential(exclude_interactive_browser_credential=False)
        token = credential.get_token("https://database.windows.net/.default").token
        token_bytes = token.encode("UTF-16-LE")
        token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)

        odbc_str = (
            f"Driver={{{driver}}};"
            f"Server=tcp:{fabric_connection_string},1433;"
            "Encrypt=yes;"
            "TrustServerCertificate=no;"
            "Connection Timeout=30;"
        )

        quoted_odbc_str = urllib.parse.quote_plus(odbc_str)
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={quoted_odbc_str}",
            connect_args={"attrs_before": {SQL_COPT_SS_ACCESS_TOKEN: token_struct}},
        )

        # Query and render as HTML
        df = pd.read_sql(query, con=engine, params=query_params)
        html_table = df.to_html(index=False, border=0, justify='center', classes='table table-striped table-bordered')
        print(html_table)
        return html_table

    except Exception as e:
        # Errors are returned as text so the agent receives them as tool output.
        return f"Error retrieving table schema: {e}"
        
# These are the user-defined functions that can be called by the agent.
def execute_sql_select(select_query: str) -> Annotated[str, "The output is an HTML table"]:
    """Run a SELECT statement against the Fabric SQL endpoint and return the rows as an HTML table.

    Reloads ``.env``, reads ``FABRIC_CONNECTION_STRING`` from the environment,
    authenticates with an Azure access token and executes the query. The
    rendered HTML is also printed.

    :param select_query: A SQL statement beginning with SELECT or WITH; anything else is rejected before a connection is opened.
    :return: HTML rendering of the result set, or a string starting with ``Error executing SQL query:`` if the statement is rejected or any step fails.
    """
    import re  # local import: the block needs no new module-level import

    # Coarse first-keyword check on model-supplied SQL. It blocks statements
    # that open with a write keyword; it is not a full security boundary
    # (e.g. it does not inspect later statements in a batch).
    if not isinstance(select_query, str) or not re.match(
        r"[\s(]*(?:select|with)\b", select_query, flags=re.IGNORECASE
    ):
        return "Error executing SQL query: only SELECT statements are allowed."

    SQL_COPT_SS_ACCESS_TOKEN = 1256
    driver = "ODBC Driver 18 for SQL Server"

    try:
        load_dotenv(override=True)
        fabric_connection_string = os.environ.get("FABRIC_CONNECTION_STRING")

        # Acquire a token and pack it as <little-endian length><UTF-16-LE bytes>.
        credential = identity.DefaultAzureCredential(exclude_interactive_browser_credential=False)
        token = credential.get_token("https://database.windows.net/.default").token
        token_bytes = token.encode("UTF-16-LE")
        token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)

        odbc_str = (
            f"Driver={{{driver}}};"
            f"Server=tcp:{fabric_connection_string},1433;"
            "Encrypt=yes;"
            "TrustServerCertificate=no;"
            "Connection Timeout=30;"
        )

        quoted_odbc_str = urllib.parse.quote_plus(odbc_str)
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={quoted_odbc_str}",
            connect_args={"attrs_before": {SQL_COPT_SS_ACCESS_TOKEN: token_struct}},
        )

        # Query and render as HTML
        df = pd.read_sql(select_query, con=engine)
        html_table = df.to_html(index=False, border=0, justify='center', classes='table table-striped table-bordered')
        print(html_table)
        return html_table

    except Exception as e:
        # Errors are returned as text so the agent receives them as tool output.
        return f"Error executing SQL query: {e}"



def get_connection_string() -> str:
    """Return the Fabric connection string configured in the environment.

    Reloads ``.env`` before reading ``FABRIC_CONNECTION_STRING``.

    :return: The configured value, or ``No connection string found.`` when the variable is unset or empty.
    """
    load_dotenv(override=True)
    # `or` covers both a missing variable (None) and an empty string.
    return os.environ.get("FABRIC_CONNECTION_STRING") or "No connection string found."
    


def get_warehouse(table: str = "") -> str:
    """Return the name of the configured Fabric warehouse.

    Reloads ``.env`` before reading ``FABRIC_WAREHOUSE``.

    :param table: Ignored. Kept only so existing calls that pass a table name keep working; it is optional so callers need not invent a value.
    :return: The value of ``FABRIC_WAREHOUSE``, or ``NONE`` when the variable is not set.
    """
    load_dotenv(override=True)
    return os.environ.get("FABRIC_WAREHOUSE", "NONE")
        

# Functions exposed to the agent as callable tools.
# get_table_schema is registered as well: the system message tells the model
# it has a tool for reading table schemas, so the tool has to be available.
user_functions = {
    get_connection_string,
    get_warehouse,
    get_table_names,
    get_table_schema,
    execute_sql_select,
}

# Raises KeyError if AZURE_AI_AGENT_NAME is not set.
agent_name = os.environ["AZURE_AI_AGENT_NAME"]

In [81]:

async def find_or_create_agent(agent_list, agent_name, logger):
    """Return the first agent in ``agent_list`` whose name equals ``agent_name``.

    Despite its name, this coroutine never creates an agent: it only searches
    and logs. A ``None`` result tells the caller that no match was found.

    :param agent_list: Async iterable of objects exposing ``name`` and ``id``; a falsy value is treated as "nothing to search" and is not logged.
    :param agent_name: Name to look for.
    :param logger: Object with an ``info`` method used for progress messages.
    :return: The matching agent object, or ``None``.
    """
    if not agent_list:
        return None

    async for candidate in agent_list:
        if candidate.name != agent_name:
            continue
        logger.info(f"Found agent by name '{agent_name}', ID={candidate.id}")
        return candidate

    # Reached only when the whole listing was consumed without a match.
    logger.info(f"Agent with name '{agent_name}' not found, creating a new agent.")
    return None


In [82]:
import os, time
from azure.identity import DefaultAzureCredential
from azure.ai.projects import AIProjectClient
from azure.ai.agents.models import FunctionTool
from azure.ai.agents.models import FunctionTool, ToolSet, ListSortOrder

# Retrieve the project endpoint from environment variables
# (raises KeyError if PROJECT_ENDPOINT is not set).
project_endpoint = os.environ["PROJECT_ENDPOINT"]

# Initialize the AIProjectClient
project_client = AIProjectClient(
    endpoint=project_endpoint,
    credential=DefaultAzureCredential(),
)

# Initialize the FunctionTool with user-defined functions

# `functions` is reused later for `functions.definitions` when the agent is created.
functions = FunctionTool(user_functions)
toolset = ToolSet()
toolset.add(functions)
# NOTE(review): presumably this lets the client execute the tool calls an agent
# requests during a run — confirm against the SDK documentation.
project_client.agents.enable_auto_function_calls(toolset)





In [83]:
# Echo the endpoint as the cell result to show which project is targeted.
project_endpoint

'https://mmx-admin-project-001-resource.services.ai.azure.com/api/projects/mmx-admin-project-001'

In [84]:

# Reuse the agent with the configured name if the project already has one;
# otherwise create it with the system prompt and the function tool definitions.
agent_name = os.environ["AZURE_AI_AGENT_NAME"]
agent_list = project_client.agents.list_agents()

# No early exit: if several agents share the name, the last one listed wins.
agent = None
for agent_object in agent_list or ():
    if agent_object.name == agent_name:
        agent = agent_object

if agent:
    print(f"Agent '{agent_name}' already exists, ID: {agent.id}")
else:
    print(f"Agent '{agent_name}' not found in the project.")
    agent = project_client.agents.create_agent(
        model=os.environ["AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"],
        name=agent_name,
        instructions=system_message,
        tools=functions.definitions,
    )
    print(f"Created agent, ID: {agent.id}")

Agent 'AIFoundryAgentWithSQLTool05' not found in the project.
Created agent, ID: asst_LruyLi5KDbcxyEkMgO2FASSn


In [85]:
# Create a thread for communication
thread = project_client.agents.threads.create()
print(f"Created thread, ID: {thread.id}")

# Send a message to the thread
# NOTE(review): "warehoues" is a typo in the prompt text; it is left as-is
# because it is the exact message sent to the agent.
message = project_client.agents.messages.create(
    thread_id=thread.id,
    role="user",
    content="What are the tables in the warehoues?",
)
# The returned message is read with key access here, unlike thread.id above.
print(f"Created message, ID: {message['id']}")

Created thread, ID: thread_HxCkmGi2U1NsFgLnl5DJrvxH
Created message, ID: msg_PJYlOeMB5L7AwLsr2oCAQCzW


In [86]:
# Run the agent on the thread and report the status the call returns.
# Tool functions invoked during the run print their own output into this cell.
run = project_client.agents.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)
print(f"Run finished with status: {run.status}")

# Show which agent handled the run.
print(agent.name)
print(agent.id)

Executing query:   
                SELECT *   
                FROM [wwilakehouse_01].INFORMATION_SCHEMA.TABLES  
                WHERE TABLE_TYPE = 'BASE TABLE';  
            
<table class="dataframe table table-striped table-bordered">
  <thead>
    <tr style="text-align: center;">
      <th>TABLE_CATALOG</th>
      <th>TABLE_SCHEMA</th>
      <th>TABLE_NAME</th>
      <th>TABLE_TYPE</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>wwilakehouse_01</td>
      <td>dbo</td>
      <td>dimension_customer</td>
      <td>BASE TABLE</td>
    </tr>
  </tbody>
</table>
Run finished with status: RunStatus.COMPLETED
AIFoundryAgentWithSQLTool05
asst_LruyLi5KDbcxyEkMgO2FASSn


In [87]:
# Surface the error details when the run did not succeed.
if run.status == "failed":
        print(f"Run failed: {run.last_error}")
    
# Fetch and log all messages
# (the recorded output below lists the agent's reply before the user's question).
messages = project_client.agents.messages.list(thread_id=thread.id)
for message in messages:
    print(f"Role: {message.role}, Content: {message.content}")

Role: MessageRole.AGENT, Content: [{'type': 'text', 'text': {'value': 'There is one table available in the warehouse:\n\n- [wwilakehouse_01].[dbo].[dimension_customer]\n\nLet me know if you need to see its schema or want to run any queries on this table.', 'annotations': []}}]
Role: MessageRole.USER, Content: [{'type': 'text', 'text': {'value': 'What are the tables in the warehoues?', 'annotations': []}}]


In [88]:
# Print every tool call recorded in the steps of the most recent run.
run_steps = project_client.agents.run_steps.list(thread_id=thread.id, run_id=run.id)
for step in run_steps:
    tool_calls = step.get("step_details", {}).get("tool_calls", [])
    if not tool_calls:
        # Steps without tool calls produce no output.
        continue

    print("  Tool calls:")
    for call in tool_calls:
        print(call)
        print(f"    Tool Call ID: {call.get('id')}")
        print(f"    Type: {call.get('type')}")
        # Named call_type rather than `type`: assigning to `type` at cell level
        # would shadow the builtin for the rest of the notebook session.
        call_type = call.get("type")
        if call_type == "function":
            function = call.get("function", {})
            print(f"    Function Name: {function.get('name')}")
            print(f"    Function Arguments: {function.get('arguments')}")
              

  Tool calls:
{'id': 'call_usGqXIr1d7Ayuni4qALaGeu0', 'type': 'function', 'function': {'name': 'get_table_names', 'arguments': '{}', 'output': '<table class="dataframe table table-striped table-bordered">\n  <thead>\n    <tr style="text-align: center;">\n      <th>TABLE_CATALOG</th>\n      <th>TABLE_SCHEMA</th>\n      <th>TABLE_NAME</th>\n      <th>TABLE_TYPE</th>\n    </tr>\n  </thead>\n  <tbody>\n    <tr>\n      <td>wwilakehouse_01</td>\n      <td>dbo</td>\n      <td>dimension_customer</td>\n      <td>BASE TABLE</td>\n    </tr>\n  </tbody>\n</table>'}}
    Tool Call ID: call_usGqXIr1d7Ayuni4qALaGeu0
    Type: function
    Function Name: get_table_names
    Function Arguments: {}


In [89]:
# Post a follow-up question to the same thread so the earlier exchange stays in context.
message = project_client.agents.messages.create(
        thread_id=thread.id,
        role="user",  # Role of the message sender
        content="what are 10 common zip codes",  # Message content
    )

In [90]:
# Run the agent again on the same thread, now including the follow-up question,
# and report the status the call returns.
run = project_client.agents.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)
print(f"Run finished with status: {run.status}")

# Show which agent handled the run.
print(agent.name)
print(agent.id)

Executing query:   
                SELECT *   
                FROM [wwilakehouse_01].INFORMATION_SCHEMA.TABLES  
                WHERE TABLE_TYPE = 'BASE TABLE';  
            
<table class="dataframe table table-striped table-bordered">
  <thead>
    <tr style="text-align: center;">
      <th>TABLE_CATALOG</th>
      <th>TABLE_SCHEMA</th>
      <th>TABLE_NAME</th>
      <th>TABLE_TYPE</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>wwilakehouse_01</td>
      <td>dbo</td>
      <td>dimension_customer</td>
      <td>BASE TABLE</td>
    </tr>
  </tbody>
</table>
<table class="dataframe table table-striped table-bordered">
  <thead>
    <tr style="text-align: center;">
      <th>CustomerKey</th>
      <th>WWICustomerID</th>
      <th>Customer</th>
      <th>BillToCustomer</th>
      <th>Category</th>
      <th>BuyingGroup</th>
      <th>PrimaryContact</th>
      <th>PostalCode</th>
      <th>ValidFrom</th>
      <th>ValidTo</th>
      <th>LineageKey</th>
    </tr>
  </thead>
  <t

In [91]:
# Surface the error details when the run did not succeed.
if run.status == "failed":
        print(f"Run failed: {run.last_error}")
    
# Fetch and log all messages
# (the recorded output below lists the most recent reply first).
messages = project_client.agents.messages.list(thread_id=thread.id)
for message in messages:
    print(f"Role: {message.role}, Content: {message.content}")

Role: MessageRole.AGENT, Content: [{'type': 'text', 'text': {'value': 'The 10 most common zip codes (PostalCodes) for customers, along with their counts, are:\n\n1. 90683 (5 customers)\n2. 90298 (4 customers)\n3. 90761 (3 customers)\n4. 90179 (3 customers)\n5. 90588 (3 customers)\n6. 90686 (3 customers)\n7. 90706 (3 customers)\n8. 90050 (2 customers)\n9. 90644 (2 customers)\n10. 90451 (2 customers)\n\nLet me know if you need any more details or another query!', 'annotations': []}}]
Role: MessageRole.USER, Content: [{'type': 'text', 'text': {'value': 'what are 10 common zip codes', 'annotations': []}}]
Role: MessageRole.AGENT, Content: [{'type': 'text', 'text': {'value': 'There is one table available in the warehouse:\n\n- [wwilakehouse_01].[dbo].[dimension_customer]\n\nLet me know if you need to see its schema or want to run any queries on this table.', 'annotations': []}}]
Role: MessageRole.USER, Content: [{'type': 'text', 'text': {'value': 'What are the tables in the warehoues?', 'an