This notebook demonstrates a basic SQL agent that translates natural language questions into SQL queries.

In [38]:
import os
from dotenv import load_dotenv
from typing import Annotated
from openai import AzureOpenAI
import sqlite3
from typing import Any, List
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.identity import DefaultAzureCredential

load_dotenv()
AZURE_OPENAI_ENDPOINT=os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY= os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION")
AZURE_OPENAI_EMBEDDINGS_ADA_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_EMBEDDINGS_ADA_DEPLOYMENT_NAME")
AZURE_OPENAI_GPT4_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_GPT4_DEPLOYMENT_NAME")

KUSTO_URI = os.getenv("NL_TO_KQL_KUSTO_URI")
KUSTO_DATABASE = os.getenv("NL_TO_KQL_KUSTO_DATABASE")
KUSTO_TABLE = os.getenv("NL_TO_KQL_KUSTO_TABLE")

try:
    credential = DefaultAzureCredential()
    token = credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    print(ex)

In [53]:
llm = AzureOpenAI(
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AZURE_OPENAI_API_VERSION
)

def call_openAI(user_prompt, use_json_object=True):
    system_message = """You are an assistant designed to answer questions."""
    
    if use_json_object:
        response = llm.chat.completions.create(
            model=AZURE_OPENAI_GPT4_DEPLOYMENT_NAME,
            messages = [
                {"role":"system","content":system_message},
                {"role":"user","content":user_prompt}
                ],
            response_format={ "type": "json_object" }
        )
        return response.choices[0].message.content
    else:
        response = llm.chat.completions.create(
            model=AZURE_OPENAI_GPT4_DEPLOYMENT_NAME,
            messages = [
                {"role":"system","content":system_message},
                {"role":"user","content":user_prompt}
                ]
        )
    return response.choices[0].message.content


In [54]:
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_URI)
print(kcsb)
client = KustoClient(kcsb)
kusto_db = KUSTO_DATABASE

Data Source=https://trd-m78emrpuh6devrz4ty.z4.kusto.fabric.microsoft.com;Initial Catalog=NetDefaultDB;AAD Federated Security=True;Authority Id=organizations;AZ CLI=True


In [55]:
# #testing the connection to kusto works - sample query to get the top 2 results from the table
# table_name = KUSTO_TABLE
# query = table_name + " | take 2"

# response = client.execute(kusto_db, query)
# for row in response.primary_results[0]:
#     txt = (row["Ticker"])[0:10]
#     print("Ticker :{}".format(txt))

In [56]:
def execute_query(kusto_query: str):
    response = client.execute(kusto_db, kusto_query)
    df = dataframe_from_result_table(response.primary_results[0])
    df
    return df


def get_table_creation_kql() -> str:
    table_schema = ".create-merge table Stocks (Date:datetime, Open:real, High:real, Low:real, Close:real, AdjClose:real, Volume:string, Ticker:string) "
    return table_schema

In [57]:
from typing import List, Optional, Tuple
import pandas as pd
import json

#Given an SQL error after a bad query, this writes a prompt to fix it
def get_correction_prompt(schema: str,question: str,query: str,sql_error_message: str) -> str:
    answer = {
        'Question': question,
        'Query': query,
        'KQL Error': str(sql_error_message)
    }

    prompt = get_schema_prompt(schema)
    prompt += 'Given the following question, query, and kql error, fix the query.'
    prompt += json.dumps(answer)
    prompt += """Return your answer ONLY in JSON format as follows:
    {
        'explanation': '<explain what the query does>',
        'query': '<kql query>'
    }
    """
    return prompt

def extract_json(response: str):
    json_response = None
    if '{' in response and '}' in response:
        response = response.replace('\n', '')
        json_response = json.loads(response)
    return json_response["explanation"], json_response["query"]
    
def get_schema_prompt() -> str:
    prompt = 'Below is the information for a KQL table.'
    prompt += f'Schema:\n{get_table_creation_kql()}\n\n'
    prompt += '\n------------------------------------------------------\n'
    return prompt

def get_prompt(question: str) -> str:
    prompt = get_schema_prompt()
    prompt += 'Below is a question input from a user. '
    prompt += 'Generate a KQL query that pulls the necessary data to answer the question.\n\n'
    prompt += f'Question: {question}\n\n'
    prompt += """Return your answer ONLY in JSON format as follows:
    {
        'explanation': '<explain what the query does>',
        'query': '<kql query>'
    }
    """
    return prompt

def format_result(question: str, query: str, explanation: str, result: str) -> str:
    return json.dumps(
        {
            'question': question,
            'query': query,
            'explanation': explanation,
            'result': result
        }, indent=4
    )

def get_final_answer_prompt(question: str,query: str,explanation: str,result: pd.DataFrame) -> str:
    prompt = get_schema_prompt()
    prompt += 'Below is a question, KQL query, explanation, and the result from executing the query. '
    prompt += 'Use these pieces of information to answer the question.\n\n'
    prompt += format_result(question, query, explanation, result.to_string())
    return prompt


def respond(question: str, chat_history: List[Tuple[str, str]]) -> Tuple:
    user_prompt = get_prompt(question)
    ua_response = call_openAI(user_prompt, True)
    explanation, query = extract_json(ua_response)

    if query is None:
        return '', chat_history, None, '', explanation

    success = False
    for _ in range(5):
        try:
            query_result = execute_query(query)
            success = True
            break
        except Exception as kql_error_message:
            kql_error_prompt = get_correction_prompt(question, query, str(kql_error_message))
            response = call_openAI(kql_error_prompt, True)
            explanation, query = extract_json(response)
            if query is None:
                return '', chat_history, None, '', explanation

    if success:
        final_answer_prompt = get_final_answer_prompt(question, query, explanation, query_result)
        chat_response = call_openAI(final_answer_prompt, False)
        chat_history.append((question, chat_response))
    else:
        query_result = ''

    return '', chat_history, query_result, query, explanation

In [58]:
respond("What is the average price for each stock symbol in the February 2013?", [])

('',
 [('What is the average price for each stock symbol in the February 2013?',
   'The average price for each stock symbol in February 2013 is provided in the "result" section. Each row lists a stock symbol (Ticker) and its average closing price (AvgClose) for that month. For example, the stock symbol "OKE" had an average closing price of 41.215475, while "J" had an average of 48.548421.')],
     Ticker   AvgClose
 0      OKE  41.215475
 1        J  48.548421
 2       HD  66.738947
 3      HAS   39.79421
 4      PFE  25.849895
 ..     ...        ...
 462    FLT  65.527368
 463    MPC  40.260789
 464    XYL  27.725263
 465   META  28.087895
 466    ZTS  32.554737
 
 [467 rows x 2 columns],
 'Stocks\n| where Date between (datetime(2013-02-01) .. datetime(2013-02-28))\n| summarize AvgClose=avg(Close) by Ticker\n| project Ticker, AvgClose',
 'The query calculates the average closing price of each stock symbol for the month of February 2013. It filters the data to only include entries fro