In [5]:
import os
import numpy as np
import cv2
from PIL import Image, ImageChops, ImageEnhance
from multiprocessing import Pool, cpu_count
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras.optimizers import Adam

In [6]:

# --- ELA Image Generation ---
def generate_ela_image(image_path, quality=90):
    """
    Generate an ELA image to highlight tampering artifacts.
    """
    original = Image.open(image_path).convert("RGB")
    temp_path = "temp_compressed.jpg"
    original.save(temp_path, "JPEG", quality=quality)
    compressed = Image.open(temp_path)
    ela_image = ImageChops.difference(original, compressed)
    extrema = ela_image.getextrema()
    max_diff = max([ex[1] for ex in extrema]) or 1
    scale = 255.0 / max_diff
    ela_image = ImageEnhance.Brightness(ela_image).enhance(scale)
    return np.array(ela_image).astype(np.uint8)


# --- Wavelet Noise Extraction ---
import pywt

def extract_wavelet_noise(image_array):
    """
    Extract noise using Discrete Wavelet Transform (DWT).
    """
    gray = cv2.cvtColor(image_array, cv2.COLOR_RGB2GRAY) / 255.0
    coeffs = pywt.wavedec2(gray, "haar", level=2)
    coeffs_H = list(coeffs)
    coeffs_H[0] = np.zeros_like(coeffs_H[0])  # Zero out approximation coefficients
    noise = pywt.waverec2(coeffs_H, "haar")
    noise = np.clip(noise * 255.0, 0, 255).astype(np.uint8)
    return noise


# --- Preprocessing ---
def preprocess_image(image_path, target_size=(224, 224)):
    """
    Preprocess image by combining ELA and Wavelet Noise.
    """
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_resized = cv2.resize(img, target_size)
    
    ela_image = generate_ela_image(image_path)
    ela_resized = cv2.resize(ela_image, target_size)
    
    wavelet_noise = extract_wavelet_noise(img)
    wavelet_resized = cv2.resize(wavelet_noise, target_size)
    
    combined = np.stack([img_resized[:, :, 0], ela_resized[:, :, 0], wavelet_resized], axis=-1)
    return combined.astype(np.float32) / 255.0  # Normalize


def preprocess_image_wrapper(args):
    """
    Wrapper function for multiprocessing.
    """
    return preprocess_image(*args)


def load_casia2_dataset(dataset_path, target_size=(224, 224), max_images=None):
    """
    Load CASIA2 dataset with parallel preprocessing.
    """
    images, labels = [], []
    for label, folder in enumerate(["Au", "Tp"]):
        folder_path = os.path.join(dataset_path, folder)
        files = os.listdir(folder_path)[:max_images]
        file_paths = [(os.path.join(folder_path, file), target_size) for file in files]
        
        with Pool(cpu_count()) as p:
            preprocessed_images = p.map(preprocess_image_wrapper, file_paths)
        
        images.extend(preprocessed_images)
        labels.extend([label] * len(preprocessed_images))
    
    return np.array(images), np.array(labels)


# --- Model Definition ---


# --- Main Training Pipeline ---



In [7]:
def create_mobilenetv2_model(input_shape, learning_rate=0.0001):
    """
    Create MobileNetV2 model for binary classification.
    """
    base_model = MobileNetV2(input_shape=input_shape, include_top=False, weights="imagenet")
    base_model.trainable = False  # Freeze base model layers
    
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(512, activation="relu")(x)
    x = Dropout(0.5)(x)
    output = Dense(1, activation="sigmoid")(x)
    
    model = Model(inputs=base_model.input, outputs=output)
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss="binary_crossentropy",
                  metrics=["accuracy", tf.keras.metrics.AUC()])
    return model

In [14]:
import os
import numpy as np
import cv2
from PIL import Image, ImageChops, ImageEnhance
from multiprocessing import Pool, cpu_count
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras.optimizers import Adam
import pywt


# --- ELA Image Generation ---
def generate_ela_image(image_path, quality=90):
    """
    Generate an ELA image to highlight tampering artifacts.
    """
    original = Image.open(image_path).convert("RGB")
    temp_path = "temp_compressed.jpg"
    original.save(temp_path, "JPEG", quality=quality)
    compressed = Image.open(temp_path)
    ela_image = ImageChops.difference(original, compressed)
    extrema = ela_image.getextrema()
    max_diff = max([ex[1] for ex in extrema]) or 1
    scale = 255.0 / max_diff
    ela_image = ImageEnhance.Brightness(ela_image).enhance(scale)
    return np.array(ela_image).astype(np.uint8)


# --- Wavelet Noise Extraction ---
def extract_wavelet_noise(image_array):
    """
    Extract noise using Discrete Wavelet Transform (DWT).
    """
    gray = cv2.cvtColor(image_array, cv2.COLOR_RGB2GRAY) / 255.0
    coeffs = pywt.wavedec2(gray, "haar", level=2)
    coeffs_H = list(coeffs)
    coeffs_H[0] = np.zeros_like(coeffs_H[0])  # Zero out approximation coefficients
    noise = pywt.waverec2(coeffs_H, "haar")
    noise = np.clip(noise * 255.0, 0, 255).astype(np.uint8)
    return noise


# --- Preprocessing ---
def preprocess_image(image_path, target_size=(224, 224)):
    """
    Preprocess image by combining ELA and Wavelet Noise.
    """
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_resized = cv2.resize(img, target_size)
    
    ela_image = generate_ela_image(image_path)
    ela_resized = cv2.resize(ela_image, target_size)
    
    wavelet_noise = extract_wavelet_noise(img)
    wavelet_resized = cv2.resize(wavelet_noise, target_size)
    
    combined = np.stack([img_resized[:, :, 0], ela_resized[:, :, 0], wavelet_resized], axis=-1)
    return combined.astype(np.float32) / 255.0  # Normalize


# --- Wrapper for Multiprocessing ---
def preprocess_image_wrapper(args):
    """
    Wrapper function for multiprocessing.
    """
    return preprocess_image(*args)


# --- Dataset Loading ---
from concurrent.futures import ProcessPoolExecutor

def load_casia2_dataset(dataset_path, target_size=(224, 224), max_images=None):
    """
    Load CASIA2 dataset using ProcessPoolExecutor for parallel preprocessing.
    """
    images, labels = [], []
    file_paths = []

    for label, folder in enumerate(["Au", "Tp"]):
        folder_path = os.path.join(dataset_path, folder)
        files = os.listdir(folder_path)[:max_images]
        for file in files:
            file_paths.append((os.path.join(folder_path, file), target_size, label))
    
    def process_file(args):
        file_path, target_size, label = args
        try:
            preprocessed_image = preprocess_image(file_path, target_size)
            return preprocessed_image, label
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return None, None
    
    with ProcessPoolExecutor() as executor:
        results = executor.map(process_file, file_paths)
    
    for image, label in results:
        if image is not None:
            images.append(image)
            labels.append(label)
    
    return np.array(images), np.array(labels)


# --- Model Definition ---
def create_mobilenetv2_model(input_shape, learning_rate=0.0001):
    """
    Create MobileNetV2 model for binary classification.
    """
    base_model = MobileNetV2(input_shape=input_shape, include_top=False, weights="imagenet")
    base_model.trainable = False  # Freeze base model layers
    
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(512, activation="relu")(x)
    x = Dropout(0.5)(x)
    output = Dense(1, activation="sigmoid")(x)
    
    model = Model(inputs=base_model.input, outputs=output)
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss="binary_crossentropy",
                  metrics=["accuracy", tf.keras.metrics.AUC()])
    return model


# --- Main Function ---
def main():
    dataset_path = "../CASIA2"  # Path to the CASIA2 dataset
    target_size = (224, 224)
    max_images = 1000  # Limit the number of images per class
    
    # Load and preprocess dataset
    print("Loading and preprocessing dataset...")
    X, y = load_casia2_dataset(dataset_path, target_size, max_images)
    
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
    
    # Create model
    input_shape = (224, 224, 3)
    print("Creating model...")
    model = create_mobilenetv2_model(input_shape)
    
    # Train model
    print("Training model...")
    history = model.fit(X_train, y_train, batch_size=16, epochs=10, validation_data=(X_test, y_test))
    
    # Evaluate model
    print("Evaluating model...")
    y_pred = (model.predict(X_test) > 0.5).astype(int)
    print(classification_report(y_test, y_pred, target_names=["Authentic", "Tampered"]))
    
    # Save model
    model.save("mobilenetv2_casia2.h5")
    print("Model saved as mobilenetv2_casia2.h5")


# Ensure compatibility with multiprocessing
if __name__ == "__main__":
    main()


Loading and preprocessing dataset...


AttributeError: Can't pickle local object 'load_casia2_dataset.<locals>.process_file'

In [None]:
# Train model
print("Training model...")
history = model.fit(X_train, y_train, batch_size=16, epochs=10, validation_data=(X_test, y_test))



In [None]:
# Evaluate model
print("Evaluating model...")
y_pred = (model.predict(X_test) > 0.5).astype(int)
print(classification_report(y_test, y_pred, target_names=["Authentic", "Tampered"]))

# Save model
model.save("mobilenetv2_casia2.h5")
print("Model saved as mobilenetv2_casia2.h5")