In [1]:
import requests
import logging

from pprint import pprint
from config import DefaultConfig
from typing import Dict, List, Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dashboards import GenieAPI
from databricks.sdk import AccountClient

CONFIG = DefaultConfig()
# access_token = CONFIG.DATABRICKS_TOKEN
workspace_url = CONFIG.DATABRICKS_HOST
genie_room_id = CONFIG.DATABRICKS_SPACE_ID
w = WorkspaceClient(host = workspace_url)
genie_api = GenieAPI(w.api_client)

# Log
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

In [15]:
from clients.databricks_genie_client import GenieClient

await GenieClient().ask_genie("Hello")


('{"message": "Hello! How can I assist you with your data analysis today?"}',
 '01f049a74db918aea42088d01e04797f')

In [None]:
# 1. getspace: Retrieve available spaces
get_spaces_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}"
response1 = requests.get(get_spaces_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response1.text)

('{"error_code":401,"message":"Credential was not sent or was of an '
 'unsupported type for this API. [ReqId: '
 '48ba218a-e43f-479b-a4bc-236acb6b2992]"}')


In [27]:
def process_query_results(answer_json: Dict) -> str:
    response = ""
    
    logger.info(f"Processing answer JSON: {answer_json}")
    
    if "query_description" in answer_json and answer_json["query_description"]:
        response += f"## Query Description\n\n{answer_json['query_description']}\n\n"

    if "query_result_metadata" in answer_json:
        metadata = answer_json["query_result_metadata"]
        if isinstance(metadata, dict):
            if "row_count" in metadata:
                response += f"**Row Count:** {metadata['row_count']}\n\n"
            if "execution_time_ms" in metadata:
                response += f"**Execution Time:** {metadata['execution_time_ms']}ms\n\n"

    if "statement_response" in answer_json:
        statement_response = answer_json["statement_response"]
        logger.info(f"Found statement_response: {statement_response}")
        
        if "result" in statement_response and "data_array" in statement_response["result"]:
            response += "## Query Results\n\n"
            
            schema = statement_response.get("manifest", {}).get("schema", {})
            columns = schema.get("columns", [])
            logger.info(f"Schema columns: {columns}")
            
            header = "| " + " | ".join(col["name"] for col in columns) + " |"
            separator = "|" + "|".join(["---" for _ in columns]) + "|"
            response += header + "\n" + separator + "\n"
            
            data_array = statement_response["result"]["data_array"]
            logger.info(f"Data array: {data_array}")
            
            for row in data_array:
                formatted_row = []
                for value, col in zip(row, columns):
                    if value is None:
                        formatted_value = "NULL"
                    elif col["type_name"] in ["DECIMAL", "DOUBLE", "FLOAT"]:
                        formatted_value = f"{float(value):,.2f}"
                    elif col["type_name"] in ["INT", "BIGINT", "LONG"]:
                        formatted_value = f"{int(value):,}"
                    else:
                        formatted_value = str(value)
                    formatted_row.append(formatted_value)
                response += "| " + " | ".join(formatted_row) + " |\n"
        else:
            logger.error(f"Missing result or data_array in statement_response: {statement_response}")
    elif "message" in answer_json:
        response += f"{answer_json['message']}\n\n"
    else:
        response += "No data available.\n\n"
        logger.error("No statement_response or message found in answer_json")
    
    return response

In [40]:
# 1. getspace: Retrieve available spaces
get_spaces_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}"
response1 = requests.get(get_spaces_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response1.text)

'{"space_id":"01f0480a465e133d91f903c42d73ef81","title":"Genie Data Analyst"}'


In [41]:
# 2. startconversation: Start a new conversation in a specific space
start_conv_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}/start-conversation"
payload_start = {"content": "Hi Genie!"}
response2 = requests.post(start_conv_url, headers={"Authorization": f"Bearer {access_token}"}, json=payload_start)
pprint(response2.text)

('{"message_id":"01f048790db71403aefc33fac3e27534","message":{"id":"01f048790db71403aefc33fac3e27534","space_id":"01f0480a465e133d91f903c42d73ef81","conversation_id":"01f048790da11e4f8a539a33fc5a46a0","user_id":8707948743593954,"created_timestamp":1749834520684,"status":"SUBMITTED","content":"Hi '
 'Genie!","auto_regenerate_count":0,"message_id":"01f048790db71403aefc33fac3e27534"},"conversation_id":"01f048790da11e4f8a539a33fc5a46a0","conversation":{"id":"01f048790da11e4f8a539a33fc5a46a0","space_id":"01f0480a465e133d91f903c42d73ef81","user_id":8707948743593954,"created_timestamp":1749834520541,"last_updated_timestamp":1749834520541,"title":"Hi '
 'Genie!","conversation_id":"01f048790da11e4f8a539a33fc5a46a0"}}')


In [47]:
# 3. createmessage: Create a new message in the conversation
conversation_id = response2.json()['conversation_id']
create_msg_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages"
payload_message = {"content": "Show me sales by years"}
response3 = requests.post(create_msg_url, headers={"Authorization": f"Bearer {access_token}"}, json=payload_message)
pprint(response3.text)

('{"id":"01f048794b591e88a661494951bdff2a","space_id":"01f0480a465e133d91f903c42d73ef81","conversation_id":"01f048790da11e4f8a539a33fc5a46a0","user_id":8707948743593954,"created_timestamp":1749834624091,"status":"SUBMITTED","content":"Show '
 'me sales by '
 'years","auto_regenerate_count":0,"message_id":"01f048794b591e88a661494951bdff2a"}')


In [48]:
# 4. getmessage: Retrieve the created message from the conversation
message_id = response3.json()['message_id']
get_msg_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages/{message_id}"
response4 = requests.get(get_msg_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response4.text)

('{"id":"01f048794b591e88a661494951bdff2a","space_id":"01f0480a465e133d91f903c42d73ef81","conversation_id":"01f048790da11e4f8a539a33fc5a46a0","user_id":8707948743593954,"created_timestamp":1749834624091,"last_updated_timestamp":1749834624386,"status":"FILTERING_CONTEXT","content":"Show '
 'me sales by '
 'years","auto_regenerate_count":0,"message_id":"01f048794b591e88a661494951bdff2a"}')


In [49]:
# 5. getmessage: Retrieve the created message from the conversation
message_id = response4.json()['message_id']
get_msg_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages/{message_id}"
response5 = requests.get(get_msg_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response5.text)

('{"id":"01f048794b591e88a661494951bdff2a","space_id":"01f0480a465e133d91f903c42d73ef81","conversation_id":"01f048790da11e4f8a539a33fc5a46a0","user_id":8707948743593954,"created_timestamp":1749834624091,"last_updated_timestamp":1749834633608,"status":"COMPLETED","content":"Show '
 'me sales by years","attachments":[{"query":{"query":"SELECT '
 'YEAR(`sales`.`order_date`) AS `year`, SUM(`sales`.`total_price`) AS '
 '`total_sales` FROM `databricks_simulated_retail_customer_data`.`v01`.`sales` '
 'GROUP BY `year` ORDER BY `year`","description":"This analysis provides the '
 'total sales revenue for each year, calculated by summing the sales amounts '
 'from all transactions. The results are organized chronologically by '
 'year.","statement_id":"01f04879-4f09-1156-9b69-43c6fac2ac05","query_result_metadata":{"row_count":2}},"attachment_id":"01f048794efe19c0b7832b7f2843414e"}],"query_result":{"statement_id":"01f04879-4f09-1156-9b69-43c6fac2ac05","row_count":2},"auto_regenerate_count":0,"mes

In [51]:
# 5. getmessageattachmentqueryresult: Get the SQL query attachment result (if any) for the message
attachment_id = response5.json()['attachments'][0]['attachment_id']
get_attachment_result_url = f"{workspace_url}/api/2.0/genie/spaces/{genie_room_id}/conversations/{conversation_id}/messages/{message_id}/attachments/{attachment_id}/query-result"
response6 = requests.get(get_attachment_result_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response6.text)

'{"statement_response":{"statement_id":"01f04879-4f09-1156-9b69-43c6fac2ac05","status":{"state":"SUCCEEDED"},"manifest":{"format":"JSON_ARRAY","schema":{"column_count":2,"columns":[{"name":"year","type_text":"INT","type_name":"INT","position":0},{"name":"total_sales","type_text":"BIGINT","type_name":"LONG","position":1}]},"total_chunk_count":1,"chunks":[{"chunk_index":0,"row_offset":0,"row_count":2,"byte_count":464}],"total_row_count":2,"total_byte_count":464,"truncated":false},"result":{"chunk_index":0,"row_offset":0,"row_count":2,"data_array":[["2019","2550552"],["2020","185474"]]}}}'


In [52]:
process_query_results(response6.json())

'## Query Results\n\n| year | total_sales |\n|---|---|\n| 2,019 | 2,550,552 |\n| 2,020 | 185,474 |\n'

In [None]:
# 6. we can also use SQL endpoint statement info to get that result
statement_id = response5.json()['query_result']['statement_id']
get_attachment_result_url = f"{workspace_url}/api/2.0/sql/statements/{statement_id}"
response7 = requests.get(get_attachment_result_url, headers={"Authorization": f"Bearer {access_token}"})
pprint(response7.text)

'{"statement_id":"01f04876-0ccd-1770-a696-845d19a825d8","status":{"state":"SUCCEEDED"},"manifest":{"format":"JSON_ARRAY","schema":{"column_count":2,"columns":[{"name":"year","type_text":"INT","type_name":"INT","position":0},{"name":"total_sales","type_text":"BIGINT","type_name":"LONG","position":1}]},"total_chunk_count":1,"chunks":[{"chunk_index":0,"row_offset":0,"row_count":2}],"total_row_count":2,"truncated":false},"result":{"chunk_index":0,"row_offset":0,"row_count":2,"data_array":[["2019","2550552"],["2020","185474"]]}}'


In [12]:
def init_conversation(space_id: str):
    """
    Initialize a conversation with Genie and ask a question.
    
    :param space_id: The ID of the space where the conversation will take place.
    :param question: The question to ask Genie.
    :return: The response from Genie and the conversation ID.
    """
    initial_message = genie_api.start_conversation_and_wait( space_id, "Hello Genie!")
    conversation_id = initial_message.conversation_id

    return conversation_id

def ask_genie(question: str, space_id: str, conversation_id: Optional[str] = None):
    """
    Ask Genie a question in a specific space.
    
    :param question: The question to ask Genie.
    :param space_id: The ID of the space where the question will be asked.
    :param conversation_id: Optional conversation ID to continue an existing conversation.
    :return: The response from Genie.
    """
    if conversation_id is None:
        initial_message =genie_api.start_conversation_and_wait( space_id, question)
        conversation_id = initial_message.conversation_id
    else:
        initial_message = genie_api.create_message_and_wait(space_id, conversation_id, question)

    message_content = genie_api.get_message(space_id, initial_message.conversation_id, initial_message.id)

    return message_content, conversation_id

def get_attachment_query_result(space_id, conversation_id, message_id, attachment_id):    
    """Get the query result for an attachment using the Genie API.
    
    """

    url = f"{CONFIG.DATABRICKS_HOST}/api/2.0/genie/spaces/{space_id}/conversations/{conversation_id}/messages/{message_id}"
    headers = {
        "Authorization": f"Bearer {CONFIG.DATABRICKS_TOKEN}",
        "Content-Type": "application/json"
    }
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        logger.error(f"Message endpoint returned status {response.status_code}: {response.text}")
        return {}
    
    try:
        message_data = response.json()
        logger.info(f"Message data: {message_data}")
        
        statement_id = None
        if "attachments" in message_data:
            for attachment in message_data["attachments"]:
                if attachment.get("attachment_id") == attachment_id:
                    if "query" in attachment and "statement_id" in attachment["query"]:
                        statement_id = attachment["query"]["statement_id"]
                        break
        
        if not statement_id:
            logger.error("No statement_id found in message data")
            return {}
            
        query_url = f"{CONFIG.DATABRICKS_HOST}/api/2.0/genie/spaces/{space_id}/conversations/{conversation_id}/messages/{message_id}/attachments/{attachment_id}/query-result"
        query_headers = {
            "Authorization": f"Bearer {CONFIG.DATABRICKS_TOKEN}",
            "Content-Type": "application/json",
            "X-Databricks-Statement-Id": statement_id
        }
        
        query_response = requests.get(query_url, headers=query_headers)
        if query_response.status_code != 200:
            logger.error(f"Query result endpoint returned status {query_response.status_code}: {query_response.text}")
            return {}
            
        if not query_response.text.strip():
            logger.error(f"Empty response from Genie API: {query_response.status_code}")
            return {}
            
        result = query_response.json()
        logger.info(f"Raw query result response: {result}")
        
        if isinstance(result, dict):
            if "data_array" in result:
                if not isinstance(result["data_array"], list):
                    result["data_array"] = []
            if "schema" in result:
                if not isinstance(result["schema"], dict):
                    result["schema"] = {}
                    
            if "schema" in result and "columns" in result["schema"]:
                if not isinstance(result["schema"]["columns"], list):
                    result["schema"]["columns"] = []
                    
            if "data_array" in result and result["data_array"] and "schema" not in result:
                first_row = result["data_array"][0]
                if isinstance(first_row, dict):
                    result["schema"] = {
                        "columns": [{"name": key} for key in first_row.keys()]
                    }
                elif isinstance(first_row, list):
                    result["schema"] = {
                        "columns": [{"name": f"Column {i}"} for i in range(len(first_row))]
                    }
                    
        return result
    except Exception as e:
        logger.error(f"Failed to process Genie API response: {e}, text: {response.text}")
        return {}


In [10]:
conversation_id = init_conversation(genie_room_id)

In [69]:
message_content, conversation_id = ask_genie("Show me sales by years and month", genie_room_id, conversation_id)
message_content

GenieMessage(id='01f048a2b8fb1162a0f0a86bcb799238', space_id='01f0480a465e133d91f903c42d73ef81', conversation_id='01f0489c2e371e24acf44e378e9d9657', content='Show me sales by years and month', message_id='01f048a2b8fb1162a0f0a86bcb799238', attachments=[GenieAttachment(attachment_id='01f048a2bce21b69ac961e63b6e24c78', query=GenieQueryAttachment(description='This report summarizes total sales by month and year, providing insights into sales performance over time. The data is aggregated to show the total revenue generated for each month, allowing for easy tracking of sales trends.', id=None, last_updated_timestamp=None, query='SELECT YEAR(`sales`.`order_date`) AS `year`, MONTH(`sales`.`order_date`) AS `month`, SUM(`sales`.`total_price`) AS `total_sales` FROM `databricks_simulated_retail_customer_data`.`v01`.`sales` GROUP BY YEAR(`sales`.`order_date`), MONTH(`sales`.`order_date`) ORDER BY YEAR(`sales`.`order_date`), MONTH(`sales`.`order_date`)', query_result_metadata=GenieResultMetadata(is

In [70]:
initial_message = genie_api.start_conversation_and_wait(genie_room_id, "Hello Genie!")
conversation_id = initial_message.conversation_id
message_content, conversation_id = ask_genie("Show me aggregated sales amount by years and month", genie_room_id, conversation_id)

In [71]:
message_id = message_content.message_id
attachment_id = message_content.attachments[0].attachment_id if message_content.attachments else None

In [83]:
query_result = genie_api.get_message_attachment_query_result(genie_room_id, conversation_id, message_id, attachment_id)

In [85]:
query_result.as_dict()['statement_response']

{'manifest': {'chunks': [{'byte_count': 616,
    'chunk_index': 0,
    'row_count': 4,
    'row_offset': 0}],
  'format': 'JSON_ARRAY',
  'schema': {'column_count': 3,
   'columns': [{'name': 'year',
     'position': 0,
     'type_name': 'INT',
     'type_text': 'INT'},
    {'name': 'month', 'position': 1, 'type_name': 'INT', 'type_text': 'INT'},
    {'name': 'total_sales',
     'position': 2,
     'type_name': 'LONG',
     'type_text': 'BIGINT'}]},
  'total_byte_count': 616,
  'total_chunk_count': 1,
  'total_row_count': 4,
  'truncated': False},
 'result': {'chunk_index': 0,
  'data_array': [['2019', '8', '1128305'],
   ['2019', '9', '609871'],
   ['2019', '10', '812376'],
   ['2020', '2', '185474']],
  'row_count': 4,
  'row_offset': 0},
 'statement_id': '01f048a2-cca0-17b1-aee1-e082313d1971',
 'status': {'state': 'SUCCEEDED'}}

In [64]:
query_result

GenieGetMessageQueryResultResponse(statement_response=StatementResponse(manifest=ResultManifest(chunks=[BaseChunkInfo(byte_count=616, chunk_index=0, row_count=4, row_offset=0)], format=<Format.JSON_ARRAY: 'JSON_ARRAY'>, schema=ResultSchema(column_count=3, columns=[ColumnInfo(name='year', position=0, type_interval_type=None, type_name=<ColumnInfoTypeName.INT: 'INT'>, type_precision=None, type_scale=None, type_text='INT'), ColumnInfo(name='month', position=1, type_interval_type=None, type_name=<ColumnInfoTypeName.INT: 'INT'>, type_precision=None, type_scale=None, type_text='INT'), ColumnInfo(name='total_sales', position=2, type_interval_type=None, type_name=<ColumnInfoTypeName.LONG: 'LONG'>, type_precision=None, type_scale=None, type_text='BIGINT')]), total_byte_count=616, total_chunk_count=1, total_row_count=4, truncated=False), result=ResultData(byte_count=None, chunk_index=0, data_array=[['2019', '8', '1128305'], ['2019', '9', '609871'], ['2019', '10', '812376'], ['2020', '2', '185474

In [62]:
query_description = getattr(query_result.attachments, "description", "")

AttributeError: 'GenieGetMessageQueryResultResponse' object has no attribute 'attachments'

In [61]:
query_description

''

In [28]:
process_query_results(message_attachment_query_result)

'## Query Results\n\n| year | month | total_sales |\n|---|---|---|\n| 2,019 | 8 | 1,128,305 |\n| 2,019 | 9 | 609,871 |\n| 2,019 | 10 | 812,376 |\n| 2,020 | 2 | 185,474 |\n'