In [None]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import train_test_split

class SimpleBoostingClassifier(BaseEstimator, ClassifierMixin):
    """AdaBoost-style (SAMME) ensemble over a fixed pool of binary base models.

    In every round all models in ``base_models`` are fitted with the current
    sample weights, the one with the lowest weighted training error is kept
    together with its vote weight ``alpha``, and the sample weights of the
    examples it misclassified are increased.

    Base models must expose ``fit(X, y, sample_weight=...)`` and ``predict(X)``.
    Their predictions are thresholded at 0.5, so they are expected to return
    either probabilities or 0/1 labels, and the targets are expected to be
    encoded as 0/1.

    Parameters
    ----------
    base_models : list
        Candidate estimators; they are fitted in place.
    rounds : int
        Number of boosting rounds.
    learning_rate : float
        Multiplier applied to every ``alpha``.
    validation_fraction : float
        Fraction of the data held out to report the validation error after
        each round. A falsy value (``0`` / ``None``) disables the hold-out.
    random_state : int
        Seed used for the train/validation split.
    """

    # Keeps the weighted error strictly inside (0, 1) so that alpha stays finite.
    _ERROR_EPS = 1e-10

    def __init__(self, base_models, rounds=5, learning_rate=0.1, validation_fraction=0.1, random_state=42):
        self.base_models = base_models
        self.rounds = rounds
        self.learning_rate = learning_rate
        self.validation_fraction = validation_fraction
        self.random_state = random_state
        self.models = []
        self.alphas = []

    @staticmethod
    def _to_binary(raw_predictions):
        """Threshold raw model outputs (probabilities or 0/1 labels) at 0.5."""
        return (np.asarray(raw_predictions).ravel() > 0.5).astype(int)

    def fit(self, X, y):
        """Run the boosting rounds and return ``self``."""
        # Make sure y is a 1-D array
        y = np.ravel(y)

        # Start from a clean state so that calling fit twice does not stack
        # the models/alphas of the previous run on top of the new ones.
        self.models = []
        self.alphas = []

        # Split the data into training and validation sets
        if self.validation_fraction:
            X_train, X_val, y_train, y_val = train_test_split(
                X, y, test_size=self.validation_fraction, random_state=self.random_state
            )
        else:
            X_train, y_train = X, y
            X_val = y_val = None

        self.classes_ = np.unique(y_train)
        y_train_int = y_train.astype(int)
        n_models = len(self.base_models)
        n_train = len(y_train)
        # SAMME multi-class correction term; it is 0 for two classes. The max()
        # guards against log(0) when the training split holds a single class.
        class_term = np.log(max(len(self.classes_) - 1, 1))

        # Initialise uniform sample weights
        weights = np.ones(n_train) / n_train

        for round_idx in range(self.rounds):
            errors = np.zeros(n_models)
            predictions = np.zeros((n_models, n_train))

            # Fit every base model and compute its weighted training error.
            # NOTE(review): the base models are refitted in place each round, so
            # an entry stored in self.models in an earlier round refers to the
            # same object that is retrained here - confirm whether per-round
            # copies are needed when rounds > 1.
            for model_idx, model in enumerate(self.base_models):
                model.fit(X_train, y_train, sample_weight=weights)
                predictions[model_idx] = self._to_binary(model.predict(X_train))
                errors[model_idx] = np.sum(weights * (predictions[model_idx] != y_train_int))

            # Select the best model of this round
            best_model_index = np.argmin(errors)
            best_model = self.base_models[best_model_index]
            misclassified = predictions[best_model_index] != y_train_int

            # Compute alpha; clipping avoids a division by zero for a perfect
            # model, which would otherwise turn the weights into NaN.
            error = np.clip(errors[best_model_index], self._ERROR_EPS, 1 - self._ERROR_EPS)
            alpha = self.learning_rate * (np.log((1 - error) / error) + class_term)

            # Update and renormalise the sample weights
            weights *= np.exp(alpha * misclassified)
            weights /= np.sum(weights)

            # Store model and alpha
            self.models.append(best_model)
            self.alphas.append(alpha)

            # Evaluate on the validation set
            if X_val is not None:
                val_pred = self.predict(X_val)
                val_error = np.mean(val_pred != y_val.astype(int))
                print(f"Validation error after round {round_idx} model {len(self.models)}: {val_error:.4f}")

        return self

    def predict(self, X):
        """Return the alpha-weighted majority vote of the stored models."""
        votes = np.zeros((len(self.models), X.shape[0]))
        for model_idx, model in enumerate(self.models):
            # Signed votes (-1 / +1): a plain 0/1 sum compared against 0 would
            # let any single positive-alpha vote for class 1 win the ballot.
            votes[model_idx] = 2 * self._to_binary(model.predict(X)) - 1

        weighted_votes = np.sum(np.array(self.alphas)[:, np.newaxis] * votes, axis=0)

        # Ties (weighted sum == 0) resolve to the first class.
        return self.classes_[(weighted_votes > 0).astype(int)]

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Input
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

class CNNImageForgeryPredictorModel(BaseEstimator, ClassifierMixin):
    """Small CNN trained on JPEG recompression-difference images.

    Each input is an encoded JPEG byte string. It is decoded, re-encoded at
    ``compression_quality`` and the absolute pixel difference between both
    versions (converted to grayscale and resized to 128x128) is fed to the
    network.

    Parameters
    ----------
    compression_quality : int
        JPEG quality used for the recompression step.
    """

    def __init__(self, compression_quality: int = 90):
        self.compression_quality = compression_quality

        model = Sequential([
            Input(shape=(128, 128, 1)),
            Conv2D(32, (3, 3), activation='relu'),
            Conv2D(32, (3, 3), activation='relu'),
            Conv2D(32, (3, 3), activation='relu'),
            MaxPooling2D(pool_size=(2, 2)),
            Flatten(),
            Dense(256, activation='relu'),
            Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

        self.model = model

    def recomprimir_imagen_tf(self, imagen):
        """Encode ``imagen`` as JPEG at the configured quality and decode it again."""
        imagen_jpeg = tf.image.encode_jpeg(imagen, quality=self.compression_quality)
        imagen_recomprimida = tf.image.decode_jpeg(imagen_jpeg)
        return imagen_recomprimida

    def preprocess_image(self, image):
        """Turn one encoded JPEG into a 128x128 single-channel difference map."""
        imagen_original = tf.image.decode_jpeg(image, channels=3)
        image_compressed = self.recomprimir_imagen_tf(imagen_original)
        # Cast to int32 before subtracting: uint8 arithmetic would wrap around.
        diff = tf.abs(tf.cast(imagen_original, tf.int32) - tf.cast(image_compressed, tf.int32))
        diferencia_gris = tf.image.rgb_to_grayscale(tf.cast(diff, tf.uint8))
        resized = tf.image.resize(diferencia_gris, (128, 128))
        return resized

    def prepare_dataset(self, X):
        """Preprocess every image in ``X`` and stack the results into one array."""
        X_processed = [self.preprocess_image(image) for image in X]
        return np.array(X_processed)

    def fit(self, X, y, sample_weight=None):
        """Train the network on ``X``/``y`` and return ``self``.

        ``sample_weight`` is optional so the model can also be used outside of
        a boosting loop; it is forwarded unchanged to Keras.
        """
        X_processed = self.prepare_dataset(X)
        self.model.fit(X_processed, y, sample_weight=sample_weight)
        return self

    def predict(self, X):
        """Return the sigmoid output of the network for every image in ``X``."""
        X_processed = self.prepare_dataset(X)
        return self.model.predict(X_processed)


In [None]:
import cv2
import numpy as np
from sklearn import svm
from skimage.feature import graycomatrix, graycoprops

class SVMImageForgeryPredictorModel(BaseEstimator, ClassifierMixin):
    """SVM classifier on hand-crafted features of a grayscale image.

    Each input is an encoded JPEG byte string. It is decoded, converted to
    grayscale, resized to 256x384 and described by its log-magnitude Fourier
    spectrum concatenated with a k-means intensity segmentation.
    """

    def __init__(self):
        self.model = svm.SVC()

    @staticmethod
    def fourier_transform(image):
        """Return the centred log-magnitude spectrum of ``image``."""
        f = np.fft.fft2(image)
        fshift = np.fft.fftshift(f)
        # log(0) yields -inf for empty frequency bins; it is replaced below, so
        # the divide warning is silenced on purpose.
        with np.errstate(divide='ignore'):
            magnitude_spectrum = 20*np.log(np.abs(fshift))
        magnitude_spectrum[np.isinf(magnitude_spectrum)] = 0  # Replace infinities with 0
        return magnitude_spectrum

    @staticmethod
    def noise_features(image):
        """Basic noise model: mean and standard deviation of the pixel values."""
        mean_noise = np.mean(image)
        std_noise = np.std(image)
        return mean_noise, std_noise

    @staticmethod
    def edge_detection(image):
        """Canny edge map of ``image`` with thresholds 100 and 200."""
        edges = cv2.Canny(image, 100, 200)
        return edges

    @staticmethod
    def texture_features(image):
        """Mean GLCM contrast over four directions at distance 1."""
        g = graycomatrix(image, [1], [0, np.pi/4, np.pi/2, 3*np.pi/4], levels=256)
        contrast = graycoprops(g, 'contrast')
        return np.mean(contrast)

    @staticmethod
    # Compatible with grey scale
    def segment_image(image, k=4):
        """Quantise ``image`` into ``k`` intensity clusters using k-means."""
        # Flatten the image to a single-column array suitable for k-means
        Z = image.reshape((-1, 1))

        # Convert to float32
        Z = np.float32(Z)

        # Criteria and k-means application
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
        ret, label, center = cv2.kmeans(Z, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)

        # Convert back to uint8 and map centers to the original image
        center = np.uint8(center)
        res = center[label.flatten()]
        segmented_image = res.reshape((image.shape))

        return segmented_image

    def extract_features(self, image):
        """Build the 1-D feature vector of one 2-D grayscale image."""
        spectrum = self.fourier_transform(image).ravel()
        # Noise, edge and texture features are currently disabled; the empty
        # placeholders keep the layout (and dtype) of the feature vector.
        # The placeholder is deliberately not called `tf`, which would shadow
        # the tensorflow alias inside this method.
        noise_mean, noise_std = [], []
        edges = []
        texture = []
        segmentation = self.segment_image(image).ravel()

        # Concatenate every feature group into a single 1-D array
        return np.hstack([spectrum, noise_mean, noise_std, edges, texture, segmentation])

    def preprocess_image(self, image):
        """Decode one JPEG and return its feature vector."""
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.image.rgb_to_grayscale(tf.cast(image, tf.float32))
        image = tf.image.resize(image, (256, 384))
        # Drop the singleton channel axis: np.fft.fft2 transforms the last two
        # axes, so an (H, W, 1) input would be transformed over (W, channel)
        # instead of over the image plane.
        image = tf.squeeze(image, axis=-1).numpy()
        feat = self.extract_features(image)
        return feat

    def prepare_dataset(self, images):
        """Compute the feature matrix for a collection of encoded images."""
        images = np.array(images)
        features = [self.preprocess_image(image) for image in images]
        return np.array(features)

    def fit(self, X, y, sample_weight=None):
        """Fit the SVM on the extracted features and return ``self``.

        ``sample_weight`` is optional and forwarded unchanged to ``SVC.fit``.
        """
        X_processed = self.prepare_dataset(X)
        self.model.fit(X_processed, y, sample_weight=sample_weight)
        return self

    def predict(self, X):
        """Predict the class label of every image in ``X``."""
        X_processed = self.prepare_dataset(X)
        return self.model.predict(X_processed)

In [None]:
import tensorflow as tf

class TransferLearningImageForgeryPredictorModel(BaseEstimator, ClassifierMixin):
    """MobileNetV2 transfer-learning classifier for encoded JPEG images.

    Training runs in two phases: first only the new classification head is
    trained on top of the frozen ImageNet backbone, then the backbone layers
    from index 100 onwards are unfrozen and fine-tuned with a ten times
    smaller learning rate.

    Parameters
    ----------
    img_size : tuple of int
        Height and width the images are resized to.
    dropout_rate : float
        Dropout rate applied before the output layer.
    learning_rate : float
        Learning rate of the first phase.
    """

    def __init__(self, img_size=(160, 160), dropout_rate=0.2, learning_rate=0.0001):
        ## Image configuration
        self.img_size = img_size
        self.img_shape = self.img_size + (3,)

        ## Data augmentation
        self.data_augmentation = tf.keras.Sequential([
            tf.keras.layers.RandomFlip('horizontal'),
            tf.keras.layers.RandomRotation(0.2),
        ])
        self.preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input

        ## Base model
        self.base_model = tf.keras.applications.MobileNetV2(input_shape=self.img_shape,
                                                           include_top=False,
                                                           weights='imagenet')
        self.base_model.trainable = False

        ## Additional layers
        self.global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
        self.dropout_layer = tf.keras.layers.Dropout(dropout_rate)
        self.prediction_layer = tf.keras.layers.Dense(1)

        ## Model construction
        self.inputs = tf.keras.Input(shape=self.img_shape)
        x = self.data_augmentation(self.inputs)
        x = self.preprocess_input(x)
        x = self.base_model(x, training=False)
        x = self.global_average_layer(x)
        x = self.dropout_layer(x)
        self.outputs = self.prediction_layer(x)
        self.model = tf.keras.Model(self.inputs, self.outputs)

        ## Model compilation
        self.base_learning_rate = learning_rate
        self._compile_model(tf.keras.optimizers.Adam(learning_rate=self.base_learning_rate))

    def _compile_model(self, optimizer):
        """(Re)compile the model; the output layer emits logits, hence ``from_logits``."""
        self.model.compile(optimizer=optimizer,
                           loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                           metrics=['accuracy'])

    def preprocess_image(self, image):
        """Decode one JPEG byte string and resize it to ``img_size``."""
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.image.resize(image, self.img_size)
        return image

    def prepare_dataset(self, X):
        """Preprocess every image in ``X`` and stack the results into one array."""
        X_processed = [self.preprocess_image(image) for image in X]
        return np.array(X_processed)

    def fit(self, X, y, sample_weight=None):
        """Train the head, then fine-tune the top of the backbone; return ``self``.

        ``sample_weight`` is optional and applied in both training phases.
        """
        X_processed = self.prepare_dataset(X)

        # Phase 1: train the head on a frozen backbone. The freeze and the
        # optimiser are restored explicitly because an earlier fit() call
        # leaves the model in its fine-tuning configuration.
        self.base_model.trainable = False
        self._compile_model(tf.keras.optimizers.Adam(learning_rate=self.base_learning_rate))

        initial_epochs = 10
        history = self.model.fit(X_processed, y, epochs=initial_epochs, sample_weight=sample_weight)

        # Phase 2: unfreeze the backbone ...
        self.base_model.trainable = True

        # Fine-tune from this layer onwards
        fine_tune_at = 100

        # ... but keep all the layers before the `fine_tune_at` layer frozen
        for layer in self.base_model.layers[:fine_tune_at]:
            layer.trainable = False

        # Changing `trainable` only takes effect after recompiling.
        self._compile_model(tf.keras.optimizers.RMSprop(learning_rate=self.base_learning_rate/10))

        fine_tune_epochs = 10
        total_epochs = initial_epochs + fine_tune_epochs

        # Resume right after the last completed epoch; `history.epoch[-1]` is
        # the zero-based index of that epoch and would repeat it once more.
        self.model.fit(X_processed, y, epochs=total_epochs,
                       initial_epoch=len(history.epoch), sample_weight=sample_weight)
        return self

    def predict(self, X):
        """Return the predicted probability of the positive class for ``X``.

        The network outputs logits; they are passed through a sigmoid so that
        callers can threshold at 0.5, as with a sigmoid-output model.
        """
        X_processed = self.prepare_dataset(X)
        logits = self.model.predict(X_processed)
        return tf.sigmoid(logits).numpy()


In [None]:
import os
import tensorflow as tf

# Root folder of the CASIA2 dataset
PATH = '../data/CASIA2'

# Sub-folders holding the authentic and the tampered images
authentic_dir, tampered_dir = (os.path.join(PATH, folder) for folder in ('Au', 'Tp2'))

# Upper bound passed to the loader for each class
authentic_number = tampered_number = 100

def load_images_from_directory(directory_path, n, i=0):
    """Read the raw bytes of the image files found in ``directory_path``.

    The directory listing is filtered by extension and sorted by file name
    before it is sliced, so the selection is reproducible across runs and
    non-image files do not use up part of the requested range.

    Parameters
    ----------
    directory_path : str
        Folder to scan (not recursive).
    n : int
        Exclusive end index into the sorted list of image files.
    i : int, optional
        Start index into the sorted list of image files.

    Returns
    -------
    list
        One ``tf.io.read_file`` result (encoded file contents) per selected
        image, in file-name order.

    NOTE(review): ``.png`` files are accepted here, while the models in this
    notebook decode their input with ``tf.image.decode_jpeg`` - confirm that
    PNG inputs are intended.
    """
    image_extensions = (".jpg", ".jpeg", ".png")
    image_names = sorted(
        filename
        for filename in os.listdir(directory_path)
        if filename.endswith(image_extensions)
    )
    return [
        tf.io.read_file(os.path.join(directory_path, filename))
        for filename in image_names[i:n]
    ]

# Read the encoded image bytes of each class
authentic_files = load_images_from_directory(authentic_dir, authentic_number)
tampered_files = load_images_from_directory(tampered_dir, tampered_number)

# Label convention: 0 for authentic images, 1 for tampered images
authentic_labels = [0 for _ in authentic_files]
tampered_labels = [1 for _ in tampered_files]

# Merge both classes (authentic first) into aligned NumPy arrays
all_files = np.array([*authentic_files, *tampered_files])
all_labels = np.array([*authentic_labels, *tampered_labels])

# Hold out 40% of the samples as the test set
X_train, X_test, y_train, y_test = train_test_split(
    all_files,
    all_labels,
    random_state=42,
    test_size=0.4,
)

In [None]:
# Flatten both label arrays to 1-D
y_train, y_test = (labels.ravel() for labels in (y_train, y_test))

In [7]:
from sklearn.metrics import accuracy_score

# Build the pool of base models: two instances of each architecture
base_models = [
    model_class()
    for model_class in (
        TransferLearningImageForgeryPredictorModel,
        SVMImageForgeryPredictorModel,
        CNNImageForgeryPredictorModel,
    )
    for _ in range(2)
]

# Create the boosting ensemble (a single round) and train it
boosting_model = SimpleBoostingClassifier(base_models, rounds=1)
boosting_model.fit(X_train, y_train)

# Predict on the held-out test set and report the accuracy
y_pred = boosting_model.predict(X_test)
accuracy = accuracy_score(y_true=y_test, y_pred=y_pred)
print(f"Accuracy: {accuracy}")