In [1]:

import os
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from typing import Any
from pathlib import Path
from azure.ai.projects.models import (
    Agent,
    AgentThread,
    BingGroundingTool,
    CodeInterpreterTool,
    FileSearchTool,
    AsyncFunctionTool,
    FunctionTool
)
#Genie imports
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dashboards import GenieAPI
import pandas as pd
from typing import Any, Callable, Set, Dict, List, Optional
from azure.ai.projects.models import FunctionTool, ToolSet
import json
from colorama import Fore, Style 

In [2]:
def get_query_result(statement_id: str,workspace_client: WorkspaceClient) :
    """Fetch the result of an executed SQL statement as a DataFrame.

    The statement is looked up through ``workspace_client.statement_execution``
    and the column names are taken from the result manifest's schema. When a
    chunk reports a ``next_chunk_index`` the following chunk is fetched and
    its rows appended, so results spanning several chunks are not truncated.

    :param statement_id: identifier of the statement whose result is wanted.
    :param workspace_client: Databricks workspace client used for the lookup.
    :return: ``pandas.DataFrame`` with one row per result row and one column
        per schema column; empty (columns only) when there are no rows.
    """
    api = workspace_client.statement_execution
    statement = api.get_statement(statement_id)
    column_names = [column.name for column in statement.manifest.schema.columns]

    rows = []
    chunk = statement.result
    while True:
        # data_array may be None rather than an empty list; treat it as no rows.
        rows.extend(chunk.data_array or [])
        next_index = getattr(chunk, "next_chunk_index", None)
        if next_index is None:
            break
        chunk = api.get_statement_result_chunk_n(statement_id, next_index)

    return pd.DataFrame(rows, columns=column_names)


def process_genie_response(response,workspace_client: WorkspaceClient):
    """Print and return the first usable attachment of a Genie response.

    Attachments are scanned in order. A text attachment returns its content;
    a query attachment has its result fetched with ``get_query_result`` and
    printed, and the query's description is returned. Attachments carrying
    neither are skipped.

    :param response: Genie response object exposing ``attachments``.
    :param workspace_client: Databricks workspace client used to fetch query
        results.
    :return: the text content or query description, or a fixed explanatory
        string when the response holds no usable attachment.
    """
    # Guard against a response without attachments (None or empty) so the
    # caller always receives a string instead of a TypeError or None.
    for attachment in response.attachments or []:
        if attachment.text:
            print(f"process_genie_response Text: {attachment.text.content}")
            return attachment.text.content
        elif attachment.query:
            data = get_query_result(attachment.query.statement_id,workspace_client)
            print(f"process_genie_response Attach: {attachment.query.description}")
            print(f"Data: {data}")
            print(f"Generated code: {attachment.query.query}")
            return attachment.query.description
    return "No text or query result was returned for this question."



In [3]:
def printFormat(message:str, color: str):
    """Print ``message`` preceded by ``color`` and followed by a style reset.

    :param message: text to print.
    :param color: prefix written before the message (e.g. a ``Fore`` value).
    """
    colored_line = "{}{}{}".format(color, message, Style.RESET_ALL)
    print(colored_line)

def dataframe_to_json(df: pd.DataFrame) -> str:
    """Serialise a DataFrame to a JSON array of row records.

    Any exception raised during conversion is reported on stdout and turned
    into an ``"Error: ..."`` string instead of propagating.

    :param df: frame to serialise.
    :return: JSON text in ``records`` orientation, or an error string.
    """
    try:
        return df.to_json(orient="records")
    except Exception as exc:
        failure = str(exc)
    print(f"An error occurred while converting DataFrame to JSON: {failure}")
    return f"Error: {failure}"
            
def process_genie_responseJson(response,workspace_client: WorkspaceClient):
    """Log and return the first usable attachment of a Genie response.

    Attachments are scanned in order. A text attachment returns its content;
    a query attachment has its result fetched with ``get_query_result``,
    converted with ``dataframe_to_json`` and returned as JSON text.
    Attachments carrying neither are skipped. Progress is logged in blue via
    ``printFormat``.

    :param response: Genie response object exposing ``attachments``.
    :param workspace_client: Databricks workspace client used to fetch query
        results.
    :return: the text content or the JSON-encoded query result, or a fixed
        explanatory string when the response holds no usable attachment.
    """
    # Guard against a response without attachments (None or empty) so the
    # caller always receives a string instead of a TypeError or None.
    for attachment in response.attachments or []:
        if attachment.text:
            printFormat(f"process_genie_response Text: {attachment.text.content}",Fore.BLUE)
            return attachment.text.content
        elif attachment.query:
            data = get_query_result(attachment.query.statement_id,workspace_client)
            printFormat(f"process_genie_response Attach: {attachment.query.description}",Fore.BLUE)
            printFormat(f"Data: {data}",Fore.BLUE)
            printFormat(f"Generated code: {attachment.query.query}",Fore.BLUE)
            jsonResponse = dataframe_to_json(data)
            printFormat(f"json response: {jsonResponse}",Fore.BLUE)
            return jsonResponse
    return "No text or query result was returned for this question."

In [4]:
def askDatabaseQuestions(prompt: str) -> str:
    """
    Fetches the database information for the specified data.
    use only for database questions!

    :param prompt  (str): the question to make to Genie assitante about data.
    :return: text response to the question. 
    :rtype: str
    """
    # NOTE(review): the docstring above is kept byte-identical on purpose.
    # This function is registered with FunctionTool, which presumably uses the
    # docstring to describe the tool to the model — confirm before rewording.
    theResponse = ""
    try:
        space_id = os.getenv("DATABRICKS_SPACE_ID")
        if not space_id:
            # Fail early with a readable message; raised inside the try so it
            # is reported through the same "Error: ..." path as other failures.
            raise ValueError("DATABRICKS_SPACE_ID environment variable is not set")

        workspace_client = WorkspaceClient(
            host=os.getenv("DATABRICKS_HOST"),
            token=os.getenv("DATABRICKS_TOKEN")
        )

        print()
        print(f"askDatabaseQuestions Prompt: {prompt}")
        print()

        # Start a Genie conversation with the prompt and wait for the reply.
        conversation = workspace_client.genie.start_conversation_and_wait(space_id, prompt)
        theResponse = process_genie_responseJson(conversation,workspace_client)
        if theResponse is None:
            # Honour the declared ``-> str`` contract even when the response
            # processor found nothing to return.
            theResponse = "No text or query result was returned for this question."
    except Exception as e:
        # Any failure is reported to the caller as text rather than raised.
        print(f"An error occurred: {str(e)}")
        theResponse = f"Error: {str(e)}"
    return theResponse

In [5]:
# Statically defined user functions for fast reference
# Callables exposed to the agent as function tools; add more entries here.
user_functions: Set[Callable[..., Any]] = {askDatabaseQuestions}

# Wrap the callables in a FunctionTool and register it on a fresh ToolSet.
functions = FunctionTool(user_functions)
myToolSet = ToolSet()
myToolSet.add(functions)

In [6]:
# Instruction text given to the agent: it tells the model to route the listed
# topics to the askDatabaseQuestions function and to relay both the user's
# prompt and the function's answer unchanged.
# NOTE(review): "date topics listes" looks like a typo for "data topics
# listed"; the text is left untouched because it is sent to the model
# verbatim — confirm before editing.
agentInsructions = """
you are an agent that response user questions.
for question related to date topics listes below you must  use the function askDatabaseQuestions.
when you call the function askDatabaseQuestions, you must use the same prompt as the user question. you don't change the prompt.
When you get the response from the function askDatabaseQuestions, you must return the response to the user as is.
you must not change the response from the function askDatabaseQuestions.
the topics are:
- database schema questions
- Driver questions

"""

In [7]:
# Build the project client from the PROJECT_CONNECTION_STRING environment
# variable (KeyError if unset), authenticating with DefaultAzureCredential.
project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(), conn_str=os.environ["PROJECT_CONNECTION_STRING"]
)

# Register the toolset for automatic function calls.
# NOTE(review): presumably this lets the SDK execute askDatabaseQuestions
# itself while a run is being processed — confirm against the SDK docs.
project_client.agents.enable_auto_function_calls(toolset=myToolSet)

# Create the agent on the deployment named by MODEL_DEPLOYMENT_NAME, with the
# instruction text and the toolset defined in earlier cells.
# NOTE(review): every execution of this cell creates a new agent; nothing in
# the visible notebook deletes it afterwards.
myAgent = project_client.agents.create_agent(
        model=os.environ["MODEL_DEPLOYMENT_NAME"],
        name="my-agent",
        instructions=agentInsructions,
        headers={"x-ms-enable-preview": "true"},
        toolset=myToolSet
)

print(f"Created agent, agent ID: {myAgent.id}")
# Create the conversation thread that askAgent reuses for every question.
thread = project_client.agents.create_thread()
print(f"Created thread, thread ID: {thread.id}")

Created agent, agent ID: asst_Aple9tY5bDao6juXmTj9Tax2
Created thread, thread ID: thread_PYdPNsPav4tWOEjDT35oEDV7


In [8]:
def askAgent(string: str) -> str:
    """Send a user message to the shared agent thread and return the reply.

    Posts ``string`` as a user message on the module-level ``thread``, runs
    ``myAgent`` on it with ``myToolSet`` and returns the text of the last
    assistant message. Progress lines are logged in green through
    ``printFormat`` so the colour is reset after every line.

    :param string: the user's question.
    :return: the assistant's reply; ``"No response from the assistant."`` when
        no assistant text message exists; or an ``"Error: ..."`` string when
        the run fails, is cancelled, expires, or an exception is raised.
    """
    try:
        # Create a new agent message
        message = project_client.agents.create_message(
            thread_id=thread.id,
            content=string,
            role="user",
        )
        printFormat(f"Created message, message ID: {message.id}", Fore.GREEN)

        # Run the agent
        run = project_client.agents.create_and_process_run(thread_id=thread.id, agent_id=myAgent.id,toolset=myToolSet)
        printFormat(f"Run finished with status: {run.status}", Fore.GREEN)

        if run.status == "failed":
            # Check if you got "Rate limit is exceeded.", then you want to get more quota
            print(f"Run failed: {run.last_error}")
            return f"Error: {run.last_error}"

        # The thread is shared between calls, so a run that ended without
        # producing an answer must not fall through to the previous turn's
        # assistant message. A tuple (not a set) keeps this an == comparison.
        if run.status in ("cancelled", "expired"):
            print(f"Run did not complete, status: {run.status}")
            return f"Error: run ended with status {run.status}"

        # Get messages from the thread
        messages = project_client.agents.list_messages(thread_id=thread.id)

        # Get the last message from the sender
        last_msg = messages.get_last_text_message_by_role("assistant")
        if last_msg:
            printFormat(f" Last Message: {last_msg.text.value}", Fore.GREEN)
            return last_msg.text.value

        return "No response from the assistant."
    except Exception as e:
        # Handle any unexpected errors
        print(f"An error occurred: {str(e)}")
        return f"Error: {str(e)}"

In [9]:


# Alternative prompts kept for manual experimentation:
#askAgent("list all table names in the database")
#finalAnswwer=askAgent("list first 3 drivers in json format")
#finalAnswwer=askAgent("list first 3 drivers")
finalAnswwer = askAgent(
    "show the full names of top 5 drivers and total wins that had the most ace wins in the 2024 seasson"
)

# Same green-then-reset line as before, produced through the shared helper.
printFormat(f"AI Foundry Agent Answer: {finalAnswwer}", Fore.GREEN)

[32mCreated message, message ID: msg_6lhx4fYEKf1uDxhby87dPLWt

askDatabaseQuestions Prompt: show the full names of top 5 drivers and total wins that had the most ace wins in the 2024 seasson

[34mprocess_genie_response Attach: This analysis identifies the top five drivers with the most wins in the 2024 racing season. It combines data from the drivers and their standings, ranking them based on their win totals. The results are then presented in order of their rank.[0m
[34mData:   forename     surname wins
0      Max  Verstappen    9
1      Max  Verstappen    9
2      Max  Verstappen    8
3      Max  Verstappen    8
4      Max  Verstappen    7[0m
[34mGenerated code: WITH RankedDrivers AS (SELECT `drivers`.`forename`, `drivers`.`surname`, `driver_standings`.`wins`, ROW_NUMBER() OVER (ORDER BY `driver_standings`.`wins` DESC) AS rank FROM `adb_myworkspace_473562017951006`.`genie_data`.`drivers` JOIN `adb_myworkspace_473562017951006`.`genie_data`.`driver_standings` ON `drivers`.`driver