In [1]:
import json
import os
import random
import string
from datetime import date, timedelta
from decimal import Decimal, ROUND_HALF_UP

import pandas as pd

# ------------------ Parámetros ------------------
N = 1500
archivo = "_4"                       # filas a generar en esta corrida
OUTPUT_PATH = fr"C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\retenciones\retenciones_aug2025{archivo}.csv"
STATE_PATH = fr"C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Arch_Control\retenciones_state.json"  # guarda el último secuencial usado
START_SECUENCIAL = 650000      # valor inicial si no existe estado previo
ERROR_PROBABILITY = 0.15       # prob. de aplicar tasa incorrecta (0.5%, 3%, 5%)
SEED = 42                      # fija aleatoriedad reproducible; cámbialo o quítalo
# ------------------------------------------------

random.seed(SEED)

def load_last_secuencial(state_path: str, fallback: int) -> int:
    """
    Lee el último secuencial usado desde un archivo JSON.
    Si no existe, retorna fallback - 1 para que el primero sea 'fallback'.
    """
    if not os.path.exists(state_path):
        return fallback - 1
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return int(data.get("last_no_secuencial", fallback - 1))
    except Exception:
        # Si el archivo está corrupto, reinicia
        return fallback - 1

def save_last_secuencial(state_path: str, last_value: int) -> None:
    """Guarda el último secuencial usado en JSON."""
    with open(state_path, "w", encoding="utf-8") as f:
        json.dump({"last_no_secuencial": last_value}, f, ensure_ascii=False, indent=2)

def next_secuenciales(n: int, state_path: str, fallback_start: int) -> list[str]:
    """
    Retorna n secuenciales *nuevos* de 6 dígitos, consecutivos,
    continuando desde el último valor persistido.
    """
    last = load_last_secuencial(state_path, fallback_start)
    seqs = [f"{v:06d}" for v in range(last + 1, last + 1 + n)]
    save_last_secuencial(state_path, last + n)
    return seqs

def random_id_cliente() -> str:
    """
    Genera un ID alfanumérico no repetido (en la corrida actual),
    independiente del secuencial, con patrón:
      5 dígitos + 1 letra mayúscula + "COMP" + 11 dígitos
    Ej: 45838ACOMP00064131600
    """
    digits5 = f"{random.randint(0, 99999):05d}"
    letter = random.choice(string.ascii_uppercase)
    tail11 = f"{random.randint(0, 99999999999):011d}"
    return f"{digits5}{letter}COMP{tail11}"

def random_august_2025_date() -> str:
    """Fecha aleatoria entre 1/8/2025 y 31/8/2025, formato D/M/YYYY (sin ceros a la izquierda)."""
    start = date(2025, 8, 1)
    d = start + timedelta(days=random.randint(0, 30))
    return f"{d.day}/{d.month}/{d.year}"

def random_base_imponible() -> Decimal:
    """
    Valor con hasta 2 decimales donde (entera + decimales) tenga entre 3 y 6 dígitos.
    Estrategia:
      - dígitos enteros = 1..4 (con 2 decimales => total 3..6)
      - 2 decimales
    """
    int_digits = random.choice([1, 2, 3, 4])
    if int_digits == 1:
        int_part = random.randint(1, 9)
    elif int_digits == 2:
        int_part = random.randint(10, 99)
    elif int_digits == 3:
        int_part = random.randint(100, 999)
    else:
        int_part = random.randint(1000, 9999)

    dec_part = random.randint(0, 99)
    value = Decimal(int_part) + Decimal(dec_part) / Decimal(100)
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

def monto_retencion(base: Decimal, tipo: str, error_probability: float) -> Decimal:
    """
    Regla correcta:
      - 323A -> 2%
      - 324B -> 1%
    Con probabilidad 'error_probability', usamos una tasa incorrecta
    (0.5%, 3% o 5%) para introducir errores en el dataset.
    Devuelve el monto (2 decimales).
    """
    expected_rate = Decimal("0.02") if tipo == "323A" else Decimal("0.01")
    wrong_rates = [Decimal("0.005"), Decimal("0.03"), Decimal("0.05")]

    if random.random() < error_probability:
        rate = random.choice(wrong_rates)
    else:
        rate = expected_rate

    return (base * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

def main():
    # 1) Secuenciales que continúan desde la última corrida
    secuenciales = next_secuenciales(N, STATE_PATH, START_SECUENCIAL)

    # 2) id_cliente aleatorios y únicos (en esta corrida)
    id_clientes = set()
    while len(id_clientes) < N:
        id_clientes.add(random_id_cliente())
    id_clientes = list(id_clientes)

    # 3) Resto de campos y armado del DataFrame
    tipos = ["323A", "324B"]
    rows = []
    for i in range(N):
        tipo = random.choice(tipos)
        base = random_base_imponible()
        monto = monto_retencion(base, tipo, ERROR_PROBABILITY)
        rows.append({
            "id_cliente": id_clientes[i],
            "No_Secuencial_Comprobante_Retencion": secuenciales[i],
            "Fecha_Emision_Comprobante_Retencion": random_august_2025_date(),
            "Tipo_Rendimiento_Financiero": tipo,
            "Base_Imponible_Renta": float(base),
            "Monto_Retencion_Renta": float(monto),
        })

    df = pd.DataFrame(rows)

    # Orden garantizado de las 6 columnas pedidas
    df = df[
        [
            "id_cliente",
            "No_Secuencial_Comprobante_Retencion",
            "Fecha_Emision_Comprobante_Retencion",
            "Tipo_Rendimiento_Financiero",
            "Base_Imponible_Renta",
            "Monto_Retencion_Renta",
        ]
    ]

    df.to_csv(OUTPUT_PATH, index=False, encoding="utf-8")
    print(f"✅ Dataset generado: {OUTPUT_PATH}")
    print(df.head(10).to_string(index=False))

if __name__ == "__main__":
    main()


✅ Dataset generado: C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\retenciones\retenciones_aug2025_4.csv
           id_cliente No_Secuencial_Comprobante_Retencion Fecha_Emision_Comprobante_Retencion Tipo_Rendimiento_Financiero  Base_Imponible_Renta  Monto_Retencion_Renta
50196HCOMP06059232914                              662500                           27/8/2025                        323A               1305.92                  26.12
88835YCOMP93025476357                              662501                           16/8/2025                        324B                  4.57                   0.05
12234UCOMP35907796557                              662502                           26/8/2025                        324B                 38.52                   0.39
32530DCOMP83391832431                              662503                           30/8/2025                        323A                271.92                   5.44
49613LCOMP88423

In [2]:

import pandas as pd
import numpy as np
import os

# Ruta del archivo original generado previamente
INPUT_PATH = OUTPUT_PATH
OUTPUT_PATH_2 = fr"C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\info_clientes\id_clientes_codigos{archivo}.csv"

def main():
    # Verificar que exista el archivo base
    if not os.path.exists(INPUT_PATH):
        raise FileNotFoundError(f"No se encontró {INPUT_PATH}, genera primero el dataset base.")

    # Leer archivo original
    df = pd.read_csv(INPUT_PATH)

    # Extraer columna id_cliente
    result = pd.DataFrame({"id_cliente": df["id_cliente"]})

    # Añadir columna cod_tipo_id con valores aleatorios {12, 13, 14}
    result["cod_tipo_id"] = np.random.choice([12, 13, 14], size=len(result))

    # Lista válida de provincias: 1..25 y 30 (excluye 26–29)
    valid_provincias = list(range(1, 26)) + [30]
    result["cod_provincia"] = np.random.choice(valid_provincias, size=len(result))

    # Guardar resultado en nuevo CSV
    result.to_csv(OUTPUT_PATH_2, index=False, encoding="utf-8")

    print(f"✅ Archivo generado: {OUTPUT_PATH}")
    print(result.head(10).to_string(index=False))

if __name__ == "__main__":
    main()


✅ Archivo generado: C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\retenciones\retenciones_aug2025_4.csv
           id_cliente  cod_tipo_id  cod_provincia
50196HCOMP06059232914           14             16
88835YCOMP93025476357           13             12
12234UCOMP35907796557           14             10
32530DCOMP83391832431           13             13
49613LCOMP88423407256           13             21
00287KCOMP42090304639           12              5
91056LCOMP45045808051           14             19
97173YCOMP87768677875           13             17
86646ACOMP00566090561           12             15
20517ACOMP61884111757           12              1


In [3]:
import os
import glob
import pandas as pd
import mysql.connector as mc
from datetime import datetime

# ====== CONFIG ======
CSV_DIR = r"C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\retenciones"
BATCH_SIZE = 2000  # inserta en lotes para mejorar rendimiento
# ====================

# === 2. Conexión a MySQL ===
conn = mc.connect(
    host="localhost",
    port=3306,
    user="root",
    password="miclave",
    database="retenciones_mensuales"
)
cur = conn.cursor()

# Sentencia SOLO INSERTAR (ignora duplicados)
SQL_INSERT = """
INSERT IGNORE INTO retenciones (
  id_cliente,
  no_secuencial_retencion,
  fecha_emision,
  cod_retencion,
  base_imponible,
  monto_retencion
) VALUES (%s,%s,%s,%s,%s,%s)
"""

def parse_fecha(fecha_str: str) -> str:
    """
    Convierte fechas tipo D/M/YYYY o DD/MM/YYYY a 'YYYY-MM-DD'.
    Si ya viene en formato ISO, lo respeta.
    """
    if pd.isna(fecha_str):
        return None
    s = str(fecha_str).strip()

    # Ya en formato ISO YYYY-MM-DD
    try:
        if len(s) == 10 and s[4] == "-" and s[7] == "-":
            datetime.strptime(s, "%Y-%m-%d")
            return s
    except Exception:
        pass

    # Intento manual: D/M/YYYY
    try:
        parts = s.split("/")
        if len(parts) == 3:
            d, m, y = parts
            d = int(d)
            m = int(m)
            y = int(y)
            return f"{y:04d}-{m:02d}-{d:02d}"
    except Exception:
        pass

    # Fallback: pandas
    try:
        dt = pd.to_datetime(s, dayfirst=True, errors="raise")
        return dt.strftime("%Y-%m-%d")
    except Exception:
        return None

def cargar_csv_a_mysql(ruta_csv: str) -> int:
    """
    Lee un CSV y lo inserta en MySQL.
    Devuelve el número de filas insertadas (los duplicados se ignoran).
    """
    df = pd.read_csv(ruta_csv)

    # Mapeo de columnas CSV -> DB
    col_map = {
        "id_cliente": "id_cliente",
        "No_Secuencial_Comprobante_Retencion": "no_secuencial_retencion",
        "Fecha_Emision_Comprobante_Retencion": "fecha_emision",
        "Tipo_Rendimiento_Financiero": "cod_retencion",
        "Base_Imponible_Renta": "base_imponible",
        "Monto_Retencion_Renta": "monto_retencion",
    }

    # Verifica columnas necesarias
    faltantes = [c for c in col_map.keys() if c not in df.columns]
    if faltantes:
        raise ValueError(f"Columnas faltantes en {os.path.basename(ruta_csv)}: {faltantes}")

    # Construir DataFrame con nombres destino
    d = pd.DataFrame()
    for src, dst in col_map.items():
        d[dst] = df[src]

    # Transformaciones
    d["fecha_emision"] = d["fecha_emision"].apply(parse_fecha)
    d["no_secuencial_retencion"] = d["no_secuencial_retencion"].astype(str).str.strip().str.zfill(6)
    d["cod_retencion"] = d["cod_retencion"].astype(str).str.strip().str[:5]

    def to_float(x):
        if pd.isna(x):
            return None
        s = str(x).strip().replace(",", "")
        try:
            return float(s)
        except Exception:
            try:
                return float(s.replace(",", "."))
            except Exception:
                return None

    d["base_imponible"] = d["base_imponible"].apply(to_float)
    d["monto_retencion"] = d["monto_retencion"].apply(to_float)

    # Quitar filas inválidas
    d = d.dropna(subset=["id_cliente", "fecha_emision"])

    # Inserción por lotes
    total = 0
    records = list(
        d[["id_cliente", "no_secuencial_retencion", "fecha_emision", "cod_retencion", "base_imponible", "monto_retencion"]]
        .itertuples(index=False, name=None)
    )

    for i in range(0, len(records), BATCH_SIZE):
        chunk = records[i:i+BATCH_SIZE]
        cur.executemany(SQL_INSERT, chunk)
        conn.commit()
        total += len(chunk)

    return total

def main():
    csv_paths = glob.glob(os.path.join(CSV_DIR, "*.csv"))
    if not csv_paths:
        print(f"No se encontraron .csv en: {CSV_DIR}")
        return

    total_files = 0
    total_rows = 0
    for path in sorted(csv_paths):
        try:
            inserted = cargar_csv_a_mysql(path)
            print(f"[OK] {os.path.basename(path)} -> {inserted} filas insertadas (duplicados ignorados)")
            total_files += 1
            total_rows += inserted
        except Exception as e:
            print(f"[ERROR] {os.path.basename(path)}: {e}")

    print(f"\nResumen: Archivos procesados={total_files}, Filas insertadas={total_rows}")

if __name__ == "__main__":
    try:
        main()
    finally:
        try:
            cur.close()
        except Exception:
            pass
        try:
            conn.close()
        except Exception:
            pass


[OK] retenciones_aug2025_1.csv -> 5000 filas insertadas (duplicados ignorados)
[OK] retenciones_aug2025_2.csv -> 7000 filas insertadas (duplicados ignorados)
[OK] retenciones_aug2025_3.csv -> 500 filas insertadas (duplicados ignorados)
[OK] retenciones_aug2025_4.csv -> 1500 filas insertadas (duplicados ignorados)

Resumen: Archivos procesados=4, Filas insertadas=14000


In [4]:
import os
import glob
import pandas as pd
import mysql.connector as mc

# ========= CONFIG =========
CSV_DIR = r"C:\Users\santi\OneDrive - Universidad San Francisco de Quito\Ingenieria_datos\Proyecto\Data\info_clientes"
BATCH_SIZE = 5000  # tamaño de lote para executemany
DB = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "miclave",
    "database": "retenciones_mensuales",   # <--- según lo que indicaste
}
TABLE = "info_clientes"
# ==========================

# SQL de UPSERT: mantiene el id_cliente y reemplaza los datos con lo último que entre
SQL_UPSERT = f"""
INSERT INTO {TABLE} (
  id_cliente,
  cod_tipo_id,
  cod_provincia
) VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
  cod_tipo_id = VALUES(cod_tipo_id),
  cod_provincia = VALUES(cod_provincia);
"""

def cargar_csv(path, cur, conn) -> dict:
    """
    Carga un CSV a la tabla info_clientes con UPSERT.
    Devuelve métricas de carga.
    """
    # Leer como texto para no perder ceros/formatos
    df = pd.read_csv(path, dtype=str, encoding="utf-8-sig")

    # Normalizar nombres de columnas a minúsculas para mapear robustamente
    df.columns = [c.strip().lower() for c in df.columns]

    # Mapear posibles variantes de encabezados a los nombres destino
    # (ajusta si tus CSV usan exactamente estos nombres ya)
    rename_map = {}
    # id_cliente
    for cand in ["id_cliente", "cliente_id", "id"]:
        if cand in df.columns:
            rename_map[cand] = "id_cliente"
            break
    # cod_tipo_id
    for cand in ["cod_tipo_id", "tipo_id", "codigo_tipo_id"]:
        if cand in df.columns:
            rename_map[cand] = "cod_tipo_id"
            break
    # cod_provincia
    for cand in ["cod_provincia", "provincia", "codigo_provincia"]:
        if cand in df.columns:
            rename_map[cand] = "cod_provincia"
            break

    df = df.rename(columns=rename_map)

    # Validar columnas requeridas
    requeridas = ["id_cliente", "cod_tipo_id", "cod_provincia"]
    faltantes = [c for c in requeridas if c not in df.columns]
    if faltantes:
        raise ValueError(f"Faltan columnas requeridas {faltantes} en {os.path.basename(path)}")

    # Limpiezas/normalizaciones mínimas
    d = df[requeridas].copy()

    # Quitar espacios y asegurar strings
    for c in requeridas:
        d[c] = d[c].astype(str).str.strip()

    # Respetar tus longitudes: id_cliente (<=21), códigos (<=2)
    d["id_cliente"] = d["id_cliente"].str[:21]
    d["cod_tipo_id"] = d["cod_tipo_id"].str[:2]
    d["cod_provincia"] = d["cod_provincia"].str[:2]

    # Quitar filas claramente inválidas (id vacío)
    d = d[d["id_cliente"].notna() & (d["id_cliente"] != "")]
    d = d[d["cod_tipo_id"].notna() & (d["cod_tipo_id"] != "")]
    d = d[d["cod_provincia"].notna() & (d["cod_provincia"] != "")]

    total_leidas = len(d)

    # Si el mismo CSV tiene duplicados por id_cliente, conservar la ÚLTIMA aparición (última fila)
    d = d.drop_duplicates(subset=["id_cliente"], keep="last")

    # Preparar tuplas para inserción
    rows = list(d[["id_cliente", "cod_tipo_id", "cod_provincia"]].itertuples(index=False, name=None))

    # Inserción por lotes con UPSERT
    insertadas = 0
    for i in range(0, len(rows), BATCH_SIZE):
        chunk = rows[i:i+BATCH_SIZE]
        cur.executemany(SQL_UPSERT, chunk)
        conn.commit()
        insertadas += len(chunk)

    return {
        "archivo": os.path.basename(path),
        "leidas": total_leidas,
        "distinct_por_id": len(d),
        "upserts_ejecutados": insertadas
    }

def main():
    conn = mc.connect(**DB)
    cur = conn.cursor()

    csvs = sorted(glob.glob(os.path.join(CSV_DIR, "*.csv")))
    if not csvs:
        print(f"No se encontraron .csv en: {CSV_DIR}")
        cur.close(); conn.close()
        return

    resumen = []
    for p in csvs:
        try:
            stats = cargar_csv(p, cur, conn)
            resumen.append(stats)
            print(f"[OK] {stats['archivo']}: leídas={stats['leidas']}, "
                  f"distinct_id={stats['distinct_por_id']}, upserts={stats['upserts_ejecutados']}")
        except Exception as e:
            print(f"[ERROR] {os.path.basename(p)}: {e}")

    # Resumen global
    if resumen:
        tot_arch = len(resumen)
        tot_leidas = sum(r["leidas"] for r in resumen)
        tot_distinct = sum(r["distinct_por_id"] for r in resumen)  # por archivo
        tot_upserts = sum(r["upserts_ejecutados"] for r in resumen)
        print("\n=== RESUMEN ===")
        print(f"Archivos procesados:   {tot_arch}")
        print(f"Filas leídas totales:  {tot_leidas}")
        print(f"Distinct por archivo:  {tot_distinct}  (nota: suma por-archivo; no equivale a distintos globales)")
        print(f"UPSERTs ejecutados:    {tot_upserts}")

    cur.close()
    conn.close()

if __name__ == "__main__":
    main()


[OK] id_clientes_codigos_1.csv: leídas=5000, distinct_id=5000, upserts=5000
[OK] id_clientes_codigos_2.csv: leídas=7000, distinct_id=7000, upserts=7000
[OK] id_clientes_codigos_3.csv: leídas=500, distinct_id=500, upserts=500
[OK] id_clientes_codigos_4.csv: leídas=1500, distinct_id=1500, upserts=1500

=== RESUMEN ===
Archivos procesados:   4
Filas leídas totales:  14000
Distinct por archivo:  14000  (nota: suma por-archivo; no equivale a distintos globales)
UPSERTs ejecutados:    14000


Correr archivo de mysql_logstash

cd "C:\ELK\logstash-7.17.10-windows-x86_64\logstash-7.17.10\bin"

C:\ELK\logstash-7.17.10-windows-x86_64\logstash-7.17.10\bin\logstash.bat -f C:\ELK\logstash-7.17.10-windows-x86_64\logstash-7.17.10\bin\mysql_logstash.conf