## SQL-to-NL-to-SQL architecture

In many cases, a company already has many analytic assets, such as SQL queries, procedures, OLAP cubes and others.

SQL is a valuable tool, but it can be hard to reuse because it is not always clear what a query is doing.

For example, suppose someone wants to know: 'Which departments perform worse than the average?'

The goal is very simple, but the SQL that implements it is very complex.

There is a chasm between the business goal and the specific SQL implementation.

In this case, pre-defined (often complex) SQL can help solve the problem — but we also need a way to retrieve the appropriate query from a large collection of pre-defined SQL.

In [1]:
#! pip install pandas google-cloud-aiplatform google-cloud-bigquery
#! pip install python-dotenv langchain
#! pip install langchain-google-vertexai

In [2]:
# TODO: Implement this function
import pandas as pd

def crawl_prefined_sqls():
  sqls = []
  # goal = "SQL to check the delay time(second) before shipping for each order and product"
  sqls.append("""
CREATE TEMP FUNCTION convertIntervalToSecond(start_ts TIMESTAMP, end_ts TIMESTAMP)
RETURNS INT64
AS (
  (EXTRACT(DAY FROM (end_ts - start_ts)) * 86400 +
  EXTRACT(HOUR FROM (end_ts - start_ts)) * 3600 +
  EXTRACT(MINUTE FROM (end_ts - start_ts)) * 60 +
  EXTRACT(SECOND FROM (end_ts - start_ts)))
);
select convertIntervalToSecond(a.created_at, a.shipped_at) as order_pending_second, 
  convertIntervalToSecond(b.created_at, b.shipped_at) as product_pending_second, 
  a.status, a.order_id, a.user_id, a.gender, a.created_at, a.shipped_at, 
  b.product_id, b.created_at, b.shipped_at, b.delivered_at, b.returned_at
 from `bigquery-public-data.thelook_ecommerce.orders` a
 join `bigquery-public-data.thelook_ecommerce.order_items` b on (a.order_id = b.order_id)
 where a.status not in ('Cancelled') 
   and a.created_at between '2022-01-01' and '2022-06-30'
order by a.order_id, b.product_id
  """)
  return sqls


In [3]:
import vertexai
from langchain.chat_models import ChatVertexAI
from langchain_google_vertexai import VertexAI
import os
from dotenv import load_dotenv

load_dotenv()

# Runtime settings come from the environment (load_dotenv reads a .env file
# first, if one is present). Any variable that is not set resolves to None.
PROJECT_ID, LOCATION, DATASET, TABLE_NAME = (
  os.getenv(setting) for setting in ("PROJECT_ID", "LOCATION", "DATASET", "TABLE_NAME")
)



In [4]:
# The Vertex AI region for the text model is pinned to us-central1; LOCATION
# (read above and passed to the BigQuery vector store) is not used here.
vertexai.init(project=PROJECT_ID, location="us-central1")

# One text model serves every prompt in this notebook (exposed as both `llm`
# and `llm_vertex`). temperature=0 keeps the JSON-style answers as repeatable
# as possible.
llm = llm_vertex = VertexAI(
    model_name="text-bison-32k@002",
    max_output_tokens=8000,
    temperature=0,
    top_p=0.8,
    top_k=40,
)

In [5]:
import json

def parse_json_response(llm_json_response) -> "dict | list":
  """Extract and parse the first JSON object or array embedded in an LLM reply.

  LLM replies often wrap JSON in prose or a ```json fence. The payload runs from
  whichever of ``{`` or ``[`` appears first up to the *last* matching closing
  bracket, and is decoded with ``json.loads``.

  Args:
    llm_json_response: raw text returned by the LLM.

  Returns:
    The decoded dict or list.

  Raises:
    json.JSONDecodeError: if the text contains no ``{``/``[`` or the extracted
      span is not valid JSON.
  """
  # str.find returns -1 for an absent bracket; drop those so a missing '{'
  # can never beat a '[' that is actually present.
  candidates = [
    (position, opener)
    for opener in ('[', '{')
    if (position := llm_json_response.find(opener)) != -1
  ] if False else [
    (llm_json_response.find(opener), opener)
    for opener in ('[', '{')
    if llm_json_response.find(opener) != -1
  ]
  if not candidates:
    raise json.JSONDecodeError(
      "No JSON object or array found in LLM response", llm_json_response, 0)
  start_index, start_char = min(candidates)
  end_char = ']' if start_char == '[' else '}'
  end_index = llm_json_response.rfind(end_char)
  json_data = llm_json_response[start_index:end_index + 1]
  return json.loads(json_data)


In [6]:
import ast

def parse_python_object(llm_python_object) -> "dict | list":
  """Extract and evaluate the first Python list/dict literal in an LLM reply.

  The literal runs from whichever of ``[`` or ``{`` appears first up to the
  *last* matching closing bracket, and is evaluated with ``ast.literal_eval``
  (literals only, no arbitrary code). The raw reply and the extracted span are
  printed for debugging.

  Args:
    llm_python_object: raw text returned by the LLM.

  Returns:
    The evaluated list or dict.

  Raises:
    ValueError: if no ``[``/``{`` is present, or the span is not a valid literal.
    SyntaxError: if the extracted span cannot be parsed as Python.
  """
  print('llm response:'+ llm_python_object)
  # Ignore brackets that do not occur (find() == -1), then take the earliest.
  # The previous if/elif left start_char unassigned when '[' preceded '{'.
  candidates = [
    (llm_python_object.find(opener), opener)
    for opener in ('[', '{')
    if llm_python_object.find(opener) != -1
  ]
  if not candidates:
    raise ValueError("No Python list or dict literal found in LLM response")
  start_index, start_char = min(candidates)
  end_char = ']' if start_char == '[' else '}'
  end_index = llm_python_object.rfind(end_char)
  object_data = llm_python_object[start_index:end_index + 1]
  print(object_data)
  return ast.literal_eval(object_data)


In [7]:

def explain_sql(sql):
  """Ask the LLM to turn a concrete SQL query into a reusable, parameterized asset.

  The prompt asks the LLM to:
    1. rewrite ``sql`` as a prepared statement with ``?`` placeholders,
    2. list the predicate (filter) columns it parameterized, and
    3. describe the business goal of the statement.
  The raw response is printed for inspection.

  Args:
    sql: SQL text to analyse.

  Returns:
    The JSON parsed from the response by ``parse_json_response``. It is expected
    to have ``business_goal``, ``prepared_statement`` and ``filter_columns``
    keys, shaped like the example below; the LLM is not guaranteed to comply.
  """
  # The example must be valid JSON and use exactly the key names that later
  # cells read (e.g. ``filter_names``, ``filter_order``): the LLM tends to copy
  # the example's shape and spelling.
  example_json = """
  {
    "business_goal": "explanation of the SQL",
    "prepared_statement": "select convertIntervalToSecond(a.created_at, a.shipped_at) as order_pending_second from sample_table where created_at between ? and ?",
    "filter_columns": [
      {
        "table_name": "sample_table",
        "column_name": "created_at",
        "business_goal": "filter by order created date",
        "operator": "between",
        "column_type" : "TIMESTAMP",
        "filter_names": ["order_created_at_start","order_created_at_end"],
        "filter_order": 1
      }
    ]
  }
  """
  prompt_template = """You are a server-side developer.
Please convert the provided SQL to a prepared_statement and extract the filters from the SQL in JSON format. The filter_columns should only include predicate columns. Do not suggest Python code. Describe the business goal based on the prepared_statement.
Step 1: Convert the provided SQL to a prepared_statement.
Step 2: Extract the filters from the SQL in JSON format. The filter_columns should only include predicate columns. Do not suggest Python code.
Step 3: Describe the business goal based on the prepared_statement.

----------------------------
example sql :
select convertIntervalToSecond(a.created_at, a.shipped_at) as order_pending_second from sample_table where created_at between '2023-01-01' and '2023-12-01'

example output json:
{example_json}

----------------------------
provided sql:
{sql}

output json:
"""
  prompt = prompt_template.format(sql=sql, example_json=example_json)
  response = llm.invoke(prompt)
  print(response)
  return parse_json_response(response)

In [8]:

# Assetize every crawled SQL: the LLM rewrites it as a prepared statement,
# lists its filter columns and describes its business goal.
# NOTE: the LLM does not always extract the filter columns correctly, so check
# the responses printed by explain_sql.
assetized_queries = []
assetized_queries.extend(explain_sql(sql) for sql in crawl_prefined_sqls())
  

 ```json
{
  "business_goal": "This query aims to calculate the pending time (in seconds) for orders and products, and retrieve order-related information for orders that are not cancelled and were created between January 1, 2022, and June 30, 2022. The results are ordered by order ID and product ID.",
  "prepared_statement": "CREATE TEMP FUNCTION convertIntervalToSecond(start_ts TIMESTAMP, end_ts TIMESTAMP)\nRETURNS INT64\nAS (\n  (EXTRACT(DAY FROM (end_ts - start_ts)) * 86400 +\n  EXTRACT(HOUR FROM (end_ts - start_ts)) * 3600 +\n  EXTRACT(MINUTE FROM (end_ts - start_ts)) * 60 +\n  EXTRACT(SECOND FROM (end_ts - start_ts)))\n);\nselect convertIntervalToSecond(a.created_at, a.shipped_at) as order_pending_second, \n  convertIntervalToSecond(b.created_at, b.shipped_at) as product_pending_second, \n  a.status, a.order_id, a.user_id, a.gender, a.created_at, a.shipped_at, \n  b.product_id, b.created_at, b.shipped_at, b.delivered_at, b.returned_at\n from `bigquery-public-data.thelook_ecommerce

In [9]:
# Vector store for the assetized queries, created through BigQueryVectorDatabase
# (an earlier `vector_util.VectorDatabase` backend is no longer used here).
from bigquery_vector_util import BigQueryVectorDatabase, SqlSearchSchema

vdb = BigQueryVectorDatabase(
  project_id=PROJECT_ID,
  location=LOCATION,
  dataset=DATASET,
  table_name=TABLE_NAME,
)
vdb.create_table()

Empty DataFrame
Columns: []
Index: []
QueryJob<project=turnkey-charter-358922, location=asia-northeast3, id=18a7048c-d543-4c89-96e4-7aceb2aec7c0>


In [10]:
# `langchain.embeddings.VertexAIEmbeddings` is deprecated (importing it emits a
# deprecation warning). The maintained class lives in langchain_google_vertexai,
# which this notebook already depends on.
from langchain_google_vertexai import VertexAIEmbeddings

embeddings = VertexAIEmbeddings(model_name="textembedding-gecko-multilingual@latest")

  warn_deprecated(


In [11]:
def write_goal_to_vdb(assetized_queries):
  """Embed each assetized query's business goal and insert all of them into ``vdb``.

  Every item must provide ``business_goal``, ``prepared_statement`` and
  ``filter_columns``. The business goal is embedded with ``embeddings`` and
  stored together with the SQL and its filter metadata in a single
  ``vdb.insert_record`` call.
  """
  def to_record(query):
    goal = query['business_goal']
    statement = query['prepared_statement']
    filter_columns = query['filter_columns']
    return SqlSearchSchema(
      sql=statement,
      description=goal,
      desc_vector=embeddings.embed_query(goal),
      parameters=filter_columns,
      explore_view=None,
      model_name=None,
      table_name=None,
      column_schema=None,
    )

  # NOTE(review): running this cell produced
  # "AttributeError: 'list' object has no attribute 'tolist'". embed_query
  # returns a plain list, so insert_record (or SqlSearchSchema) may expect an
  # array-like desc_vector (or parameters) — confirm against bigquery_vector_util.
  vdb.insert_record([to_record(query) for query in assetized_queries])

write_goal_to_vdb(assetized_queries)


AttributeError: 'list' object has no attribute 'tolist'

In [None]:
def get_related_query(question, similarity_threshold=0.6):
  """Find the stored SQL asset whose description best matches ``question``.

  The question is embedded with ``embeddings`` and compared with the
  ``desc_vector`` column of ``rag_test`` using pgvector's cosine-distance
  operator ``<=>`` (similarity = 1 - distance).

  NOTE(review): ``vdb`` is created as a BigQueryVectorDatabase, but this lookup
  uses a DB-API connection with pgvector SQL and ``%s`` placeholders. Confirm
  that ``vdb.get_connection()`` exists and reaches the pgvector table, or port
  this query to BigQuery.

  Args:
    question: natural-language question from the user.
    similarity_threshold: minimum cosine similarity (exclusive) for a match.

  Returns:
    A dict with ``prepared_statement``, ``filter_columns`` and ``description``
    for the closest match. Returns None if nothing clears the threshold or if
    the lookup fails; in that case the error is printed, not raised.
  """
  question_vector = str(embeddings.embed_query(question)).replace(' ', '')
  result = None
  with vdb.get_connection() as conn:
    try:
      with conn.cursor() as cur:
        # Order by distance so LIMIT 1 returns the *closest* match, not an
        # arbitrary row that merely clears the threshold.
        cur.execute(
          "SELECT sql, parameters, description FROM rag_test "
          "WHERE (1 - (desc_vector <=> %s)) > %s "
          "ORDER BY desc_vector <=> %s LIMIT 1",
          (question_vector, similarity_threshold, question_vector),
        )
        # fetchone() is None when no row matched; rowcount is driver-dependent.
        row = cur.fetchone()
        if row is None:
          return None
        result = {
          'prepared_statement': row[0],
          'filter_columns': row[1],
          'description': row[2],
        }
        print(result)
    except Exception as e:
      print(e)
  return result


In [None]:
# Retrieve a stored SQL asset whose description matches the question.
question = "How to check the delay time(second) before shipping for each order and product?"

assetized_query = get_related_query(question)
# get_related_query returns None when no stored query matches (or the lookup fails).
print(assetized_query)


In [None]:

def extract_filter_values(assetized_query, question):
  """Ask the LLM to fill ``filter_values`` for a stored query's filter columns.

  Args:
    assetized_query: dict with ``prepared_statement`` and ``filter_columns``.
    question: user question to extract filter values from.

  Returns:
    The raw LLM response text; parse it with ``parse_json_response``. Per the
    example, values the question does not mention are expected as ``[null]``.
  """
  example_json = """
  {
    "filter_columns": [
      {
        "table_name": "sample_table",
        "column_name": "col_1",
        "operator" : "in", 
        "column_type" : "STRING",
        "filter_names": ["col_1_filter"],
        "filter_values": [null],
        "filter_order": 1
      },
      {
        "table_name": "sample_table",
        "column_name": "col_2",
        "operator" : "=", 
        "column_type" : "STRING",
        "filter_names": ["col_2_filter"],
        "filter_values": ["2022-01"],
        "filter_order": 2
      }
    ]
  }
  """

  prompt_template = """You are a serverside developer. Extract filter values from the question and fill the values into the 'filter_values' item for the given filter columns. Do not suggest codes. Output should be the following JSON format.

  given question:
  {question}
  
  given sql:
  {sql}

  given filters:
  {filters}

  ----------------------------

  output format(example) : json
  {example_json}

  """
  sql = assetized_query['prepared_statement']
  filters = assetized_query['filter_columns']
  prompt = prompt_template.format(sql=sql, filters=filters, example_json=example_json, question=question)
  # `predict` is deprecated in LangChain; `invoke` returns the same text for an
  # LLM and matches explain_sql.
  response = llm.invoke(prompt)
  return response


In [None]:
# Same question, now also stating the period ("this year(2023)"), so the LLM
# can fill some filter values from it.
question_enriched = "How to check the delay time(second) before shipping for each order and product in this year(2023)?"

response = extract_filter_values(assetized_query, question_enriched)

In [None]:
# Raw LLM output text.
response

In [None]:
# Parse the first JSON object/array found in the response text.
filter_values = parse_json_response(response)

In [None]:
filter_values

In [None]:

def check_unfilled_filters(filter_values):
  """Return the filter columns that still lack a usable value.

  A column counts as unfilled when its ``filter_values`` entry is missing, is
  None (JSON ``null``), is an empty list, or contains None (the placeholder the
  extraction prompt uses for values absent from the question). A bare scalar
  such as ``"2023"`` counts as a single filled value; it no longer crashes the
  membership test.

  Args:
    filter_values: parsed LLM output containing a ``filter_columns`` list.

  Returns:
    The unfilled column dicts (the same objects, in their original order).
  """
  def is_unfilled(parameter):
    values = parameter.get('filter_values')
    if values is None:
      return True
    if not isinstance(values, (list, tuple)):
      return False
    return len(values) == 0 or None in values

  return [parameter for parameter in filter_values['filter_columns'] if is_unfilled(parameter)]



In [None]:
# Columns whose filter_values are empty or contain null still need user input.
unfilled_filters = check_unfilled_filters(filter_values)

In [None]:
unfilled_filters

In [None]:

def make_response_to_fill_filter_values(unfilled_filters, question):
  """Ask the LLM to write a follow-up message requesting the missing filter values.

  Args:
    unfilled_filters: filter column dicts still lacking values.
    question: the user's original question, for context.

  Returns:
    The raw LLM response text, expected to contain JSON of the form
    ``{"agent_response": "..."}``.
  """
  example_json = """
  {
    "agent_response": "..."
  }
  """
  prompt_template = """You are an automatic agent to serve natural language to SQL conversion. Please guide the user to fill the missing filter values in the given question and unfilled filters. Output should be the following JSON format.

  question:
  {question}

  unfilled filters:
  {unfilled_filters}

  output format json:
  {example_json}
  """
  prompt = prompt_template.format(unfilled_filters=unfilled_filters, question=question, example_json=example_json)
  # `predict` is deprecated in LangChain; `invoke` returns the same text for an LLM.
  response = llm.invoke(prompt)
  return response

In [None]:
# Ask the LLM to phrase a follow-up message asking the user for the missing values.
response = make_response_to_fill_filter_values(unfilled_filters, question=question_enriched)


In [None]:
response

In [None]:
# Hard-coded stand-in for the user's reply to the follow-up message.
additional_response = "I want to know not cancelled orders only"

In [None]:

def extract_filter_values_with_additional_response(unfilled_filters, additional_response):
  """Ask the LLM to fill the still-missing filter values from a follow-up user reply.

  Args:
    unfilled_filters: filter column dicts still lacking values.
    additional_response: the user's reply to the follow-up message.

  Returns:
    The raw LLM response text; parse it with ``parse_json_response``. It is
    expected to contain a ``filter_columns`` list shaped like the example.
  """
  example_json = """
  {
    "filter_columns": [
      {
        "column_name": "col_1",
        "operator" : "in", 
        "column_type" : "STRING",
        "filter_names": ["col_1_filter"],
        "filter_values": [null]
      }
    ]
  }
  """

  prompt_template = """You are a serverside developer. Extract filter values from the user response and unfilled filter information. Do not suggest codes. Output should be the following JSON format.

  output format json:
  {example_json}

  ----------------------------
  user response :
  {user_response}

  unfilled filters:
  {unfilled_filters}
  """
  prompt = prompt_template.format(unfilled_filters=unfilled_filters, user_response=additional_response, example_json=example_json)
  # `predict` is deprecated in LangChain; `invoke` returns the same text for an LLM.
  response = llm.invoke(prompt)
  return response

In [None]:
# Extract values for the unfilled filters from the user's reply and parse the JSON.
additional_filter_values = parse_json_response(extract_filter_values_with_additional_response(unfilled_filters, additional_response))

In [None]:
additional_filter_values

In [None]:
def merge_filter_values(filter_values, additional_filter_values):
  """Copy follow-up ``filter_values`` onto the matching original filter columns.

  Columns are matched by ``column_name``. When several additional entries share
  a name, the last one wins. ``filter_values`` is updated in place and returned.
  """
  originals = filter_values['filter_columns']
  additions = additional_filter_values['filter_columns']
  for original in originals:
    matching = [addition for addition in additions
                if original['column_name'] == addition['column_name']]
    for addition in matching:
      original['filter_values'] = addition['filter_values']
  return filter_values


In [None]:
# Overwrites filter_values of columns matched by column_name, in place:
# merged_filter_values is the same dict object as filter_values.
merged_filter_values = merge_filter_values(filter_values, additional_filter_values)

In [None]:
merged_filter_values

As you can see, the LLM can handle extraction, transformation and validation as well.

The next step is very similar to the 'Direction Conversion' approach.

In [None]:
from google.cloud import bigquery
# Shared client for the helpers below. It is constructed without arguments, so
# the project and credentials are inferred from the environment.
client = bigquery.Client()

In [None]:
def get_field_unique_values(matched_table, matched_field, limit=500):
  """Fetch up to ``limit`` distinct values of a column plus its total distinct count.

  Args:
    matched_table: BigQuery table path such as ``project.dataset.table``;
      surrounding backticks are optional.
    matched_field: column name (a plain identifier).
    limit: maximum number of distinct values to return.

  Returns:
    ``(values, total_count)``: a list of at most ``limit`` distinct values and
    the total number of distinct values in the column (``0`` for an empty table).

  Raises:
    ValueError: if the table or column name is not a plain identifier. Both come
      from LLM output and are interpolated into SQL, because BigQuery cannot bind
      identifiers as query parameters.
  """
  import re

  table_path = matched_table.strip().strip('`')
  if not re.fullmatch(r"[A-Za-z0-9_\-]+(\.[A-Za-z0-9_\-]+)*", table_path):
    raise ValueError(f"Unsupported table name: {matched_table!r}")
  if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", matched_field):
    raise ValueError(f"Unsupported column name: {matched_field!r}")
  # Backtick-quote identifiers so reserved words (e.g. `order`) still work.
  sql_query = (
    f"with distinct_values as ( select distinct `{matched_field}` as `{matched_field}` from `{table_path}` ) "
    f"select `{matched_field}`, (select count(1) from distinct_values) as total_count "
    f"from distinct_values limit {int(limit)}"
  )
  df = client.query(sql_query).to_dataframe()
  if df.empty:
    return [], 0
  return df[matched_field].tolist(), int(df['total_count'].iloc[0])
  

In [None]:
import ast


def choose_right_filter_value(filter_values, wanted_value):
  """Ask the LLM to map wanted filter values onto values that exist in the column.

  Args:
    filter_values: candidate values actually present in the column.
    wanted_value: the values extracted from the user's request.

  Returns:
    The raw LLM response text, expected to contain a Python list literal. It may
    come wrapped in a ```json fence.
  """
  prompt_template = """As a looker developer, choose right filter value for the wanted value below without changing filter value itself.

  filter_values : {filter_values}

  wanted_values: {wanted_value}

  answer format: python list
[filter_value1, filter_value2, ...]
  """
  # `predict` is deprecated in LangChain; `invoke` returns the same text for an LLM.
  response = llm.invoke(prompt_template.format(filter_values=filter_values, wanted_value=wanted_value))
  return response

def adjust_filter_value(filter_columns):
  """Snap LLM-extracted filter values onto values that actually exist in the data.

  For each filter column, the distinct values of ``table_name``.``column_name``
  are fetched. If the column has at most 500 distinct values (so the fetched
  list is complete), the LLM picks the stored values matching
  ``filter_values``. Otherwise ``filter_values`` is used unchanged.

  Each column dict is mutated in place:
  - ``adjust_filter_values`` receives the result;
  - ``unique_count`` is recorded;
  - the bulky ``unique_values`` list is reset to None.

  Args:
    filter_columns: list of dicts with ``table_name``, ``column_name`` and
      ``filter_values`` keys.
  """
  # Matches the LIMIT 500 used when fetching distinct values.
  max_listed_values = 500
  for column in filter_columns:
    column['unique_values'], column['unique_count'] = get_field_unique_values(
      column['table_name'], column['column_name'])
    # With LIMIT 500, a column with exactly 500 distinct values is still fully listed.
    if column['unique_count'] <= max_listed_values:
      response = choose_right_filter_value(column['unique_values'], column['filter_values'])
      if response.strip().startswith("```json"):
        column['adjust_filter_values'] = parse_json_response(response)
      else:
        column['adjust_filter_values'] = parse_python_object(response)
    else:
      column['adjust_filter_values'] = column['filter_values']
    column['unique_values'] = None
  
  

In [None]:
# Adds adjust_filter_values / unique_count to each filter column in place (no return value).
adjust_filter_value(merged_filter_values['filter_columns'])

In [None]:
merged_filter_values

In [None]:
def _bigquery_parameter_type(column_type):
  """Map an LLM-reported column type to the BigQuery query-parameter type."""
  normalized = (column_type or '').strip().upper()
  if normalized in ('FLOAT64', 'FLOAT'):
    return 'FLOAT64'
  if normalized in ('INT64', 'INTEGER', 'INT'):
    return 'INT64'
  # Everything else (STRING, DATE, TIMESTAMP, ...) is bound as STRING; this
  # relies on BigQuery coercing STRING parameters in date/time comparisons.
  return 'STRING'


def prepared_statement_with_filter_values_in_bigquery(sql_and_filters):
  """Run a prepared statement in BigQuery, binding the adjusted filter values.

  Parameters are positional (``?`` placeholders) and are bound in the order of
  ``sql_and_filters['filter_columns']``:
  - several values and several ``filter_names``: one scalar parameter per value
    (e.g. the start/end bounds of a BETWEEN);
  - several values and a single filter name: one ARRAY parameter (presumably
    used as ``IN UNNEST(?)``);
  - otherwise: one scalar parameter holding the single value.

  Args:
    sql_and_filters: dict with ``prepared_statement`` and ``filter_columns``.
      Each column needs ``adjust_filter_values``, a ``column_type``, and
      ``filter_names`` when it has more than one value.

  Returns:
    The query result as a pandas DataFrame.

  Raises:
    ValueError: if a filter column has no adjusted value to bind.
  """
  prepared_statement = sql_and_filters['prepared_statement']
  query_parameters = []
  for filter_column in sql_and_filters['filter_columns']:
    values = filter_column['adjust_filter_values']
    if not values:
      raise ValueError(
        f"No filter value for column {filter_column.get('column_name')!r}; "
        "every placeholder needs a bound parameter.")
    param_type = _bigquery_parameter_type(filter_column.get('column_type'))
    if len(values) > 1 and len(filter_column['filter_names']) > 1:
      query_parameters.extend(
        bigquery.ScalarQueryParameter(None, param_type, value) for value in values)
    elif len(values) > 1:
      query_parameters.append(bigquery.ArrayQueryParameter(None, param_type, values))
    else:
      query_parameters.append(bigquery.ScalarQueryParameter(None, param_type, values[0]))
  job_config = bigquery.QueryJobConfig(
    query_parameters=query_parameters
  )
  print(query_parameters)
  query_job = client.query(prepared_statement, job_config=job_config)
  return query_job.to_dataframe()

In [None]:
# Combine the retrieved prepared statement with the adjusted filter columns.
sql_and_filters = {
  'prepared_statement':assetized_query['prepared_statement'],
  'filter_columns': merged_filter_values['filter_columns']
}

In [None]:
sql_and_filters

In [None]:
# Bind the adjusted values as positional parameters and run the query in BigQuery.
df_result = prepared_statement_with_filter_values_in_bigquery(sql_and_filters)

In [None]:
df_result.head(10)