<a href="https://colab.research.google.com/github/neyronstalin/ETL_EXCEL-APUS/blob/main/ETL_UNICOS_XLSX_FIXED.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL UNICOS desde XLSX Multi-Hoja


## üìò Documento de Reglas y Contrato del ETL UNICOS

Consulta `ETL_UNICOS_CONTRATO.md` para el detalle del contrato.


## üß™ Diagn√≥stico (issues detectados en el notebook original)

- Widgets usados pero no definidos en el callback (`execute_button`, `manual_path_text`, `max_sheet_input`, `include_aux_checkbox`, `include_otro_checkbox`, `sheet_pattern_text`, `output_status`, `csv_path_text`, `xlsx_path_text`).
- Duplicaci√≥n de `export_results()` que genera ambig√ºedad y errores silenciosos.
- `filter_rows()` dejaba pasar descripciones con espacios (ej: `"   "`).
- `read_sheet_fast()` atrapaba errores y retornaba `None` sin trazabilidad.

Este notebook corrige todo lo anterior y deja un flujo **Run All safe** con UI completa.


In [58]:

# %% Setup de entorno
# Esta celda prepara el entorno del notebook:
# 1) Verifica/instala dependencias
# 2) Detecta librerias opcionales
# 3) Importa todo lo necesario para el ETL

import sys
import os
import re
import time
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Set
import warnings
import subprocess
import importlib.util
warnings.filterwarnings('ignore')

print("")
print("="*60)
print("SETUP: Instalando dependencias...")
print("="*60)
print(f"Python executable: {sys.executable}")
print(f"Python version: {sys.version.split()[0]}")

# Paquetes requeridos para el flujo principal
required_packages = {
    'pandas': 'pandas',
    'numpy': 'numpy',
    'openpyxl': 'openpyxl',
    'ipywidgets': 'ipywidgets',
}

def install_if_missing(package_name, import_name=None):
    """Instala el paquete solo si no existe en el kernel actual."""
    import_name = import_name or package_name
    if importlib.util.find_spec(import_name) is not None:
        print(f"‚úì {package_name} ya est√° instalado")
        return

    print(f"‚è≥ Instalando {package_name}...")
    try:
        result = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-q", package_name],
            capture_output=True,
            text=True,
            check=True,
        )
        if result.stdout.strip():
            print(result.stdout.strip())
        if result.stderr.strip():
            print(result.stderr.strip())
        print(f"‚úì {package_name} instalado")
    except subprocess.CalledProcessError as e:
        print(f"‚úó Error instalando {package_name}")
        if e.stdout:
            print(e.stdout)
        if e.stderr:
            print(e.stderr)
        raise

    if importlib.util.find_spec(import_name) is None:
        raise ImportError(f"No se pudo importar {import_name} despu√©s de la instalaci√≥n")

for pkg, imp in required_packages.items():
    install_if_missing(pkg, imp)

# Opcional: python-calamine para lectura ultra r√°pida
print("")
print("‚è≥ Verificando python-calamine (lectura ultra-rapida)...")
try:
    import calamine
    CALAMINE_AVAILABLE = True
    print("‚úì python-calamine disponible")
except ImportError:
    CALAMINE_AVAILABLE = False
    print("‚Ñπ python-calamine no disponible (usando openpyxl)")
    print("  Para instalar: pip install python-calamine")

# Opcional: itables para visualizaci√≥n interactiva
print("")
print("‚è≥ Verificando itables (visor interactivo)...")
try:
    import itables
    ITABLES_AVAILABLE = True
    print("‚úì itables disponible")
except ImportError:
    ITABLES_AVAILABLE = False
    print("‚Ñπ itables no disponible")
    print("  Para instalar: pip install itables")

# Importar dependencias luego de instalarlas
import pandas as pd
import numpy as np

import ipywidgets as widgets
from ipywidgets import FileUpload, VBox, HBox, Button, Label, HTML, Output, IntText, Text, Checkbox, Dropdown
from IPython.display import display, clear_output

print("")
print("="*60)
print("‚úì Entorno configurado correctamente")
print("="*60)
print("")



SETUP: Instalando dependencias...
Python executable: /usr/bin/python3
Python version: 3.12.12
‚úì pandas ya est√° instalado
‚úì numpy ya est√° instalado
‚úì openpyxl ya est√° instalado
‚úì ipywidgets ya est√° instalado

‚è≥ Verificando python-calamine (lectura ultra-rapida)...
‚Ñπ python-calamine no disponible (usando openpyxl)
  Para instalar: pip install python-calamine

‚è≥ Verificando itables (visor interactivo)...
‚Ñπ itables no disponible
  Para instalar: pip install itables

‚úì Entorno configurado correctamente



In [59]:

# %% Configuraci√≥n de carpeta de entrada y carga de archivos

import shutil

# Crear carpeta de entrada si no existe
INPUT_FOLDER = Path('./input')
INPUT_FOLDER.mkdir(exist_ok=True)

OUTPUT_FOLDER = Path('./output')
OUTPUT_FOLDER.mkdir(exist_ok=True)

# Diccionario global para almacenar la ruta actual del archivo
current_file_path = {'path': None}

print("="*70)
print("üìÅ CONFIGURACI√ìN DE CARPETAS")
print("="*70)
print(f"Carpeta de entrada: {INPUT_FOLDER.absolute()}")
print(f"Carpeta de salida: {OUTPUT_FOLDER.absolute()}")

# UI para cargar archivo desde carpeta de entrada
input_status = Output()

input_file_upload = FileUpload(
    accept='.xlsx',
    multiple=False,
    description='üì§ Selecciona tu Excel:',
    layout=widgets.Layout(width='500px')
)

def list_input_files():
    """Lista archivos Excel en la carpeta de entrada."""
    excel_files = list(INPUT_FOLDER.glob('*.xlsx')) + list(INPUT_FOLDER.glob('*.xls'))
    return excel_files

def handle_input_upload(change):
    """Guarda el archivo en la carpeta de entrada."""
    input_status.clear_output()
    if input_file_upload.value:
        uploaded_files = input_file_upload.value
        if isinstance(uploaded_files, tuple):
            for file_info in uploaded_files:
                filename = file_info.get('name', 'archivo.xlsx')
                content = file_info.get('content', b'')
                input_path = INPUT_FOLDER / filename
                with open(input_path, 'wb') as f:
                    f.write(content)
                current_file_path['path'] = str(input_path)
                with input_status:
                    print(f"‚úÖ Archivo cargado: {filename}")
                    print(f"üìç Ubicaci√≥n: {input_path.absolute()}")
        else:
            for filename, filedata in uploaded_files.items():
                input_path = INPUT_FOLDER / filename
                with open(input_path, 'wb') as f:
                    f.write(filedata['content'])
                current_file_path['path'] = str(input_path)
                with input_status:
                    print(f"‚úÖ Archivo cargado: {filename}")
                    print(f"üìç Ubicaci√≥n: {input_path.absolute()}")

input_file_upload.observe(handle_input_upload, names='value')

# Bot√≥n para limpiar carpeta
clean_button = Button(
    description='üóëÔ∏è Limpiar entrada',
    button_style='warning',
    layout=widgets.Layout(width='150px')
)

def on_clean_click(b):
    """Limpia la carpeta de entrada."""
    with input_status:
        input_status.clear_output()
        try:
            if list(INPUT_FOLDER.glob('*')):
                shutil.rmtree(INPUT_FOLDER)
                INPUT_FOLDER.mkdir(exist_ok=True)
                current_file_path['path'] = None
                print("‚úÖ Carpeta de entrada limpiada")
            else:
                print("‚ÑπÔ∏è La carpeta ya est√° vac√≠a")
        except Exception as e:
            print(f"‚ùå Error al limpiar: {e}")

clean_button.on_click(on_clean_click)

# Bot√≥n para listar archivos
list_button = Button(
    description='üìã Listar archivos',
    button_style='info',
    layout=widgets.Layout(width='150px')
)

def on_list_click(b):
    """Lista los archivos en la carpeta de entrada."""
    with input_status:
        input_status.clear_output()
        files = list_input_files()
        if files:
            print("üìÇ Archivos en la carpeta de entrada:")
            for i, f in enumerate(files, 1):
                size_mb = f.stat().st_size / (1024*1024)
                selected = " ‚Üê SELECCIONADO" if current_file_path['path'] == str(f) else ""
                print(f"  {i}. {f.name} ({size_mb:.2f} MB){selected}")
        else:
            print("üì≠ No hay archivos Excel en la carpeta de entrada")

list_button.on_click(on_list_click)

# Construir panel de carga (se mostrar√° en el Panel ETL)
input_ui = VBox([
    HTML('<h3>üì• Carga de Archivo</h3>'),
    input_file_upload,
    HBox([clean_button, list_button]),
    input_status
])

print("")
print("="*70)
print("‚úÖ Carpeta de entrada configurada correctamente")
print("="*70)
print("")


üìÅ CONFIGURACI√ìN DE CARPETAS
Carpeta de entrada: /content/input
Carpeta de salida: /content/output

‚úÖ Carpeta de entrada configurada correctamente



In [60]:
# %% Configuraci√≥n global
# Mapeo de columnas del Excel hacia nombres internos
# Nota: solo se leen columnas espec√≠ficas para optimizar rendimiento.
COLUMN_MAPPING = {
    'A': 'Column1',
    'B': 'Column2',
    'C': 'Column3',
    'E': 'Column5',
    'G': 'Column7',
    'J': 'Column10',
    'K': 'Column11',
    'L': 'Column12',
}

# Clasificaci√≥n por primera letra (se mantiene para compatibilidad con extract_code/debug si es necesario)
RESOURCE_TYPE_MAP = {
    'M': 'EQUIPO',
    'N': 'MANO DE OBRA',
    'O': 'MATERIAL',
    'P': 'TRANSPORTE',
}

# Nuevo mapeo de texto de encabezado a tipo de recurso
HEADER_TO_RESOURCE_TYPE_MAP = {
    'EQUIPOS': 'EQUIPO',
    'MANO DE OBRA': 'MANO DE OBRA',
    'MATERIALES': 'MATERIAL',
    'MATERIAL': 'MATERIAL', # Para ambos, plural y singular
    'TRANSPORTE': 'TRANSPORTE',
}

# Regex para detectar c√≥digos embebidos (ej: M.01, N02, O-03)
CODE_PATTERN = r'([MNOP])\s*[\.-]?\s*\d+'

# Patrones de encabezados que deben excluirse (o usarse para clasificaci√≥n)
# Ahora usamos las claves de HEADER_TO_RESOURCE_TYPE_MAP para la detecci√≥n
SECTION_HEADERS_PATTERNS = list(HEADER_TO_RESOURCE_TYPE_MAP.keys())

def is_section_header(text: str) -> bool:
    """Detecta si un texto es un encabezado de secci√≥n que define un tipo de recurso."""
    if pd.isna(text) or text == '':
        return False
    text_upper = str(text).strip().upper()
    for header_key in HEADER_TO_RESOURCE_TYPE_MAP.keys():
        if re.match(r'^' + re.escape(header_key) + r'$', text_upper, re.IGNORECASE):
            return True
    return False

def get_resource_type_from_header(header_text: str) -> str:
    """Mapea el texto del encabezado a un tipo de recurso conocido."""
    if pd.isna(header_text) or header_text == '':
        return 'OTRO'
    normalized_text = str(header_text).strip().upper()
    for header_key, resource_type in HEADER_TO_RESOURCE_TYPE_MAP.items():
        if re.match(r'^' + re.escape(header_key) + r'$', normalized_text, re.IGNORECASE):
            return resource_type
    return 'OTRO' # Default if no match

print("‚úì Configuraci√≥n global cargada")

‚úì Configuraci√≥n global cargada


In [61]:

# %% Funciones ETL - Selecci√≥n de hojas y lectura
def pick_sheets(
    xlsx_path: str,
    sheet_pattern: str = r'^\d{3}$',
    max_sheet: int = None,
    include_aux: bool = True
) -> List[str]:
    """
    Selecciona hojas seg√∫n patr√≥n regex y l√≠mite.
    Respeta el orden original del archivo.
    """
    try:
        excel_file = pd.ExcelFile(xlsx_path)
        all_sheets = excel_file.sheet_names
    except Exception as e:
        print(f"‚ùå Error abriendo {xlsx_path}: {e}")
        return []

    # Compilar regex del patr√≥n de hojas
    try:
        pattern = re.compile(sheet_pattern)
    except Exception:
        print(f"‚ùå Patr√≥n regex inv√°lido: {sheet_pattern}")
        return []

    # Hojas num√©ricas (seg√∫n patr√≥n)
    numeric_sheets = [s for s in all_sheets if pattern.match(s)]

    # Hojas auxiliares
    aux_sheets = [s for s in all_sheets if s.startswith('Aux.')] if include_aux else []

    # Combinar respetando orden original del archivo
    selected = [s for s in all_sheets if s in numeric_sheets or s in aux_sheets]

    # Limitar cantidad de hojas si se pide
    if max_sheet and max_sheet > 0:
        selected = selected[:max_sheet]

    return selected

def read_sheet_fast(
    xlsx_path: str,
    sheet_name: str,
    column_mapping: Dict[str, str] = COLUMN_MAPPING,
    return_error: bool = False
):
    """
    Lee solo columnas necesarias de una hoja.
    Intenta calamine primero (r√°pido), fallback a openpyxl.
    Si return_error=True, retorna (df, error).
    """
    calamine_error = None

    # Intentar con python-calamine si est√° disponible
    if CALAMINE_AVAILABLE:
        try:
            from calamine import load_workbook
            wb = load_workbook(xlsx_path)
            ws = wb.sheet_by_name(sheet_name)

            # Indices 0-based para columnas A, B, C, E, G, J, K, L
            col_indices = {'A': 0, 'B': 1, 'C': 2, 'E': 4, 'G': 6, 'J': 9, 'K': 10, 'L': 11}
            data = {}

            for col_letter, col_name in column_mapping.items():
                col_idx = col_indices[col_letter]
                col_data = []
                for row in ws.rows():
                    if col_idx < len(row):
                        cell = row[col_idx]
                        col_data.append(cell.value if hasattr(cell, 'value') else cell)
                    else:
                        col_data.append(None)
                data[col_name] = col_data

            df = pd.DataFrame(data)
            return (df, None) if return_error else df
        except Exception as e:
            calamine_error = f"calamine: {type(e).__name__}: {e}"

    # Fallback: openpyxl
    try:
        from openpyxl import load_workbook
        wb = load_workbook(xlsx_path, data_only=True, read_only=True)
        ws = wb[sheet_name]

        # Construir columnas vac√≠as
        col_letters = list(column_mapping.keys())
        data = {column_mapping[col]: [] for col in col_letters}

        # Recorrer filas y extraer columnas clave
        for row in ws.iter_rows(min_col=1, max_col=12, values_only=False):
            for col_letter, col_name in column_mapping.items():
                col_idx = ord(col_letter) - ord('A')
                if col_idx < len(row):
                    cell = row[col_idx]
                    value = cell.value if hasattr(cell, 'value') else cell
                    data[col_name].append(value)
                else:
                    data[col_name].append(None)

        wb.close()
        df = pd.DataFrame(data)
        return (df, None) if return_error else df

    except Exception as e:
        err = f"openpyxl: {type(e).__name__}: {e}"
        if calamine_error:
            err = f"{calamine_error} | {err}"
        return (None, err) if return_error else None

print("‚úì pick_sheets() y read_sheet_fast() cargados")


‚úì pick_sheets() y read_sheet_fast() cargados


In [62]:
# %% Funciones ETL - Filtrado y clasificaci√≥n
def filter_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Excluye filas con descripci√≥n vac√≠a (Column3).
    Los encabezados de secci√≥n se manejan en `run_etl`.
    """
    df = df.copy()

    # Filtrar nulos/vac√≠os/espacios en descripci√≥n
    desc = df['Column3']
    desc_str = desc.astype(str).str.strip()
    df = df[desc.notna() & desc_str.ne('')]

    # La exclusi√≥n de encabezados de secci√≥n se mueve a run_etl o se hace en process_row

    return df.reset_index(drop=True)

def extract_code(row_dict: Dict) -> str:
    """
    Intenta extraer el c√≥digo desde Column1/Column2/Column3.
    Si no encuentra patr√≥n, devuelve texto normalizado del campo m√°s confiable.
    """
    candidates = [row_dict.get('Column1', ''), row_dict.get('Column2', ''), row_dict.get('Column3', '')]
    for val in candidates:
        if pd.isna(val) or val == '':
            continue
        m = re.search(CODE_PATTERN, str(val).upper())
        if m:
            return m.group(0).replace(' ', '')

    # Fallback: usar Column1/Column2 sin patr√≥n
    fallback = normalize_text_field(row_dict.get('Column1', '')) or normalize_text_field(row_dict.get('Column2', ''))
    return fallback

def classify_resource(code_or_text: str) -> str:
    """Clasifica recurso por primera letra encontrada (usado para debug con CODE_PATTERN)."""
    text = str(code_or_text).upper() if code_or_text else ''
    if text:
        letter = text[0]
        return RESOURCE_TYPE_MAP.get(letter, 'OTRO')
    return 'OTRO'

def normalize_numeric(value, is_percentage=False) -> float:
    """
    Normaliza valores num√©ricos con m√∫ltiples formatos:
    - "1,234.56", "1.234,56", "1234,56", "1234.56", "10%", "0.1"

    Nota: si is_percentage=True, convierte 40 -> 0.40
    """
    if pd.isna(value) or value == '':
        return 0.0

    value_str = str(value).strip()

    # Quitar s√≠mbolo % si existe
    if '%' in value_str:
        value_str = value_str.replace('%', '').strip()

    comma_count = value_str.count(',')
    dot_count = value_str.count('.')

    try:
        if comma_count > 1 or dot_count > 1:
            if ',' in value_str and '.' in value_str:
                if value_str.rindex(',') > value_str.rindex('.'):
                    value_str = value_str.replace('.', '').replace(',', '.')
                else:
                    value_str = value_str.replace(',', '')
            elif comma_count > 1:
                value_str = value_str.replace(',', '.', comma_count - 1).replace(',', '.')
        elif comma_count == 1 and dot_count == 0:
            parts = value_str.split(',')
            if len(parts[1]) > 2:
                value_str = value_str.replace(',', '')
            else:
                value_str = value_str.replace(',', '.')

        result = float(value_str)

        if is_percentage:
            result = result / 100.0

        return result
    except Exception:
        return 0.0

def normalize_text_field(value) -> str:
    """Normaliza campos de texto."""
    if pd.isna(value) or value == '':
        return ''
    return str(value).strip()

print("‚úì Funciones de filtrado cargadas")

‚úì Funciones de filtrado cargadas


In [63]:
# %% Funciones ETL - Procesamiento de filas y UNICOS
def process_row(row_dict: Dict, debug: bool = False, inferred_recurso: str = 'OTRO') -> Optional[Dict]:
    """Procesa una fila y devuelve dict normalizado o None si no es v√°lida."""
    try:
        col1 = row_dict.get('Column1', '')
        col2 = row_dict.get('Column2', '')
        col3 = row_dict.get('Column3', '')
        col5 = row_dict.get('Column5', '')
        col7 = row_dict.get('Column7', '')
        col10 = row_dict.get('Column10', '')
        col11 = row_dict.get('Column11', '')
        col12 = row_dict.get('Column12', '')

        # Validaci√≥n: debe haber descripci√≥n
        descripcion = normalize_text_field(col3)
        if descripcion == '':
            return None

        # Excluir encabezados de secci√≥n. Si una fila de datos cumple el patr√≥n de encabezado,
        # no la procesamos como dato, aunque no haya sido detectada como secci√≥n principal en run_etl.
        if is_section_header(col1) or is_section_header(col3):
            return None

        # RECURSO AHORA ES INFERIDO POR EL CONTEXTO DE LA HOJA (√∫ltimo encabezado de secci√≥n)
        recurso = inferred_recurso

        # Detectar c√≥digo para debug, si aplica (utiliza la clasificaci√≥n original por letra si debug)
        codigo = extract_code(row_dict) if debug else ''

        # PRECIO UNITARIO y UNIDAD seg√∫n RECURSO (basado en el RECURSO inferido)
        if recurso in ('EQUIPO', 'MANO DE OBRA'):
            precio_unitario = normalize_numeric(col5)
            unidad = ''
        else:
            precio_unitario = normalize_numeric(col7)
            unidad = normalize_text_field(col5)

        # Otros campos
        cpc = normalize_text_field(col10)
        np_nd = normalize_text_field(col11)
        vae = normalize_numeric(col12, is_percentage=False)

        result = {
            'DESCRIPCION': descripcion,
            'UNIDAD': unidad,
            'PRECIO UNITARIO': precio_unitario,
            'CPC ELEMENTO': cpc,
            'NP/ND/EP': np_nd if np_nd else '',
            'VAE (%)': vae,
            'RECURSO': recurso
        }

        if debug:
            result['CODIGO'] = codigo
            # Si debug, tambi√©n puedes querer el recurso original si codigo est√° presente
            # Pero para mantenerlo claro, 'RECURSO' se basa en el header.

        return result
    except Exception:
        return None

def build_unicos(
    all_rows: List[Dict],
    include_otro: bool = False
) -> pd.DataFrame:
    """
    Construye DataFrame UNICOS final:
    - Concatena todas las filas
    - Deduplicaci√≥n
    - Normalizaci√≥n
    - Ordenamiento
    """
    if not all_rows:
        print("‚ùå No hay filas para procesar")
        return pd.DataFrame()

    df = pd.DataFrame(all_rows)

    # Excluir OTRO si aplica
    if not include_otro:
        df = df[df['RECURSO'] != 'OTRO']

    # Columnas finales (sin CODIGO ni HOJA)
    final_cols = ['DESCRIPCION', 'UNIDAD', 'PRECIO UNITARIO', 'CPC ELEMENTO', 'NP/ND/EP', 'VAE (%)', 'RECURSO']
    if 'CODIGO' in df.columns: # Mantener CODIGO si debug estuvo activado y se gener√≥
        final_cols.append('CODIGO')
    if 'HOJA' in df.columns: # Mantener HOJA si debug estuvo activado y se gener√≥
        final_cols.append('HOJA')
    if 'ROW_INDEX' in df.columns: # Mantener ROW_INDEX si debug estuvo activado y se gener√≥
        final_cols.append('ROW_INDEX')

    # Asegurarse de que todas las columnas finales existan antes de seleccionarlas
    df_final_cols_present = [col for col in final_cols if col in df.columns]
    df = df[df_final_cols_present]

    # Deduplicaci√≥n exacta de filas
    df = df.drop_duplicates(subset=[col for col in df_final_cols_present if col not in ['CODIGO', 'HOJA', 'ROW_INDEX']])

    # Normalizaci√≥n post-dedup
    df['NP/ND/EP'] = df['NP/ND/EP'].fillna('').astype(str)
    df['VAE (%)'] = df['VAE (%)'].fillna(0).astype(float)
    df['PRECIO UNITARIO'] = df['PRECIO UNITARIO'].fillna(0).astype(float)
    df['UNIDAD'] = df['UNIDAD'].fillna('').astype(str)

    # Ordenar por DESCRIPCION
    df = df.sort_values('DESCRIPCION', ignore_index=True)

    return df

print("‚úì process_row() y build_unicos() cargados")

‚úì process_row() y build_unicos() cargados


In [64]:
# %% Ejecuci√≥n del ETL
def run_etl(source: str, options: Dict) -> Dict:
    """
    Ejecuta el pipeline ETL completo.

    Args:
        source: ruta del archivo Excel
        options: dict con sheet_pattern, max_sheet, include_aux, include_otro, debug

    Returns:
        results dict con:
        - df_unicos
        - df_sheet_summary
        - df_debug (si debug)
        - df_debug_agg (si debug)
        - stats
    """
    options = options or {}
    sheet_pattern = options.get('sheet_pattern', r'^\d{3}$')
    max_sheet = options.get('max_sheet') or None
    include_aux = options.get('include_aux', True)
    include_otro = options.get('include_otro', False)
    debug = options.get('debug', False)
    progress_every = options.get('progress_every', 20)

    stats = {
        'total_sheets': 0,
        'sheets_ok': 0,
        'sheets_failed': 0,
        'total_rows_read': 0,
        'total_rows_filtered': 0, # Rows that passed basic description filter
        'total_rows_extracted': 0, # Rows that were successfully processed by process_row
        'duplicates_removed': 0,
        'final_rows': 0,
        'time_seconds': 0,
        'failed_sheet_details': []
    }

    start_time = time.time()

    # 1. Seleccionar hojas
    sheets_to_process = pick_sheets(
        source,
        sheet_pattern=sheet_pattern,
        max_sheet=max_sheet,
        include_aux=include_aux
    )

    stats['total_sheets'] = len(sheets_to_process)
    total_sheets = stats['total_sheets']

    all_rows = []
    debug_rows = []
    sheet_summary_rows = []

    # 2. Procesar cada hoja
    for i, sheet_name in enumerate(sheets_to_process, 1):
        if progress_every and (i == 1 or i % progress_every == 0 or i == total_sheets):
            print(f"‚è≥ Progreso: {i}/{total_sheets} hojas... (actual: {sheet_name})")

        df_raw, err = read_sheet_fast(source, sheet_name, return_error=True)

        if err or df_raw is None or df_raw.empty:
            stats['sheets_failed'] += 1
            error_msg = err or 'Hoja vac√≠a o no legible'
            stats['failed_sheet_details'].append({
                'HOJA': sheet_name,
                'ERROR': error_msg
            })
            sheet_summary_rows.append({
                'HOJA': sheet_name,
                'ROWS_READ': 0,
                'ROWS_FILTERED': 0,
                'ROWS_EXTRACTED': 0,
                'ROW_ERRORS': 0,
                'STATUS': 'FAILED',
                'ERROR': error_msg
            })
            continue

        # Agregar √≠ndice de fila para trazabilidad antes de cualquier filtrado/iteraci√≥n
        df_raw = df_raw.copy()
        df_raw['ROW_INDEX'] = df_raw.index + 1

        current_sheet_recurso = 'OTRO'
        sheet_rows_read = len(df_raw)
        sheet_rows_filtered = 0
        sheet_rows_extracted = 0
        sheet_row_errors = 0

        # Iterar el DataFrame raw para identificar secciones y procesar filas de datos
        for _, row_series in df_raw.iterrows():
            row_dict = row_series.to_dict()
            col1_val = normalize_text_field(row_dict.get('Column1', ''))
            col3_val = normalize_text_field(row_dict.get('Column3', ''))

            # Si la fila es un encabezado de secci√≥n, actualizamos el recurso actual
            if is_section_header(col1_val) or is_section_header(col3_val):
                # Intentar obtener el tipo de recurso del encabezado de la columna 1 o 3
                detected_resource = 'OTRO'
                if is_section_header(col1_val):
                    detected_resource = get_resource_type_from_header(col1_val)
                elif is_section_header(col3_val):
                    detected_resource = get_resource_type_from_header(col3_val)

                if detected_resource != 'OTRO':
                    current_sheet_recurso = detected_resource
                continue # No procesar la fila del encabezado como dato

            # Si no es un encabezado de secci√≥n, intentar procesar como fila de datos
            processed = process_row(row_dict, debug=debug, inferred_recurso=current_sheet_recurso)
            if processed:
                sheet_rows_extracted += 1
                sheet_rows_filtered += 1 # Cuenta las filas que pasaron la validaci√≥n de descripci√≥n en process_row
                if debug:
                    # HOJA y ROW_INDEX ya est√°n en row_dict si df_raw['ROW_INDEX'] se hizo.
                    # process_row ya deber√≠a haber tomado ROW_INDEX si debug es True
                    processed_with_debug = processed.copy()
                    if 'HOJA' not in processed_with_debug: processed_with_debug['HOJA'] = sheet_name
                    if 'ROW_INDEX' not in processed_with_debug: processed_with_debug['ROW_INDEX'] = int(row_dict.get('ROW_INDEX', 0))
                    debug_rows.append(processed_with_debug)
                all_rows.append(processed)
            else:
                # Si process_row retorna None, significa que no es una fila de datos v√°lida (ej: descripci√≥n vac√≠a, o es un encabezado)
                # Esto ya se cuenta en sheet_row_errors.
                sheet_row_errors += 1
                # Aqu√≠ no se incrementa sheet_rows_filtered porque no es una fila 'filtrada' para procesamiento, es 'descartada'.

            # Actualizar stats para el resumen de la hoja

        stats['total_rows_read'] += sheet_rows_read
        stats['total_rows_filtered'] += sheet_rows_filtered # Total de filas v√°lidas procesadas como datos
        stats['total_rows_extracted'] += sheet_rows_extracted

        if sheet_rows_extracted > 0:
            stats['sheets_ok'] += 1
            sheet_summary_rows.append({
                'HOJA': sheet_name,
                'ROWS_READ': sheet_rows_read,
                'ROWS_FILTERED': sheet_rows_filtered,
                'ROWS_EXTRACTED': sheet_rows_extracted,
                'ROW_ERRORS': sheet_row_errors, # Errores de process_row
                'STATUS': 'OK' if sheet_row_errors == 0 else 'OK_WITH_ERRORS',
                'ERROR': ''
            })
        else:
            stats['sheets_failed'] += 1
            error_msg = 'Sin filas de datos v√°lidas (solo encabezados o descripciones vac√≠as)'
            stats['failed_sheet_details'].append({
                'HOJA': sheet_name,
                'ERROR': error_msg
            })
            sheet_summary_rows.append({
                'HOJA': sheet_name,
                'ROWS_READ': sheet_rows_read,
                'ROWS_FILTERED': sheet_rows_filtered,
                'ROWS_EXTRACTED': sheet_rows_extracted,
                'ROW_ERRORS': sheet_row_errors,
                'STATUS': 'FAILED',
                'ERROR': error_msg
            })


    # 3. Construir UNICOS
    df_unicos = build_unicos(all_rows, include_otro=include_otro)

    stats['final_rows'] = len(df_unicos)
    stats['duplicates_removed'] = stats['total_rows_extracted'] - stats['final_rows']
    stats['time_seconds'] = time.time() - start_time

    # 4. Sheet summary
    if sheet_summary_rows:
        df_sheet_summary = pd.DataFrame(sheet_summary_rows)
        df_sheet_summary = df_sheet_summary.sort_values('HOJA', ignore_index=True)
    else:
        df_sheet_summary = pd.DataFrame(columns=[
            'HOJA', 'ROWS_READ', 'ROWS_FILTERED', 'ROWS_EXTRACTED', 'ROW_ERRORS', 'STATUS', 'ERROR'
        ])

    # 5. Debug outputs
    df_debug = None
    df_debug_agg = None
    if debug and debug_rows:
        df_debug = pd.DataFrame(debug_rows)

        final_cols = ['DESCRIPCION', 'UNIDAD', 'PRECIO UNITARIO', 'CPC ELEMENTO', 'NP/ND/EP', 'VAE (%)', 'RECURSO']

        def unique_list(series):
            seen = set()
            out = []
            for v in series:
                if pd.isna(v):
                    continue
                v_str = str(v).strip()
                if v_str == '':
                    continue
                if v_str not in seen:
                    seen.add(v_str)
                    out.append(v_str)
            return out

        df_debug_agg = (
            df_debug
            .groupby(final_cols, dropna=False)
            .agg(
                HOJAS=('HOJA', unique_list),
                CODIGOS=('CODIGO', unique_list),
                FILAS=('HOJA', 'count')
            )
            .reset_index()
        )

    results = {
        'df_unicos': df_unicos,
        'df_sheet_summary': df_sheet_summary,
        'df_debug': df_debug,
        'df_debug_agg': df_debug_agg,
        'stats': stats
    }

    return results

print("‚úì run_etl() cargado")

‚úì run_etl() cargado


In [65]:

# %% Similitud heur√≠stica (funciones)
# Configuraci√≥n
SIM_THRESHOLD = 90
SIM_SCOPE = 'RECURSO'
SIM_EXPORT_PATH = 'output/unicos_similitud.xlsx'
SIM_PROGRESS_EVERY = 100

# Motor de similitud
try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False

import difflib

def normalize_desc(text: str) -> str:
    if pd.isna(text):
        return ''
    s = str(text).lower().strip()
    s = re.sub(r'\s+', ' ', s)
    return s


def get_similarity(a: str, b: str) -> float:
    if not a or not b:
        return 0.0
    if RAPIDFUZZ_AVAILABLE:
        return float(fuzz.ratio(a, b))
    return difflib.SequenceMatcher(None, a, b).ratio() * 100.0


def find_similar_candidates(
    df_unicos: pd.DataFrame,
    threshold: int = SIM_THRESHOLD,
    scope: str = SIM_SCOPE,
    progress_every: int = SIM_PROGRESS_EVERY
) -> pd.DataFrame:
    """
    Encuentra pares de descripciones con similitud >= threshold.
    Scope por defecto: mismo RECURSO.
    """
    if df_unicos is None or df_unicos.empty:
        return pd.DataFrame(columns=['DESCRIPCION_A', 'DESCRIPCION_B', 'SIMILARIDAD', 'RECURSO'])

    df = df_unicos.copy()
    df['DESC_NORM'] = df['DESCRIPCION'].apply(normalize_desc)
    df = df[df['DESC_NORM'] != '']

    records = []
    groups = df.groupby('RECURSO') if scope == 'RECURSO' else [('ALL', df)]

    for recurso, g in groups:
        g = g[['DESCRIPCION', 'DESC_NORM']].drop_duplicates().copy()
        if g.empty:
            continue

        g['BLOCK'] = g['DESC_NORM'].str[:4]
        blocks = g.groupby('BLOCK')

        comparisons = 0
        for block_key, gb in blocks:
            items = gb.to_dict('records')
            n = len(items)
            if n < 2:
                continue
            for i in range(n):
                a = items[i]
                for j in range(i + 1, n):
                    b = items[j]
                    sim = get_similarity(a['DESC_NORM'], b['DESC_NORM'])
                    comparisons += 1
                    if progress_every and comparisons % progress_every == 0:
                        print(f"‚è≥ Similitud: {comparisons} comparaciones (RECURSO={recurso})")
                    if sim >= threshold:
                        records.append({
                            'DESCRIPCION_A': a['DESCRIPCION'],
                            'DESCRIPCION_B': b['DESCRIPCION'],
                            'SIMILARIDAD': round(sim, 2),
                            'RECURSO': recurso
                        })

    if not records:
        return pd.DataFrame(columns=['DESCRIPCION_A', 'DESCRIPCION_B', 'SIMILARIDAD', 'RECURSO'])

    df_candidates = pd.DataFrame(records)
    df_candidates = df_candidates.sort_values('SIMILARIDAD', ascending=False)

    # Mantener mejor match por DESCRIPCION_A
    df_candidates = df_candidates.drop_duplicates(subset=['DESCRIPCION_A'], keep='first')

    return df_candidates.reset_index(drop=True)


def build_canonical_map(df_unicos: pd.DataFrame, df_candidates: pd.DataFrame) -> pd.DataFrame:
    """
    Propone un nombre can√≥nico por cada candidato.
    Regla: m√°s frecuente, desempate por menor longitud.
    """
    if df_candidates is None or df_candidates.empty:
        return pd.DataFrame(columns=['ORIGINAL', 'SIMILAR', 'PROPUESTA', 'SIMILARIDAD', 'RECURSO'])

    freq = df_unicos['DESCRIPCION'].value_counts().to_dict()

    suggestions = []
    for _, row in df_candidates.iterrows():
        a = row['DESCRIPCION_A']
        b = row['DESCRIPCION_B']
        fa = freq.get(a, 0)
        fb = freq.get(b, 0)

        if fa > fb:
            propuesta = a
        elif fb > fa:
            propuesta = b
        else:
            propuesta = a if len(a) <= len(b) else b

        suggestions.append({
            'ORIGINAL': a,
            'SIMILAR': b,
            'PROPUESTA': propuesta,
            'SIMILARIDAD': row['SIMILARIDAD'],
            'RECURSO': row['RECURSO']
        })

    return pd.DataFrame(suggestions)


def export_similarity_results(df_unicos_revisado: pd.DataFrame, df_mapeo: pd.DataFrame, path: str = SIM_EXPORT_PATH) -> Tuple[bool, str]:
    if df_unicos_revisado is None or df_unicos_revisado.empty:
        return False, "‚ö†Ô∏è df_unicos_revisado est√° vac√≠o"
    if df_mapeo is None or df_mapeo.empty:
        return False, "‚ö†Ô∏è df_mapeo est√° vac√≠o"

    out_path = Path(path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        with pd.ExcelWriter(out_path, engine='openpyxl') as writer:
            df_unicos_revisado.to_excel(writer, sheet_name='UNICOS', index=False)
            df_mapeo.to_excel(writer, sheet_name='MAPEO', index=False)
        return True, f"‚úÖ XLSX similitud: {out_path.absolute()}"
    except Exception as e:
        return False, f"‚ùå Error exportando similitud: {e}"

print("‚úì Funciones de similitud cargadas")


‚úì Funciones de similitud cargadas


In [66]:

# %% Exportaci√≥n de resultados (√∫nica definici√≥n)
def export_results(df: pd.DataFrame, csv_path: str, xlsx_path: str, sheet_name: str = 'UNICOS') -> Tuple[bool, str]:
    """
    Exporta resultados a CSV y XLSX.

    Returns:
        (ok, message)
    """
    if df is None or df.empty:
        return False, "‚ö†Ô∏è No hay resultados para exportar"

    if not csv_path or not xlsx_path:
        return False, "‚ö†Ô∏è Rutas de exportaci√≥n inv√°lidas"

    csv_path = Path(csv_path)
    xlsx_path = Path(xlsx_path)

    # Asegurar que existan las carpetas
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    xlsx_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        # Exportar CSV
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')

        # Exportar XLSX
        with pd.ExcelWriter(xlsx_path, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)

        msg = f"""‚úÖ CSV: {csv_path.absolute()}
‚úÖ XLSX: {xlsx_path.absolute()}

üì¶ Archivos guardados en carpeta: {OUTPUT_FOLDER.absolute()}"""
        return True, msg
    except Exception as e:
        return False, f"‚ùå Error en exportaci√≥n: {e}"

print("‚úì export_results() disponible")


‚úì export_results() disponible


In [67]:
# %% Panel ETL (UI y callbacks)
state = {
    'df_unicos': None,
    'df_sheet_summary': None,
    'df_debug': None,
    'df_debug_agg': None,
    'stats': None,
    'last_options': None,
    'df_candidates': None,
    'df_suggestions': None,
    'df_mapeo': None,
    'df_unicos_revisado': None,
    'review_idx': 0,
    'review_log': []
}

# Widgets de control
manual_path_text = Text(
    value='',
    description='Ruta manual:',
    placeholder='input/archivo.xlsx',
    layout=widgets.Layout(width='500px')
)

sheet_pattern_text = Text(
    value=r'^\d{3}$',
    description='Patr√≥n hojas:',
    layout=widgets.Layout(width='300px')
)

max_sheet_input = IntText(
    value=0,
    description='Max hojas (0=todas):',
    layout=widgets.Layout(width='220px')
)

include_aux_checkbox = Checkbox(value=True, description='Incluir Aux.*')
include_otro_checkbox = Checkbox(value=False, description='Incluir OTRO')
debug_checkbox = Checkbox(value=False, description='Debug: HOJA/CODIGO')

csv_path_text = Text(
    value=str(OUTPUT_FOLDER / 'unicos.csv'),
    description='CSV path:',
    layout=widgets.Layout(width='500px')
)

xlsx_path_text = Text(
    value=str(OUTPUT_FOLDER / 'unicos.xlsx'),
    description='XLSX path:',
    layout=widgets.Layout(width='500px')
)

execute_button = Button(
    description='‚ñ∂ Ejecutar ETL',
    button_style='success',
    layout=widgets.Layout(width='150px')
)

export_button = Button(
    description='üíæ Exportar',
    button_style='info',
    layout=widgets.Layout(width='150px')
)

output_status = Output()
output_tables = Output()
output_explore = Output()

# Exploraci√≥n
recurso_filter_dropdown = Dropdown(
    options=['(Todos)'],
    value='(Todos)',
    description='RECURSO:',
    layout=widgets.Layout(width='220px')
)

descripcion_search_text = Text(
    value='',
    description='Buscar:',
    placeholder='Texto en DESCRIPCION',
    layout=widgets.Layout(width='300px')
)

apply_filters_button = Button(
    description='Aplicar filtros',
    button_style='primary',
    layout=widgets.Layout(width='160px')
)

# Similitud
sim_review_button = Button(
    description='üîç Iniciar revisi√≥n similitud',
    button_style='warning',
    layout=widgets.Layout(width='220px')
)

sim_accept_button = Button(
    description='‚úÖ Aceptar sugerencia',
    button_style='success',
    layout=widgets.Layout(width='180px')
)

sim_keep_button = Button(
    description='‚ûñ Mantener original',
    button_style='info',
    layout=widgets.Layout(width='170px')
)

sim_skip_button = Button(
    description='‚è≠ Saltar',
    button_style='',
    layout=widgets.Layout(width='120px')
)

sim_custom_text = Text(
    value='',
    description='Reemplazo manual:',
    placeholder='Escribe y presiona Enter',
    layout=widgets.Layout(width='420px')
)

sim_apply_custom_button = Button(
    description='‚úèÔ∏è Aplicar texto',
    button_style='primary',
    layout=widgets.Layout(width='150px')
)

sim_export_button = Button(
    description='üíæ Exportar similitud',
    button_style='info',
    layout=widgets.Layout(width='200px')
)

sim_status_html = HTML()
sim_output = Output()


def _resolve_source_path() -> Optional[str]:
    manual = manual_path_text.value.strip()
    if manual:
        return manual
    return current_file_path.get('path')


def _update_resource_filter(df: pd.DataFrame):
    if df is None or df.empty:
        recurso_filter_dropdown.options = ['(Todos)']
        recurso_filter_dropdown.value = '(Todos)'
        return
    valores = sorted([v for v in df['RECURSO'].dropna().unique().tolist()])
    recurso_filter_dropdown.options = ['(Todos)'] + valores
    recurso_filter_dropdown.value = '(Todos)'


def _reset_similarity_state():
    state['df_candidates'] = None
    state['df_suggestions'] = None
    state['df_mapeo'] = None
    state['df_unicos_revisado'] = None
    state['review_idx'] = 0
    state['review_log'] = []
    sim_status_html.value = ''
    sim_output.clear_output()


def on_execute_click(b):
    if execute_button.disabled:
        return
    output_status.clear_output()
    output_tables.clear_output()
    output_explore.clear_output()

    source = _resolve_source_path()
    if not source:
        with output_status:
            print("‚ùå Por favor sube un archivo o ingresa una ruta manual")
        return

    if not os.path.exists(source):
        with output_status:
            print(f"‚ùå Archivo no encontrado: {source}")
        return

    options = {
        'sheet_pattern': sheet_pattern_text.value,
        'max_sheet': max_sheet_input.value if max_sheet_input.value > 0 else None,
        'include_aux': include_aux_checkbox.value,
        'include_otro': include_otro_checkbox.value,
        'debug': debug_checkbox.value,
        'progress_every': 20
    }

    # Bloquear bot√≥n mientras corre
    execute_button.disabled = True
    export_button.disabled = True
    execute_button.description = '‚è≥ Ejecutando...'
    execute_button.button_style = 'warning'

    try:
        with output_status:
            print("üöÄ Ejecutando ETL...")
            print(f"Archivo: {source}")
            print(f"Opciones: {options}")
            results = run_etl(source, options)
    except Exception as e:
        with output_status:
            print(f"‚ùå Error en ETL: {e}")
        return
    finally:
        execute_button.disabled = False
        export_button.disabled = False
        execute_button.description = '‚ñ∂ Ejecutar ETL'
        execute_button.button_style = 'success'

    state['df_unicos'] = results['df_unicos']
    state['df_sheet_summary'] = results['df_sheet_summary']
    state['df_debug'] = results['df_debug']
    state['df_debug_agg'] = results['df_debug_agg']
    state['stats'] = results['stats']
    state['last_options'] = options

    _update_resource_filter(state['df_unicos'])
    _reset_similarity_state()

    # Resumen en output_status
    stats = results['stats']
    with output_status:
        print("")
        print("‚úÖ ETL completado")
        print(f"Hojas procesadas OK: {stats['sheets_ok']}/{stats['total_sheets']}")
        print(f"Hojas fallidas: {stats['sheets_failed']}")
        print(f"Filas le√≠das: {stats['total_rows_read']}")
        print(f"Filas filtradas: {stats['total_rows_filtered']}")
        print(f"Filas extra√≠das: {stats['total_rows_extracted']}")
        print(f"Duplicados removidos: {stats['duplicates_removed']}")
        print(f"Filas finales (UNICOS): {stats['final_rows']}")
        print(f"Tiempo: {stats['time_seconds']:.2f}s")

        failed = stats.get('failed_sheet_details', [])
        if failed:
            print("")
            print("‚ùó Top 5 errores:")
            for item in failed[:5]:
                print(f"- {item.get('HOJA')}: {item.get('ERROR')}")

    # Tablas en output_tables
    with output_tables:
        print("")
        print("üìÑ Resumen por hoja:")
        display(state['df_sheet_summary'])

        if state['df_unicos'] is None or state['df_unicos'].empty:
            print("")
            print("‚ö†Ô∏è df_unicos est√° vac√≠o")
        else:
            print("")
            print("üìã Preview df_unicos (primeras 20 filas):")
            display(state['df_unicos'].head(20))

            print("")
            print("üìä Conteo por RECURSO:")
            display(state['df_unicos']['RECURSO'].value_counts().reset_index().rename(columns={'index': 'RECURSO', 'RECURSO': 'COUNT'}))

        if state['df_debug_agg'] is not None:
            print("")
            print("üß™ Debug agregado (primeras 20 filas):")
            display(state['df_debug_agg'].head(20))


def on_export_click(b):
    output_status.clear_output()
    df = state.get('df_unicos')
    ok, msg = export_results(df, csv_path_text.value, xlsx_path_text.value)
    with output_status:
        print(msg)


def on_apply_filters(b):
    output_explore.clear_output()
    df = state.get('df_unicos')
    if df is None or df.empty:
        with output_explore:
            print("‚ö†Ô∏è No hay datos para filtrar. Ejecuta el ETL primero.")
        return

    filtered = df.copy()
    recurso = recurso_filter_dropdown.value
    if recurso and recurso != '(Todos)':
        filtered = filtered[filtered['RECURSO'] == recurso]

    q = descripcion_search_text.value.strip()
    if q:
        filtered = filtered[filtered['DESCRIPCION'].astype(str).str.contains(q, case=False, na=False, regex=False)]

    with output_explore:
        print(f"Filas filtradas: {len(filtered)}")
        display(filtered.head(50))
        print("")
        print("Conteo por RECURSO:")
        display(filtered['RECURSO'].value_counts().reset_index().rename(columns={'index': 'RECURSO', 'RECURSO': 'COUNT'}))


def _current_suggestion():
    df = state.get('df_suggestions')
    if df is None or df.empty:
        return None
    idx = state.get('review_idx', 0)
    if idx >= len(df):
        return None
    return df.iloc[idx]


def _render_current():
    row = _current_suggestion()
    if row is None:
        # finalizar
        if state['review_log']:
            df_mapeo = pd.DataFrame(state['review_log'])
            state['df_mapeo'] = df_mapeo
            mapping = {r['ORIGINAL']: r['ELEGIDO'] for r in state['review_log'] if r.get('ACCION') != 'SALTAR' and r.get('ELEGIDO')}
            df_rev = state['df_unicos'].copy()
            df_rev['DESCRIPCION'] = df_rev['DESCRIPCION'].apply(lambda x: mapping.get(x, x))
            state['df_unicos_revisado'] = df_rev

            sim_status_html.value = "<b>‚úÖ Revisi√≥n finalizada.</b> Puedes exportar el XLSX de similitud."
            with sim_output:
                print("""
Resumen de cambios:""")
                display(df_mapeo)
        else:
            sim_status_html.value = "<b>‚úÖ Revisi√≥n finalizada.</b> Sin cambios registrados."
        return

    total = len(state['df_suggestions'])
    idx = state.get('review_idx', 0) + 1
    sim_status_html.value = (
        f"<b>Revisi√≥n {idx}/{total}</b><br>"
        f"RECURSO: <b>{row['RECURSO']}</b><br>"
        f"SIMILARIDAD: <b>{row['SIMILARIDAD']}%</b><br>"
        f"ORIGINAL: <code>{row['ORIGINAL']}</code><br>"
        f"SIMILAR: <code>{row['SIMILAR']}</code><br>"
        f"PROPUESTA: <b>{row['PROPUESTA']}</b>"
    )
    sim_custom_text.value = ''


def on_start_similarity(b):
    sim_output.clear_output()
    df = state.get('df_unicos')
    if df is None or df.empty:
        sim_status_html.value = "<b>‚ö†Ô∏è Ejecuta el ETL antes de revisar similitud.</b>"
        return

    sim_status_html.value = "<b>‚è≥ Generando candidatos de similitud...</b>"
    df_candidates = find_similar_candidates(df, threshold=SIM_THRESHOLD, scope=SIM_SCOPE, progress_every=SIM_PROGRESS_EVERY)
    state['df_candidates'] = df_candidates

    if df_candidates.empty:
        sim_status_html.value = "<b>‚úÖ No se encontraron candidatos con el umbral actual.</b>"
        return

    df_suggestions = build_canonical_map(df, df_candidates)
    state['df_suggestions'] = df_suggestions
    state['review_idx'] = 0
    state['review_log'] = []

    with sim_output:
        print(f"Candidatos: {len(df_candidates)}")

    _render_current()


def _record_action(action: str, elegido: str = ''):
    row = _current_suggestion()
    if row is None:
        return

    state['review_log'].append({
        'ORIGINAL': row['ORIGINAL'],
        'SIMILAR': row['SIMILAR'],
        'SUGERIDO': row['PROPUESTA'],
        'ELEGIDO': elegido,
        'SIMILARIDAD': row['SIMILARIDAD'],
        'RECURSO': row['RECURSO'],
        'ACCION': action
    })

    with sim_output:
        if action != 'SALTAR':
            print(f"{action}: '{row['ORIGINAL']}' -> '{elegido}' (sim {row['SIMILARIDAD']}%) ")
        else:
            print(f"{action}: '{row['ORIGINAL']}' (sim {row['SIMILARIDAD']}%) ")

    state['review_idx'] += 1
    _render_current()


def on_accept_click(b):
    row = _current_suggestion()
    if row is None:
        return
    _record_action('ACEPTAR', row['PROPUESTA'])


def on_keep_click(b):
    row = _current_suggestion()
    if row is None:
        return
    _record_action('MANTENER', row['ORIGINAL'])


def on_skip_click(b):
    _record_action('SALTAR', '')


def on_apply_custom(b):
    custom = sim_custom_text.value.strip()
    if not custom:
        return
    _record_action('CUSTOM', custom)


def on_export_similarity(b):
    ok, msg = export_similarity_results(state.get('df_unicos_revisado'), state.get('df_mapeo'), SIM_EXPORT_PATH)
    with sim_output:
        print(msg)


def _bind_enter(text_widget, handler):
    try:
        text_widget.on_submit(lambda x: handler(None))
    except Exception:
        pass


execute_button.on_click(on_execute_click)
export_button.on_click(on_export_click)
apply_filters_button.on_click(on_apply_filters)

sim_review_button.on_click(on_start_similarity)
sim_accept_button.on_click(on_accept_click)
sim_keep_button.on_click(on_keep_click)
sim_skip_button.on_click(on_skip_click)
sim_apply_custom_button.on_click(on_apply_custom)
sim_export_button.on_click(on_export_similarity)
_bind_enter(sim_custom_text, on_apply_custom)

panel = VBox([
    HTML('<h2>‚öôÔ∏è Panel de Control ETL</h2>'),
    input_ui,
    HTML('<h3>üîß Par√°metros</h3>'),
    manual_path_text,
    HBox([sheet_pattern_text, max_sheet_input]),
    HBox([include_aux_checkbox, include_otro_checkbox, debug_checkbox]),
    HTML('<h3>üíæ Exportaci√≥n</h3>'),
    csv_path_text,
    xlsx_path_text,
    HBox([execute_button, export_button]),
    output_status,
    HTML('<hr/>'),
    HTML('<h3>üìä Resultados</h3>'),
    output_tables,
    HTML('<h3>üîé Exploraci√≥n</h3>'),
    HBox([recurso_filter_dropdown, descripcion_search_text, apply_filters_button]),
    output_explore,
    HTML('<hr/>'),
    HTML('<h3>üß† Similitud heur√≠stica</h3>'),
    sim_review_button,
    sim_status_html,
    HBox([sim_accept_button, sim_keep_button, sim_skip_button]),
    HBox([sim_custom_text, sim_apply_custom_button]),
    sim_export_button,
    sim_output
])

display(panel)

VBox(children=(HTML(value='<h2>‚öôÔ∏è Panel de Control ETL</h2>'), VBox(children=(HTML(value='<h3>üì• Carga de Archi‚Ä¶

## ‚úÖ How to use (r√°pido)

1. Ejecuta **Run All** en el notebook.
2. Sube tu archivo Excel con el widget de carga o ingresa una ruta manual.
3. Ajusta par√°metros (patr√≥n de hojas, m√°ximo, incluir Aux, incluir OTRO, Debug).
4. Presiona **‚ñ∂ Ejecutar ETL**.
5. Explora resultados con los filtros (RECURSO y b√∫squeda por DESCRIPCION).
6. Exporta con **üíæ Exportar** (CSV/XLSX).

**Paso adicional (Similitud heur√≠stica):**
1. Presiona **üîç Iniciar revisi√≥n similitud**.
2. Revisa uno‚Äëpor‚Äëuno y elige: Aceptar, Mantener, Saltar o escribir un reemplazo.
3. Al finalizar, exporta con **üíæ Exportar similitud** (XLSX con hojas UNICOS + MAPEO).

**Outputs generados:**
- `df_unicos`: consolidado final (contrato sin columnas extra si `debug=False`).
- `df_sheet_summary`: resumen por hoja.
- `df_debug` y `df_debug_agg`: solo si `debug=True`.
- `df_mapeo` y `df_unicos_revisado`: solo si completas revisi√≥n de similitud.

**Debug ON**: activa trazabilidad (HOJA, CODIGO, ROW_INDEX) y un agregado con HOJAS/CODIGOS por recurso.
