# OXCART Philatelic RAG System

Sistema completo de indexación y búsqueda semántica para documentos filatélicos.

**Funcionalidades:**
- 📄 Indexación automática de todos los JSONs philatelic
- 🔍 Búsqueda semántica avanzada con filtros filatélicos
- 🤖 RAG básico con LLM para responder preguntas
- 📊 Dashboard de estadísticas y validación
- 🌐 Interfaz Gradio para consultas interactivas

**Requisitos:**
- Weaviate corriendo en Docker: `docker-compose up -d`
- OpenAI API key configurada en `.env`
- JSONs philatelic en `results/final_jsons/`

## 1. Setup y Configuración

Configuración inicial del entorno y carga de librerías.

In [1]:
import os
import json
import glob
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime

# Cargar variables de entorno
from dotenv import load_dotenv
load_dotenv()

# Imports de terceros
import pandas as pd

print("✅ Imports básicos completados")

✅ Imports básicos completados


In [2]:
# Verificar variables de entorno
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
WEAVIATE_URL = os.getenv('WEAVIATE_URL', 'http://localhost:8083')
PHILATELIC_JSONS_DIR = os.getenv('PHILATELIC_JSONS_DIR', './results/final_jsons')
COLLECTION_NAME = os.getenv('WEAVIATE_COLLECTION_NAME', 'Oxcart')

print(f"🔧 Configuración:")
print(f"   • Weaviate URL: {WEAVIATE_URL}")
print(f"   • JSONs Directory: {PHILATELIC_JSONS_DIR}")
print(f"   • Collection Name: {COLLECTION_NAME}")
print(f"   • OpenAI API Key: {'✅ Configurada' if OPENAI_API_KEY else '❌ Falta configurar'}")

if not OPENAI_API_KEY:
    print("\\n⚠️  IMPORTANTE: Configura tu OPENAI_API_KEY en el archivo .env")
    print("   Copia .env.example a .env y agrega tu API key")

# Verificar que el directorio de JSONs existe
if not os.path.exists(PHILATELIC_JSONS_DIR):
    print(f"\\n⚠️  Directorio {PHILATELIC_JSONS_DIR} no encontrado")
    print("   Asegúrate de haber procesado documentos con el Dolphin parser")
else:
    json_files = glob.glob(os.path.join(PHILATELIC_JSONS_DIR, '*_final.json'))
    print(f"\\n📁 Encontrados {len(json_files)} archivos JSON filatélicos")
    if json_files:
        print("   Ejemplos:")
        for file in json_files[:3]:
            print(f"   • {os.path.basename(file)}")
        if len(json_files) > 3:
            print(f"   • ... y {len(json_files) - 3} más")

🔧 Configuración:
   • Weaviate URL: http://localhost:8083
   • JSONs Directory: ./results/final_jsons
   • Collection Name: Oxcart
   • OpenAI API Key: ✅ Configurada
\n📁 Encontrados 390 archivos JSON filatélicos
   Ejemplos:
   • CRF 01_final.json
   • CRF 02_final.json
   • CRF 03_final.json
   • ... y 387 más


In [3]:
# Importar módulos del sistema OXCART
from philatelic_weaviate import (
    create_weaviate_client,
    create_oxcart_collection,
    index_philatelic_document,
    search_chunks_semantic,
    get_collection_stats,
    transform_chunk_to_weaviate
)

from philatelic_chunk_schema import (
    PhilatelicDocument,
    PhilatelicChunk,
    validate_chunk_structure,
    get_chunk_summary
)

print("✅ Módulos OXCART cargados exitosamente")

Philatelic Weaviate Integration v2.1 cargado exitosamente
Funciones disponibles:
   - create_weaviate_client()
   - create_oxcart_collection()
   - index_philatelic_document()
   - search_chunks_semantic()
   - get_collection_stats()
✅ Módulos OXCART cargados exitosamente


## 2. Descubrimiento de Documentos

Escanear automáticamente todos los archivos JSON philatelic disponibles.

In [4]:
def discover_philatelic_jsons(directory: str) -> List[Dict[str, Any]]:
    """
    Descubrir todos los archivos JSON philatelic en el directorio.
    
    Returns:
        Lista de diccionarios con información de cada archivo
    """
    json_files = []
    
    # Buscar archivos *_final.json
    pattern = os.path.join(directory, "*_final.json")
    philatelic_files = glob.glob(pattern)
    
    print(f"🔍 Buscando archivos en: {directory}")
    print(f"📋 Patrón de búsqueda: *_final.json")
    
    for file_path in philatelic_files:
        try:
            # Obtener información del archivo
            file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB
            file_name = os.path.basename(file_path)
            doc_id = file_name.replace("_final.json", "")
            
            # Cargar archivo para obtener estadísticas básicas
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            chunks = data.get("chunks", [])
            page_count = data.get("page_count", len(chunks))  # Estimado si no está disponible
            
            # Calcular estadísticas básicas
            total_text_length = 0
            chunk_types = {}
            
            for chunk in chunks:
                chunk_text = chunk.get("text", "") or chunk.get("content", "")
                total_text_length += len(chunk_text)
                
                chunk_type = chunk.get("chunk_type", "text")
                chunk_types[chunk_type] = chunk_types.get(chunk_type, 0) + 1
            
            avg_chunk_length = total_text_length / len(chunks) if chunks else 0
            
            json_files.append({
                "file_path": file_path,
                "file_name": file_name,
                "doc_id": doc_id,
                "file_size_mb": round(file_size, 2),
                "chunks_count": len(chunks),
                "page_count": page_count,
                "total_text_length": total_text_length,
                "avg_chunk_length": round(avg_chunk_length, 1),
                "chunk_types": chunk_types,
                "data": data  # Guardar datos para indexación
            })
            
            print(f"   ✅ {file_name}: {len(chunks)} chunks, {page_count} páginas")
            
        except Exception as e:
            print(f"   ❌ Error procesando {file_path}: {e}")
    
    return json_files

print("✅ Función de descubrimiento definida")

✅ Función de descubrimiento definida


In [5]:
# Descubrir archivos
discovered_files = discover_philatelic_jsons(PHILATELIC_JSONS_DIR)

print(f"\n📊 RESUMEN DE DESCUBRIMIENTO:")
print(f"   📄 Archivos encontrados: {len(discovered_files)}")

if discovered_files:
    # === ESTADÍSTICAS BÁSICAS ===
    total_chunks = sum(f["chunks_count"] for f in discovered_files)
    total_pages = sum(f["page_count"] for f in discovered_files)
    total_size = sum(f["file_size_mb"] for f in discovered_files)
    total_text_length = sum(f["total_text_length"] for f in discovered_files)
    
    print(f"   📦 Total chunks: {total_chunks:,}")
    print(f"   📄 Total páginas: {total_pages:,}")
    print(f"   💾 Tamaño total: {total_size:.1f} MB")
    
    # === ESTADÍSTICAS AVANZADAS DE CHUNKS ===
    if total_chunks > 0:
        # Promedio global de tamaño de chunks
        avg_chunk_size_global = total_text_length / total_chunks
        avg_chunks_per_doc = total_chunks / len(discovered_files)
        
        # Distribución de tipos de chunks
        all_chunk_types = {}
        chunk_sizes = []
        
        for f in discovered_files:
            chunk_sizes.extend([f["avg_chunk_length"]] * f["chunks_count"])
            for chunk_type, count in f["chunk_types"].items():
                all_chunk_types[chunk_type] = all_chunk_types.get(chunk_type, 0) + count
        
        # Estadísticas de tamaño
        min_chunk_size = min(chunk_sizes) if chunk_sizes else 0
        max_chunk_size = max(chunk_sizes) if chunk_sizes else 0
        
        print(f"\n📊 ESTADÍSTICAS DE CHUNKS:")
        print(f"   📏 Tamaño promedio global: {avg_chunk_size_global:.0f} caracteres")
        print(f"   📈 Promedio chunks/documento: {avg_chunks_per_doc:.1f}")
        print(f"   📉 Rango de tamaños: {min_chunk_size:.0f} - {max_chunk_size:.0f} chars")
        
        # Top tipos de chunks
        print(f"   🏷️ Tipos más comunes:")
        sorted_types = sorted(all_chunk_types.items(), key=lambda x: x[1], reverse=True)
        for chunk_type, count in sorted_types[:5]:
            percentage = (count / total_chunks) * 100
            print(f"      • {chunk_type}: {count:,} ({percentage:.1f}%)")
    
    # === ESTIMACIÓN DE COSTOS OPENAI ===
    print(f"\n💰 ESTIMACIÓN DE COSTOS OPENAI:")
    
    # Configuración del modelo de embeddings
    # text-embedding-3-large: $0.00013 per 1K tokens (más reciente y eficiente)
    EMBEDDING_MODEL = "text-embedding-3-large"
    COST_PER_1K_TOKENS = 0.00013  # USD
    CHARS_PER_TOKEN = 4  # Aproximación para texto en español
    
    # Calcular tokens estimados
    estimated_tokens = total_text_length / CHARS_PER_TOKEN
    estimated_cost = (estimated_tokens / 1000) * COST_PER_1K_TOKENS
    
    print(f"   🤖 Modelo: {EMBEDDING_MODEL}")
    print(f"   📝 Caracteres totales: {total_text_length:,}")
    print(f"   🎯 Tokens estimados: {estimated_tokens:,.0f}")
    print(f"   💵 Costo estimado: ${estimated_cost:.4f} USD")
    
    # Estimaciones adicionales útiles
    if estimated_cost > 0:
        cost_per_chunk = estimated_cost / total_chunks
        cost_per_document = estimated_cost / len(discovered_files)
        
        print(f"   📊 Costo por chunk: ${cost_per_chunk:.6f} USD")
        print(f"   📄 Costo por documento: ${cost_per_document:.4f} USD")
        
        # Rangos de referencia
        if estimated_cost < 0.01:
            cost_range = "💚 Muy bajo"
        elif estimated_cost < 0.10:
            cost_range = "💙 Bajo"
        elif estimated_cost < 1.00:
            cost_range = "💛 Moderado"
        elif estimated_cost < 5.00:
            cost_range = "🧡 Alto"
        else:
            cost_range = "❤️ Muy alto"
        
        print(f"   📈 Rango de costo: {cost_range}")
    
    # === ADVERTENCIAS Y NOTAS ===
    print(f"\n⚠️ NOTAS IMPORTANTES:")
    print(f"   • Los costos son estimaciones basadas en {CHARS_PER_TOKEN} chars/token")
    print(f"   • Los precios de OpenAI pueden cambiar")
    print(f"   • Se recomienda verificar precios actuales en openai.com/pricing")
    print(f"   • El costo real puede variar ±20% dependiendo del contenido")
    
else:
    print(f"   ⚠️ No se encontraron archivos *_final.json en {PHILATELIC_JSONS_DIR}")
    print(f"   💡 Asegúrate de haber procesado documentos con el Dolphin parser")

🔍 Buscando archivos en: ./results/final_jsons
📋 Patrón de búsqueda: *_final.json
   ✅ CRF 01_final.json: 148 chunks, 13 páginas
   ✅ CRF 02_final.json: 112 chunks, 13 páginas
   ✅ CRF 03_final.json: 175 chunks, 16 páginas
   ✅ CRF 04_final.json: 185 chunks, 15 páginas
   ✅ CRF 05_final.json: 256 chunks, 21 páginas
   ✅ CRF 06_final.json: 264 chunks, 19 páginas
   ✅ CRF 07_final.json: 202 chunks, 15 páginas
   ✅ CRF 08EXTRA_final.json: 141 chunks, 28 páginas
   ✅ CRF 08_final.json: 299 chunks, 20 páginas
   ✅ CRF 09_final.json: 240 chunks, 20 páginas
   ✅ CRF 100_final.json: 258 chunks, 22 páginas
   ✅ CRF 109_final.json: 185 chunks, 19 páginas
   ✅ CRF 10_final.json: 458 chunks, 32 páginas
   ✅ CRF 11_final.json: 387 chunks, 27 páginas
   ✅ CRF 122_final.json: 320 chunks, 24 páginas
   ✅ CRF 123_final.json: 185 chunks, 16 páginas
   ✅ CRF 124_final.json: 148 chunks, 16 páginas
   ✅ CRF 125_final.json: 220 chunks, 20 páginas
   ✅ CRF 126_final.json: 191 chunks, 20 páginas
   ✅ CRF 127_f

In [6]:
# Mostrar tabla resumen de archivos
if discovered_files:
    files_df = pd.DataFrame([
        {
            "Documento": f["doc_id"],
            "Chunks": f["chunks_count"],
            "Páginas": f["page_count"],
            "Tamaño (MB)": f["file_size_mb"],
            "Promedio chunk": f["avg_chunk_length"],
            "Tipos principales": ", ".join([f"{k}: {v}" for k, v in list(f["chunk_types"].items())[:3]])
        }
        for f in discovered_files
    ])
    
    print("\n📋 DOCUMENTOS ENCONTRADOS:")
    print(files_df.to_string(index=False))
else:
    print("\n❌ No hay documentos para mostrar")


📋 DOCUMENTOS ENCONTRADOS:
                    Documento  Chunks  Páginas  Tamaño (MB)  Promedio chunk                            Tipos principales
                       CRF 01     148       13         0.56          1429.5        header: 25, text: 106, marginalia: 15
                       CRF 02     112       13         0.57          2332.8         header: 18, text: 82, marginalia: 11
                       CRF 03     175       16         0.83          1647.3        header: 20, text: 133, marginalia: 14
                       CRF 04     185       15         0.81          1384.4         header: 23, text: 143, marginalia: 9
                       CRF 05     256       21         1.15          1445.6        text: 195, header: 42, marginalia: 10
                       CRF 06     264       19         1.62          2457.8        text: 197, header: 36, marginalia: 21
                       CRF 07     202       15         1.14          2105.6        text: 159, marginalia: 22, header: 17
     

## 3. Configuración de Weaviate

Conectar a Weaviate y crear la colección con esquema optimizado.

In [None]:
import weaviate

weaviate.__version__

In [None]:
# Conectar a Weaviate
print("🔌 Conectando a Weaviate...")

try:
    client = create_weaviate_client(WEAVIATE_URL, OPENAI_API_KEY)
    print("✅ Conexión exitosa")
    
    # Verificar que Weaviate esté funcionando
    meta = client.get_meta()
    print(f"📊 Weaviate versión: {meta.get('version', 'unknown')}")
    
    # Verificar si la colección existe
    try:
        collections = client.collections.list_all()
        collection_names = [col.name for col in collections]
        
        if COLLECTION_NAME in collection_names:
            collection = client.collections.get(COLLECTION_NAME)
            total_objects = collection.aggregate.over_all(total_count=True).total_count
            print(f"📊 Colección '{COLLECTION_NAME}' existe con {total_objects} documentos")
        else:
            print(f"📝 Colección '{COLLECTION_NAME}' no existe (se creará durante la indexación)")
    except Exception as e:
        print(f"⚠️ No se pudo verificar colecciones: {e}")
        
except Exception as e:
    print(f"❌ Error conectando a Weaviate: {e}")
    print("💡 Asegúrate de que Weaviate esté corriendo:")
    print("   docker-compose up -d")
    client = None

In [None]:
#client.collections.delete(COLLECTION_NAME)

In [None]:
# Crear colección Oxcart
if client:
    print("\n🏗️ Configurando colección Oxcart...")
    
    collection_created = create_oxcart_collection(client, COLLECTION_NAME)
    
    if collection_created:
        print("✅ Colección lista para indexación")
        
        # Mostrar estadísticas de la colección
        stats = get_collection_stats(client, COLLECTION_NAME)
        if stats:
            print(f"📊 Chunks actuales en Weaviate: {stats.get('total_chunks', 0)}")
            if stats.get('documents'):
                print(f"📄 Documentos indexados: {list(stats['documents'].keys())}")
    else:
        print("❌ Error configurando colección")
        client = None
else:
    print("⚠️ Saltando configuración de colección (sin conexión)")

## 4. Indexación Automática

Indexar automáticamente todos los documentos philatelic en Weaviate.

In [None]:
def index_all_documents(client, discovered_files: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Indexar todos los documentos descubiertos en Weaviate.
    
    Returns:
        Dict con resultados de indexación
    """
    if not client:
        return {"error": "No hay conexión a Weaviate"}
    
    if not discovered_files:
        return {"error": "No hay documentos para indexar"}
    
    print(f"🚀 INICIANDO INDEXACIÓN MASIVA")
    print(f"📄 Documentos a procesar: {len(discovered_files)}")
    print("=" * 60)
    
    indexing_results = []
    total_chunks_indexed = 0
    total_chunks_failed = 0
    
    start_time = time.time()
    
    for i, file_info in enumerate(discovered_files, 1):
        doc_id = file_info["doc_id"]
        document = file_info["data"]
        chunks_count = file_info["chunks_count"]
        
        print(f"\n📄 [{i}/{len(discovered_files)}] Procesando: {doc_id}")
        print(f"   📊 Chunks: {chunks_count}")
        
        try:
            # Indexar documento usando la función correcta
            result = index_philatelic_document(client, document, COLLECTION_NAME)
            
            # Guardar resultado
            chunks_indexed = result.get("successful", 0)
            chunks_failed = len(result.get("errors", []))
            
            indexing_results.append({
                "doc_id": doc_id,
                "success": chunks_indexed > 0,
                "chunks_indexed": chunks_indexed,
                "chunks_failed": chunks_failed,
                "errors": result.get("errors", [])
            })
            
            total_chunks_indexed += chunks_indexed
            total_chunks_failed += chunks_failed
            
            # Progreso
            elapsed = time.time() - start_time
            avg_time_per_doc = elapsed / i
            remaining_docs = len(discovered_files) - i
            eta_seconds = remaining_docs * avg_time_per_doc
            
            print(f"   ⏱️ Tiempo transcurrido: {elapsed:.1f}s")
            print(f"   🔮 ETA restante: {eta_seconds:.1f}s")
            
        except Exception as e:
            print(f"   ❌ Error indexando {doc_id}: {e}")
            indexing_results.append({
                "doc_id": doc_id,
                "success": False,
                "error": str(e)
            })
    
    total_time = time.time() - start_time
    
    # Resumen final
    successful_docs = sum(1 for r in indexing_results if r.get("success", False))
    
    summary = {
        "total_documents": len(discovered_files),
        "successful_documents": successful_docs,
        "failed_documents": len(discovered_files) - successful_docs,
        "total_chunks_indexed": total_chunks_indexed,
        "total_chunks_failed": total_chunks_failed,
        "total_time_seconds": total_time,
        "avg_time_per_document": total_time / len(discovered_files) if discovered_files else 0,
        "chunks_per_second": total_chunks_indexed / total_time if total_time > 0 else 0,
        "results": indexing_results
    }
    
    print("\n" + "=" * 60)
    print("📊 RESUMEN FINAL DE INDEXACIÓN:")
    print(f"   ✅ Documentos indexados: {successful_docs}/{len(discovered_files)}")
    print(f"   📦 Chunks indexados: {total_chunks_indexed:,}")
    print(f"   ❌ Chunks fallidos: {total_chunks_failed:,}")
    print(f"   ⏱️ Tiempo total: {total_time:.1f} segundos")
    print(f"   🚀 Velocidad: {summary['chunks_per_second']:.1f} chunks/segundo")
    
    success_rate = (total_chunks_indexed / (total_chunks_indexed + total_chunks_failed)) * 100 if (total_chunks_indexed + total_chunks_failed) > 0 else 0
    print(f"   📈 Tasa de éxito: {success_rate:.1f}%")
    
    return summary

print("✅ Función de indexación definida")

In [None]:
# Ejecutar indexación
if client and discovered_files:
    print("🎯 ¿Proceder con la indexación?")
    print(f"   📄 Se indexarán {len(discovered_files)} documentos")
    total_chunks = sum(f["chunks_count"] for f in discovered_files)
    print(f"   📦 Total de chunks: {total_chunks:,}")
    
    # Estimar tiempo (aproximadamente 100 chunks por minuto con OpenAI embeddings)
    estimated_minutes = total_chunks / 100
    print(f"   ⏱️ Tiempo estimado: {estimated_minutes:.1f} minutos")
    
    
    indexing_summary = index_all_documents(client, discovered_files)
    
    # Guardar resultados
    results_file = "indexing_results.json"
    with open(results_file, 'w', encoding='utf-8') as f:
        json.dump(indexing_summary, f, ensure_ascii=False, indent=2)
    print(f"\n💾 Resultados guardados en: {results_file}")
    
else:
    print("⚠️ No se puede proceder con la indexación:")
    if not client:
        print("   - Sin conexión a Weaviate")
    if not discovered_files:
        print("   - No hay documentos para indexar")
    indexing_summary = None

## 5. Validación y Estadísticas

Verificar que la indexación fue exitosa y mostrar estadísticas detalladas.

In [None]:
# Validar indexación
if client:
    print("🔍 VALIDANDO INDEXACIÓN...")
    
    # Obtener estadísticas actuales
    current_stats = get_collection_stats(client, COLLECTION_NAME)
    
    if current_stats:
        print(f"\n📊 ESTADÍSTICAS DE WEAVIATE:")
        print(f"   📦 Total chunks indexados: {current_stats.get('total_chunks', 0):,}")
        print(f"   📄 Documentos únicos: {current_stats.get('total_documents', 0)}")
        
        # Mostrar documentos indexados
        if current_stats.get('documents'):
            print(f"\n📋 DOCUMENTOS EN WEAVIATE:")
            for doc_id, chunk_count in current_stats['documents'].items():
                print(f"   • {doc_id}: {chunk_count:,} chunks")
        
        # Mostrar tipos de chunks
        if current_stats.get('chunk_types'):
            print(f"\n🏷️ TIPOS DE CHUNKS:")
            for chunk_type, count in current_stats['chunk_types'].items():
                print(f"   • {chunk_type}: {count:,}")
        
        # Comparar con archivos originales
        if 'discovered_files' in locals() and discovered_files:
            expected_chunks = sum(f["chunks_count"] for f in discovered_files)
            indexed_chunks = current_stats.get('total_chunks', 0)
            
            print(f"\n🔄 COMPARACIÓN:")
            print(f"   📥 Chunks esperados: {expected_chunks:,}")
            print(f"   📤 Chunks indexados: {indexed_chunks:,}")
            
            if indexed_chunks == expected_chunks:
                print(f"   ✅ ¡Indexación completa al 100%!")
            elif indexed_chunks > 0:
                coverage = (indexed_chunks / expected_chunks) * 100
                print(f"   📊 Cobertura: {coverage:.1f}%")
                if coverage < 100:
                    missing = expected_chunks - indexed_chunks
                    print(f"   ⚠️ Faltan {missing:,} chunks")
            else:
                print(f"   ❌ No hay chunks indexados")
    else:
        print("❌ No se pudieron obtener estadísticas de Weaviate")
else:
    print("⚠️ Sin conexión a Weaviate para validación")

## 6. Pruebas de Búsqueda Semántica

Probar el sistema de búsqueda semántica con consultas filatélicas específicas.

In [None]:
def test_philatelic_searches(client) -> None:
    """
    Ejecutar búsquedas de prueba para validar el sistema.
    """
    if not client:
        print("❌ Sin conexión a Weaviate")
        return
    
    # Consultas de prueba filatélicas
    test_queries = [
        {
            "name": "Búsqueda general de sellos",
            "query": "stamps Scott catalog Costa Rica",
            "filters": None
        },
        {
            "name": "Catálogo Scott específico",
            "query": "Scott catalog numbers",
            "filters": {"catalog_system": "Scott"}
        },
        {
            "name": "Sobrecargas y variedades",
            "query": "overprint surcharge variety error",
            "filters": {"has_varieties": True}
        },
        {
            "name": "Periodo Guanacaste",
            "query": "Guanacaste overprint historical Costa Rica",
            "filters": {"is_guanacaste": True}
        },
        {
            "name": "Especificaciones técnicas",
            "query": "perforation paper printing watermark",
            "filters": {"has_technical_specs": True}
        },
        {
            "name": "Tablas con datos",
            "query": "catalog table prices values",
            "filters": {"chunk_type": "table"}
        }
    ]
    
    print("🔍 EJECUTANDO BÚSQUEDAS DE PRUEBA")
    print("=" * 60)
    
    for i, test in enumerate(test_queries, 1):
        print(f"\n🔎 [{i}/{len(test_queries)}] {test['name']}")
        print(f"   Query: \"{test['query']}\"")
        if test['filters']:
            print(f"   Filtros: {test['filters']}")
        
        try:
            results = search_chunks_semantic(
                client, 
                test["query"], 
                "Oxcart", 
                limit=3,
                filters=test["filters"]
            )
            
            print(f"   📊 Resultados: {len(results)}")
            
            for j, result in enumerate(results, 1):
                print(f"\n      🏷️ #{j} (Score: {result['score']:.3f})")
                print(f"         📄 Documento: {result['doc_id']}")
                print(f"         📋 Tipo: {result['chunk_type']}")
                print(f"         📄 Página: {result['page_number']}")
                
                # Mostrar metadatos relevantes
                if result.get('catalog_systems'):
                    print(f"         📖 Catálogos: {result['catalog_systems']}")
                if result.get('scott_numbers'):
                    print(f"         🔢 Scott: {result['scott_numbers']}")
                if result.get('years'):
                    print(f"         📅 Años: {result['years']}")
                if result.get('colors'):
                    print(f"         🎨 Colores: {result['colors']}")
                if result.get('variety_classes'):
                    print(f"         🔀 Variedades: {result['variety_classes']}")
                
                # Texto truncado
                text = result.get('text', '')
                if len(text) > 200:
                    text = text[:200] + "..."
                print(f"         📝 Texto: {text}")
            
            if not results:
                print(f"   ⚠️ No se encontraron resultados")
            
        except Exception as e:
            print(f"   ❌ Error en búsqueda: {e}")
        
        print("   " + "-" * 50)

# # Ejecutar pruebas de búsqueda
# if client:
#     test_philatelic_searches(client)
# else:
#     print("⚠️ No se pueden ejecutar búsquedas sin conexión a Weaviate")

In [None]:
results = search_chunks_semantic(
                client, 
                "Costa Rica first issue (1862-1863) stamp. First stamp of Costa Rica.", 
                "Oxcart", 
                limit=20,
                filters=[],
                mode = "hybrid",
                alpha= 0.35
                
            )
            
print(f"   📊 Resultados: {len(results)}")

for j, result in enumerate(results, 1):
    print(f"\n      🏷️ #{j} (Score: {result['score']:.3f})")
    print(f"         📄 Documento: {result['doc_id']}")
    print(f"         📋 Tipo: {result['chunk_type']}")
    print(f"         📄 Página: {result['page_number']}")
    
    # Mostrar metadatos relevantes
    if result.get('catalog_systems'):
        print(f"         📖 Catálogos: {result['catalog_systems']}")
    if result.get('scott_numbers'):
        print(f"         🔢 Scott: {result['scott_numbers']}")
    if result.get('years'):
        print(f"         📅 Años: {result['years']}")
    if result.get('colors'):
        print(f"         🎨 Colores: {result['colors']}")
    if result.get('variety_classes'):
        print(f"         🔀 Variedades: {result['variety_classes']}")
    
    # Texto truncado
    text = result.get('text', '')
    if len(text) > 200:
        text = text[:200] + "..."
    print(f"         📝 Texto: {text}")
    print("**********************************************************************************************************")

## 7. Interfaz Gradio para RAG

Interfaz web interactiva para búsquedas semánticas y consultas RAG.

In [None]:
try:
    import gradio as gr
    import openai
    gradio_available = True
    print("✅ Gradio disponible")
except ImportError:
    print("⚠️ Gradio no está instalado")
    print("💡 Para instalar: pip install gradio")
    gradio_available = False

# Configurar OpenAI para RAG
if OPENAI_API_KEY:
    openai.api_key = OPENAI_API_KEY
    openai_available = True
else:
    openai_available = False
    print("⚠️ OpenAI API key no configurada para RAG")

In [None]:
import os
from typing import Any, Dict, List, Tuple

def search_and_answer(
    query: str,
    rag_system: Dict[str, Any],
    use_filters: bool = False,
    catalog_system: str = "",
    chunk_type: str = "",
    has_varieties: bool = False,
    max_results: int = 10,
) -> Tuple[str, List[Dict[str, Any]], Dict[str, Any]]:
    """
    Búsqueda semántica + RAG (OpenAI >= 1.0, modelo gpt-4o-mini).
    Devuelve: (respuesta_rag, resultados(lista de dicts), metadatos(dict))
    """
    # Validación de conexión
    if not rag_system or not rag_system.get("client"):
        meta = {"query": query, "total_results": 0, "max_results": max_results, "filters_used": {}, "context_length": 0}
        return "❌ Error: Sin conexión a Weaviate", [], meta

    client_wv = rag_system["client"]
    collection_name = rag_system.get("collection_name", "Oxcart")

    # Construir filtros
    filt = None
    if use_filters:
        filt = {}
        if catalog_system:
            filt["catalog_system"] = catalog_system
        if chunk_type:
            filt["chunk_type"] = chunk_type
        if has_varieties:
            filt["has_varieties"] = True

    # Búsqueda semántica (usa tu función ya definida)
    results = search_chunks_semantic(
        client=client_wv,
        query=query,
        collection_name=collection_name,
        limit=int(max_results),
        filters=filt,
        mode = "hybrid",
        alpha= 0.35
    )

    # Preparar contexto para RAG (top 3)
    top = results[:3]
    context = "\n\n".join(
        f"Documento {r.get('doc_id', 'N/A')} (Página {r.get('page_number', '¿?')}): {r.get('text','')}"
        for r in top
    )
    context_len = len(context)

    # Generar respuesta RAG (OpenAI >= 1.0.0)
    rag_answer = "⚠️ No se encontraron resultados para generar respuesta"
    openai_key = os.getenv("OPENAI_API_KEY")
    if not results:
        rag_answer = "⚠️ No se encontraron resultados para generar respuesta"
    elif not openai_key:
        rag_answer = "⚠️ RAG no disponible: OpenAI API key no configurada"
    else:
        try:
            from openai import OpenAI
            oa_client = OpenAI(api_key=openai_key)

            system_prompt = (
                "You are an expert in costa rica philately (stamps, covers, etc). "
                "Only answer based with the information provided. If there is not enough info for answer please, "
                "answer with: 'I dont have information'. You must include any references about philatelic like scott catalogue references, dates, etc."
            )

            model = os.getenv("RAG_MODEL", "gpt-4o-mini")
            resp = oa_client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Here is the information for your answers:\n{context}\n\nAnswer this only with the information provided: {query}"}
                ],
                temperature=0.3,
                max_tokens=1000,
            )

            rag_text = resp.choices[0].message.content if resp.choices else ""
            if not rag_text:
                rag_text = "No se obtuvo texto de respuesta del modelo."

            rag_answer = (
                "🤖 **Respuesta RAG:**\n\n"
                + rag_text
                + f"\n\n📊 *Basado en {len(results)} resultados de búsqueda*"
            )
        except Exception as e:
            rag_answer = f"❌ Error generando respuesta RAG: {e}"

    metadata = {
        "query": query,
        "total_results": len(results),
        "max_results": int(max_results),
        "filters_used": filt or {},
        "context_length": context_len,
    }
    return rag_answer, results, metadata


In [None]:
def get_collection_info() -> str:
    """
    Obtener información de la colección para mostrar en la interfaz.
    """
    if not client:
        return "❌ Sin conexión a Weaviate"
    
    try:
        stats = get_collection_stats(client, "Oxcart")
        if stats:
            info = f"📊 **Estadísticas de la Colección Oxcart:**\n\n"
            info += f"📦 **Total chunks:** {stats['total_chunks']:,}\n"
            info += f"📄 **Documentos:** {stats['total_documents']}\n\n"
            
            if stats.get('documents'):
                info += "**Documentos indexados:**\n"
                for doc_id, count in stats['documents'].items():
                    info += f"• {doc_id}: {count:,} chunks\n"
            
            return info
        else:
            return "❌ No se pudieron obtener estadísticas"
    except Exception as e:
        return f"❌ Error: {e}"

print("✅ Funciones RAG definidas")

In [None]:
stats = get_collection_stats(client, "Oxcart")
stats['total_documents']
stats['total_chunks']

In [None]:
# Estructura que usan tus funciones de búsqueda/respuesta
rag_system = {
    "success": True,
    "client": client,                    # para que search_and_answer pueda consultar
    "collection_name": COLLECTION_NAME,  # nombre de la colección
    "weaviate_url": WEAVIATE_URL,        # info para la UI
    "total_documents": stats['total_documents'],       # para mostrar estado
    "total_chunks": stats['total_chunks'],        # opcional en la UI
    # puedes añadir más campos que tu search_and_answer necesite
}

In [None]:
import os
import gradio as gr
from typing import Dict, Any

def create_gradio_interface(rag_system: Dict[str, Any]) -> gr.Blocks:
    """
    Crea la interfaz Gradio para consultas RAG.
    """

    def gradio_search_and_answer(query, use_filters, catalog_system, chunk_type, has_varieties, max_results):
        """
        Wrapper para Gradio: llama a search_and_answer y formatea salidas.
        """
        if not rag_system:
            return "❌ Sistema RAG no está configurado", "No hay resultados", "No hay metadatos"

        # Llamada a tu función (se asume definida en tu entorno)
        answer, results, metadata = search_and_answer(
            query=query,
            rag_system=rag_system,
            use_filters=use_filters,
            catalog_system=catalog_system,
            chunk_type=chunk_type,
            has_varieties=has_varieties,
            max_results=int(max_results),
        )

        # --- Formatear resultados de búsqueda ---
        lines = []
        if results:
            for i, r in enumerate(results):
                doc_id = r.get("doc_id") or r.get("document_id", "N/A")
                chunk_type_val = r.get("chunk_type", "N/A")
                page_number = r.get("page_number", "N/A")
                catalogs = r.get("catalog_systems") or []
                scotts = r.get("scott_numbers") or []
                years = r.get("years") or []

                # Vista previa: usa content_preview si existe; si no, toma 'text'
                preview = r.get("content_preview")
                if not preview:
                    text = r.get("text", "")
                    preview = (text[:300] + "...") if len(text) > 300 else text

                block = []
                block.append(f"**Resultado {i+1}**")
                block.append(f"• Documento: {doc_id}")
                block.append(f"• Tipo: {chunk_type_val} | Página: {page_number}")
                if catalogs:
                    block.append(f"• Catálogos: {', '.join(catalogs)}")
                if scotts:
                    block.append(f"• Scott: {', '.join(scotts)}")
                if years:
                    block.append(f"• Años: {', '.join(str(y) for y in years)}")
                block.append(f"• Vista previa: {preview}")
                block.append("-" * 50)
                lines.append("\n".join(block))
            search_output = "\n".join(lines)
        else:
            search_output = "No se encontraron resultados"

        # --- Formatear metadatos ---
        metadata = metadata or {}
        metadata_output = (
            "**Metadatos de la consulta:**\n"
            f"• Consulta: {metadata.get('query', 'N/A')}\n"
            f"• Resultados encontrados: {metadata.get('total_results', 0)}\n"
            f"• Máximo solicitado: {metadata.get('max_results', 'N/A')}\n"
            f"• Filtros usados: {metadata.get('filters_used', {})}\n"
            f"• Longitud del contexto: {metadata.get('context_length', 'N/A')} caracteres\n"
        )

        return answer, search_output, metadata_output

    # Valores informativos del sistema
    collection_name = rag_system.get("collection_name", "Oxcart")
    total_docs = rag_system.get("total_documents", 0)
    weaviate_url = rag_system.get("weaviate_url") or os.getenv("WEAVIATE_URL", "http://localhost:8080")

    # --- UI ---
    with gr.Blocks(title="OXCART RAG - Consultas Filatélicas") as interface:
        gr.Markdown(
            "# 🔍 OXCART RAG - Sistema de Consultas Filatélicas\n\n"
            "Realiza consultas inteligentes sobre tu colección de documentos filatélicos "
            "usando búsqueda semántica y respuestas generadas por IA."
        )

        with gr.Row():
            with gr.Column(scale=2):
                # Input principal
                query_input = gr.Textbox(
                    label="💭 Tu consulta filatélica",
                    placeholder="Ej: ¿Qué sellos de España de 1950 están catalogados como Scott?",
                    lines=2,
                )

                # Botón de búsqueda
                search_btn = gr.Button("🔍 Buscar y Responder", variant="primary")

                # Consultas de ejemplo
                gr.Markdown("**💡 Consultas de ejemplo:**")
                example_queries = [
                    "¿Qué sellos conmemorativos de España están en la colección?",
                    "Muéstrame información sobre sellos con errores de perforación",
                    "¿Cuáles son los sellos más valiosos según el catálogo Michel?",
                    "Información sobre sellos de México de la década de 1960",
                    "¿Qué variedades filatélicas están documentadas?",
                ]
                # Botones que rellenan el textbox
                for example in example_queries:
                    gr.Button(example, variant="secondary").click(
                        fn=(lambda ex=example: ex),
                        inputs=None,
                        outputs=query_input,
                    )

            with gr.Column(scale=1):
                # Filtros avanzados
                gr.Markdown("**🎯 Filtros Avanzados**")

                use_filters = gr.Checkbox(label="Usar filtros específicos", value=False)

                catalog_system = gr.Dropdown(
                    choices=["", "Scott", "Michel", "Yvert", "Stanley Gibbons", "Edifil"],
                    label="Sistema de catálogo",
                    value="",
                )

                chunk_type = gr.Dropdown(
                    choices=["", "text", "table", "figure", "title", "header"],
                    label="Tipo de contenido",
                    value="",
                )

                has_varieties = gr.Checkbox(label="Solo documentos con variedades", value=False)

                max_results = gr.Slider(
                    minimum=1,
                    maximum=100,
                    value=5,
                    step=1,
                    label="Máximo resultados",
                )

        # Outputs
        with gr.Row():
            with gr.Column():
                gr.Markdown("## 🤖 Respuesta IA")
                answer_output = gr.Textbox(label="Respuesta generada", lines=8, interactive=False)

        with gr.Row():
            with gr.Column():
                gr.Markdown("## 📄 Documentos Encontrados")
                search_output = gr.Textbox(label="Resultados de búsqueda", lines=12, interactive=False)

            with gr.Column():
                gr.Markdown("## 📊 Metadatos")
                metadata_output = gr.Textbox(label="Información de la consulta", lines=10, interactive=False)

        # Eventos
        search_btn.click(
            fn=gradio_search_and_answer,
            inputs=[query_input, use_filters, catalog_system, chunk_type, has_varieties, max_results],
            outputs=[answer_output, search_output, metadata_output],
        )

        query_input.submit(
            fn=gradio_search_and_answer,
            inputs=[query_input, use_filters, catalog_system, chunk_type, has_varieties, max_results],
            outputs=[answer_output, search_output, metadata_output],
        )

        # Información del sistema
        gr.Markdown(
            "---\n"
            f"**📊 Estado del Sistema:**\n"
            f"• Colección: {collection_name}\n"
            f"• Documentos indexados: {total_docs:,}\n"
            f"• Weaviate URL: {weaviate_url}\n"
            "• Estado: ✅ Operativo\n"
        )

    return interface


# ---- Lanzador (opcional) ----
if rag_system and rag_system.get("success", False):
    print("\n" + "=" * 50)
    print("CREANDO INTERFAZ GRADIO")
    print("=" * 50)

    gradio_app = create_gradio_interface(rag_system)

    print("✅ Interfaz Gradio creada exitosamente")
    print("\n🚀 Lanzando interfaz...")

    GRADIO_PORT = int(os.getenv("GRADIO_PORT", 7860))
    GRADIO_SHARE = os.getenv("GRADIO_SHARE", "false").lower() == "true"

    gradio_app.launch(
        server_port=GRADIO_PORT,
        share=GRADIO_SHARE,
        inbrowser=True,
        show_error=True,  # si tu versión de Gradio no lo soporta, quítalo sin problema
    )
else:
    print("\n⚠️  No se puede crear la interfaz Gradio:")
    if not rag_system:
        print("   • Sistema RAG no está configurado")
    else:
        print(f"   • Error en RAG: {rag_system.get('error', 'Error desconocido')}")
    print("\n🔧 Para solucionar:")
    print("   1. Verifica que Weaviate esté corriendo (puerto según tu .env)")
    print("   2. Configura OPENAI_API_KEY en .env")
    print("   3. Ejecuta la indexación de documentos")
    print("   4. Reinicia este notebook")


In [None]:
# import gradio as gr
# gr.close_all()
