# Instalaciones

In [1]:
# %pip install pandas openpyxl xlsxwriter python-dateutil





[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


# Importaciones

In [3]:
from __future__ import annotations
import os, re, math
from pathlib import Path
from datetime import datetime
from typing import Optional, Tuple, List
import pandas as pd
import numpy as np
from dateutil import tz
from unidecode import unidecode
import unicodedata
from collections import defaultdict
import matplotlib.pyplot as plt
import textwrap

C:\Users\augus\anaconda\lib\site-packages\numpy\.libs\libopenblas.EL2C6PLE4ZYW3ECEVIV3OXXGRN2NRFM2.gfortran-win_amd64.dll
C:\Users\augus\anaconda\lib\site-packages\numpy\.libs\libopenblas64__v0.3.23-246-g3d31191b-gcc_10_3_0.dll


In [30]:
# consolidar_sensores.py
# Autor: Augusto (pipeline para Tesis - Secado de arroz)
# Objetivo: Consolidar JPV (.txt) y RB (.csv) en formato ANCHO (estilo RB) preservando columnas crudas.

# ========== CONFIG ==========
# Editá estas tres rutas a tu gusto (sugerencia: dejarlas dentro del mismo OneDrive del proyecto)
BASE_JPV = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025"
BASE_RB  = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025"
OUTPUT_DIR = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados"

# Modo muestra: si querés probar con pocos archivos, poner un número (ej. 6). None procesa todo.
SAMPLE_MAX_FILES: Optional[int] = 5

# Zona horaria (no forzamos tz-aware; la usamos si quisieras localize más adelante)
TZ = "America/Montevideo"

# Si True, exporta también los datos en “largo” (auditoría).
EXPORT_LONG = True

DROP_WIDE_COLS = ["TimeString", "HUMEDAD", "OFFSET", "TEMPERATURA"]
RB_VOLT_SCALE = 0.01  # dividir voltajes de RB por 100 para equiparar con JPV


# ============================

os.makedirs(OUTPUT_DIR, exist_ok=True)

# --------- Utilidades ---------
def extract_sensor_id_from_name(name: str) -> Optional[int]:
    """
    JPV: SENSOR10,20,...,60 -> 1..6
    RB:  SENSOR1, SENSOR2, etc.
    """
    m = re.search(r"SENSOR\s*(\d+)", name.upper())
    if not m:
        return None
    num = int(m.group(1))
    return num // 10 if num in (10, 20, 30, 40, 50, 60) else num

def is_plain_sensor_folder(name: str) -> bool:
    """
    True solo si la carpeta es exactamente SENSOR<1..6> (sin 'b' ni 'c').
    Aplica solo a JPV.
    """
    m = re.fullmatch(r"SENSOR([1-6])", name.upper())
    return m is not None

def parse_tirada_jpv(path: Path) -> Tuple[Optional[int], Optional[datetime]]:
    """
    Busca en los segmentos del path:
      - usb <número>   (sin importar espacios / mayúsculas)
      - una fecha con separador . o - o / (dd.mm.aa[aa] o dd-mm-aaaa, etc.)
    Tolera texto adicional en el mismo nombre de carpeta.
    """
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    usb_num = None
    date_dt = None

    for part in path.parts:
        # número de USB
        if usb_num is None:
            mnum = re.search(r"\busb\s*(\d+)", part, flags=re.IGNORECASE)
            if mnum:
                usb_num = int(mnum.group(1))

        # fecha
        if date_dt is None:
            mdate = re.search(DATE_RX, part)
            if mdate:
                d, m, y = mdate.groups()
                y = int(y)
                if y < 100:  # años 2 dígitos
                    y += 2000
                try:
                    date_dt = datetime(y, int(m), int(d))
                except ValueError:
                    pass  # por si cae una fecha inválida

        if usb_num is not None and date_dt is not None:
            break

    return usb_num, date_dt


def parse_tirada_rb(path: Path) -> Optional[datetime]:
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    for part in path.parts[::-1]:  # de más profundo a más alto
        mdate = re.search(DATE_RX, part)
        if mdate:
            d, m, y = mdate.groups()
            y = int(y)
            if y < 100:
                y += 2000
            try:
                return datetime(y, int(m), int(d))
            except ValueError:
                continue
    return None


def read_jpv_txt(path: Path) -> pd.DataFrame:
    # Intentos de lectura (UTF-16 comunes en estos exportes)
    tried = [( "utf-16", "\t"), ("utf-16le", "\t"), ("utf-8", "\t")]
    last_err = None
    for enc, sep in tried:
        try:
            df = pd.read_csv(path, sep=sep, encoding=enc, engine="python", on_bad_lines="skip")
            break
        except Exception as e:
            last_err = e
            df = None
    if df is None:
        raise last_err or RuntimeError(f"No se pudo leer {path}")

    # Filtrar metadatos ($RT_*)
    if "VarName" in df.columns:
        df = df[~df["VarName"].astype(str).str.startswith("$RT_")]

    # Selección mínima, preservando trazabilidad
    keep_cols = [c for c in ["VarName", "TimeString", "VarValue", "Validity", "Time_ms"] if c in df.columns]
    df = df[keep_cols].copy()

    # Timestamp desde TimeString (no forcemos dayfirst aquí; viene bien formado)
    df["timestamp"] = pd.to_datetime(df.get("TimeString", pd.Series(dtype=str)), errors="coerce")

    # Variable normalizada y original
    if "VarName" in df.columns:
        df["VarName_original"] = df["VarName"].astype(str)
        df["variable"] = df["VarName"].astype(str).str.replace(r"^\d+_", "", regex=True)
    else:
        df["VarName_original"] = np.nan
        df["variable"] = "Var"

    # VarValue -> número (coma decimal → punto)
    if "VarValue" in df.columns:
        s = df["VarValue"].astype(str).str.strip()
        # Cambiamos coma por punto solo si parece número con coma
        s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
        df["valor"] = pd.to_numeric(s, errors="coerce")
    else:
        df["valor"] = np.nan

    return df

def _canon(s: str) -> str:
    # normaliza: mayúsculas, sin espacios/guiones/puntos/paréntesis
    return re.sub(r"[\s\-\.\(\)]", "", s.upper())

def read_rb_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, engine="python")

    # detectar columnas de fecha/hora
    date_col = None
    lt_col = None
    for c in df.columns:
        cc = _canon(c)
        if cc in ("DATE","FECHA"):
            date_col = c
        if cc in ("LOCTIME","LOCTIEMPO","LOCALTIME"):
            lt_col = c
    # timestamp
    if date_col and lt_col:
        df["timestamp"] = pd.to_datetime(df[date_col].astype(str) + " " + df[lt_col].astype(str),
                                         dayfirst=False, errors="coerce")
    elif date_col:
        df["timestamp"] = pd.to_datetime(df[date_col], errors="coerce")
    else:
        df["timestamp"] = pd.NaT

    # detectar variables RB (V_HUM / V_TEM con variantes)
    value_cols_map = {}
    for c in df.columns:
        cc = _canon(c)
        if cc in ("V_HUM","VHUM"):
            value_cols_map[c] = "V_HUM"
        elif cc in ("V_TEM","VTEM","V_TEMP","VTEMP"):
            value_cols_map[c] = "V_TEM"

    value_cols = list(value_cols_map.keys())
    # si no encontramos nada, devolvemos vacío (que luego no rompe)
    if not value_cols:
        # construir frame vacío con id_cols
        id_cols = []
        if "Record" in df.columns: id_cols.append("Record")
        if date_col: id_cols.append(date_col)
        if lt_col:   id_cols.append(lt_col)
        id_cols.append("timestamp")
        out = df[id_cols].copy()
        out["variable"] = np.nan
        out["valor"] = np.nan
        if date_col: out.rename(columns={date_col: "Date_raw"}, inplace=True)
        if lt_col:   out.rename(columns={lt_col: "LOC_time_raw"}, inplace=True)
        return out[["Record","Date_raw","LOC_time_raw","timestamp","variable","valor"]]

    # melt
    id_cols = []
    if "Record" in df.columns: id_cols.append("Record")
    if date_col: id_cols.append(date_col)
    if lt_col:   id_cols.append(lt_col)
    id_cols.append("timestamp")

    long_df = df[id_cols + value_cols].melt(id_vars=id_cols, var_name="variable_raw", value_name="valor_raw")
    # variable canónica
    long_df["variable"] = long_df["variable_raw"].map({c: value_cols_map[c] for c in value_cols})
    # Date/LOC raw para trazabilidad
    if date_col: long_df.rename(columns={date_col: "Date_raw"}, inplace=True)
    if lt_col:   long_df.rename(columns={lt_col: "LOC_time_raw"}, inplace=True)

    # valor a num (coma→punto si corresponde)
    s = long_df["valor_raw"].astype(str).str.strip()
    s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
    long_df["valor"] = pd.to_numeric(s, errors="coerce")

    return long_df[["Record","Date_raw","LOC_time_raw","timestamp","variable","valor"]]

def build_inventory() -> pd.DataFrame:
    rows = []

    # --- JPV ---
    for root, dirs, files in os.walk(BASE_JPV):
        root_p = Path(root)
        # excluir carpetas SENSOR#b y SENSOR#c (solo JPV)
        parent_name = root_p.name
        if parent_name.upper().startswith("SENSOR") and not is_plain_sensor_folder(parent_name):
            continue

        for f in files:
            if not f.lower().endswith(".txt"): 
                continue
            # aceptar solo archivos bajo una carpeta SENSOR<1..6>
            if not is_plain_sensor_folder(root_p.name):
                continue
            # meta
            p = root_p / f
            planta = "JPV"
            sensor_id = extract_sensor_id_from_name(f) or extract_sensor_id_from_name(root_p.name)
            tirada_num, tirada_dt = parse_tirada_jpv(root_p)
            año = None
            # buscar el '2024 Datos Sensores JPV' en path para inferir año si aplica
            m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+JPV", str(p), flags=re.IGNORECASE)
            if m: año = int(m.group(1))
            rows.append({
                "planta": planta, "año": año,
                "tirada_num": tirada_num, "tirada_fecha": tirada_dt,
                "sensor_id": sensor_id,
                "ext": p.suffix.lower(), "source_file": f, "source_path": str(p)
            })

    # --- RB ---
    for root, dirs, files in os.walk(BASE_RB):
        root_p = Path(root)
        for f in files:
            if not f.lower().endswith(".csv"):
                continue
            p = root_p / f
            planta = "RB"
            sensor_id = extract_sensor_id_from_name(f) or extract_sensor_id_from_name(root_p.name)
            tirada_dt = parse_tirada_rb(root_p)
            # año desde '2024 Datos Sensores RB' si aparece
            año = None
            m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+RB", str(p), flags=re.IGNORECASE)
            if m: año = int(m.group(1))
            rows.append({
                "planta": planta, "año": año,
                "tirada_num": None, "tirada_fecha": tirada_dt,
                "sensor_id": sensor_id,
                "ext": p.suffix.lower(), "source_file": f, "source_path": str(p)
            })

    inv = pd.DataFrame(rows)
    if len(inv):
        inv.sort_values(["planta","año","tirada_fecha","sensor_id","source_file"], inplace=True, ignore_index=True)
    return inv

def process_files(inv: pd.DataFrame, sample_strategy: str = "balanced") -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Devuelve: (long_all, log_omisiones, qa_resumen)

    sample_strategy:
      - "balanced": mitad JPV / mitad RB (fallback con el resto)
      - "by_group": 1 por grupo (planta, año, sensor) y luego completa hasta N
      - "head":     simplemente los primeros N
    """
    long_frames: List[pd.DataFrame] = []
    log_rows: List[dict] = []

    # ---------- Selección de muestra ----------
    files_to_process = inv.copy()
    if SAMPLE_MAX_FILES:
        n = int(SAMPLE_MAX_FILES)

        if sample_strategy == "balanced":
            jpv = inv[inv["planta"] == "JPV"]
            rb  = inv[inv["planta"] == "RB"]

            per_jpv = n // 2
            per_rb  = n - per_jpv

            sel = pd.concat([jpv.head(per_jpv), rb.head(per_rb)], ignore_index=True)

            # fallback si a una planta le faltan archivos
            if len(sel) < n:
                faltan = n - len(sel)
                resto = inv.drop(sel.index).head(faltan)
                sel = pd.concat([sel, resto], ignore_index=True)

            files_to_process = sel

        elif sample_strategy == "by_group":
            base = (inv.sort_values(["planta","año","sensor_id","tirada_fecha","source_file"])
                      .groupby(["planta","año","sensor_id"], group_keys=False)
                      .head(1))

            if len(base) < n:
                resto = inv.drop(base.index)
                base  = pd.concat([base, resto.head(n - len(base))], ignore_index=True)
            else:
                base  = base.head(n).reset_index(drop=True)

            files_to_process = base

        else:  # "head"
            files_to_process = files_to_process.head(n)

    # ---------- Procesamiento archivo x archivo ----------
    for _, r in files_to_process.iterrows():
        p = Path(r["source_path"])
        try:
            if r["planta"] == "JPV":
                df = read_jpv_txt(p)
            else:
                df = read_rb_csv(p)

            # columnas meta comunes
            df["planta"] = r["planta"]
            df["año"] = r["año"]
            df["tirada_num"] = r["tirada_num"]
            df["tirada_fecha"] = pd.to_datetime(r["tirada_fecha"]) if pd.notna(r["tirada_fecha"]) else pd.NaT
            df["sensor_id"] = r["sensor_id"]
            df["source_file"] = r["source_file"]
            df["source_path"] = r["source_path"]

            # armonizar columnas crudas (por si no existen)
            for col in ["Date_raw","LOC_time_raw","VarName","TimeString","VarValue","Validity","Time_ms","VarName_original"]:
                if col not in df.columns:
                    df[col] = pd.Series([np.nan]*len(df))

            # clave de duplicado en largo
            df["dup_key"] = (
                df["planta"].astype(str) + "|" +
                df["sensor_id"].astype(str) + "|" +
                df["timestamp"].astype(str) + "|" +
                df["variable"].astype(str)
            )

            # detectar duplicados exactos
            dups = df[df.duplicated(subset=["dup_key"], keep="first")]
            if len(dups):
                for _, dd in dups.iterrows():
                    log_rows.append({
                        "tipo": "duplicado_largo",
                        "planta": r["planta"], "sensor_id": r["sensor_id"],
                        "timestamp": dd["timestamp"], "variable": dd["variable"],
                        "source_file": r["source_file"], "source_path": r["source_path"]
                    })
                # nos quedamos con la primera
                df = df.drop_duplicates(subset=["dup_key"], keep="first")

            long_frames.append(df.drop(columns=["dup_key"]))

        except Exception as e:
            log_rows.append({
                "tipo": "error_lectura",
                "planta": r["planta"], "sensor_id": r["sensor_id"],
                "timestamp": None, "variable": None,
                "source_file": r["source_file"], "source_path": r["source_path"],
                "detalle": str(e)
            })

    # ---------- Unión y QA ----------
    long_all = pd.concat(long_frames, ignore_index=True) if long_frames else pd.DataFrame()

    if not long_all.empty:
        qa = (long_all
                .groupby(["planta","año","sensor_id"])
                .agg(registros=("valor","size"),
                     fechas_min=("timestamp","min"),
                     fechas_max=("timestamp","max"))
                .reset_index())
    else:
        qa = pd.DataFrame(columns=["planta","año","sensor_id","registros","fechas_min","fechas_max"])

    log_df = pd.DataFrame(log_rows, columns=["tipo","planta","sensor_id","timestamp","variable","source_file","source_path","detalle"])
    return long_all, log_df, qa


def to_wide(long_all: pd.DataFrame) -> pd.DataFrame:
    """
    ANCHO: unifica voltajes en VOLT_HUM / VOLT_TEM, escala RB, excluye HUMEDAD/TEMPERATURA/OFFSET/VARIEDAD
    y evita columnas 'nan' en el pivot.
    """
    if long_all.empty:
        return long_all

    # 1) Claves estables (sin tirada_num para no perder RB por NaN)
    key_cols = ["planta","año","tirada_fecha","sensor_id","timestamp"]
    for c in key_cols:
        if c not in long_all.columns:
            long_all[c] = np.nan

    # 2) Canonicalizar nombre de variable
    svar = (long_all["variable"]
            .astype(str)
            .str.upper()
            .str.replace(r"[\s_\.\-]", "", regex=True))

    # Aliases que SÍ queremos (voltajes)
    hum_aliases  = {"VHUM","VOLTHUM","VOLTHUME"}
    tem_aliases  = {"VTEM","VOLTTEM","VOLTTEMP","VTEMP"}

    # Aliases que NO queremos (descartar de wide)
    drop_aliases = {"HUMEDAD","TEMPERATURA","OFFSET","VARIEDAD"}

    mask_hum  = svar.isin(hum_aliases)
    mask_tem  = svar.isin(tem_aliases)
    mask_drop = svar.isin(drop_aliases)

    # Nos quedamos SOLO con voltajes (descartamos HUMEDAD/TEMPERATURA/etc.)
    keep_mask = (mask_hum | mask_tem) & (~mask_drop)
    long_v = long_all[keep_mask].copy()

    # 3) Nombre normalizado + escala RB
    long_v["var_norm"] = np.where(mask_hum[keep_mask], "VOLT_HUM", "VOLT_TEM")
    scale = np.where(long_v["planta"].eq("RB"), RB_VOLT_SCALE, 1.0)
    long_v["valor_norm"] = pd.to_numeric(long_v["valor"], errors="coerce") * scale

    # 4) Pivot SOLO con claves estables
    wide = (long_v
            .pivot_table(index=key_cols,
                         columns="var_norm",
                         values="valor_norm",
                         aggfunc="first")
            .reset_index())

    # 5) Re-anexar crudos + tirada_num (primer no-NaN por grupo)
    raw_keep = [c for c in ["tirada_num","Date_raw","LOC_time_raw","TimeString"] if c in long_all.columns]
    if raw_keep:
        raw = (long_all[key_cols + raw_keep]
               .groupby(key_cols, as_index=False)
               .first())
        wide = wide.merge(raw, on=key_cols, how="left")

    # 6) Orden de columnas
    volt_cols = [c for c in ["VOLT_HUM","VOLT_TEM"] if c in wide.columns]
    other_vars = [c for c in wide.columns if c not in (key_cols + raw_keep + volt_cols)]
    wide = wide[key_cols + raw_keep + volt_cols + other_vars]

    # 7) Drop seguro de columnas 'nan' (por si quedara alguna etiqueta rara)
    safe_cols = []
    for c in wide.columns:
        if isinstance(c, float) and pd.isna(c):
            continue
        if isinstance(c, str) and c.strip().lower() == "nan":
            continue
        safe_cols.append(c)
    wide = wide[safe_cols]

    # 8) Quitar columnas no deseadas configuradas
    if "DROP_WIDE_COLS" in globals():
        wide = wide.drop(columns=[c for c in DROP_WIDE_COLS if c in wide.columns], errors="ignore")

    return wide

def save_outputs(inv: pd.DataFrame, long_all: pd.DataFrame, wide: pd.DataFrame,
                 log_df: pd.DataFrame, qa: pd.DataFrame):
    inv_path = Path(OUTPUT_DIR) / "inventario.xlsx"
    out_path = Path(OUTPUT_DIR) / "ConsolidadoSensores.xlsx"

    with pd.ExcelWriter(inv_path, engine="xlsxwriter") as w:
        inv.to_excel(w, sheet_name="inventario", index=False)

    with pd.ExcelWriter(out_path, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
        wide.to_excel(w, sheet_name="datos_wide", index=False)
        if EXPORT_LONG:
            # Guardamos versión “largo” para auditoría (acota columnas para tamaño)
            cols_long = ["planta","año","tirada_num","tirada_fecha","sensor_id","timestamp",
                         "variable","valor","Record","Date_raw","LOC_time_raw","VarName","TimeString",
                         "VarValue","Validity","Time_ms","VarName_original","source_file","source_path"]
            cols_long = [c for c in cols_long if c in long_all.columns]
            long_all[cols_long].to_excel(w, sheet_name="datos_long", index=False)

        # Diccionario
        dicc = pd.DataFrame({
            "columna": [
                "planta","año","tirada_num","tirada_fecha","sensor_id","timestamp",
                "Date_raw","LOC_time_raw","TimeString",
                "V_HUM","V_TEM","(otras variables pivotadas)",
                "Record","VarName","VarValue","Validity","Time_ms","VarName_original",
                "source_file","source_path"
            ],
            "descripcion": [
                "Planta origen (JPV/RB)","Año lógico extraído de la ruta",
                "N° de tirada (JPV)","Fecha de tirada","ID de sensor (1–6)",
                "Marca de tiempo unificada",
                "Fecha cruda RB","Hora local cruda RB","Tiempo crudo JPV",
                "Voltaje humedad","Voltaje temperatura","Otras señales según VarName/CSV",
                "N° de registro RB","Nombre de variable JPV original","Valor JPV original",
                "Bandera de validez JPV","Tiempo en ms JPV","VarName sin normalizar",
                "Nombre archivo fuente","Ruta completa del archivo fuente"
            ]
        })
        dicc.to_excel(w, sheet_name="diccionario", index=False)

        # Logs y QA
        if not log_df.empty:
            log_df.to_excel(w, sheet_name="log_omisiones", index=False)
        qa.to_excel(w, sheet_name="qa_resumen", index=False)

    print(f"Inventario: {inv_path}")
    print(f"Consolidado: {out_path}")

def main():
    print(">> Construyendo inventario...")
    inv = build_inventory()
    if inv.empty:
        print("No se encontraron archivos. Revisá las rutas BASE_JPV y BASE_RB.")
        return

    print(f"Archivos detectados: {len(inv)} (SAMPLE={SAMPLE_MAX_FILES})")
    inv_path = Path(OUTPUT_DIR) / "inventario.xlsx"
    with pd.ExcelWriter(inv_path, engine="xlsxwriter") as w:
        inv.to_excel(w, sheet_name="inventario", index=False)
    print(f"Inventario preliminar: {inv_path}")

    print(">> Procesando archivos...")
    long_all, log_df, qa = process_files(inv)

    print(f"Registros en largo: {len(long_all)}")
    print(">> Pivot a formato ANCHO (estilo RB)...")
    wide = to_wide(long_all)

    print(f"Filas en datos_wide: {len(wide)} | Columnas: {len(wide.columns)}")
    print(">> Guardando Excel...")
    save_outputs(inv, long_all, wide, log_df, qa)

# --- EJECUCIÓN AUTOMÁTICA ---
if __name__ == "__main__":
        main()
# ---------------------------------------

>> Construyendo inventario...
Archivos detectados: 263 (SAMPLE=5)
Inventario preliminar: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\inventario.xlsx
>> Procesando archivos...
Registros en largo: 36708
>> Pivot a formato ANCHO (estilo RB)...
Filas en datos_wide: 7034 | Columnas: 10
>> Guardando Excel...
Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\inventario.xlsx
Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\ConsolidadoSensores.xlsx


In [28]:
# Muestra chica equilibrada
SAMPLE_MAX_FILES = 5

inv = build_inventory()
long_all, log_df, qa = process_files(inv, sample_strategy="balanced")

# Chequeos clave
print("filas long_all:", len(long_all))
print("prop. valor no nulo:", long_all["valor"].notna().mean())

print("\nTop variables detectadas:")
print(long_all["variable"].value_counts().head(10))

# ¿Por qué podía quedar vacío el wide? probamos con to_wide corregido
wide = to_wide(long_all)
print("\nwide -> filas:", len(wide), " columnas:", len(wide.columns))
print(wide.head(3))

# Diagnóstico extra: ¿alguna clave estable con NaN?
key_cols = ["planta","año","tirada_num","tirada_fecha","sensor_id","timestamp"]
print("\nFilas con NaN en alguna clave estable:",
      long_all[key_cols].isna().any(axis=1).mean())


filas long_all: 36708
prop. valor no nulo: 1.0

Top variables detectadas:
HUMEDAD        5660
TEMPERATURA    5660
VOLT_HUME      5660
VOLT_TEMP      5660
OFFSET         5660
VARIEDAD       5660
V_HUM          1374
V_TEM          1374
Name: variable, dtype: int64

wide -> filas: 7034  columnas: 10
  planta   año tirada_fecha  sensor_id           timestamp  tirada_num  \
0    JPV  2024   2024-03-25          2 2024-03-15 13:44:35         1.0   
1    JPV  2024   2024-03-25          2 2024-03-15 13:49:35         1.0   
2    JPV  2024   2024-03-25          2 2024-03-15 13:54:35         1.0   

  Date_raw LOC_time_raw  VOLT_HUM  VOLT_TEM  
0     None         None       0.0       0.0  
1     None         None       0.0       0.0  
2     None         None       0.0       0.0  

Filas con NaN en alguna clave estable: 0.07486106570774763


In [29]:
#Chequeo archivo datos_wide

SAMPLE_MAX_FILES = 5
inv = build_inventory()
long_all, log_df, qa = process_files(inv, sample_strategy="balanced")
wide = to_wide(long_all)

print("filas wide:", len(wide))
print("por planta en wide:\n", wide["planta"].value_counts())


filas wide: 7034
por planta en wide:
 JPV    5660
RB     1374
Name: planta, dtype: int64


# Chequeo de Consolidación

In [14]:
inv = build_inventory()
long_all, log_df, qa = process_files(inv)
wide = to_wide(long_all)
save_outputs(inv, long_all, wide, log_df, qa)


Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\inventario.xlsx
Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\ConsolidadoSensores.xlsx


In [15]:
# ¿La muestra de 10 archivos incluye JPV y RB?
inv.head(3), inv["planta"].value_counts()

(  planta  anio  tirada_num tirada_fecha  sensor_id   ext   source_file  \
 0    JPV  2024         1.0   2024-03-25          1  .txt  SENSOR10.txt   
 1    JPV  2024         1.0   2024-03-25          2  .txt  SENSOR20.txt   
 2    JPV  2024         1.0   2024-03-25          3  .txt  SENSOR30.txt   
 
                                          source_path  
 0  C:\Users\augus\OneDrive - Universidad de Monte...  
 1  C:\Users\augus\OneDrive - Universidad de Monte...  
 2  C:\Users\augus\OneDrive - Universidad de Monte...  ,
 JPV    138
 RB     126
 Name: planta, dtype: int64)

In [16]:
# ¿Se excluyeron SENSORb/SENSORc en JPV?
bad = inv[(inv["planta"]=="JPV") & inv["source_path"].str.contains(r"SENSOR\d+[bc]", case=False, regex=True)]
bad.shape  # debería ser (0, ?)


(0, 8)

In [17]:
# ¿Columnas “crudas” preservadas según la planta?
# RB debe tener Date_raw/LOC_time_raw con datos en datos_long
rb_long = long_all[long_all["planta"]=="RB"]
rb_long[["Date_raw","LOC_time_raw"]].isna().mean()

# JPV debe tener TimeString/VarName/VarValue con datos en datos_long
jpv_long = long_all[long_all["planta"]=="JPV"]
jpv_long[["TimeString","VarName","VarValue"]].isna().mean()


TimeString    0.0
VarName       0.0
VarValue      0.0
dtype: float64

In [18]:
# ¿Formato final ancho ok (V_HUM, V_TEM en columnas)?
[c for c in wide.columns if c.startswith("V_")][:10]  # debería listar ['V_HUM','V_TEM'] si estaban en la muestra


[]

In [19]:
# ¿Mapeo de sensor JPV SENSOR10 → 1, etc.?
tmp = inv[(inv["planta"]=="JPV")].copy()
tmp["ok"] = tmp.apply(lambda r: (("SENSOR10" in r["source_file"] and r["sensor_id"]==1) or
                                 ("SENSOR20" in r["source_file"] and r["sensor_id"]==2) or
                                 ("SENSOR30" in r["source_file"] and r["sensor_id"]==3) or
                                 ("SENSOR40" in r["source_file"] and r["sensor_id"]==4) or
                                 ("SENSOR50" in r["source_file"] and r["sensor_id"]==5) or
                                 ("SENSOR60" in r["source_file"] and r["sensor_id"]==6)), axis=1)
tmp["ok"].all()  # debería ser True


True

In [20]:
# 6) ¿Timestamps construidos correctamente?
# proporción de timestamps no nulos por planta
long_all.groupby("planta")["timestamp"].apply(lambda s: s.notna().mean())


planta
JPV    1.0
Name: timestamp, dtype: float64

In [21]:
# 7) ¿Duplicados tratados?
log_df["tipo"].value_counts()
#Si hay duplicado_largo, quedaron registrados y en datos_long se dejó la primera aparición.

Series([], Name: tipo, dtype: int64)

In [22]:
# ¿Rangos de tiempo sensatos por sensor (QA)?
qa.sort_values(["planta","sensor_id"]).head(12)

# Revisar fechas_min y fechas_max por sensor.

Unnamed: 0,planta,anio,sensor_id,registros,fechas_min,fechas_max
0,JPV,2024,1,36966,2023-05-20 13:50:34,2024-03-29 08:59:14
1,JPV,2024,2,36972,2023-05-20 13:50:34,2024-03-29 08:59:14
2,JPV,2024,3,36972,2023-05-20 13:50:34,2024-03-29 08:59:14
3,JPV,2024,4,36972,2023-05-20 13:50:34,2024-03-29 08:59:14
4,JPV,2024,5,16980,2024-03-15 13:44:35,2024-03-25 09:49:05
5,JPV,2024,6,16980,2024-03-15 13:44:35,2024-03-25 09:49:05


In [10]:
long_all["valor"].notna().mean()       # debería ser > 0
long_all["variable"].value_counts().head()  # debería listar variables como V_HUM, V_TEM o VarName de JPV


NameError: name 'long_all' is not defined

# SOLO RB

In [6]:
# === SOLO RB (autosuficiente) ===

BASE_RB  = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025"
OUTPUT_DIR = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_RB"
RB_VOLT_SCALE = 0.01
SAMPLE_MAX_FILES = 5   # o un número chico p/ probar

os.makedirs(OUTPUT_DIR, exist_ok=True)

def _canon(s: str) -> str:
    return re.sub(r"[\s\-\.\(\)_]", "", str(s).upper())

def extract_sensor_id_from_name(name: str):
    m = re.search(r"SENSOR\s*(\d+)", str(name).upper())
    return int(m.group(1)) if m else None

def parse_tirada_rb(path: Path):
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    for part in path.parts[::-1]:
        mdate = re.search(DATE_RX, part)
        if mdate:
            d, m, y = mdate.groups()
            y = int(y) + (2000 if int(y) < 100 else 0)
            try:
                return datetime(y, int(m), int(d))
            except ValueError:
                pass
    return None

def read_rb_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, engine="python")
    # detectar fecha/hora
    date_col = next((c for c in df.columns if _canon(c) in ("DATE","FECHA")), None)
    lt_col   = next((c for c in df.columns if _canon(c) in ("LOCTIME","LOCTIEMPO","LOCALTIME")), None)
    if date_col and lt_col:
        df["timestamp"] = pd.to_datetime(df[date_col].astype(str)+" "+df[lt_col].astype(str), dayfirst=False, errors="coerce")
    elif date_col:
        df["timestamp"] = pd.to_datetime(df[date_col], errors="coerce")
    else:
        df["timestamp"] = pd.NaT

    # map variables de RB
    value_cols_map = {}
    for c in df.columns:
        cc = _canon(c)
        if cc in ("VHUM", "V_HUM"):
            value_cols_map[c] = "V_HUM"
        elif cc in ("VTEM","V_TEM","VTEMP","V_TEMP"):
            value_cols_map[c] = "V_TEM"
    value_cols = list(value_cols_map.keys())

    id_cols = ["timestamp"]
    if "Record" in df.columns: id_cols.insert(0, "Record")
    if date_col: id_cols.insert(1, date_col)
    if lt_col:   id_cols.insert(2, lt_col)

    long_df = df[id_cols + value_cols].melt(id_vars=id_cols, var_name="variable_raw", value_name="valor_raw")
    long_df["variable"] = long_df["variable_raw"].map(value_cols_map)
    if date_col: long_df.rename(columns={date_col:"Date_raw"}, inplace=True)
    if lt_col:   long_df.rename(columns={lt_col:"LOC_time_raw"}, inplace=True)

    # valor num (coma → punto)
    s = long_df["valor_raw"].astype(str).str.strip()
    s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
    long_df["valor"] = pd.to_numeric(s, errors="coerce")

    # meta de path
    long_df["planta"] = "RB"
    long_df["año"] = None
    m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+RB", str(path), flags=re.IGNORECASE)
    if m: long_df["año"] = int(m.group(1))
    long_df["tirada_fecha"] = parse_tirada_rb(path.parent)
    long_df["sensor_id"] = extract_sensor_id_from_name(path.name) or extract_sensor_id_from_name(path.parent.name)
    long_df["source_file"] = path.name
    long_df["source_path"] = str(path)
    return long_df

# inventario RB
# inventario RB
rows = []
for root, dirs, files in os.walk(BASE_RB):
    # ⛔ omitir la carpeta "Datos otro formato" (no se desciende a ella)
    dirs[:] = [d for d in dirs if _canon(d) != _canon("Datos otro formato")]

    for f in files:
        if f.lower().endswith(".csv"):
            p = Path(root)/f
            rows.append({"planta":"RB","source_file":f,"source_path":str(p)})

inv_rb = pd.DataFrame(rows)
print("Archivos RB detectados:", len(inv_rb))

if SAMPLE_MAX_FILES:
    inv_rb = inv_rb.head(int(SAMPLE_MAX_FILES))

# procesar archivos
long_frames = []
for _, r in inv_rb.iterrows():
    try:
        long_frames.append(read_rb_csv(Path(r["source_path"])))
    except Exception as e:
        print("Error leyendo:", r["source_path"], e)

long_rb = pd.concat(long_frames, ignore_index=True) if long_frames else pd.DataFrame()
print("Registros RB en largo:", len(long_rb))

# wide unificado (solo voltajes, escalados)
key_cols = ["planta","año","tirada_fecha","sensor_id","timestamp"]
svar = long_rb["variable"].astype(str).str.upper().str.replace(r"[\s_\.\-]","", regex=True)
hum = svar.isin({"VHUM"})
tem = svar.isin({"VTEM"})
keep = hum | tem
lv = long_rb[keep].copy()
lv["var_norm"] = np.where(hum[keep], "VOLT_HUM", "VOLT_TEM")
lv["valor_norm"] = lv["valor"] * RB_VOLT_SCALE

wide_rb = (lv.pivot_table(index=key_cols, columns="var_norm", values="valor_norm", aggfunc="first").reset_index())

# anexar crudos
raw_keep = [c for c in ["Date_raw","LOC_time_raw"] if c in long_rb.columns]
if raw_keep:
    raw = (long_rb[key_cols + raw_keep].groupby(key_cols, as_index=False).first())
    wide_rb = wide_rb.merge(raw, on=key_cols, how="left")

# ordenar columnas
volt_cols = [c for c in ["VOLT_HUM","VOLT_TEM"] if c in wide_rb.columns]
other = [c for c in wide_rb.columns if c not in (key_cols + raw_keep + volt_cols)]
wide_rb = wide_rb[key_cols + raw_keep + volt_cols + other]

print("Filas en datos_wide (RB):", len(wide_rb), "| Columnas:", len(wide_rb.columns))

# exportar solo inventario + wide + diccionario + qa
inv_path_rb = Path(OUTPUT_DIR) / "inventario_RB.xlsx"
out_path_rb = Path(OUTPUT_DIR) / "ConsolidadoSensores_RB.xlsx"

with pd.ExcelWriter(inv_path_rb, engine="xlsxwriter") as w:
    inv_rb.to_excel(w, sheet_name="inventario_RB", index=False)

qa_rb = (wide_rb.groupby(["planta","año","sensor_id"])
         .agg(registros=("VOLT_HUM","size"),
              fechas_min=("timestamp","min"),
              fechas_max=("timestamp","max"))
         .reset_index()) if not wide_rb.empty else pd.DataFrame(columns=["planta","año","sensor_id","registros","fechas_min","fechas_max"])

with pd.ExcelWriter(out_path_rb, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
    wide_rb.to_excel(w, sheet_name="datos_wide", index=False)
    dicc = pd.DataFrame({
        "columna": ["planta","año","tirada_fecha","sensor_id","timestamp","Date_raw","LOC_time_raw","VOLT_HUM","VOLT_TEM","source_file","source_path"],
        "descripcion": ["Planta origen (RB)","Año","Fecha tirada","ID sensor","Timestamp unificado","Fecha cruda RB","Hora local cruda RB","Voltaje humedad (÷100)","Voltaje temperatura (÷100)","Archivo fuente","Ruta fuente"]
    })
    dicc.to_excel(w, sheet_name="diccionario", index=False)
    qa_rb.to_excel(w, sheet_name="qa_resumen", index=False)

print("Inventario RB:", inv_path_rb)
print("Consolidado RB:", out_path_rb)


Archivos RB detectados: 126
Registros RB en largo: 318034
Filas en datos_wide (RB): 159002 | Columnas: 9
Inventario RB: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_RB\inventario_RB.xlsx
Consolidado RB: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_RB\ConsolidadoSensores_RB.xlsx


# Solo JPV

In [5]:
# === SOLO JPV (autosuficiente) ===

# >>> CONFIG <<<
BASE_JPV   = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025"
OUTPUT_DIR = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_JPV"
SAMPLE_MAX_FILES = None   # poné un número (p. ej. 20) para prueba rápida, o None para todo
DROP_WIDE_COLS = ["TimeString", "HUMEDAD", "OFFSET", "TEMPERATURA", "Date_raw", "LOC_time_raw"] # excluir columnas del datos_wide
# >>> FIN CONFIG <<<

os.makedirs(OUTPUT_DIR, exist_ok=True)

def _canon(s: str) -> str:
    return re.sub(r"[\s\-\.\(\)_]", "", str(s).upper())

def is_plain_sensor_folder(name: str) -> bool:
    # Acepta SENSOR1..SENSOR6 (exacto, sin sufijos b/c)
    return re.fullmatch(r"SENSOR([1-6])", name.upper()) is not None

def extract_sensor_id_from_name(name: str):
    # JPV usa SENSOR10,20,...,60 (1..6) o carpetas SENSOR1..6
    m = re.search(r"SENSOR\s*(\d+)", str(name).upper())
    if not m:
        return None
    n = int(m.group(1))
    return n//10 if n in (10,20,30,40,50,60) else n

def parse_tirada_jpv(path: Path):
    """
    Busca en los segmentos: 'usb <número>' y una fecha dd.mm.aa[aa] / dd-mm-aaaa / dd/mm/aaaa
    Tolera texto adicional (p. ej., 'Datos USB ...', 'usb ... Cambio ...').
    """
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    usb_num = None
    date_dt = None
    for part in path.parts:
        if usb_num is None:
            mnum = re.search(r"\busb\s*(\d+)", part, flags=re.IGNORECASE)
            if mnum:
                usb_num = int(mnum.group(1))
        if date_dt is None:
            mdate = re.search(DATE_RX, part)
            if mdate:
                d, m, y = mdate.groups()
                y = int(y) + (2000 if int(y) < 100 else 0)
                try:
                    date_dt = datetime(y, int(m), int(d))
                except ValueError:
                    pass
        if usb_num is not None and date_dt is not None:
            break
    return usb_num, date_dt

def read_jpv_txt(path: Path) -> pd.DataFrame:
    # Intentos de lectura típicos (UTF-16)
    tried = [("utf-16", "\t"), ("utf-16le", "\t"), ("utf-8", "\t")]
    last_err = None
    df = None
    for enc, sep in tried:
        try:
            df = pd.read_csv(path, sep=sep, encoding=enc, engine="python", on_bad_lines="skip")
            break
        except Exception as e:
            last_err = e
    if df is None:
        raise last_err or RuntimeError(f"No se pudo leer {path}")

    # Filtrar metadatos
    if "VarName" in df.columns:
        df = df[~df["VarName"].astype(str).str.startswith("$RT_")]

    # Selección mínima + trazabilidad
    keep = [c for c in ["VarName","TimeString","VarValue","Validity","Time_ms"] if c in df.columns]
    df = df[keep].copy()

    # Timestamp
    df["timestamp"] = pd.to_datetime(df.get("TimeString", pd.Series(dtype=str)), errors="coerce")

    # Variable normalizada y original
    if "VarName" in df.columns:
        df["VarName_original"] = df["VarName"].astype(str)
        df["variable"] = df["VarName"].astype(str).str.replace(r"^\d+_", "", regex=True)
    else:
        df["VarName_original"] = np.nan
        df["variable"] = "Var"

    # VarValue -> número (coma decimal → punto si aplica)
    if "VarValue" in df.columns:
        s = df["VarValue"].astype(str).str.strip()
        s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
        df["valor"] = pd.to_numeric(s, errors="coerce")
    else:
        df["valor"] = np.nan

    return df

# -------- Inventario JPV (solo .txt bajo SENSOR1..6; excluye SENSORb/c) --------
rows = []
for root, dirs, files in os.walk(BASE_JPV):
    root_p = Path(root)
    # Si estoy parado en una carpeta SENSOR*, omitir si no es exactamente SENSOR1..6
    if root_p.name.upper().startswith("SENSOR") and not is_plain_sensor_folder(root_p.name):
        # no descender a subcarpetas de estos sensores con sufijos b/c
        dirs[:] = []
        continue

    for f in files:
        if f.lower().endswith(".txt") and is_plain_sensor_folder(root_p.name):
            p = root_p / f
            planta = "JPV"
            sensor_id = extract_sensor_id_from_name(f) or extract_sensor_id_from_name(root_p.name)
            tirada_num, tirada_dt = parse_tirada_jpv(root_p)
            # año desde carpeta "20xx Datos Sensores JPV" si aparece en la ruta
            año = None
            m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+JPV", str(p), flags=re.IGNORECASE)
            if m: año = int(m.group(1))
            rows.append({
                "planta": planta, "año": año,
                "tirada_num": tirada_num, "tirada_fecha": tirada_dt,
                "sensor_id": sensor_id,
                "source_file": f, "source_path": str(p)
            })

inv_jpv = pd.DataFrame(rows)
if inv_jpv.empty:
    print("No se encontraron archivos JPV. Revisá BASE_JPV.")
else:
    inv_jpv.sort_values(["año","tirada_fecha","sensor_id","source_file"], inplace=True, ignore_index=True)
print("Archivos JPV detectados:", len(inv_jpv))

# Muestra (opcional)
if SAMPLE_MAX_FILES:
    inv_jpv = inv_jpv.head(int(SAMPLE_MAX_FILES)).reset_index(drop=True)

# -------- Procesamiento archivos (solo JPV) --------
long_frames = []
log_rows = []
for _, r in inv_jpv.iterrows():
    p = Path(r["source_path"])
    try:
        df = read_jpv_txt(p)
        # meta
        df["planta"] = "JPV"
        df["año"] = r["año"]
        df["tirada_num"] = r["tirada_num"]
        df["tirada_fecha"] = pd.to_datetime(r["tirada_fecha"]) if pd.notna(r["tirada_fecha"]) else pd.NaT
        df["sensor_id"] = r["sensor_id"]
        df["source_file"] = r["source_file"]
        df["source_path"] = r["source_path"]
        # asegurar columnas crudas
        for c in ["Date_raw","LOC_time_raw","VarName","TimeString","VarValue","Validity","Time_ms","VarName_original"]:
            if c not in df.columns:
                df[c] = pd.Series([np.nan]*len(df))
        long_frames.append(df)
    except Exception as e:
        log_rows.append({"tipo":"error_lectura","source_path":str(p),"detalle":str(e)})

long_jpv = pd.concat(long_frames, ignore_index=True) if long_frames else pd.DataFrame()
print("Registros JPV en largo:", len(long_jpv))

# -------- Wide (unificar voltajes en VOLT_HUM / VOLT_TEM; excluir HUMEDAD/TEMPERATURA/OFFSET) --------
if long_jpv.empty:
    wide_jpv = pd.DataFrame()
else:
    # claves estables
    key_cols = ["planta","año","tirada_fecha","sensor_id","timestamp"]
    for c in key_cols:
        if c not in long_jpv.columns:
            long_jpv[c] = np.nan

    # map variables JPV → VOLT_HUM / VOLT_TEM
    svar = (long_jpv["variable"].astype(str).str.upper().str.replace(r"[\s_\.\-]", "", regex=True))
    hum_aliases  = {"VOLTHUM","VOLTHUME","VHUM"}     # por si existiese VHUM alguna vez
    tem_aliases  = {"VOLTTEM","VOLTTEMP","VTEM","VTEMP"}
    drop_aliases = {"HUMEDAD","TEMPERATURA","OFFSET","VARIEDAD"}

    mask_hum  = svar.isin(hum_aliases)
    mask_tem  = svar.isin(tem_aliases)
    mask_drop = svar.isin(drop_aliases)

    keep = (mask_hum | mask_tem) & (~mask_drop)
    lv = long_jpv[keep].copy()
    lv["var_norm"] = np.where(mask_hum[keep], "VOLT_HUM", "VOLT_TEM")
    lv["valor_norm"] = pd.to_numeric(lv["valor"], errors="coerce")  # JPV no requiere escala

    wide_jpv = (lv.pivot_table(index=key_cols, columns="var_norm", values="valor_norm", aggfunc="first")
                  .reset_index())

    # anexar crudos + tirada_num
    raw_keep = [c for c in ["tirada_num","Date_raw","LOC_time_raw","TimeString"] if c in long_jpv.columns]
    if raw_keep:
        raw = (long_jpv[key_cols + raw_keep].groupby(key_cols, as_index=False).first())
        wide_jpv = wide_jpv.merge(raw, on=key_cols, how="left")

    # ordenar columnas
    volt_cols = [c for c in ["VOLT_HUM","VOLT_TEM"] if c in wide_jpv.columns]
    other = [c for c in wide_jpv.columns if c not in (key_cols + raw_keep + volt_cols)]
    wide_jpv = wide_jpv[key_cols + raw_keep + volt_cols + other]

    # quitar columnas no deseadas
    if DROP_WIDE_COLS:
        wide_jpv = wide_jpv.drop(columns=[c for c in DROP_WIDE_COLS if c in wide_jpv.columns], errors="ignore")

print("Filas en datos_wide (JPV):", len(wide_jpv), "| Columnas:", len(wide_jpv.columns))

# -------- QA y export --------
qa_jpv = (
    wide_jpv.groupby(["planta","año","sensor_id"])
    .agg(registros=("VOLT_HUM","size"), fechas_min=("timestamp","min"), fechas_max=("timestamp","max"))
    .reset_index()
) if not wide_jpv.empty else pd.DataFrame(columns=["planta","año","sensor_id","registros","fechas_min","fechas_max"])

out_dir = Path(OUTPUT_DIR)
inv_path = out_dir / "inventario_JPV.xlsx"
out_path = out_dir / "Consolidado_Sensores_JPV.xlsx"

with pd.ExcelWriter(inv_path, engine="xlsxwriter") as w:
    inv_jpv.to_excel(w, sheet_name="inventario_JPV", index=False)

with pd.ExcelWriter(out_path, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
    wide_jpv.to_excel(w, sheet_name="datos_wide", index=False)

    dicc = pd.DataFrame({
        "columna": ["planta","año","tirada_num","tirada_fecha","sensor_id","timestamp",
                    "Date_raw","LOC_time_raw","TimeString","VOLT_HUM","VOLT_TEM",
                    "VarName","VarValue","Validity","Time_ms","VarName_original","source_file","source_path"],
        "descripcion": ["Planta origen (JPV)","Año","N° tirada (si existía)","Fecha tirada",
                        "ID sensor (1–6)","Timestamp unificado",
                        "Fecha cruda RB (no aplica)","Hora local cruda RB (no aplica)","Tiempo crudo JPV",
                        "Voltaje humedad (JPV)","Voltaje temperatura (JPV)",
                        "Nombre variable original JPV","Valor crudo original","Bandera JPV","Tiempo en ms JPV",
                        "Nombre original JPV","Archivo fuente","Ruta fuente"]
    })
    dicc.to_excel(w, sheet_name="diccionario", index=False)

    if len(log_rows):
        pd.DataFrame(log_rows).to_excel(w, sheet_name="log_omisiones", index=False)
    qa_jpv.to_excel(w, sheet_name="qa_resumen", index=False)

print("Inventario JPV:", inv_path)
print("Consolidado JPV:", out_path)


Archivos JPV detectados: 137
Registros JPV en largo: 1550621
Filas en datos_wide (JPV): 258460 | Columnas: 8
Inventario JPV: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_JPV\inventario_JPV.xlsx
Consolidado JPV: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_solo_JPV\Consolidado_Sensores_JPV.xlsx


# Solo RB por año

In [7]:
# === SOLO RB por año (autosuficiente) ===
# >>> CONFIG <<<
BASE_RB   = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025"
OUTPUT_DIR = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año"
SAMPLE_MAX_FILES = None       # poné un número para prueba (p. ej. 200) o None para todo
RB_VOLT_SCALE = 0.01          # dividir RB por 100
OMIT_DIR_NAME = "Datos otro formato"  # omitir esta carpeta
# >>> FIN CONFIG <<<

os.makedirs(OUTPUT_DIR, exist_ok=True)

def _canon(s: str) -> str:
    return re.sub(r"[\s\-\.\(\)_]", "", str(s).upper())

def extract_sensor_id_from_name(name: str):
    m = re.search(r"SENSOR\s*(\d+)", str(name).upper())
    return int(m.group(1)) if m else None

def parse_tirada_rb(path: Path):
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    for part in path.parts[::-1]:
        mdate = re.search(DATE_RX, part)
        if mdate:
            d, m, y = mdate.groups()
            y = int(y) + (2000 if int(y) < 100 else 0)
            try:
                return datetime(y, int(m), int(d))
            except ValueError:
                pass
    return None

def _canon_col(s: str) -> str:
    return re.sub(r"[\s\-\.\(\)_]", "", str(s).upper())

def read_rb_csv(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, engine="python")

    # detectar columnas fecha/hora
    date_col = next((c for c in df.columns if _canon_col(c) in ("DATE","FECHA")), None)
    lt_col   = next((c for c in df.columns if _canon_col(c) in ("LOCTIME","LOCTIEMPO","LOCALTIME")), None)

    # timestamp
    if date_col and lt_col:
        df["timestamp"] = pd.to_datetime(df[date_col].astype(str)+" "+df[lt_col].astype(str),
                                         dayfirst=False, errors="coerce")
    elif date_col:
        df["timestamp"] = pd.to_datetime(df[date_col], errors="coerce")
    else:
        df["timestamp"] = pd.NaT

    # map variables (voltajes)
    value_cols_map = {}
    for c in df.columns:
        cc = _canon_col(c)
        if cc in ("VHUM","V_HUM"):
            value_cols_map[c] = "V_HUM"
        elif cc in ("VTEM","V_TEM","VTEMP","V_TEMP"):
            value_cols_map[c] = "V_TEM"
    value_cols = list(value_cols_map.keys())

    # id cols
    id_cols = ["timestamp"]
    if "Record" in df.columns: id_cols.insert(0, "Record")
    if date_col: id_cols.insert(1, date_col)
    if lt_col:   id_cols.insert(2, lt_col)

    if not value_cols:
        # no hay columnas de voltaje: devolvemos frame mínimo
        out = df[id_cols].copy()
        if date_col: out.rename(columns={date_col:"Date_raw"}, inplace=True)
        if lt_col:   out.rename(columns={lt_col:"LOC_time_raw"}, inplace=True)
        out["variable"] = np.nan
        out["valor"] = np.nan
        return out[["Record" if "Record" in out.columns else None,
                    "Date_raw" if "Date_raw" in out.columns else None,
                    "LOC_time_raw" if "LOC_time_raw" in out.columns else None,
                    "timestamp","variable","valor"]].dropna(how="all", axis=1)

    long_df = df[id_cols + value_cols].melt(id_vars=id_cols, var_name="variable_raw", value_name="valor_raw")
    long_df["variable"] = long_df["variable_raw"].map(value_cols_map)

    if date_col: long_df.rename(columns={date_col:"Date_raw"}, inplace=True)
    if lt_col:   long_df.rename(columns={lt_col:"LOC_time_raw"}, inplace=True)

    s = long_df["valor_raw"].astype(str).str.strip()
    s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
    long_df["valor"] = pd.to_numeric(s, errors="coerce")

    # meta
    long_df["planta"] = "RB"
    m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+RB", str(path), flags=re.IGNORECASE)
    long_df["año"] = int(m.group(1)) if m else None
    long_df["tirada_fecha"] = parse_tirada_rb(path.parent)
    long_df["sensor_id"] = extract_sensor_id_from_name(path.name) or extract_sensor_id_from_name(path.parent.name)
    long_df["source_file"] = path.name
    long_df["source_path"] = str(path)
    return long_df

def build_inventory_rb() -> pd.DataFrame:
    rows = []
    for root, dirs, files in os.walk(BASE_RB):
        # omitir carpeta "Datos otro formato"
        dirs[:] = [d for d in dirs if _canon(d) != _canon(OMIT_DIR_NAME)]
        for f in files:
            if f.lower().endswith(".csv"):
                p = Path(root)/f
                rows.append({"source_file":f,"source_path":str(p)})
    inv = pd.DataFrame(rows)
    return inv

def process_subset(inv_subset: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    long_frames, logs = [], []
    for _, r in inv_subset.iterrows():
        try:
            long_frames.append(read_rb_csv(Path(r["source_path"])))
        except Exception as e:
            logs.append({"tipo":"error_lectura","source_path":r["source_path"],"detalle":str(e)})
    long_rb = pd.concat(long_frames, ignore_index=True) if long_frames else pd.DataFrame()

    # wide unificado (VOLT_HUM / VOLT_TEM), escala ÷100
    if long_rb.empty:
        wide_rb = pd.DataFrame()
    else:
        key = ["planta","año","tirada_fecha","sensor_id","timestamp"]
        for c in key:
            if c not in long_rb.columns: long_rb[c] = np.nan

        svar = long_rb["variable"].astype(str).str.upper().str.replace(r"[\s_\.\-]","", regex=True)
        hum = svar.isin({"VHUM"})
        tem = svar.isin({"VTEM"})
        keep = hum | tem
        lv = long_rb[keep].copy()
        lv["var_norm"] = np.where(hum[keep], "VOLT_HUM", "VOLT_TEM")
        lv["valor_norm"] = lv["valor"] * RB_VOLT_SCALE

        wide_rb = (lv.pivot_table(index=key, columns="var_norm", values="valor_norm", aggfunc="first")
                     .reset_index())

        raw_keep = [c for c in ["Date_raw","LOC_time_raw"] if c in long_rb.columns]
        if raw_keep:
            raw = (long_rb[key + raw_keep].groupby(key, as_index=False).first())
            wide_rb = wide_rb.merge(raw, on=key, how="left")

        # ordenar
        volt = [c for c in ["VOLT_HUM","VOLT_TEM"] if c in wide_rb.columns]
        other = [c for c in wide_rb.columns if c not in (key + raw_keep + volt)]
        wide_rb = wide_rb[key + raw_keep + volt + other]

    qa = (wide_rb.groupby(["planta","año","sensor_id"])
          .agg(registros=("VOLT_HUM","size"),
               fechas_min=("timestamp","min"),
               fechas_max=("timestamp","max"))
          .reset_index()) if not wide_rb.empty else pd.DataFrame(columns=["planta","año","sensor_id","registros","fechas_min","fechas_max"])
    log_df = pd.DataFrame(logs)
    return wide_rb, qa, log_df

# ===== Runner por año =====
inv_all = build_inventory_rb()
print("Archivos RB detectados:", len(inv_all))

# enriquecer inventario con meta básica (año/fecha/sensor) para dividir por año
meta_frames = []
for _, r in inv_all.iterrows():
    try:
        tmp = read_rb_csv(Path(r["source_path"])).head(1)  # una fila para extraer meta rápido
        meta_frames.append(tmp[["año","tirada_fecha","sensor_id","source_file","source_path"]])
    except Exception:
        pass
meta = pd.concat(meta_frames, ignore_index=True) if meta_frames else pd.DataFrame()
inv_all = inv_all.merge(meta[["año","source_path"]], on="source_path", how="left")

# fallback: si 'año' viene NaN pero hay tirada_fecha, lo inferimos
if "año" in inv_all.columns:
    if inv_all["año"].isna().any():
        # re-lee fecha solo cuando hace falta
        for i in inv_all[inv_all["año"].isna()].index:
            tf = parse_tirada_rb(Path(inv_all.at[i,"source_path"]).parent)
            inv_all.at[i,"año"] = tf.year if isinstance(tf, datetime) else np.nan

# filtrar por año
for yr in (2024, 2025):
    inv_y = inv_all[inv_all["año"] == yr].reset_index(drop=True)
    if inv_y.empty:
        print(f"[{yr}] No hay archivos RB.")
        continue

    # sample por año si se pide
    if SAMPLE_MAX_FILES:
        inv_y = inv_y.head(int(SAMPLE_MAX_FILES)).reset_index(drop=True)

    print(f"[{yr}] Archivos:", len(inv_y))
    wide_rb, qa_rb, log_rb = process_subset(inv_y)
    print(f"[{yr}] datos_wide filas:", len(wide_rb))

    out_dir = Path(OUTPUT_DIR)
    inv_path = out_dir / f"inventario_RB_{yr}.xlsx"
    out_path = out_dir / f"Consolidado_Sensores_RB_{yr}.xlsx"

    with pd.ExcelWriter(inv_path, engine="xlsxwriter") as w:
        inv_y.to_excel(w, sheet_name=f"inventario_RB_{yr}", index=False)

    with pd.ExcelWriter(out_path, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
        wide_rb.to_excel(w, sheet_name="datos_wide", index=False)
        dicc = pd.DataFrame({
            "columna": ["planta","año","tirada_fecha","sensor_id","timestamp",
                        "Date_raw","LOC_time_raw","VOLT_HUM","VOLT_TEM","source_file","source_path"],
            "descripcion": ["Planta origen (RB)","Año","Fecha tirada","ID sensor","Timestamp unificado",
                            "Fecha cruda RB","Hora local cruda RB","Voltaje humedad (÷100)","Voltaje temperatura (÷100)",
                            "Archivo fuente","Ruta fuente"]
        })
        dicc.to_excel(w, sheet_name="diccionario", index=False)
        if not log_rb.empty:
            log_rb.to_excel(w, sheet_name="log_omisiones", index=False)
        qa_rb.to_excel(w, sheet_name="qa_resumen", index=False)

    print(f"[{yr}] Inventario: {inv_path}")
    print(f"[{yr}] Consolidado: {out_path}")


Archivos RB detectados: 126
[2024] Archivos: 54
[2024] datos_wide filas: 71874
[2024] Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\inventario_RB_2024.xlsx
[2024] Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2024.xlsx
[2025] Archivos: 72
[2025] datos_wide filas: 87128
[2025] Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\inventario_RB_2025.xlsx
[2025] Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2025.xlsx


# Solo JPV por año

In [8]:
# === SOLO JPV por año (autosuficiente) ===
# >>> CONFIG <<<
BASE_JPV   = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025"
OUTPUT_DIR = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año"
SAMPLE_MAX_FILES = None       # poner un número para prueba o None
DROP_WIDE_COLS = ["TimeString","HUMEDAD","OFFSET","TEMPERATURA","Date_raw","LOC_time_raw"]  # no queremos estos en wide JPV
# >>> FIN CONFIG <<<

os.makedirs(OUTPUT_DIR, exist_ok=True)

def _canon(s: str) -> str:
    return re.sub(r"[\s\-\.\(\)_]", "", str(s).upper())

def is_plain_sensor_folder(name: str) -> bool:
    return re.fullmatch(r"SENSOR([1-6])", name.upper()) is not None

def extract_sensor_id_from_name(name: str):
    m = re.search(r"SENSOR\s*(\d+)", str(name).upper())
    if not m: return None
    n = int(m.group(1))
    return n//10 if n in (10,20,30,40,50,60) else n

def parse_tirada_jpv(path: Path):
    DATE_RX = r"([0-3]?\d)[\.\-\/]([01]?\d)[\.\-\/](\d{2,4})"
    usb_num = None
    date_dt = None
    for part in path.parts:
        if usb_num is None:
            mnum = re.search(r"\busb\s*(\d+)", part, flags=re.IGNORECASE)
            if mnum: usb_num = int(mnum.group(1))
        if date_dt is None:
            mdate = re.search(DATE_RX, part)
            if mdate:
                d, m, y = mdate.groups()
                y = int(y) + (2000 if int(y) < 100 else 0)
                try:
                    date_dt = datetime(y, int(m), int(d))
                except ValueError:
                    pass
        if usb_num is not None and date_dt is not None:
            break
    return usb_num, date_dt

def read_jpv_txt(path: Path) -> pd.DataFrame:
    tried = [("utf-16","\t"),("utf-16le","\t"),("utf-8","\t")]
    df, last_err = None, None
    for enc, sep in tried:
        try:
            df = pd.read_csv(path, sep=sep, encoding=enc, engine="python", on_bad_lines="skip")
            break
        except Exception as e:
            last_err = e
    if df is None:
        raise last_err or RuntimeError(f"No se pudo leer {path}")

    if "VarName" in df.columns:
        df = df[~df["VarName"].astype(str).str.startswith("$RT_")]

    keep = [c for c in ["VarName","TimeString","VarValue","Validity","Time_ms"] if c in df.columns]
    df = df[keep].copy()
    df["timestamp"] = pd.to_datetime(df.get("TimeString", pd.Series(dtype=str)), errors="coerce")

    if "VarName" in df.columns:
        df["VarName_original"] = df["VarName"].astype(str)
        df["variable"] = df["VarName"].astype(str).str.replace(r"^\d+_", "", regex=True)
    else:
        df["VarName_original"] = np.nan
        df["variable"] = "Var"

    if "VarValue" in df.columns:
        s = df["VarValue"].astype(str).str.strip()
        s = np.where(s.str.contains(r"^\s*-?\d+,\d+\s*$"), s.str.replace(",", ".", regex=False), s)
        df["valor"] = pd.to_numeric(s, errors="coerce")
    else:
        df["valor"] = np.nan

    return df

def build_inventory_jpv() -> pd.DataFrame:
    rows = []
    for root, dirs, files in os.walk(BASE_JPV):
        root_p = Path(root)
        # no descender en sensores b/c
        if root_p.name.upper().startswith("SENSOR") and not is_plain_sensor_folder(root_p.name):
            dirs[:] = []
            continue
        for f in files:
            if f.lower().endswith(".txt") and is_plain_sensor_folder(root_p.name):
                p = root_p / f
                usb_num, tdt = parse_tirada_jpv(root_p)
                año = None
                m = re.search(r"(20\d{2})\s+Datos\s+Sensores\s+JPV", str(p), flags=re.IGNORECASE)
                if m: año = int(m.group(1))
                rows.append({
                    "source_file": f,
                    "source_path": str(p),
                    "sensor_id": extract_sensor_id_from_name(f) or extract_sensor_id_from_name(root_p.name),
                    "tirada_num": usb_num,
                    "tirada_fecha": tdt,
                    "año": año
                })
    inv = pd.DataFrame(rows)
    return inv

def process_subset(inv_subset: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    long_frames, logs = [], []
    for _, r in inv_subset.iterrows():
        p = Path(r["source_path"])
        try:
            df = read_jpv_txt(p)
            df["planta"] = "JPV"
            df["año"] = r["año"]
            df["tirada_num"] = r["tirada_num"]
            df["tirada_fecha"] = pd.to_datetime(r["tirada_fecha"]) if pd.notna(r["tirada_fecha"]) else pd.NaT
            df["sensor_id"] = r["sensor_id"]
            df["source_file"] = r["source_file"]
            df["source_path"] = r["source_path"]
            for c in ["Date_raw","LOC_time_raw","VarName","TimeString","VarValue","Validity","Time_ms","VarName_original"]:
                if c not in df.columns: df[c] = pd.Series([np.nan]*len(df))
            long_frames.append(df)
        except Exception as e:
            logs.append({"tipo":"error_lectura","source_path":str(p),"detalle":str(e)})

    long_jpv = pd.concat(long_frames, ignore_index=True) if long_frames else pd.DataFrame()

    if long_jpv.empty:
        wide_jpv = pd.DataFrame()
    else:
        key = ["planta","año","tirada_fecha","sensor_id","timestamp"]
        for c in key:
            if c not in long_jpv.columns: long_jpv[c] = np.nan

        svar = long_jpv["variable"].astype(str).str.upper().str.replace(r"[\s_\.\-]","", regex=True)
        hum_aliases  = {"VOLTHUM","VOLTHUME","VHUM"}
        tem_aliases  = {"VOLTTEM","VOLTTEMP","VTEM","VTEMP"}
        drop_aliases = {"HUMEDAD","TEMPERATURA","OFFSET","VARIEDAD"}

        hum = svar.isin(hum_aliases)
        tem = svar.isin(tem_aliases)
        drop = svar.isin(drop_aliases)

        keep = (hum | tem) & (~drop)
        lv = long_jpv[keep].copy()
        lv["var_norm"] = np.where(hum[keep], "VOLT_HUM", "VOLT_TEM")
        lv["valor_norm"] = pd.to_numeric(lv["valor"], errors="coerce")  # JPV sin escala

        wide_jpv = (lv.pivot_table(index=key, columns="var_norm", values="valor_norm", aggfunc="first")
                      .reset_index())

        # anexar solo crudos deseados (tirada_num y NO Date/LOC para JPV)
        raw_keep = [c for c in ["tirada_num","TimeString"] if c in long_jpv.columns]
        if raw_keep:
            raw = (long_jpv[key + raw_keep].groupby(key, as_index=False).first())
            wide_jpv = wide_jpv.merge(raw, on=key, how="left")

        # ordenar
        volt = [c for c in ["VOLT_HUM","VOLT_TEM"] if c in wide_jpv.columns]
        other = [c for c in wide_jpv.columns if c not in (key + raw_keep + volt)]
        wide_jpv = wide_jpv[key + raw_keep + volt + other]

        # quitar columnas no deseadas
        if DROP_WIDE_COLS:
            wide_jpv = wide_jpv.drop(columns=[c for c in DROP_WIDE_COLS if c in wide_jpv.columns], errors="ignore")

    qa = (wide_jpv.groupby(["planta","año","sensor_id"])
          .agg(registros=("VOLT_HUM","size"),
               fechas_min=("timestamp","min"),
               fechas_max=("timestamp","max"))
          .reset_index()) if not wide_jpv.empty else pd.DataFrame(columns=["planta","año","sensor_id","registros","fechas_min","fechas_max"])
    log_df = pd.DataFrame(logs)
    return wide_jpv, qa, log_df

# ===== Runner por año =====
inv_all = build_inventory_jpv()
print("Archivos JPV detectados:", len(inv_all))

# si falta 'año' pero hay tirada_fecha, inferir desde la fecha
if not inv_all.empty and inv_all["año"].isna().any():
    for i in inv_all[inv_all["año"].isna()].index:
        tf = inv_all.at[i,"tirada_fecha"]
        if isinstance(tf, datetime):
            inv_all.at[i,"año"] = tf.year

# ordenar e intentar sample por año
for yr in (2024, 2025):
    inv_y = inv_all[inv_all["año"] == yr].reset_index(drop=True)
    if inv_y.empty:
        print(f"[{yr}] No hay archivos JPV.")
        continue

    if SAMPLE_MAX_FILES:
        inv_y = inv_y.head(int(SAMPLE_MAX_FILES)).reset_index(drop=True)

    print(f"[{yr}] Archivos:", len(inv_y))
    wide_jpv, qa_jpv, log_jpv = process_subset(inv_y)
    print(f"[{yr}] datos_wide filas:", len(wide_jpv))

    out_dir = Path(OUTPUT_DIR)
    inv_path = out_dir / f"inventario_JPV_{yr}.xlsx"
    out_path = out_dir / f"Consolidado_Sensores_JPV_{yr}.xlsx"

    with pd.ExcelWriter(inv_path, engine="xlsxwriter") as w:
        inv_y.to_excel(w, sheet_name=f"inventario_JPV_{yr}", index=False)

    with pd.ExcelWriter(out_path, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
        wide_jpv.to_excel(w, sheet_name="datos_wide", index=False)
        dicc = pd.DataFrame({
            "columna": ["planta","año","tirada_num","tirada_fecha","sensor_id","timestamp",
                        "TimeString","VOLT_HUM","VOLT_TEM","source_file","source_path"],
            "descripcion": ["Planta origen (JPV)","Año","N° tirada (si estaba)","Fecha tirada","ID sensor",
                            "Timestamp unificado","Tiempo crudo JPV",
                            "Voltaje humedad (JPV)","Voltaje temperatura (JPV)","Archivo fuente","Ruta fuente"]
        })
        dicc.to_excel(w, sheet_name="diccionario", index=False)
        if not log_jpv.empty:
            log_jpv.to_excel(w, sheet_name="log_omisiones", index=False)
        qa_jpv.to_excel(w, sheet_name="qa_resumen", index=False)

    print(f"[{yr}] Inventario: {inv_path}")
    print(f"[{yr}] Consolidado: {out_path}")


Archivos JPV detectados: 137
[2024] Archivos: 59
[2024] datos_wide filas: 128934
[2024] Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\inventario_JPV_2024.xlsx
[2024] Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\Consolidado_Sensores_JPV_2024.xlsx
[2025] Archivos: 78
[2025] datos_wide filas: 129526
[2025] Inventario: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\inventario_JPV_2025.xlsx
[2025] Consolidado: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\Consolidado_Sensores_JPV_2025.xlsx


# Identificación de tachadas para JPV (por año)

In [5]:
# === Identificación de tachadas para JPV (por año) ===

# ======== CONFIG – CAMBIAR ESTAS DOS RUTAS SEGÚN EL AÑO ========
# JPV 2024:
INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\Consolidado_Sensores_JPV_2024.xlsx"
LAB_FILE        = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025\REVISADO UM Control Tachadas Secado JPV 2024.xlsx"

# JPV 2025 (descomentar estas dos líneas para ese año):
#INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\Consolidado_Sensores_JPV_2025.xlsx"
#LAB_FILE        = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025\REVISADO UM Control Tachadas Secado JPV 2025.xlsx"

OUTPUT_PATH = Path(INPUT_WIDE_FILE).with_name(Path(INPUT_WIDE_FILE).stem + "_TACHADAS.xlsx")
SHEET_WIDE  = "datos_wide"   # hoja en tu consolidado
SHEET_LAB   = None           # si la dejás en None, intentamos detectar "Tachadas"
DAYFIRST    = True           # fechas dd/mm/yyyy hh:mm
# ================================================================

def _canon_col(s: str) -> str:
    """quita espacios/guiones/underscores y acentos, pasa a mayúsculas"""
    s = unidecode(str(s))
    s = re.sub(r"[\s\-_\.]+", "", s)
    return s.upper()

def _find_sheet_with(like: str, xls: pd.ExcelFile) -> str | None:
    like_c = _canon_col(like)
    for sh in xls.sheet_names:
        if like_c in _canon_col(sh):
            return sh
    return None

def _map_cols(df: pd.DataFrame, expected: dict) -> dict:
    """
    expected = {"Variedad":"Variedad", "Identificador":"Identificador", "Inicio":"Inicio", "Fin":"Fin", "Sensor":"Sensor", "Descarte":"Descarte", "EnDuda":"En duda"}
    Devuelve {dest_col: actual_col_en_df}
    """
    mapping = {}
    canon_map = {_canon_col(c): c for c in df.columns}
    for dest, want in expected.items():
        key = _canon_col(want)
        # buscar clave exacta canónica; si no, heurística por startswith
        if key in canon_map:
            mapping[dest] = canon_map[key]
            continue
        alt = next((canon_map[k] for k in canon_map if key in k or k in key), None)
        if alt:
            mapping[dest] = alt
        else:
            mapping[dest] = None
    return mapping

# ---------- 1) Leer consolidado (datos_wide) ----------
wide = pd.read_excel(INPUT_WIDE_FILE, sheet_name=SHEET_WIDE)
if "timestamp" not in wide.columns:
    raise RuntimeError("La hoja datos_wide no tiene columna 'timestamp'.")

# ---------- 2) Leer laboratorio ----------
xls = pd.ExcelFile(LAB_FILE)
lab_sheet = SHEET_LAB or _find_sheet_with("Tach", xls) or xls.sheet_names[0]
lab_raw = pd.read_excel(xls, sheet_name=lab_sheet)

# mapear columnas esperadas (robusto a variaciones menores de nombre)
exp = {
    "Variedad": "Variedad",
    "Identificador": "Identificador",
    "Inicio": "Inicio",
    "Fin": "Fin",
    "Sensor": "Sensor",
    "Descarte": "Descarte",   # puede no existir (no es error)
    "EnDuda": "En duda"       # puede no existir (no es error)
}
m = _map_cols(lab_raw, exp)

missing = [k for k in ["Variedad","Identificador","Inicio","Fin","Sensor"] if m[k] is None]
if missing:
    raise RuntimeError(f"Faltan columnas esenciales en el laboratorio: {missing}")

lab = pd.DataFrame({
    "Variedad": lab_raw[m["Variedad"]],
    "ID_tachada": lab_raw[m["Identificador"]],
    "Inicio": pd.to_datetime(lab_raw[m["Inicio"]], dayfirst=DAYFIRST, errors="coerce"),
    "Fin": pd.to_datetime(lab_raw[m["Fin"]], dayfirst=DAYFIRST, errors="coerce"),
    "sensor_id": pd.to_numeric(lab_raw[m["Sensor"]], errors="coerce").astype("Int64"),
})

# opcionales (si existen)
if m.get("Descarte") and m["Descarte"] in lab_raw.columns:
    lab["Descarte"] = lab_raw[m["Descarte"]]
else:
    lab["Descarte"] = np.nan

if m.get("EnDuda") and m["EnDuda"] in lab_raw.columns:
    lab["En duda"] = lab_raw[m["EnDuda"]]
else:
    lab["En duda"] = np.nan

# limpiar filas inválidas
lab = lab.dropna(subset=["Inicio","Fin","sensor_id"]).reset_index(drop=True)
if lab.empty:
    raise RuntimeError("El archivo de laboratorio quedó vacío tras limpieza (fechas o sensor inválidos).")

# ---------- 3) Normalizar tipos en wide ----------
wide["timestamp"] = pd.to_datetime(wide["timestamp"], errors="coerce")
if "sensor_id" not in wide.columns:
    # algunos wide pueden tener 'sensor' o 'Sensor'
    for alt in ["sensor","Sensor"]:
        if alt in wide.columns:
            wide.rename(columns={alt: "sensor_id"}, inplace=True)
            break
if "sensor_id" not in wide.columns:
    raise RuntimeError("datos_wide no tiene columna 'sensor_id' (ni 'sensor'/'Sensor').")

# ---------- 4) Interval join por sensor ----------
# Método eficiente: para cada sensor, buscar por búsqueda binaria en Inicios y validar con Fin
annot = pd.DataFrame(index=wide.index, columns=["Variedad","ID_tachada","Descarte","En duda"])

for sid, sub in wide.groupby("sensor_id"):
    if pd.isna(sid): 
        continue
    L = lab[lab["sensor_id"] == sid].sort_values("Inicio").reset_index(drop=True)
    if L.empty:
        continue

    starts = L["Inicio"].values
    ends   = L["Fin"].values

    # índices en wide a anotar
    idx = sub.index
    t   = sub["timestamp"].values

    # pos = última tachada cuyo Inicio <= t
    pos = np.searchsorted(starts, t, side="right") - 1
    # inválidos si pos < 0
    valid = (pos >= 0)
    pos[~valid] = 0  # placeholder

    # condición de estar dentro del intervalo [Inicio, Fin]
    in_range = valid & (t <= ends[pos])

    # crear arrays de anotaciones
    variedad  = np.where(in_range, L.loc[pos, "Variedad"].values, None)
    ident     = np.where(in_range, L.loc[pos, "ID_tachada"].values, None)
    descarte  = np.where(in_range, L.loc[pos, "Descarte"].values, None)
    enduda    = np.where(in_range, L.loc[pos, "En duda"].values, None)

    annot.loc[idx, "Variedad"]   = variedad
    annot.loc[idx, "ID_tachada"] = ident
    annot.loc[idx, "Descarte"]   = descarte
    annot.loc[idx, "En duda"]    = enduda

# ---------- 5) Combinar y logs ----------
wide_annot = pd.concat([wide, annot], axis=1)

sin_match = wide_annot[wide_annot["Variedad"].isna()][["planta","año","tirada_fecha","sensor_id","timestamp"]].copy() \
            if "planta" in wide_annot.columns else \
            wide_annot[wide_annot["Variedad"].isna()][["sensor_id","timestamp"]].copy()
sin_match_count = len(sin_match)

qa = (wide_annot
      .groupby(["sensor_id"], dropna=False)
      .agg(registros=("timestamp","size"),
           con_match=("Variedad", lambda s: s.notna().sum()),
           sin_match=("Variedad", lambda s: s.isna().sum()))
      .reset_index())

print(f"Filas anotadas: {len(wide_annot)} | sin match: {sin_match_count}")

# ---------- 6) Guardar ----------
with pd.ExcelWriter(OUTPUT_PATH, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
    wide_annot.to_excel(w, sheet_name="datos_wide_anot", index=False)
    lab.to_excel(w, sheet_name="lab_usado", index=False)
    qa.to_excel(w, sheet_name="qa_resumen", index=False)
    if sin_match_count:
        sin_match.to_excel(w, sheet_name="log_sin_match", index=False)

print("Archivo tachadas:", OUTPUT_PATH)


Filas anotadas: 128934 | sin match: 82291
Archivo tachadas: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\Consolidado_Sensores_JPV_2024_TACHADAS.xlsx


# Identificación de tachadas para RB (por año)

In [3]:
# === Anotación de tachadas para RB (por año) ===

# ======== CONFIG – CAMBIAR ESTAS DOS RUTAS SEGÚN EL AÑO ========

# RB 2025 (descomentar estas dos líneas cuando se quiera correr el de 2025):
#INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2025.xlsx"
#LAB_FILE        = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\2025 Control Tachadas RB.xlsx"

# RB 2024 (descomentar estas dos líneas cuando se quiera correr el de 2024):
INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2024.xlsx"
LAB_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\2024 Control Tachadas RB.xlsx"

OUTPUT_PATH = Path(INPUT_WIDE_FILE).with_name(Path(INPUT_WIDE_FILE).stem + "_TACHADAS.xlsx")
SHEET_WIDE  = "datos_wide"   # hoja en tu consolidado
SHEET_LAB   = None           # si la dejás en None, intentamos detectar por nombre (contenga "Tach")
DAYFIRST    = True           # fechas dd/mm/yyyy hh:mm
REQUIRE_SENSOR_MATCH = True  # cruce por sensor obligatorio (recomendado)
# ================================================================

def _canon_col(s: str) -> str:
    """quita acentos, espacios/guiones/underscores/puntos y pasa a mayúsculas"""
    s = str(s)
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[\s\-_\.]+", "", s)
    return s.upper()

def _find_sheet_with(like: str, xls: pd.ExcelFile) -> str | None:
    like_c = _canon_col(like)
    for sh in xls.sheet_names:
        if like_c in _canon_col(sh):
            return sh
    return None

def _map_cols(df: pd.DataFrame, expected: dict) -> dict:
    """
    expected = {"Variedad":"Variedad", "Identificador":"Identificador", "Inicio":"Inicio",
                "Fin":"Fin", "Sensor":"Sensor",
                "Comentario":"Comentario", "ErrTemp":"Error entemperatura", "ErrHum":"Error enhumedad"}
    Devuelve {dest_col: actual_col_en_df (o None si no está)}
    """
    mapping = {}
    canon_map = {_canon_col(c): c for c in df.columns}
    for dest, want in expected.items():
        key = _canon_col(want)
        if key in canon_map:
            mapping[dest] = canon_map[key]
            continue
        # heurística: contiene / contenido
        alt = next((canon_map[k] for k in canon_map if key in k or k in key), None)
        mapping[dest] = alt
    return mapping

def _detect_header_row(df_preview: pd.DataFrame, expected_names: list[str]) -> int:
    """
    Dado un DataFrame leído con header=None, devuelve el índice de la fila
    que más se parece a la fila de encabezados (match por nombres esperados).
    """
    exp_can = {_canon_col(x) for x in expected_names}
    best_row, best_hits = 0, -1
    # miramos, por ejemplo, las primeras 15 filas
    max_rows = min(15, len(df_preview))
    for r in range(max_rows):
        vals = [str(v) for v in df_preview.iloc[r].tolist()]
        can = {_canon_col(v) for v in vals if isinstance(v, (str, bytes))}
        hits = len(exp_can & can)
        if hits > best_hits:
            best_hits = hits
            best_row = r
    return best_row


# ---------- 1) Leer consolidado (datos_wide) ----------
wide = pd.read_excel(INPUT_WIDE_FILE, sheet_name=SHEET_WIDE)
if "timestamp" not in wide.columns:
    raise RuntimeError("La hoja datos_wide no tiene columna 'timestamp'.")

# Normalizar tipos básicos
wide["timestamp"] = pd.to_datetime(wide["timestamp"], errors="coerce")
if "sensor_id" not in wide.columns:
    for alt in ["sensor","Sensor"]:
        if alt in wide.columns:
            wide.rename(columns={alt: "sensor_id"}, inplace=True)
            break
if "sensor_id" not in wide.columns:
    raise RuntimeError("datos_wide no tiene columna 'sensor_id' (ni 'sensor'/'Sensor').")

# ---------- 2) Leer laboratorio ----------
xls = pd.ExcelFile(LAB_FILE)
lab_sheet = SHEET_LAB or _find_sheet_with("Tach", xls) or xls.sheet_names[0]

# --- leer laboratorio detectando la fila de encabezados ---
# primero sin encabezado para detectar en qué fila están
lab_preview = pd.read_excel(xls, sheet_name=lab_sheet, header=None)
header_row = _detect_header_row(
    lab_preview,
    expected_names=["Variedad","Identificador","Inicio","Fin","Sensor",
                    "Comentario","Error entemperatura","Error enhumedad"]
)
# ahora leemos con ese header
lab_raw = pd.read_excel(xls, sheet_name=lab_sheet, header=header_row)

# Mapear columnas (robusto a variaciones de nombre)
exp = {
    "Variedad": "Variedad",
    "Identificador": "Identificador",
    "Inicio": "Inicio",
    "Fin": "Fin",
    "Sensor": "Sensor",
    # opcionales en RB 2025:
    "Comentario": "Comentario",
    "ErrTemp": "Error entemperatura",
    "ErrHum": "Error enhumedad",
}
m = _map_cols(lab_raw, exp)

missing = [k for k in ["Variedad","Identificador","Inicio","Fin","Sensor"] if m[k] is None]
if missing:
    raise RuntimeError(f"Faltan columnas esenciales en el laboratorio: {missing}")

lab = pd.DataFrame({
    "Variedad": lab_raw[m["Variedad"]],
    "ID_tachada": lab_raw[m["Identificador"]],
    "Inicio": pd.to_datetime(lab_raw[m["Inicio"]], dayfirst=DAYFIRST, errors="coerce"),
    "Fin": pd.to_datetime(lab_raw[m["Fin"]], dayfirst=DAYFIRST, errors="coerce"),
    "sensor_id": pd.to_numeric(lab_raw[m["Sensor"]], errors="coerce").astype("Int64"),
})

# columnas opcionales (si existen)
if m.get("Comentario") and m["Comentario"] in lab_raw.columns:
    lab["Comentario"] = lab_raw[m["Comentario"]]
else:
    lab["Comentario"] = np.nan

if m.get("ErrTemp") and m["ErrTemp"] in lab_raw.columns:
    lab["Error entemperatura"] = lab_raw[m["ErrTemp"]]
else:
    lab["Error entemperatura"] = np.nan

if m.get("ErrHum") and m["ErrHum"] in lab_raw.columns:
    lab["Error enhumedad"] = lab_raw[m["ErrHum"]]
else:
    lab["Error enhumedad"] = np.nan

# limpiar filas inválidas
lab = lab.dropna(subset=["Inicio","Fin","sensor_id"]).reset_index(drop=True)
if lab.empty:
    raise RuntimeError("El archivo de laboratorio quedó vacío tras limpieza (fechas o sensor inválidos).")

# ---------- 3) Interval join [Inicio, Fin] por sensor ----------
annot_cols = ["Variedad","ID_tachada","Comentario","Error entemperatura","Error enhumedad"]
annot = pd.DataFrame(index=wide.index, columns=annot_cols)

if REQUIRE_SENSOR_MATCH:
    grupos = wide.groupby("sensor_id")
else:
    # ⚠️ modo emergencia: ignora sensor (solo por tiempo)
    grupos = [(-1, wide)]

for sid, sub in grupos:
    if REQUIRE_SENSOR_MATCH:
        L = lab[lab["sensor_id"] == sid].sort_values("Inicio").reset_index(drop=True)
    else:
        L = lab.sort_values("Inicio").reset_index(drop=True)

    if L.empty:
        continue

    starts = L["Inicio"].values
    ends   = L["Fin"].values
    idx = sub.index
    t   = sub["timestamp"].values

    # última fila de lab con Inicio <= t
    pos = np.searchsorted(starts, t, side="right") - 1
    valid = (pos >= 0)
    pos[~valid] = 0  # placeholder
    in_range = valid & (t <= ends[pos])

    annot.loc[idx, "Variedad"]             = np.where(in_range, L.loc[pos, "Variedad"].values, None)
    annot.loc[idx, "ID_tachada"]           = np.where(in_range, L.loc[pos, "ID_tachada"].values, None)
    annot.loc[idx, "Comentario"]           = np.where(in_range, L.loc[pos, "Comentario"].values, None)
    annot.loc[idx, "Error entemperatura"]  = np.where(in_range, L.loc[pos, "Error entemperatura"].values, None)
    annot.loc[idx, "Error enhumedad"]      = np.where(in_range, L.loc[pos, "Error enhumedad"].values, None)

# ---------- 4) Combinar y logs ----------
wide_annot = pd.concat([wide, annot], axis=1)

cols_base = ["planta","año","tirada_fecha","sensor_id","timestamp"]
sin_match_cols = [c for c in cols_base if c in wide_annot.columns]
sin_match = wide_annot[wide_annot["Variedad"].isna()][sin_match_cols].copy()
sin_match_count = len(sin_match)

qa = (wide_annot
      .groupby(["sensor_id"], dropna=False)
      .agg(registros=("timestamp","size"),
           con_match=("Variedad", lambda s: s.notna().sum()),
           sin_match=("Variedad", lambda s: s.isna().sum()))
      .reset_index())

print(f"Filas anotadas: {len(wide_annot)} | sin match: {sin_match_count}")

# ---------- 5) Guardar ----------
with pd.ExcelWriter(OUTPUT_PATH, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
    wide_annot.to_excel(w, sheet_name="datos_wide_anot", index=False)
    lab.to_excel(w, sheet_name="lab_usado", index=False)
    qa.to_excel(w, sheet_name="qa_resumen", index=False)
    if sin_match_count:
        sin_match.to_excel(w, sheet_name="log_sin_match", index=False)

print("Archivo anotado:", OUTPUT_PATH)


  warn(msg)
  warn(msg)


NameError: name 'TOL_MIN' is not defined

# Identificación de tachadas para RB (por año, con +1 en sensor_id)

In [6]:
# === Anotación de tachadas para RB (por año, +1 sensores SIEMPRE y +1 lab SOLO si es 2025) ===

# ======== CONFIG ========
# RB 2025 :
INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2025.xlsx"
LAB_FILE        = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\REVISADO UM Control Tachadas Secado RB 2025.xlsx"

# RB 2024 (descomentar cuando se quiera correr):
#INPUT_WIDE_FILE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2024.xlsx"
#LAB_FILE        = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\2024 Control Tachadas RB.xlsx"

OUTPUT_PATH = Path(INPUT_WIDE_FILE).with_name(Path(INPUT_WIDE_FILE).stem + "_TACHADAS.xlsx")
SHEET_WIDE  = "datos_wide"
SHEET_LAB   = None
DAYFIRST    = True
REQUIRE_SENSOR_MATCH = True

# Overrides opcionales (dejar en None para auto):
APPLY_PLUS1_LAB     = None          # None = auto: +1 si LAB_FILE contiene "2025"
# ================================================================


def _canon_col(s: str) -> str:
    s = str(s)
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[\s\-_\.]+", "", s)
    return s.upper()

def _find_sheet_with(like: str, xls: pd.ExcelFile) -> str | None:
    like_c = _canon_col(like)
    for sh in xls.sheet_names:
        if like_c in _canon_col(sh):
            return sh
    return None

def _map_cols(df: pd.DataFrame, expected: dict) -> dict:
    mapping = {}
    canon_map = {_canon_col(c): c for c in df.columns}
    for dest, want in expected.items():
        key = _canon_col(want)
        if key in canon_map:
            mapping[dest] = canon_map[key]
            continue
        alt = next((canon_map[k] for k in canon_map if key in k or k in key), None)
        mapping[dest] = alt
    return mapping

def _detect_header_row(df_preview: pd.DataFrame, expected_names: list[str]) -> int:
    exp_can = {_canon_col(x) for x in expected_names}
    best_row, best_hits = 0, -1
    max_rows = min(15, len(df_preview))
    for r in range(max_rows):
        vals = [str(v) for v in df_preview.iloc[r].tolist()]
        can = {_canon_col(v) for v in vals if isinstance(v, (str, bytes))}
        hits = len(exp_can & can)
        if hits > best_hits:
            best_hits = hits
            best_row = r
    return best_row


# ---------- 1) Leer consolidado ----------
wide = pd.read_excel(INPUT_WIDE_FILE, sheet_name=SHEET_WIDE)
wide["timestamp"] = pd.to_datetime(wide["timestamp"], errors="coerce")

if "sensor_id" not in wide.columns:
    for alt in ["sensor", "Sensor"]:
        if alt in wide.columns:
            wide.rename(columns={alt: "sensor_id"}, inplace=True)
            break
if "sensor_id" not in wide.columns:
    raise RuntimeError("datos_wide no tiene columna sensor_id.")

# Ajuste sensores RB: mapear 1→2, 2→3, 3→4, 4→5 (sin tocar otros valores)
MAP_SENS = {1: 2, 2: 3, 3: 4, 4: 5}
wide["sensor_id"] = pd.to_numeric(wide["sensor_id"], errors="coerce").astype("Int64")
wide["sensor_id"] = wide["sensor_id"].map(MAP_SENS).fillna(wide["sensor_id"]).astype("Int64")
print("Ajuste aplicado: sensor_id remapeado (1→2, 2→3, 3→4, 4→5) en datos de sensores RB.")


# ---------- 2) Leer laboratorio ----------
xls = pd.ExcelFile(LAB_FILE)
lab_sheet = SHEET_LAB or _find_sheet_with("Tach", xls) or xls.sheet_names[0]

# detectar fila de encabezado y leer
lab_preview = pd.read_excel(xls, sheet_name=lab_sheet, header=None)
header_row = _detect_header_row(
    lab_preview,
    expected_names=["Variedad","Identificador","Inicio","Fin","Sensor",
                    "Comentario","Error entemperatura","Error enhumedad"]
)
lab_raw = pd.read_excel(xls, sheet_name=lab_sheet, header=header_row)

# mapear columnas
exp = {
    "Variedad": "Variedad",
    "Identificador": "Identificador",
    "Inicio": "Inicio",
    "Fin": "Fin",
    "Sensor": "Sensor",
    "Comentario": "Comentario",
    "ErrTemp": "Error entemperatura",
    "ErrHum": "Error enhumedad",
    "LAB_DESCARTE": "DESCARTE",
    "LAB_EN_DUDA": "ENDUDA"
}
m = _map_cols(lab_raw, exp)
missing = [k for k in ["Variedad","Identificador","Inicio","Fin","Sensor"] if m[k] is None]
if missing:
    raise RuntimeError(f"Faltan columnas esenciales en el laboratorio: {missing}")

lab = pd.DataFrame({
    "Variedad": lab_raw[m["Variedad"]],
    "ID_tachada": lab_raw[m["Identificador"]],
    "Inicio": pd.to_datetime(lab_raw[m["Inicio"]], dayfirst=DAYFIRST, errors="coerce"),
    "Fin": pd.to_datetime(lab_raw[m["Fin"]], dayfirst=DAYFIRST, errors="coerce"),
    "sensor_id": pd.to_numeric(lab_raw[m["Sensor"]], errors="coerce").astype("Int64"),
})

# --- Estandarizar flags del LAB a "Descarte" y "En duda" ---
for out_col, in_key in [("Descarte", "LAB_DESCARTE"), ("En duda", "LAB_EN_DUDA")]:
    src = m.get(in_key)
    if src and src in lab_raw.columns:
        s = lab_raw[src]
        sn = pd.to_numeric(s, errors="coerce")
        flag = (sn == 1)
        sval = s.astype(str).str.strip().str.casefold()
        flag = flag | sval.isin({"true", "si", "sí", "x"})
        lab[out_col] = flag.astype("int8")
    else:
        lab[out_col] = np.nan  # si ese año no trae estas columnas

# columnas opcionales
for col_out, col_in in [
    ("Comentario", "Comentario"),
    ("Error entemperatura", "ErrTemp"),
    ("Error enhumedad", "ErrHum"),
]:
    if m.get(col_in) and m[col_in] in lab_raw.columns:
        lab[col_out] = lab_raw[m[col_in]]
    else:
        lab[col_out] = np.nan

lab = lab.dropna(subset=["Inicio","Fin","sensor_id"]).reset_index(drop=True)

# --- decidir si aplicar +1 al laboratorio SOLO según el nombre del archivo ---
# (ej.: "2025 Control Tachadas.xlsx" → 2025; "2024 Control Tachadas.xlsx" → 2024)

if APPLY_PLUS1_LAB is None:
    basename = Path(LAB_FILE).name  # solo el nombre, sin la carpeta
    m = re.search(r'\b(20\d{2})\b', basename)
    lab_year = int(m.group(1)) if m else None
    apply_lab = (lab_year == 2025)
else:
    apply_lab = bool(APPLY_PLUS1_LAB)

if apply_lab:
    lab["sensor_id"] = lab["sensor_id"].map(MAP_SENS).fillna(lab["sensor_id"]).astype("Int64")
    print("Ajuste aplicado: sensor_id remapeado (1→2, 2→3, 3→4, 4→5) en laboratorio RB 2025.")
else:
    print("Ajuste +1 laboratorio: NO aplicado.")


# ---------- 3) Interval join ----------
annot_cols = ["Variedad","ID_tachada","Comentario",
              "Error entemperatura","Error enhumedad",
              "Descarte","En duda"]
annot = pd.DataFrame(index=wide.index, columns=annot_cols)

if REQUIRE_SENSOR_MATCH:
    grupos = wide.groupby("sensor_id")
else:
    grupos = [(-1, wide)]

for sid, sub in grupos:
    if REQUIRE_SENSOR_MATCH:
        L = lab[lab["sensor_id"] == sid].sort_values("Inicio").reset_index(drop=True)
    else:
        L = lab.sort_values("Inicio").reset_index(drop=True)
    if L.empty:
        continue

    starts = L["Inicio"].values
    ends   = L["Fin"].values
    idx = sub.index
    t   = sub["timestamp"].values
    pos = np.searchsorted(starts, t, side="right") - 1
    valid = (pos >= 0)
    pos[~valid] = 0
    in_range = valid & (t <= ends[pos])

    for col in annot_cols:
        annot.loc[idx, col] = np.where(in_range, L.loc[pos, col].values, None)

# ---------- 4) Combinar, QA y guardar ----------
wide_annot = pd.concat([wide, annot], axis=1)
sin_match = wide_annot[wide_annot["Variedad"].isna()]
qa = (wide_annot
      .groupby(["sensor_id"], dropna=False)
      .agg(registros=("timestamp","size"),
           con_match=("Variedad", lambda s: s.notna().sum()),
           sin_match=("Variedad", lambda s: s.isna().sum()))
      .reset_index())

# resumen consola
sin_match_count = len(sin_match)
print(f"Filas anotadas: {len(wide_annot)} | sin match: {sin_match_count}")

with pd.ExcelWriter(OUTPUT_PATH, engine="xlsxwriter", datetime_format="yyyy-mm-dd HH:MM:SS") as w:
    wide_annot.to_excel(w, sheet_name="datos_wide_anot", index=False)
    lab.to_excel(w, sheet_name="lab_usado", index=False)
    qa.to_excel(w, sheet_name="qa_resumen", index=False)
    if sin_match_count:
        sin_match.to_excel(w, sheet_name="log_sin_match", index=False)

print("Archivo anotado creado en:", OUTPUT_PATH)

Ajuste aplicado: sensor_id remapeado (1→2, 2→3, 3→4, 4→5) en datos de sensores RB.


  warn(msg)
  warn(msg)


Ajuste aplicado: sensor_id remapeado (1→2, 2→3, 3→4, 4→5) en laboratorio RB 2025.
Filas anotadas: 87128 | sin match: 39664
Archivo anotado creado en: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\Consolidado_Sensores_RB_2025_TACHADAS.xlsx


# Limpieza Tachadas

In [7]:
# --- CONFIGURACIÓN DE RUTAS ---
carpeta_jpv = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año"
carpeta_rb  = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año"

# Columnas a conservar en cada planta (incluye DESCARTAR al final)
cols_rb = [
    "planta", "año", "tirada_fecha", "sensor_id", "timestamp",
    "Date_raw", "LOC_time_raw", "VOLT_HUM", "VOLT_TEM", "Variedad",
    "ID_tachada","DESCARTE","ENDUDA","DESCARTAR"
]

cols_jpv = [
    "planta", "año", "tirada_fecha", "sensor_id", "timestamp",
    "tirada_num", "VOLT_HUM", "VOLT_TEM", "Variedad",
    "ID_tachada","Descarte","En duda","DESCARTAR"
]

# --------- Helpers de normalización ---------
def _norm(s: str) -> str:
    """Normaliza texto: sin acentos, minúscula, sin espacios/guiones/underscores/puntos."""
    if not isinstance(s, str):
        return ""
    s = ''.join(c for c in unicodedata.normalize("NFKD", s) if not unicodedata.combining(c)).casefold()
    s = re.sub(r"[\s_\-\.]+", "", s)
    return s

def _to_bool01_series(df: pd.DataFrame, colname_candidates) -> pd.Series:
    """
    Devuelve Serie booleana (True si valor==1 o {true, si, sí, x}) para la primera
    columna encontrada (match por nombre normalizado). Si ninguna existe, retorna todo False.
    """
    norm_map = {_norm(c): c for c in df.columns}
    chosen = None
    for name in colname_candidates:
        key = _norm(name)
        if key in norm_map:
            chosen = norm_map[key]
            break

    if chosen is None:
        # columna faltante → todo False
        return pd.Series(False, index=df.index)

    s = df[chosen]

    # Intento numérico (1 = True)
    snum = pd.to_numeric(s, errors="coerce")
    out = (snum == 1)

    # Para no numéricos, acepto "true", "si", "sí", "x"
    sval = s.astype(str).str.strip().str.casefold()
    out = out | sval.isin({"true", "si", "sí", "x"})
    return out.fillna(False)

# --- FUNCIÓN DE PROCESAMIENTO ---
def limpiar_tachadas(carpeta, tipo_planta):
    print(f"\n--- Procesando archivos de {tipo_planta} ---")

    for archivo in os.listdir(carpeta):
        if archivo.endswith("_TACHADAS.xlsx"):
            ruta = os.path.join(carpeta, archivo)
            print(f"Procesando: {archivo}")

            xls = pd.ExcelFile(ruta)
            if "datos_wide_anot" not in xls.sheet_names:
                print(f"  ⚠️  Hoja 'datos_wide_anot' no encontrada en {archivo}")
                continue

            # Leer hoja
            df = pd.read_excel(xls, sheet_name="datos_wide_anot")

            # ---- construir DESCARTAR a partir de Descarte / En duda (robusto) ----
            desc_candidates   = ["Descarte", "DESCARTE"]
            enduda_candidates = ["En duda", "En Duda", "ENDUDA", "EN_DUDA", "en_duda"]

            desc   = _to_bool01_series(df, desc_candidates)
            enduda = _to_bool01_series(df, enduda_candidates)

            df["DESCARTAR"] = (desc | enduda).astype("int8")
            print(f"  → DESCARTAR=1 en {int(df['DESCARTAR'].sum())} filas")
            # ---- fin agregado ----

            # Elegir columnas correctas según planta
            if tipo_planta == "RB":
                cols_deseadas = cols_rb
            else:
                cols_deseadas = cols_jpv

            # Mantener solo columnas existentes (y en el orden pedido)
            columnas_validas = [c for c in cols_deseadas if c in df.columns]
            df = df[columnas_validas]

            # Dropear filas sin match de tachada
            if "ID_tachada" in df.columns:
                df = df.dropna(subset=["ID_tachada"])
            else:
                print("  ⚠️  Columna 'ID_tachada' no existe; no puedo filtrar tachadas.")
                # si quisieras, podrías continuar sin dropear

            # Guardar archivo limpio
            nombre_salida = archivo.replace("_TACHADAS.xlsx", "_TACHADAS_LIMPIAS.xlsx")
            ruta_salida = os.path.join(carpeta, nombre_salida)

            df.to_excel(ruta_salida, index=False)
            print(f"  ✅ Guardado: {nombre_salida}")

    print(f"--- Limpieza finalizada para {tipo_planta} ---\n")


# --- EJECUCIÓN ---
#limpiar_tachadas(carpeta_jpv, "JPV")
limpiar_tachadas(carpeta_rb,  "RB")

print("✅ Proceso completado para todas las plantas y años.")


--- Procesando archivos de RB ---
Procesando: Consolidado_Sensores_RB_2024_TACHADAS.xlsx
  → DESCARTAR=1 en 13902 filas
  ✅ Guardado: Consolidado_Sensores_RB_2024_TACHADAS_LIMPIAS.xlsx
Procesando: Consolidado_Sensores_RB_2025_TACHADAS.xlsx
  → DESCARTAR=1 en 8464 filas
  ✅ Guardado: Consolidado_Sensores_RB_2025_TACHADAS_LIMPIAS.xlsx
--- Limpieza finalizada para RB ---

✅ Proceso completado para todas las plantas y años.


# Validación de Tachadas Faltantes

In [9]:
# =========================
#   CONFIGURACIÓN
# =========================

# Carpetas donde están los archivos LIMPIOS por planta
carpeta_jpv = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año"
carpeta_rb  = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año"

# Carpeta de salida para VALIDACIONES
validacion_dir_jpv = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\JPV_por_año\validación_tachadas_identificadas"
validacion_dir_rb  = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\validación_tachadas_identificadas"

# Diccionarios: año -> lista de rutas a archivos de "Control Tachadas" del LABORATORIO
# Cambiar por rutas correspondientes de Control Tachadas para cada año
CONTROL_TACHADAS_JPV = {
    "2024": [
        r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025\REVISADO UM Control Tachadas Secado JPV 2024.xlsx"
    ],
    "2025": [
        r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos JPV 2024 2025\REVISADO UM Control Tachadas Secado JPV 2025.xlsx"
    ],
}

CONTROL_TACHADAS_RB = {
    "2024": [
        r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\2024 Control Tachadas RB.xlsx"
    ],
    "2025": [
        r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\Datos RB 2024 2025\REVISADO UM Control Tachadas Secado RB 2025.xlsx"
    ],
}

# Nombre de columnas a usar
COL_ID_LIMPIOS = "ID_tachada"
COL_ID_LAB     = "Identificador"


# Nombre de hoja a usar en LAB (si "Control Tachadas" tienen una hoja específica, poner aquí;
# si es None, se leerá la PRIMERA hoja de cada archivo de control)

HOJA_CONTROL_TACHADAS = None

# Exportar un Excel de validación por archivo?
EXPORTAR_VALIDACION = True


# =========================
#   HELPERS
# =========================

NUMERIC_PATTERN = re.compile(r'^\s*\d+(?:[.,]\d+)?\s*$')  # solo números (permite ".0" o ",0")

def normalizar_id(x: str) -> str | None:
    """ Normalización conservadora (numéricos → enteros como str; alfanuméricos se conservan) """
    if x is None:
        return None
    s = str(x).strip()
    if s == "" or s.lower() in {"nan", "none"}:
        return None
    if NUMERIC_PATTERN.match(s):
        s = s.replace(",", ".")
        try:
            f = float(s)
            i = int(f)
            return str(i) if f == i else str(f)
        except ValueError:
            return s
    else:
        return re.sub(r'\s+', ' ', s)

    
    
def _encontrar_fila_header(path: str, sheet_name=None, key_header: str = "Identificador", max_probe_rows: int = 40):
    """
    Escanea las primeras filas de la hoja para encontrar en qué fila aparece el texto 'Identificador'
    (insensible a mayúsculas/espacios). Devuelve el índice 0-based para usar como 'header' en read_excel.
    Si no lo encuentra, devuelve 0 (primera fila como cabecera).
    """
    probe = pd.read_excel(path, sheet_name=sheet_name, header=None, nrows=max_probe_rows, dtype=str)
    key = str(key_header).strip().upper()
    for i in range(len(probe)):
        row = probe.iloc[i].astype(str).str.strip().str.upper()
        if row.eq(key).any():
            return i
    return 0


def cargar_df_control_tachadas(rutas_archivos: list[str], hoja: str | None = None) -> pd.DataFrame:
    """
    Lee una lista de archivos de 'Control Tachadas' del LAB y concatena TODAS SUS COLUMNAS.
    Detecta dinámicamente la fila de cabecera (útil para RB donde la cabecera arranca en la fila 4, etc.).
    Si 'hoja' es None, usa la primera hoja del archivo.
    """
    dfs = []
    for path in rutas_archivos:
        if not os.path.exists(path):
            print(f"  ⚠️ No existe archivo de Control Tachadas: {path}")
            continue
        try:
            # Determinar hoja a leer
            if hoja is None:
                xl = pd.ExcelFile(path)
                sheet_name = xl.sheet_names[0]
            else:
                sheet_name = hoja

            # Detectar fila de cabecera buscando 'Identificador'
            header_row = _encontrar_fila_header(path, sheet_name=sheet_name, key_header=COL_ID_LAB)

            # Leer con la fila detectada como cabecera
            df = pd.read_excel(path, sheet_name=sheet_name, header=header_row)

            # Limpieza suave: quitar columnas totalmente vacías
            df = df.dropna(axis=1, how="all")

            dfs.append(df)
        except Exception as e:
            print(f"  ⚠️ Error leyendo Control Tachadas '{os.path.basename(path)}': {e}")
            continue

    if not dfs:
        return pd.DataFrame()

    return pd.concat(dfs, ignore_index=True, sort=False)


def normalizar_y_unicos(serie: pd.Series) -> pd.Series:
    s = serie.dropna().map(normalizar_id)
    s = s.dropna()
    s = s[s.astype(str).str.len() > 0].astype(str)
    return pd.Series(pd.unique(s), dtype=str)

def archivos_limpios_en_carpeta(carpeta: str) -> list[str]:
    if not os.path.isdir(carpeta):
        return []
    return [os.path.join(carpeta, f) for f in os.listdir(carpeta) if f.endswith("_TACHADAS_LIMPIAS.xlsx")]

def obtener_año_desde_nombre(nombre: str) -> str | None:
    m = re.search(r"(20\d{2})", nombre)
    return m.group(1) if m else None


# =========================
#   VALIDACIÓN
# =========================

def validar_planta(carpeta_limpios: str, controles_por_año: dict[str, list[str]], etiqueta_planta: str, carpeta_salida: str):
    print(f"\n--- Validando {etiqueta_planta} ---")
    os.makedirs(carpeta_salida, exist_ok=True)

    limpios = archivos_limpios_en_carpeta(carpeta_limpios)
    if not limpios:
        print("  (No se encontraron *_TACHADAS_LIMPIAS.xlsx aquí)")
        return

    for ruta_limpio in limpios:
        nombre = os.path.basename(ruta_limpio)
        año = obtener_año_desde_nombre(nombre)
        if año is None:
            print(f"  ⚠️ No pude inferir el año desde el nombre: {nombre}")
            continue

        rutas_control = controles_por_año.get(año, [])
        print(f"\n📄 {nombre}  |  Año: {año}")
        if not rutas_control:
            print("  ⚠️ No hay archivos de Control Tachadas configurados para este año.")
            continue

        # 1) Leer LIMPIO
        try:
            df_limpio = pd.read_excel(ruta_limpio, dtype={COL_ID_LIMPIOS: object})
        except Exception as e:
            print(f"  ⚠️ Error leyendo LIMPIO: {e}")
            continue

        if COL_ID_LIMPIOS not in df_limpio.columns:
            print(f"  ⚠️ Falta columna '{COL_ID_LIMPIOS}' en LIMPIO.")
            continue

        ids_limpios_unq = normalizar_y_unicos(df_limpio[COL_ID_LIMPIOS])

        # 2) LAB: DF completo + clave normalizada por fila
        df_lab_control = cargar_df_control_tachadas(rutas_control, hoja=HOJA_CONTROL_TACHADAS)
        if df_lab_control.empty:
            print("  ⚠️ No se pudo armar DF de Control Tachadas.")
            continue
        if COL_ID_LAB not in df_lab_control.columns:
            print(f"  ⚠️ Falta columna '{COL_ID_LAB}' en Control Tachadas del LAB.")
            continue

        df_lab_aux = df_lab_control.copy()
        df_lab_aux["_id_norm"] = df_lab_aux[COL_ID_LAB].map(normalizar_id)

        # Únicos LAB (normalizados)
        ids_lab_unq = normalizar_y_unicos(df_lab_aux[COL_ID_LAB])

        # 3) Comparación
        set_lab = set(ids_lab_unq)
        set_limpios = set(ids_limpios_unq)

        faltantes_en_limpios = sorted(set_lab - set_limpios, key=lambda x: (len(x), x))
        sobrantes_en_limpios = sorted(set_limpios - set_lab, key=lambda x: (len(x), x))

        print(f"  Únicos LAB (control, sin dups): {len(set_lab)}")
        print(f"  Únicos LIMPIOS:                 {len(set_limpios)}")
        print(f"  ➖ En LAB y faltan en LIMPIOS:  {len(faltantes_en_limpios)}")
        print(f"  ➕ En LIMPIOS y no en LAB:      {len(sobrantes_en_limpios)}")

        if len(faltantes_en_limpios) == 0:
            print("  ✅ Chequeo principal OK (no faltan tachadas en LIMPIOS).")
        else:
            print(f"     Ejemplos faltantes: {faltantes_en_limpios[:10]}")

        # 4) Exportar validación (en subcarpeta)
        if EXPORTAR_VALIDACION:
            base_out = os.path.splitext(nombre)[0]
            ruta_out = os.path.join(carpeta_salida, f"{base_out}_VALIDACION_TACHADAS.xlsx")
            with pd.ExcelWriter(ruta_out, engine="openpyxl") as xw:
                # Resumen
                pd.DataFrame({
                    "archivo": [nombre],
                    "año": [año],
                    "unicos_lab_sin_dups": [len(set_lab)],
                    "unicos_limpios": [len(set_limpios)],
                    "faltantes_en_limpios": [len(faltantes_en_limpios)],
                    "sobrantes_en_limpios": [len(sobrantes_en_limpios)],
                }).to_excel(xw, index=False, sheet_name="resumen")

                # Listas
                pd.Series(faltantes_en_limpios, name=COL_ID_LAB).to_frame().to_excel(
                    xw, index=False, sheet_name="faltantes_en_limpias"
                )
                pd.Series(sobrantes_en_limpios, name=COL_ID_LIMPIOS).to_frame().to_excel(
                    xw, index=False, sheet_name="sobrantes_en_limpias"
                )

                # Réplica total del control (todas las columnas)
                df_lab_control.to_excel(xw, index=False, sheet_name="lab_control_original")

                # LAB sin duplicados por Identificador (normalizado)
                df_lab_sin_dups = df_lab_aux.drop_duplicates(subset=["_id_norm"]).drop(columns=["_id_norm"])
                df_lab_sin_dups.to_excel(xw, index=False, sheet_name="lab_control_sin_dups")

                # Detalle del LAB solo para los FALTANTES (todas sus columnas)
                if faltantes_en_limpios:
                    faltantes_set = set(faltantes_en_limpios)
                    lab_faltantes_detalle = df_lab_aux[df_lab_aux["_id_norm"].isin(faltantes_set)].drop(columns=["_id_norm"])
                    lab_faltantes_detalle.to_excel(xw, index=False, sheet_name="lab_faltantes_detalle")
                else:
                    # Hoja vacía pero creada para consistencia
                    pd.DataFrame(columns=df_lab_control.columns).to_excel(
                        xw, index=False, sheet_name="lab_faltantes_detalle"
                    )

            print(f"  💾 Validación exportada: {ruta_out}")

# =========================
#   EJECUCIÓN
# =========================
#validar_planta(carpeta_jpv, CONTROL_TACHADAS_JPV, "JPV", validacion_dir_jpv)
validar_planta(carpeta_rb,  CONTROL_TACHADAS_RB,  "RB",  validacion_dir_rb)
print("\n✅ Validación finalizada.")


--- Validando RB ---

📄 Consolidado_Sensores_RB_2024_TACHADAS_LIMPIAS.xlsx  |  Año: 2024


  warn(msg)
  warn(msg)
  pd.Series(sobrantes_en_limpios, name=COL_ID_LIMPIOS).to_frame().to_excel(


  Únicos LAB (control, sin dups): 480
  Únicos LIMPIOS:                 269
  ➖ En LAB y faltan en LIMPIOS:  211
  ➕ En LIMPIOS y no en LAB:      0
     Ejemplos faltantes: ['18801', '18802', '18803', '18804', '18805', '18807', '18808', '18809', '18810', '18811']
  💾 Validación exportada: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\validación_tachadas_identificadas\Consolidado_Sensores_RB_2024_TACHADAS_LIMPIAS_VALIDACION_TACHADAS.xlsx

📄 Consolidado_Sensores_RB_2025_TACHADAS_LIMPIAS.xlsx  |  Año: 2025


  warn(msg)
  warn(msg)


  Únicos LAB (control, sin dups): 404
  Únicos LIMPIOS:                 357
  ➖ En LAB y faltan en LIMPIOS:  47
  ➕ En LIMPIOS y no en LAB:      0
     Ejemplos faltantes: ['1111', '17800', '18613', '18697', '18709', '18747', '18748', '18749', '18750', '18751']


  pd.Series(sobrantes_en_limpios, name=COL_ID_LIMPIOS).to_frame().to_excel(


  💾 Validación exportada: C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\RB_por_año\validación_tachadas_identificadas\Consolidado_Sensores_RB_2025_TACHADAS_LIMPIAS_VALIDACION_TACHADAS.xlsx

✅ Validación finalizada.


# Cálculo de Humedad y Temperatura
## Cruce con curvas de calibración

In [10]:
# ===================== RUTAS BASE =====================
BASE_EDITABLE = Path(r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable")

BASE_CONS_JPV = BASE_EDITABLE / r"_consolidados\JPV_por_año"
BASE_CONS_RB  = BASE_EDITABLE / r"_consolidados\RB_por_año"

# Salidas
BASE_OUT = BASE_EDITABLE / r"_consolidados\_salidas_calibradas"
BASE_OUT.mkdir(parents=True, exist_ok=True)

# ===================== ESQUEMA CONSOLIDADO =====================
ORIG = [
    "planta","año","tirada_fecha","sensor_id","timestamp",
    "VOLT_HUM","VOLT_TEM","Variedad","ID_tachada","DESCARTAR"
]
COL_SENSOR_ID = "sensor_id"
COL_VARIEDAD  = "Variedad"
COL_VH        = "VOLT_HUM"
COL_VT        = "VOLT_TEM"

# ===================== HELPERS =====================
def deaccent(s: str) -> str:
    if not isinstance(s, str):
        return s
    return ''.join(c for c in unicodedata.normalize("NFKD", s) if not unicodedata.combining(c)).casefold()

def norm_str(s: str) -> str:
    if not isinstance(s, str):
        return s
    return deaccent(' '.join(s.strip().split()))

def guess_secadora(sensor_id) -> Optional[int]:
    if pd.isna(sensor_id):
        return None
    m = re.search(r"(\d+)", str(sensor_id))
    return int(m.group(1)) if m else None

def find_cell(df: pd.DataFrame, value: str):
    matches = np.where(df.values == value)
    if len(matches[0]) == 0:
        return None, None
    return int(matches[0][0]), int(matches[1][0])

def parse_humedad_sheet(df_raw: pd.DataFrame):
    AH = float(df_raw.iloc[1, 0]); BH = float(df_raw.iloc[1, 1]); CH = float(df_raw.iloc[1, 2])
    r, c = find_cell(df_raw, "Fecha")
    if r is None: raise ValueError("No encontré 'Fecha' en hoja de HUMEDAD.")
    cfix_row = r + 1
    cols = list(range(c + 1, c + 7))  # 6 secadoras
    cfix = {i: float(df_raw.iloc[cfix_row, col]) for i, col in enumerate(cols, start=1)}
    data = df_raw.iloc[(r + 2):, [c] + cols].copy()
    data.columns = ["fecha"] + [f"s{i}" for i in range(1, 7)]
    data = data.dropna(how="all")
    data["fecha"] = pd.to_datetime(data["fecha"], errors="coerce")
    data = data.dropna(subset=["fecha"])
    cvar_tbl = {}
    for i in range(1, 7):
        tmp = data[["fecha", f"s{i}"]].rename(columns={f"s{i}": "craw"})
        tmp["craw"] = tmp["craw"].replace(0, np.nan)
        tmp = tmp.sort_values("fecha")
        tmp["cvar"] = tmp["craw"].ffill().fillna(0.0)
        cvar_tbl[i] = tmp[["fecha", "cvar"]].reset_index(drop=True)
    return AH, BH, CH, cfix, cvar_tbl

def parse_temperatura_sheet(df_raw: pd.DataFrame):
    AT = float(df_raw.iloc[1, 0]); BT = float(df_raw.iloc[1, 1])
    r, c = find_cell(df_raw, "Fecha")
    if r is None: raise ValueError("No encontré 'Fecha' en hoja TEMPERATURA.")
    cfix_row = r + 1
    cols = list(range(c + 1, c + 7))
    cfix = {i: float(df_raw.iloc[cfix_row, col]) for i, col in enumerate(cols, start=1)}
    data = df_raw.iloc[(r + 2):, [c] + cols].copy()
    data.columns = ["fecha"] + [f"s{i}" for i in range(1, 7)]
    data = data.dropna(how="all")
    data["fecha"] = pd.to_datetime(data["fecha"], errors="coerce")
    data = data.dropna(subset=["fecha"])
    cvar_tbl = {}
    for i in range(1, 7):
        tmp = data[["fecha", f"s{i}"]].rename(columns={f"s{i}": "craw"})
        tmp["craw"] = tmp["craw"].replace(0, np.nan)
        tmp = tmp.sort_values("fecha")
        tmp["cvar"] = tmp["craw"].ffill().fillna(0.0)
        cvar_tbl[i] = tmp[["fecha", "cvar"]].reset_index(drop=True)
    return AT, BT, cfix, cvar_tbl

def merge_asof_cvar(df, key_fecha, key_secadora, cvar_tbl, out_col):
    df[out_col] = 0.0
    for s in range(1, 7):
        mask = df[key_secadora] == s
        if not mask.any(): continue
        base = df.loc[mask, [key_fecha]].copy().sort_values(key_fecha)
        cv = cvar_tbl[s].sort_values("fecha")
        merged = pd.merge_asof(base, cv.rename(columns={"fecha": key_fecha}), on=key_fecha, direction="backward")
        merged["cvar"] = merged["cvar"].fillna(0.0)
        idx_sorted = df.loc[mask].sort_values(key_fecha).index
        df.loc[idx_sorted, out_col] = merged.set_index(idx_sorted)["cvar"].values
    return df

# === Equivalencias de variedades (no destructivas) ===
ALIAS_EQUIV = {
    # Grupo Merín / L5903
    "merin": {"merin", "l5903"},
    "l5903": {"merin", "l5903"},

    # Grupo SLIO9193 / SLI9193 / 9193
    "slio9193": {"slio9193", "sli9193", "9193"},
    "sli9193": {"slio9193", "sli9193", "9193"},
    "9193": {"slio9193", "sli9193", "9193"},
}

def resolve_variedad_key(key_norm: str, name_map: Dict[str, str]) -> Optional[str]:
    """
    Dado el nombre normalizado de la variedad (key_norm) y el mapa name_map
    (nombre_normalizado_de_hoja -> nombre_hoja_original), devuelve cuál
    de los equivalentes existe como hoja en el Excel de curvas.
    """
    candidatos = ALIAS_EQUIV.get(key_norm, {key_norm})
    for cand in candidatos:
        if cand in name_map:
            return cand
    return None


# ===================== PATHS POR PLANTA =====================
def path_consolidado(planta: str, año: int) -> Path:
    planta = planta.upper().strip()
    if planta == "JPV":
        base = BASE_CONS_JPV
        nombre = f"Consolidado_Sensores_JPV_{año}_TACHADAS_LIMPIAS.xlsx"
    elif planta == "RB":
        base = BASE_CONS_RB
        nombre = f"Consolidado_Sensores_RB_{año}_TACHADAS_LIMPIAS.xlsx"
    else:
        raise ValueError("planta debe ser 'JPV' o 'RB'")
    p = base / nombre
    if not p.exists():
        raise FileNotFoundError(f"No encontré el consolidado para {planta} {año}: {p}")
    return p

def path_curvas(planta: str, año: int) -> Path:
    planta = planta.upper().strip()
    if planta == "JPV":
        patron = f"*Datos JPV*/*{año}*Curvas*JPV*.xlsx"
    elif planta == "RB":
        patron = f"*Datos RB*/*{año}*Curvas*RB*.xlsx"
    else:
        raise ValueError("planta debe ser 'JPV' o 'RB'")
    candidatos = list(BASE_EDITABLE.rglob(patron))
    if not candidatos:
        raise FileNotFoundError(f"No encontré el Excel de curvas para {planta} {año} usando patrón {patron}")
    return candidatos[0]

# ===================== PIPELINE GENERAL =====================
def calibrar_por_planta_y_año(planta: str, año: int) -> Path:
    # 1) Consolidado
    p_in = path_consolidado(planta, año)
    df = pd.read_excel(p_in)
    faltan = [c for c in ORIG if c not in df.columns]
    if faltan:
        raise KeyError(f"Faltan columnas en el consolidado: {faltan}")

    # 2) Timestamp (con hora) y secadora (solo para cálculos internos)
    fecha_ref = pd.to_datetime(df["timestamp"], errors="coerce")  # sin normalize: respeta hora
    secadora = df["secadora"] if "secadora" in df.columns else df[COL_SENSOR_ID].apply(guess_secadora)

    # 3) Voltajes limpios
    vh = pd.to_numeric(df[COL_VH], errors="coerce")
    vt = pd.to_numeric(df[COL_VT], errors="coerce")
    mask_vh_zero = (vh.fillna(0) == 0)
    mask_vt_zero = (vt.fillna(0) == 0)

    # 4) Curvas
    p_curvas = path_curvas(planta, año)

    # --- TEMPERATURA ---
    df_temp_raw = pd.read_excel(p_curvas, sheet_name="TEMPERATURA", header=None)
    AT, BT, cfix_T, cvar_T = parse_temperatura_sheet(df_temp_raw)
    aux_T = pd.DataFrame({"fecha_ref": fecha_ref, "secadora": secadora})
    aux_T = merge_asof_cvar(aux_T, "fecha_ref", "secadora", cvar_T, "cvar_T")
    cfix_T_series = aux_T["secadora"].map(cfix_T).astype(float)
    TEMPERATURA = vt * AT + BT + cfix_T_series - aux_T["cvar_T"]
    TEMPERATURA = TEMPERATURA.mask(mask_vt_zero | vt.isna(), np.nan)

    # --- HUMEDAD (por variedad) ---
    xl = pd.ExcelFile(p_curvas)
    hojas_hum = [s for s in xl.sheet_names if norm_str(s) != norm_str("TEMPERATURA")]
    name_map = {norm_str(s): s for s in hojas_hum}
    cache_params: Dict[str, Tuple[float,float,float,Dict[int,float],Dict[int,pd.DataFrame]]] = {}

    HUMEDAD = pd.Series(np.nan, index=df.index, dtype=float)
    variedad_norm = df[COL_VARIEDAD].astype(str).map(norm_str)

    faltantes: List[str] = []

    for key_norm in variedad_norm.dropna().unique():
        key_lookup = resolve_variedad_key(key_norm, name_map)
        if key_lookup is None:
            faltantes.append(key_norm)
            continue

        if key_lookup not in cache_params:
            raw = pd.read_excel(p_curvas, sheet_name=name_map[key_lookup], header=None)
            cache_params[key_lookup] = parse_humedad_sheet(raw)

        AH, BH, CH, cfix_H, cvar_H = cache_params[key_lookup]
        mask = (variedad_norm == key_norm)

        aux_H = pd.DataFrame({"fecha_ref": fecha_ref[mask], "secadora": secadora[mask]})
        aux_H = merge_asof_cvar(aux_H, "fecha_ref", "secadora", cvar_H, "cvar_H")
        cfix_H_series = aux_H["secadora"].map(cfix_H).astype(float)
        HUMEDAD.loc[mask] = (vh[mask]**2) * AH + vh[mask] * BH + CH + cfix_H_series - aux_H["cvar_H"]


    HUMEDAD = HUMEDAD.mask(mask_vh_zero | vh.isna(), np.nan)

    if faltantes:
        faltantes_uniq = sorted(set(faltantes))
        print(f"[AVISO] {planta} {año}: variedades sin hoja de calibración -> {', '.join(faltantes_uniq)}")

    # 5) Salida: 10 originales + HUMEDAD, TEMPERATURA
    out = df[[c for c in ORIG if c in df.columns]].copy()
    out["HUMEDAD"] = HUMEDAD
    out["TEMPERATURA"] = TEMPERATURA

    p_out = BASE_OUT / f"{planta.upper()}_{año}_calibrado.xlsx"
    with pd.ExcelWriter(p_out, engine="xlsxwriter", datetime_format="yyyy-mm-dd hh:mm:ss") as xw:
        out.to_excel(xw, index=False, sheet_name="datos")
    print(f"Listo -> {p_out}")
    return p_out

# --------- EJECUCIÓN ----------
if __name__ == "__main__":
    # Elegir cuales ejecutar:
    jobs = [
         ("JPV", 2024),
         ("JPV", 2025),
         ("RB", 2024),
         ("RB", 2025),
    ]
    for planta, año in jobs:
        try:
            calibrar_por_planta_y_año(planta, año)
        except Exception as e:
                print(f"[ERROR] {planta} {año}: {e}")
                continue

[AVISO] JPV 2024: variedades sin hoja de calibración -> inov, sli9193
Listo -> C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\JPV_2024_calibrado.xlsx
[AVISO] JPV 2025: variedades sin hoja de calibración -> inov, nan
Listo -> C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\JPV_2025_calibrado.xlsx
[AVISO] RB 2024: variedades sin hoja de calibración -> cl1294
Listo -> C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\RB_2024_calibrado.xlsx
[AVISO] RB 2025: variedades sin hoja de calibración -> xp 117
Listo -> C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\RB_2025_calibrado.xlsx


# Generación de Gráficos

In [2]:
# ---------- CONFIG ----------
INPUT_FILES = [
#r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\JPV_2024_calibrado.xlsx",
#r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\JPV_2025_calibrado.xlsx",
#r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\RB_2024_calibrado.xlsx",
r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas\RB_2025_calibrado.xlsx"
]

# Carpeta base donde se creará "Gráficos Tachadas"
OUTPUT_BASE = r"C:\Users\augus\OneDrive - Universidad de Montevideo\Tésis - Latitud\Secado Latitud - UM - Editable\_consolidados\_salidas_calibradas"

# Columna de tiempo y nombres alternativos de temperatura y humedad (detección flexible)
TIMESTAMP_COL = "timestamp"
TEMP_CANDIDATES = ["Temperatura","TEMPERATURA"]
HUM_CANDIDATES  = ["Humedad","HUMEDAD"]

# Escalas fijas (pueden cambiarse o desactivarse)
FORCE_YLIMS = True
TEMP_YLIM   = (25, 55)   # °C
HUM_YLIM    = (11, 25)   # %
TEMP_YTICK  = 2          # paso de ticks (None para automático)
HUM_YTICK   = 2

# Sistema de muestras (True y definir max para no generar todos, False para generar todos)
SAMPLE_MODE = False
SAMPLE_MAX  = 10

FIGSIZE = (6, 6.5)  # más cuadrado como el ejemplo
DPI     = 150

# ---------- UTILIDADES ----------
def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)
    return p

def slugify(value: str) -> str:
    value = str(value)
    value = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
    value = re.sub(r"[^\w\s-]", "", value).strip()
    value = re.sub(r"[-\s]+", " ", value)
    return value.replace(" ", "_")

def pick_col(df: pd.DataFrame, candidates: list):
    for c in candidates:
        if c in df.columns:
            return c
    raise KeyError(f"No se encontró ninguna de estas columnas en el archivo: {candidates}")

def short_year(y):
    try:
        return str(int(y))[-2:]
    except:
        return str(y)[-2:]

def read_any(path: Path) -> pd.DataFrame:
    if path.suffix.lower() in {".xlsx", ".xls"}:
        return pd.read_excel(path)
    elif path.suffix.lower() == ".csv":
        return pd.read_csv(path)
    else:
        raise ValueError(f"Formato no soportado: {path.suffix}")

def add_min_acum(df: pd.DataFrame) -> pd.DataFrame:
    if TIMESTAMP_COL not in df.columns:
        raise KeyError(f"Falta la columna de timestamp: {TIMESTAMP_COL}")
    df = df.copy()
    df[TIMESTAMP_COL] = pd.to_datetime(df[TIMESTAMP_COL])
    df = df.sort_values(["planta", "año", "ID_tachada", TIMESTAMP_COL])
    df["min_acum"] = (
        df.groupby(["planta","año","ID_tachada"])[TIMESTAMP_COL]
          .transform(lambda s: (s - s.min()).dt.total_seconds()/60.0)
          .round(2)
    )
    df["horas"] = (df["min_acum"]/60.0).round(3)
    return df

# ---------- GRÁFICO ----------
def plot_tachada(
    gdf: pd.DataFrame,
    variedad: str, id_tachada, planta: str, año,
    temp_col: str, hum_col: str, out_dir: Path
):
    title = f"{variedad} tachada {id_tachada}_{short_year(año)}"
    fname = slugify(title) + ".png"
    out_path = out_dir / fname

    # Fondo BLANCO explícito
    fig, (ax1, ax2) = plt.subplots(
        2, 1, figsize=FIGSIZE, dpi=DPI, sharex=True,
        gridspec_kw={"height_ratios":[1,1]}, facecolor="white"
    )

    for ax in (ax1, ax2):
        ax.set_facecolor("white")
        ax.grid(True, axis="y", linestyle="-", linewidth=0.6, alpha=0.35, color="#7f7f7f")
        for spine in ax.spines.values():
            spine.set_color("black")
        ax.tick_params(colors="black", labelsize=9)

    # Título DENTRO de la figura
    ax1.set_title(title, fontsize=12, color="black", pad=6)

    # Temperatura (azul)
    ax1.plot(gdf["horas"], gdf[temp_col], linewidth=1.8, color="#1f77b4")
    ax1.set_ylabel("Temperatura (°C)", color="black")

    # Humedad (rojo)
    ax2.plot(gdf["horas"], gdf[hum_col], linewidth=1.8, color="#d62728")
    ax2.set_xlabel("Tiempo (h)", color="black")
    ax2.set_ylabel("Humedad (%)", color="black")

    # Escalas fijas + ticks como el ejemplo
    if FORCE_YLIMS:
        ax1.set_ylim(*TEMP_YLIM)
        ax2.set_ylim(*HUM_YLIM)
        if TEMP_YTICK:
            ax1.set_yticks(np.arange(TEMP_YLIM[0], TEMP_YLIM[1]+1e-9, TEMP_YTICK))
        if HUM_YTICK:
            ax2.set_yticks(np.arange(HUM_YLIM[0], HUM_YLIM[1]+1e-9, HUM_YTICK))

    plt.tight_layout()
    fig.savefig(out_path, bbox_inches="tight", facecolor="white")
    plt.close(fig)
    return out_path

# ---------- MAIN ----------
def main():
    base_dir = ensure_dir(Path(OUTPUT_BASE) / "Gráficos Tachadas")
    total_imgs = 0

    for f in INPUT_FILES:
        fpath = Path(f)
        if not fpath.exists():
            print(f"⚠️  No se encontró: {fpath}")
            continue

        print(f"\n📄 Procesando: {fpath.name}")
        df = read_any(fpath)

        needed = ["planta","año","ID_tachada","Variedad", TIMESTAMP_COL]
        missing = [c for c in needed if c not in df.columns]
        if missing:
            print(f"   ❌ Faltan columnas: {missing}. Salteado.")
            continue

        temp_col = pick_col(df, TEMP_CANDIDATES)
        hum_col  = pick_col(df, HUM_CANDIDATES)

        # Evitar que falte humedad/temperatura
        for c in (temp_col, hum_col):
            if df[c].isna().all():
                print(f"   ⚠️  La columna '{c}' está vacía en este archivo.")
        
        df = add_min_acum(df)
        groups = df.groupby(["planta","año","ID_tachada","Variedad"], sort=True)

        iterable = list(groups)
        if SAMPLE_MODE:
            iterable = iterable[:SAMPLE_MAX]

        for (planta, año, id_t, variedad), g in iterable:
            out_dir = ensure_dir(base_dir / str(planta) / str(int(año)))
            try:
                out = plot_tachada(
                    g.sort_values("horas"),
                    variedad=variedad, id_tachada=id_t, planta=planta, año=año,
                    temp_col=temp_col, hum_col=hum_col, out_dir=out_dir
                )
                total_imgs += 1
                #print(f"   ✅ {planta}/{año} -> {out.name}")
            except Exception as e:
                print(f"   ⚠️  Error graficando {planta}-{año}-{id_t}: {e}")

    print(f"\n🧾 Listo. Imágenes generadas: {total_imgs}")

if __name__ == "__main__":
    main()



📄 Procesando: RB_2025_calibrado.xlsx

🧾 Listo. Imágenes generadas: 351
