# Image and Video Anomaly Detector

In [1]:
import tensorflow as tf
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print("Number of GPUs Available: ", len(physical_devices))
tf.config.experimental.set_memory_growth(physical_devices[0], True)

Number of GPUs Available:  1


## Import of libraries

In [2]:
from preprocess import *
from utils import *
import random
import os
import glob
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from keras.utils.np_utils import to_categorical
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import seaborn as sns


np.random.seed(2)
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout, MaxPooling2D
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
#from numba import jit, cuda

from PIL import Image, ImageChops, ImageEnhance,ImageFilter, ImageOps
from io import BytesIO
import cv2
from scipy.fftpack import dct
from scipy import ndimage
from scipy import fftpack

### Structure of the models

In [3]:
def build_model(X_train, Y_train, X_val, Y_val, img_size=(128,128)):
    model = Sequential()
    model.add(Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu', input_shape=(img_size[0], img_size[1], 3)))
    model.add(Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation='softmax'))

    model.summary()

    epochs = 30
    batch_size = 32
    init_lr = 1e-4
    opt = Adam(lr=init_lr, decay=init_lr / epochs)
    model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

    early_stopping = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=5, verbose=0, mode='auto')
    checkpoint = ModelCheckpoint('../models/model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

    history = model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs, validation_data=(X_val, Y_val), callbacks=[early_stopping, checkpoint])

    # Plot training and validation accuracy and loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(loc='lower right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_acc_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    plt.figure(figsize=(10, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_loss_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    #guardamos el modelo
    model.save('../models/model'+str(history.history['val_accuracy'][-1])+'.h5')

    score = model.evaluate(X_val, Y_val, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    return model

def build_model_2(X_train, Y_train, X_val, Y_val, img_size=(128,128)):
    model = Sequential()

    model.add(Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu', input_shape=(img_size[0], img_size[1], 3)))
    model.add(Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(filters=256, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=256, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=256, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation='softmax'))

    model.summary()

    epochs = 50
    batch_size = 32
    init_lr = 1e-4
    opt = Adam(lr=init_lr, decay=init_lr / epochs)
    model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

    early_stopping = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=2, verbose=0, mode='auto')
    checkpoint = ModelCheckpoint('../models/model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

    history = model.fit(X_train, Y_train, validation_data=(X_val, Y_val), epochs=epochs, batch_size=batch_size, callbacks=[early_stopping, checkpoint])


    # Plot training and validation accuracy and loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(loc='lower right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_acc_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_loss_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    #guardamos el modelo
    model.save('../models/model'+str(history.history['val_accuracy'][-1])+'.h5')

    score = model.evaluate(X_val, Y_val, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    return model


def build_model_3(X_train, Y_train, X_val, Y_val, img_size=(128,128)):
    model = Sequential()
    model.add(Conv2D(filters = 32, kernel_size = (5, 5), padding = 'valid', activation = 'relu', input_shape = (img_size[0], img_size[1], 3)))
    model.add(Conv2D(filters = 32, kernel_size = (5, 5), padding = 'valid', activation = 'relu', input_shape = (img_size[0], img_size[1], 3)))
    model.add(MaxPool2D(pool_size = (2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(256, activation = 'relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation = 'softmax'))

    model.summary()

    epochs = 50
    batch_size = 32
    init_lr = 1e-4
    opt = Adam(lr=init_lr, decay=init_lr / epochs)
    model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

    early_stopping = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=2, verbose=0, mode='auto')
    checkpoint = ModelCheckpoint('../models/model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

    history = model.fit(X_train, Y_train, validation_data=(X_val, Y_val), epochs=epochs, batch_size=batch_size, callbacks=[early_stopping, checkpoint])
    
    # Plot training and validation accuracy and loss curves
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(loc='lower right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_acc_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    
    plt.figure(figsize=(10, 5))
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')
    #guardamos la gráfica , ponemos el nombre del modelo y la precisión
    plt.savefig('../graphs/model_loss_'+str(history.history['val_accuracy'][-1])+'.png')
    plt.show()

    #guardamos el modelo
    model.save('../models/model'+str(history.history['val_accuracy'][-1])+'.h5')

    score = model.evaluate(X_val, Y_val, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    return model



def build_splice_classification_model(X_train, Y_train, X_val, Y_val):
    model = Sequential()
    model.add(Conv2D(filters = 32, kernel_size = (5, 5), padding = 'valid', activation = 'relu', input_shape = (128, 128, 3)))
    model.add(Conv2D(filters = 32, kernel_size = (5, 5), padding = 'valid', activation = 'relu', input_shape = (128, 128, 3)))
    model.add(Conv2D(filters = 32, kernel_size = (3, 3), padding = 'valid', activation = 'relu'))
    model.add(MaxPool2D(pool_size = (2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(256, activation = 'relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation = 'softmax'))

    model.summary()

    epochs = 40
    batch_size = 32
    init_lr = 1e-4
    opt = Adam(lr=init_lr, decay=init_lr / epochs)
    model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

    early_stopping = EarlyStopping(monitor = 'val_accuracy', min_delta = 0, patience = 2,verbose = 0,mode = 'auto')
    
    model.fit(X_train,Y_train,batch_size = batch_size, epochs = epochs,validation_data = (X_val, Y_val), callbacks = [early_stopping])

    #calculamos la precisión del modelo
    score = model.evaluate(X_val, Y_val, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    model.save('models/splice_model'+str(score[1])+'.h5')

    return model


### Preprocessing functions

### Main Model

In [None]:
if __name__ == '__main__':
    real_images_path = '../data/dataset/data/CASIA2/Au/*.*'
    fake_images_path = '../data/dataset/data/CASIA2/Tp/*.*'
    image_size = (128, 128)
    
    X = []
    y = []

    print("Cargando datos de imágenes reales...")
    for file_path in glob.glob(real_images_path):
            X.append(preparete_image_ela(file_path, image_size))
            y.append(0)

    # poner de manera aleatoria las imágenes 
    random.shuffle(X)
   
    print("Cargando datos de imágenes falsificadas...")
    for file_path in glob.glob(fake_images_path):
            X.append(preparete_image_ela(file_path, image_size))
            y.append(1)


    print("X shape: ", np.array(X).shape)
    print("y shape: ", np.array(y).shape)

    X= np.array(X).reshape(-1, image_size[0], image_size[1], 3)
    y = to_categorical(y, num_classes = 2)

    # Entrenamos con 80% de los datos de train, usar 10% para validación y 10% para test
    X_train, X_val, Y_train, Y_val = train_test_split(X, y, test_size = 0.2, random_state=5)
    X_val, X_test, Y_val, Y_test = train_test_split(X_val, Y_val, test_size = 0.5, random_state=5)

In [None]:
model = build_model(X_train, Y_train, X_val, Y_val,image_size)
#model = tf.keras.models.load_model('../models/MODEL_ELA_V4_094_TEST.h5')

# Calculamos la precisión del modelo con los datos de test
score = model.evaluate(X_test, Y_test)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# Calcula la matriz de confusión
y_pred = model.predict(X_test)
cm = confusion_matrix(y_true=Y_test.argmax(axis=1), y_pred=y_pred.argmax(axis=1))

# Libera la memoria de la GPU
tf.keras.backend.clear_session()
tf.compat.v1.reset_default_graph()
    

In [None]:
model = tf.keras.models.load_model('../models/MODEL_ELA_V4_0.90.h5')

# Calculamos la precisión del modelo con los datos de test
score = model.evaluate(X_test, Y_test)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

class_names = ['real', 'fake']

image_size = (128, 128)

#Obtener todos los path de las imagenes de test
test_images_path = '../data/dataset/data/CASIA2/Tp/*.*'

correcto= 0
total = 0

for file_path in glob.glob(test_images_path):
    img = preparete_image_ela(file_path, image_size)
    img = np.array(img).reshape(-1, image_size[0], image_size[1], 3)
    y_pred = model.predict(img)
    y_pred_class = np.argmax(y_pred, axis = 1)[0]
    total += 1
    if y_pred_class == 1:
        correcto += 1
        print(f'Class: {class_names[1]} Confidence: {np.amax(y_pred) * 100:0.2f}')
    
print(f'Total: {total}, Correct: {correcto}, Acc: {correcto / total * 100.0}')
    

# mostrar imagen y escribir el resultado en la imagen
img = cv2.imread(file_path)
cv2.putText(img, f'Class: {class_names[y_pred_class]} Confidence: {np.amax(y_pred) * 100:0.2f}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
# show with matplotlib
plt.imshow(img)
plt.show()