In [1]:
# --- Reproducibility -------------------------------------------------------
# Seed handed to the train/validation split and to the training-set shuffle.
seed = 125

# --- Architecture ----------------------------------------------------------
input_shape = (256, 256, 1)   # static shape assigned to both model inputs
kernel_shape = (3, 3)         # kernel size of the convolutions in the main trunk
pool_size = (2, 2)            # window of the average-pooling layers

# --- Training --------------------------------------------------------------
fake_path = "./data/AI recognition dataset/fakeV2/fake-v2"   # images labelled 1
path_real = "./data/AI recognition dataset/real"             # images labelled 0
initial_learning_rate = 0.001
batch_size = 32
sample_frac = 1               # leading fraction of each directory listing to use
epochs = 50
patience = 5                  # early-stopping patience, in epochs
decay_after_n_epochs = 5      # learning-rate decay period, in epochs
decay_rate = 0.5              # multiplicative learning-rate decay factor
verbose = 1
l2_regularization = 0.001     # L2 kernel-regularization factor

# Subset of the settings above that is written next to the saved model.
params = dict(
    epochs=epochs,
    batch_size=batch_size,
    patience=patience,
    seed=seed,
    verbose=verbose,
    sample_frac=sample_frac,
    l2_regularization=l2_regularization,
)

# Identifier shared by the checkpoint file and the final model/JSON artefacts.
model_id = f"model_v{epochs}_bs{batch_size}_pat{patience}_frac_{sample_frac}"
checkpoint_path = "./checkpoints/" + model_id + ".keras"

In [None]:
# Display the run configuration dictionary assembled in the configuration cell.
params

In [3]:
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras import layers, Model
import keras.backend as K
import keras

import tensorflow as tf
from preprocessing.patch_generator import preprocess
from sklearn.model_selection import train_test_split
from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import json
import os

In [None]:
# Report which GPU devices TensorFlow can see (an empty list means none).
gpus = tf.config.list_physical_devices("GPU")
print(f"Available GPUs: {gpus}" if gpus else "No GPUs available.")

In [None]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

In [None]:
@tf.function
def hard_tanh(x) -> tf.Tensor:
    """Piecewise-linear tanh: pass `x` through unchanged, saturating at -1 and 1.

    Args:
        x: Input tensor (or tensor-convertible value).

    Returns:
        Tensor holding `x` limited element-wise to the interval [-1, 1].
    """
    capped_above = tf.minimum(x, 1)
    return tf.maximum(capped_above, -1)

# Sample the activation on [-2, 2] so both saturated regions are visible.
x_values = np.linspace(-2, 2, 100)
y_values = hard_tanh(tf.convert_to_tensor(x_values)).numpy()

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(x_values, y_values, color="blue")
ax.set_title("Hard Tanh Activation Function")
ax.set_xlabel("Input")
ax.set_ylabel("Output")
ax.axvline(0, color="gray", linestyle="--")  # mark the origin
plt.show()

In [7]:
@tf.function
def get_f1(y_true, y_pred):
    """Compute an F1 score from labels and predictions.

    Both inputs are cast to float32. Counts are obtained by clipping values to
    [0, 1], rounding them and summing over the whole tensor, so the score is a
    single scalar for everything passed in.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted scores.

    Returns:
        Scalar float32 tensor with the F1 value; a small epsilon in every
        denominator keeps the result finite when a count is zero.
    """
    def _rounded_count(values):
        # Clip to [0, 1], round, then count by summation.
        return tf.reduce_sum(tf.round(tf.clip_by_value(values, 0.0, 1.0)))

    labels = tf.cast(y_true, tf.float32)
    scores = tf.cast(y_pred, tf.float32)

    hits = _rounded_count(labels * scores)
    actual = _rounded_count(labels)
    flagged = _rounded_count(scores)

    eps = tf.constant(1e-7, dtype=tf.float32)
    precision = hits / (flagged + eps)
    recall = hits / (actual + eps)

    return (2 * (precision * recall)) / (precision + recall + eps)

In [8]:
def load_images(
    path: str,
    label: int,
    sample_frac: float) -> Tuple[List[str], List[int]]:
    """List the files in `path` and pair each selected file with `label`.

    Args:
        path: Directory whose entries are treated as image files.
        label: Class label assigned to every returned file.
        sample_frac: Fraction of the (sorted) listing to keep, taken from the
            start; `int(len * sample_frac)` entries are returned.

    Returns:
        A tuple `(file_paths, labels)` of equal length, where every path is
        `path` joined with an entry name and every label equals `label`.

    Raises:
        OSError: If `path` cannot be listed.
    """
    # os.listdir returns entries in arbitrary, platform-dependent order. Sorting
    # makes the sampled subset (and the seeded split built on top of it)
    # identical from run to run and machine to machine.
    imgs = [os.path.join(path, img) for img in sorted(os.listdir(path))]
    sampled_imgs = imgs[:int(len(imgs) * sample_frac)]
    labels = [label] * len(sampled_imgs)
    return sampled_imgs, labels

# Files under `fake_path` get label 1, files under `path_real` get label 0.
ai_imgs, ai_label = load_images(path=fake_path, label=1, sample_frac=sample_frac)
real_imgs, real_label = load_images(path=path_real, label=0, sample_frac=sample_frac)

# One flat list of file paths and one parallel list of labels.
X = [*ai_imgs, *real_imgs]
y = [*ai_label, *real_label]

# 70/30 stratified split so both classes keep their proportions in each part.
X_train, X_validate, y_train, y_validate = train_test_split(
    X,
    y,
    test_size=0.3,
    stratify=y,
    random_state=seed,
)

def dict_map(X1, X2, y):
    """Package two feature tensors under the model's input names.

    Args:
        X1: Value for the "rich_texture" input.
        X2: Value for the "poor_texture" input.
        y: Target passed through unchanged.

    Returns:
        `(features, y)` where `features` maps input name to value.
    """
    features = dict(rich_texture=X1, poor_texture=X2)
    return features, y

def set_shapes(frt, fpt, label):
    """Attach static shapes to one preprocessed sample.

    Both feature tensors are given `input_shape` and the label is marked as a
    scalar. Presumably required because the tensors come out of
    `tf.py_function` without static shape information - TODO confirm.

    Args:
        frt: First feature tensor (later fed to the "rich_texture" input).
        fpt: Second feature tensor (later fed to the "poor_texture" input).
        label: Label tensor.

    Returns:
        The same three tensors, in the same order.
    """
    for feature in (frt, fpt):
        feature.set_shape(input_shape)
    label.set_shape([])
    return frt, fpt, label

def _run_preprocess(filepath, label):
    """Call the Python-side `preprocess` on one (path, label) pair.

    The call is wrapped in `tf.py_function` and declared to return two float64
    tensors and one int32 tensor.
    """
    return tf.py_function(
        preprocess, [filepath, label], [tf.float64, tf.float64, tf.int32]
    )


def _build_pipeline(paths, labels, batch, shuffle=False):
    """Create a batched, prefetched dataset of ({input name: tensor}, label) pairs."""
    pipeline = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        # Shuffle the cheap (path, label) pairs with a buffer covering all of them.
        pipeline = pipeline.shuffle(len(paths), seed)
    pipeline = pipeline.map(_run_preprocess)
    pipeline = pipeline.map(set_shapes)
    pipeline = pipeline.map(dict_map)
    pipeline = pipeline.batch(batch)
    return pipeline.prefetch(tf.data.AUTOTUNE)


# Training data: shuffled, batched with the configured batch size.
dataset = _build_pipeline(X_train, y_train, batch_size, shuffle=True)

# Validation data: original order, fixed batches of 10.
validation_set = _build_pipeline(X_validate, y_validate, 10)

In [None]:
# Keyword arguments unpacked into every Conv2D/Dense layer below so they all
# share one L2 kernel regularizer with factor `l2_regularization`.
REG_PARAMS = {"kernel_regularizer": keras.regularizers.l2(l2_regularization)}

class FeatureExtractionLayer(layers.Layer):
    """First-stage feature extractor applied to each input branch.

    Chains a 32-filter 3x3 ReLU convolution (with the shared kernel
    regularizer), batch normalization and the `hard_tanh` activation.
    """

    def __init__(self, *args, **kwargs):
        """Create the sub-layers; all arguments are forwarded to `layers.Layer`."""
        super().__init__(*args, **kwargs)
        self.conv = layers.Conv2D(32, (3, 3), activation="relu", **REG_PARAMS)
        self.bn = layers.BatchNormalization()
        self.activation = layers.Lambda(hard_tanh)

    def call(self, input):
        """Run convolution, batch normalization and hard-tanh on `input`."""
        return self.activation(self.bn(self.conv(input)))

def _conv_bn(tensor):
    """One 32-filter ReLU convolution (`kernel_shape`) followed by batch normalization."""
    tensor = layers.Conv2D(filters=32, kernel_size=kernel_shape, activation="relu", **REG_PARAMS)(tensor)
    return layers.BatchNormalization()(tensor)


def _conv_bn_stack(tensor, depth):
    """Apply `depth` consecutive convolution + batch-normalization pairs."""
    for _ in range(depth):
        tensor = _conv_bn(tensor)
    return tensor


# Two single-channel inputs; the names match the keys produced by `dict_map`.
input0 = layers.Input(shape=input_shape, name="rich_texture")
input1 = layers.Input(shape=input_shape, name="poor_texture")

# Each branch has its own (non-shared) feature extraction layer.
l0 = FeatureExtractionLayer(name="feature_extraction_layer_rich_texture")(input0)
l1 = FeatureExtractionLayer(name="feature_extraction_layer_poor_texture")(input1)

# The trunk operates on the element-wise difference of the two branches.
contrast = layers.subtract((l0, l1))

# Stage 1: four conv+BN pairs.
x = _conv_bn_stack(contrast, 4)
# NOTE(review): this BatchNormalization directly follows the one that ends the
# stack above; the later stages end with average pooling instead. Kept as is.
x = layers.BatchNormalization()(x)

# Stage 2: four conv+BN pairs, then 2x2 average pooling.
x = _conv_bn_stack(x, 4)
x = layers.AveragePooling2D(pool_size)(x)

# Stage 3: two conv+BN pairs, then 2x2 average pooling.
x = _conv_bn_stack(x, 2)
x = layers.AveragePooling2D(pool_size)(x)

# Stage 4: two conv+BN pairs, then global average pooling.
x = _conv_bn_stack(x, 2)
x = layers.GlobalAveragePooling2D()(x)

# Classification head: a single sigmoid unit.
x = layers.Flatten()(x)
x = layers.Dense(1, activation="sigmoid", **REG_PARAMS)(x)

model = Model(
    inputs=(input0, input1),
    outputs=x,
    name="rt_pt_contrast",
)

# Number of full batches in the training set. Clamped to at least 1 so the
# learning-rate schedule below never receives `decay_steps=0` when the training
# set is smaller than one batch.
steps_per_epoch = max(1, len(X_train) // batch_size)

# Multiply the learning rate by `decay_rate` every `decay_after_n_epochs` epochs
# (expressed in optimizer steps).
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=steps_per_epoch * decay_after_n_epochs,
    decay_rate=decay_rate
)
optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)

# `compile` is documented to take a loss name, a loss function or a
# `keras.losses.Loss` instance. The bare class is none of these, so pass an
# instance rather than relying on version-specific handling of a class object.
model.compile(
    optimizer=optimizer,
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[
        keras.metrics.BinaryAccuracy(),
        keras.metrics.Recall(),
        keras.metrics.Precision(),
    ],
)
model.summary()

## Setting up training callbacks

In [13]:
# Both callbacks watch the validation loss.
_watched_metric = "val_loss"

# Write the model to `checkpoint_path` only when the watched metric improves.
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor=_watched_metric,
    verbose=verbose,
    save_best_only=True,
)

# Stop after `patience` epochs without improvement and roll back to the best weights.
early_stopping_callback = EarlyStopping(
    monitor=_watched_metric,
    patience=patience,
    restore_best_weights=True,
    verbose=verbose,
)

## Training the model

In [None]:
# `dataset` is a finite, already-batched tf.data.Dataset, so:
#   * `batch_size` is not passed - batching is defined by the dataset itself and
#     Keras documents that it must not be specified for dataset inputs;
#   * `steps_per_epoch` is not passed - each epoch then covers exactly one full
#     pass over the dataset. Requesting a fixed step count that differs from the
#     dataset's number of batches makes Keras keep one iterator across epochs
#     and run out of data after the first epoch.
history = model.fit(
    dataset,
    epochs=epochs,
    validation_data=validation_set,
    callbacks=[checkpoint_callback, early_stopping_callback],
)

In [None]:
# Show the number of full training batches used to scale the learning-rate schedule.
steps_per_epoch

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots()
for curve in ("loss", "val_loss"):
    ax.plot(history.history[curve])
ax.set_title("model loss")
ax.set_ylabel("loss")
ax.set_xlabel("epoch")
ax.legend(["train", "val"], loc="upper left")
plt.show()

In [None]:
# Show the raw per-epoch values recorded by `model.fit`, keyed by metric name.
history.history

In [None]:
# Training vs. validation binary accuracy per epoch.
fig, ax = plt.subplots()
for curve in ("binary_accuracy", "val_binary_accuracy"):
    ax.plot(history.history[curve])
ax.set_title("model accuracy")
ax.set_ylabel("accuracy")
ax.set_xlabel("epoch")
ax.legend(["train", "val"], loc="upper left")
plt.show()

In [None]:
def save_model(model_id: str, model: "Model", params: dict, training_history=None):
    """Save the model and a JSON record of its parameters and training history.

    Writes `./models/{model_id}.keras` via `model.save` and
    `./models/{model_id}.json` containing `params` plus a "history" entry.

    Args:
        model_id: Base file name (without extension) for both output files.
        model: Object exposing `save(path)`.
        params: Run parameters to record. This dictionary is not modified.
        training_history: Either the object returned by `model.fit` (its
            `.history` attribute is used) or a plain dict of per-epoch values.
            When omitted, the notebook-level `history` variable is used, which
            matches the previous behaviour of this function.

    Raises:
        NameError: If `training_history` is omitted and no global `history` exists.
        TypeError: If `params` or the history contain values `json` cannot encode.
    """
    if training_history is None:
        # Backward-compatible fallback to the global produced by the training cell.
        training_history = history
    # Accept both a History-like object and an already extracted dict.
    history_dict = getattr(training_history, "history", training_history)

    # The target directory is not guaranteed to exist on a fresh checkout.
    os.makedirs("./models", exist_ok=True)
    model.save(f"./models/{model_id}.keras")

    # Build a new dict so the caller's `params` is left untouched.
    record = {**params, "history": history_dict}
    with open(f"./models/{model_id}.json", "w") as f:
        json.dump(record, f, indent=4)

In [None]:
# Persist the trained model together with its parameters and training history.
save_model(model_id, model, params)