In [3]:
# @title Entrenamiento de modelo por fases
import os, zipfile, shutil, json, argparse, sys
from typing import List, Tuple, Dict
import glob
import numpy as np
from PIL import Image, UnidentifiedImageError, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy, Precision, Recall, AUC

# configuración global (seed nos sirve para que al momento de re entrenar el modelo no nos dé resultados diferentes)
SEED = 42
np.random.seed(SEED)
tf.keras.utils.set_random_seed(SEED)
try:
    from tensorflow.keras import mixed_precision
    if tf.config.list_physical_devices('GPU'):
        mixed_precision.set_global_policy("mixed_float16"); print("⚙️ GPU: mixed_float16 ON")
    else:
        mixed_precision.set_global_policy("float32"); print("🧠 CPU: float32 ON")
except Exception:
    pass

EXTS = {".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".heic",".heif"}

# funciones auxiliares para lectura de carpetas e imágenes
def clear_dir(path: str):
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)

def extract_zip_to_dir(zip_path: str, target_dir: str):
    with zipfile.ZipFile(zip_path, 'r') as zf:
        zf.extractall(target_dir)

def gather_images(src_dir: str, dst_dir: str) -> int:
    os.makedirs(dst_dir, exist_ok=True)
    count = 0
    for root, _, files_in in os.walk(src_dir):
        if "__MACOSX" in root:
            continue
        for fname in files_in:
            if fname.startswith("._"):
                continue
            ext = os.path.splitext(fname)[1].lower()
            if ext in EXTS:
                src = os.path.join(root, fname)
                new_name = f"{count:08d}_{os.path.basename(fname)}"
                dst = os.path.join(dst_dir, new_name)
                try:
                    shutil.move(src, dst)
                except Exception:
                    try: shutil.copy2(src, dst)
                    except Exception: continue
                count += 1
    return count

def remove_too_small_files(folder: str, min_kb=5):
    removed=0
    for fname in list(os.listdir(folder)):
        p=os.path.join(folder,fname)
        if os.path.isfile(p):
            try:
                if os.path.getsize(p) < min_kb*1024:
                    os.remove(p); removed+=1
            except Exception:
                pass
    if removed>0:
        print(f"🧹 Eliminados <{min_kb}KB en {folder}: {removed}")

#unificar formatos si se encontraran varios
def sanitize_and_convert_to_jpg(folder: str) -> Tuple[int,int,int]:
    """Convierte todo a JPG; las fallidas van a _fallidas/."""
    failed_dir = os.path.join(folder, "_fallidas"); os.makedirs(failed_dir, exist_ok=True)
    ok, rewrote, fail, shown = 0, 0, 0, 0
    for fname in list(os.listdir(folder)):
        path = os.path.join(folder, fname)
        if not os.path.isfile(path):
            continue
        root, ext = os.path.splitext(fname)
        try:
            with Image.open(path) as im:
                if im.mode in ("RGBA","LA"):
                    bg = Image.new("RGB", im.size, (255,255,255))
                    bg.paste(im, mask=im.split()[-1]); im = bg
                else:
                    im = im.convert("RGB")
                out_path = os.path.join(folder, f"{root}.jpg")
                im.save(out_path, format="JPEG", quality=95, optimize=True)
                if out_path != path:
                    try: os.remove(path)
                    except Exception: pass
                if ext.lower() in (".jpg",".jpeg"):
                    rewrote += 1
                else:
                    ok += 1
        except (UnidentifiedImageError, OSError, ValueError) as e:
            fail += 1
            try: shutil.move(path, os.path.join(failed_dir, fname))
            except Exception: pass
            if shown < 8:
                print(f"⚠️ Falló {fname}: {type(e).__name__}: {e}"); shown += 1
    return ok, rewrote, fail

def list_jpgs(dir_):
    return [p for p in glob.glob(os.path.join(dir_, "**", "*.jpg"), recursive=True)
            if "/_fallidas/" not in p and "\\_fallidas\\" not in p]

def stratified_split(paths: List[str], val_frac=0.2, rng=None) -> Tuple[List[str], List[str]]:
    rng = rng or np.random.RandomState(SEED)
    arr = np.array(paths)
    rng.shuffle(arr)
    k = max(1, int(len(arr)*val_frac))
    return arr[k:].tolist(), arr[:k].tolist()  # train, val

def make_balanced_train(train_nv, train_v, mode="none", rng=None):
    rng = rng or np.random.RandomState(SEED)
    train_nv = list(train_nv); train_v = list(train_v)
    if mode == "none":
        return train_nv, train_v
    if mode == "undersample":
        m = min(len(train_nv), len(train_v))
        rng.shuffle(train_nv); rng.shuffle(train_v)
        return train_nv[:m], train_v[:m]
    if mode == "oversample":
        if len(train_nv) < len(train_v):
            need = len(train_v) - len(train_nv)
            add = rng.choice(train_nv, size=need, replace=True).tolist()
            return train_nv + add, train_v
        else:
            need = len(train_nv) - len(train_v)
            add = rng.choice(train_v, size=need, replace=True).tolist()
            return train_nv, train_v + add
    if mode == "both":
        target = max(len(train_nv), len(train_v))
        if len(train_nv) < target:
            add = rng.choice(train_nv, size=target-len(train_nv), replace=True).tolist()
            train_nv = train_nv + add
        if len(train_v) < target:
            add = rng.choice(train_v, size=target-len(train_v), replace=True).tolist()
            train_v = train_v + add
        rng.shuffle(train_nv); rng.shuffle(train_v)
        m = min(len(train_nv), len(train_v))
        return train_nv[:m], train_v[:m]
    raise ValueError("balance_mode inválido")

# datasets (positiva = no_validas -> etiqueta=1)
def build_datasets_from_paths(train_nv, train_v, val_nv, val_v, img_size=(160,160), batch=32):
    # labels: 1 = no_validas (positiva), 0 = validas
    train_paths = np.array(list(train_nv) + list(train_v))
    train_labels= np.array([1]*len(train_nv) + [0]*len(train_v))
    val_paths   = np.array(list(val_nv) + list(val_v))
    val_labels  = np.array([1]*len(val_nv) + [0]*len(val_v))

    rng = np.random.RandomState(SEED)
    p = rng.permutation(len(train_paths))
    train_paths, train_labels = train_paths[p], train_labels[p]

    def load_img(path, label):
        img = tf.io.read_file(path)
        img = tf.io.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, img_size)
        return img, tf.cast(label, tf.int32)

    def aug_fn(img, label):
        img = tf.image.random_flip_left_right(img, seed=SEED)
        img = tf.image.random_contrast(img, 0.9, 1.1, seed=SEED)
        img = tf.image.random_brightness(img, 0.05, seed=SEED)
        return img, label

    def scale_fn(img, label):
        # MobileNetV2 escala [-1,1]
        img = (img/127.5) - 1.0
        return img, label

    train_ds = tf.data.Dataset.from_tensor_slices((train_paths, train_labels)) \
        .shuffle(2048, seed=SEED) \
        .map(load_img, num_parallel_calls=tf.data.AUTOTUNE) \
        .map(aug_fn, num_parallel_calls=tf.data.AUTOTUNE) \
        .map(scale_fn, num_parallel_calls=tf.data.AUTOTUNE) \
        .batch(batch).prefetch(tf.data.AUTOTUNE)

    val_ds = tf.data.Dataset.from_tensor_slices((val_paths, val_labels)) \
        .map(load_img, num_parallel_calls=tf.data.AUTOTUNE) \
        .map(scale_fn, num_parallel_calls=tf.data.AUTOTUNE) \
        .batch(batch).prefetch(tf.data.AUTOTUNE)

    return train_ds, val_ds, (train_paths, train_labels, val_paths, val_labels)

# MODELO
def build_model(img_size=(160,160), try_imagenet=True):
    inputs = layers.Input(shape=(*img_size, 3))

    # escalado tf.data
    from tensorflow.keras.applications import mobilenet_v2
    base = None
    if try_imagenet:
        try:
            print("🔼 Cargando MobileNetV2 (ImageNet)…")
            base = mobilenet_v2.MobileNetV2(include_top=False, weights="imagenet", input_tensor=inputs, alpha=1.0)
        except Exception as e:
            print(f"⚠️ No se pudo cargar ImageNet ({e}); entrenaré desde cero.")
    if base is None:
        base = mobilenet_v2.MobileNetV2(include_top=False, weights=None, input_tensor=inputs, alpha=1.0)

    base.trainable = False
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dropout(0.35)(x)
    out = layers.Dense(1, activation="sigmoid", dtype="float32", name="prob_no_validas")(x)
    model = models.Model(inputs, out)
    return model, base

def partial_unfreeze(base, fraction=0.30):
    base.trainable = True
    cut = int(len(base.layers)*(1.0 - fraction))
    for i, layer in enumerate(base.layers):
        layer.trainable = (i >= cut)

def compute_class_weights(nv_count: int, v_count: int) -> Dict[int, float]:
    total = nv_count + v_count
    """ la etiqueta 1 es para imágenes no válidas y la etiqueta 0 para imágenes válidas
        el cw se utiliza para balancear el peso del error según la proporción de imágenes con las que contamos"""
    cw = {1: float(total / (2 * max(1, nv_count))),
          0: float(total / (2 * max(1, v_count)))}
    print("⚖️ class_weights:", cw)
    return cw

# métricas y evaluación
from sklearn.metrics import (classification_report, confusion_matrix,
                             precision_score, recall_score, f1_score,
                             average_precision_score, roc_auc_score)

def evaluate(best_model, val_ds):
    y_true, y_prob = [], []
    for x, y in val_ds:
        p = best_model.predict(x, verbose=0).reshape(-1)
        y_true.append(y.numpy().reshape(-1))
        y_prob.append(p)
    y_true = np.concatenate(y_true).astype(int)
    y_prob = np.concatenate(y_prob)

    ap = average_precision_score(y_true, y_prob)
    aucroc = roc_auc_score(y_true, y_prob)
    print(f"🔎 AUC-PR(no_validas) = {ap:.3f}")
    print(f"🔎 AUC-ROC(no_validas) = {aucroc:.3f}")

    for th in [0.30, 0.40, 0.45, 0.50, 0.55, 0.60]:
        yp = (y_prob >= th).astype(int)
        p = precision_score(y_true, yp, zero_division=0)
        r = recall_score(y_true, yp, zero_division=0)
        f1= f1_score(y_true, yp, zero_division=0)
        print(f"th={th:.2f} → Precision={p:.3f} | Recall={r:.3f} | F1={f1:.3f}")

    yp = (y_prob >= 0.50).astype(int)
    print("\n📋 Reporte (th=0.50, positiva=no_validas)")
    print(classification_report(y_true, yp, target_names=["validas","no_validas"], digits=3))
    print("🧩 Matriz de confusión:\n", confusion_matrix(y_true, yp))

# función principal
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["colab","local"], default="colab")
    parser.add_argument("--validas_zip", type=str, default="")
    parser.add_argument("--no_validas_zip", type=str, default="")
    parser.add_argument("--data_root", type=str, default="./dataset")
    parser.add_argument("--img_size", type=int, default=160)
    parser.add_argument("--batch", type=int, default=32)
    parser.add_argument("--val_frac", type=float, default=0.20)
    parser.add_argument("--balance_mode", choices=["none","undersample","oversample","both"], default="none")
    parser.add_argument("--epochs_head", type=int, default=8)
    parser.add_argument("--epochs_ft", type=int, default=12)
    parser.add_argument("--lr_head", type=float, default=1e-3)
    parser.add_argument("--lr_ft", type=float, default=2e-5)
    parser.add_argument("--no_pretrained", action="store_true")
    parser.add_argument("--export_saved_model", action="store_true")
    args, _ = parser.parse_known_args()

    DATA_ROOT = args.data_root
    IMG_SIZE = (args.img_size, args.img_size)

    # Preparar carpetas
    clear_dir(DATA_ROOT)
    vdir = os.path.join(DATA_ROOT, "validas")
    nvdir = os.path.join(DATA_ROOT, "no_validas")
    os.makedirs(vdir, exist_ok=True); os.makedirs(nvdir, exist_ok=True)

    # Ingesta
    if args.mode == "colab":
        try:
            from google.colab import files
        except Exception:
            print("❌ No estás en Colab. Usa --mode local."); sys.exit(1)
        print("📦 Sube el ZIP para *validas*")
        up_v = files.upload(); vz = next(iter(up_v.keys()))
        print("📦 Sube el ZIP para *no_validas*")
        up_nv = files.upload(); nvz = next(iter(up_nv.keys()))
    else:
        if not (args.validas_zip and args.no_validas_zip):
            print("❌ En modo local pasa --validas_zip y --no_validas_zip"); sys.exit(1)
        vz, nvz = args.validas_zip, args.no_validas_zip

    # Extraer y consolidar
    tmp_v, tmp_nv = "./tmp_validas", "./tmp_no_validas"
    clear_dir(tmp_v); clear_dir(tmp_nv)
    extract_zip_to_dir(vz, tmp_v); extract_zip_to_dir(nvz, tmp_nv)
    n_v = gather_images(tmp_v, vdir); n_nv = gather_images(tmp_nv, nvdir)
    shutil.rmtree(tmp_v, ignore_errors=True); shutil.rmtree(tmp_nv, ignore_errors=True)
    print(f"📦 validas movidas: {n_v}  |  no_validas movidas: {n_nv}")

    # Filtrar ruido y sanitizar
    remove_too_small_files(vdir, 5); remove_too_small_files(nvdir, 5)
    print("🧼 Sanitizando 'validas'…")
    ok_v, rew_v, fail_v = sanitize_and_convert_to_jpg(vdir)
    print(f"   ✔ convertidos: {ok_v}, re-salvados jpg: {rew_v}, fallidos: {fail_v}")
    print("🧼 Sanitizando 'no_validas'…")
    ok_nv, rew_nv, fail_nv = sanitize_and_convert_to_jpg(nvdir)
    print(f"   ✔ convertidos: {ok_nv}, re-salvados jpg: {rew_nv}, fallidos: {fail_nv}")

    # Listado y split estratificado
    paths_nv = list_jpgs(nvdir)
    paths_v  = list_jpgs(vdir)
    print(f"📊 Dataset → no_validas={len(paths_nv)} | validas={len(paths_v)}")

    train_nv, val_nv = stratified_split(paths_nv, args.val_frac)
    train_v,  val_v  = stratified_split(paths_v,  args.val_frac)
    print(f"✂️ Split → train: no_validas={len(train_nv)} validas={len(train_v)} | val: no_validas={len(val_nv)} validas={len(val_v)}")

    train_nv_b, train_v_b = make_balanced_train(train_nv, train_v, args.balance_mode)
    print(f"⚖️ Train balanceado ({args.balance_mode}) → no_validas={len(train_nv_b)} | validas={len(train_v_b)}")

    # Datasets
    train_ds, val_ds, _ = build_datasets_from_paths(train_nv_b, train_v_b, val_nv, val_v,
                                                    img_size=IMG_SIZE, batch=args.batch)

    # Modelo
    model, backbone = build_model(IMG_SIZE, try_imagenet=not args.no_pretrained)

    loss = tf.keras.losses.BinaryCrossentropy()
    metrics = [BinaryAccuracy(name="bin_acc"),
               Precision(name="precision"),
               Recall(name="recall"),
               AUC(name="auc_roc", curve="ROC"),
               AUC(name="auc_pr", curve="PR")]

    callbacks_head = [
        tf.keras.callbacks.ModelCheckpoint(os.path.join(DATA_ROOT, "best.keras"), monitor="val_loss", save_best_only=True, verbose=1),
        tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-6, verbose=1),
        tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True, verbose=1),
    ]

    # class_weights solo si no balanceamos
    use_cw = (args.balance_mode == "none")
    class_weights = None
    if use_cw:
        class_weights = compute_class_weights(len(train_nv), len(train_v))

    # Fase 1
    print("\n🚀 Fase 1: entrenando cabezal (backbone congelado)")
    model.compile(optimizer=Adam(args.lr_head), loss=loss, metrics=metrics)
    model.fit(train_ds, validation_data=val_ds, epochs=args.epochs_head,
              callbacks=callbacks_head, class_weight=class_weights)

    # Fase 2
    print("\n🔓 Fase 2: fine-tuning parcial")
    partial_unfreeze(backbone, fraction=0.30)
    callbacks_ft = [
        tf.keras.callbacks.ModelCheckpoint(os.path.join(DATA_ROOT, "best.keras"), monitor="val_loss", save_best_only=True, verbose=1),
        tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-6, verbose=1),
        tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True, verbose=1),
    ]
    model.compile(optimizer=Adam(args.lr_ft), loss=loss, metrics=metrics)
    model.fit(train_ds, validation_data=val_ds, epochs=args.epochs_ft,
              callbacks=callbacks_ft, class_weight=class_weights)

    # Guardados
    with open(os.path.join(DATA_ROOT, "class_names.json"), "w") as f:
        json.dump(["no_validas","validas"], f)
    out_model = os.path.join(DATA_ROOT, "clf_validas_no_validas.keras")
    model.save(out_model)
    print(f"\n✅ Modelo final: {out_model}")
    print(f"🧠 Mejor checkpoint: {os.path.join(DATA_ROOT, 'best.keras')}")
    print(f"📝 class_names.json guardado.")

    # SavedModel opcional
    if args.export_saved_model:
        export_dir = os.path.join(DATA_ROOT, "saved_model")
        try:
            @tf.function(input_signature=[tf.TensorSpec([None, *IMG_SIZE, 3], tf.float32)])
            def serving_fn(x): return {"prob_no_validas": model(x, training=False)}
            tf.saved_model.save(model, export_dir, signatures={"serving_default": serving_fn})
            print(f"📦 SavedModel exportado en: {export_dir}")
        except Exception as e:
            print(f"⚠️ No se pudo exportar SavedModel: {e}")

    # Evaluación final
    print("\n📈 Evaluación final (positiva = no_validas) — validación estratificada")
    best = tf.keras.models.load_model(os.path.join(DATA_ROOT, "best.keras"))
    evaluate(best, val_ds)

if __name__ == "__main__":
    main()


🧠 CPU: float32 ON
📦 Sube el ZIP para *validas*


Saving validas.zip to validas.zip
📦 Sube el ZIP para *no_validas*


Saving no_validas.zip to no_validas.zip
📦 validas movidas: 1104  |  no_validas movidas: 338
🧼 Sanitizando 'validas'…
   ✔ convertidos: 0, re-salvados jpg: 1104, fallidos: 0
🧼 Sanitizando 'no_validas'…
   ✔ convertidos: 0, re-salvados jpg: 338, fallidos: 0
📊 Dataset → no_validas=338 | validas=1104
✂️ Split → train: no_validas=271 validas=884 | val: no_validas=67 validas=220
⚖️ Train balanceado (none) → no_validas=271 | validas=884
🔼 Cargando MobileNetV2 (ImageNet)…


  base = mobilenet_v2.MobileNetV2(include_top=False, weights="imagenet", input_tensor=inputs, alpha=1.0)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
⚖️ class_weights: {1: 2.1309963099630997, 0: 0.6532805429864253}

🚀 Fase 1: entrenando cabezal (backbone congelado)
Epoch 1/8
[1m36/37[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 439ms/step - auc_pr: 0.2842 - auc_roc: 0.6125 - bin_acc: 0.6390 - loss: 0.8052 - precision: 0.2725 - recall: 0.4562
Epoch 1: val_loss improved from inf to 0.41972, saving model to ./dataset/best.keras
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 763ms/step - auc_pr: 0.2896 - auc_roc: 0.6156 - bin_acc: 0.6389 - loss: 0.8032 - precision: 0.2760 - recall: 0.4627 - val_auc_pr: 0.6854 - val_auc_roc: 0.8607 - val_bin_acc: 0.8118 - val_loss: 0.4197 - val_precision: 0.5823 - val_recall: 0.6866 - learning_rate: 0.0010
Epoch 2/8
[1m36/37[0m [32m━

In [5]:
from google.colab import files
files.download("dataset/best.keras")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [2]:
# @title Clasificador
import os, zipfile, shutil, json, argparse, sys, csv
from typing import List, Tuple
import numpy as np

from PIL import Image, UnidentifiedImageError, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

import tensorflow as tf

# configuración por defecto del modelo
MODEL_PATH_DEFAULT = "./dataset/best.keras"
CLASSES_JSON_DEFAULT = "./dataset/class_names.json"

# Como queremos menos falsos positivos, ponemos un umbral alto por defecto.
LOW_DEFAULT  = 0.50     # este no se usa ahorita porque no queremos crear carpeta revisar, pero por si acaso, también está aquí
HIGH_DEFAULT = 0.75     # >= HIGH => no_validas; < HIGH => validas
USE_GRAY_ZONE_DEFAULT = False  # sin zona gris (esto sirve para la carpeta revisar)


def clear_dir(path: str):
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)

def extract_zip_to_dir(zip_path: str, target_dir: str):
    with zipfile.ZipFile(zip_path, 'r') as zf:
        zf.extractall(target_dir)

def iter_image_files(root_dir: str, exts={".jpg",".jpeg",".png",".bmp",".webp",".tif",".tiff",".heic",".heif"}):
    for root, _, fs in os.walk(root_dir):
        if "__MACOSX" in root:  # esto es por si está usando una macbook
            continue
        for fname in fs:
            if fname.startswith("._"):
                continue
            ext = os.path.splitext(fname)[1].lower()
            if ext in exts:
                yield os.path.join(root, fname)

def load_img_arr(path: str, size: Tuple[int,int]):
    # Devuelve array float32 escalado a [-1,1] (MobileNetV2).
    with Image.open(path) as im:
        if im.mode in ("RGBA","LA"):
            bg = Image.new("RGB", im.size, (255,255,255))
            bg.paste(im, mask=im.split()[-1])
            im = bg
        else:
            im = im.convert("RGB")
        im = im.resize(size, resample=Image.BILINEAR)
        arr = np.asarray(im, dtype=np.float32)
    arr = (arr / 127.5) - 1.0
    return arr

def safe_makedirs(path: str):
    os.makedirs(path, exist_ok=True)

def write_csv(rows: List[Tuple[str,float,str]], out_csv: str):
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["file", "prob_no_validas", "decision"])
        w.writerows(rows)

# función principal
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["colab","local"], default="colab")
    parser.add_argument("--input_zip", type=str, default="", help="Ruta al ZIP de imágenes (modo local)")
    parser.add_argument("--model_path", type=str, default=MODEL_PATH_DEFAULT)
    parser.add_argument("--classes_json", type=str, default=CLASSES_JSON_DEFAULT)
    parser.add_argument("--img_size", type=int, default=160)  # usa 160 si entrenaste a 160; 224 si a 224
    parser.add_argument("--batch", type=int, default=64)

    # Umbral binario
    parser.add_argument("--high", type=float, default=HIGH_DEFAULT, help="Umbral binario: >= high => no_validas")

    # aquí no se está usando gray zone (que es la establece imágenes confusas en carpeta revisar)
    parser.add_argument("--use_gray_zone", action="store_true", default=USE_GRAY_ZONE_DEFAULT)
    parser.add_argument("--low", type=float, default=LOW_DEFAULT)

    # Salidas
    parser.add_argument("--workdir", type=str, default="./clasificacion_work")
    parser.add_argument("--output_zip", type=str, default="./clasificado.zip")
    parser.add_argument("--output_csv", type=str, default="./resultados.csv")

    args, _ = parser.parse_known_args()

    # leer zip de imágenes
    IN_DIR = os.path.join(args.workdir, "imagenes_input")
    clear_dir(args.workdir)
    safe_makedirs(IN_DIR)

    if args.mode == "colab":
        try:
            from google.colab import files as colab_files  # type: ignore
        except Exception:
            print("❌ No estás en Colab; usa --mode local y --input_zip"); sys.exit(1)
        print("📦 Sube el archivo ZIP con imágenes a clasificar")
        uploaded = colab_files.upload()
        if not uploaded:
            print("❌ No se subió nada."); sys.exit(1)
        input_zip = next(iter(uploaded.keys()))
    else:
        if not args.input_zip or not os.path.exists(args.input_zip):
            print("❌ En modo local debes pasar --input_zip con ruta")
            sys.exit(1)
        input_zip = args.input_zip

    extract_zip_to_dir(input_zip, IN_DIR)

    # recolecta imágenes
    files_list = sorted(list(iter_image_files(IN_DIR)))
    print(f"📂 Total archivos candidatos: {len(files_list)}")
    if len(files_list) == 0:
        print("❌ No se encontraron imágenes válidas dentro del ZIP."); sys.exit(1)

    # cargar modelo
    print("🔮 Cargando modelo…")
    model = tf.keras.models.load_model(args.model_path)
    with open(args.classes_json, "r") as f:
        class_names = json.load(f)
    # probabilidad de no válidas
    if isinstance(class_names, list) and "no_validas" in class_names:
        idx_no_validas = class_names.index("no_validas")
    else:
        idx_no_validas = 0
    print("✅ Modelo cargado.")

    IMG_SIZE = (args.img_size, args.img_size)
    BATCH = args.batch
    probs, paths_ok, failed = [], [], []

    # inferencia
    print("🧠 Ejecutando inferencia…")
    for start in range(0, len(files_list), BATCH):
        batch = files_list[start:start+BATCH]
        x_batch, keep = [], []
        for p in batch:
            try:
                x_batch.append(load_img_arr(p, IMG_SIZE))
                keep.append(p)
            except (UnidentifiedImageError, OSError, ValueError) as e:
                failed.append((p, type(e).__name__))
        if not x_batch:
            continue
        x = np.stack(x_batch, axis=0)
        pred = model.predict(x, verbose=0)
        pred = pred.reshape(-1).tolist()
        probs.extend(pred)
        paths_ok.extend(keep)

    print(f"✅ Inferencia terminada. Exitosas: {len(paths_ok)}  |  Fallidas al leer: {len(failed)}")

    # decisión
    HIGH = args.high
    def decide(p):
        return "no_validas" if p >= HIGH else "validas"

    decisions = [decide(p) for p in probs]

    # clasificación en carpetas y descarga de zip
    OUT_DIR = os.path.join(args.workdir, "salida")
    dir_validas   = os.path.join(OUT_DIR, "validas")
    dir_novalidas = os.path.join(OUT_DIR, "no_validas")
    safe_makedirs(dir_validas); safe_makedirs(dir_novalidas)

    rows_csv = []
    counts = {"validas":0,"no_validas":0}
    for p, pr, lab in zip(paths_ok, probs, decisions):
        dst_dir = dir_validas if lab=="validas" else dir_novalidas
        dst_path = os.path.join(dst_dir, os.path.basename(p))
        try:
            shutil.copy2(p, dst_path)
        except Exception:
            with open(p, "rb") as fsrc, open(dst_path, "wb") as fdst:
                fdst.write(fsrc.read())
        rows_csv.append((os.path.relpath(dst_path, OUT_DIR), float(pr), lab))
        counts[lab] = counts.get(lab, 0) + 1

    # guardar un csv con los resultados
    write_csv(rows_csv, args.output_csv)

    # crear un zip al final
    if os.path.exists(args.output_zip):
        os.remove(args.output_zip)
    with zipfile.ZipFile(args.output_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for root,_,fs in os.walk(OUT_DIR):
            for f in fs:
                full = os.path.join(root, f)
                rel = os.path.relpath(full, OUT_DIR)
                zf.write(full, rel)

    print("\n📊 Resumen de clasificación")
    print(f"  - validas:    {counts.get('validas',0)}")
    print(f"  - no_validas: {counts.get('no_validas',0)}")
    if failed:
        print(f"  - fallidas al leer: {len(failed)} (omitidas)")

    print(f"\n✅ ZIP listo: {args.output_zip}")
    print(f"✅ CSV listo: {args.output_csv}")
    if args.mode == "colab":
        from google.colab import files as colab_files
        colab_files.download(args.output_zip)
        colab_files.download(args.output_csv)

if __name__ == "__main__":
    main()


📦 Sube tu archivo ZIP con imágenes a clasificar


Saving ImagesTest.zip to ImagesTest.zip
📂 Total archivos candidatos: 1768
🔮 Cargando modelo…
✅ Modelo cargado.
🧠 Ejecutando inferencia…
✅ Inferencia terminada. Exitosas: 1768  |  Fallidas al leer: 0

📊 Resumen de clasificación
  - validas:    1298
  - no_validas: 470

✅ ZIP listo: ./clasificado.zip
✅ CSV listo: ./resultados.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>