## 1. Creating Hashed Passwords for Authentication:

* Every time a new user is added to the app, we need to collect their user ID (usually an email address) and password.
* We then create a hash of the password and add it to the `authenticator.yml` file so that the user can log in to the app.

In [4]:
from streamlit_authenticator.utilities.hasher import Hasher

hashed_passwords = Hasher(['abc123']).generate()
print(hashed_passwords) 

['$2b$12$tdKt5p7nfhQgcDG30FldqeTZLUio5P.vAqKXSqiUHxURaei7Npkq2']


In [5]:
hashed_passwords = Hasher(['blackpearl']).generate()
print(hashed_passwords) 

['$2b$12$N9w9IGLBfLv1tne8lzalUOm2hH4ytnvbTgQ.vs0jV6EbKt9x2QPr6']
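The generated value is a bcrypt hash (`$2b$` identifies the bcrypt variant and `12` is the cost factor), and it still has to be placed in `authenticator.yml`. The sketch below assumes the `credentials -> usernames -> <username>` layout used in streamlit-authenticator's example config, so check it against the version you have installed. `add_user_to_config` is a hypothetical helper, and its regex only sanity-checks that the value looks like a 60-character bcrypt hash, so that a plaintext password is not stored by mistake.

```python
import re

# Rough shape check for a bcrypt hash: "$2b$" + two-digit cost + "$" + 53 chars of salt and digest.
BCRYPT_PATTERN = re.compile(r"^\$2[aby]\$\d{2}\$[./A-Za-z0-9]{53}$")


def add_user_to_config(config: dict, username: str, name: str,
                       email: str, hashed_password: str) -> dict:
    """Hypothetical helper: add one user (with an already-hashed password) to a config dict.

    Assumes the credentials -> usernames -> <username> layout of streamlit-authenticator's
    example authenticator.yml; adjust the keys if your version differs.
    """
    if not BCRYPT_PATTERN.match(hashed_password):
        raise ValueError("password does not look like a bcrypt hash; hash it before storing")
    users = config.setdefault("credentials", {}).setdefault("usernames", {})
    if username in users:
        raise ValueError(f"user {username!r} already exists")
    users[username] = {"email": email, "name": name, "password": hashed_password}
    return config
```

With PyYAML, the file can be read with `yaml.safe_load`, passed through this helper, and written back with `yaml.dump`.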


## 2. Testing the connection with Snowflake Data warehouse:

In [32]:
import snowflake.connector
from dotenv import load_dotenv
import os 

load_dotenv()

True

In [34]:
# Connect to Snowflake; the password, account and warehouse are read from the .env file loaded above
con = snowflake.connector.connect(
    user='hariharan',
    password=os.getenv('SNOWSQL_PWD'),
    account=os.getenv('SNOWSQL_ACCOUNT'),
    warehouse=os.getenv('SNOWSQL_WAREHOUSE'),
)

In [35]:
# List all the databases in the Snowflake account
cur = con.cursor()
cur.execute("SHOW DATABASES")
result = cur.fetchall()
for row in result:
    database_name = row[1]  # column 1 of the SHOW output holds the database name
    print(database_name)

FOOD_DELIVERY
SNOWFLAKE
SNOWFLAKE_SAMPLE_DATA


In [36]:
# List all the schemas in the FOOD_DELIVERY database
cur = con.cursor()
cur.execute("SHOW SCHEMAS IN FOOD_DELIVERY")
result = cur.fetchall()
for row in result:
    schema_name = row[1]
    print(schema_name)

CORE
INFORMATION_SCHEMA
PUBLIC


In [37]:
# Show all the tables present in the schema CORE
cur.execute("SHOW TABLES IN SCHEMA FOOD_DELIVERY.CORE")
result = cur.fetchall()
for row in result:
    table_name = row[1]
    print(table_name)

MENU_ITEMS
ORDERS
ORDER_DETAILS
PAYMENTS
RESTAURANTS
REVIEWS
USERS
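The three `SHOW` cells above repeat the same loop and rely on the name sitting at position `row[1]`. A small helper could instead look the column up by its label in `cursor.description`, the standard DB-API metadata whose entries start with the column name. `show_names` is a hypothetical name, and the sketch assumes Snowflake labels that column `name` in its `SHOW` output, which is consistent with the positions used above.

```python
from typing import Any, List


def show_names(cursor: Any, statement: str, column: str = "name") -> List[str]:
    """Run a SHOW-style statement and return the values of one column, located by its label."""
    cursor.execute(statement)
    # DB-API: each entry of cursor.description begins with the column name
    labels = [desc[0].lower() for desc in cursor.description]
    idx = labels.index(column.lower())
    return [row[idx] for row in cursor.fetchall()]
```

Usage against the connection above would be `for name in show_names(con.cursor(), "SHOW TABLES IN SCHEMA FOOD_DELIVERY.CORE"): print(name)`.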


In [38]:
# creating cursor object
cur = con.cursor()

# Execute a statement that will generate a result set.
sql = "SELECT * FROM FOOD_DELIVERY.CORE.ORDERS"
cur.execute(sql)

# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
df = cur.fetch_pandas_all()

print(df.shape)
df.head()

(73748, 7)


| | ORDER_ID | USER_ID | ORDER_TIME | DELIVERY_ADDRESS | ORDER_STATUS | RESTAURANT_ID | TOTAL_AMOUNT |
|---|---|---|---|---|---|---|---|
| 0 | 716d868b-2b5a-4bee-bc2c-ee262c81588f | d36e1a86-6e33-434a-b9c7-3f5c30753402 | 2024-06-01 11:53:19 | 9198 Gabriela Green\nEast Marcton, DC 16873 | Delivered | R18329211 | 152.85 |
| 1 | 0f41a3a7-954a-4e24-ad84-dba22e8dd154 | 1ae81af6-6b78-4695-8996-442e9e40ad3c | 2024-06-01 08:11:05 | 366 Byrd Hills\nNew Robert, WI 24455 | Delivered | R22670500 | 191.5 |
| 2 | fde175ad-d2ea-4901-a5fd-5c28f44e7382 | 2c1492fd-c993-4d00-9086-dc105c821bae | 2024-06-01 21:46:46 | 71211 Gregory Track\nGreenestad, OH 72514 | Delivered | R66375744 | 35.88 |
| 3 | 99e370cd-f55e-4793-b0e6-0c41d8fd9287 | 523d1f1a-9edd-4811-9472-f65a52d51f15 | 2024-06-01 09:04:39 | 070 Steven Heights\nRoachville, PW 12962 | Delivered | R42644875 | 149.57 |
| 4 | a88b8bf1-9fdf-4c8e-87ab-ac3a390d05e0 | a6c31749-eb2a-40f3-aa65-49ead5cfdc3a | 2024-06-01 21:05:23 | 9382 Alyssa Branch\nShellyfurt, VI 11466 | Delivered | R23024716 | 129.32 |


## 3. Testing the connection with Databricks Data warehouse:

In [1]:
from dotenv import load_dotenv
import os 
import pandas as pd 
from databricks import sql 

load_dotenv()

True

In [9]:
# Creating a connection to databricks
connection = sql.connect(server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path=os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token=os.getenv("DATABRICKS_ACCESS_TOKEN")
                         )


In [10]:
df = pd.read_sql("SELECT * FROM hive_metastore.online_food_business.menu_items", connection)
print(df.shape)
df.head()

(3000, 4)


| | menu_item_id | restaurant_id | item_name | price |
|---|---|---|---|---|
| 0 | R32379007_1 | R32379007 | Tacos | 10.2 |
| 1 | R32379007_2 | R32379007 | Burritos | 13.93 |
| 2 | R32379007_3 | R32379007 | Enchiladas | 14.15 |
| 3 | R32379007_4 | R32379007 | Quesadillas | 10.47 |
| 4 | R32379007_5 | R32379007 | Nachos | 10.8 |
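The `pd.read_sql` calls in this notebook pass a raw DB-API connection; pandas only officially supports SQLAlchemy connectables and `sqlite3` connections, and emits a `UserWarning` for other DB-API objects. One way around it, sketched here under the assumption that the connector follows the standard DB-API cursor interface, is to fetch through a cursor and build the DataFrame yourself. `query_to_records` is a hypothetical helper, and the test uses `sqlite3` only as a stand-in for Databricks.

```python
from contextlib import closing
from typing import Any, List, Tuple


def query_to_records(conn: Any, query: str) -> Tuple[List[str], List[tuple]]:
    """Run `query` on a DB-API connection and return (column names, rows)."""
    # closing() guarantees the cursor is closed even if execute() raises
    with closing(conn.cursor()) as cursor:
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        rows = [tuple(row) for row in cursor.fetchall()]
    return columns, rows
```

The result converts directly with `pd.DataFrame(rows, columns=columns)`, which avoids handing the raw connection to pandas.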


## 4. Testing the connection with OpenAI API:

In [2]:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
  model="gpt-4o-mini",
  messages=[
    {"role": "system", "content": "You are a poetic assistant, skilled in explaining complex programming concepts with creative flair."},
    {"role": "user", "content": "Compose a poem that explains the concept of recursion in programming."}
  ]
)

print(completion.choices[0].message)

ChatCompletionMessage(content='In the realm of code where logic reigns,  \nThere dwells a concept, where beauty gains,  \nRecursive thoughts, like whispers in trees,  \nUnfold their secrets, a dance on the keys.  \n\nImagine a puzzle, with pieces askew,  \nTo solve it completely, what must you do?  \nBreak it to bits, that’s the crafter’s decree,  \nFor recursion bends time, like branches of a tree.  \n\nA function calls forth, a voice echoing clear,  \n"Look at the smaller, and hold it near!"  \nA task lightly born, yet endlessly spun,  \nFor in each little call, the journey\'s begun.  \n\nBase case awaits, like a lighthouse’s beam,  \nA stopping point found, in this coding dream.  \nWhen n equals one, or perhaps zero,  \nThe function returns, a hero in shadow.  \n\nBut should there be more, the dance starts anew,  \nThe function dives deeper, pursuing its due,  \nWith each layered call, the stack rises high,  \nA mountain of logic, reaching the sky.  \n\nYet heed well, dear coder, th

## 5. Retrieving the Table Schema, Categorical Column details and Sample rows for Prompt Context:

In [3]:
from dotenv import load_dotenv
import os 
import pandas as pd 
from databricks import sql 

load_dotenv()

True

In [4]:
# Selected tables list 
catalog = "hive_metastore"
schema = "online_food_business"
tables_list = ["menu_items","orders","users"]

In [5]:
# Function to create enriched database schema details (DDL, sample rows, categorical values) for the prompt
def get_enriched_database_schema(catalog, schema, tables_list, max_categories=20):
    table_blocks = []

    # Open one connection and reuse it for every table instead of reconnecting per table
    conn = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                       http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                       access_token    = os.getenv("DATABRICKS_ACCESS_TOKEN"))
    try:
        for table in tables_list:
            full_name = f"`{catalog}`.{schema}.{table}"

            # Getting the DDL for the table, trimmed before the USING clause
            df = pd.read_sql(sql=f"SHOW CREATE TABLE {full_name}", con=conn)
            stmt = df['createtab_stmt'][0].split("USING")[0]

            # String columns are the candidates for categorical columns
            df = pd.read_sql(sql=f"DESCRIBE TABLE {full_name}", con=conn)
            string_cols = df[df['data_type'] == 'string']['col_name'].tolist()

            df_categories_string = "No Categorical Fields"
            if string_cols:
                # One SELECT per string column (distinct count + distinct values), combined with UNION ALL
                sql_distinct = " UNION ALL ".join(
                    f"SELECT '{col}' AS column_name, COUNT(DISTINCT {col}) AS cnt, "
                    f"ARRAY_AGG(DISTINCT {col}) AS values FROM {full_name}"
                    for col in string_cols
                )
                print(sql_distinct)
                df_categories = pd.read_sql(sql=sql_distinct, con=conn)
                # Columns with at most `max_categories` distinct values are treated as categorical
                df_categories = df_categories[df_categories['cnt'] <= max_categories].drop(columns='cnt')
                print(df_categories)
                if not df_categories.empty:
                    df_categories_string = df_categories.to_string(index=False)

            # Getting a few sample rows from the table
            df = pd.read_sql(sql=f"SELECT * FROM {full_name} LIMIT 3", con=conn)
            sample_rows = df.to_string(index=False)

            table_blocks.append(stmt + "\n" + sample_rows + "\n\nCategorical Fields:\n" + df_categories_string + "\n")
    finally:
        conn.close()

    # Table blocks are separated by a blank line, as in the original string concatenation
    return "\n".join(table_blocks)
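The distinct-values query is assembled by pasting raw column names into SQL, which breaks on names that need quoting. Below is a sketch of the same `UNION ALL` construction with backtick-quoted identifiers (Databricks SQL escapes a backtick inside a quoted identifier by doubling it), backslash-escaped label literals (assumed default string-literal behaviour), and an explicit guard for tables with no string columns. `build_distinct_values_sql`, `quote_identifier`, `quote_string_literal` and `categorical_columns` are hypothetical helpers; the 20-value threshold mirrors the notebook's.

```python
from typing import Iterable, List, Sequence, Tuple


def quote_identifier(name: str) -> str:
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def quote_string_literal(value: str) -> str:
    """Single-quote a string literal using backslash escapes (assumed default behaviour)."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_distinct_values_sql(full_table_name: str, columns: Sequence[str]) -> str:
    """One SELECT per column (name, distinct count, distinct values), joined with UNION ALL."""
    if not columns:
        raise ValueError("no string columns to profile")
    parts = []
    for col in columns:
        q = quote_identifier(col)
        parts.append(
            f"SELECT {quote_string_literal(col)} AS column_name, COUNT(DISTINCT {q}) AS cnt, "
            f"ARRAY_AGG(DISTINCT {q}) AS values FROM {full_table_name}"
        )
    return " UNION ALL ".join(parts)


def categorical_columns(rows: Iterable[Tuple[str, int, list]],
                        max_distinct: int = 20) -> List[Tuple[str, list]]:
    """Keep (column_name, values) for columns with at most `max_distinct` distinct values."""
    return [(name, values) for name, cnt, values in rows if cnt <= max_distinct]
```

Raising on an empty column list makes the "no string columns" case explicit instead of sending an empty string to the warehouse.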

In [6]:
table_schema = get_enriched_database_schema(catalog,schema,tables_list)


SELECT 'menu_item_id' AS column_name, COUNT(DISTINCT menu_item_id) AS cnt, ARRAY_AGG(DISTINCT menu_item_id) AS values FROM `hive_metastore`.online_food_business.menu_items UNION ALL SELECT 'restaurant_id' AS column_name, COUNT(DISTINCT restaurant_id) AS cnt, ARRAY_AGG(DISTINCT restaurant_id) AS values FROM `hive_metastore`.online_food_business.menu_items UNION ALL SELECT 'item_name' AS column_name, COUNT(DISTINCT item_name) AS cnt, ARRAY_AGG(DISTINCT item_name) AS values FROM `hive_metastore`.online_food_business.menu_items


Empty DataFrame
Columns: [column_name, values]
Index: []


SELECT 'order_id' AS column_name, COUNT(DISTINCT order_id) AS cnt, ARRAY_AGG(DISTINCT order_id) AS values FROM `hive_metastore`.online_food_business.orders UNION ALL SELECT 'user_id' AS column_name, COUNT(DISTINCT user_id) AS cnt, ARRAY_AGG(DISTINCT user_id) AS values FROM `hive_metastore`.online_food_business.orders UNION ALL SELECT 'delivery_address' AS column_name, COUNT(DISTINCT delivery_address) AS cnt, ARRAY_AGG(DISTINCT delivery_address) AS values FROM `hive_metastore`.online_food_business.orders UNION ALL SELECT 'order_status' AS column_name, COUNT(DISTINCT order_status) AS cnt, ARRAY_AGG(DISTINCT order_status) AS values FROM `hive_metastore`.online_food_business.orders UNION ALL SELECT 'restaurant_id' AS column_name, COUNT(DISTINCT restaurant_id) AS cnt, ARRAY_AGG(DISTINCT restaurant_id) AS values FROM `hive_metastore`.online_food_business.orders


    column_name                                        values
3  order_status  [Cancelled, Pending, Undelivered, Delivered]


SELECT 'user_id' AS column_name, COUNT(DISTINCT user_id) AS cnt, ARRAY_AGG(DISTINCT user_id) AS values FROM `hive_metastore`.online_food_business.users UNION ALL SELECT 'name' AS column_name, COUNT(DISTINCT name) AS cnt, ARRAY_AGG(DISTINCT name) AS values FROM `hive_metastore`.online_food_business.users UNION ALL SELECT 'gender' AS column_name, COUNT(DISTINCT gender) AS cnt, ARRAY_AGG(DISTINCT gender) AS values FROM `hive_metastore`.online_food_business.users UNION ALL SELECT 'email' AS column_name, COUNT(DISTINCT email) AS cnt, ARRAY_AGG(DISTINCT email) AS values FROM `hive_metastore`.online_food_business.users UNION ALL SELECT 'phone_number' AS column_name, COUNT(DISTINCT phone_number) AS cnt, ARRAY_AGG(DISTINCT phone_number) AS values FROM `hive_metastore`.online_food_business.users UNION ALL SELECT 'delivery_address' AS column_name, COUNT(DISTINCT delivery_address) AS cnt, ARRAY_AGG(DISTINCT delivery_address) AS values FROM `hive_metastore`.online_food_business.users


  column_name          values
2      gender  [Female, Male]


In [7]:
print(table_schema)

CREATE TABLE hive_metastore.online_food_business.menu_items (
  menu_item_id STRING,
  restaurant_id STRING,
  item_name STRING,
  price DOUBLE)

menu_item_id restaurant_id  item_name  price
 R32379007_1     R32379007      Tacos  10.20
 R32379007_2     R32379007   Burritos  13.93
 R32379007_3     R32379007 Enchiladas  14.15

Categorical Fields:
No Categorical Fields

CREATE TABLE hive_metastore.online_food_business.orders (
  order_id STRING,
  user_id STRING,
  order_time TIMESTAMP,
  delivery_address STRING,
  order_status STRING,
  restaurant_id STRING,
  total_amount DOUBLE)

                            order_id                              user_id                order_time                            delivery_address order_status restaurant_id  total_amount
716d868b-2b5a-4bee-bc2c-ee262c81588f d36e1a86-6e33-434a-b9c7-3f5c30753402 2024-06-01 11:53:19+00:00 9198 Gabriela Green\nEast Marcton, DC 16873    Delivered     R18329211        152.85
0f41a3a7-954a-4e24-ad84-dba22e8dd154 1ae81a

In [11]:
from langchain_core.prompts import PromptTemplate
from langchain.chains.llm import LLMChain
from langchain_openai import ChatOpenAI

def create_sql(question,table_schema):

    ### Defining the prompt template
    template_string = """ 
    You are an expert data engineer working with a Databricks environment. \
    Your task is to generate a working SQL query in the Databricks SQL dialect. \
    During joins, if column names are the same, qualify them with table aliases (e.g. llm.customer_id) \
    in the SELECT statement. It is also important to respect the column types: \
    if a column is a string, the value should be enclosed in quotes. \
    If you are writing CTEs, include all the required columns. \
    When concatenating a non-string column, make sure to cast the column to string. \
    When comparing a date column to a string, cast the string input. \
    For string columns, check whether the column is categorical and only use the values listed for it in the schema. \

    SCHEMA:
    ## {table_schema} ##

    QUESTION:
    ##
    {question}
    ##


    IMPORTANT: MAKE SURE THE OUTPUT IS JUST THE SQL CODE AND NOTHING ELSE. Ensure the appropriate CATALOG is used in the query and SCHEMA is specified when reading the tables.
    ##

    OUTPUT:
    """
    prompt_template = PromptTemplate.from_template(template_string)

    ### Defining the LLM chain
    llm_chain = LLMChain(
        llm=ChatOpenAI(model="gpt-4o-mini",temperature=0),
        prompt=prompt_template
    )

    response =  llm_chain.invoke({"question":question,"table_schema":table_schema})
    output = response['text']

    return output

In [30]:
result = create_sql("What is the total orders by each restaurant and total quantity?",table_schema)

In [31]:
print(result)

```sql
WITH order_summary AS (
    SELECT 
        o.restaurant_id AS restaurant_id,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_amount
    FROM 
        hive_metastore.online_food_business.orders o
    GROUP BY 
        o.restaurant_id
)

SELECT 
    os.restaurant_id AS restaurant_id,
    os.total_orders AS total_orders,
    os.total_amount AS total_amount
FROM 
    order_summary os
```


In [32]:
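# Function to extract the SQL code from the LLM response
def process_llm_response_for_sql(response: str) -> str:
    # Extract the ```sql fenced block if present; otherwise assume the whole response is SQL
    fence = "```sql"
    if fence not in response:
        return response.strip()
    start_idx = response.find(fence) + len(fence)
    end_idx = response.find("```", start_idx)
    if end_idx == -1:  # unterminated fence: take everything after the opening fence
        end_idx = len(response)
    sql_code = response[start_idx:end_idx].strip()

    return sql_code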

In [33]:
final_query_1 = process_llm_response_for_sql(result)
print(final_query_1)

WITH order_summary AS (
    SELECT 
        o.restaurant_id AS restaurant_id,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_amount
    FROM 
        hive_metastore.online_food_business.orders o
    GROUP BY 
        o.restaurant_id
)

SELECT 
    os.restaurant_id AS restaurant_id,
    os.total_orders AS total_orders,
    os.total_amount AS total_amount
FROM 
    order_summary os


In [34]:
def create_advanced_sql(question,sql_code,table_schema):

    ### Defining the prompt template
    template_string = """ 
    You are an expert data engineer working with a Databricks environment. \
    Your task is to generate a working SQL query in the Databricks SQL dialect. \
    Enclose the complete SQL_CODE in a WITH clause and name it MASTER. DO NOT ALTER the given SQL_CODE. \
    Then, based on the QUESTION, generate the final SQL query on top of the MASTER WITH clause. \
    ONLY IF additional information is needed to answer the QUESTION, use the SCHEMA to join in the details needed for the final answer. \


    INPUT:
    SQL_CODE:
    ##
    {sql_code}
    ##

    SCHEMA:
    ## {table_schema} ##

    QUESTION:
    ##
    {question}
    ##

    IMPORTANT: MAKE SURE THE OUTPUT IS JUST THE SQL CODE AND NOTHING ELSE.
    ##


    OUTPUT:
    """
    prompt_template = PromptTemplate.from_template(template_string)

    ### Defining the LLM chain
    llm_chain = LLMChain(
        llm=ChatOpenAI(model="gpt-4o-mini",temperature=0),
        prompt=prompt_template
    )

    response =  llm_chain.invoke({"sql_code":sql_code,"question":question,"table_schema":table_schema})
    output = response['text']

    return output
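The first step that `create_advanced_sql` asks of the LLM (wrapping the previous query in a `MASTER` CTE without altering it) is deterministic, so it could also be done in plain Python, leaving the model to write only the outer query. This is a sketch under that assumption, not how the notebook does it; `wrap_as_master` is a hypothetical helper. It strips trailing semicolons, since a statement terminator inside the parentheses would be a syntax error.

```python
def wrap_as_master(sql_code: str, cte_name: str = "MASTER") -> str:
    """Wrap an existing query, unchanged apart from trailing ';', in a named CTE."""
    body = sql_code.strip()
    while body.endswith(";"):
        body = body[:-1].rstrip()
    return f"WITH {cte_name} AS (\n{body}\n)\n"
```

For example, `wrap_as_master(final_query_1) + "SELECT AVG(total_orders) AS average_total_orders FROM MASTER"` produces a query of the same shape as the one generated below.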

In [37]:
final_query_2 = create_advanced_sql(question="Can you give me the average total orders overall and also average item quantity?",sql_code=final_query_1,table_schema=table_schema)

In [38]:
final_query_2 = process_llm_response_for_sql(final_query_2)
print(final_query_2)

WITH MASTER AS (
    WITH order_summary AS (
    SELECT 
        o.restaurant_id AS restaurant_id,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_amount
    FROM 
        hive_metastore.online_food_business.orders o
    GROUP BY 
        o.restaurant_id
)

SELECT 
    os.restaurant_id AS restaurant_id,
    os.total_orders AS total_orders,
    os.total_amount AS total_amount
FROM 
    order_summary os
)

SELECT 
    AVG(total_orders) AS average_total_orders,
    AVG(total_amount / total_orders) AS average_item_quantity
FROM 
    MASTER;


In [62]:
query = """ 

WITH MASTER AS (
    WITH order_summary AS (
    SELECT 
        o.restaurant_id AS restaurant_id,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_amount
    FROM 
        hive_metastore.online_food_business.orders o
)

SELECT 
    os.restaurant_id AS restaurant_id,
    os.total_orders AS total_orders,
    os.total_amount AS total_amount
FROM 
    order_summary os
)

SELECT 
    AVG(total_orders) AS average_total_orders,
    AVG(total_amount / total_orders) AS average_item_quantity
FROM 
    MASTER;



"""

## 6. Query Self-Correction Component:

In [63]:
def load_data_from_query(query):
    # Run the query on Databricks and return the result as a pandas DataFrame
    conn = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                    http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                    access_token    = os.getenv("DATABRICKS_ACCESS_TOKEN"))
    try:
        # query = query.replace(";","")
        # query = query + f" LIMIT 1000;"
        df = pd.read_sql(sql=query,con=conn)
    finally:
        # Close the connection even when the query fails
        conn.close()
    return df

In [64]:
def self_correction(query):
    # Run the query; return "Successful" if it executes, otherwise the error message
    try:
        df = load_data_from_query(query)
        print(df.shape)
        return "Successful"
    except Exception as e:
        return str(e)

In [65]:
# query = final_query_1
# query = "SELECT * FROM hive_metastore.online_food_business.menu"

In [66]:
error_msg = self_correction(query)
error_msg

'Execution failed on sql:  \n\nWITH MASTER AS (\n    WITH order_summary AS (\n    SELECT \n        o.restaurant_id AS restaurant_id,\n        COUNT(o.order_id) AS total_orders,\n        SUM(o.total_amount) AS total_amount\n    FROM \n        hive_metastore.online_food_business.orders o\n)\n\nSELECT \n    os.restaurant_id AS restaurant_id,\n    os.total_orders AS total_orders,\n    os.total_amount AS total_amount\nFROM \n    order_summary os\n)\n\nSELECT \n    AVG(total_orders) AS average_total_orders,\n    AVG(total_amount / total_orders) AS average_item_quantity\nFROM \n    MASTER;\n\n\n\n\n[MISSING_GROUP_BY] The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses. SQLSTATE: 42803; line 5 pos 4\nunable to rollback'

In [67]:
def correct_sql(question,sql_code,table_schema,error_msg):

    ### Defining the prompt template
    template_string = """ 
    You are an expert data engineer working with a Databricks environment. \
    Your task is to modify the SQL_CODE in the Databricks SQL dialect based on the QUESTION, SCHEMA and ERROR_MESSAGE. \
    If an ERROR_MESSAGE is provided, make sure the corrected SQL query resolves it. \

    SCHEMA:
    ## {table_schema} ##

    ERROR_MESSAGE:
    ## {error_msg} ##

    SQL_CODE:
    ##
    {sql_code}
    ##

    QUESTION:
    ## {question} ##


    IMPORTANT: MAKE SURE THE OUTPUT IS JUST THE SQL CODE AND NOTHING ELSE. Ensure the appropriate CATALOG is used in the query and SCHEMA is specified when reading the tables.
    ##

    OUTPUT:
    """
    prompt_template = PromptTemplate.from_template(template_string)

    ### Defining the LLM chain
    llm_chain = LLMChain(
        llm=ChatOpenAI(model="gpt-4o-mini",temperature=0),
        prompt=prompt_template
    )

    response =  llm_chain.invoke({"question":question,"sql_code":sql_code,"table_schema":table_schema,"error_msg":error_msg})
    output = response['text']

    return output

In [68]:
query

' \n\nWITH MASTER AS (\n    WITH order_summary AS (\n    SELECT \n        o.restaurant_id AS restaurant_id,\n        COUNT(o.order_id) AS total_orders,\n        SUM(o.total_amount) AS total_amount\n    FROM \n        hive_metastore.online_food_business.orders o\n)\n\nSELECT \n    os.restaurant_id AS restaurant_id,\n    os.total_orders AS total_orders,\n    os.total_amount AS total_amount\nFROM \n    order_summary os\n)\n\nSELECT \n    AVG(total_orders) AS average_total_orders,\n    AVG(total_amount / total_orders) AS average_item_quantity\nFROM \n    MASTER;\n\n\n\n'

In [71]:
modified = correct_sql("Can you give me the average total orders overall and also average item quantity?",query,table_schema,error_msg=error_msg)

In [72]:
print(modified)

```sql
WITH MASTER AS (
    WITH order_summary AS (
    SELECT 
        o.restaurant_id AS restaurant_id,
        COUNT(o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_amount
    FROM 
        hive_metastore.online_food_business.orders o
    GROUP BY 
        o.restaurant_id
)

SELECT 
    os.restaurant_id AS restaurant_id,
    os.total_orders AS total_orders,
    os.total_amount AS total_amount
FROM 
    order_summary os
)

SELECT 
    AVG(total_orders) AS average_total_orders,
    AVG(total_amount / total_orders) AS average_item_quantity
FROM 
    MASTER;
```


In [48]:
# Final
error_msg = self_correction(query)

if error_msg == "Successful":
    print("Query is successful")
else:
    modified_query = correct_sql("List the details of the menu item table",query,table_schema,error_msg=error_msg)

In [49]:
# Final function to validate and self-correct
def validate_and_correct_sql(query, table_schema, question):
    # Run the query once; if it fails, ask the LLM to fix it using the question and the error message
    error_msg = self_correction(query)

    if error_msg == "Successful":
        return "Correct", query
    else:
        modified_query = correct_sql(question, query, table_schema, error_msg=error_msg)
        return "Incorrect", modified_query
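`validate_and_correct_sql` makes a single correction attempt, and the corrected SQL is never re-run. A bounded retry loop is one way to extend it: run the query, and on failure ask for a fix and run again, up to `max_attempts` times. This is a sketch, not the app's implementation; `self_correct_loop` is a hypothetical name, and `run_query` / `fix_query` are injected callables so the loop can be tested without Databricks or an LLM.

```python
from typing import Callable, List, Tuple


def self_correct_loop(
    query: str,
    run_query: Callable[[str], object],
    fix_query: Callable[[str, str], str],
    max_attempts: int = 3,
) -> Tuple[bool, str, List[str]]:
    """Run `query`; on an exception, ask `fix_query(query, error)` for a new query and retry.

    Returns (succeeded, last query tried, error messages seen in order).
    """
    errors: List[str] = []
    for attempt in range(max_attempts):
        try:
            run_query(query)
            return True, query, errors
        except Exception as exc:  # database connectors raise many different error types
            errors.append(str(exc))
            if attempt == max_attempts - 1:
                break  # no attempts left, so there is no point asking for another fix
            query = fix_query(query, str(exc))
    return False, query, errors
```

In this notebook, `run_query` could be `load_data_from_query` and `fix_query` could be `lambda q, err: process_llm_response_for_sql(correct_sql(question, q, table_schema, err))`.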

In [50]:
print(modified_query)

```sql
SELECT * FROM hive_metastore.online_food_business.menu_items
```


## 7. Adding Favourites:

In [175]:
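def add_to_user_history(user_name,question,query,favourite_ind):
    user_history_table = "hive_metastore.dev_tools.sqlgenpro_user_query_history"

    # Bind the values as parameters instead of pasting them into the SQL string, so that
    # quotes inside the question or the generated query do not break the INSERT.
    # The :name markers assume databricks-sql-connector 3.x native parameters.
    insert_sql = (
        f"INSERT INTO {user_history_table} "
        "VALUES (:user_name, current_timestamp(), :question, :query, :favourite_ind)"
    )

    conn = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                    http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                    access_token    = os.getenv("DATABRICKS_ACCESS_TOKEN"))
    try:
        with conn.cursor() as cursor:
            cursor.execute(insert_sql, {"user_name": user_name, "question": question,
                                        "query": query, "favourite_ind": favourite_ind})
    finally:
        conn.close()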
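Interpolating `question` and `query` into the `INSERT` with an f-string breaks as soon as either contains a single quote (generated SQL often does, e.g. `order_status = 'Delivered'`), and it leaves the table open to SQL injection. Bound parameters avoid both problems. The sketch below shows the idea with the standard-library `sqlite3` module as a stand-in, whose placeholder is `?`; the Databricks connector has its own parameter syntax, so check its documentation for your version. `add_to_history` and the `user_query_history` table are hypothetical.

```python
import sqlite3


def add_to_history(conn: sqlite3.Connection, user_name: str, question: str,
                   query: str, favourite_ind: bool) -> None:
    """Insert one history row using bound parameters instead of string formatting."""
    conn.execute(
        "INSERT INTO user_query_history VALUES (?, CURRENT_TIMESTAMP, ?, ?, ?)",
        (user_name, question, query, favourite_ind),
    )
    conn.commit()
```

Because the driver sends the values separately from the SQL text, a generated query containing quotes is stored verbatim.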

In [173]:
add_to_user_history("hariharan","List the details of the menu item table",query,False)

## 8. Deploying Streamlit App:

In [None]:
# Step-1
# Logging into the EC2 Instance

#----> ssh -i "SQLGenPro.pem" ubuntu@ec2-13-51-170-253.eu-north-1.compute.amazonaws.com

# Get Root user access
#----> sudo -i

# Go to the home directory
#----> cd /home/ubuntu

# Update the packages
#----> sudo apt-get update

In [None]:
# Step-2
# Move the app to the EC2 instance
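# NOTE: Run the following commands from the directory that contains both the app folder (SQLGenPro) and the .pem file,
# with the Python environment used to develop the app activated.
# requirements.txt is written inside the app folder so that scp copies it along with the app.

#----> pip list --format=freeze > SQLGenPro/requirements.txt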
#----> scp -i SQLGenPro.pem -r SQLGenPro ubuntu@ec2-51-20-34-113.eu-north-1.compute.amazonaws.com:/home/ubuntu/

In [None]:
# Step-3
# Go to the EC2 instance and check if the app folder is present in the home directory
# Then create a python virtual environment and install the required packages

#----> apt install python3.10-venv
#----> python3 -m venv myenv
#----> source myenv/bin/activate
#----> pip install -r requirements.txt (after moving to the app directory)
#----> streamlit hello 

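# Permanent running: nohup keeps the app alive after the SSH session closes, and & sends it to the background
# (Streamlit serves on port 8501 by default, so that port must be open in the instance's security group)
#----> nohup python3 -m streamlit run SQLGenPro_Live.py &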