# Limpieza HTS
## Fases de Tratado
### Fase 0: Configuración General

El objetivo de este script es procesar la base de datos de tarifas de Estados Unidos (HTS - Harmonized Tariff Schedule). Integra una extracción dinámica de la estructura (Secciones y Capítulos) directamente desde el PDF oficial y cruza esta información con los datos tarifarios crudos.

Instalación de dependencias:
- `pip install pdfplumber pandas xlsxwriter openpyxl`

Librerías:
- **pdfplumber:** Motor de extracción para leer la jerarquía del PDF `Table of Contents`.
- **pandas (pd):** Manipulación de estructuras de datos (DataFrames).
- **re:** Expresiones regulares para detectar patrones de texto (Secciones, Capítulos, Notas).
- **xlsxwriter:** Motor de escritura para Excel con formato avanzado.
- **os/sys:** Manejo de rutas y control de flujo.

Variables Globales:
- **Rutas:** Apuntan a los archivos Raw (CSV y PDF) y definen la salida procesada.

In [8]:
import pandas as pd
import pdfplumber
import re
import os
import sys

PATH_HTS_CSV = r"../data/raw/htsdata.csv"
PATH_HTS_PDF = r"../data/raw/Table of Contents_2025HTSRev31.pdf"
PATH_SALIDA = "../data/intermediate/HTS_Limpia.xlsx"

print("--- CONFIGURACIÓN CARGADA ---")
print(f"Input CSV: {PATH_HTS_CSV}")
print(f"Input PDF: {PATH_HTS_PDF}")
print(f"Output:    {PATH_SALIDA}")

--- CONFIGURACIÓN CARGADA ---
Input CSV: ../data/raw/htsdata.csv
Input PDF: ../data/raw/Table of Contents_2025HTSRev31.pdf
Output:    ../data/intermediate/HTS_Limpia.xlsx


### Fase 0.5: Definición de Funciones

Se definen todas las funciones del sistema de manera plana (sin anidamiento) para facilitar su depuración y mantenimiento. A continuación se explica cada una:

**1. Funciones Auxiliares (`_nombre`)**
Operaciones atómicas de limpieza y formateo.

- **`_normalizar_texto`**: Recibe un texto crudo (ej. "LIVE ANIMALS"), lo convierte a formato Oración ("Live animals") y asegura que termine en punto final.
- **`_restaurar_puntos`**: Convierte un string numérico limpio (`01012100`) al formato visual HTS con puntos (`0101.21.00`).
- **`_concatenar_descripciones`**: Toma las columnas jerárquicas (Sección, Capítulo, Partida, etc.) de una fila y las une en un solo párrafo coherente.
- **`_registrar_capitulo`**: Función de apoyo para el procesador PDF. Se encarga de guardar en la lista principal el Capítulo y Sección que se acaban de terminar de leer, uniendo las partes multilínea del título.

**2. Funciones Críticas (`__nombre__`)**
Motores de lectura y escritura de archivos.

- **`__procesar_pdf_estructura__`**: Implementa una Máquina de Estados para leer el PDF línea por línea. Detecta cambios de Sección y Capítulo, acumula líneas de títulos largos y utiliza `_registrar_capitulo` para guardar los avances.
- **`__escribir_hoja__`**: Controlador de `xlsxwriter`. Genera las pestañas de Excel, configura los encabezados y aplica lógica condicional para el ajuste de texto (Wrap Text) en descripciones y tasas especiales.

**3. Funciones Principales (`nombre`)**
Orquestadores de la lógica de negocio.

- **`construir_diccionarios_maestros`**: Transforma el DataFrame extraído del PDF en diccionarios optimizados (Hash Maps) para cruzar información rápidamente por código.
- **`procesar_csv_hts`**: Carga la base de datos tarifaria, elimina el Capítulo 99 (administrativo) y ejecuta la lógica de Herencia de Indentación (donde los códigos hijos heredan los dígitos de los padres).
- **`generar_paneles`**: Cruza la información del CSV procesado con los diccionarios del PDF. Normaliza los textos y genera las columnas derivadas necesarias para los reportes.

In [9]:
# --- FUNCIONES AUXILIARES ---

def _normalizar_texto(texto):
    """Capitaliza (Oración) y agrega punto final si falta."""
    t = str(texto).strip()
    if not t or t.lower() == 'nan': return ""
    t = t[0].upper() + t[1:].lower()
    if not t.endswith('.'):
        t += "."
    return t


def _restaurar_puntos(code):
    """Formato visual HTS (xxxx.xx.xx)."""
    s = str(code)
    if len(s) <= 4: return s
    elif len(s) <= 6: return f"{s[:4]}.{s[4:]}"
    elif len(s) <= 8: return f"{s[:4]}.{s[4:6]}.{s[6:]}"
    return f"{s[:4]}.{s[4:6]}.{s[6:8]}.{s[8:]}"


def _concatenar_descripciones(row):
    """Une jerarquía textual en una sola cadena limpia."""
    partes = [
        row.get('Seccion_Nombre', ''),
        row.get('Capitulo_Nom', ''),
        row.get('Partida_Nom', ''),
        row.get('Desdoblamiento_Nom', ''),
        row.get('Subpartida_Nom', ''),
        row.get('Description', '')
    ]
    validas = [str(x).strip() for x in partes if str(x).strip() not in ['', 'nan', 'ND', 'None']]
    return " ".join(validas)


def _registrar_capitulo(data_list, sec_id, sec_parts, chap_id, chap_parts):
    """Guarda el capítulo actual en la lista de datos."""
    if chap_id is not None:
        s_name = " ".join(sec_parts).strip()
        c_name = " ".join(chap_parts).strip()
        data_list.append({
            "Seccion_ID": sec_id,
            "Seccion_Nombre": s_name,
            "Capitulo_ID": chap_id,
            "Capitulo_Nombre": c_name
        })


# --- FUNCIONES CRÍTICAS ---

def __procesar_pdf_estructura__(pdf_path):
    """
    Motor de extracción PDF basado en Máquina de Estados.
    """
    if not os.path.exists(pdf_path):
        sys.exit(f"ERROR CRÍTICO: No se encontró el PDF en: {pdf_path}")

    print(f">> [CRÍTICO] Leyendo estructura desde PDF...")
    data = []
    
    # Variables de estado
    last_sec_id = ""
    last_sec_name_parts = []
    last_chap_id = None
    last_chap_name_parts = []
    next_expected_chap = 1
    
    # Modos
    MODE_SEARCH, MODE_READING_SEC, MODE_READING_CHAP = 0, 1, 2
    current_mode = MODE_SEARCH

    # Regex
    rx_section = re.compile(r"^SECTION\s+([IVXLCDM]+)", re.IGNORECASE)
    rx_explicit_chap = re.compile(r"^Chapter\s+(\d+)", re.IGNORECASE)
    rx_stop = re.compile(r"^(Section Note|General Note|Page|Change Record|Annex|Rate of Duty|Statistical Annexes|Chemical Appendix|Pharmaceutical Appendix|Intermediate Chemicals)", re.IGNORECASE)

    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text = page.extract_text()
            if not text: continue
            for line in text.split('\n'):
                clean = line.strip()
                if not clean: continue
                
                # 1. Stop Tokens
                if rx_stop.match(clean) or clean.isdigit():
                    if current_mode == MODE_READING_SEC: current_mode = MODE_SEARCH
                    elif current_mode == MODE_READING_CHAP:
                        if "Note" in clean or "Appendix" in clean or "Annex" in clean: current_mode = MODE_SEARCH
                    continue

                # 2. Nueva Sección
                sec_match = rx_section.match(clean)
                if sec_match:
                    # Guardar anterior
                    _registrar_capitulo(data, last_sec_id, last_sec_name_parts, last_chap_id, last_chap_name_parts)
                    
                    last_sec_id = f"SECTION {sec_match.group(1)}"
                    last_sec_name_parts = []; last_chap_id = None; last_chap_name_parts = []
                    current_mode = MODE_READING_SEC
                    continue

                # 3. Nuevo Capítulo
                detected_num = None; text_start_idx = 0
                explicit = rx_explicit_chap.match(clean)
                if explicit:
                    detected_num = int(explicit.group(1)); text_start_idx = explicit.end()
                elif clean.split()[0].isdigit():
                    first_word = clean.split()[0]
                    if first_word.isdigit():
                        detected_num = int(first_word); text_start_idx = len(first_word)

                if detected_num is not None and detected_num == next_expected_chap:
                    # Guardar anterior
                    _registrar_capitulo(data, last_sec_id, last_sec_name_parts, last_chap_id, last_chap_name_parts)
                    
                    if current_mode == MODE_READING_SEC: current_mode = MODE_SEARCH
                    last_chap_id = detected_num
                    last_chap_name_parts = []
                    rest_of_line = clean[text_start_idx:].strip()
                    if rest_of_line: last_chap_name_parts.append(rest_of_line)
                    next_expected_chap += 1
                    current_mode = MODE_READING_CHAP
                    continue

                # 4. Acumulación
                if current_mode == MODE_READING_SEC: last_sec_name_parts.append(clean)
                elif current_mode == MODE_READING_CHAP: last_chap_name_parts.append(clean)

    # Guardar último
    _registrar_capitulo(data, last_sec_id, last_sec_name_parts, last_chap_id, last_chap_name_parts)
    return pd.DataFrame(data)


def __escribir_hoja__(dataframe, nombre_hoja, writer, workbook):
    """
    Motor de escritura Excel con Wrap Text condicional.
    """
    worksheet = workbook.add_worksheet(nombre_hoja)
    writer.sheets[nombre_hoja] = worksheet
    
    # Formatos
    fmt_header = workbook.add_format({'bold': True, 'border': 1, 'bg_color': '#D9E1F2', 'valign': 'top', 'align': 'center', 'font_name': 'Arial', 'font_size': 10})
    fmt_normal = workbook.add_format({'border': 1, 'valign': 'top', 'font_name': 'Arial', 'font_size': 9})
    fmt_wrap = workbook.add_format({'border': 1, 'valign': 'top', 'text_wrap': True, 'font_name': 'Arial', 'font_size': 9})

    headers = dataframe.columns.tolist()
    
    # Encabezados
    for col_idx, header in enumerate(headers):
        worksheet.write(0, col_idx, header, fmt_header)
    
    # Cuerpo
    for row_idx, row in enumerate(dataframe.itertuples(index=False), start=1):
        for col_idx, value in enumerate(row):
            header_name = str(headers[col_idx])
            
            # Lógica Wrap Text: Descripciones, Nombres, Concatenado y Special Rate
            es_wrap = any(x in header_name for x in ["Name", "Description", "Special", "Concatenated"])
            formato = fmt_wrap if es_wrap else fmt_normal
            
            if pd.isna(value): value = ""
            worksheet.write(row_idx, col_idx, value, formato)

    # Ajuste de Anchos
    for idx, col_name in enumerate(headers):
        header_str = str(col_name)
        if any(x in header_str for x in ["Name", "Description", "Concatenated", "Special"]):
            worksheet.set_column(idx, idx, 50) # Ancho fijo amplio para Wrap
        else:
            worksheet.set_column(idx, idx, 15) # Ancho estándar


# --- FUNCIONES PRINCIPALES ---

def construir_diccionarios_maestros(df_pdf):
    print(">> Construyendo diccionarios maestros...")
    df_pdf['Seccion_Romana'] = df_pdf['Seccion_ID'].str.replace('SECTION ', '', regex=False)
    df_pdf['Cap_Str'] = df_pdf['Capitulo_ID'].astype(str).str.zfill(2)
    
    mapa_cap_sec = pd.Series(df_pdf.Seccion_Romana.values, index=df_pdf.Cap_Str).to_dict()
    df_secs = df_pdf[['Seccion_Romana', 'Seccion_Nombre']].drop_duplicates()
    sec_names = pd.Series(df_secs.Seccion_Nombre.values, index=df_secs.Seccion_Romana).to_dict()
    cap_names = pd.Series(df_pdf.Capitulo_Nombre.values, index=df_pdf.Cap_Str).to_dict()
    
    return sec_names, cap_names, mapa_cap_sec


def procesar_csv_hts(csv_path):
    print(">> Procesando CSV Raw...")
    if not os.path.exists(csv_path):
        sys.exit("ERROR: No existe el CSV.")

    df = pd.read_csv(csv_path, dtype=str).iloc[:, :-3]
    df = df.rename(columns={'HTS Number': 'HTS_Number', 'Indent': 'Indent', 'Description': 'Description',
                            'Unit of Quantity': 'Unit', 'General Rate of Duty': 'General_Rate', 
                            'Special Rate of Duty': 'Special_Rate'})
    
    df = df.fillna('')
    df['Indent'] = pd.to_numeric(df['Indent'], errors='coerce').fillna(0).astype(int)
    df['HTS_Number'] = df['HTS_Number'].str.replace('.', '', regex=False).str.strip()

    # Cortar en Capítulo 99
    indices_99 = df.index[df['HTS_Number'].str.startswith('99', na=False)].tolist()
    if indices_99: df = df.iloc[:indices_99[0]]

    # Lógica de Indentación (Padre hereda a Hijo)
    codes, indents = df['HTS_Number'].tolist(), df['Indent'].tolist()
    n = len(df)
    for i in range(n):
        if indents[i] == 1:
            curr = str(codes[i]) if pd.notna(codes[i]) else ""
            if (curr == "" or curr.lower() == 'nan') and i + 1 < n:
                if indents[i+1] == 2 and str(codes[i+1]):
                    codes[i] = str(codes[i+1])[:5]
                    curr = codes[i]
            if len(curr) > 5: codes[i] = curr[:5]
        elif indents[i] == 0:
            if len(str(codes[i])) > 4: codes[i] = str(codes[i])[:4]
    
    df['HTS_Number'] = codes
    return df


def generar_paneles(df_raw, sec_names, cap_names, mapa_cap_sec):
    print(">> Generando paneles analíticos...")
    
    # Diccionarios internos (partidas, subpartidas)
    d_part, d_des, d_sub = {}, {}, {}
    for row in df_raw.itertuples():
        c, d = str(row.HTS_Number), str(row.Description)
        if not c: continue
        if len(c) == 4: d_part[c] = d
        elif len(c) == 5: d_des[c] = d
        elif len(c) == 6: d_sub[c] = d

    # Filtrar solo registros con Tasa General
    df_fin = df_raw[df_raw['General_Rate'].notna() & (df_raw['General_Rate'] != '')].copy()
    df_fin['HTS_Number'] = df_fin['HTS_Number'].apply(lambda x: str(x)[:8].ljust(8, '0'))
    
    p = df_fin.copy()
    p['Codigo_Limpio'] = p['HTS_Number']
    p['Codigo_Formato'] = p['Codigo_Limpio'].apply(_restaurar_puntos)
    
    # Mapeos Maestros
    p['Capitulo_Num'] = p['Codigo_Limpio'].str[:2]
    p['Seccion_Romana'] = p['Capitulo_Num'].apply(lambda x: mapa_cap_sec.get(x, "ND"))
    p['Seccion_Nombre'] = p['Seccion_Romana'].map(sec_names)
    p['Capitulo_Nom'] = p['Capitulo_Num'].map(cap_names)
    
    # Mapeos Internos
    p['Partida_Nom'] = p['Codigo_Limpio'].str[:4].map(d_part)
    p['Desdoblamiento_Nom'] = p['Codigo_Limpio'].str[:5].map(d_des)
    p['Subpartida_Nom'] = p['Codigo_Limpio'].str[:6].map(d_sub)
    
    # Desglose Numérico
    p['Partida_Num'] = p['Codigo_Limpio'].str[2:4]
    p['Desdoblamiento_Num'] = p['Codigo_Limpio'].str[4:5]
    p['Subpartida_Num'] = p['Codigo_Limpio'].str[5:6]
    p['Fraccion_Num'] = p['Codigo_Limpio'].str[6:8]
    
    # Normalización de Texto (Capitalize + Puntos)
    cols_txt = ['Seccion_Nombre', 'Capitulo_Nom', 'Partida_Nom', 'Desdoblamiento_Nom', 'Subpartida_Nom', 'Description']
    for col in cols_txt:
        p[col] = p[col].apply(_normalizar_texto)
        
    # Concatenación
    p['Concatenated_Desc'] = p.apply(_concatenar_descripciones, axis=1)
    
    return p, df_raw

### Fase 1: Extracción Estructural (PDF)
En esta fase se invoca al motor de extracción para leer el PDF `Table of Contents`. El resultado es procesado por la función constructora para generar los diccionarios maestros que relacionan cada capítulo con su Sección y su descripción oficial.

In [10]:
df_estructura = __procesar_pdf_estructura__(PATH_HTS_PDF)
SEC_NAMES, CAP_NAMES, MAPA_CAP_SEC = construir_diccionarios_maestros(df_estructura)

print(f"   Referencias cargadas: {len(CAP_NAMES)} capítulos detectados.")

>> [CRÍTICO] Leyendo estructura desde PDF...
>> Construyendo diccionarios maestros...
   Referencias cargadas: 99 capítulos detectados.


### Fase 2: Procesamiento de Datos (CSV)
Se carga el archivo `htsdata.csv`. Se eliminan columnas innecesarias, se cortan los datos administrativos (Capítulo 99) y se ejecuta la lógica de indentación para asegurar que todos los registros tengan códigos válidos.

In [11]:
DF_HTS_CLEAN = procesar_csv_hts(PATH_HTS_CSV)
print(">> Proceso finalizado.")

>> Procesando CSV Raw...
>> Proceso finalizado.


### Fase 3: Construcción de Paneles
Se cruza la información del CSV procesado con los diccionarios maestros extraídos del PDF. Se generan los diferentes DataFrames (Numérico, Textual, Extendido, Concatenado) aplicando las reglas de normalización de texto y concatenación.

In [12]:
PANEL_DF, DF_ORIGINAL = generar_paneles(DF_HTS_CLEAN, SEC_NAMES, CAP_NAMES, MAPA_CAP_SEC)

# 1. Original Cleaned (Con puntos y Special Rate)
df_orig = DF_ORIGINAL.copy()
df_orig['HTS_Number'] = df_orig['HTS_Number'].apply(_restaurar_puntos)

# 2. Numérico (Sin Special Rate)
cols_num = ['Codigo_Formato', 'Seccion_Romana', 'Capitulo_Num', 'Partida_Num', 'Desdoblamiento_Num', 'Subpartida_Num', 'Fraccion_Num', 'Unit', 'General_Rate']
df_num = PANEL_DF[cols_num].fillna('').copy()
df_num.columns = ['Code', 'Section', 'Chapter', 'Heading', 'Breakdown', 'Subheading', 'Item', 'Unit', 'General Duty']

# 3. Textual (Sin Special Rate)
cols_txt = ['Codigo_Formato', 'Seccion_Nombre', 'Capitulo_Nom', 'Partida_Nom', 'Desdoblamiento_Nom', 'Subpartida_Nom', 'Description', 'Unit', 'General_Rate']
df_txt = PANEL_DF[cols_txt].fillna('').copy()
df_txt.columns = ['Code', 'Section Name', 'Chapter Name', 'Heading Name', 'Breakdown Name', 'Subheading Name', 'Description', 'Unit', 'General Duty']

# 4. Extendido (Sin Special Rate)
cols_ext = ['Codigo_Formato', 'Seccion_Romana', 'Seccion_Nombre', 'Capitulo_Num', 'Capitulo_Nom', 'Partida_Num', 'Partida_Nom', 'Desdoblamiento_Num', 'Desdoblamiento_Nom', 'Subpartida_Num', 'Subpartida_Nom', 'Fraccion_Num', 'Description', 'Unit', 'General_Rate']
df_ext = PANEL_DF[cols_ext].fillna('').copy()
df_ext.columns = ['Code', 'Section', 'Section Name', 'Chapter', 'Chapter Name', 'Heading', 'Heading Name', 'Breakdown', 'Breakdown Name', 'Subheading', 'Subheading Name', 'Item', 'Description', 'Unit', 'General Duty']

# 5. Concatenado
df_concat = PANEL_DF[['Codigo_Formato', 'Concatenated_Desc', 'Unit', 'General_Rate']].fillna('').copy()
df_concat.columns = ['Code', 'Concatenated Description', 'Unit', 'General Duty']

print(">> Paneles Generados.")

>> Generando paneles analíticos...
>> Paneles Generados.


### Fase 4: Exportación Final
Se utiliza el motor `__escribir_hoja__` para generar el archivo Excel final, asegurando que las columnas de descripción y tasas especiales tengan el formato `Wrap Text` para una correcta visualización.

In [13]:
print(f"Generando Excel: {PATH_SALIDA}...")

try:
    writer = pd.ExcelWriter(PATH_SALIDA, engine='xlsxwriter')
    workbook = writer.book

    # Escritura de hojas
    __escribir_hoja__(df_orig, "Original Cleaned", writer, workbook)
    __escribir_hoja__(df_num, "Numeric Panel", writer, workbook)
    __escribir_hoja__(df_txt, "Textual Panel", writer, workbook)
    __escribir_hoja__(df_ext, "Extended Panel", writer, workbook)
    __escribir_hoja__(df_concat, "Concatenado", writer, workbook)

    writer.close()
    print("¡ÉXITO! Archivo generado correctamente con estructura automatizada.")

except Exception as e:
    print(f"ERROR CRÍTICO EN EXPORTACIÓN: {e}")

Generando Excel: ../data/intermediate/HTS_Limpia.xlsx...
¡ÉXITO! Archivo generado correctamente con estructura automatizada.
