In [None]:
import os
import glob
import numpy as np
import cv2
import albumentations as A
from sklearn.utils import shuffle
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.applications.resnet50 import preprocess_input
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Dense, Dropout, GlobalAveragePooling2D, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras import mixed_precision

# Enable mixed precision globally: layers created after this point compute in
# float16 while keeping their variables in float32.
policy = mixed_precision.Policy(name='mixed_float16')
mixed_precision.set_global_policy(policy)

In [None]:
def _collect_class_pairs(image_dir, landmark_dir, class_name):
    """Return sorted ``(image_path, landmark_path)`` pairs for one class.

    Images are read from ``<image_dir>/<Class>`` and ``<image_dir>/<class>``;
    landmarks are expected at
    ``<landmark_dir>/<class>/<image stem>_landmarks.npy``. Images whose
    landmark file does not exist are dropped.
    """
    # Accept both capitalised and lower-case class folders, but never scan the
    # same physical directory twice (on case-insensitive filesystems 'Real'
    # and 'real' are the same folder and would double every sample).
    class_dirs = []
    for variant in (class_name.capitalize(), class_name.lower()):
        candidate = os.path.join(image_dir, variant)
        if not os.path.isdir(candidate):
            continue
        if any(os.path.samefile(candidate, seen) for seen in class_dirs):
            continue
        class_dirs.append(candidate)

    # glob returns files in arbitrary filesystem order; sorting makes the
    # seeded shuffle in prepare_file_lists reproducible across machines.
    image_paths = sorted(
        path
        for class_dir in class_dirs
        for path in glob.glob(os.path.join(glob.escape(class_dir), '*.jpg'))
    )

    pairs = []
    for image_path in image_paths:
        # Strip only the final extension; str.replace would also rewrite any
        # '.jpg' that happens to appear inside the file name.
        stem = os.path.splitext(os.path.basename(image_path))[0]
        landmark_path = os.path.join(landmark_dir, class_name.lower(), stem + '_landmarks.npy')
        if os.path.exists(landmark_path):
            pairs.append((image_path, landmark_path))
    return pairs


def prepare_file_lists(image_dir, landmark_dir):
    """Build aligned, shuffled lists of image paths, landmark paths and labels.

    Args:
        image_dir: Directory containing ``Real``/``real`` and ``Fake``/``fake``
            sub-folders of ``.jpg`` images.
        landmark_dir: Directory containing ``real`` and ``fake`` sub-folders
            with one ``<image stem>_landmarks.npy`` file per image.

    Returns:
        ``(image_files, landmark_files, labels)`` shuffled in unison with a
        fixed ``random_state`` of 42. Labels are 0 for real and 1 for fake.
        Images without a landmark file are excluded.
    """
    image_files, landmark_files, labels = [], [], []

    # 0 for real, 1 for fake
    for class_name, label in (('real', 0), ('fake', 1)):
        for image_path, landmark_path in _collect_class_pairs(image_dir, landmark_dir, class_name):
            image_files.append(image_path)
            landmark_files.append(landmark_path)
            labels.append(label)

    # Shuffle the three lists together so samples stay aligned
    image_files, landmark_files, labels = shuffle(image_files, landmark_files, labels, random_state=42)

    return image_files, landmark_files, labels


In [None]:
# Root folder of the dataset; each split has an image folder and a sibling
# "<Split>_Landmark" folder.
base_dir = '/kaggle/working/dataset-600k'  # Update with your actual path

# Training split
train_image_dir, train_landmark_dir = (
    os.path.join(base_dir, folder) for folder in ('Train', 'Train_Landmark')
)

# Validation split
val_image_dir, val_landmark_dir = (
    os.path.join(base_dir, folder) for folder in ('Validation', 'Validation_Landmark')
)

# Testing split
test_image_dir, test_landmark_dir = (
    os.path.join(base_dir, folder) for folder in ('Test', 'Test_Landmark')
)


In [None]:
# Resolve (images, landmarks, labels) for each split: train, validation, test.
train_image_files, train_landmark_files, train_labels = prepare_file_lists(
    train_image_dir, train_landmark_dir
)
val_image_files, val_landmark_files, val_labels = prepare_file_lists(
    val_image_dir, val_landmark_dir
)
test_image_files, test_landmark_files, test_labels = prepare_file_lists(
    test_image_dir, test_landmark_dir
)

# Report how many usable samples each split ended up with.
print("\n".join(
    f"Number of {split_label} samples: {len(split_images)}"
    for split_label, split_images in (
        ("training", train_image_files),
        ("validation", val_image_files),
        ("testing", test_image_files),
    )
))

# Show one image/landmark pair as a sanity check on the resolved paths.
print(
    "Sample training image:",
    "No training images" if not train_image_files else train_image_files[0],
)
print(
    "Sample training landmark:",
    "No training landmarks" if not train_landmark_files else train_landmark_files[0],
)


In [None]:
class DataGenerator(Sequence):
    """Batch generator pairing face images with their facial-landmark vectors.

    Each batch is ``({'image_input': images, 'landmark_input': landmarks}, labels)``:

    * ``images``: float32 array of RGB images resized to ``image_size`` and
      passed through ResNet50's ``preprocess_input``.
    * ``landmarks``: float32 array with one flattened landmark vector per
      sample; x is divided by the target width and y by the target height.
    * ``labels``: labels of the samples that were loaded successfully.

    A sample that fails to load is reported on stdout and skipped, so a batch
    may contain fewer than ``batch_size`` samples. NOTE(review): skipped
    samples also shorten ``model.predict`` output relative to ``labels``;
    keep that in mind when evaluating against the full label list.

    Args:
        image_files: Sequence of image paths.
        landmark_files: Sequence of ``.npy`` landmark paths aligned with
            ``image_files``.
        labels: Sequence of per-sample targets aligned with ``image_files``.
        batch_size: Maximum number of samples per batch (must be positive).
        image_size: Target ``(height, width)`` of the images.
        shuffle: Reshuffle the sample order at construction and after every
            epoch.
        augment: Apply the Albumentations pipeline to image and landmarks.
        **kwargs: Forwarded to the Keras base-class initialiser.

    Raises:
        ValueError: If the three input sequences differ in length or
            ``batch_size`` is not positive.
    """

    def __init__(self, image_files, landmark_files, labels, batch_size, image_size,
                 shuffle=True, augment=False, **kwargs):
        # Keras 3 warns when a Sequence subclass skips the base initialiser;
        # on older versions this call is a harmless no-op.
        super().__init__(**kwargs)

        if not (len(image_files) == len(landmark_files) == len(labels)):
            raise ValueError(
                "image_files, landmark_files and labels must have the same length, got "
                f"{len(image_files)}, {len(landmark_files)} and {len(labels)}"
            )
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        self.image_files = image_files
        self.landmark_files = landmark_files
        self.labels = labels
        self.batch_size = batch_size
        self.image_size = image_size  # Tuple (height, width)
        self.shuffle = shuffle
        self.augment = augment

        # Define Albumentations augmentation pipeline
        self.augmentation_pipeline = self._build_augmentation_pipeline() if self.augment else None

        # Batches are drawn through this permutation instead of re-ordering the
        # three sequences themselves; this also works for an empty dataset.
        self.indices = np.arange(len(self.image_files))
        self.on_epoch_end()

    def _build_augmentation_pipeline(self):
        """Return the pipeline that transforms an image and its keypoints together."""
        # NOTE(review): flips move the points but not their order in the
        # vector, so left/right landmark indices swap sides - confirm this is
        # acceptable for the landmark branch.
        # NOTE(review): newer Albumentations releases changed the
        # RandomResizedCrop size arguments - confirm against the installed
        # version.
        return A.Compose([
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.2),
            A.RandomBrightnessContrast(p=0.5),
            A.Rotate(limit=30, p=0.5),
            A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=0.5),
            A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
            A.MotionBlur(blur_limit=5, p=0.5),
            A.CLAHE(clip_limit=4.0, p=0.5),
            A.RandomResizedCrop(height=self.image_size[0], width=self.image_size[1], scale=(0.6, 1.0), p=0.5),
        # remove_invisible=False keeps points that leave the frame, so every
        # sample retains the same number of landmarks.
        ], keypoint_params=A.KeypointParams(format='xy', remove_invisible=False))

    def __len__(self):
        """Number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.image_files) / self.batch_size))

    def _load_sample(self, img_file, lm_file):
        """Load one image/landmark pair and return ``(image, flat_landmarks)``."""
        target_h, target_w = self.image_size

        # Load at native resolution first: the original size is needed to map
        # the landmarks onto the resized image. (Passing target_size to
        # load_img hides it and leaves the landmarks unscaled.)
        pil_img = load_img(img_file)
        orig_w, orig_h = pil_img.size
        img = np.array(pil_img)  # RGB, uint8
        if (orig_h, orig_w) != (target_h, target_w):
            # Nearest-neighbour is the interpolation mode load_img uses by
            # default; cv2 takes the size as (width, height).
            img = cv2.resize(img, (target_w, target_h), interpolation=cv2.INTER_NEAREST)

        # assumes landmarks are stored as (num_points, 2) x/y pixel coordinates
        # of the native-resolution image - TODO confirm
        lm = np.load(lm_file).astype('float32')
        lm[:, 0] *= target_w / orig_w
        lm[:, 1] *= target_h / orig_h

        # Apply augmentation if enabled
        if self.augmentation_pipeline is not None:
            augmented = self.augmentation_pipeline(image=img, keypoints=lm)
            img = augmented['image']
            lm = np.asarray(augmented['keypoints'], dtype='float32')

        # Model-specific preprocessing expects a float image
        img = preprocess_input(img.astype('float32'))

        # Normalize landmarks relative to image size
        lm[:, 0] /= target_w
        lm[:, 1] /= target_h
        return img, lm.flatten()

    def __getitem__(self, index):
        """Return batch ``index`` as ``(inputs_dict, labels_array)``.

        Raises:
            RuntimeError: If the batch is non-empty but every sample in it
                failed to load.
        """
        batch_ids = self.indices[index * self.batch_size:(index + 1) * self.batch_size]

        images = []
        landmarks = []
        labels = []

        for sample_id in batch_ids:
            img_file = self.image_files[sample_id]
            lm_file = self.landmark_files[sample_id]
            try:
                img, lm = self._load_sample(img_file, lm_file)
            except Exception as e:
                # Best effort: report the broken sample and keep the batch going
                print(f"Error processing {img_file} and {lm_file}: {e}")
                continue  # Skip to the next item

            images.append(img)
            landmarks.append(lm)
            labels.append(self.labels[sample_id])

        if len(batch_ids) > 0 and not images:
            # Without this the model would receive shapeless empty arrays and
            # fail with a far less helpful error.
            raise RuntimeError(
                f"All {len(batch_ids)} samples of batch {index} failed to load"
            )

        # Convert lists to arrays
        images_array = np.array(images)
        landmarks_array = np.array(landmarks)
        labels_array = np.array(labels)

        return {'image_input': images_array, 'landmark_input': landmarks_array}, labels_array

    def on_epoch_end(self):
        """Reshuffle the sample order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)


In [None]:
batch_size = 32
image_size = (360, 360)  # Updated image resolution

# Training stream: reshuffled every epoch and augmented.
train_generator = DataGenerator(
    image_files=train_image_files,
    landmark_files=train_landmark_files,
    labels=train_labels,
    batch_size=batch_size,
    image_size=image_size,
    shuffle=True,
    augment=True,
)

# Validation and testing streams: fixed order, no augmentation.
val_generator, test_generator = (
    DataGenerator(
        image_files=split_images,
        landmark_files=split_landmarks,
        labels=split_labels,
        batch_size=batch_size,
        image_size=image_size,
        shuffle=False,
        augment=False,
    )
    for split_images, split_landmarks, split_labels in (
        (val_image_files, val_landmark_files, val_labels),
        (test_image_files, test_landmark_files, test_labels),
    )
)


In [None]:
# Model inputs: the RGB image and the flattened landmark vector
# (136 values per sample).
image_input = Input(shape=(image_size[0], image_size[1], 3), name='image_input')
landmark_input = Input(shape=(136,), name='landmark_input')

# ImageNet-pretrained ResNet50 backbone without its classification head
base_model = ResNet50(weights='imagenet', include_top=False, input_tensor=image_input)

# Fine-tune only the last N backbone layers. Keras layers are trainable by
# default, so the backbone has to be frozen explicitly first; marking the last
# layers as trainable alone changes nothing and leaves all of ResNet50
# trainable. Set N to len(base_model.layers) to fine-tune the whole backbone.
NUM_TRAINABLE_BASE_LAYERS = 10
first_trainable_index = len(base_model.layers) - NUM_TRAINABLE_BASE_LAYERS
for layer_index, layer in enumerate(base_model.layers):
    layer.trainable = layer_index >= first_trainable_index

# Image branch head: pool the backbone feature map and project it to 512 units
x_image = base_model.output
x_image = GlobalAveragePooling2D()(x_image)
x_image = Dense(512, activation='relu', kernel_regularizer=l2(1e-3))(x_image)
x_image = Dropout(0.6)(x_image)

# Landmark Input Branch
x_landmark = Dense(256, activation='relu', kernel_regularizer=l2(1e-4))(landmark_input)
x_landmark = Dropout(0.5)(x_landmark)
x_landmark = Dense(128, activation='relu', kernel_regularizer=l2(1e-4))(x_landmark)
x_landmark = Dropout(0.5)(x_landmark)

# Combine Features
combined = Concatenate()([x_image, x_landmark])

# Fully Connected Layers
x = Dense(256, activation='relu', kernel_regularizer=l2(1e-4))(combined)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu', kernel_regularizer=l2(1e-4))(x)
x = Dropout(0.5)(x)

# Output Layer
# The global policy is mixed_float16, so this layer would otherwise compute in
# float16. Forcing float32 here keeps the probabilities, and the loss and
# metrics derived from them, numerically stable.
output = Dense(1, activation='sigmoid', dtype='float32')(x)


In [None]:
# Final Model: both input branches wired to the single sigmoid output.
model = Model(
    outputs=output,
    inputs=[image_input, landmark_input],
)

In [None]:
# Binary classification setup: Adam at 1e-4, cross-entropy loss, and
# accuracy / AUC / precision / recall tracked during training.
model.compile(
    loss='binary_crossentropy',
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    metrics=['accuracy'] + [
        metric_cls(name=metric_name)
        for metric_name, metric_cls in (
            ('auc', tf.keras.metrics.AUC),
            ('precision', tf.keras.metrics.Precision),
            ('recall', tf.keras.metrics.Recall),
        )
    ],
)


In [None]:
# Print a summary of the assembled network as a quick sanity check.
model.summary()

In [None]:
# Keep only the best model on disk, judged by validation AUC (higher is better).
checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_auc',
    save_best_only=True,
    mode='max',
    verbose=1
)

# Stop once validation AUC has not improved for 3 epochs and roll back to the
# best weights.
early_stopping = EarlyStopping(
    monitor='val_auc',
    # Explicit direction, matching the checkpoint: 'auto' guesses from the
    # metric name and the guess is version-dependent. If it resolved to "min",
    # training would stop on improvement and restore the worst weights.
    mode='max',
    patience=3,
    restore_best_weights=True,
    verbose=1
)

# Halve the learning rate when validation loss has stalled for 2 epochs.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    mode='min',
    factor=0.5,
    patience=2,
    verbose=1,
    min_lr=1e-7
)


In [None]:
epochs = 10  # upper bound; early stopping may end training sooner

# Train on the augmented stream and validate after every epoch.
history = model.fit(
    x=train_generator,
    validation_data=val_generator,
    epochs=epochs,
    callbacks=[checkpoint, early_stopping, reduce_lr],
)
