# Import libraries

In [1]:
#import necessary libraries

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
np.random.seed(2)
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from pathlib import Path
import h5py
from keras.utils.np_utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator
import itertools

In [2]:
from PIL import Image
import os
from pylab import *
import re
from PIL import Image, ImageChops, ImageEnhance
import tensorflow as tf
from tensorflow.keras.utils import Sequence
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Apply Error Level Analysis

In [3]:
def convert_to_ela_image(path, quality):
    """Build an Error Level Analysis (ELA) image for the picture at ``path``.

    The picture is re-compressed as JPEG at the given ``quality`` and the
    per-pixel absolute difference between the original and the re-compressed
    copy is brightened so that the largest channel difference maps to 255.

    Args:
        path: Location of the source image (anything ``PIL.Image.open`` accepts).
        quality: JPEG quality setting used for the re-compression.

    Returns:
        The brightened difference image as a PIL image.
    """
    from io import BytesIO

    # The context manager releases the source file handle as soon as the
    # pixels have been converted into an independent RGB copy.
    with Image.open(path) as source:
        image = source.convert('RGB')

    # Re-compress in memory instead of through a fixed-name file in the
    # working directory: nothing is left behind on disk and concurrent calls
    # cannot overwrite each other's intermediate JPEG.
    buffer = BytesIO()
    image.save(buffer, 'JPEG', quality=quality)
    buffer.seek(0)
    temp_image = Image.open(buffer)

    ela_image = ImageChops.difference(image, temp_image)

    # getextrema() yields one (min, max) pair per band; the largest max is
    # the strongest difference anywhere in the image.
    extrema = ela_image.getextrema()
    max_diff = max(high for _, high in extrema)
    if max_diff == 0:
        # Identical images: avoid dividing by zero below.
        max_diff = 1
    scale = 255.0 / max_diff

    return ImageEnhance.Brightness(ela_image).enhance(scale)

In [4]:
# Target size handed to PIL's resize() in prepare_image before flattening.
image_size = (128, 128)

In [5]:
def prepare_image(image_path):
    """Return the ELA image of ``image_path`` as a flat array divided by 255.

    The ELA image is produced with JPEG quality 90 and resized to the
    module-level ``image_size`` before being flattened.
    """
    ela_image = convert_to_ela_image(image_path, 90)
    resized = ela_image.resize(image_size)
    pixels = np.array(resized).flatten()
    return pixels / 255.0

In [6]:
# Parallel lists filled by the loading cells: X[i] is the sample for label Y[i].
X = [] # flattened ELA images, one entry per input file
Y = [] # class labels: 0 for fake, 1 for real

# Applying ELA for Real images

In [7]:
import random
# NOTE(review): this path starts with 'Data/' while the fake-image cell uses
# 'data/'; on a case-sensitive filesystem only one spelling can exist -- confirm.
path = 'Data/real2/'

# os.walk ignores a missing directory and simply yields nothing, which would
# silently leave the "real" class without any samples. Fail loudly instead.
if not os.path.isdir(path):
    raise FileNotFoundError(f'Real-image directory not found: {path}')

for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        # Case-insensitive match so files such as 'IMG_01.JPG' are not skipped.
        if filename.lower().endswith(('jpg', 'png')):
            full_path = os.path.join(dirname, filename)
            X.append(prepare_image(full_path))
            Y.append(1)  # 1 = real
            if len(Y) % 10000 == 0:
                print(f'Processing {len(Y)} images')

# X and Y must stay index-aligned; never shuffle one without the other.
print(len(X), len(Y))

Processing 500 images
Processing 1000 images
Processing 1500 images
Processing 2000 images
Processing 2500 images
Processing 3000 images
Processing 3500 images
Processing 4000 images
Processing 4500 images
Processing 5000 images
Processing 5500 images
Processing 6000 images
Processing 6500 images
Processing 7000 images
Processing 7500 images
Processing 8000 images
Processing 8500 images
Processing 9000 images
Processing 9500 images
Processing 10000 images
Processing 10500 images
Processing 11000 images
Processing 11500 images
Processing 12000 images
Processing 12500 images
Processing 13000 images
Processing 13500 images
Processing 14000 images
Processing 14500 images
Processing 15000 images
Processing 15500 images
Processing 16000 images
Processing 16500 images
Processing 17000 images
Processing 17500 images
Processing 18000 images
Processing 18500 images
Processing 19000 images
Processing 19500 images
Processing 20000 images
Processing 20500 images
Processing 21000 images
Processing 2

# Applying ELA for Fake images

In [8]:
# NOTE(review): this path starts with 'data/' while the real-image cell uses
# 'Data/'; on a case-sensitive filesystem only one spelling can exist -- confirm.
path = 'data/fake2/'

# os.walk ignores a missing directory and simply yields nothing, which would
# silently leave the "fake" class without any samples. Fail loudly instead.
if not os.path.isdir(path):
    raise FileNotFoundError(f'Fake-image directory not found: {path}')

for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        # Case-insensitive match so files such as 'IMG_01.JPG' are not skipped.
        if filename.lower().endswith(('jpg', 'png')):
            full_path = os.path.join(dirname, filename)
            X.append(prepare_image(full_path))
            Y.append(0)  # 0 = fake
            if len(Y) % 10000 == 0:
                print(f'Processing {len(Y)} images')

print(len(X), len(Y))

Processing 50500 images
Processing 51000 images
Processing 51500 images
Processing 52000 images
Processing 52500 images
Processing 53000 images
Processing 53500 images
Processing 54000 images
Processing 54500 images
Processing 55000 images
Processing 55500 images
Processing 56000 images
Processing 56500 images
Processing 57000 images
Processing 57500 images
Processing 58000 images
Processing 58500 images
Processing 59000 images
Processing 59500 images
Processing 60000 images
Processing 60500 images
Processing 61000 images
Processing 61500 images
Processing 62000 images
Processing 62500 images
Processing 63000 images
Processing 63500 images
Processing 64000 images
Processing 64500 images
Processing 65000 images
Processing 65500 images
Processing 66000 images
Processing 66500 images
Processing 67000 images
Processing 67500 images
Processing 68000 images
Processing 68500 images
Processing 69000 images
Processing 69500 images
Processing 70000 images
Processing 70500 images
Processing 71000

In [9]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence, to_categorical

# Create a directory to store the processed batches
os.makedirs('processed_batches', exist_ok=True)

if len(X) == 0:
    raise ValueError('No images were loaded into X; nothing to batch.')
if len(X) != len(Y):
    raise ValueError(f'X and Y are out of sync: {len(X)} samples vs {len(Y)} labels.')

# Process the data in batches and save each batch separately.
# X holds every real image followed by every fake image, so contiguous slices
# would produce batches containing a single class. Batches are therefore drawn
# from a seeded permutation so that each saved file mixes both classes.
batch_size = 1000
sample_order = np.random.default_rng(5).permutation(len(X))
num_batches = (len(X) + batch_size - 1) // batch_size
batch_files = []
for batch_num in range(num_batches):
    batch_idx = sample_order[batch_num * batch_size:(batch_num + 1) * batch_size]
    # float32 halves the size of every saved file compared with the float64
    # arrays produced by dividing by 255.0.
    X_batch = np.array([X[i] for i in batch_idx], dtype=np.float32)
    Y_batch = np.array([Y[i] for i in batch_idx])
    X_batch = X_batch.reshape(-1, 128, 128, 3)
    Y_batch = to_categorical(Y_batch, 2)
    X_name = f'X_batch_{batch_num}.npy'
    Y_name = f'Y_batch_{batch_num}.npy'
    np.save(f'processed_batches/{X_name}', X_batch)
    np.save(f'processed_batches/{Y_name}', Y_batch)
    batch_files.append((X_name, Y_name))

# Use exactly the files written above rather than listing the directory, so
# leftover batches from an earlier, larger run are never picked up.
X_files = [x_name for x_name, _ in batch_files]
Y_files = [y_name for _, y_name in batch_files]

# Perform the train/validation split on the (X file, Y file) pairs
train_files, val_files = train_test_split(batch_files, test_size=0.2, random_state=5)

class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves pre-saved ``.npy`` batch files.

    Every item is one ``(X_batch, Y_batch)`` pair loaded from disk, so the
    number of items equals the number of file pairs handed to the constructor.

    Args:
        batch_files: Sequence of ``(X_file, Y_file)`` file-name pairs.
        batch_size: Stored on the instance for reference; the number of samples
            per item is whatever the saved file contains.
        batch_dir: Directory the file names are resolved against.
    """

    def __init__(self, batch_files, batch_size, batch_dir='processed_batches'):
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.batch_files = list(batch_files)
        self.batch_size = batch_size
        self.batch_dir = batch_dir

    def __len__(self):
        """Number of batches (file pairs) available."""
        return len(self.batch_files)

    def __getitem__(self, index):
        """Load and return the ``(X_batch, Y_batch)`` arrays for ``index``."""
        X_file, Y_file = self.batch_files[index]
        X_batch = np.load(os.path.join(self.batch_dir, X_file))
        Y_batch = np.load(os.path.join(self.batch_dir, Y_file))
        return X_batch, Y_batch

# Create separate generators for training and validation
train_generator = DataGenerator(train_files, batch_size)
val_generator = DataGenerator(val_files, batch_size)

# len() of a generator is its number of batch files, not its number of images,
# so report it once with an explicit label.
print(f'Training batches: {len(train_generator)}')
print(f'Validation batches: {len(val_generator)}')



80 80
20 20


# Split data into Train and Validation

In [10]:
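# NOTE: the train/validation split is performed on the saved batch files in the
# previous cell (train_files / val_files). The in-memory split below is disabled
# and kept for reference only, so X_train, X_val, Y_train and Y_val are not defined.
# X_train, X_val, Y_train, Y_val = train_test_split(X_processed, Y_processed, test_size=0.2, random_state=5)
# X = X.reshape(-1,1,1,1)
# print(len(X_train), len(Y_train))
# print(len(X_val), len(Y_val))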

# Build the Model

In [11]:
def build_model():
    """Assemble the (uncompiled) CNN classifier.

    Architecture: one 5x5 convolution with 32 filters on a 128x128x3 input,
    2x2 max pooling, dropout, then a 256-unit dense layer with dropout and a
    2-unit softmax output.
    """
    stack = [
        Conv2D(filters=32, kernel_size=(5, 5), padding='valid',
               activation='relu', input_shape=(128, 128, 3)),
        MaxPool2D(pool_size=(2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(2, activation='softmax'),
    ]
    return Sequential(stack)

In [12]:
# Instantiate the network and print its layer-by-layer summary.
model = build_model()
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 124, 124, 32)      2432      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 62, 62, 32)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 62, 62, 32)        0         
                                                                 
 flatten (Flatten)           (None, 123008)            0         
                                                                 
 dense (Dense)               (None, 256)               31490304  
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                        

# Compile the model

In [21]:
# Number of epochs passed to model.fit and used to derive the learning-rate decay.
epochs = 20
# NOTE(review): this rebinds the global `batch_size` (1000 when the batch files
# were written). The fit() call in this notebook does not pass it, so each
# training step still consumes one saved file -- confirm whether 6 is needed.
batch_size = 6

In [22]:
from keras.optimizers.schedules import InverseTimeDecay

init_lr = 1e-3
# The `decay=` constructor argument is rejected by newer Keras optimizers.
# InverseTimeDecay with decay_steps=1 reproduces the same rule,
#     lr(step) = init_lr / (1 + decay_rate * step),
# and is accepted as `learning_rate` by both old and new optimizer classes.
lr_schedule = InverseTimeDecay(
    initial_learning_rate=init_lr,
    decay_steps=1,
    decay_rate=init_lr / epochs,
)
optimizer = Adam(learning_rate=lr_schedule)

In [23]:
# Configure training: optimizer from the previous cell, accuracy as the metric.
# NOTE(review): build_model ends in a 2-unit softmax and the labels are one-hot
# encoded with to_categorical; 'categorical_crossentropy' is the conventional
# loss for that pairing -- confirm 'binary_crossentropy' is intended.
model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy'])

# Train the model

In [24]:
# Train from the on-disk batch generators. Every step consumes one generator
# item, so steps_per_epoch / validation_steps cover each generator exactly once
# per epoch. The returned History object feeds the curves plotted below.
hist = model.fit(train_generator,
                 steps_per_epoch=len(train_generator),
                 epochs=epochs,
                 validation_data=val_generator,
                 validation_steps=len(val_generator))

Epoch 1/20


2024-04-22 08:42:22.990157: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [1]:
# Plot the loss and accuracy curves for training and validation
fig, ax = plt.subplots(2, 1)

ax[0].plot(hist.history['loss'], color='b', label="Training loss")
ax[0].plot(hist.history['val_loss'], color='r', label="validation loss")
ax[0].set_ylabel('Loss')
# The legend call is not assigned to a name: binding it to `legend` would
# overwrite the `legend` function pulled in by `from pylab import *`.
ax[0].legend(loc='best', shadow=True)

ax[1].plot(hist.history['accuracy'], color='b', label="Training accuracy")
ax[1].plot(hist.history['val_accuracy'], color='r',label="Validation accuracy")
ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('Accuracy')
ax[1].legend(loc='best', shadow=True)

fig.tight_layout()
plt.show()

NameError: name 'plt' is not defined

# Plot Confusion Matrix

In [2]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix as a colour-coded grid with a value in each cell.

    Args:
        cm: Square confusion matrix; rows are true labels, columns predictions.
        classes: Class names used for the tick labels on both axes.
        normalize: When True, every row is divided by its sum before plotting,
            so both the colours and the printed numbers show per-class rates.
        title: Title of the figure.
        cmap: Matplotlib colormap used for the grid.
    """
    cm = np.asarray(cm)

    # Normalise BEFORE drawing so the image and the printed numbers agree.
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A class with no samples has a zero row total; leave that row at 0
        # instead of producing NaN from a division by zero.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Two decimals for fractional values, plain integers for raw counts.
    fmt = '.2f' if np.issubdtype(cm.dtype, np.floating) else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        # White text on dark (high-value) cells, black text on light ones.
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

NameError: name 'plt' is not defined

In [None]:
# Predict the values from the validation dataset.
# The validation data lives in val_generator (there is no in-memory X_val /
# Y_val), so walk it batch by batch; collecting predictions and labels in the
# same loop keeps the two aligned.
pred_batches = []
label_batches = []
for batch_index in range(len(val_generator)):
    X_val_batch, Y_val_batch = val_generator[batch_index]
    pred_batches.append(model.predict(X_val_batch, verbose=0))
    label_batches.append(Y_val_batch)
Y_pred = np.concatenate(pred_batches)
Y_true_onehot = np.concatenate(label_batches)

# Convert predicted class probabilities to class indices
Y_pred_classes = np.argmax(Y_pred, axis=1)
# Convert one-hot validation labels to class indices
Y_true = np.argmax(Y_true_onehot, axis=1)
# Compute the confusion matrix; explicit labels keep it 2x2 even if one class
# never occurs in the validation batches
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes, labels=[0, 1])
# plot the confusion matrix
plot_confusion_matrix(confusion_mtx, classes = range(2))

In [None]:
# model performance
# Evaluate on the validation generator: X_val / Y_val are never created in this
# notebook, and a generator supplies its own batches, so batch_size is omitted.
score = model.evaluate(val_generator)
acc = score[1]
err = 1 - acc
print("Loss Value : ", score[0])
print("Accuracy : ", score[1])

# Save the Model

In [None]:
# Serialise the architecture (no weights) to JSON.
model_structure = model.to_json()
f = Path("models/model_structure-5.json")
# write_text does not create missing parent directories.
f.parent.mkdir(parents=True, exist_ok=True)
f.write_text(model_structure)

In [None]:
# Make sure the target directory exists before writing the weights file.
Path("models").mkdir(parents=True, exist_ok=True)
model.save_weights("models/model_weights-5.h5")

In [None]:
# Save the model to a single `.keras` file in the working directory.
model.save('my_model_5.keras')

# End...