<a href="https://colab.research.google.com/github/solomontessema/Agentic-AI-with-Python/blob/main/notebooks/Natural_Language_to_SQL_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<table>
  <tr>
    <td><img src="https://ionnova.com/img/ionnova_logo_name_2.png" width="120px"></td>
    <td><h1>Natural Language to SQL Generator</h1></td>
  </tr>
</table>

## Step 1: Create database and table

In [None]:
!pip install langchain_community

In [None]:
import sqlite3
import random
from datetime import datetime

# Connect to (or create) a database file
db = sqlite3.connect("data.db")
cursor = db.cursor()

# Create a sample table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY,
    name TEXT,
    email TEXT,
    signup_date TEXT
);
""")

db.commit()

# Insert sample data
def insert_sample_users():
    names = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Hank", "Ivy", "Jack",
             "Karen", "Leo", "Mona", "Nate", "Olivia", "Paul", "Quinn", "Rachel", "Steve", "Tina",
             "Uma", "Victor", "Wendy", "Xander", "Yara", "Zane", "Liam", "Noah", "Emma", "Sophia"]

    for i in range(30):
        name = names[i]
        email = f"{name.lower()}@example.com"
        # Random signup date in 2022 or 2023
        year = random.choice([2022, 2023])
        month = random.randint(1, 12)
        day = random.randint(1, 28)
        signup_date = datetime(year, month, day).strftime("%Y-%m-%d")

        cursor.execute("INSERT INTO users (name, email, signup_date) VALUES (?, ?, ?)",
                       (name, email, signup_date))

    db.commit()
    print("Inserted 30 sample users.")

# Run this once to populate the table
insert_sample_users()

In [None]:
import sqlite3

class SQLiteConnector:
    def __init__(self, db_path="data.db"):
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()

    def list_tables(self):
        self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [row[0] for row in self.cursor.fetchall()]

    def describe_table(self, table_name):
        self.cursor.execute(f"PRAGMA table_info({table_name});")
        return self.cursor.fetchall()

    def get_schema_summary(self):
        summary = []
        for table in self.list_tables():
            cols = self.describe_table(table)
            formatted = f"Table: {table} Columns: {[col[1] for col in cols]}"
            summary.append(formatted)
        return "\n".join(summary)

if __name__ == "__main__":
    db = SQLiteConnector()
    print(db.get_schema_summary())

In [None]:
# from db_connector import SQLiteConnector
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOpenAI
from langchain.chains import LLMChain
import os
from dotenv import load_dotenv

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


connector = SQLiteConnector()
schema_summary = connector.get_schema_summary()

llm = ChatOpenAI(model="gpt-4", temperature=0)

prompt = PromptTemplate(
    input_variables=["schema", "question"],
    template="""
    Given the database schema:
    {schema}

    Write only the SQLite SQL query to answer the question below.
    Do not include any explanation or commentary.

    Question:
    {question}
    """
)


sql_chain = LLMChain(llm=llm, prompt=prompt)

def generate_sql(question: str):
    return sql_chain.run({"schema": schema_summary, "question": question})

def run_query(query):
    try:
        cursor = connector.conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        return columns, rows
    except Exception as e:
        return None, f"SQL Error: {str(e)}"


def format_results(columns, rows):
    if not rows:
        return "No results found."

    output = ""
    for row in rows:
        row_text = ", ".join(f"{col}: {val}" for col, val in zip(columns, row))
        output += f"- {row_text}\n"
    return output


def execute_and_respond(question):
    sql = generate_sql(question)
    columns, rows_or_error = run_query(sql)

    if isinstance(rows_or_error, str):  # error message
        return f"Error executing SQL: {rows_or_error}"
    return format_results(columns, rows_or_error)

if __name__ == "__main__":
    print(generate_sql("How many users signed up after 2023?"))

    response = execute_and_respond("How many users signed up after 2023?")
    print(response)
