# Detection of modified images or videos using Neural Networks

## Importing the libraries

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import shutil
from kaggle.api.kaggle_api_extended import KaggleApi

import tensorflow as tf
from tensorflow import keras
from keras.models import Model, load_model
from keras.optimizers import Adam
from keras.layers import Dense, GlobalAveragePooling2D
from keras.applications import EfficientNetB3
from keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard, ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import matplotlib.cm as cm

## Downloading the dataset

In [None]:
URL = "sophatvathana/casia-dataset"
PATH_DATASET = './../dataset/'

def download_dataset():
    api = KaggleApi()
    api.authenticate()
    print("Downloading files...")
    api.dataset_download_files('sophatvathana/casia-dataset', path=PATH_DATASET, unzip=True)

    print("\rDownload complete.")


def clean_directory():
    print("Moving folder...")
    os.rename(PATH_DATASET+"CASIA2/Au", PATH_DATASET+"Au")
    os.rename(PATH_DATASET+"CASIA2/Tp", PATH_DATASET+"Tp")
    
    print("Cleaning directory...")
    shutil.rmtree(PATH_DATASET+"casia")
    shutil.rmtree(PATH_DATASET+"CASIA1")
    shutil.rmtree(PATH_DATASET+"CASIA2")
    os.remove(PATH_DATASET+"Tp/Thumbs.db")
    os.remove(PATH_DATASET+"Au/Thumbs.db")
    print("Cleaning complete.")

def remove_images():
    print("Removing images...")
    for i in range(1, 4):
        os.remove(PATH_DATASET+"Au/Au ("+str(i)+").jpg")
        os.remove(PATH_DATASET+"Tp/Tp ("+str(i)+").jpg")
    print("Removing complete.")

In [None]:
if not os.path.exists(PATH_DATASET+"Au"):
    download_dataset()
    clean_directory()
else:
    print("Dataset already Downloaded.")

In [None]:
REAL_IMAGE_PATH = '../dataset/Au'
FAKE_IMAGE_PATH = "../dataset/Tp"
IMG_SIZE = (256, 256)
CLASS = ["real", "modified"]

## Loading the dataset

In [None]:
cabezera_au = "category", "image", "class"
df_au = pd.DataFrame(columns=cabezera_au)

cabezera_tp = "category", "image", "region", "class"
df_tp = pd.DataFrame(columns=cabezera_tp)

for idx, file in enumerate(os.listdir(REAL_IMAGE_PATH)):
    img_path = os.path.join(REAL_IMAGE_PATH, file)
    category = file.split("_")

    df_au = pd.concat([df_au, pd.DataFrame([[category[1], img_path, CLASS[0]]], columns=cabezera_au)], ignore_index=True)

df_au = df_au[df_au.category != "txt"]
df_au = df_au[df_au.category != "ind"]
df_au = df_au.groupby('category').head(600)

for file in os.listdir(FAKE_IMAGE_PATH):
    #convert image to np array
    img_path = os.path.join(FAKE_IMAGE_PATH, file)

    category = file.split("_")
    category[5] = category[5][:3]
    df_tp = pd.concat([df_tp, pd.DataFrame([[category[5], img_path, category[1], CLASS[1]]], columns=cabezera_tp)], ignore_index=True)

df_tp = df_tp[df_tp.category != "txt"]
df_tp = df_tp[df_tp.category != "ind"]
df_tp = df_tp.groupby(['category', 'region']).head(300)
df_tp = df_tp.drop(columns=['region'])

df = pd.concat([df_au, df_tp], ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)

df.head()


## Creating Preprocessing Function

In [None]:
def ela(image, quality=99):
    # Comprimir y descomprimir la imagen
    _, buffer = cv2.imencode('.jpg', image, [cv2.IMWRITE_JPEG_QUALITY, quality])
    compressed_image = cv2.imdecode(np.frombuffer(buffer, np.uint8), cv2.IMREAD_COLOR)

    compressed_image = compressed_image.astype(np.float32)

    diff = 15 * cv2.absdiff(image, compressed_image)
    
    return diff

def preprocess_image(image):
    image = ela(image)
    return tf.convert_to_tensor(image)

In [None]:
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)
BATCH_SIZE = 32

train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_image,
    validation_split=0.2,
    horizontal_flip=True,
    vertical_flip=True
)

test_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_image,
)

train_generator = train_datagen.flow_from_dataframe(
    dataframe=df_train,
    directory=None,
    x_col="image",
    y_col="class",
    subset="training",
    batch_size=BATCH_SIZE,
    seed=42,
    shuffle=True,
    class_mode="binary",
    target_size=IMG_SIZE
)

val_generator = train_datagen.flow_from_dataframe(
    dataframe=df_train,
    directory=None,
    x_col="image",
    y_col="class",
    subset="validation",
    batch_size=BATCH_SIZE,
    seed=42,
    shuffle=True,
    class_mode="binary",
    target_size=IMG_SIZE
)

test_generator = test_datagen.flow_from_dataframe(
    dataframe=df_test,
    directory=None,
    x_col="image",
    y_col="class",
    batch_size=BATCH_SIZE,
    shuffle=False,
    class_mode="binary",
    target_size=IMG_SIZE
)

21 epocas 118s 41m 26s

loss: 0.2441 - binary_accuracy: 0.9161 - precision: 0.9393 - recall: 0.9024 - auc: 0.9730 - prc: 0.9697

In [None]:
class CustomModel(tf.keras.Model):
    def __init__(self):
        super(CustomModel, self).__init__()

        self.base_model = EfficientNetB3(weights='imagenet', include_top=False, input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3))
        self.global_pooling = GlobalAveragePooling2D()(self.base_model.output)
        self.dense1 = Dense(2048, activation='relu')(self.global_pooling)
        self.output_layer = Dense(1, activation='sigmoid')(self.dense1)

        self.model = Model(inputs=self.base_model.inputs, outputs=self.output_layer)

    def call(self, inputs):
        return self.model(inputs)
    
    def predict(self, image, grad_cam=False):
        img_preprocessed = self._preprocess_image(image)
        if grad_cam:
            return self._predict_with_grad_cam(image, img_preprocessed)
        else:
            return super(CustomModel, self).predict(img_preprocessed)
        
    def _preprocess_image(self, img):
        img = cv2.resize(img, IMG_SIZE, interpolation=cv2.INTER_AREA)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        ela_image = ela(img)
        img_expanded = np.expand_dims(ela_image, axis=0)  
        return img_expanded

    def _get_last_conv_layer(self):
        for layer in reversed(self.model.layers):
            if isinstance(layer, tf.keras.layers.Conv2D):
                return layer.name
        return None
    
    def _predict_with_grad_cam(self, image, preprocess_image):
        image = cv2.resize(image, IMG_SIZE, interpolation=cv2.INTER_AREA)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        grad_model = tf.keras.models.Model(
            [self.model.inputs], [self.model.get_layer(self._get_last_conv_layer()).output, self.model.output]
        )
        
        with tf.GradientTape() as tape:
            last_conv_layer_output, preds = grad_model(preprocess_image)
            pred_index = tf.argmax(preds[0])
            class_channel = preds[:, pred_index]

        grads = tape.gradient(class_channel, last_conv_layer_output)
        pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
        last_conv_layer_output = last_conv_layer_output[0]
        heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
        heatmap = tf.squeeze(heatmap)
        heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
        heatmap = heatmap.numpy()

        heatmap = np.uint8(255 * heatmap)

        # Use jet colormap to colorize heatmap
        jet = cm.get_cmap("jet")

        # Use RGB values of the colormap
        jet_colors = jet(np.arange(256))[:, :3]
        jet_heatmap = jet_colors[heatmap]

        # Create an image with RGB colorized heatmap
        jet_heatmap = keras.utils.array_to_img(jet_heatmap)
        jet_heatmap = jet_heatmap.resize((image.shape[1], image.shape[0]))
        jet_heatmap = keras.utils.img_to_array(jet_heatmap)

        # Superimpose the heatmap on the original image
        superimposed_img = jet_heatmap * 0.4 + image
        superimposed_img = keras.utils.array_to_img(superimposed_img)

        return class_channel[0].numpy(), np.array(superimposed_img)

    def save_model(self, filepath):
        os.makedirs(filepath, exist_ok=True)

        # Save the model architecture and weights
        self.model.save_weights(os.path.join(filepath, 'model_weights.h5'))

        model_json = self.model.to_json()
        with open(os.path.join(filepath, 'model_architecture.json'), 'w') as json_file:
            json_file.write(model_json)
    
    @classmethod
    def load_model(cls, filepath):
        # Load the model architecture and weights
        with open(filepath + '/model_architecture.json', 'r') as json_file:
            model_json = json_file.read()
        loaded_model = cls._from_json(model_json)

        loaded_model.model.load_weights(filepath + '/model_weights.h5')
        return loaded_model

    @classmethod
    def _from_json(cls, model_json):
        # Create a model instance from JSON architecture
        model = cls()
        model.model = tf.keras.models.model_from_json(model_json)
        return model

### Creating the checkpoints

In [None]:
metrics = [
    tf.keras.metrics.BinaryAccuracy(name='binary_accuracy'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
    tf.keras.metrics.AUC(name='auc'),
    tf.keras.metrics.AUC(name='prc', curve='PR')
]

early_stopping = EarlyStopping(
    monitor='val_loss', 
    min_delta=0, 
    patience=10, 
    verbose=0, 
    mode='auto',
    baseline=None, 
    restore_best_weights=False
)

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)


model_chekpoint = ModelCheckpoint(
    filepath='./../model/checkpoints', 
    monitor='val_loss', 
    verbose=0, 
    save_best_only=True,
    save_weights_only=True, 
    mode='auto', 
    save_freq='epoch'
)

tensor_board = TensorBoard(
    log_dir='./../model/logs',
    histogram_freq=0,
    write_graph=True,
    write_images=False,
    update_freq='epoch',
    profile_batch=2,
    embeddings_freq=0,
    embeddings_metadata=None
)

callbacks = [early_stopping, tensor_board]
optimizer = Adam(learning_rate=1e-3)
loss = keras.losses.BinaryCrossentropy()

### Training the model

In [None]:
initial_epochs = 20
fine_tune_epochs = 30
total_epochs =  initial_epochs + fine_tune_epochs

In [None]:
model = CustomModel()
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
history = model.fit(
    train_generator,
    epochs=50,
    batch_size=BATCH_SIZE, 
    validation_data=val_generator,
    callbacks=callbacks
)

In [None]:
model.evaluate(test_generator)

In [None]:
def plot_metrics(history):
    # Obtener las métricas de entrenamiento
    loss = history.history['loss']
    accuracy = history.history['binary_accuracy']
    
    # Obtener las métricas de validación si están disponibles
    if 'val_loss' in history.history:
        val_loss = history.history['val_loss']
        val_accuracy = history.history['val_binary_accuracy']
        has_validation = True
    else:
        has_validation = False
    
    # Crear los subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
    
    # Plot de la pérdida
    ax1.plot(loss, label='Training Loss')
    if has_validation:
        ax1.plot(val_loss, label='Validation Loss')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.set_title('Training and Validation Loss')
    ax1.legend()
    
    # Plot de la precisión binaria
    ax2.plot(accuracy, label='Training Accuracy')
    if has_validation:
        ax2.plot(val_accuracy, label='Validation Accuracy')
    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('Accuracy')
    ax2.set_title('Training and Validation Accuracy')
    ax2.legend()
    
    # Mostrar el gráfico
    plt.tight_layout()
    plt.show()

plot_metrics(history)

In [None]:
path = '../dataset/Tp/Tp_D_CRD_S_O_ani10103_ani10111_10637.jpg'
img = cv2.imread(path)
md = load_model("./../model/ela_models/detect_manipulated_images_model_EfficientNetB2.h5")
model = CustomModel()
model.model = md
y_pred, img_heamap = model.predict(img, grad_cam=True)

img = cv2.resize(img, IMG_SIZE, interpolation=cv2.INTER_AREA)

print(y_pred)

plt.figure(figsize=(10, 6))
plt.subplot(1, 3, 1)
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.title('Original Image')
plt.axis('off')

plt.subplot(1, 3, 2)
plt.imshow(ela(img))
plt.title('prepocessing Image')
plt.axis('off')

plt.subplot(1, 3, 3)
plt.imshow(img_heamap)
plt.title('Superimposed Image')
plt.axis('off')

plt.show()

In [None]:
path = '../dataset/test/me_x_3.jpg'
img = cv2.imread(path)

y_pred, img_heamap = model.predict(img, grad_cam=True)

img = cv2.resize(img, IMG_SIZE, interpolation=cv2.INTER_AREA)

print(y_pred)

plt.figure(figsize=(10, 6))
plt.subplot(1, 3, 1)
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.title('Original Image')
plt.axis('off')

plt.subplot(1, 3, 2)
plt.imshow(ela(img))
plt.title('prepocessing Image')
plt.axis('off')

plt.subplot(1, 3, 3)
plt.imshow(img_heamap)
plt.title('Superimposed Image')
plt.axis('off')

plt.show()

In [None]:
model.save_model('../model/custom_models/EfficientNetB3')