<a href="https://colab.research.google.com/github/ml4devs/ml4devs-notebooks/blob/master/gpt/translate_natural_language_query_to_sql_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Translate Natural Language Queries to SQL with GPT</center></h1>

<p><center>
<address>&copy; Satish Chandra Gupta<br/>
LinkedIn: <a href="https://www.linkedin.com/in/scgupta/">scgupta</a>,
Twitter: <a href="https://twitter.com/scgupta">scgupta</a>
</address>
</center></p>

---

## Setup Environment

### Install Pip Packages

You need Python 3.7 or higher to install the [OpenAI Python API library](https://github.com/openai/openai-python).

In [1]:
# You should have Python 3.7 or higher

!python --version


Python 3.11.1


In [2]:
!pip install openai==1.3.6 python-dotenv==1.0.0 SQLAlchemy==2.0.23




### Upload `.env` File with API Keys

You can use GPT either directly from OpenAI or through Azure OpenAI from Microsoft. In both cases, create a `.env` file with the environment variables needed for the OpenAI API.

If you are using OpenAI, create an API key in your [OpenAI account](https://platform.openai.com/api-keys). Your `.env` file will look like the following:

```sh
$ cat .env
OPENAI_API_KEY='sk-YourOpenAiApiKeyHere'
```

If you are using Microsoft Azure OpenAI:
- Go to [Azure Portal](https://portal.azure.com/) > **All Resources**
- Filter the list with Type == Azure OpenAI
- Select the one you plan to use
- If there are none, you can [create and deploy an Azure OpenAI Service resource](https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource)
- Click on **Keys and Endpoint** on the left menu
- Get `AZURE_OPENAI_API_KEY` and `AZURE_OPENAI_ENDPOINT`
- Next click **Model deployments** on the left menu, and then click **Manage Deployment** button
- Alternatively, you can go to [Azure OpenAI Studio](https://oai.azure.com/), and click **Deployments** on the left menu
- Find (the latest) API version for [Azure OpenAI Service](https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning)

Your `.env` file will look like the following:
```sh
$ cat .env
AZURE_OPENAI_API_KEY=yourAzureOpenAiApiKey
AZURE_OPENAI_ENDPOINT=https://your-azure-deployment.openai.azure.com/
AZURE_OPENAI_DEPLOYMENT_ID=your-deployment-name
AZURE_OPENAI_API_VERSION=2023-10-01-preview
```

Upload `.env` using the Upload File button in Google Colab (or Jupyter Notebook). As a last resort, uncomment and modify the relevant lines in the following cell to create the `.env` file. Note that a notebook with API keys written into it is dangerous to share or check into git.

In [3]:
# Upload or create a .env file with (Azure) OpenAI API creds

#!echo "OPENAI_API_KEY=sk-YourOpenAiApiKeyHere" >> .env

#!echo "AZURE_OPENAI_API_KEY=yourAzureOpenAiApiKey" >> .env
#!echo "AZURE_OPENAI_ENDPOINT=https://your-azure-deployment.openai.azure.com/" >> .env
#!echo "AZURE_OPENAI_DEPLOYMENT_ID=your-deployment-name" >> .env
#!echo "AZURE_OPENAI_API_VERSION=2023-10-01-preview" >> .env


### Load `.env` File and Specify (Azure) OpenAI GPT Model

Load environment variables from `.env` file:

In [4]:
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())
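
A missing variable otherwise only surfaces later as an authentication error, so it can help to check right after loading. The following is a minimal sketch, not part of the original notebook: the helper `missing_env_vars` and the two lists of names are hypothetical, and the variable names are simply the ones used in the `.env` examples above.

```python
import os

# Variable names taken from the .env examples above.
REQUIRED_OPENAI_VARS = ["OPENAI_API_KEY"]
REQUIRED_AZURE_VARS = [
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_DEPLOYMENT_ID",
    "AZURE_OPENAI_API_VERSION",
]


def missing_env_vars(required, env=os.environ):
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


# Example with an explicit mapping instead of the real environment:
example_env = {"AZURE_OPENAI_API_KEY": "dummy", "AZURE_OPENAI_ENDPOINT": ""}
print(missing_env_vars(REQUIRED_AZURE_VARS, example_env))
```

In the notebook you would call `missing_env_vars(REQUIRED_OPENAI_VARS)` (or the Azure list) and stop if the result is not empty.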


Set the `IS_AZURE_OPENAI` flag to `True` if you are using Azure OpenAI:

In [5]:
IS_AZURE_OPENAI: bool = False


Specify model name:

In [6]:
from datetime import datetime

GPT35_TURBO: str = "gpt-3.5-turbo-1106" if datetime.now() < datetime(2023, 12, 11) else "gpt-3.5-turbo"
GPT4: str = "gpt-4"


---

## Setup OpenAI Client with GPT Model

In [7]:
import os
import openai


In [8]:
def create_open_ai_client():
    if IS_AZURE_OPENAI:
        return openai.AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_ID")
        )
    else:
        return openai.OpenAI(
            api_key=os.getenv('OPENAI_API_KEY')
        )


In [9]:
openai_client = create_open_ai_client()
openai_model = os.getenv("AZURE_OPENAI_DEPLOYMENT_ID") if IS_AZURE_OPENAI else GPT4

def get_gpt_response(messages, model=openai_model, temperature=0) -> str:
    response = openai_client.chat.completions.create(
        model=model,
        #response_format={"type": "json_object"},  # Uncomment it if your chosen model supports it
        messages=messages,
        temperature=temperature,
    )
    return response.choices[0].message.content


In [10]:
print(get_gpt_response([
    {"role": "user", "content": "Say this is test. Format response in JSON"}
]))


{
  "message": "This is test"
}


You are all set to use GPT for common NLP tasks such as sentiment analysis, language translation, and intent/entity recognition.

---

## Setup Database

You need a dataset that you will query using natural language. You also need a SQL database that will host that dataset.

### Example Dataset: Chinook

The Chinook sample database is commonly used for teaching and testing RDBMS concepts. It models a fictitious digital media store, with tables for artists, albums, tracks, playlists, customers, employees, and invoices. We will use [SQLite](https://www.sqlite.org/index.html) as the database. Python ships with the [sqlite3](https://docs.python.org/3/library/sqlite3.html) module, so nothing needs to be installed or deployed locally or in the cloud.

1. Download the dataset using the `curl` or `wget` command from [SQLite Tutorial](https://www.sqlitetutorial.net/sqlite-sample-database/). Note that the [Kaggle dataset](https://www.kaggle.com/datasets/atanaskanev/sqlite-sakila-sample-database/data) hosts Sakila, a different sample database (a fictitious DVD rental store), which the rest of this notebook does not use.

In [11]:
!curl -L0 https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip --output ./chinook.zip

#!wget https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  298k  100  298k    0     0   314k      0 --:--:-- --:--:-- --:--:--  316k


2. Unzip the db file

In [12]:
!unzip chinook.zip


Archive:  chinook.zip
  inflating: chinook.db              


3. The db file will be stored at `./chinook.db`. This is the path you will need when using `sqlite3` package.

In [13]:
!ls -l ./chinook.db


-rw-r--r--  1 scgupta  staff  884736 Nov 29  2015 ./chinook.db


4. Extract DB Metadata

In [14]:
import json
import sqlite3


In [15]:
DB_FILE_PATH = "./chinook.db"


In [16]:
def extract_sqlite3_db_metadata(sqlite_db_file_path: str):
    db_metadata = {}

    # Connect to the SQLite database
    conn = sqlite3.connect(sqlite_db_file_path)
    cursor = conn.cursor()

    # Get a list of all tables in the database
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()

    # Loop through each table and get its columns
    for table in tables:
        table_name = table[0]
        primary_keys = []
        foreign_keys = {}
        columns_info = {}

        # Get table details
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()

        # Extract info about the columns of the current table
        for column in columns:
            column_name = column[1]
            column_type = column[2]
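            # pk is the column's 1-based position within the primary key (0 if not part of it),
            # so composite keys need `> 0` rather than `== 1`
            is_primary = (column[5] > 0)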

            columns_info[column_name] = {
                "type": column_type,
                "primary": is_primary,
                "foreign": {}
            }

        # Primary Keys
        primary_keys = [
            c_name
            for c_name, c_attrs in columns_info.items()
            if c_attrs["primary"]
        ]

        # Get foreign key details
        cursor.execute(f"PRAGMA foreign_key_list({table_name});")
        fk_constraints = cursor.fetchall()

        for fk in fk_constraints:
            fk_constraint_id = fk[0]
            fk_to_table = fk[2]
            fk_from_column = fk[3]
            fk_to_column = fk[4]

            fk_info = {
                "constraint_id": fk_constraint_id,
                "to_table": fk_to_table,
                "to_column": fk_to_column
            }
            foreign_keys[fk_from_column] = fk_info
            columns_info[fk_from_column]["foreign"] = fk_info

        db_metadata[table_name] = {
            "columns": columns_info,
            "primary_keys": primary_keys,
            "foreign_keys": foreign_keys
        }

    # Close the connection
    conn.close()

    # Remove tables with names starting with "sqlite", as those are internal and not part of the application
    tables_to_remove = [t for t in db_metadata if t.startswith("sqlite")]
    for t in tables_to_remove:
        del db_metadata[t]

    # Done!
    return db_metadata
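
The `pk` field returned by `PRAGMA table_info` is worth a closer look: it is 0 for columns outside the primary key and otherwise the 1-based position of the column within the key. The sketch below illustrates this on an in-memory database; the table `playlist_track_demo` is a hypothetical stand-in with a two-column primary key, not a table from the downloaded file.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE playlist_track_demo (
        PlaylistId INTEGER NOT NULL,
        TrackId    INTEGER NOT NULL,
        Note       TEXT,
        PRIMARY KEY (PlaylistId, TrackId)
    )
""")

# Each row is (cid, name, type, notnull, dflt_value, pk)
rows = conn.execute("PRAGMA table_info(playlist_track_demo);").fetchall()
conn.close()

# Map column name -> position in the primary key (0 means "not in the key")
pk_positions = {row[1]: row[5] for row in rows}

# Keep every column whose pk position is non-zero, in key order
primary_keys = [
    name
    for name, position in sorted(pk_positions.items(), key=lambda kv: kv[1])
    if position > 0
]
print(pk_positions)
print(primary_keys)
```

Testing `pk > 0` therefore picks up every member of a composite key, whereas testing `pk == 1` would keep only the first one.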


5. Check that the database metadata has been extracted correctly.

In [17]:
def table_info_str(t_name, t_info) -> str:
    column_info_str = "\n        ".join([
        f"{c_name}: {c_info['type']}"
        for c_name, c_info in t_info["columns"].items()
    ])

    primary_key_info_str = ""
    if len(t_info["primary_keys"]) > 0:
        primary_key_info_str = f"Primary Keys: {','.join(t_info['primary_keys'])}"

    foreign_key_info_str = ""
    if len(t_info["foreign_keys"]) > 0:
        foreign_key_info_str = "\n    Foreign Keys:\n        " + "\n        ".join([
            f"{fk_from_col} => {fk_info['to_table']}.{fk_info['to_column']}"
            for fk_from_col, fk_info in t_info["foreign_keys"].items()
        ])

    return f"""Table Name: {t_name}
    Columns:
        {column_info_str}
    """ + primary_key_info_str + foreign_key_info_str


In [18]:
chinook_db_metadata = extract_sqlite3_db_metadata(DB_FILE_PATH)


In [19]:
for t_name, t_info in chinook_db_metadata.items():
    print(table_info_str(t_name, t_info))
    print()


Table Name: albums
    Columns:
        AlbumId: INTEGER
        Title: NVARCHAR(160)
        ArtistId: INTEGER
    Primary Keys: AlbumId
    Foreign Keys:
        ArtistId => artists.ArtistId

Table Name: artists
    Columns:
        ArtistId: INTEGER
        Name: NVARCHAR(120)
    Primary Keys: ArtistId

Table Name: customers
    Columns:
        CustomerId: INTEGER
        FirstName: NVARCHAR(40)
        LastName: NVARCHAR(20)
        Company: NVARCHAR(80)
        Address: NVARCHAR(70)
        City: NVARCHAR(40)
        State: NVARCHAR(40)
        Country: NVARCHAR(40)
        PostalCode: NVARCHAR(10)
        Phone: NVARCHAR(24)
        Fax: NVARCHAR(24)
        Email: NVARCHAR(60)
        SupportRepId: INTEGER
    Primary Keys: CustomerId
    Foreign Keys:
        SupportRepId => employees.EmployeeId

Table Name: employees
    Columns:
        EmployeeId: INTEGER
        LastName: NVARCHAR(20)
        FirstName: NVARCHAR(20)
        Title: NVARCHAR(30)
        ReportsTo: INTEGER
 

---

## Database Table Schema Documents

GPT can write a SQL query only if it knows the tables and their columns. When you construct the GPT prompt, you must include this information for the relevant tables.

The `CREATE TABLE` statement of [SQL DDL](https://en.wikipedia.org/wiki/Data_definition_language) captures all the necessary info. Ideally, table and column descriptions should also be captured as comments to assist document search and GPT.

Let's create a mapping from each table name to its `CREATE TABLE` statement.

In [20]:
def create_table_ddl_stmt_str(t_name, t_info) -> str:
    column_defs = ",\n    ".join([
        f"{c_name} \t{c_info['type']}"
        for c_name, c_info in t_info["columns"].items()
    ])

    primary_key_def = ""
    if len(t_info["primary_keys"]) > 0:
        primary_key_def = f",\n\n    PRIMARY KEY ({', '.join(t_info['primary_keys'])})"

    foreign_key_def = ""
    if len(t_info["foreign_keys"]) > 0:
        fk_stmts = ",\n".join([
            f"    FOREIGN KEY({fk_from_col}) REFERENCES {fk_info['to_table']}({fk_info['to_column']})"
            for fk_from_col, fk_info in t_info["foreign_keys"].items()
        ])
        foreign_key_def = f",\n\n{fk_stmts}"

    return f"""CREATE TABLE {t_name} (
    {column_defs}{primary_key_def}{foreign_key_def}
);"""


Let's sequence the tables so that the definition of every table referred to in a `FOREIGN KEY` constraint comes before the constraint. One could write code to analyze the foreign key constraint graph and perform a topological sort to get such an order, but I decided to hand-code it because it is not relevant to this tutorial.

You can check the [Entity-Relationship Model](https://en.wikipedia.org/wiki/Entity%E2%80%93relationship_model) for all tables, drawn using Crow's Foot notation:

![](https://www.sqlitetutorial.net/wp-content/uploads/2015/11/sqlite-sample-database-color.jpg)

In [21]:
# Table list in Topological Order for foreign key constraints

chinook_db_table_names = [
    "artists", "albums",
    "media_types", "genres", "tracks",
    "playlists", "playlist_track",
    "employees",
    "customers", "invoices", "invoice_items"
]
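
For a larger schema, the same order could be derived from the extracted metadata instead of being written by hand. The following is a minimal sketch, assuming Python 3.9+ (for the standard-library `graphlib` module) and a metadata dictionary shaped like the output of `extract_sqlite3_db_metadata`. The function name `tables_in_fk_order` and the toy metadata are hypothetical. Self-references (such as an employee reporting to another employee) are skipped, because a table cannot be ordered before itself.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def tables_in_fk_order(db_metadata: dict) -> list[str]:
    """Order tables so that every referenced table precedes the table referencing it."""
    # Map each table to the set of tables it depends on (its predecessors).
    graph = {
        t_name: {
            fk_info["to_table"]
            for fk_info in t_info["foreign_keys"].values()
            if fk_info["to_table"] != t_name  # ignore self-references
        }
        for t_name, t_info in db_metadata.items()
    }
    return list(TopologicalSorter(graph).static_order())


# Toy metadata with only the fields this function reads
toy_metadata = {
    "albums": {"foreign_keys": {"ArtistId": {"to_table": "artists", "to_column": "ArtistId"}}},
    "artists": {"foreign_keys": {}},
    "employees": {"foreign_keys": {"ReportsTo": {"to_table": "employees", "to_column": "EmployeeId"}}},
}
order = tables_in_fk_order(toy_metadata)
print(order)
```

Several valid orders usually exist, so the result may differ from the hand-coded list while still satisfying the same constraint.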


In [22]:
all_chinook_db_table_documents: dict[str, str] = {
    t_name: create_table_ddl_stmt_str(t_name, chinook_db_metadata[t_name])
    for t_name in chinook_db_table_names
}


In [23]:
#for t_name in chinook_db_table_names:
#    print(all_chinook_db_table_documents[t_name])
#    print()


---

## Natural Language Query to SQL

The general flow of building applications using Large Language Models (LLMs) and Retrieval Augmented Generation (RAG) has three parts:

- **Embeddings**: Data Preprocessing
  - Break private data or documents into chunks
  - Convert chunks to vectors using an embedding model
  - Store vectors in a Vector DB
- **Retrieval**: Prompt Construction
  - Convert user query into a vector using the same embedding model
  - Search the Vector DB for chunks with similar embeddings and rank them
  - Craft a prompt using the user query and the document chunks found in the search
- **Inference**: Prompt Execution
  - Submit the prompt to an LLM
  - Post-process (check, augment) the LLM response
  - Send the response to the user

For converting a natural language query to SQL, the RAG pattern translates to:

- Embeddings:
  - Consider a `CREATE TABLE` statement for a table as one document chunk
  - Convert each `CREATE TABLE` statement to a vector embedding
  - Save (embedding, table name) mapping in a Vector DB
- Retrieval:
  - Convert incoming user query to a vector embedding
  - Search Vector DB and find tables with top similarity score
  - Craft a prompt using the user query and `CREATE TABLE` statements of all top-matching tables
- Inference:
  - Submit prompt to GPT to get the equivalent SQL
  - Execute the returned SQL on the database
  - Present the results to the user


### RAG: Vector DB Document Search

For the sake of simplicity, we will skip the embedding and Vector DB search. Since there are only 11 tables, none with too many columns, we can send the DDL for all tables in the prompt.

In [24]:
def find_tables(nl_query: str) -> dict[str, str]:
    # Bypassing
    # - Convert nl_query => embeddings
    # - Search Vector DB for documents (table's CREATE TABLE statement) with similar embeddings
    # - Return {table_name: document} mapping for all matching tables
    #
    # Instead return all documents

    return all_chinook_db_table_documents


### Prompt Construction

Craft a prompt using the user query and the documents returned from the Vector DB search.

In [25]:
def nl2sql_system_prompt(documents: dict[str, str], sql_flavor: str = "Python sqlite3") -> str:
    metadata = "\n".join([
        f"# SQL DDL Schema for `{table_name}` table:```sql\n{table_schema}```\n"
        for table_name, table_schema in documents.items()
    ])

    system_prompt = f"""
    You are a data analyst and data engineer. You are an expert in writing SQL queries
    for {sql_flavor} database.

    You have following tables in the database. The table name is in single backquote, and
    the DDL code to create that table with schema and metadata details are in triple backquote.

    ### Database Table Schemas:
    \n{metadata}
    ###

    Users ask you queries in natural language, and your job is to write equivalent
    SQL queries in following steps:
    1. Identify the tables that have data relevant for the query
    2. Identify relevant columns in those tables
    3. Craft a SQL query that selects, filters, groups, joins in an optimal order
       that is equivalent to the user's natural language query.

    Format your response as a JSON dictionary with following key, value:
    - tables: a dictionary with the name of relevant tables as keys, and the
        list of relevant columns in that as value.
    - sql: the sql query that you crafted.
    """

    return system_prompt


In [26]:
def nl2sql_user_prompt(nl_query: str):
    return f"Write a SQL that computes natural language query in triple backquotes: ```{nl_query}```"


### Prompt Execution

In [27]:
def write_sql_query(nl_query: str) -> dict:
    # Vectorize nl_query and find matching documents (tables and their DDL)
    documents = find_tables(nl_query)
    # Craft prompt using the natural language queries and matching documents
    system_prompt = nl2sql_system_prompt(documents)
    user_prompt = nl2sql_user_prompt(nl_query)

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    response = get_gpt_response(messages)

    response_dict = json.loads(response)
    return response_dict
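
Calling `json.loads` directly on the response assumes the model returns bare JSON. Chat models sometimes wrap the JSON in a Markdown code fence, in which case parsing fails. The following is a minimal, more tolerant sketch; the helper name `parse_json_response` is hypothetical and not part of the notebook.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the source readable

# Matches an optional fenced block such as a fence, "json", the payload, and a closing fence
_FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_response(text: str) -> dict:
    """Parse a model response that is either bare JSON or JSON inside a code fence."""
    match = _FENCED_BLOCK.search(text)
    payload = match.group(1) if match else text
    return json.loads(payload)


fenced = FENCE + 'json\n{"tables": {"artists": ["Name"]}, "sql": "SELECT Name FROM artists"}\n' + FENCE
print(parse_json_response(fenced)["sql"])
print(parse_json_response('{"sql": "SELECT 1"}')["sql"])
```

If the payload is still not valid JSON, `json.loads` raises `json.JSONDecodeError`, which the caller can catch to retry or report the failure.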


### Post-processing: Execute SQL

In [28]:
def execute_sql_query_on_sqlite3(sql_query: str):
    conn = sqlite3.connect(DB_FILE_PATH)
    cursor = conn.cursor()
    result = cursor.execute(sql_query)
    rows = result.fetchall()
    conn.close()

    return rows


In [29]:
import sqlalchemy


In [30]:
sql_engine = sqlalchemy.create_engine(
    f"sqlite:///{os.path.abspath(os.path.join(os.getcwd(), DB_FILE_PATH))}",
    echo=True
)


In [31]:
def execute_sql_query(connection, query):
    result_obj = connection.execute(sqlalchemy.text(query))
    return result_obj.fetchall()


In [32]:
def execute_nl_query(nl_query: str):
    response = write_sql_query(nl_query)

    #response["rows"] = execute_sql_query_on_sqlite3(response["sql"])
    with sql_engine.connect() as conn:
        response["rows"] = execute_sql_query(conn, response["sql"])

    return response
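
Executing model-generated SQL as-is carries a risk: the model could return a statement that modifies or deletes data. One possible safeguard, sketched below under the assumption that the queries only need to read, is to open the SQLite file in read-only mode through a URI. The function name `execute_read_only` and the demo table are hypothetical; file paths containing characters such as `?` or `#` would need URI escaping.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def execute_read_only(db_file_path: str, sql_query: str) -> list:
    """Run a query on a SQLite file opened read-only; write attempts raise an error."""
    uri = f"file:{db_file_path}?mode=ro"
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        return conn.execute(sql_query).fetchall()


# Demo on a throwaway database file
demo_path = os.path.join(tempfile.mkdtemp(), "demo.db")
with closing(sqlite3.connect(demo_path)) as setup_conn:
    setup_conn.execute("CREATE TABLE t (x INTEGER)")
    setup_conn.execute("INSERT INTO t VALUES (1)")
    setup_conn.commit()

print(execute_read_only(demo_path, "SELECT x FROM t"))

try:
    execute_read_only(demo_path, "DROP TABLE t")
except sqlite3.OperationalError as err:
    print(f"Write rejected: {err}")
```

The same idea applies to other databases through a database user that has only `SELECT` privileges.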


### Try

In [33]:
test_nl_queries = [
    "Who is the artist with the most albums?",
    "List the top 3 tracks with maximum sale.",
    "Name the employee who supports maximum number of customers."
] 


In [34]:
results = []
for nl_q in test_nl_queries:
    response = execute_nl_query(nl_q)
    response["query"] = nl_q
    results.append(response)


2023-12-01 20:46:01,984 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-12-01 20:46:01,984 INFO sqlalchemy.engine.Engine SELECT artists.Name, COUNT(albums.AlbumId) as AlbumCount FROM artists JOIN albums ON artists.ArtistId = albums.ArtistId GROUP BY artists.ArtistId ORDER BY AlbumCount DESC LIMIT 1;
2023-12-01 20:46:01,985 INFO sqlalchemy.engine.Engine [generated in 0.00112s] ()
2023-12-01 20:46:01,986 INFO sqlalchemy.engine.Engine ROLLBACK
2023-12-01 20:46:08,690 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-12-01 20:46:08,690 INFO sqlalchemy.engine.Engine SELECT t.Name, SUM(ii.Quantity) as Total_Sales FROM invoice_items ii JOIN tracks t ON ii.TrackId = t.TrackId GROUP BY ii.TrackId ORDER BY Total_Sales DESC LIMIT 3
2023-12-01 20:46:08,691 INFO sqlalchemy.engine.Engine [generated in 0.00118s] ()
2023-12-01 20:46:08,694 INFO sqlalchemy.engine.Engine ROLLBACK
2023-12-01 20:46:12,770 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-12-01 20:46:12,770 INFO sqlalchemy.engine.E

In [35]:
for t in results:
    print(f"User Query: {t['query']}\nTables:{json.dumps(t['tables'])}\nSQL:{t['sql']}\nRows:\n{str(t['rows'])}\n")


User Query: Who is the artist with the most albums?
Tables:{"artists": ["ArtistId", "Name"], "albums": ["ArtistId"]}
SQL:SELECT artists.Name, COUNT(albums.AlbumId) as AlbumCount FROM artists JOIN albums ON artists.ArtistId = albums.ArtistId GROUP BY artists.ArtistId ORDER BY AlbumCount DESC LIMIT 1;
Rows:
[('Iron Maiden', 21)]

User Query: List the top 3 tracks with maximum sale.
Tables:{"invoice_items": ["TrackId", "Quantity"], "tracks": ["TrackId", "Name"]}
SQL:SELECT t.Name, SUM(ii.Quantity) as Total_Sales FROM invoice_items ii JOIN tracks t ON ii.TrackId = t.TrackId GROUP BY ii.TrackId ORDER BY Total_Sales DESC LIMIT 3
Rows:
[('Balls to the Wall', 2), ('Inject The Venom', 2), ('Snowballed', 2)]

User Query: Name the employee who supports maximum number of customers.
Tables:{"employees": ["EmployeeId", "FirstName", "LastName"], "customers": ["SupportRepId"]}
SQL:SELECT e.FirstName, e.LastName FROM employees e WHERE e.EmployeeId = (SELECT c.SupportRepId FROM customers c GROUP BY c.Su

## Cleanup

In [36]:
sql_engine.dispose()


---
<p>Copyright &copy; 2023 <a href="https://www.linkedin.com/in/scgupta">Satish Chandra Gupta</a>.</p>
<img src="https://licensebuttons.net/l/by-nc-sa/3.0/88x31.png" align="left"/> <p>&nbsp;<a href="https://creativecommons.org/licenses/by-nc-sa/4.0/">CC BY-NC-SA 4.0 International</a> License.</p>