In [75]:
from datasets import load_dataset
import numpy as np
from tqdm import tqdm
from PIL import Image, ImageOps
from sklearn.model_selection import train_test_split
import os, math, random
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import MobileNetV2



In [76]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU memory growth activado.")
    except Exception as e:
        print("No se pudo activar memory growth:", e)

try:
    tf.config.threading.set_intra_op_parallelism_threads(2)
    tf.config.threading.set_inter_op_parallelism_threads(2)
    print("Hilos CPU limitados a 2/2.")
except Exception:
    pass

try:
    tf.config.optimizer.set_jit(True)
    print("XLA activado.")
except Exception:
    pass

# ==== (opcional) Desactiva mixed precision por estabilidad ====
# Si quieres probar luego: comenta la línea inferior y activa mixed precision.
tf.keras.mixed_precision.set_global_policy("float32")
print("Precision: float32")


GPU memory growth activado.
Hilos CPU limitados a 2/2.
XLA activado.
Precision: float32


In [77]:
dataset = load_dataset("Hemg/AI-Generated-vs-Real-Images-Datasets", split="train")
print(dataset)

Dataset({
    features: ['image', 'label'],
    num_rows: 152710
})


In [78]:
labels = np.array([int(dataset[i]["label"]) for i in tqdm(range(len(dataset)), desc="Leyendo etiquetas")])


Leyendo etiquetas: 100%|██████████| 152710/152710 [01:39<00:00, 1532.89it/s]


In [79]:
idx_full = np.arange(len(dataset))
idx_sub, _idx_discard, y_sub, _y_discard = train_test_split(
    idx_full, labels, test_size=0.7, stratify=labels, random_state=42
)
print(f"Usando ~30% del dataset: {len(idx_sub)} de {len(dataset)}")

Usando ~30% del dataset: 45813 de 152710


In [80]:
idx_train, idx_tmp, y_train, y_tmp = train_test_split(
    idx_sub, y_sub, test_size=0.2, stratify=y_sub, random_state=42
)
idx_val, idx_test, y_val, y_test = train_test_split(
    idx_tmp, y_tmp, test_size=0.5, stratify=y_tmp, random_state=42
)

idx_train = [int(i) for i in idx_train]
idx_val   = [int(i) for i in idx_val]
idx_test  = [int(i) for i in idx_test]

print(f"Train: {len(idx_train)} | Val: {len(idx_val)} | Test: {len(idx_test)}")

Train: 36650 | Val: 4581 | Test: 4582


In [81]:
IMG_SIZE = 128

def preprocess_pil(pil_img: Image.Image, size=IMG_SIZE):
    img = ImageOps.exif_transpose(pil_img)
    if img.mode == "P":
        if "transparency" in img.info:
            img = img.convert("RGBA")
        else:
            img = img.convert("RGB")
    if img.mode in ("RGBA", "LA"):
        bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
        img = Image.alpha_composite(bg, img.convert("RGBA")).convert("RGB")
    if img.mode != "RGB":
        img = img.convert("RGB")
    img = ImageOps.fit(img, (size, size), method=Image.BILINEAR)
    arr = np.asarray(img, dtype=np.uint8).astype("float32") / 255.0
    return arr

In [82]:
def _py_load_and_process(i):
    i = int(i)
    sample = dataset[i]
    x = preprocess_pil(sample["image"], IMG_SIZE)
    y = np.float32(sample["label"])  # 0.0 ó 1.0
    return x, y

def make_tf_dataset(indices, batch_size=32, shuffle=True):
    ds = tf.data.Dataset.from_tensor_slices(indices)
    if shuffle:
        ds = ds.shuffle(buffer_size=min(4096, len(indices)), reshuffle_each_iteration=True)

    def tf_loader(i):
        x, y = tf.py_function(func=_py_load_and_process, inp=[i], Tout=(tf.float32, tf.float32))
        x.set_shape((IMG_SIZE, IMG_SIZE, 3))
        y.set_shape(())
        return x, y

    ds = ds.map(tf_loader, num_parallel_calls=4)
    ds = ds.batch(batch_size, drop_remainder=False)
    options = tf.data.Options()
    options.experimental_deterministic = False
    ds = ds.with_options(options)
    ds = ds.prefetch(2)
    return ds

BATCH_SIZE = 32
train_ds = make_tf_dataset(idx_train, batch_size=BATCH_SIZE, shuffle=True)
val_ds   = make_tf_dataset(idx_val,   batch_size=BATCH_SIZE, shuffle=False)
test_ds  = make_tf_dataset(idx_test,  batch_size=BATCH_SIZE, shuffle=False)

In [83]:
def build_model(input_shape=(IMG_SIZE, IMG_SIZE, 3), train_base=False, dropout=0.3, alpha=0.35):
    inputs = layers.Input(shape=input_shape)

    # Augmentaciones ligeras
    x = layers.RandomFlip("horizontal")(inputs)
    x = layers.RandomContrast(0.1)(x)
    x = layers.RandomZoom(0.05)(x)
    x = layers.RandomRotation(0.02)(x)

    # Escala a [-1,1] (MobileNetV2)
    x = layers.Rescaling(1./127.5, offset=-1.0)(x)

    base = MobileNetV2(input_tensor=x, include_top=False, weights="imagenet", alpha=alpha)
    base.trainable = train_base

    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(dropout)(x)
    # Binaria: 1 logit -> sigmoid
    out = layers.Dense(1, activation="sigmoid", dtype="float32")(x)
    return models.Model(inputs, out)

model = build_model()


  base = MobileNetV2(input_tensor=x, include_top=False, weights="imagenet", alpha=alpha)


In [84]:
counts = np.bincount(y_sub)
ratio = counts.max() / counts.min()
if ratio > 1.5:
    total = counts.sum(); unique = np.arange(len(counts))
    class_weight = {int(k): float(total / (len(unique) * v)) for k, v in zip(unique, counts)}
    print(f"Usaré class_weight (ratio={ratio:.2f}):", class_weight)
else:
    class_weight = None
    print(f"Dataset balanceado (ratio={ratio:.2f}); NO usaré class_weight.")


Dataset balanceado (ratio=1.13); NO usaré class_weight.


In [85]:
class WarmupThenCosine(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, base_lr, steps_per_epoch, total_epochs, warmup_epochs=1):
        super().__init__()
        self.base_lr = base_lr
        self.spe = tf.cast(steps_per_epoch, tf.float32)
        self.total_steps = tf.cast(steps_per_epoch * total_epochs, tf.float32)
        self.warmup_steps = tf.cast(steps_per_epoch * warmup_epochs, tf.float32)
        self.decay_steps = tf.maximum(self.total_steps - self.warmup_steps, 1.)

    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        # warmup lineal
        def warm():
            return self.base_lr * (step + 1.) / tf.maximum(self.warmup_steps, 1.)
        # cosine decay
        def cosine():
            t = (step - self.warmup_steps) / self.decay_steps
            t = tf.clip_by_value(t, 0., 1.)
            return 0.5 * self.base_lr * (1 + tf.cos(tf.constant(math.pi) * t))
        return tf.cond(step < self.warmup_steps, warm, cosine)

steps_per_epoch = max(1, math.ceil(len(idx_train) / BATCH_SIZE))
TOTAL_EPOCHS = 6
WARMUP_EPOCHS = 1
BASE_LR = 1e-3

lr_schedule = WarmupThenCosine(base_lr=BASE_LR, steps_per_epoch=steps_per_epoch,
                               total_epochs=TOTAL_EPOCHS, warmup_epochs=WARMUP_EPOCHS)

optimizer = tf.keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=1e-4)

# Compilación binaria (sin one-hot, estable)
model.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=["accuracy", tf.keras.metrics.AUC(name="auc")]
)

print(model.summary(line_length=120))



None


In [86]:
early = tf.keras.callbacks.EarlyStopping(
    monitor="val_auc", mode="max", patience=2, restore_best_weights=True
)


In [None]:
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=TOTAL_EPOCHS,
    callbacks=[early],
    class_weight=class_weight,   # None si balanceado
    verbose=1
)


Epoch 1/6
[1m1146/1146[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m114s[0m 89ms/step - accuracy: 0.5686 - auc: 0.5943 - loss: 0.6762 - val_accuracy: 0.6606 - val_auc: 0.7227 - val_loss: 0.6185
Epoch 2/6
[1m 936/1146[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m15s[0m 75ms/step - accuracy: 0.6394 - auc: 0.6946 - loss: 0.6310

In [None]:
fine_tune_at = max(0, len(model.layers) - 40)
for layer in model.layers[fine_tune_at:]:
    if hasattr(layer, "trainable"):
        layer.trainable = True

model.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=2e-4, weight_decay=5e-5),
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=["accuracy", tf.keras.metrics.AUC(name="auc")]
)
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=2,             # 1–2 es suficiente
    callbacks=[early],
    class_weight=class_weight,
    verbose=1
)


In [None]:
test_metrics = model.evaluate(test_ds, verbose=1)
print("Test metrics:", dict(zip(model.metrics_names, test_metrics)))

In [None]:

val_probs = model.predict(val_ds, verbose=0).ravel()
# Recupera etiquetas reales de val (ya las tienes en y_val):
y_val_true = np.array([dataset[i]["label"] for i in idx_val], dtype=np.int32)

# Busca umbral óptimo por accuracy (puedes cambiar a F1/AUC-Youden)
ths = np.linspace(0.1, 0.9, 81)
accs = [(th, ( (val_probs >= th).astype(int) == y_val_true ).mean()) for th in ths]
best_th, best_acc = max(accs, key=lambda x: x[1])
print(f"Umbral óptimo val: {best_th:.3f} (acc={best_acc:.3f})")

# Evalúa en test con ese umbral
test_probs = model.predict(test_ds, verbose=0).ravel()
y_test_true = np.array([dataset[i]["label"] for i in idx_test], dtype=np.int32)
test_pred = (test_probs >= best_th).astype(int)
test_acc = (test_pred == y_test_true).mean()
print(f"Test accuracy @th={best_th:.3f}: {test_acc:.3f}")