# Importing necessary things

In [510]:
import os
from dotenv import load_dotenv

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Float
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from sqlalchemy import text, inspect

from langchain_core.runnables.config import RunnableConfig
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

from crewai import Agent, Crew, Process
from crewai.project import CrewBase, agent, task, crew


import nest_asyncio
import asyncio

# nest_asyncio patches asyncio so that an event loop which is already running
# (for example a notebook kernel's) can be re-entered by later async calls.
nest_asyncio.apply()


# NOTE(review): absolute, machine-specific path. On another machine this file
# is presumably absent, in which case DATABASE_URL falls back to the default.
load_dotenv('/Users/giorgikurtanidze/Documents/GitHub/Text-to-SQL-Agent/.env')

# Connection string comes from the environment; defaults to a local SQLite file.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///example.db")
engine = create_engine(DATABASE_URL)
# Session factory bound to the engine, with autocommit and autoflush disabled.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

## Step 1: Define Agent State


In [524]:
class AgentState(BaseModel):
    """State object passed through every step of the text-to-SQL workflow.

    Each workflow function in this file reads some of these fields, writes
    others, and returns the same state object.
    """

    # Name of the user the query is scoped to (or "User not found").
    current_user: str = ""
    # Natural-language question to answer.
    question: str = ""
    # Relevance verdict; the workflow compares it against "relevant".
    relevance: str = ""
    # Generated SQL text, or a sentinel message when no query was produced.
    sql_query: str = ""
    # Rows fetched by executing sql_query.
    # NOTE(review): mutable literal default; pydantic is expected to copy it
    # per instance rather than share it — confirm for the installed version.
    query_rows: list = []
    # Human-readable summary built from query_rows.
    query_result: str = ""

In [525]:
# Declarative base class that the ORM models in this file inherit from.
Base = declarative_base()

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    age = Column(Integer)
    # Declared unique, so two rows cannot share an email address.
    email = Column(String, unique=True, index=True)



class Food(Base):
    """ORM model mapped to the ``food`` table."""

    __tablename__ = "food"

    id = Column(Integer, primary_key=True, index=True)
    # Declared unique: at most one row per food name.
    name = Column(String, unique=True, index=True)
    price = Column(Float)


class Order(Base):
    """ORM model mapped to the ``orders`` table, linking a user to a food item."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    # Plain foreign-key columns; no relationship() attributes are declared here.
    food_id = Column(Integer, ForeignKey("food.id"))
    user_id = Column(Integer, ForeignKey("users.id"))



# Step 2: Get schema from database


In [526]:
def get_database_schema(engine, max_columns=21):
    """Return a plain-text description of every table reachable via *engine*.

    The text has one section per table::

        Table: <table name>
        - <column>: <TYPE>[, Primary Key][, Foreign Key to <table>.<column>]

    followed by a blank line. The result is also printed as a debug message.

    Args:
        engine: SQLAlchemy engine (or connection) to reflect.
        max_columns: Maximum number of columns listed per table. ``None``
            lists every column. Defaults to 21.

    Returns:
        The schema description as a single string ("" when there are no tables).
    """
    inspector = inspect(engine)
    sections = []
    for table_name in inspector.get_table_names():
        # The dicts returned by get_columns() carry no foreign-key entry, so
        # key metadata is read from the dedicated constraint reflection calls.
        pk_info = inspector.get_pk_constraint(table_name) or {}
        pk_columns = set(pk_info.get("constrained_columns") or [])

        fk_targets = {}
        for fk in inspector.get_foreign_keys(table_name):
            pairs = zip(fk["constrained_columns"], fk["referred_columns"])
            for local_col, remote_col in pairs:
                # Keep the first target if a column appears in several constraints.
                fk_targets.setdefault(
                    local_col, f"{fk['referred_table']}.{remote_col}"
                )

        lines = [f"Table: {table_name}"]
        for column in inspector.get_columns(table_name)[:max_columns]:
            col_name = column["name"]
            col_type = str(column["type"])
            # Some dialects also flag primary keys on the column dict itself.
            if col_name in pk_columns or column.get("primary_key"):
                col_type += ", Primary Key"
            if col_name in fk_targets:
                col_type += f", Foreign Key to {fk_targets[col_name]}"
            lines.append(f"- {col_name}: {col_type}")
        sections.append("\n".join(lines) + "\n\n")

    schema = "".join(sections)
    print(f"[DEBUG] Database Schema:\n{schema}")
    return schema


# Step 3: Define Workflow Nodes


In [527]:
class GetCurrentUser(BaseModel):
    """Schema with a single field: the current user's name.

    NOTE(review): not referenced anywhere in the visible part of this file.
    """

    current_user: str = Field(
        description="The name of the current user based on the provided user ID."
    )

In [528]:
def get_current_user(state: AgentState, config: RunnableConfig):
    """Look up the current user's name and store it in ``state.current_user``.

    The user id is read from ``config["configurable"]["current_user_id"]``.
    When the config is missing, the id is absent, or the id is not an integer,
    no database lookup is attempted and the name is set to "User not found".

    Args:
        state: Workflow state; its ``current_user`` attribute is overwritten.
        config: Mapping with an optional ``configurable`` sub-mapping.

    Returns:
        The same ``state`` object.
    """
    configurable = (config or {}).get("configurable") or {}
    raw_user_id = configurable.get("current_user_id")
    try:
        user_id = int(raw_user_id)
    except (TypeError, ValueError):
        # Missing or non-numeric id: there is nothing valid to query for.
        state.current_user = "User not found"
        return state

    session = SessionLocal()
    try:
        user = session.query(User).filter(User.id == user_id).first()
        state.current_user = user.name if user else "User not found"
    finally:
        session.close()
    return state

In [529]:
class CheckRelevance(BaseModel):
    """Structured-output schema for the relevance check.

    Passed to ``with_structured_output`` in ``check_relevance``; the workflow
    compares the ``relevance`` value against the string "relevant".
    """

    relevance: str = Field(description="Is the question relevant to the database?")

In [572]:
def check_relevance(state: AgentState, config: RunnableConfig):
    """Ask the LLM whether ``state.question`` relates to the database schema.

    The verdict is stored in ``state.relevance``, trimmed and lower-cased so
    that answers such as "Relevant" still match the "relevant" check used
    downstream. Any error during the LLM call is logged and treated as
    "not_relevant".

    Args:
        state: Workflow state; reads ``question`` and writes ``relevance``.
        config: Unused; accepted for a uniform node signature.

    Returns:
        The same ``state`` object.
    """
    question = state.question
    schema = get_database_schema(engine)
    print(f"[DEBUG] Checking relevance of the question: {question}")
    # Schema and question are supplied as template variables instead of being
    # interpolated into the template text: a literal "{" or "}" inside either
    # value would otherwise be parsed as a placeholder and break formatting.
    system = """
    You are an assistant that determines whether a given question is related to the following database schema.

    Schema:
    {schema}

    Respond with only "relevant" or "not_relevant".
    """
    human = "Question: {question}"
    check_prompt = ChatPromptTemplate.from_messages([
        ("system", system),
        ("human", human),
    ])
    llm = ChatOpenAI(temperature=0)
    structured_llm = llm.with_structured_output(CheckRelevance)
    try:
        relevance_checker = check_prompt | structured_llm
        verdict = relevance_checker.invoke({"schema": schema, "question": question})
        state.relevance = verdict.relevance.strip().lower()
        print(f"[DEBUG] Relevance determined: {state.relevance}")
    except Exception as e:
        # Best effort: a failed check is treated as "not relevant".
        print(f"[ERROR] Error during relevance check: {e}")
        state.relevance = "not_relevant"
    return state


In [573]:
def convert_nl_to_sql(state: AgentState, config: RunnableConfig):
    """Translate ``state.question`` into SQL and store it in ``state.sql_query``.

    When ``state.relevance`` is not "relevant", no LLM call is made and
    ``sql_query`` is set to the sentinel "The question is not relevant to the
    database.". If generation fails, ``sql_query`` is set to the sentinel
    "Error generating SQL query.".

    Args:
        state: Workflow state; reads ``relevance``, ``question`` and
            ``current_user``, writes ``sql_query``.
        config: Unused; accepted for a uniform node signature.

    Returns:
        The same ``state`` object.
    """
    print(f"[DEBUG] Relevance determined: {state.relevance}")
    if state.relevance != "relevant":
        print("[DEBUG] Question marked as not relevant")
        state.sql_query = "The question is not relevant to the database."
        return state  # Ensure early return only for "not relevant"

    schema = get_database_schema(engine)
    # Values are passed as template variables rather than interpolated into
    # the template text, so braces inside them cannot be mistaken for
    # placeholders.
    system = """
    You are an assistant that converts natural language questions into SQL queries based on the following schema:

    Schema:
    {schema}

    The current user is '{current_user}'. Ensure that all query-related data is scoped to this user.

    Provide only the SQL query without any explanations.
    """
    human = "Question: {question}"
    convert_prompt = ChatPromptTemplate.from_messages([
        ("system", system),
        ("human", human),
    ])
    llm = ChatOpenAI(temperature=0)
    # The model must sit between the prompt and the parser; without it the
    # chain never calls the LLM at all.
    sql_generator = convert_prompt | llm | StrOutputParser()

    try:
        response = sql_generator.invoke({
            "schema": schema,
            "current_user": state.current_user,
            "question": state.question,
        })
        if isinstance(response, str):
            state.sql_query = response.strip()
        else:
            raise ValueError("Unexpected response format from LLM.")
        print(f"[DEBUG] SQL Query Generated: {state.sql_query}")
    except Exception as e:
        # Best effort: downstream steps recognise this sentinel and skip execution.
        print(f"[ERROR] Error in convert_nl_to_sql: {e}")
        state.sql_query = "Error generating SQL query."
    return state


In [574]:
def execute_sql(state: AgentState):
    """Run ``state.sql_query`` and store the fetched rows in ``state.query_rows``.

    Execution is skipped (rows set to ``[]``) when the query is blank or
    contains one of the sentinel messages written by the SQL-generation step.
    Any error raised while executing or fetching is logged and also leaves
    ``query_rows`` as ``[]``.

    Args:
        state: Workflow state; reads ``sql_query`` and writes ``query_rows``.

    Returns:
        The same ``state`` object.
    """
    sql_query = state.sql_query or ""
    not_executable = (
        not sql_query.strip()
        or "The question is not relevant" in sql_query
        or "Error generating SQL query" in sql_query
    )
    if not_executable:
        print("[DEBUG] Skipping execution: Invalid SQL query.")
        state.query_rows = []
        return state

    # NOTE(security): sql_query is LLM output derived from user text and is
    # executed verbatim. Nothing here restricts it to read-only statements;
    # consider a read-only database role or statement allow-list.
    with SessionLocal() as session:
        try:
            print(f"[DEBUG] Executing SQL Query: {sql_query}")
            result = session.execute(text(sql_query))
            state.query_rows = result.fetchall()
        except Exception as e:
            # Best effort: a failed query is reported as "no rows".
            print(f"[ERROR] Error executing SQL query: {e}")
            state.query_rows = []
    return state


In [575]:
def generate_human_readable_answer(state: AgentState):
    """Summarise the query outcome as a greeting in ``state.query_result``.

    The message names ``state.current_user`` and reports how many rows are in
    ``state.query_rows``, or that no data was found when there are none.

    Returns:
        The same ``state`` object.
    """
    fetched = state.query_rows
    user_name = state.current_user
    if fetched:
        summary = f"Hello {user_name}, found {len(fetched)} rows."
    else:
        summary = f"Hello {user_name}, no data found."
    state.query_result = summary
    return state

# Step 4: Create Workflow with Crew


In [576]:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    """State model for the demo flow: a step counter and a message."""

    # Incremented once by each step of StateExampleFlow.
    counter: int = 0
    # Text set by the first step and extended by the second.
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):
    """Two-step demo flow that mutates an ``ExampleState``."""

    @start()
    def first_method(self):
        """Entry step: bump the counter and set the initial message."""
        self.state.counter = self.state.counter + 1
        self.state.message = "Hello from first_method"

    @listen(first_method)
    def second_method(self):
        """Follow-up step: bump the counter, extend the message and return it."""
        self.state.counter = self.state.counter + 1
        self.state.message = self.state.message + " - updated by second_method"
        return self.state.message

# Run the demo flow once, then show the value it returned and its final state.
flow = StateExampleFlow()
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)


Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'


In [577]:
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers.string import StrOutputParser
from langchain_openai import ChatOpenAI

class TextToSQLFlow(Flow[AgentState]):
    """Flow that chains the text-to-SQL steps over a shared ``AgentState``.

    Every step delegates to the module-level function of the same name, which
    updates ``self.state`` and returns it. ``self.config`` is not set by this
    class; the caller must assign it before starting the flow.
    """

    @start()
    def get_current_user(self):
        """Resolve the current user's name into the state."""
        return get_current_user(self.state, self.config)

    @listen("get_current_user")
    def check_relevance(self, current_user):
        """Classify the question as relevant or not (the argument is unused)."""
        return check_relevance(self.state, self.config)

    @listen("check_relevance")
    def convert_nl_to_sql(self, relevance):
        """Generate SQL for the question, or record that it is not relevant."""
        # The listener argument is whatever the check_relevance step returned
        # (the state object), not the verdict string, so comparing it with
        # "relevant" can never succeed. The verdict is read from the state
        # instead; the module-level function handles the not-relevant case.
        print(f"[DEBUG] Relevance passed to SQL conversion: {self.state.relevance}")
        return convert_nl_to_sql(self.state, self.config)

    @listen("convert_nl_to_sql")
    def execute_sql(self, sql_query):
        """Execute the generated SQL (the argument is unused)."""
        return execute_sql(self.state)

    @listen("execute_sql")
    def generate_human_readable_answer(self, query_rows):
        """Build the final human-readable answer (the argument is unused)."""
        return generate_human_readable_answer(self.state)


In [578]:
flow = TextToSQLFlow()
flow.state.question = "Show me all delivery information for user 10 and oreders."
# The flow's step methods read this attribute as `self.config`.
flow.config = {"configurable": {"current_user_id": "10"}}

async def run_flow():
    """Run the text-to-SQL flow asynchronously and print its output and final state."""
    final_output = await flow.kickoff_async()
    print(f"Final Output: {final_output}")
    print("Final State:")
    print(flow.state)

# Top-level await: valid in a notebook/IPython cell, a syntax error in a plain module.
await run_flow()

[DEBUG] Database Schema:
Table: coupons
- id: INTEGER, Primary Key
- code: VARCHAR
- discount_percentage: FLOAT
- expiration_date: DATE

Table: delivery
- id: INTEGER, Primary Key
- order_id: INTEGER
- delivery_address: VARCHAR
- delivery_status: VARCHAR

Table: discounts
- id: INTEGER, Primary Key
- food_id: INTEGER
- discount_percentage: FLOAT

Table: employees
- id: INTEGER, Primary Key
- name: VARCHAR
- position: VARCHAR
- salary: FLOAT

Table: food
- id: INTEGER, Primary Key
- name: VARCHAR
- price: FLOAT

Table: food_category
- id: INTEGER, Primary Key
- name: VARCHAR

Table: food_category_mapping
- id: INTEGER, Primary Key
- food_id: INTEGER
- category_id: INTEGER

Table: ingredients
- id: INTEGER, Primary Key
- name: VARCHAR
- cost_per_unit: FLOAT

Table: inventory
- id: INTEGER, Primary Key
- ingredient_id: INTEGER
- stock_level: FLOAT

Table: loyalty_program
- id: INTEGER, Primary Key
- user_id: INTEGER
- points: INTEGER
- membership_tier: VARCHAR

Table: orders
- id: INTEGER