PreProcess Data (image)

In [1]:
!pip install PyWavelets




In [2]:
import os
import cv2
import imghdr
import numpy as np
from numpy.fft import fft2, fftshift
import tensorflow as tf
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dropout
from tensorflow.keras.regularizers import l2
from prnu import functions  # Ensure your 'prnu' module is available

data_dir = 'images'
image_exts = ['jpg', 'jpeg', 'png']
levels = 4
sigma = 5
target_size = (128, 128)

label_to_index = {"real": 0, "fake": 1}

def list_image_files(data_dir, image_exts):
    """
    Walks through subdirectories of data_dir and returns a list of file paths and corresponding numeric labels.
    Expects folder names like "0_real" or "1_fake" (the label is taken as the part after the underscore).
    """
    file_paths = []
    labels = []
    for folder in os.listdir(data_dir):
        folder_path = os.path.join(data_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        label_str = folder.split('_')[-1]
        if label_str not in label_to_index:
            continue
        numeric_label = label_to_index[label_str]

        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)
            file_paths.append(file_path)
            labels.append(numeric_label)
    return file_paths, labels

all_file_paths, all_labels = list_image_files(data_dir, image_exts)
print("Total image file paths found:", len(all_file_paths))

# Split into training and test sets
train_files, test_files, train_labels, test_labels = train_test_split(
    all_file_paths, all_labels, test_size=0.2, random_state=42
)

Total image file paths found: 684113


Configure and Train model

In [3]:
def _load_and_process_image(file_path_str):
    """
    Given a file path, load the image, process it to extract the PRNU noise residual,
    apply Fourier transform and normalization, and return a  randomly cropped (128,128) float32 array.
    """
    
    img = cv2.imread(file_path_str, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError(f"Could not read image: {file_path_str}")

    # Extract the PRNU noise residual using PRNU module
    prnu_noise = functions.extract_single(img, levels=levels, sigma=sigma)
    
    # Randomly crop a patch of size target_size from the noise residual
    crop_h, crop_w = target_size
    h, w = prnu_noise.shape
    if h < crop_h or w < crop_w:
        raise ValueError(f"Image {file_path_str} is too small for target crop size {target_size}")
    
    start_y = np.random.randint(0, h - crop_h + 1)
    start_x = np.random.randint(0, w - crop_w + 1)
    prnu_noise_cropped = prnu_noise[start_y:start_y + crop_h, start_x:start_x + crop_w]

    # Compute Fourier transform and shift the zero frequency component to the center
    prnu_noise_fourier = fft2(prnu_noise_cropped)
    prnu_noise_fourier_shifted = fftshift(prnu_noise_fourier)
    prnu_noise_fourier_magnitude = np.abs(prnu_noise_fourier_shifted)
    
    # Normalize the magnitude to the range [0, 1]
    min_val = np.min(prnu_noise_fourier_magnitude)
    max_val = np.max(prnu_noise_fourier_magnitude)
    prnu_noise_normalized = (prnu_noise_fourier_magnitude - min_val) / (max_val - min_val + 1e-8)
    
    return prnu_noise_normalized.astype(np.float32)

def _load_image_fn(fp):
    """
    This function is wrapped by tf.py_function. It takes the file path input,
    converts it to a Python string, and calls _load_and_process_image.
    """
    # If fp is an EagerTensor, convert it to a numpy scalar.
    if hasattr(fp, "numpy"):
        fp = fp.numpy()
    # fp is now expected to be of type bytes.
    # Convert bytes to string if necessary.
    file_path = fp.decode('utf-8') if isinstance(fp, bytes) else fp
    return _load_and_process_image(file_path)

def load_and_process_image(file_path):
    """
    Wrap _load_image_fn with tf.py_function so that it can be used in the tf.data pipeline.
    """
    image = tf.py_function(
        func=_load_image_fn,
        inp=[file_path],
        Tout=tf.float32
    )
    # Set static shape: the image is target_size (e.g., (128, 128)).
    image.set_shape(target_size)
    # Add a channel dimension to form (128, 128, 1) for the CNN.
    image = tf.expand_dims(image, axis=-1)
    return image

def process_path(file_path, label):
    """
    Given a file path and label, load the image and return (image, label).
    """
    image = load_and_process_image(file_path)
    return image, label

# Create training and test datasets from the file lists.
train_ds = tf.data.Dataset.from_tensor_slices((train_files, train_labels))
test_ds = tf.data.Dataset.from_tensor_slices((test_files, test_labels))

# Map the process_path function to load and process each image lazily.
batch_size = 32
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.map(process_path, num_parallel_calls=AUTOTUNE)
train_ds = train_ds.shuffle(buffer_size=1000)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

test_ds = test_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(batch_size)
test_ds = test_ds.prefetch(buffer_size=AUTOTUNE)

Plot Accuracy and Loss

In [None]:
# Define data augmentation
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal_and_vertical"),
    tf.keras.layers.RandomRotation(0.2),
])

# Build the CNN model with increased complexity and reduced regularization
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu',
                           input_shape=(128, 128, 1)),  #
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Dropout(0.1),
    
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Dropout(0.1),
    
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Dropout(0.1),

    tf.keras.layers.Conv2D(256, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.Dropout(0.1),
    
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model with a potentially adjusted learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])


model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# Train the model.
def augment(image, label):
    return data_augmentation(image), label

augmented_train_ds = train_ds.map(augment, num_parallel_calls=AUTOTUNE)

history = model.fit(
    augmented_train_ds,
    epochs=100,
    validation_data=test_ds
)

#learning rate, batch size, more complex architecture
#-> how to overcome underfitting problem
#look for opensource commercial products for AI detecting -> to compare accuracy/precision/f1score etc
# Use rich texture code to train first on their dataset, then if accruacy matches, train with our dataset

# Evaluate the model on the test dataset.
test_loss, test_accuracy = model.evaluate(test_ds)
print(f"Test accuracy: {test_accuracy:.4f}")
print(f"Test loss: {test_loss:.4f}")

import numpy as np
import matplotlib.pyplot as plt

# Take one batch from the test dataset.
for images, labels in test_ds.take(1):
    predictions = model.predict(images)
    predicted_labels = (predictions > 0.5).astype(np.int32).flatten()
    true_labels = labels.numpy()

    # Display a few images with their true and predicted labels.
    plt.figure(figsize=(15, 15))
    num_images = min(9, images.shape[0])
    for i in range(num_images):
        plt.subplot(3, 3, i + 1)
        plt.imshow(np.squeeze(images[i].numpy()), cmap='gray')
        plt.title(f"True: {true_labels[i]}, Pred: {predicted_labels[i]}")
        plt.axis('off')
    plt.show()

# Plot training history.
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title("Accuracy")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title("Loss")
plt.legend()
plt.show()


Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 126, 126, 32)      320       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 63, 63, 32)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 63, 63, 32)        0         
                                                                 
 conv2d_1 (Conv2D)           (None, 61, 61, 64)        18496     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 30, 30, 64)       0         
 2D)                                                             
                                                                 
 dropout_1 (Dropout)         (None, 30, 30, 64)       

  x = x * noise_var / (coef_var_min + noise_var)


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100


Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78/100
Epoch 79/100
Epoch 80/100

Evaluate and Test Model