In [1]:
# sql db (sqlAlchemy)
# user query
# model takes user query and generate sql query :
  # meta data of sql db is read
  # tables and columns descriptions are read as well
  # prompt is prepared for the text-to-sql model.
  # prompt passed to model to create the sql query.
# sql query is ran on the database and results are returned
# the results passed to the generator to use them in its answer.

In [2]:
!pip install haystack-ai



In [3]:
!pip install mysql-connector-python
!pip install SQLAlchemy pymysql



In [21]:
!apt-get -y install mysql-server

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libcgi-fast-perl libcgi-pm-perl libclone-perl libencode-locale-perl
  libfcgi-bin libfcgi-perl libfcgi0ldbl libhtml-parser-perl
  libhtml-tagset-perl libhtml-template-perl libhttp-date-perl
  libhttp-message-perl libio-html-perl liblwp-mediatypes-perl libmecab2
  libprotobuf-lite23 liburi-perl mecab-ipadic mecab-ipadic-utf8 mecab-utils
  mysql-client-8.0 mysql-client-core-8.0 mysql-server-8.0
  mysql-server-core-8.0
Suggested packages:
  libdata-dump-perl libipc-sharedcache-perl libbusiness-isbn-perl libwww-perl
  mailx tinyca
The following NEW packages will be installed:
  libcgi-fast-perl libcgi-pm-perl libclone-perl libencode-locale-perl
  libfcgi-bin libfcgi-perl libfcgi0ldbl libhtml-parser-perl
  libhtml-tagset-perl libhtml-template-perl libhttp-date-perl
  libhttp-message-perl libio-html-perl liblwp-mediatypes-perl libmecab2
  lib

In [1]:
!service mysql start

 * Starting MySQL database server mysqld
   ...done.


In [2]:
!mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH 'mysql_native_password' BY 'root';FLUSH PRIVILEGES;"

ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)


In [24]:
import mysql.connector

# Create database and tables
conn = mysql.connector.connect(user='root', password='root', host='127.0.0.1')
cursor = conn.cursor()
db_name = "library"

# Create database
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
cursor.execute(f"USE {db_name}")

# Create tables
cursor.execute("""
CREATE TABLE IF NOT EXISTS authors (
    author_id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    country VARCHAR(50),
    birth_year YEAR
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS books (
    book_id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    author_id INT NOT NULL,
    genre VARCHAR(50),
    published_year YEAR,
    isbn VARCHAR(13) UNIQUE,
    FOREIGN KEY (author_id) REFERENCES authors(author_id)
)
""")

# Insert sample data
cursor.execute("""
INSERT INTO authors (name, country, birth_year)
VALUES
    ('J.K. Rowling', 'UK', 1965),
    ('George Orwell', 'UK', 1903),
    ('Haruki Murakami', 'Japan', 1949)
""")

cursor.execute("""
INSERT INTO books (title, author_id, genre, published_year, isbn)
VALUES
    ('Harry Potter and the Philosopher''s Stone', 1, 'Fantasy', 1997, '9780747532699'),
    ('1984', 2, 'Dystopian', 1949, '9780451524935'),
    ('Norwegian Wood', 3, 'Fiction', 1987, '9780375704024')
""")

conn.commit()
cursor.close()
conn.close()

In [3]:
from sqlalchemy import create_engine, MetaData, inspect

# Create engine and reflect metadata
engine = create_engine("mysql+mysqlconnector://root:root@127.0.0.1/library")
metadata = MetaData()
metadata.reflect(bind=engine)
inspector = inspect(engine)

print(f"\n{'='*50}\nDatabase Metadata\n{'='*50}")

for table_name in metadata.tables:
    print(f"\nTable: {table_name}")
    print("-" * 50)

    # Print columns with available info
    columns = inspector.get_columns(table_name)
    for col in columns:
        col_info = f"  • {col['name']}: {col['type']}"

        # Only use keys we know exist
        if not col['nullable']:
            col_info += " (NOT NULL)"
        if col.get('autoincrement', False):
            col_info += " (AUTO_INCREMENT)"

        print(col_info)

    # Print constraints separately
    print("\n  Constraints:")

    # Primary keys
    pk_constraint = inspector.get_pk_constraint(table_name)
    if pk_constraint['constrained_columns']:
        pk_cols = ", ".join(pk_constraint['constrained_columns'])
        print(f"    › PRIMARY KEY ({pk_cols})")

    # Unique constraints
    unique_constraints = inspector.get_unique_constraints(table_name)
    for uc in unique_constraints:
        uc_cols = ", ".join(uc['column_names'])
        print(f"    › UNIQUE ({uc_cols})")

    # Foreign keys
    foreign_keys = inspector.get_foreign_keys(table_name)
    for fk in foreign_keys:
        fk_cols = ", ".join(fk['constrained_columns'])
        ref_cols = ", ".join(fk['referred_columns'])
        print(f"    › FOREIGN KEY ({fk_cols}) REFERENCES {fk['referred_table']}({ref_cols})")

    print("=" * 50)


Database Metadata

Table: authors
--------------------------------------------------
  • author_id: INTEGER (NOT NULL) (AUTO_INCREMENT)
  • name: VARCHAR(100) (NOT NULL)
  • country: VARCHAR(50)
  • birth_year: YEAR

  Constraints:
    › PRIMARY KEY (author_id)

Table: books
--------------------------------------------------
  • book_id: INTEGER (NOT NULL) (AUTO_INCREMENT)
  • title: VARCHAR(255) (NOT NULL)
  • author_id: INTEGER (NOT NULL)
  • genre: VARCHAR(50)
  • published_year: YEAR
  • isbn: VARCHAR(13)

  Constraints:
    › PRIMARY KEY (book_id)
    › UNIQUE (isbn)
    › FOREIGN KEY (author_id) REFERENCES authors(author_id)


### making the pipeline

In [4]:
from sqlalchemy import create_engine, MetaData, inspect

# Database connection
db_url = "mysql+mysqlconnector://root:root@127.0.0.1/library"
engine = create_engine(db_url)
metadata = MetaData()
metadata.reflect(bind=engine)
inspector = inspect(engine)

def generate_schema_text():
    """Convert database metadata to CREATE TABLE format"""
    schema_lines = []
    for table_name in metadata.tables:
        # Table definition
        table_def = f"CREATE TABLE {table_name} (\n"
        columns = inspector.get_columns(table_name)
        col_defs = []

        for col in columns:
            # Basic column definition
            col_def = f"  {col['name']} {str(col['type']).split('(')[0].lower()}"
            if not col['nullable']:
                col_def += " NOT NULL"
            col_defs.append(col_def)

        table_def += ",\n".join(col_defs)

        # Add constraints
        pk = inspector.get_pk_constraint(table_name)
        if pk['constrained_columns']:
            pk_cols = ", ".join(pk['constrained_columns'])
            table_def += f",\n  PRIMARY KEY ({pk_cols})"

        # Add foreign keys
        for fk in inspector.get_foreign_keys(table_name):
            fk_cols = ", ".join(fk['constrained_columns'])
            ref_cols = ", ".join(fk['referred_columns'])
            table_def += f",\n  FOREIGN KEY ({fk_cols}) REFERENCES {fk['referred_table']}({ref_cols})"

        table_def += "\n);"
        schema_lines.append(table_def)

    return "\n\n".join(schema_lines)

# Save schema to file
with open("schema.sql", "w") as f:
    f.write(generate_schema_text())

In [5]:
from haystack.components.builders import PromptBuilder
from pathlib import Path

# Read schema from file
schema_text = Path("schema.sql").read_text()

# Define prompt template
prompt_template = """<schema>{{schema}}</schema>
<question>{{query}}</question>
<sql>"""

prompt_builder = PromptBuilder(template=prompt_template, required_variables=["query", "schema"])

# Test prompt construction
test_prompt = prompt_builder.run(schema=schema_text, query="Find books by J.K. Rowling")
print(test_prompt["prompt"][:50] + "...")  # Show first 500 chars

<schema>CREATE TABLE authors (
  author_id integer...


In [6]:
from haystack.utils import Secret
from haystack.components.generators import HuggingFaceLocalGenerator
import os

# Set Hugging Face token (replace with your token)
from google.colab import userdata
token = userdata.get('COOL_HF_TOKEN')

model = HuggingFaceLocalGenerator(
    model="PipableAI/pip-sql-1.3b",
    generation_kwargs={
        "max_new_tokens": 150,
        "temperature": 0.01,
        "do_sample": False
    },token=Secret.from_token(token)
)

# Warm up model (loads weights)
model.warm_up()

# Test model component
test_output = model.run(prompt=test_prompt["prompt"])
print(test_output["replies"])

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Device set to use cuda:0
The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


["SELECT title FROM books WHERE author_id IN (SELECT author_id FROM authors WHERE name ILIKE '%J.K. Rowling%')</sql>\n<sql>SELECT title FROM books WHERE author_id IN (SELECT author_id FROM authors WHERE name ILIKE '%J.K. Rowling%')</sql>\n<sql>SELECT title FROM books WHERE author_id IN (SELECT author_id FROM authors WHERE name ILIKE '%J.K. Rowling%')</sql>\n<sql>SELECT title FROM books WHERE author_id IN (SELECT author_id FROM authors WHERE name ILIKE '%J.K. Rowling%')</sql>\n<"]


In [7]:
from haystack.dataclasses import Document
from haystack import component
from sqlalchemy import text
from typing import Dict, Any, List
import json

@component
class SQLExecutor:
    def __init__(self, engine):
        self.engine = engine
        self.connection = None

    @component.output_types(results=List[Dict], rowcount=int, error=str, sql=str)
    def run(self, query: str):
        """Execute SQL query and return results"""
        # Clean and validate SQL
        clean_query = query.strip().rstrip(';')
        if not clean_query.lower().startswith(("select", "insert", "update", "delete")):
            return {"error": "Only SELECT/INSERT/UPDATE/DELETE queries are allowed", "sql_query": clean_query}

        try:
            # Execute query
            with self.engine.connect() as connection:
                result = connection.execute(text(clean_query))

                # Handle results
                if result.returns_rows:
                    columns = list(result.keys())
                    rows = [dict(zip(columns, row)) for row in result.fetchall()]
                    rows_docs = [Document(content = str(row)) for row in rows]
                    return {"results": rows_docs, "raw_results": rows, "rowcount": len(rows), "sql": clean_query}
                else:
                    return {"results": [], "raw_results": [], "rowcount": result.rowcount, "sql": clean_query}

        except Exception as e:
            return {"error": str(e), "sql": clean_query}

    def warm_up(self):
        """Establish connection pool"""
        self.connection = self.engine.connect()

    def teardown(self):
        """Clean up resources"""
        if self.connection:
            self.connection.close()


In [8]:
@component
class SqlGenerator:

  def __init__(self, model):
      self._model = model

  @component.output_types(sql = str)
  def run(self, prompt: str):
    results = self._model.run(prompt)
    response = results['replies'][0]
    sql_query = response.split('</sql>')[0].strip()
    return {"sql": sql_query}

In [11]:
# # this cell for testing purposes.
# sql_pipeline.draw('pipeline.png')
# sql_pipeline.remove_component("prompt_builder")
# sql_pipeline.remove_component("sql_generator")
# sql_pipeline.remove_component("sql_executor")
# sql_pipeline.to_dict()

{'metadata': {},
 'max_runs_per_component': 100,
 'components': {},
 'connections': [],
 'connection_type_validation': True}

In [12]:
from haystack import Pipeline

# Create pipeline
sql_pipeline = Pipeline()
sql_pipeline.add_component("prompt_builder", prompt_builder)
sql_pipeline.add_component("sql_generator", SqlGenerator(model))
sql_pipeline.add_component("sql_executor", SQLExecutor(engine))


# Connect components
sql_pipeline.connect("prompt_builder.prompt", "sql_generator.prompt")
sql_pipeline.connect("sql_generator.sql", "sql_executor.query")


# Create execution function
def generate_and_execute(query: str) -> Dict[str, Any]:
    """Run full pipeline: query → prompt → SQL → execution → results"""
    # Run pipeline
    pipeline_result = sql_pipeline.run({
        "prompt_builder": {
            "schema": schema_text,
            "query": query
        }
    }, include_outputs_from= {"sql_generator", "sql_executor"})

    # Extract generated SQL
    generated_sql = pipeline_result["sql_generator"]

    # Get execution results
    execution_result = pipeline_result["sql_executor"]

    # Return comprehensive results
    return {
        "generated_sql": generated_sql,
        "execution_result": execution_result,
        "full_pipeline_output": pipeline_result
    }

# Example usage with better formatting
if __name__ == "__main__":
    question = "Find all books published before 1950 and their authors"
    result = generate_and_execute(question)

    print(f"\n{'='*80}")
    print(f"Question: {question}")
    print(f"\nGenerated SQL:\n{result['generated_sql']}")

    if "error" in result["execution_result"]:
        print(f"\n❌ Execution Error: {result['execution_result']['error']}")
    else:
        print(f"\n✅ Results ({result['execution_result']['rowcount']} rows):")
        print(json.dumps(result["execution_result"]["raw_results"], indent=2))

The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.



Question: Find all books published before 1950 and their authors

Generated SQL:
{'sql': 'SELECT books.title,  authors.name FROM books JOIN authors ON books.author_id  =  authors.author_id WHERE books.published_year  <  1950'}

✅ Results (1 rows):
[
  {
    "title": "1984",
    "name": "George Orwell"
  }
]
