In [None]:
# Silence every Python warning for the rest of the session.
import warnings
warnings.filterwarnings("ignore")

# Restrict TensorFlow's logger to ERROR-level messages.
import tensorflow as tf
tf.get_logger().setLevel('ERROR')

### Import necessary libraries

In [None]:
import numpy as np
import os
import matplotlib.pyplot as plt
import json
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dense, Input
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from PIL import Image, ImageChops, ImageEnhance
from tqdm.notebook import tqdm as tqdm
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

### Error Level Analysis

In [None]:
def convert_to_ela_image(path, quality, save_path="ela_image.png"):
    """Convert an image to its ELA (Error Level Analysis) rendering.

    The image is re-compressed as JPEG at ``quality``, the per-pixel absolute
    difference against the original is taken, and that difference is
    brightened by a factor derived from its largest channel value.

    The re-compression happens in memory, so no scratch JPEG is written to
    the working directory and no file handle is left open.

    Args:
        path: Path of the image to analyse.
        quality: JPEG quality used for the re-compression.
        save_path: File the ELA image is also written to. Defaults to
            ``"ela_image.png"``; pass ``None`` to skip writing to disk
            (useful when converting a whole dataset).

    Returns:
        The brightened ELA image as an RGB PIL image.
    """
    import io  # function-scope import: only this function needs it

    # Decode the source and release its file handle immediately.
    with Image.open(path) as source:
        original_image = source.convert('RGB')

    # Re-save the image at the requested quality into an in-memory buffer.
    buffer = io.BytesIO()
    original_image.save(buffer, 'JPEG', quality=quality)
    buffer.seek(0)

    # Pixel difference between the original image and the re-saved image.
    # Computed inside the `with` so the decoded JPEG is still available.
    with Image.open(buffer) as resaved_image:
        ela_image = ImageChops.difference(original_image, resaved_image)

    # getextrema() yields one (min, max) pair per band; the scale factor is
    # derived from the largest per-band maximum.
    max_difference = max(high for _, high in ela_image.getextrema())
    if max_difference == 0:
        max_difference = 1  # identical images: avoid dividing by zero
    # NOTE(review): a gain of 350 (rather than 255) pushes the brightest
    # differences past the 8-bit range — confirm this is intended.
    scale = 350.0 / max_difference

    # Enhance the ELA image to brighten the pixels
    ela_image = ImageEnhance.Brightness(ela_image).enhance(scale)

    if save_path is not None:
        ela_image.save(save_path)
    return ela_image

### Dataset Preparation

In [None]:
def prepare_image(image_path):
    """Return the 128x128 ELA rendering of an image as a flat array divided by 255."""
    target_size = (128, 128)
    # ELA at JPEG quality 90, then shrink to the fixed input size
    ela_resized = convert_to_ela_image(image_path, 90).resize(target_size)
    pixels = np.array(ela_resized)
    # normalise the array values obtained from the input image
    return pixels.flatten() / 255.0

In [None]:
# Accumulators filled by the loading cells that follow.
# X: ELA converted images; Y: labels (0 for fake, 1 for real)
X, Y = [], []

In [None]:
# Add the authentic images (label 1) to the dataset.

path = './data/authentic'       # folder path of the authentic images in the dataset
# os.listdir() returns entries in arbitrary order; sorting keeps the dataset
# order, and therefore the seeded train/validation/test split, reproducible.
for filename in tqdm(sorted(os.listdir(path)), desc="Processing Images : "):
    if filename.endswith(('jpg', 'png')):
        full_path = os.path.join(path, filename)
        X.append(prepare_image(full_path))
        Y.append(1)     # label for authentic images

print(f'Total images: {len(X)}\nTotal labels: {len(Y)}')

In [None]:
# Add the forged images (label 0) to the dataset.

path = './data/forged'       # folder path of the forged images in the dataset
# os.listdir() returns entries in arbitrary order; sorting keeps the dataset
# order, and therefore the seeded train/validation/test split, reproducible.
for filename in tqdm(sorted(os.listdir(path)), desc="Processing Images : "):
    if filename.endswith(('jpg', 'png')):
        full_path = os.path.join(path, filename)
        X.append(prepare_image(full_path))
        Y.append(0)     # label for forged images

print(f'Total images: {len(X)}\nTotal labels: {len(Y)}')

In [None]:
# Turn the Python lists into arrays; each flat image vector is folded back
# into a 128 x 128 x 3 block.
Y = np.array(Y)
X = np.array(X).reshape(-1, 128, 128, 3)

### Partitioning dataset for training, validation and testing

In [None]:
# Training : Validation : Testing
# 5% of the data is held out for testing; the remaining 95% is split 80/20
# into training and validation. Both splits use the same fixed seed.
# X itself is left untouched so this cell can be re-run safely.
X_temp, X_test, Y_temp, Y_test = train_test_split(X, Y, test_size = 0.05, random_state=5)
X_train, X_val, Y_train, Y_val = train_test_split(X_temp, Y_temp, test_size = 0.2, random_state=5)

print(f'Training images: {len(X_train)} , Training labels: {len(Y_train)}')
print(f'Validation images: {len(X_val)} , Validation labels: {len(Y_val)}')
print(f'Test images: {len(X_test)} , Test labels: {len(Y_test)}')

### CNN Model

In [None]:
def build_model():
    """Build the (uncompiled) CNN used to classify 128x128x3 ELA images.

    Layout: three stages of two 5x5 convolutions (64 filters) followed by
    2x2 max pooling, one 5x5 convolution (32 filters) with 2x2 max pooling,
    global average pooling and a single sigmoid output unit.
    """
    def conv(filters):
        # 5x5 'valid' convolution with ReLU, as used throughout the network
        return Conv2D(filters=filters, kernel_size=(5, 5), padding='valid', activation='relu')

    def pool():
        return MaxPooling2D(pool_size=(2, 2))

    # The input is declared as the first layer
    layers = [Input(shape=(128, 128, 3))]
    for _ in range(3):
        layers += [conv(64), conv(64), pool()]
    layers += [conv(32), pool(), GlobalAveragePooling2D(), Dense(1, activation='sigmoid')]

    model = Sequential()
    for layer in layers:
        model.add(layer)
    return model


In [None]:
# Instantiate the CNN and print its layer-by-layer summary.
model = build_model()
model.summary()

### Model Training

In [None]:
# Training hyper-parameters, passed to model.fit() further down.
epochs = 15  # maximum number of training epochs
batch_size = 32  # samples per batch

In [None]:
# Optimizer and compilation: Adam with binary cross-entropy, tracking accuracy.
init_lr = 1e-4   # learning rate for the optimizer
# NOTE(review): `init_lr/epochs` looks like the legacy `decay=` (learning-rate
# decay) idiom; `weight_decay` is a different setting — confirm the intent.
optimizer = Adam(learning_rate=init_lr, weight_decay=init_lr/epochs)
model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy'])

In [None]:
# Early stopping on validation accuracy.
# With patience = 10 and epochs = 15, training can only stop early when
# val_accuracy has not improved for 10 consecutive epochs.
early_stopping = EarlyStopping(monitor = 'val_accuracy',
                               min_delta = 0,
                               patience = 10,
                               verbose = 0,
                               mode = 'auto')

In [None]:
# Train on the training split, evaluating on the validation split after every
# epoch; `hist` keeps the per-epoch loss and metrics.
hist = model.fit(X_train,
                 Y_train,
                 batch_size = batch_size,
                 epochs = epochs,
                 validation_data = (X_val, Y_val),
                 callbacks = [early_stopping])

In [None]:
# Save the model as an h5 file, under the name that predict_result() loads.
model.save('trained_model.h5')

# get the dictionary containing each metric and the loss for each epoch
history_dict = hist.history

# Save it as a JSON file; the context manager closes the file even on error.
with open('trained_model_history.json', 'w') as history_file:
    json.dump(history_dict, history_file)

In [None]:
def plot_loss_accuracy(train_losses, val_losses, train_accuracies, val_accuracies):
    """Plot training vs. validation loss (left) and accuracy (right) per epoch."""
    _, (loss_axis, accuracy_axis) = plt.subplots(1, 2, figsize=(12, 5))

    panels = (
        (loss_axis, 'Loss', train_losses, val_losses),
        (accuracy_axis, 'Accuracy', train_accuracies, val_accuracies),
    )
    # Both panels share the same styling; only the metric differs.
    for axis, metric, train_curve, val_curve in panels:
        axis.plot(train_curve, color="#E74C3C", label=f'Train {metric}', marker='o')
        axis.plot(val_curve, color="#641E15", label=f'Validation {metric}', marker='h')
        axis.set_xlabel('Epoch')
        axis.set_ylabel(metric)
        axis.legend()
        axis.set_title(metric)

    plt.tight_layout()
    plt.show()

### Plotting the training and validation curves

In [None]:
# Pull the per-epoch curves out of the Keras history and plot them.
train_losses, val_losses = history_dict['loss'], history_dict['val_loss']
train_accuracies, val_accuracies = history_dict['accuracy'], history_dict['val_accuracy']

plot_loss_accuracy(train_losses, val_losses, train_accuracies, val_accuracies)

### Confusion Matrix

In [None]:
def print_confusion_matrix(conf_matrix, class_names):
    """Show ``conf_matrix`` as an annotated heatmap with ``class_names`` on both axes."""
    _, axis = plt.subplots(figsize=(5, 5))
    sns.heatmap(
        conf_matrix,
        ax=axis,
        cmap="Blues",
        annot=True,
        fmt="d",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    # Rows are the actual classes, columns the predicted ones.
    axis.set_ylabel('Actual')
    axis.set_xlabel('Predicted')
    axis.set_title('Confusion Matrix')
    plt.show()

In [None]:
Y_pred = model.predict(X_val)  # Predict the values from the validation dataset
# Round off the sigmoid value, then flatten the single-unit output column
# into a 1-D vector of integer class labels.
Y_pred_classes = np.round(Y_pred).astype(int).ravel()

Y_true = Y_val
class_names = ['Forged', 'Authentic']
# Pin the label order so rows/columns always line up with class_names,
# even when one class is missing from the labels or the predictions.
conf_matrix = confusion_matrix(Y_true, Y_pred_classes, labels=[0, 1])

print_confusion_matrix(conf_matrix, class_names)

### Classification Report

In [None]:
# Precision, recall and F1 for the validation predictions computed above.
print(classification_report(Y_true, Y_pred_classes))

### Testing Accuracy

In [None]:
# Class names indexed by label: 0 -> 'Forged', 1 -> 'Authentic'.
class_names = ['Forged', 'Authentic']

In [None]:
# Testing accuracy
# Score the whole test split in one batched call instead of invoking
# model.predict() once per image.
test_scores = model.predict(X_test)
test_pred_classes = np.round(test_scores).ravel()  # rounded sigmoid outputs, 1-D

total_test = len(X_test)                                   # total test images
correct_test = int(np.sum(test_pred_classes == Y_test))    # correctly predicted test images

print(f'Total test images: {total_test}\nCorrectly predicted images: {correct_test}\nAccuracy: {correct_test / total_test * 100.0} %')

# Test an image

In [None]:
def prepare_image(fname):
    """Return the ELA rendering of ``fname`` as a flat numpy array divided by 255.

    NOTE: this redefines (and shadows) the prepare_image declared in the
    dataset-preparation section; both apply the same steps.
    """
    ela_image = convert_to_ela_image(fname, 90)
    resized = ela_image.resize((128, 128))
    return np.array(resized).flatten() / 255.0

def predict_result(fname):
    """Classify a single image file with the model saved on disk.

    Args:
        fname: Path of the image to classify.

    Returns:
        A ``(prediction, confidence)`` tuple: the class name ("Forged" or
        "Authentic") and the confidence as a percentage string with two
        decimals.
    """
    model = load_model("trained_model.h5")  # load the trained model
    class_names = ["Forged", "Authentic"]  # classification outputs
    test_image = prepare_image(fname).reshape(-1, 128, 128, 3)

    # Reduce the prediction array to a plain Python float so that rounding
    # yields a real int (usable as a list index) and the comparison below is
    # a scalar comparison rather than an array truth test.
    score = float(model.predict(test_image)[0][0])
    y_pred_class = int(round(score))

    prediction = class_names[y_pred_class]
    # Confidence is the distance from the opposite class.
    if score <= 0.5:
        confidence = f"{(1 - score) * 100:0.2f}"
    else:
        confidence = f"{score * 100:0.2f}"
    return (prediction, confidence)

In [None]:
test_image_path = './data/test/forged/splicing.jpg'    # test image path
test_image = prepare_image(test_image_path)
test_image = test_image.reshape(-1, 128, 128, 3)

y_pred = model.predict(test_image)
y_pred_class = round(y_pred[0][0])

fig, ax = plt.subplots(1,2,figsize=(15,5)) 

#display original image
original_image = plt.imread(test_image_path) 
ax[0].axis('off')
ax[0].imshow(original_image)
ax[0].set_title('Original Image')

#display ELA applied image
ax[1].axis('off')
ax[1].imshow(convert_to_ela_image(test_image_path,90)) 
ax[1].set_title('ELA Image')

print(f'Prediction: {class_names[y_pred_class]}')
if y_pred<=0.5:
    print(f'Confidence:  {(1-(y_pred[0][0])) * 100:0.2f}%')
else:
    print(f'Confidence: {(y_pred[0][0]) * 100:0.2f}%')
print('--------------------------------------------------------------------------------------------------------------')

In [None]:
test_image_path = './data/test/authentic/img2.jpg'    # test image path
test_image = prepare_image(test_image_path)
test_image = test_image.reshape(-1, 128, 128, 3)

y_pred = model.predict(test_image)
y_pred_class = round(y_pred[0][0])

fig, ax = plt.subplots(1,2,figsize=(15,5)) 

#display original image
original_image = plt.imread(test_image_path) 
ax[0].axis('off')
ax[0].imshow(original_image)
ax[0].set_title('Original Image')

#display ELA applied image
ax[1].axis('off')
ax[1].imshow(convert_to_ela_image(test_image_path,90)) 
ax[1].set_title('ELA Image')

print(f'Prediction: {class_names[y_pred_class]}')
if y_pred<=0.5:
    print(f'Confidence:  {(1-(y_pred[0][0])) * 100:0.2f}%')
else:
    print(f'Confidence: {(y_pred[0][0]) * 100:0.2f}%')
print('--------------------------------------------------------------------------------------------------------------')

# Test a dataset

In [None]:
test_folder_path = './data/test/dataset/'
authentic, forged, total = 0, 0, 0
y_true, y_pred_custom = [], []

# Sorted so the images are processed in a reproducible order
# (os.listdir() returns entries in arbitrary order).
for filename in tqdm(sorted(os.listdir(test_folder_path)), desc="Processing Images : "):
    if filename.endswith(('jpg', 'png')):
        test_image_path = os.path.join(test_folder_path, filename)
        # prepare_image() always returns an array (or raises), so no None check is needed.
        test_image = prepare_image(test_image_path).reshape(-1, 128, 128, 3)

        y_pred = model.predict(test_image)
        # Keep a plain int label so y_pred_custom is a flat list of 0/1
        # that lines up with y_true.
        y_pred_class = int(np.round(y_pred[0][0]))

        # Ground truth is taken from the file name: 'forged' in the name -> 0, otherwise 1.
        y_true.append(0 if 'forged' in filename.lower() else 1)
        y_pred_custom.append(y_pred_class)

        total += 1
        if y_pred_class == 0:
            forged += 1
        else:
            authentic += 1

print(f'Total images: {total}\nAuthentic Images: {authentic}\nForged Images: {forged}')

## Comparison with the well-known models 'GoogLeNet', 'SqueezeNet' and 'AlexNet'

In [None]:
# Load the pre-trained models
# NOTE(review): newer torchvision releases replace `pretrained=True` with a
# `weights=` argument — confirm against the installed version.
# NOTE(review): the classifier heads are not replaced here, so they presumably
# keep their original output size while the labels are binary — confirm.
googlenet = models.googlenet(pretrained=True)
squeezenet = models.squeezenet1_0(pretrained=True)
alexnet = models.alexnet(pretrained=True)

In [None]:
def train_model(model, train_loader, device, epochs=15):
    """Train ``model`` with Adam and cross-entropy, tracking loss and accuracy.

    Args:
        model: The torch module to train; it is moved to ``device`` in place.
        train_loader: Iterable yielding ``(inputs, labels)`` batches, where
            ``labels`` holds integer class indices.
        device: Device on which the model and every batch are placed.
        epochs: Number of passes over ``train_loader``.

    Returns:
        ``(train_losses, train_accuracies)``: two lists with one entry per
        epoch — the mean of the per-batch losses and the fraction of
        correctly classified samples.
    """
    model.to(device)

    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    train_losses = []
    train_accuracies = []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        num_batches = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device).float()
            # CrossEntropyLoss requires int64 class indices; labels built from
            # NumPy arrays may arrive as int32 on some platforms.
            labels = labels.to(device).long()

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)  # index of the highest logit per sample
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            num_batches += 1

        # Guard the averages so an empty loader reports 0.0 instead of raising.
        epoch_loss = running_loss / num_batches if num_batches else 0.0
        epoch_accuracy = correct / total if total else 0.0
        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_accuracy)

        print(f'Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

    return train_losses, train_accuracies

### Prepare DataLoaders

In [None]:
class CustomDataset(Dataset):
    """Dataset pairing ``images[i]`` with ``labels[i]``, with an optional per-image transform."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The size is driven by the images container.
        return len(self.images)

    def __getitem__(self, idx):
        sample = self.images[idx]
        target = self.labels[idx]
        # A falsy transform (the default None) returns the image as stored.
        return (self.transform(sample) if self.transform else sample), target

In [None]:
# Per-image preprocessing: convert to a tensor, then normalise each channel
# with the mean/std values listed below.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One dataset per split, all sharing the same transform.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(images, targets, transform=transform)
    for images, targets in ((X_train, Y_train), (X_val, Y_val), (X_test, Y_test))
)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Verify the data: print the shapes of the first training batch only.
for inputs, labels in train_loader:
    print(inputs.shape, labels.shape)
    break

## Finetuning

In [None]:
# Train on the GPU when one is available, otherwise fall back to the CPU.
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# This call must run: the saving and plotting cells further down read both
# GoogLeNet histories and fail with NameError when it is commented out.
train_losses_googlenet, train_accuracies_googlenet = train_model(googlenet, train_loader, device=dev, epochs=10)

In [None]:
# This call must run: the saving and plotting cells further down read both
# SqueezeNet histories and fail with NameError when it is commented out.
train_losses_squeezenet, train_accuracies_squeezenet = train_model(squeezenet, train_loader, device=dev, epochs=10)

In [None]:
# Fine-tune AlexNet for 10 epochs on the training loader; keep its per-epoch loss and accuracy.
train_losses_alexnet, train_accuracies_alexnet = train_model(alexnet, train_loader, device=dev, epochs=10)

### Saving the fine-tuned models and their histories

In [None]:
# Persist the weights (state dict) of each network.
for network, weights_path in (
    (googlenet, 'googlenet_finetuned.pt'),
    (squeezenet, 'squeezenet_finetuned.pt'),
    (alexnet, 'alexnet_finetuned.pt'),
):
    torch.save(network.state_dict(), weights_path)

# Training history of each network: per-epoch losses and accuracies.
history_googlenet = {'train_losses': train_losses_googlenet, 'train_accuracies': train_accuracies_googlenet}
history_squeezenet = {'train_losses': train_losses_squeezenet, 'train_accuracies': train_accuracies_squeezenet}
history_alexnet = {'train_losses': train_losses_alexnet, 'train_accuracies': train_accuracies_alexnet}

# Write every history to its own JSON file.
for history, history_path in (
    (history_googlenet, 'googlenet_history.json'),
    (history_squeezenet, 'squeezenet_history.json'),
    (history_alexnet, 'alexnet_history.json'),
):
    with open(history_path, 'w') as f:
        json.dump(history, f)

## Plot train curves

In [None]:
# Plot the loss and accuracy curves
def plot_loss_accuracy(train_losses, train_accuracies, model_name):
    """Plot the per-epoch training loss (left) and training accuracy (right).

    NOTE: this three-argument definition replaces the earlier four-curve
    ``plot_loss_accuracy`` in the notebook namespace from this cell onwards.

    Args:
        train_losses: Training loss for each epoch.
        train_accuracies: Training accuracy for each epoch.
        model_name: Name shown in both panel titles.
    """
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    # The curve shows the training loss, so it is labelled as such.
    plt.plot(train_losses, color="#641E15", label='Train Loss', marker='h')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'{model_name} Loss')

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, color="#E74C3C", label='Train Accuracy', marker='o')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'{model_name} Accuracy')

    plt.tight_layout()
    plt.show()

In [None]:
# Training curves of the fine-tuned GoogLeNet.
plot_loss_accuracy(train_losses_googlenet, train_accuracies_googlenet, "GoogLeNet")

In [None]:
# Training curves of the fine-tuned SqueezeNet.
plot_loss_accuracy(train_losses_squeezenet, train_accuracies_squeezenet, "SqueezeNet")

In [None]:
# Training curves of the fine-tuned AlexNet.
plot_loss_accuracy(train_losses_alexnet, train_accuracies_alexnet, "AlexNet")

## Observation

<h4>
Due to time constraints, GoogLeNet and SqueezeNet could only be fine-tuned for a limited number of epochs. <br>
This is not expected to be a significant issue: both models are highly complex and would require extensive training time to converge fully, yet their learning curves already show strong performance after this limited training. Note that the curves plotted above are computed on the training set only, so the models' generalisation capabilities should still be confirmed on the validation or test split.
</h4>