## DatabaseConnectionTool

In [25]:
import re
import os

def is_valid_sqlite_connection_string(conn_str: str) -> bool:
    """
    Validates if the connection string follows the pattern:
    sqlite:///databases/<name>.db and the file exists.
    """
    pattern = r"^sqlite:///databases/[a-zA-Z0-9_\-]+\.db$"
    if not re.match(pattern, conn_str):
        return False

    db_path = conn_str.replace("sqlite:///", "")
    return os.path.isfile(db_path)

In [26]:
import sqlite3
import pandas as pd
import logging
from typing import Dict, List, Any, Optional, Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import pandas as pd
import numpy as np

logger = logging.getLogger(__name__)

class DatabaseConnectionInput(BaseModel):
    """Input schema for database connection"""
    connection_string: str = Field(description="Database connection string")
    query: Optional[str] = Field(default=None, description="SQL query to execute")

In [27]:
class DatabaseConnectionTool(BaseTool):
    """Tool for database connection and query execution"""
    name: str = "Database Connection Tool"
    description: str = "Connect to database and execute queries for data analysis"
    args_schema: Type[BaseModel] = DatabaseConnectionInput
    
    def _run(self, connection_string: str, query: Optional[str] = None, **kwargs) -> str:
        """Execute database operations"""
        if not is_valid_sqlite_connection_string(connection_string):
            return f"❌ Invalid connection string: {connection_string}. Must be sqlite:///databases/<name>.db"
        try:
            # Handle case where parameters might be passed as a single argument
            if isinstance(connection_string, dict):
                query = connection_string.get('query')
                connection_string = connection_string.get('connection_string')
            
            if not connection_string:
                return "Error: connection_string is required"
            
            conn = sqlite3.connect(connection_string.replace("sqlite:///", ""))
            
            if query:
                df = pd.read_sql_query(query, conn)
                result = f"Query executed successfully. Results:\n{df.to_string()}"
                logger.info(f"Database query executed: {query}")
            else:
                # Get table information
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                tables = cursor.fetchall()
                result = f"Available tables: {[table[0] for table in tables]}"
                logger.info("Retrieved database schema information")
            
            conn.close()
            return result
            
        except Exception as e:
            error_msg = f"Database operation failed: {str(e)}"
            logger.error(error_msg)
            return error_msg

In [29]:
# Test the tool - Method 1: Using keyword arguments
tool = DatabaseConnectionTool()
result = tool.run(
    connection_string="sqlite:///databases/customer_db.db",
    query="SELECT * FROM customers"
)
print("Method 1 (keyword args):", result)

Using Tool: Database Connection Tool
Method 1 (keyword args): Query executed successfully. Results:
       customer_id   first_name   last_name                             email                   phone                             address                    city           state                                              country    registration_date account_status customer_tier  total_spent           last_login  marketing_consent date_of_birth             gender                                                   occupation                  created_at                  updated_at  zip_code
0    CUST_716575A4        Angel        Hill          donaldgarcia@example.net         +1-219-560-0133      79402 Peterson Drives Apt. 511               Davisstad    Rhode Island                                            Sri Lanka  2025-02-20 02:29:55         active        bronze      3707.75  2025-03-13 17:20:20                  1    1947-05-31                  F                                        

In [21]:
# # Test the tool - Method 2: Using dictionary (now supported with override)
# result = tool.run({
#     "connection_string": "sqlite:///customer_db.db",
#     "query": "SELECT * FROM customers"
# })
# print("Method 2 (dict):", result)

## MetadataExtractionTool

In [22]:
class MetadataExtractionInput(BaseModel):
    """Input schema for metadata extraction"""
    table_name: str = Field(description="Name of the table to extract metadata from")
    connection_string: str = Field(description="Database connection string")

class MetadataExtractionTool(BaseTool):
    """Tool for extracting database metadata"""
    name: str = "Metadata Extraction Tool"
    description: str = "Extract comprehensive metadata from database tables and schemas"
    args_schema: Type[BaseModel] = MetadataExtractionInput
    
    def _run(self, table_name: str, connection_string: str) -> str:
        """Extract metadata from specified table"""
        try:
            conn = sqlite3.connect(connection_string.replace("sqlite:///", ""))
            cursor = conn.cursor()
            
            # Get table schema
            cursor.execute(f"PRAGMA table_info({table_name})")
            columns = cursor.fetchall()
            
            # Get row count
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            row_count = cursor.fetchone()[0]
            
            # Build metadata result
            metadata = {
                "table_name": table_name,
                "columns": [{"name": col[1], "type": col[2], "nullable": not col[3]} for col in columns],
                "row_count": row_count,
                "extraction_timestamp": pd.Timestamp.now().isoformat()
            }
            
            conn.close()
            result = f"Metadata extracted for {table_name}:\n{metadata}"
            logger.info(f"Metadata extracted for table: {table_name}")
            return result
            
        except Exception as e:
            error_msg = f"Metadata extraction failed: {str(e)}"
            logger.error(error_msg)
            return error_msg

In [24]:
# Test the tool - Method 1: Using keyword arguments
tool = MetadataExtractionTool()
result = tool.run(
    table_name="customers",
    connection_string="sqlite:///customer_db.db",
)
print("Method 1 (keyword args):", result)

Using Tool: Metadata Extraction Tool
Method 1 (keyword args): Metadata extracted for customers:
{'table_name': 'customers', 'columns': [{'name': 'customer_id', 'type': 'TEXT', 'nullable': True}, {'name': 'first_name', 'type': 'TEXT', 'nullable': False}, {'name': 'last_name', 'type': 'TEXT', 'nullable': False}, {'name': 'email', 'type': 'TEXT', 'nullable': False}, {'name': 'phone', 'type': 'TEXT', 'nullable': True}, {'name': 'address', 'type': 'TEXT', 'nullable': True}, {'name': 'city', 'type': 'TEXT', 'nullable': True}, {'name': 'state', 'type': 'TEXT', 'nullable': True}, {'name': 'country', 'type': 'TEXT', 'nullable': True}, {'name': 'registration_date', 'type': 'DATETIME', 'nullable': True}, {'name': 'account_status', 'type': 'TEXT', 'nullable': True}, {'name': 'customer_tier', 'type': 'TEXT', 'nullable': True}, {'name': 'total_spent', 'type': 'REAL', 'nullable': True}, {'name': 'last_login', 'type': 'DATETIME', 'nullable': True}, {'name': 'marketing_consent', 'type': 'BOOLEAN', 'nul

## SafeDirectoryReadTool

In [43]:
# tools/safe_directory_read_tool.py
import os
from typing import List, Optional, Type
from pydantic import BaseModel, Field
from crewai.tools import BaseTool

class SafeDirectoryReadInput(BaseModel):
    path: str = Field(..., description="Path to the directory")
    file_types: Optional[List[str]] = Field(default=None, description="List of file extensions to filter (e.g. ['.csv'])")
    recursive: Optional[bool] = Field(default=False, description="Whether to search recursively")

class SafeDirectoryReadTool(BaseTool):
    name: str = "List files in directory"
    description: str = "A tool that lists files in a directory with optional filtering and recursion"
    args_schema: Type[BaseModel] = SafeDirectoryReadInput

    def _run(self, path: str, file_types: Optional[List[str]] = None, recursive: Optional[bool] = False) -> str:
        try:
            if not os.path.exists(path):
                return f"❌ Path does not exist: {path}"
            if not os.path.isdir(path):
                return f"❌ Path is not a directory: {path}"

            files_found = []
            for root, _, files in os.walk(path):
                for file in files:
                    if not file_types or any(file.endswith(ext) for ext in file_types):
                        full_path = os.path.join(root, file)
                        rel_path = os.path.relpath(full_path, path)
                        files_found.append(rel_path)

                if not recursive:
                    break

            if not files_found:
                return f"ℹ️ No matching files found in: {path}"

            return "\n".join(files_found)

        except Exception as e:
            return f"❌ Failed to list files in {path}: {str(e)}"

In [44]:
if __name__ == "__main__":
    tool = SafeDirectoryReadTool()
    result = tool._run(
        path="databases",  # make sure this directory exists
        file_types=[".db"],
        recursive=True
    )
    print("✅ Files listed:\n", result)

✅ Files listed:
 customer_db.db
data_platform.db
support_tickets.db
transaction_logs.db


## SafeFileReadTool

In [47]:
# tools/safe_file_read_tool.py

from crewai_tools import FileReadTool
from typing import Optional, Type
from pydantic import BaseModel, Field


class SafeFileReadInput(BaseModel):
    file_path: str = Field(..., description="Path to the file")
    line_count: Optional[int] = Field(default=None, description="Number of lines to read from the file")


class SafeFileReadTool(FileReadTool):
    name: str = "Read a file's content"
    description: str = "A tool that reads the content of a file. Provide 'file_path'. Optionally use 'line_count'."
    args_schema: Type[BaseModel] = SafeFileReadInput

    def _run(self, file_path: str, line_count: Optional[int] = None) -> str:
        # Normalize line_count if passed as a string (for robustness)
        if isinstance(line_count, str):
            if line_count.strip().lower() in ["none", ""]:
                line_count = None
            elif line_count.isdigit():
                line_count = int(line_count)
            else:
                raise ValueError(f"Invalid value for line_count: {line_count}")

        return super()._run(file_path=file_path, line_count=line_count)

In [48]:
if __name__ == "__main__":
    tool = SafeFileReadTool()
    result = tool._run(file_path="requirements.txt", line_count=10)
    print("✅ File Content:\n", result)

✅ File Content:
 # Agentic Data Management Platform - Test Data Setup Requirements
# Core dependencies for generating comprehensive test databases

# Database and Data Generation
faker==20.1.0                  # Realistic fake data generation
sqlite3                        # SQLite database (built-in with Python)

# Data Processing and Analysis
pandas==2.1.4                  # Data manipulation and analysis
numpy==1.24.4                  # Numerical computing



## Text2SQL Testing

In [49]:
tool_args = {
    "natural_language_query": "Show all orders placed in 2023",
    "schema_info": {
        "orders": {
            "columns": [
                {"name": "order_id", "type": "INTEGER"},
                {"name": "order_date", "type": "DATE"}
            ]
        }
    },
    "db_files": [{"path": "databases/orders.db"}]
}

In [50]:
from tools.analytics_tools import Text2SQLInput

In [51]:
try:
    validated_args = Text2SQLInput(**tool_args)
    print("✅ tool_args are valid!")
    print(validated_args)
except Exception as e:
    print("❌ tool_args validation failed:")
    print(e)

✅ tool_args are valid!
natural_language_query='Show all orders placed in 2023' schema_info={'orders': {'columns': [{'name': 'order_id', 'type': 'INTEGER'}, {'name': 'order_date', 'type': 'DATE'}]}} db_files=[{'path': 'databases/orders.db'}]


In [52]:
from tools.analytics_tools import CrewText2SQLTool

tool = CrewText2SQLTool()
ToolInputSchema = tool.args_schema  # This is a Pydantic model class

try:
    validated = ToolInputSchema(**tool_args)
    print("✅ Valid input:")
    print(validated)
except Exception as e:
    print("❌ Invalid input:")
    print(e)

✅ Valid input:
natural_language_query='Show all orders placed in 2023' schema_info={'orders': {'columns': [{'name': 'order_id', 'type': 'INTEGER'}, {'name': 'order_date', 'type': 'DATE'}]}} db_files=[{'path': 'databases/orders.db'}]


In [53]:
# utils/tool_validator.py

from typing import Type
from crewai.tools import BaseTool
from pydantic import ValidationError

def validate_tool_args(tool_class: Type[BaseTool], args: dict):
    """
    Validates arguments against a CrewAI tool's defined args_schema.

    Parameters:
    - tool_class: The tool class (not an instance) that inherits from BaseTool.
    - args: The input arguments dictionary to validate.

    Returns:
    - (True, validated_instance): If validation succeeds.
    - (False, error_message): If validation fails.
    """
    try:
        schema_model = tool_class.args_schema
        validated = schema_model(**args)
        return True, validated
    except ValidationError as ve:
        return False, ve.errors()
    except Exception as e:
        return False, str(e)

In [54]:
from tools.analytics_tools import CrewText2SQLTool
tool_args = {
    "natural_language_query": "List all orders from 2023",
    "schema_info": {
        "orders": {
            "columns": [{"name": "order_id", "type": "INTEGER"}, {"name": "order_date", "type": "DATE"}]
        }
    },
    "db_files": [{"path": "databases/orders.db"}]
}

is_valid, result = validate_tool_args(CrewText2SQLTool, tool_args)

if is_valid:
    print("✅ Valid tool arguments:")
    print(result.model_dump())
else:
    print("❌ Validation failed:")
    print(result)

❌ Validation failed:
args_schema


In [55]:
result

'args_schema'

In [3]:
from pathlib import Path
from utils.scan_tool_args_schemas import scan_tool_args_schemas

# Adjust this path to point to your tools folder
tools_dir = Path("../agentic_ai_platform_for_data_management-nl2sql")
scan_tool_args_schemas(tools_dir)

🔍 Scanning for tools in the '..\agentic_ai_platform_for_data_management-nl2sql' directory...

📦 Found 18 Python file(s) in tools directory.
✅ [DEBUG] Starting main.py


C:\Users\ManojSharma\.conda\envs\llms6\Lib\site-packages\pydantic\_internal\_config.py:291: PydanticDeprecatedSince20: Support for class-based `config` is deprecated, use ConfigDict instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.9/migration/
Traceback (most recent call last):
  File "C:\Users\ManojSharma\LLM Projects\AI projects\Advanced Agentic AI Projects\Agentic AI Platform for Data Management\agentic_ai_platform_for_data_management-nl2sql\utils\scan_tool_args_schemas.py", line 30, in scan_tool_args_schemas
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "C:\Users\ManojSharma\LLM Projects\AI projects\Advanced Agentic AI Projects\Agentic AI Platform for Data Management\agentic_ai_platform_for_data_management-nl2sql\..\agentic_ai_platform_for_data_management-nl2s

✅ [DEBUG] Starting main.py

🔍 Scanned 0 tool(s).
⚠️ Issues found:
 - app: ❌ Import error: Decorators defined with incorrect fields: tools.analytics_tools.CrewText2SQLTool:2254434510752._default_args_schema (use check_fields=False if you're inheriting from the model and intended this)

For further information visit https://errors.pydantic.dev/2.9/u/decorator-missing-field
 - crew: ❌ Import error: Decorators defined with incorrect fields: tools.analytics_tools.CrewText2SQLTool:2254434514656._default_args_schema (use check_fields=False if you're inheriting from the model and intended this)

For further information visit https://errors.pydantic.dev/2.9/u/decorator-missing-field
 - main: ❌ Import error: Decorators defined with incorrect fields: tools.analytics_tools.CrewText2SQLTool:2254434499040._default_args_schema (use check_fields=False if you're inheriting from the model and intended this)

For further information visit https://errors.pydantic.dev/2.9/u/decorator-missing-field
 - eComm

Traceback (most recent call last):
  File "C:\Users\ManojSharma\LLM Projects\AI projects\Advanced Agentic AI Projects\Agentic AI Platform for Data Management\agentic_ai_platform_for_data_management-nl2sql\utils\scan_tool_args_schemas.py", line 30, in scan_tool_args_schemas
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 936, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1073, in get_code
  File "<frozen importlib._bootstrap_external>", line 1130, in get_data
FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\ManojSharma\\LLM Projects\\AI projects\\Advanced Agentic AI Projects\\Agentic AI Platform for Data Management\\agentic_ai_platform_for_data_management-nl2sql\\..\\agentic_ai_platform_for_data_management-nl2sql\\ECommerceSampleDatabases\\eCommerce Data Generator.py'
C:\Users\ManojSharma\.conda\envs\llms6\Lib\site-packages\pydantic\_internal\_config.py:291: PydanticDeprecatedSince20: Support for class-b

In [4]:
import importlib.metadata
print(importlib.metadata.version("crewai"))

0.130.0


In [39]:
from crewai.tools import BaseTool
from typing import Dict, List, Any, Type
from pydantic import BaseModel, Field
import json
import logging
import re
import sqlite3
from pathlib import Path
from openai import OpenAI

logger = logging.getLogger(__name__)

# Input schema with proper Pydantic v2 configuration
class Text2SQLInput(BaseModel):
    model_config = {"arbitrary_types_allowed": True}
    
    natural_language_query: str = Field(description="Natural language query to convert to SQL")
    schema_info: Dict[str, Any] = Field(description="Database schema information")
    db_files: List[Dict[str, str]] = Field(description="List of database files")

# Helper function moved to module level for proper scope
def is_valid_sql(sql: str, schema_info: Dict[str, Any]) -> bool:
    """Validate if SQL references only tables present in schema"""
    tables_in_sql = set(re.findall(r"\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)", sql, re.IGNORECASE))
    tables_in_sql.update(re.findall(r"\bjoin\s+([a-zA-Z_][a-zA-Z0-9_]*)", sql, re.IGNORECASE))

    # Handle both dict and list schema formats
    if isinstance(schema_info, dict):
        valid_tables = {t.lower() for t in schema_info.keys()}
    elif isinstance(schema_info, list):
        valid_tables = set()
        for table in schema_info:
            table_name = table.get("table_name") or table.get("name")
            if table_name:
                valid_tables.add(table_name.lower())
    else:
        return False
    
    return all(t.lower() in valid_tables for t in tables_in_sql)

# Corrected Tool Definition
class CrewText2SQLTool(BaseTool):
    name: str = "Text to SQL Tool"
    description: str = "Convert NL query into SQL and get the answer"
    args_schema: Type[BaseModel] = Text2SQLInput

    def serialize_schema(self, schema_info: Any) -> str:
        """
        Convert schema_info into a strict textual representation
        to be used in the prompt. Supports both dict and list format.
        """
        lines = []

        # Case 1: If schema_info is a list of tables
        if isinstance(schema_info, list):
            for table in schema_info:
                table_name = table.get("table_name") or table.get("name")
                columns = table.get("columns", [])
                if not table_name:
                    continue
                column_list = ", ".join(
                    [f"{col['name']} ({col.get('type', 'TEXT')})" for col in columns]
                )
                lines.append(f"Table: {table_name}\nColumns: {column_list}")

        # Case 2: If schema_info is a dict of table_name → metadata
        elif isinstance(schema_info, dict):
            for table_name, metadata in schema_info.items():
                columns = metadata.get("columns", [])
                column_list = ", ".join(
                    [f"{col['name']} ({col.get('type', 'TEXT')})" for col in columns]
                )
                lines.append(f"Table: {table_name}\nColumns: {column_list}")

        else:
            raise ValueError("schema_info must be a list or dict")

        return "\n\n".join(lines)

    def execute_sql_across_dbs(self, sql: str, db_files: list) -> list:
        if not db_files:
            return [{"error": "No database files provided for execution."}]
    
        conn = None
        try:
            main_db_path = Path(db_files[0]["path"]).resolve()
            conn = sqlite3.connect(str(main_db_path))
            cursor = conn.cursor()
    
            attached_aliases = set(["main"])
            for idx, db in enumerate(db_files[1:], start=1):
                db_path = Path(db["path"]).resolve()
                alias = db_path.stem
                if alias in attached_aliases:
                    alias = f"{alias}_{idx}"
                attached_aliases.add(alias)
                cursor.execute(f"ATTACH DATABASE '{db_path}' AS {alias}")

            logger.info(f"🔍 Executing SQL:\n{sql}")
            cursor.execute(sql)
    
            # ✅ NEW: Check if this query returns results
            if cursor.description is None:
                return [{"message": "Query executed successfully, but no results returned."}]
    
            rows = cursor.fetchall()
            headers = [desc[0] for desc in cursor.description]
            return [dict(zip(headers, row)) for row in rows]
    
        except Exception as e:
            logger.error(f"SQL Execution Error: {e}")
            return [{"error": str(e), "query": sql}]
        finally:
            if conn:
                conn.close()

    def _run(self, natural_language_query: str, schema_info: Dict[str, Any], db_files: List[Dict[str, str]], **kwargs) -> str:
        print("✅ [DEBUG] Received schema_info in tool:", schema_info)
        """
        Execute the text-to-SQL conversion process
        
        Args:
            natural_language_query: The natural language query to convert
            schema_info: Database schema information (dict or list format)
            db_files: List of database files to execute queries against
            
        Returns:
            JSON string containing the generated SQL, explanation, and results
        """
        try:
            question = natural_language_query

            # Ensure schema_info is parsed if passed as a JSON string
            if isinstance(schema_info, str):
                try:
                    schema_info = json.loads(schema_info)
                except json.JSONDecodeError as e:
                    raise ValueError(f"Invalid JSON in schema_info: {e}")

            if not question or not schema_info:
                raise ValueError("Missing natural_language_query or schema_info")

            schema_str = self.serialize_schema(schema_info)

            prompt = f"""
You are an expert SQL generator.

You must ONLY use tables and columns listed below. 
Do NOT invent or guess table names like 'orders', 'sales', 'products'.

## Natural Language Query:
{question}

## Schema:
{schema_str}

## Response Format:
```json
{{
  "generated_sql": "...",
  "explanation": "..."
}}
```"""

            client = OpenAI()
            
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2
            )

            llm_output = response.choices[0].message.content.strip()
            logger.debug(f"[GPT-4o Output]\n{llm_output}")

            # Parse JSON response with better error handling
            try:
                match = re.search(r"```json\s*(.*?)```", llm_output, re.DOTALL)
                if match:
                    parsed = json.loads(match.group(1))
                else:
                    parsed = json.loads(llm_output)
            except json.JSONDecodeError as e:
                raise ValueError(f"Failed to parse LLM response as JSON: {e}")

            sql = parsed.get("generated_sql")
            parsed["original_query"] = question

            # Validate and execute SQL if database files are provided
            if sql and db_files:
                if not is_valid_sql(sql, schema_info):
                    raise ValueError("Generated SQL references unknown tables.")
                parsed["query_result"] = self.execute_sql_across_dbs(sql, db_files)

            return json.dumps(parsed, indent=2)

        except Exception as e:
            logger.exception("GPT-4o NL2SQL failed")
            return json.dumps({
                "error": str(e),
                "query": natural_language_query,
                "timestamp": json.dumps({"error": "Could not generate timestamp"})
            }, indent=2)

In [77]:
tool = CrewText2SQLTool()

schema_info = {
  "customers": {
    "columns": [
      { "name": "customer_id", "type": "TEXT", "primary_key": "true" },
      { "name": "first_name", "type": "TEXT" },
      { "name": "last_name", "type": "TEXT" },
      { "name": "email", "type": "TEXT" },
      { "name": "phone", "type": "TEXT" },
      { "name": "address", "type": "TEXT" },
      { "name": "city", "type": "TEXT" },
      { "name": "state", "type": "TEXT" },
      { "name": "country", "type": "TEXT" },
      { "name": "registration_date", "type": "DATETIME" },
      { "name": "account_status", "type": "TEXT" },
      { "name": "customer_tier", "type": "TEXT" },
      { "name": "total_spent", "type": "REAL" },
      { "name": "last_login", "type": "DATETIME" },
      { "name": "marketing_consent", "type": "BOOLEAN" },
      { "name": "date_of_birth", "type": "DATE" },
      { "name": "gender", "type": "TEXT" },
      { "name": "occupation", "type": "TEXT" },
      { "name": "created_at", "type": "DATETIME" },
      { "name": "updated_at", "type": "DATETIME" },
      { "name": "zip_code", "type": "NUMERIC" }
    ]
  },
  "customer_preferences": {
    "columns": [
      { "name": "pref_id", "type": "TEXT", "primary_key": "true"},
      { "name": "customer_id", "type": "TEXT" },
      { "name": "category", "type": "TEXT" },
      { "name": "preference_value", "type": "TEXT" },
      { "name": "created_at", "type": "DATETIME" }
    ]
  }
}

schema_str = tool.serialize_schema(schema_info)

print(schema_str)

Table: customers
Columns: customer_id (TEXT), first_name (TEXT), last_name (TEXT), email (TEXT), phone (TEXT), address (TEXT), city (TEXT), state (TEXT), country (TEXT), registration_date (DATETIME), account_status (TEXT), customer_tier (TEXT), total_spent (REAL), last_login (DATETIME), marketing_consent (BOOLEAN), date_of_birth (DATE), gender (TEXT), occupation (TEXT), created_at (DATETIME), updated_at (DATETIME), zip_code (NUMERIC)

Table: customer_preferences
Columns: pref_id (TEXT), customer_id (TEXT), category (TEXT), preference_value (TEXT), created_at (DATETIME)


In [43]:
# Use the tool
result = tool._run(
    natural_language_query="Show me last login time for top 10 customers",
    schema_info=schema_info,
    db_files=[{"path": "databases/customer_db.db"}]
)

print(result)

✅ [DEBUG] Received schema_info in tool: {'customers': {'columns': [{'name': 'customer_id', 'type': 'TEXT', 'primary_key': 'true'}, {'name': 'first_name', 'type': 'TEXT'}, {'name': 'last_name', 'type': 'TEXT'}, {'name': 'email', 'type': 'TEXT'}, {'name': 'phone', 'type': 'TEXT'}, {'name': 'address', 'type': 'TEXT'}, {'name': 'city', 'type': 'TEXT'}, {'name': 'state', 'type': 'TEXT'}, {'name': 'country', 'type': 'TEXT'}, {'name': 'registration_date', 'type': 'DATETIME'}, {'name': 'account_status', 'type': 'TEXT'}, {'name': 'customer_tier', 'type': 'TEXT'}, {'name': 'total_spent', 'type': 'REAL'}, {'name': 'last_login', 'type': 'DATETIME'}, {'name': 'marketing_consent', 'type': 'BOOLEAN'}, {'name': 'date_of_birth', 'type': 'DATE'}, {'name': 'gender', 'type': 'TEXT'}, {'name': 'occupation', 'type': 'TEXT'}, {'name': 'created_at', 'type': 'DATETIME'}, {'name': 'updated_at', 'type': 'DATETIME'}, {'name': 'zip_code', 'type': 'NUMERIC'}]}, 'customer_preferences': {'columns': [{'name': 'pref_id

##  Crew.py

In [97]:
"""
crew.py
Crew orchestration for the Agentic Data Management Platform
"""
from tools.analytics_tools import CrewText2SQLTool
from crewai.tools import BaseTool
import os
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
from crewai import Agent, Task, Crew, Process
from tools.safe_file_read_tool import SafeFileReadTool
from tools.safe_directory_read_tool import SafeDirectoryReadTool
from langchain.tools import Tool
import sqlite3
import json
import re
from dotenv import load_dotenv
load_dotenv(override=True)

# Import custom tools with error handling
try:
    from tools.database_tools import DatabaseConnectionTool, MetadataExtractionTool
    from tools.data_tools import DataProfilingTool, DataValidationTool
    from tools.analytics_tools import CrewText2SQLTool, ReportGenerationTool
except ImportError as e:
    logging.warning(f"Could not import custom tools: {e}")

from models.data_models import PlatformConfig
from utils.helpers import load_yaml_config

logger = logging.getLogger(__name__)

def extract_schema_info(db_path: str) -> dict:
    """
    Extracts table and column schema from a SQLite database.
    Returns a dict in schema_info format expected by Text2SQLTool.
    """
    schema_info = {}

    try:
        conn = sqlite3.connect(str(Path(db_path).resolve()))
        cursor = conn.cursor()

        # Get all table names
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()

        for (table_name,) in tables:
            cursor.execute(f"PRAGMA table_info('{table_name}');")
            columns = cursor.fetchall()

            schema_info[table_name] = {
                "columns": [
                    {
                        "name": col[1],
                        "type": col[2],
                        "primary_key": bool(col[5])
                    }
                    for col in columns
                ]
            }

        conn.close()
        logger.info(f"✅ Extracted schema_info from {db_path}")
        return schema_info

    except Exception as e:
        logger.error(f"❌ Failed to extract schema from {db_path}: {e}")
        raise

class DataManagementCrew:
    """Main crew orchestrator for data management operations"""
    print("✅ [DEBUG] DataManagementCrew initialized")
    
    def __init__(self, config: Optional[PlatformConfig] = None):
        self.config = config or self._get_default_config()
        self.agents_config = self._load_agents_config()
        self.tasks_config = self._load_tasks_config()
        self.tools = self._initialize_tools()
    
    def _get_default_config(self) -> PlatformConfig:
        """Get default configuration if none provided"""
        try:
            return PlatformConfig()
        except Exception as e:
            logger.warning(f"Could not load default config: {e}")
            return None
    
    def _load_agents_config(self) -> Dict[str, Any]:
        """Load agents configuration with fallback"""
        try:
            return load_yaml_config("config/agents.yaml")
        except Exception as e:
            logger.warning(f"Could not load agents config: {e}")
            return self._get_default_agents_config()
    
    def _load_tasks_config(self) -> Dict[str, Any]:
        """Load tasks configuration with fallback"""
        try:
            return load_yaml_config("config/tasks.yaml")
        except Exception as e:
            logger.warning(f"Could not load tasks config: {e}")
            return self._get_default_tasks_config()
        
    def _initialize_tools(self) -> Dict[str, Any]:
        """Initialize all tools with proper error handling"""
        tools = {}

        # Always available tools
        try:
            tools.update({
                "file_read": SafeFileReadTool(),
                "directory_search": SafeDirectoryReadTool()
            })
            logger.debug("✅ Basic tools initialized")
        except Exception as e:
            logger.error(f"Error initializing basic tools: {e}")

        try:
            text2sql_tool = CrewText2SQLTool()
            tools.update({
                "database_connection": DatabaseConnectionTool(),
                "metadata_extraction": MetadataExtractionTool(),
                "data_profiling": DataProfilingTool(),
                "data_validation": DataValidationTool(),
                "text2sql": text2sql_tool,
                "report_generation": ReportGenerationTool()
            })
            logger.debug(f"✅ Custom tools initialized: {list(tools.keys())}")
        except Exception as e:
            logger.warning(f"Custom tools not available: {e}")
        
        print(f"🧰 Final tools loaded: {list(tools.keys())}")

        return tools
    
    def _create_agent_from_config(self, agent_name: str) -> Agent:
        """Create agent from configuration"""
        try:
            agent_config = self.agents_config[agent_name]
            
            # Get tools for this agent
            agent_tools = []
            for tool_name in agent_config.get("tools", []):
                if tool_name in self.tools:
                    agent_tools.append(self.tools[tool_name])
                else:
                    logger.warning(f"⚠️ Tool '{tool_name}' not found in self.tools and will be skipped for agent '{agent_name}'")
            
            # If no tools available, use basic tools
            if not agent_tools and self.tools:
                agent_tools = [self.tools.get("file_read"), self.tools.get("directory_search")]
                agent_tools = [tool for tool in agent_tools if tool is not None]
            
            return Agent(
                role=agent_config["role"],
                goal=agent_config["goal"],
                backstory=agent_config["backstory"],
                tools=agent_tools,
                verbose=agent_config.get("verbose", True),
                allow_delegation=agent_config.get("allow_delegation", False)
            )
        except Exception as e:
            logger.error(f"Error creating agent {agent_name}: {e}")
            # Return a basic agent as fallback
            return Agent(
                role=f"Generic {agent_name.replace('_', ' ').title()}",
                goal="Perform data management tasks",
                backstory="I am a data management specialist.",
                tools=[],  # No tools to avoid validation errors
                verbose=True,
                allow_delegation=False
            )
    
    def _create_task_from_config(self, task_name: str, agent: Agent, inputs: Dict[str, Any]) -> Task:
        """Create task from configuration"""
        try:
            task_config = self.tasks_config[task_name]
            
            # Format description with inputs - handle missing keys gracefully
            description = task_config["description"]
            try:
                description = description.format(**inputs)
            except (KeyError, ValueError) as e:
                logger.warning(f"Could not format task description for {task_name}: {e}")
                # Use description as-is if formatting fails
            
            return Task(
                description=description,
                agent=agent,
                expected_output=task_config["expected_output"],
                output_file=task_config["output_file"]
            )
        except Exception as e:
            logger.error(f"Error creating task {task_name}: {e}")
            # Return a basic task as fallback
            return Task(
                description=f"Perform {task_name.replace('_', ' ')} task with given inputs: {inputs}",
                agent=agent,
                expected_output="Completed task results and analysis"
            )
    
    def run_data_discovery(self, inputs: Dict[str, Any] = None) -> Any:
        logger.info("Initializing Multi-DB Data Discovery...")
        if inputs is None:
            inputs = {}
            
        results = []
        for db_url in self.config.database_urls:
            logger.info(f"Running discovery for DB: {db_url}")
            db_inputs = {**inputs}
            db_inputs["config"] = {**inputs.get("config", {}), "database_url": db_url}
        
            try:
                # Create agents
                research_agent = self._create_agent_from_config("data_research_agent")
                foundation_agent = self._create_agent_from_config("data_foundation_setup_agent")
                process_agent = self._create_agent_from_config("process_understanding_agent")
                
                # Create tasks
                research_task = self._create_task_from_config("data_research_task", research_agent, inputs)
                foundation_task = self._create_task_from_config("foundation_setup_task", foundation_agent, inputs)
                process_task = self._create_task_from_config("process_understanding_task", process_agent, inputs)
                
                # Create and run crew
                crew = Crew(
                    agents=[research_agent, foundation_agent, process_agent],
                    tasks=[research_task, foundation_task, process_task],
                    process=Process.sequential,
                    verbose=True
                )
                
                db_result = crew.kickoff()
                results.append({"db": db_url, "result": db_result})
            except Exception as e:
                logger.error(f"Error in data discovery for {db_url}: {e}")
                results.append({"db": db_url, "error": str(e)})
        return results
    
    def run_data_cataloging(self, inputs: Dict[str, Any] = None) -> Any:
        logger.info("Initializing Multi-DB Data Cataloging...")
        if inputs is None:
            inputs = {}
            
        results = []
        for db_url in self.config.database_urls:
            logger.info(f"Running cataloging for DB: {db_url}")
            db_inputs = {**inputs}
            db_inputs["config"] = {**inputs.get("config", {}), "database_url": db_url}
        
            try:
                # Create agents
                lineage_agent = self._create_agent_from_config("data_lineage_agent")
                metadata_agent = self._create_agent_from_config("metadata_validation_agent")
                integration_agent = self._create_agent_from_config("data_integration_agent")
                
                # Create tasks
                lineage_task = self._create_task_from_config("data_lineage_task", lineage_agent, inputs)
                metadata_task = self._create_task_from_config("metadata_validation_task", metadata_agent, inputs)
                integration_task = self._create_task_from_config("data_integration_task", integration_agent, inputs)
                
                # Create and run crew
                crew = Crew(
                    agents=[lineage_agent, metadata_agent, integration_agent],
                    tasks=[lineage_task, metadata_task, integration_task],
                    process=Process.sequential,
                    verbose=True
                )
                
                db_result = crew.kickoff()
                results.append({"db": db_url, "result": db_result})
            except Exception as e:
                logger.error(f"Error in data cataloging for {db_url}: {e}")
                results.append({"db": db_url, "error": str(e)})
        return results
            
    def run_data_processing(self, inputs: Dict[str, Any] = None) -> Any:
        logger.info("Initializing Multi-DB Data Processing...")
        if inputs is None:
            inputs = {}
            
        results = []
        for db_url in self.config.database_urls:
            logger.info(f"Running processing for DB: {db_url}")
            db_inputs = {**inputs}
            db_inputs["config"] = {**inputs.get("config", {}), "database_url": db_url}
        
            try:
                # Create agents
                quality_agent = self._create_agent_from_config("data_quality_agent")
                observability_agent = self._create_agent_from_config("data_observability_agent")
                performance_agent = self._create_agent_from_config("performance_tuning_agent")
                
                # Create tasks
                quality_task = self._create_task_from_config("data_quality_task", quality_agent, inputs)
                observability_task = self._create_task_from_config("data_observability_task", observability_agent, inputs)
                performance_task = self._create_task_from_config("performance_tuning_task", performance_agent, inputs)
                
                # Create and run crew
                crew = Crew(
                    agents=[quality_agent, observability_agent, performance_agent],
                    tasks=[quality_task, observability_task, performance_task],
                    process=Process.sequential,
                    verbose=True
                )
                
                db_result = crew.kickoff()
                results.append({"db": db_url, "result": db_result})
            except Exception as e:
                logger.error(f"Error in data processing for {db_url}: {e}")
                results.append({"db": db_url, "error": str(e)})
        return results
    
    def run_insights_generation(self, inputs: Dict[str, Any] = None) -> Any:
        logger.info("Initializing Multi-DB Insight Generation...")
        if inputs is None:
            inputs = {}
            
        results = []
        for db_url in self.config.database_urls:
            logger.info(f"Running processing for DB: {db_url}")
            db_inputs = {**inputs}
            db_inputs["config"] = {**inputs.get("config", {}), "database_url": db_url}
        
            try:
                # Create agents
                text2sql_agent = self._create_agent_from_config("text2sql_agent")
                caching_agent = self._create_agent_from_config("caching_agent")
                reports_agent = self._create_agent_from_config("reports_generation_agent")
                
                # Create tasks
                text2sql_task = self._create_task_from_config("text2sql_task", text2sql_agent, inputs)
                caching_task = self._create_task_from_config("caching_task", caching_agent, inputs)
                reports_task = self._create_task_from_config("reports_generation_task", reports_agent, inputs)
                
                # Create and run crew
                crew = Crew(
                    agents=[text2sql_agent, caching_agent, reports_agent],
                    tasks=[text2sql_task, caching_task, reports_task],
                    process=Process.sequential,
                    verbose=True
                )
                
                db_result = crew.kickoff()
                results.append({"db": db_url, "result": db_result})
            except Exception as e:
                logger.error(f"Error in data processing for {db_url}: {e}")
                results.append({"db": db_url, "error": str(e)})
        return results
        
    def run(self, phase: str = "discovery", inputs: Dict[str, Any] = None) -> Any:
        """
        Main run method that can execute different phases
        This is the method that main.py will call
        """
        logger.info(f"Starting Data Management Crew - Phase: {phase}")
        
        if inputs is None:
            inputs = {}
        
        phase_methods = {
            "discovery": self.run_data_discovery,
            "cataloging": self.run_data_cataloging,
            "processing": self.run_data_processing,
            "insights": self.run_insights_generation
        }
        
        method = phase_methods.get(phase.lower(), self.run_data_discovery)
        return method(inputs)
    
    def process_natural_language_query(self, query: str, db_infos: List[Dict[str, str]], schema_info: dict) -> Any:
        """Use text2sql agent to convert a natural query into SQL, execute and return results"""
        try:
            if not schema_info:
                try:
                    logger.info("📦 Auto-extracting schema_info from DB files...")
                    schema_info = extract_schema_info(db_infos[0]["path"])  # only first DB used here
                except Exception as e:
                    logger.error(f"❌ Failed to auto-extract schema_info: {e}")
                    return {"error": f"Failed to extract schema_info: {e}", "query": query}

            # Step 1: Create text2sql agent
            text2sql_agent = self._create_agent_from_config("text2sql_agent")

            # Step 2: Create task with enforced tool usage and proper input format
            query_task = Task(
                description=f"""
You are a SQL expert agent. Your job is to convert a user's natural language question into a valid SQL query using the provided tool.

## Question:
{query}

## Instructions:
- Use only the schema and db files already provided via inputs.
- You MUST invoke the `Text to SQL Tool` to answer.
- Do NOT guess table names — use only tables present in the provided schema_info.
- If no valid SQL can be generated, say so clearly.

Return only in this JSON format:

```json
{{
  "generated_sql": "...",
  "explanation": "..."
}}```
                """,
                agent=text2sql_agent,
                expected_output="SQL query and formatted results with explanation",
                inputs={
                "natural_language_query": query,
                "schema_info": schema_info,
                "db_files": db_infos
                },
                tools=[self.tools["text2sql"]],
                tool_choice="auto"
            )

            # Step 3: Run the agent task
            crew = Crew(
                agents=[text2sql_agent],
                tasks=[query_task],
                process=Process.sequential,
                verbose=False
            )

            logger.info("🚀 Running Text2SQL crew...")
            agent_output = crew.kickoff()
            print("✅ Text2SQL agent output:", agent_output)

            # Step 4: Parse output (handle both str and object)
            output_text = getattr(agent_output, "result", str(agent_output)).strip()
            logger.debug(f"[Text2SQL Raw Output]\n{output_text}")

            # Step 5: Try parsing output
            try:
                # Attempt direct parse
                parsed_result = json.loads(output_text)
            except Exception:
                match = re.search(r"```json\s*(\{.*?\})\s*```", output_text, re.DOTALL | re.IGNORECASE)
                if match:
                    try:
                        parsed_result = json.loads(match.group(1).strip())
                    except json.JSONDecodeError as e:
                        logger.error("❌ JSON decoding failed inside matched block.")
                        return {
                            "error": f"Matched block but JSON decoding failed: {e}",
                            "llm_output": output_text
                        }
                else:
                    logger.error("❌ No JSON block matched.")
                    return {
                        "error": "Failed to parse JSON from LLM output.",
                        "llm_output": output_text
                    }

            # Step 6: Extract SQL and explanation
            generated_sql = parsed_result.get("generated_sql")
            explanation = parsed_result.get("explanation")  # ✅ now defined
            
            invalid_tables = [
                t for t in re.findall(r'from\s+(\w+)', generated_sql.lower())
                if t not in schema_info
            ]
            if generated_sql and invalid_tables:
                return {
                    "error": f"LLM used unknown tables: {invalid_tables}",
                    "generated_sql": generated_sql,
                    "schema_tables": list(schema_info.keys())
                }

            # Step 7: Execute SQL across attached DBs
            text2sql_tool = CrewText2SQLTool()
            result_data = text2sql_tool.execute_sql_across_dbs(generated_sql, db_infos)

            # Step 8: Return full result
            return {
                "original_query": query,
                "generated_sql": generated_sql,
                "explanation": explanation,
                "query_result": result_data
            }

        except Exception as e:
            logger.exception("Error processing natural language query")
            return {"error": str(e), "query": query}          
    
    def generate_report(self, report_config: Dict[str, Any]) -> Any:
        """Generate a specific report"""
        try:
            reports_agent = self._create_agent_from_config("reports_generation_agent")
            
            report_task = Task(
                description=f"""
                Generate a comprehensive report based on the following configuration:
                Report Type: {report_config.get('type', 'general')}
                Parameters: {report_config}
                
                Steps:
                1. Gather required data based on report type and parameters
                2. Analyze and process the data for insights
                3. Create appropriate visualizations and summaries
                4. Format the report professionally with clear sections
                5. Include actionable recommendations and insights
                
                Ensure the report is comprehensive, accurate, and business-ready.
                """,
                agent=reports_agent,
                expected_output=f"Complete {report_config.get('type', 'general')} report with analysis, visualizations, and recommendations"
            )
            
            crew = Crew(
                agents=[reports_agent],
                tasks=[report_task],
                process=Process.sequential,
                verbose=True
            )
            
            result = crew.kickoff()
            return result
        except Exception as e:
            logger.error(f"Error generating report: {e}")
            return {"error": str(e), "report_config": report_config}

✅ [DEBUG] DataManagementCrew initialized


In [98]:
crew = DataManagementCrew()
crew.process_natural_language_query("List all customers from Germany", db_infos=[{"path": "databases/customer_db.db"}], schema_info=schema_info)

🧰 Final tools loaded: ['file_read', 'directory_search', 'database_connection', 'metadata_extraction', 'data_profiling', 'data_validation', 'text2sql', 'report_generation']
[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Task:[00m [92m
You are a SQL expert agent. Your job is to convert a user's natural language question into a valid SQL query using the provided tool.

## Question:
List all customers from Germany

## Instructions:
- Use only the schema and db files already provided via inputs.
- You MUST invoke the `Text to SQL Tool` to answer.
- Do NOT guess table names — use only tables present in the provided schema_info.
- If no valid SQL can be generated, say so clearly.

Return only in this JSON format:

```json
{
  "generated_sql": "...",
  "explanation": "..."
}```
                [00m


[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Final Answer:[00m [92m
```json
{
  "generated_sql": "SELECT * FROM customers WHE

{'original_query': 'List all customers from Germany',
 'generated_sql': "SELECT * FROM customers WHERE country = 'Germany';",
 'explanation': "This SQL query selects all columns from the 'customers' table where the 'country' column matches 'Germany'.",
 'query_result': [{'customer_id': 'CUST_C32BB312',
   'first_name': 'Logan',
   'last_name': 'Lee',
   'email': 'rhonda53@example.com',
   'phone': '775.313.7651',
   'address': '793 Hartman Forks Apt. 836',
   'city': 'New Jorge',
   'state': 'Maine',
   'country': 'Germany',
   'registration_date': '2025-01-04 20:29:30',
   'account_status': 'inactive',
   'customer_tier': 'gold',
   'total_spent': 3464.68,
   'last_login': '2025-03-31 11:46:54',
   'marketing_consent': 1,
   'date_of_birth': '1952-06-26',
   'gender': 'Prefer not to say',
   'occupation': 'Ceramics designer',
   'created_at': '2025-06-16 18:36:32.301951',
   'updated_at': '2025-06-16 18:36:32.301951',
   'zip_code': 45271},
  {'customer_id': 'CUST_EBB16214',
   'first

In [63]:
# import sqlite3

# conn = sqlite3.connect("databases/customer_db.db")
# cursor = conn.cursor()
# cursor.execute("SELECT DISTINCT country FROM customers")
# print(cursor.fetchall())
# conn.close()

In [73]:
def extract_schema_info(db_path: str, db_alias: str = None) -> dict:
    logger = logging.getLogger(__name__)
    abs_path = Path(db_path).resolve()
    if not abs_path.exists():
        raise FileNotFoundError(f"Database file does not exist: {abs_path}")
    
    conn = sqlite3.connect(abs_path)
    cursor = conn.cursor()

    schema = {}
    db_label = db_alias or Path(db_path).stem  # e.g., 'customers_db'

    try:
        tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
        for (table,) in tables:
            try:
                if not table.strip():
                    continue  # skip invalid table names

                cols = cursor.execute(f"PRAGMA table_info({table});").fetchall()
                col_info = [{"name": col[1], "type": col[2], "primary_key": str(bool(col[5]))} for col in cols]

                fks = cursor.execute(f"PRAGMA foreign_key_list({table});").fetchall()
                fk_info = [{"from": fk[3], "to_table": fk[2], "to_column": fk[4]} for fk in fks]

                schema[f"{db_label}.{table}"] = {
                    "columns": col_info,
                    "foreign_keys": fk_info
                }
            
            except Exception as table_error:
                logger.error(f"Error processing table `{table}` in `{db_path}`: {table_error}")

    except Exception as db_error:
        logger.error(f"Metadata extraction failed: {db_error}")

    conn.close()
    return schema

In [74]:
schema_info = extract_schema_info("databases/customer_db.db")

In [75]:
schema_info

{'customer_db.customers': {'columns': [{'name': 'customer_id',
    'type': 'TEXT',
    'primary_key': 'True'},
   {'name': 'first_name', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'last_name', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'email', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'phone', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'address', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'city', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'state', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'country', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'registration_date', 'type': 'DATETIME', 'primary_key': 'False'},
   {'name': 'account_status', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'customer_tier', 'type': 'TEXT', 'primary_key': 'False'},
   {'name': 'total_spent', 'type': 'REAL', 'primary_key': 'False'},
   {'name': 'last_login', 'type': 'DATETIME', 'primary_key': 'False'},
   {'name': 'marketing_

In [88]:
crew = DataManagementCrew()
crew.process_natural_language_query("List all customers from Germany", db_infos=[{"path": "databases/customer_db.db"}], schema_info=schema_info)

🧰 Final tools loaded: ['file_read', 'directory_search', 'database_connection', 'metadata_extraction', 'data_profiling', 'data_validation', 'text2sql', 'report_generation']
[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Task:[00m [92m
You are a SQL expert agent. Your job is to convert a user's natural language question into a valid SQL query using the provided tool.

## Question:
List all customers from Germany

## Instructions:
- Use only the schema and db files already provided via inputs.
- You MUST invoke the `Text to SQL Tool` to answer.
- Do NOT guess table names — use only tables present in the provided schema_info.
- If no valid SQL can be generated, say so clearly.

Return only in this JSON format:

```json
{
  "generated_sql": "...",
  "explanation": "..."
}```
                [00m


[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Final Answer:[00m [92m
```json
{
  "generated_sql": "SELECT * FROM customers WHE

{'original_query': 'List all customers from Germany',
 'generated_sql': "SELECT * FROM customers WHERE country = 'Germany';",
 'explanation': "This SQL query selects all records from the 'customers' table where the 'country' field is equal to 'Germany', effectively listing all customers from that country.",
 'query_result': [{'customer_id': 'CUST_C32BB312',
   'first_name': 'Logan',
   'last_name': 'Lee',
   'email': 'rhonda53@example.com',
   'phone': '775.313.7651',
   'address': '793 Hartman Forks Apt. 836',
   'city': 'New Jorge',
   'state': 'Maine',
   'country': 'Germany',
   'registration_date': '2025-01-04 20:29:30',
   'account_status': 'inactive',
   'customer_tier': 'gold',
   'total_spent': 3464.68,
   'last_login': '2025-03-31 11:46:54',
   'marketing_consent': 1,
   'date_of_birth': '1952-06-26',
   'gender': 'Prefer not to say',
   'occupation': 'Ceramics designer',
   'created_at': '2025-06-16 18:36:32.301951',
   'updated_at': '2025-06-16 18:36:32.301951',
   'zip_cod

In [83]:
def extract_schema_info(sqlite_path: str) -> Dict[str, dict]:
    schema = {}
    conn = sqlite3.connect(sqlite_path)
    cursor = conn.cursor()

    try:
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = cursor.fetchall()

        for (table_name,) in tables:
            cursor.execute(f"PRAGMA table_info('{table_name}')")
            columns = cursor.fetchall()
            schema[table_name] = {
                "columns": [
                    {
                        "name": col[1],
                        "type": col[2],
                        "primary_key": bool(col[5])
                    }
                    for col in columns
                ]
            }
    finally:
        conn.close()

    return schema

In [86]:
schema_info = extract_schema_info('databases/customer_db.db')

In [87]:
schema_info

{'customers': {'columns': [{'name': 'customer_id',
    'type': 'TEXT',
    'primary_key': True},
   {'name': 'first_name', 'type': 'TEXT', 'primary_key': False},
   {'name': 'last_name', 'type': 'TEXT', 'primary_key': False},
   {'name': 'email', 'type': 'TEXT', 'primary_key': False},
   {'name': 'phone', 'type': 'TEXT', 'primary_key': False},
   {'name': 'address', 'type': 'TEXT', 'primary_key': False},
   {'name': 'city', 'type': 'TEXT', 'primary_key': False},
   {'name': 'state', 'type': 'TEXT', 'primary_key': False},
   {'name': 'country', 'type': 'TEXT', 'primary_key': False},
   {'name': 'registration_date', 'type': 'DATETIME', 'primary_key': False},
   {'name': 'account_status', 'type': 'TEXT', 'primary_key': False},
   {'name': 'customer_tier', 'type': 'TEXT', 'primary_key': False},
   {'name': 'total_spent', 'type': 'REAL', 'primary_key': False},
   {'name': 'last_login', 'type': 'DATETIME', 'primary_key': False},
   {'name': 'marketing_consent', 'type': 'BOOLEAN', 'primary_ke

In [89]:
crew = DataManagementCrew()
crew.process_natural_language_query("List all customers from Germany", db_infos=[{"path": "databases/customer_db.db"}], schema_info=schema_info)

🧰 Final tools loaded: ['file_read', 'directory_search', 'database_connection', 'metadata_extraction', 'data_profiling', 'data_validation', 'text2sql', 'report_generation']
[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Task:[00m [92m
You are a SQL expert agent. Your job is to convert a user's natural language question into a valid SQL query using the provided tool.

## Question:
List all customers from Germany

## Instructions:
- Use only the schema and db files already provided via inputs.
- You MUST invoke the `Text to SQL Tool` to answer.
- Do NOT guess table names — use only tables present in the provided schema_info.
- If no valid SQL can be generated, say so clearly.

Return only in this JSON format:

```json
{
  "generated_sql": "...",
  "explanation": "..."
}```
                [00m


[1m[95m# Agent:[00m [1m[92mNatural Language Query Specialist[00m
[95m## Final Answer:[00m [92m
{
  "generated_sql": "SELECT * FROM customers WHERE count

{'original_query': 'List all customers from Germany',
 'generated_sql': "SELECT * FROM customers WHERE country = 'Germany';",
 'explanation': "This SQL query selects all columns from the 'customers' table where the country is 'Germany', effectively listing all customers located in Germany.",
 'query_result': [{'customer_id': 'CUST_C32BB312',
   'first_name': 'Logan',
   'last_name': 'Lee',
   'email': 'rhonda53@example.com',
   'phone': '775.313.7651',
   'address': '793 Hartman Forks Apt. 836',
   'city': 'New Jorge',
   'state': 'Maine',
   'country': 'Germany',
   'registration_date': '2025-01-04 20:29:30',
   'account_status': 'inactive',
   'customer_tier': 'gold',
   'total_spent': 3464.68,
   'last_login': '2025-03-31 11:46:54',
   'marketing_consent': 1,
   'date_of_birth': '1952-06-26',
   'gender': 'Prefer not to say',
   'occupation': 'Ceramics designer',
   'created_at': '2025-06-16 18:36:32.301951',
   'updated_at': '2025-06-16 18:36:32.301951',
   'zip_code': 45271},
  {'

In [93]:
from utils.helpers import extract_schema_info

ImportError: cannot import name 'extract_schema_info' from 'utils.helpers' (C:\Users\ManojSharma\LLM Projects\AI projects\Advanced Agentic AI Projects\Agentic AI Platform for Data Management\agentic_ai_platform_for_data_management-nl2sql\utils\helpers.py)

In [None]:
def run(self, phase: str = "discovery", inputs: Dict[str, Any] = None) -> Any:
    """
    Main run method that can execute different phases
    This is the method that main.py will call
    """
    logger.info(f"Starting Data Management Crew - Phase: {phase}")
    
    if inputs is None:
        inputs = {}
    
    phase_methods = {
        "discovery": self.run_data_discovery,
        "cataloging": self.run_data_cataloging,
        "processing": self.run_data_processing,
        "insights": self.run_insights_generation
    }
    
    method = phase_methods.get(phase.lower(), self.run_data_discovery)
    return method(inputs)

def process_natural_language_query(self, query: str, db_infos: List[Dict[str, str]], schema_info: dict) -> Any:
    """Use text2sql agent to convert a natural query into SQL, execute and return results"""
    try:
        if not schema_info:
            try:
                logger.info("📦 Auto-extracting schema_info from DB files...")
                schema_info = extract_schema_info(db_infos[0]["path"])  # only first DB used here
            except Exception as e:
                logger.error(f"❌ Failed to auto-extract schema_info: {e}")
                return {"error": f"Failed to extract schema_info: {e}", "query": query}

        # Step 1: Create text2sql agent
        text2sql_agent = self._create_agent_from_config("text2sql_agent")

        # Step 2: Create task with enforced tool usage and proper input format
        query_task = Task(
            description=f"""
You are a SQL expert agent. Your job is to convert a user's natural language question into a valid SQL query using the provided tool.

## Question:
{query}

## Instructions:
- Use only the schema and db files already provided via inputs.
- You MUST invoke the `Text to SQL Tool` to answer.
- Do NOT guess table names — use only tables present in the provided schema_info.
- If no valid SQL can be generated, say so clearly.

Return only in this JSON format:

```json
{{
"generated_sql": "...",
"explanation": "..."
}}```
            """,
            agent=text2sql_agent,
            expected_output="SQL query and formatted results with explanation",
            inputs={
            "natural_language_query": query,
            "schema_info": schema_info,
            "db_files": db_infos
            },
            tools=[self.tools["text2sql"]],
            tool_choice="auto"
        )

        # Step 3: Run the agent task
        crew = Crew(
            agents=[text2sql_agent],
            tasks=[query_task],
            process=Process.sequential,
            verbose=False
        )

        logger.info("🚀 Running Text2SQL crew...")
        agent_output = crew.kickoff()
        print("✅ Text2SQL agent output:", agent_output)

        # Step 4: Parse output (handle both str and object)
        output_text = getattr(agent_output, "result", str(agent_output)).strip()
        logger.debug(f"[Text2SQL Raw Output]\n{output_text}")

        # Step 5: Try parsing output
        try:
            # Attempt direct parse
            parsed_result = json.loads(output_text)
        except Exception:
            match = re.search(r"```json\s*(\{.*?\})\s*```", output_text, re.DOTALL | re.IGNORECASE)
            if match:
                try:
                    parsed_result = json.loads(match.group(1).strip())
                except json.JSONDecodeError as e:
                    logger.error("❌ JSON decoding failed inside matched block.")
                    return {
                        "error": f"Matched block but JSON decoding failed: {e}",
                        "llm_output": output_text
                    }
            else:
                logger.error("❌ No JSON block matched.")
                return {
                    "error": "Failed to parse JSON from LLM output.",
                    "llm_output": output_text
                }

        # Step 6: Extract SQL and explanation
        generated_sql = parsed_result.get("generated_sql")
        explanation = parsed_result.get("explanation")  # ✅ now defined
        
        invalid_tables = [
            t for t in re.findall(r'from\s+(\w+)', generated_sql.lower())
            if t not in schema_info
        ]
        if generated_sql and invalid_tables:
            return {
                "error": f"LLM used unknown tables: {invalid_tables}",
                "generated_sql": generated_sql,
                "schema_tables": list(schema_info.keys())
            }

        # Step 7: Execute SQL across attached DBs
        text2sql_tool = CrewText2SQLTool()
        result_data = text2sql_tool.execute_sql_across_dbs(generated_sql, db_infos)

        # Step 8: Return full result
        return {
            "original_query": query,
            "generated_sql": generated_sql,
            "explanation": explanation,
            "query_result": result_data
        }

    except Exception as e:
        logger.exception("Error processing natural language query")
        return {"error": str(e), "query": query}

In [None]:
from utils.helpers import extract_schema_info
schema_info = extract_schema_info("databases/cusomer_db.db")
schema_info

In [None]:
nl2sql_engine = process_natural_language_query(query="List all customers from Germany", 
                                               db_infos=[{"path":"databases/cusomer_db.db"}],
                                              schema_info=schema_info)

nl2sql_engine