# Text-to-Report Chatbot Development Notebook

Este notebook implementa el sistema completo de chatbot Text-to-Report usando LangGraph con un flujo de trabajo multiagente que incluye:

- **Agente SQL**: Conversi√≥n de lenguaje natural a SQL
- **Agente de Visualizaci√≥n**: Generaci√≥n inteligente de gr√°ficos
- **Agente QA**: Validaci√≥n de calidad y coherencia
- **Motor PDF**: Ensamblaje de reportes profesionales

## Arquitectura del Sistema

```mermaid
graph TD
    A[Usuario] -->|Consulta Natural| B[LangGraph Workflow]
    B --> C[Agente SQL]
    C --> D[Agente Visualizaci√≥n]
    D --> E[Agente QA]
    E -->|Rechazado| C
    E -->|Aprobado| F[Motor PDF]
    F --> G[Reporte Final]
```

**Objetivo**: Demostrar la funcionalidad completa del sistema desde consulta natural hasta reporte PDF.

# 1. Environment Setup and Dependencies

Instalaci√≥n e importaci√≥n de todas las librer√≠as necesarias para el sistema multiagente.

In [None]:
# Core imports
import os
import sys
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, TypedDict
import warnings
warnings.filterwarnings('ignore')

# Add src to path
sys.path.append('../src')

# Environment and configuration
from dotenv import load_dotenv
load_dotenv('../.env')

# LangGraph and LangChain
try:
    from langgraph.graph import StateGraph, END
    from langgraph.checkpoint.memory import MemorySaver
    from langchain_openai import ChatOpenAI
    from langchain.prompts import ChatPromptTemplate
    print("‚úÖ LangGraph imports successful")
except ImportError as e:
    print(f"‚ùå Error importing LangGraph: {e}")
    print("Install with: pip install langgraph langchain-openai")

# Database and SQL
try:
    import sqlalchemy
    from sqlalchemy import create_engine, text
    import psycopg2
    print("‚úÖ Database imports successful")
except ImportError as e:
    print(f"‚ùå Error importing database libraries: {e}")
    print("Install with: pip install sqlalchemy psycopg2-binary")

# Visualization
try:
    import plotly.express as px
    import plotly.graph_objects as go
    import matplotlib.pyplot as plt
    import seaborn as sns
    import pandas as pd
    import numpy as np
    print("‚úÖ Visualization imports successful")
except ImportError as e:
    print(f"‚ùå Error importing visualization libraries: {e}")
    print("Install with: pip install plotly matplotlib seaborn pandas numpy")

# PDF Generation
try:
    from weasyprint import HTML, CSS
    from jinja2 import Environment, Template
    import base64
    from io import BytesIO
    print("‚úÖ PDF generation imports successful")
except ImportError as e:
    print(f"‚ùå Error importing PDF libraries: {e}")
    print("Install with: pip install weasyprint jinja2")

# Additional utilities
import json
import re
from pathlib import Path
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("\nüéØ All core dependencies loaded successfully!")
print(f"üìÖ Notebook initialized at: {datetime.now()}")
print(f"üêç Python version: {sys.version}")
print(f"üìÅ Working directory: {os.getcwd()}")

In [None]:
# Configuration settings
class Settings:
    """Simple settings class for the notebook"""
    
    def __init__(self):
        # LLM Configuration
        self.openai_api_key = os.getenv('OPENAI_API_KEY', 'your-key-here')
        self.model_name = 'gpt-4'
        
        # Database Configuration (for demo, we'll use SQLite)
        self.database_url = "sqlite:///demo_data.db"  # Simple SQLite for demo
        
        # Paths
        self.temp_dir = Path("../temp")
        self.output_dir = Path("../generated_reports") 
        
        # Create directories
        self.temp_dir.mkdir(exist_ok=True)
        self.output_dir.mkdir(exist_ok=True)
        
        # Limits
        self.max_iterations = 3
        self.default_limit = 100
        
        print(f"‚öôÔ∏è Configuration loaded")
        print(f"üîë OpenAI Key: {'‚úÖ Set' if self.openai_api_key != 'your-key-here' else '‚ùå Not set'}")
        print(f"üóÑÔ∏è Database: {self.database_url}")
        print(f"üìÅ Temp dir: {self.temp_dir}")
        print(f"üìÑ Output dir: {self.output_dir}")

# Initialize settings
settings = Settings()

# 2. Database Connection and Schema Loading

Establecemos conexi√≥n con la base de datos y creamos datos de muestra para demostraci√≥n.

In [None]:
# Create demo database and sample data
def create_demo_database():
    """Creates a demo database with sample business data"""
    
    engine = create_engine(settings.database_url)
    
    # Create tables
    with engine.connect() as conn:
        # Drop existing tables
        conn.execute(text("DROP TABLE IF EXISTS ventas"))
        conn.execute(text("DROP TABLE IF EXISTS productos"))
        conn.execute(text("DROP TABLE IF EXISTS regiones"))
        conn.execute(text("DROP TABLE IF EXISTS clientes"))
        
        # Create schema
        create_tables_sql = """
        CREATE TABLE regiones (
            id INTEGER PRIMARY KEY,
            nombre VARCHAR(50) NOT NULL,
            pais VARCHAR(50) NOT NULL
        );
        
        CREATE TABLE productos (
            id INTEGER PRIMARY KEY,
            nombre VARCHAR(100) NOT NULL,
            categoria VARCHAR(50) NOT NULL,
            precio DECIMAL(10,2) NOT NULL
        );
        
        CREATE TABLE clientes (
            id INTEGER PRIMARY KEY,
            nombre VARCHAR(100) NOT NULL,
            email VARCHAR(100),
            region_id INTEGER,
            fecha_registro DATE,
            FOREIGN KEY (region_id) REFERENCES regiones(id)
        );
        
        CREATE TABLE ventas (
            id INTEGER PRIMARY KEY,
            cliente_id INTEGER NOT NULL,
            producto_id INTEGER NOT NULL,
            cantidad INTEGER NOT NULL,
            monto DECIMAL(10,2) NOT NULL,
            fecha DATE NOT NULL,
            vendedor VARCHAR(50),
            FOREIGN KEY (cliente_id) REFERENCES clientes(id),
            FOREIGN KEY (producto_id) REFERENCES productos(id)
        );
        """
        
        for statement in create_tables_sql.split(';'):
            if statement.strip():
                conn.execute(text(statement))
        
        # Insert sample data
        
        # Regiones
        regiones_data = [
            (1, 'Norte', 'Colombia'),
            (2, 'Centro', 'Colombia'),
            (3, 'Sur', 'Colombia'),
            (4, 'Costa', 'Colombia')
        ]
        
        for region in regiones_data:
            conn.execute(text(
                "INSERT INTO regiones (id, nombre, pais) VALUES (?, ?, ?)"
            ), region)
        
        # Productos
        productos_data = [
            (1, 'Laptop Dell XPS', 'Electr√≥nicos', 2500000),
            (2, 'iPhone 15', 'Electr√≥nicos', 3200000),
            (3, 'Escritorio Ergon√≥mico', 'Muebles', 800000),
            (4, 'Silla Ejecutiva', 'Muebles', 450000),
            (5, 'Monitor 27"', 'Electr√≥nicos', 900000),
            (6, 'Teclado Mec√°nico', 'Electr√≥nicos', 320000),
            (7, 'Mesa de Reuniones', 'Muebles', 1200000),
            (8, 'Proyector 4K', 'Electr√≥nicos', 1800000)
        ]
        
        for producto in productos_data:
            conn.execute(text(
                "INSERT INTO productos (id, nombre, categoria, precio) VALUES (?, ?, ?, ?)"
            ), producto)
        
        # Clientes
        import random
        from datetime import date, timedelta
        
        nombres_clientes = [
            'Empresa ABC', 'Corporaci√≥n XYZ', 'Startup Tech', 'Consultora Pro',
            'Industrias Futuro', 'Comercial √âxito', 'Servicios Prime', 'Grupo Innovar',
            'Tecnolog√≠a Avanzada', 'Soluciones Integrales', 'Desarrollo Agil', 'Sistemas Modernos'
        ]
        
        for i, nombre in enumerate(nombres_clientes, 1):
            region_id = random.choice([1, 2, 3, 4])
            fecha_registro = date.today() - timedelta(days=random.randint(30, 365))
            email = f"{nombre.lower().replace(' ', '')}@email.com"
            
            conn.execute(text(
                "INSERT INTO clientes (id, nombre, email, region_id, fecha_registro) VALUES (?, ?, ?, ?, ?)"
            ), (i, nombre, email, region_id, fecha_registro.isoformat()))
        
        # Ventas (√∫ltimos 6 meses)
        venta_id = 1
        start_date = date.today() - timedelta(days=180)
        vendedores = ['Ana Garc√≠a', 'Carlos L√≥pez', 'Mar√≠a Rodriguez', 'Juan P√©rez', 'Laura Mart√≠n']
        
        for days_offset in range(180):
            current_date = start_date + timedelta(days=days_offset)
            # 1-5 ventas por d√≠a
            num_ventas = random.randint(1, 5)
            
            for _ in range(num_ventas):
                cliente_id = random.randint(1, len(nombres_clientes))
                producto_id = random.randint(1, 8)
                cantidad = random.randint(1, 10)
                
                # Get product price
                precio_result = conn.execute(text(
                    "SELECT precio FROM productos WHERE id = ?"
                ), (producto_id,)).fetchone()
                precio = precio_result[0]
                
                monto = precio * cantidad
                vendedor = random.choice(vendedores)
                
                conn.execute(text("""
                    INSERT INTO ventas (id, cliente_id, producto_id, cantidad, monto, fecha, vendedor) 
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """), (venta_id, cliente_id, producto_id, cantidad, monto, current_date.isoformat(), vendedor))
                
                venta_id += 1
        
        conn.commit()
    
    print("‚úÖ Demo database created successfully!")
    
    # Show sample data
    with engine.connect() as conn:
        # Count records
        counts = {}
        for table in ['regiones', 'productos', 'clientes', 'ventas']:
            result = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).fetchone()
            counts[table] = result[0]
        
        print(f"\nüìä Sample data created:")
        for table, count in counts.items():
            print(f"  {table}: {count:,} records")
    
    return engine

# Create the demo database
engine = create_demo_database()

In [None]:
# Extract database schema and metadata
def get_database_schema():
    """Extract DDL and metadata for LLM context"""
    
    # Get table info for SQLite
    schema_info = []
    
    with engine.connect() as conn:
        # Get all tables
        tables_result = conn.execute(text("""
            SELECT name FROM sqlite_master 
            WHERE type='table' AND name NOT LIKE 'sqlite_%'
            ORDER BY name
        """))
        tables = [row[0] for row in tables_result]
        
        schema_parts = []
        
        for table_name in tables:
            # Get table schema
            schema_result = conn.execute(text(f"PRAGMA table_info({table_name})"))
            columns = schema_result.fetchall()
            
            # Build CREATE TABLE statement
            create_table = f"CREATE TABLE {table_name} ("
            column_defs = []
            
            for col in columns:
                cid, name, type_, notnull, dflt_value, pk = col
                col_def = f"  {name} {type_}"
                if notnull:
                    col_def += " NOT NULL"
                if pk:
                    col_def += " PRIMARY KEY"
                if dflt_value:
                    col_def += f" DEFAULT {dflt_value}"
                column_defs.append(col_def)
            
            create_table += "\n" + ",\n".join(column_defs) + "\n);"
            schema_parts.append(f"-- Table: {table_name}")
            schema_parts.append(create_table)
            
            # Get sample data
            sample_result = conn.execute(text(f"SELECT * FROM {table_name} LIMIT 3"))
            sample_data = sample_result.fetchall()
            if sample_data:
                column_names = [col[0] for col in sample_result.description]
                schema_parts.append(f"-- Sample data for {table_name}:")
                schema_parts.append(f"-- Columns: {', '.join(column_names)}")
                for i, row in enumerate(sample_data):
                    schema_parts.append(f"-- Row {i+1}: {dict(zip(column_names, row))}")
            
            schema_parts.append("")  # Empty line
    
    full_schema = "\n".join(schema_parts)
    
    print("üìã Database schema extracted:")
    print(f"  Tables: {len(tables)}")
    print(f"  Schema length: {len(full_schema):,} characters")
    
    return full_schema, tables

# Extract schema
database_schema, table_names = get_database_schema()

# Show first part of schema
print("\nüìù Schema preview (first 1000 chars):")
print("=" * 50)
print(database_schema[:1000] + "..." if len(database_schema) > 1000 else database_schema)

# 3. LangGraph State Definition

Definimos la estructura de estado que gestiona todo el flujo de procesamiento multiagente.

In [None]:
# Define the state for our LangGraph workflow
class ReportState(TypedDict):
    """
    State structure for the Text-to-Report workflow
    Manages all data flowing between agents
    """
    # Input
    user_query: str
    user_profile: Optional[Dict[str, Any]]
    
    # SQL Processing
    sql_query: Optional[str]
    sql_explanation: Optional[str]
    data_results: Optional[List[Dict[str, Any]]]
    
    # Visualization
    chart_image: Optional[bytes]
    chart_metadata: Optional[Dict[str, Any]]
    
    # QA Validation
    qa_feedback: Optional[str]
    qa_approved: bool
    qa_score: Optional[float]
    
    # Iteration Control
    iteration_count: int
    max_iterations: int
    
    # Final Output
    final_pdf: Optional[bytes]
    report_id: Optional[str]
    
    # Metadata and Errors
    timestamp: Optional[str]
    errors: Optional[List[str]]
    warnings: Optional[List[str]]

def create_initial_state(user_query: str, user_profile: Optional[Dict] = None) -> ReportState:
    """Create initial state for a new report generation"""
    return ReportState(
        # Input
        user_query=user_query,
        user_profile=user_profile or {"name": "Usuario Demo"},
        
        # SQL Processing
        sql_query=None,
        sql_explanation=None,
        data_results=None,
        
        # Visualization
        chart_image=None,
        chart_metadata=None,
        
        # QA Validation
        qa_feedback=None,
        qa_approved=False,
        qa_score=None,
        
        # Iteration Control
        iteration_count=0,
        max_iterations=settings.max_iterations,
        
        # Final Output
        final_pdf=None,
        report_id=None,
        
        # Metadata
        timestamp=datetime.now().isoformat(),
        errors=[],
        warnings=[]
    )

# Test state creation
test_query = "ventas totales del √∫ltimo mes por regi√≥n"
test_state = create_initial_state(test_query)

print("‚úÖ State definition created")
print(f"üß™ Test state for query: '{test_query}'")
print(f"üìä State keys: {list(test_state.keys())}")
print(f"üî¢ Max iterations: {test_state['max_iterations']}")

# 4. SQL Agent Implementation

Agente especializado en convertir lenguaje natural a SQL con validaci√≥n y ejecuci√≥n segura.

In [None]:
class SQLAgent:
    """Agent for converting natural language to SQL queries"""
    
    def __init__(self):
        self.llm = ChatOpenAI(
            model=settings.model_name,
            temperature=0.1,
            api_key=settings.openai_api_key
        )
        self.engine = engine
        
        # Create the prompt template
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """
Eres un experto analista SQL especializado en convertir consultas en lenguaje natural a SQL preciso.

ESQUEMA DE BASE DE DATOS:
{database_schema}

REGLAS ESTRICTAS:
1. SOLO generar consultas SELECT
2. Usar √öNICAMENTE las tablas del esquema proporcionado  
3. Incluir LIMIT cuando sea apropiado para performance
4. Usar JOINs correctos bas√°ndose en las relaciones FK
5. Manejar fechas correctamente
6. Usar alias descriptivos para columnas

FORMATO DE RESPUESTA JSON:
{{
    "sql_query": "SELECT ... FROM ... WHERE ...",
    "explanation": "Explicaci√≥n clara de la consulta",
    "tables_used": ["tabla1", "tabla2"],
    "confidence_score": 0.95
}}
            """),
            ("human", """
Consulta del usuario: {user_query}

Feedback de iteraci√≥n anterior (si aplica): {qa_feedback}

Genera una consulta SQL precisa y optimizada.
            """)
        ])
    
    def validate_sql(self, sql_query: str) -> Dict[str, Any]:
        """Basic SQL validation"""
        sql_upper = sql_query.upper().strip()
        
        errors = []
        
        # Check if it's a SELECT query
        if not sql_upper.startswith('SELECT'):
            errors.append("Solo se permiten consultas SELECT")
        
        # Check for dangerous keywords
        dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'TRUNCATE']
        for keyword in dangerous_keywords:
            if keyword in sql_upper:
                errors.append(f"Palabra clave prohibida: {keyword}")
        
        # Add LIMIT if not present
        if 'LIMIT' not in sql_upper and len(errors) == 0:
            sql_query = sql_query.rstrip(';') + f' LIMIT {settings.default_limit};'
        
        return {
            'valid': len(errors) == 0,
            'errors': errors,
            'safe_query': sql_query if len(errors) == 0 else None
        }
    
    def execute_sql(self, sql_query: str) -> Dict[str, Any]:
        """Execute SQL query safely"""
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text(sql_query))
                columns = list(result.keys())
                rows = result.fetchall()
                
                # Convert to list of dictionaries
                data = [dict(zip(columns, row)) for row in rows]
                
                return {
                    'success': True,
                    'data': data,
                    'row_count': len(data),
                    'columns': columns
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'data': [],
                'row_count': 0,
                'columns': []
            }
    
    async def process(self, state: ReportState) -> ReportState:
        """Process the SQL generation step"""
        try:
            print(f"üîç SQL Agent processing query: {state['user_query']}")
            
            # Prepare prompt
            messages = self.prompt.format_messages(
                database_schema=database_schema,
                user_query=state['user_query'],
                qa_feedback=state.get('qa_feedback', 'Sin feedback previo')
            )
            
            # Generate SQL with LLM
            response = await self.llm.ainvoke(messages)
            
            # Parse JSON response
            try:
                # Clean response if it has markdown formatting
                content = response.content.strip()
                if content.startswith('```json'):
                    content = content[7:]
                if content.endswith('```'):
                    content = content[:-3]
                
                llm_result = json.loads(content.strip())
                
            except json.JSONDecodeError:
                raise ValueError("No se pudo parsear la respuesta del LLM como JSON")
            
            # Validate SQL
            validation = self.validate_sql(llm_result['sql_query'])
            
            if not validation['valid']:
                state['errors'].append(f"SQL inv√°lido: {', '.join(validation['errors'])}")
                return state
            
            # Execute SQL
            execution = self.execute_sql(validation['safe_query'])
            
            if not execution['success']:
                state['errors'].append(f"Error ejecutando SQL: {execution['error']}")
                return state
            
            # Update state
            state['sql_query'] = validation['safe_query']
            state['sql_explanation'] = llm_result.get('explanation', '')
            state['data_results'] = execution['data']
            state['iteration_count'] += 1
            
            print(f"‚úÖ SQL Agent completed: {execution['row_count']} rows returned")
            
        except Exception as e:
            error_msg = f"Error in SQL Agent: {str(e)}"
            print(f"‚ùå {error_msg}")
            state['errors'].append(error_msg)
        
        return state

# Initialize SQL Agent
sql_agent = SQLAgent()
print("‚úÖ SQL Agent initialized")

# 5. Visualization Agent Implementation

Agente especializado en generar visualizaciones inteligentes bas√°ndose en los datos obtenidos.

In [None]:
class VisualizationAgent:
    """Agent for generating intelligent data visualizations"""
    
    def __init__(self):
        self.llm = ChatOpenAI(
            model=settings.model_name,
            temperature=0.1,
            api_key=settings.openai_api_key
        )
        
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """
Eres un experto en visualizaci√≥n de datos especializado en an√°lisis autom√°tico de datasets y selecci√≥n √≥ptima de gr√°ficos.

TIPOS DE GR√ÅFICOS DISPONIBLES:
1. bar - Barras verticales (comparaciones categ√≥ricas)
2. bar_horizontal - Barras horizontales (nombres largos)
3. line - L√≠neas (tendencias temporales)
4. pie - Circular (proporciones/porcentajes)
5. scatter - Dispersi√≥n (correlaciones)
6. heatmap - Mapa de calor (matrices de correlaci√≥n)
7. histogram - Histograma (distribuciones)
8. box - Caja y bigotes (distribuciones con outliers)

REGLAS DE SELECCI√ìN:
- Datos temporales: line chart
- Categor√≠as comparativas: bar chart
- Proporciones/partes del total: pie chart
- Correlaciones num√©ricas: scatter plot
- Matrices de datos: heatmap
- Distribuciones: histogram o box plot

FORMATO JSON RESPUESTA:
{{
    "chart_type": "tipo_de_grafico",
    "title": "T√≠tulo descriptivo",
    "x_axis": "columna_x",
    "y_axis": "columna_y", 
    "color_by": "columna_color_opcional",
    "insights": ["insight1", "insight2"],
    "confidence": 0.95
}}
            """),
            ("human", """
CONSULTA USUARIO: {user_query}
SQL GENERADO: {sql_query}

DATOS DISPONIBLES:
Columnas: {columns}
Filas: {row_count}
Muestra de datos: {data_sample}

Analiza los datos y recomienda la mejor visualizaci√≥n.
            """)
        ])
    
    def analyze_data_for_viz(self, data: List[Dict], columns: List[str]) -> Dict[str, Any]:
        """Analyze data structure to suggest appropriate visualizations"""
        if not data:
            return {"can_visualize": False, "reason": "No data available"}
        
        analysis = {
            "can_visualize": True,
            "row_count": len(data),
            "column_count": len(columns),
            "column_types": {}
        }
        
        # Analyze column types
        for col in columns:
            sample_values = [row[col] for row in data[:10] if row[col] is not None]
            
            if not sample_values:
                analysis["column_types"][col] = "null"
                continue
            
            # Check if numeric
            try:
                [float(str(v)) for v in sample_values]
                analysis["column_types"][col] = "numeric"
            except (ValueError, TypeError):
                # Check if date
                try:
                    from dateutil.parser import parse
                    [parse(str(v)) for v in sample_values]
                    analysis["column_types"][col] = "date"
                except:
                    analysis["column_types"][col] = "categorical"
        
        return analysis
    
    def create_visualization(self, chart_config: Dict, data: List[Dict]) -> str:
        """Create visualization using Plotly"""
        try:
            import plotly.graph_objects as go
            import plotly.express as px
            from datetime import datetime
            
            df_data = {col: [row.get(col) for row in data] for col in data[0].keys()}
            
            chart_type = chart_config["chart_type"]
            title = chart_config["title"]
            x_col = chart_config.get("x_axis")
            y_col = chart_config.get("y_axis")
            
            fig = None
            
            if chart_type == "bar":
                fig = px.bar(
                    x=[str(val) for val in df_data[x_col]], 
                    y=df_data[y_col],
                    title=title,
                    labels={x_col: x_col, y_col: y_col}
                )
            
            elif chart_type == "line":
                fig = px.line(
                    x=df_data[x_col], 
                    y=df_data[y_col],
                    title=title,
                    labels={x_col: x_col, y_col: y_col}
                )
            
            elif chart_type == "pie":
                fig = px.pie(
                    values=df_data[y_col],
                    names=[str(val) for val in df_data[x_col]],
                    title=title
                )
            
            elif chart_type == "scatter":
                fig = px.scatter(
                    x=df_data[x_col], 
                    y=df_data[y_col],
                    title=title,
                    labels={x_col: x_col, y_col: y_col}
                )
            
            else:
                # Fallback to bar chart
                fig = px.bar(
                    x=[str(val) for val in df_data[x_col]], 
                    y=df_data[y_col],
                    title=title
                )
            
            # Style the chart
            fig.update_layout(
                template="plotly_white",
                title_font_size=16,
                showlegend=True,
                height=500
            )
            
            # Save as image
            chart_path = f"/tmp/chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            fig.write_image(chart_path, format="png", width=800, height=500)
            
            return chart_path
            
        except Exception as e:
            print(f"Error creating visualization: {e}")
            return None
    
    async def process(self, state: ReportState) -> ReportState:
        """Process the visualization generation step"""
        try:
            print(f"üìä Visualization Agent processing data...")
            
            if not state.get('data_results'):
                state['errors'].append("No hay datos para visualizar")
                return state
            
            # Analyze data
            data = state['data_results']
            columns = list(data[0].keys()) if data else []
            
            data_analysis = self.analyze_data_for_viz(data, columns)
            
            if not data_analysis["can_visualize"]:
                state['errors'].append(f"No se puede visualizar: {data_analysis['reason']}")
                return state
            
            # Prepare data sample for LLM
            data_sample = data[:3] if len(data) >= 3 else data
            
            # Generate visualization config with LLM
            messages = self.prompt.format_messages(
                user_query=state['user_query'],
                sql_query=state['sql_query'],
                columns=columns,
                row_count=len(data),
                data_sample=json.dumps(data_sample, indent=2, default=str)
            )
            
            response = await self.llm.ainvoke(messages)
            
            # Parse response
            try:
                content = response.content.strip()
                if content.startswith('```json'):
                    content = content[7:]
                if content.endswith('```'):
                    content = content[:-3]
                
                viz_config = json.loads(content.strip())
                
            except json.JSONDecodeError:
                raise ValueError("No se pudo parsear la configuraci√≥n de visualizaci√≥n")
            
            # Create visualization
            chart_path = self.create_visualization(viz_config, data)
            
            if not chart_path:
                state['errors'].append("Error generando el gr√°fico")
                return state
            
            # Update state
            state['visualization_config'] = viz_config
            state['visualization_path'] = chart_path
            state['insights'] = viz_config.get('insights', [])
            
            print(f"‚úÖ Visualization Agent completed: {viz_config['chart_type']} chart created")
            
        except Exception as e:
            error_msg = f"Error in Visualization Agent: {str(e)}"
            print(f"‚ùå {error_msg}")
            state['errors'].append(error_msg)
        
        return state

# Initialize Visualization Agent  
viz_agent = VisualizationAgent()
print("‚úÖ Visualization Agent initialized")

# 6. QA Agent Implementation

Agente de validaci√≥n de calidad que revisa la coherencia entre consulta, datos y visualizaci√≥n.

In [None]:
class QAAgent:
    """Quality Assurance Agent for validating query results"""
    
    def __init__(self):
        self.llm = ChatOpenAI(
            model=settings.model_name,
            temperature=0.1,
            api_key=settings.openai_api_key
        )
        
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """
Eres un analista de calidad especializado en validar la coherencia entre consultas de usuario, resultados SQL y visualizaciones.

TU MISI√ìN:
1. Verificar que la consulta SQL responde correctamente a la pregunta del usuario
2. Validar que los datos obtenidos son relevantes y completos
3. Confirmar que la visualizaci√≥n elegida es apropiada para los datos
4. Identificar posibles problemas o inconsistencias
5. Sugerir mejoras si es necesario

CRITERIOS DE EVALUACI√ìN:
- Relevancia: ¬øLos datos responden la pregunta?
- Completitud: ¬øFalta informaci√≥n importante?
- Precisi√≥n: ¬øLa SQL es correcta t√©cnicamente?
- Visualizaci√≥n: ¬øEl gr√°fico representa bien los datos?
- Insights: ¬øSe pueden extraer conclusiones √∫tiles?

FORMATO JSON RESPUESTA:
{{
    "quality_score": 0.85,
    "is_satisfactory": true,
    "evaluation": {{
        "relevance": {{"score": 0.9, "comment": "Datos muy relevantes"}},
        "completeness": {{"score": 0.8, "comment": "Podr√≠a incluir m√°s detalle"}},
        "accuracy": {{"score": 1.0, "comment": "SQL correcta"}},
        "visualization": {{"score": 0.8, "comment": "Gr√°fico apropiado"}},
        "insights": {{"score": 0.7, "comment": "Insights b√°sicos"}}
    }},
    "feedback": "Descripci√≥n espec√≠fica de mejoras",
    "requires_iteration": false
}}
            """),
            ("human", """
CONSULTA ORIGINAL: {user_query}

SQL GENERADO: {sql_query}
EXPLICACI√ìN SQL: {sql_explanation}

DATOS OBTENIDOS:
- Filas: {row_count}
- Columnas: {columns}
- Muestra: {data_sample}

VISUALIZACI√ìN:
- Tipo: {chart_type}
- T√≠tulo: {chart_title}
- Configuraci√≥n: {viz_config}

INSIGHTS GENERADOS: {insights}

Eval√∫a la calidad y coherencia de toda la pipeline.
            """)
        ])
    
    def calculate_quality_metrics(self, state: ReportState) -> Dict[str, Any]:
        """Calculate basic quality metrics"""
        metrics = {
            "has_data": len(state.get('data_results', [])) > 0,
            "has_visualization": state.get('visualization_path') is not None,
            "has_sql": state.get('sql_query') is not None,
            "has_insights": len(state.get('insights', [])) > 0,
            "error_count": len(state.get('errors', []))
        }
        
        # Basic quality score
        score = 0.0
        if metrics["has_data"]: score += 0.3
        if metrics["has_sql"]: score += 0.2
        if metrics["has_visualization"]: score += 0.2
        if metrics["has_insights"]: score += 0.2
        if metrics["error_count"] == 0: score += 0.1
        
        metrics["basic_score"] = min(score, 1.0)
        
        return metrics
    
    async def process(self, state: ReportState) -> ReportState:
        """Process the quality assurance step"""
        try:
            print(f"üîç QA Agent validating results...")
            
            # Calculate basic metrics
            metrics = self.calculate_quality_metrics(state)
            
            # If basic quality is too low, mark as unsatisfactory
            if metrics["basic_score"] < 0.5:
                state['qa_feedback'] = "Calidad insuficiente: faltan componentes b√°sicos"
                state['quality_score'] = metrics["basic_score"]
                state['requires_iteration'] = True
                return state
            
            # Prepare data for LLM evaluation
            data_results = state.get('data_results', [])
            viz_config = state.get('visualization_config', {})
            
            data_sample = data_results[:3] if len(data_results) >= 3 else data_results
            columns = list(data_results[0].keys()) if data_results else []
            
            # Generate detailed QA evaluation
            messages = self.prompt.format_messages(
                user_query=state['user_query'],
                sql_query=state.get('sql_query', 'No SQL generado'),
                sql_explanation=state.get('sql_explanation', 'Sin explicaci√≥n'),
                row_count=len(data_results),
                columns=columns,
                data_sample=json.dumps(data_sample, indent=2, default=str),
                chart_type=viz_config.get('chart_type', 'No especificado'),
                chart_title=viz_config.get('title', 'Sin t√≠tulo'),
                viz_config=json.dumps(viz_config, indent=2, default=str),
                insights=state.get('insights', [])
            )
            
            response = await self.llm.ainvoke(messages)
            
            # Parse response
            try:
                content = response.content.strip()
                if content.startswith('```json'):
                    content = content[7:]
                if content.endswith('```'):
                    content = content[:-3]
                
                qa_result = json.loads(content.strip())
                
            except json.JSONDecodeError:
                raise ValueError("No se pudo parsear la evaluaci√≥n QA")
            
            # Update state with QA results
            state['quality_score'] = qa_result.get('quality_score', metrics["basic_score"])
            state['qa_feedback'] = qa_result.get('feedback', 'Sin feedback espec√≠fico')
            state['requires_iteration'] = qa_result.get('requires_iteration', False)
            state['qa_evaluation'] = qa_result.get('evaluation', {})
            
            # Determine if iteration is needed
            is_satisfactory = qa_result.get('is_satisfactory', True)
            quality_threshold = 0.7
            
            if not is_satisfactory or state['quality_score'] < quality_threshold:
                state['requires_iteration'] = True
                print(f"‚ö†Ô∏è  QA Agent: Quality below threshold ({state['quality_score']:.2f}), iteration required")
            else:
                state['requires_iteration'] = False
                print(f"‚úÖ QA Agent: Quality acceptable ({state['quality_score']:.2f})")
            
        except Exception as e:
            error_msg = f"Error in QA Agent: {str(e)}"
            print(f"‚ùå {error_msg}")
            state['errors'].append(error_msg)
            state['requires_iteration'] = True
        
        return state

# Initialize QA Agent
qa_agent = QAAgent()
print("‚úÖ QA Agent initialized")

# 7. LangGraph Workflow Integration

Orquestaci√≥n completa del flujo de trabajo usando LangGraph con manejo de estados y decisiones condicionales.

In [None]:
from langgraph.graph import StateGraph, START, END
from typing import Literal

# Define workflow nodes
async def sql_node(state: ReportState) -> ReportState:
    """SQL generation node"""
    return await sql_agent.process(state)

async def visualization_node(state: ReportState) -> ReportState:
    """Visualization generation node"""
    return await viz_agent.process(state)

async def qa_node(state: ReportState) -> ReportState:
    """Quality assurance node"""
    return await qa_agent.process(state)

# Conditional routing logic
def should_continue(state: ReportState) -> Literal["continue", "end"]:
    """Determine if iteration is needed or workflow should end"""
    
    # Check for critical errors
    if state.get('errors'):
        print(f"üö´ Workflow stopped due to errors: {state['errors']}")
        return "end"
    
    # Check iteration requirements
    if state.get('requires_iteration', False):
        # Limit iterations to prevent infinite loops
        max_iterations = 3
        if state.get('iteration_count', 0) >= max_iterations:
            print(f"üîÑ Max iterations reached ({max_iterations}), ending workflow")
            return "end"
        
        print(f"üîÑ QA requires iteration {state.get('iteration_count', 0) + 1}")
        return "continue"
    
    print("‚úÖ Workflow completed successfully")
    return "end"

def route_after_qa(state: ReportState) -> Literal["sql_agent", "pdf_generation"]:
    """Route after QA: back to SQL if iteration needed, or to PDF if done"""
    if should_continue(state) == "continue":
        return "sql_agent"
    else:
        return "pdf_generation"

async def pdf_generation_node(state: ReportState) -> ReportState:
    """PDF generation node (simplified for demo)"""
    try:
        print("üìÑ Generating PDF report...")
        
        # Create a simple summary
        summary = {
            "query": state.get('user_query', 'No query'),
            "sql": state.get('sql_query', 'No SQL'),
            "data_count": len(state.get('data_results', [])),
            "visualization": state.get('visualization_config', {}).get('chart_type', 'None'),
            "quality_score": state.get('quality_score', 0.0),
            "insights": state.get('insights', [])
        }
        
        # For demo purposes, just store the summary
        state['pdf_summary'] = summary
        state['pdf_generated'] = True
        
        print("‚úÖ PDF generation completed")
        
    except Exception as e:
        error_msg = f"Error in PDF generation: {str(e)}"
        print(f"‚ùå {error_msg}")
        state['errors'].append(error_msg)
    
    return state

# Build the LangGraph workflow
def create_workflow() -> StateGraph:
    """Create and configure the LangGraph workflow"""
    
    workflow = StateGraph(ReportState)
    
    # Add nodes
    workflow.add_node("sql_agent", sql_node)
    workflow.add_node("visualization_agent", visualization_node) 
    workflow.add_node("qa_agent", qa_node)
    workflow.add_node("pdf_generation", pdf_generation_node)
    
    # Define the flow
    workflow.add_edge(START, "sql_agent")
    workflow.add_edge("sql_agent", "visualization_agent")
    workflow.add_edge("visualization_agent", "qa_agent")
    
    # Conditional routing after QA
    workflow.add_conditional_edges(
        "qa_agent",
        route_after_qa,
        {
            "sql_agent": "sql_agent",  # Iterate back to SQL
            "pdf_generation": "pdf_generation"  # Continue to PDF
        }
    )
    
    workflow.add_edge("pdf_generation", END)
    
    return workflow

# Compile the workflow
workflow = create_workflow()
app = workflow.compile()

print("‚úÖ LangGraph workflow compiled and ready")

# Visualize the workflow (optional)
try:
    from IPython.display import Image, display
    
    # Generate workflow diagram
    workflow_image = app.get_graph().draw_mermaid_png()
    
    # Display the diagram
    display(Image(workflow_image))
    print("üìä Workflow diagram displayed above")
    
except ImportError:
    print("‚ÑπÔ∏è  Install graphviz and pillow to see workflow diagram")
except Exception as e:
    print(f"‚ÑπÔ∏è  Could not display diagram: {e}")

# 8. End-to-End Testing

Vamos a probar el sistema completo con consultas de ejemplo.

In [None]:
async def run_test_query(query: str, description: str = ""):
    """Run a test query through the complete pipeline"""
    
    print(f"\n{'='*80}")
    print(f"üß™ TESTING: {description}")
    print(f"üìù Query: {query}")
    print(f"{'='*80}")
    
    # Create initial state
    initial_state = ReportState(
        user_query=query,
        sql_query="",
        sql_explanation="",
        data_results=[],
        visualization_config={},
        visualization_path="",
        insights=[],
        qa_feedback="",
        quality_score=0.0,
        requires_iteration=False,
        iteration_count=0,
        errors=[],
        pdf_summary={},
        pdf_generated=False
    )
    
    try:
        # Run the workflow
        result = await app.ainvoke(initial_state)
        
        # Display results
        print(f"\nüìä RESULTS:")
        print(f"‚úÖ SQL Generated: {bool(result.get('sql_query'))}")
        print(f"‚úÖ Data Retrieved: {len(result.get('data_results', []))} rows")
        print(f"‚úÖ Visualization: {result.get('visualization_config', {}).get('chart_type', 'None')}")
        print(f"‚úÖ Quality Score: {result.get('quality_score', 0.0):.2f}")
        print(f"‚úÖ PDF Generated: {result.get('pdf_generated', False)}")
        print(f"‚ö†Ô∏è  Errors: {len(result.get('errors', []))}")
        
        if result.get('errors'):
            print(f"‚ùå Error Details: {result['errors']}")
        
        if result.get('insights'):
            print(f"üí° Insights: {result['insights']}")
        
        return result
        
    except Exception as e:
        print(f"‚ùå Test failed with error: {str(e)}")
        return None

# Test Cases
test_queries = [
    {
        "query": "¬øCu√°les son las ventas totales por regi√≥n?",
        "description": "An√°lisis b√°sico de ventas por regi√≥n"
    },
    {
        "query": "Muestra la evoluci√≥n de ventas de los √∫ltimos 3 meses",
        "description": "An√°lisis temporal de tendencias"
    },
    {
        "query": "¬øQu√© productos son los m√°s vendidos?",
        "description": "Ranking de productos top"
    },
    {
        "query": "Compara las ventas entre Norte y Sur",
        "description": "Comparativa entre regiones espec√≠ficas"
    }
]

print("üöÄ Starting End-to-End Tests")
print("‚ö†Ô∏è  Make sure you have your OpenAI API key configured!")
print("\nTo run tests, execute the following cells one by one...")

# Display test overview
for i, test in enumerate(test_queries, 1):
    print(f"{i}. {test['description']}: '{test['query']}'")

print("\n‚ú® Ready to test the complete Text-to-Report pipeline!")

In [None]:
# Test 1: Basic sales by region analysis
await run_test_query(
    query="¬øCu√°les son las ventas totales por regi√≥n?",
    description="An√°lisis b√°sico de ventas por regi√≥n"
)

In [None]:
# Test 2: Temporal analysis  
await run_test_query(
    query="Muestra la evoluci√≥n de ventas de los √∫ltimos 3 meses",
    description="An√°lisis temporal de tendencias"
)

In [None]:
# Test 3: Product ranking
await run_test_query(
    query="¬øQu√© productos son los m√°s vendidos?",
    description="Ranking de productos top"
)

In [None]:
# Test 4: Comparative analysis
await run_test_query(
    query="Compara las ventas entre Norte y Sur",
    description="Comparativa entre regiones espec√≠ficas"
)

# 9. Development Summary & Next Steps

## ‚úÖ What We've Built

This notebook demonstrates a complete **Text-to-Report** system using LangGraph that:

1. **SQL Agent**: Converts natural language queries to secure SQL using OpenAI GPT-4
2. **Visualization Agent**: Creates intelligent charts based on data analysis  
3. **QA Agent**: Validates results quality and triggers iterations when needed
4. **LangGraph Orchestration**: Manages the multi-agent workflow with conditional routing
5. **Database Integration**: Works with PostgreSQL demo data (regions, products, clients, sales)

## üèóÔ∏è Architecture Highlights

- **Security**: SQL validation prevents dangerous operations
- **Quality Control**: QA agent ensures coherent results before final output
- **Iterative Improvement**: Failed quality checks trigger workflow iterations
- **Error Handling**: Comprehensive error management throughout the pipeline
- **Extensibility**: Modular design allows easy addition of new agents

## üöÄ Next Development Steps

1. **PDF Generation**: Implement full PDF report generation with WeasyPrint
2. **API Integration**: Connect to the FastAPI service layer
3. **Production Database**: Replace demo data with real business database
4. **Advanced Visualizations**: Add more chart types and statistical analysis
5. **User Authentication**: Add security and user management
6. **Deployment**: Test on Azure VM environment

## üîß Production Checklist

- [ ] Configure OpenAI API key in environment variables
- [ ] Install all required dependencies (`pip install -r requirements.txt`)
- [ ] Set up PostgreSQL database with real data
- [ ] Configure monitoring and logging
- [ ] Deploy using Docker containers
- [ ] Set up CI/CD pipeline

## üìö Usage Instructions

1. **Setup Environment**: Configure all dependencies and API keys
2. **Run Tests**: Execute the test cells to validate functionality
3. **Customize Queries**: Modify test queries for your specific use cases
4. **Monitor Performance**: Check quality scores and iteration patterns
5. **Scale Up**: Deploy to production when satisfied with results

---

**üéØ This notebook serves as the foundation for your complete Text-to-Report solution!**