In [1]:
import os
import csv
import gdown
!apt-get install unrar
!pip install gdown rarfile
import rarfile
import cv2
import numpy as np
import shutil
import random
import tensorflow as tf
from tensorflow.keras.applications import MobileNet
# from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, Flatten, Dropout, Input, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [3]:
file_id = '1sA0waMKbP8ffMlb1NRqxIFhsymuq2Frx'
output_dir = 'datasets'
rar_file = os.path.join(output_dir, 'CASIA_faceAntisp.rar')

#make the directory
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

gdown.download(f'https://drive.google.com/uc?id={file_id}', rar_file, quiet=False)
extract_dir = 'CASIA_faceAntisp'
if not os.path.exists(extract_dir):
    os.makedirs(extract_dir)


Downloading...
From (original): https://drive.google.com/uc?id=1sA0waMKbP8ffMlb1NRqxIFhsymuq2Frx
From (redirected): https://drive.google.com/uc?id=1sA0waMKbP8ffMlb1NRqxIFhsymuq2Frx&confirm=t&uuid=2050af80-dfa9-42d6-bc38-03a283643df3
To: /content/datasets/CASIA_faceAntisp.rar
100%|██████████| 876M/876M [00:14<00:00, 61.9MB/s]


In [4]:
with rarfile.RarFile(rar_file) as rf:
    rf.extractall(extract_dir)

In [2]:
train_folder_path = 'CASIA_faceAntisp/train_release'
test_folder_path = 'CASIA_faceAntisp/test_release'

In [3]:
labels = {
    'HR_1': 1,
    'HR_2': 0,
    'HR_3': 0,
    'HR_4': 1,
    '1': 1,
    '2': 1,
    '3': 0,
    '4': 0,
    '5': 0,
    '6': 0,
    '7': 0,
    '8': 0
}

In [52]:
def create_frame_labels_dict(folder_path, labels):
    frame_labels_dict = {}

    for folder in os.listdir(folder_path):
        path = os.path.join(folder_path, folder)
        if os.path.isdir(path):
            # Iterate through each file in the folder
            for file in os.listdir(path):
                file_name, file_extension = os.path.splitext(file)
                # if file_name in labels:
                label = labels[file_name]
                frame_labels_dict[f"{folder}_{file_name}"] = label

    return frame_labels_dict

In [5]:
train_frame_labels_dict = create_frame_labels_dict(train_folder_path, labels)
test_frame_labels_dict = create_frame_labels_dict(test_folder_path, labels)


In [6]:
def make_directory(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
    else:
        # Empty the directory if it already exists
        for filename in os.listdir(directory_path):
            file_path = os.path.join(directory_path, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(f'Failed to delete {file_path}. Reason: {e}')

In [7]:
def extract_random_frames(folder_path, output_dir, num_frames=3):

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for folder in os.listdir(folder_path):
        path = os.path.join(folder_path, folder)
        if os.path.isdir(path):
            for file in os.listdir(path):
                file_name, file_extension = os.path.splitext(file)
                video_path = os.path.join(path, file)
                video_capture = cv2.VideoCapture(video_path)

                # Get total number of frames
                total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))

                if total_frames > num_frames:
                    # Generate random frame numbers
                    frame_numbers = sorted(random.sample(range(total_frames), num_frames))

                    for i, frame_num in enumerate(frame_numbers):
                        # Set the video capture to the specified frame
                        video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_num)

                        # Read the specified frame
                        success, frame = video_capture.read()
                        if success:
                            frame_output_dir = os.path.join(output_dir, f"{folder}_{file_name}")
                            if not os.path.exists(frame_output_dir):
                                os.makedirs(frame_output_dir)
                            frame_path = os.path.join(frame_output_dir, f"{i+1}.jpg")
                            cv2.imwrite(frame_path, frame)
                        else:
                            print(f"Warning: Could not read frame {frame_num} from video {file}")
                else:
                    print(f"Warning: Video {file} has only {total_frames} frames. Skipping.")

                video_capture.release()

In [8]:
train_frames_dir = 'train_frames'
make_directory(train_frames_dir)

extract_random_frames(train_folder_path, train_frames_dir)

In [9]:
def extract_faces_from_frames(folder_path, output_dir):
    # Load Haar cascade for face detection
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for folder in os.listdir(folder_path):
        path = os.path.join(folder_path, folder)
        if os.path.isdir(path):
            for file in os.listdir(path):
                file_path = os.path.join(path, file)
                frame = cv2.imread(file_path)

                # Check if the frame was read successfully
                if frame is None:
                    print(f"Warning: Could not read frame {file}. Skipping this frame.")
                    continue

                gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                faces = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

                # if len(faces) == 0:
                #     print(f"Warning: No faces detected in frame {file}.")

                for i, (x, y, w, h) in enumerate(faces):
                    face = frame[y:y+h, x:x+w]
                    frame_output_dir = os.path.join(output_dir, folder)
                    if not os.path.exists(frame_output_dir):
                        os.makedirs(frame_output_dir)
                    face_filename = f"{os.path.splitext(file)[0]}.jpg"
                    face_path = os.path.join(frame_output_dir, face_filename)
                    cv2.imwrite(face_path, face)


In [10]:
# make directory for store the detected faces
train_faces_dir = 'train_faces'
make_directory(train_faces_dir)

extract_faces_from_frames(train_frames_dir, train_faces_dir)

In [11]:
#function to extract frequency features from frames
def extract_frequency_features(image):
    resized_image = cv2.resize(image, (224, 224))
    gray_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2GRAY)
    f_transform = np.fft.fft2(gray_image)
    f_magnitude = np.abs(f_transform)
    return f_magnitude.flatten()

In [12]:
def create_frequency_frame_dict(folder_path):
    frequency_frame_dict = {}

    for folder in os.listdir(folder_path):
        path = os.path.join(folder_path, folder)
        if os.path.isdir(path):
            for frame_file in os.listdir(path):
                frame_path = os.path.join(path, frame_file)
                frame_image = cv2.imread(frame_path)

                # Check if the frame was loaded correctly
                if frame_image is None:
                    print(f"Warning: Could not read image {frame_file}. Skipping this frame.")
                    continue

                # Extract frequency features from the frame image
                feature = extract_frequency_features(frame_image)
                frame_name = os.path.splitext(frame_file)[0]
                frequency_frame_dict[f"{folder}_{frame_name}"] = feature

    return frequency_frame_dict

In [13]:
#dictionary to store frame names and corresponding features
features_dict = create_frequency_frame_dict(train_faces_dir)

In [14]:
test_frames_dir = 'test_frames'
make_directory(test_frames_dir)

test_faces_dir = 'test_faces'
make_directory(test_faces_dir)

extract_random_frames(test_folder_path, test_frames_dir,1)

extract_faces_from_frames(test_frames_dir, test_faces_dir)

In [15]:
features_dict_test = create_frequency_frame_dict(test_faces_dir)

In [15]:
# Load frames and labels into numpy arrays
def load_frames_and_labels_from_dict(frame_labels_dict, frames_dir):
    frames = []
    labels = []
    for frame_name, label in frame_labels_dict.items():
        for frame_name_path in os.listdir(os.path.join(frames_dir, frame_name)):
            frame_path = os.path.join(os.path.join(frames_dir, frame_name), frame_name_path)
            frame = cv2.imread(frame_path)
            if frame is not None:
                frames.append(cv2.resize(frame, (224, 224)))
                labels.append(label)
            else:
                print(f"Warning: Could not read image1 {frame_path}")
    frames = np.array(frames) / 255.0  # Normalize
    labels = np.array(labels)
    return frames, labels

# Load frequency features and labels into numpy arrays
def load_frequency_features_and_labels_from_dict(features_dict, frames_dir):
    features = []
    labels = []
    for frame_name, label in features_dict.items():
        desired_part = '_'.join(frame_name.rsplit('_', 1)[:-1])
        features.append(label)
        labels.append(train_frame_labels_dict[desired_part])
        # for frame_name_path in os.listdir(os.path.join(frames_dir, frame_name)):
        #   frame = cv2.imread(frame_path)
        #   if frame is not None:
        #       feature = extract_frequency_features(frame)
        #       features.append(feature)
        #       labels.append(label)
        #       # print(f"Extracted feature shape for {frame_name}: {feature.shape}")  # Debugging line
        #   else:
        #       print(f"Warning: Could not read image {frame_path}")
    features = np.array(features)
    labels = np.array(labels)
    return features, labels

In [16]:
train_frames, train_labels = load_frames_and_labels_from_dict(train_frame_labels_dict, train_frames_dir)

In [17]:
train_faces_frame, train_faces_label = load_frames_and_labels_from_dict(train_frame_labels_dict, train_frames_dir)

In [18]:
train_frequency_frame, train_frequency_label = load_frequency_features_and_labels_from_dict(features_dict, train_frames_dir)

In [19]:
test_frames, test_labels = load_frames_and_labels_from_dict(test_frame_labels_dict, test_frames_dir)

In [20]:
test_faces_frame, test_faces_label = load_frames_and_labels_from_dict(test_frame_labels_dict, test_frames_dir)

In [22]:
test_frequency_frame, test_frequency_label = load_frequency_features_and_labels_from_dict(features_dict_test, test_frames_dir)

In [25]:
min_train_samples = min(len(train_frames), len(train_faces_frame), len(train_frequency_frame), len(train_labels))
min_test_samples = min(len(test_frames), len(test_faces_frame), len(test_labels))

train_frames = train_frames[:min_train_samples]
train_faces_frame = train_faces_frame[:min_train_samples]
train_frequency_frame = train_frequency_frame[:min_train_samples]
train_labels = train_labels[:min_train_samples]

test_frames = test_frames[:min_test_samples]
test_faces_frame = test_faces_frame[:min_test_samples]
test_frequency_frame = train_frequency_frame[:min_test_samples]
test_labels = test_labels[:min_test_samples]

In [27]:

# Define the model for frames
frame_input = Input(shape=(224, 224, 3), name='frame_input')
base_model_frame = MobileNet(weights='imagenet', include_top=False, input_tensor=frame_input)
for layer in base_model_frame.layers:
    layer.trainable = False
    # Rename layers to avoid conflicts
    layer._name = 'frame_' + layer.name

x_frame = base_model_frame.output
x_frame = Flatten(name='frame_flatten')(x_frame)
x_frame = Dense(1024, activation='relu', name='frame_dense')(x_frame)
x_frame = Dropout(0.5, name='frame_dropout')(x_frame)
frame_output = Dense(1, activation='sigmoid', name='frame_output')(x_frame)
frame_model = Model(inputs=frame_input, outputs=frame_output, name='frame_model')

# Define the model for face frames (similar modifications)
face_input = Input(shape=(224, 224, 3), name='face_input')
base_model_face = MobileNet(weights='imagenet', include_top=False, input_tensor=face_input)
for layer in base_model_face.layers:
    layer.trainable = False
    # Rename layers to avoid conflicts
    layer._name = 'face_' + layer.name

x_face = base_model_face.output
x_face = Flatten(name='face_flatten')(x_face)
x_face = Dense(1024, activation='relu', name='face_dense')(x_face)
x_face = Dropout(0.5, name='face_dropout')(x_face)
face_output = Dense(1, activation='sigmoid', name='face_output')(x_face)
face_model = Model(inputs=face_input, outputs=face_output, name='face_model')

# Define the model for frequency inputs (reshaped to 224x224)
frequency_input = Input(shape=(224*224,), name='frequency_input')
x_frequency = Dense(1024, activation='relu', name='frequency_dense')(frequency_input)
x_frequency = Dropout(0.5, name='frequency_dropout')(x_frequency)
frequency_output = Dense(1, activation='sigmoid', name='frequency_output')(x_frequency)
frequency_model = Model(inputs=frequency_input, outputs=frequency_output, name='frequency_model')

# Combine the models with unique layer names
combined_input = concatenate([frame_model.output, face_model.output, frequency_model.output], name='combined_concatenate')
x_combined = Dense(1024, activation='relu', name='combined_dense_1')(combined_input)
x_combined = Dropout(0.5, name='combined_dropout_1')(x_combined)
combined_output = Dense(1, activation='sigmoid', name='combined_output')(x_combined)
combined_model = Model(inputs=[frame_input, face_input, frequency_input], outputs=combined_output, name='combined_model')

# Compile the combined model
combined_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


# Add early stopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# Train the model with early stopping
combined_model.fit([train_frames, train_faces_frame, train_frequency_frame], train_labels,
                   epochs=5, batch_size=32,
                   validation_data=([test_frames, test_faces_frame, test_frequency_frame[:(len(test_frames))]], test_labels),
                   callbacks=[early_stopping])



Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5

KeyboardInterrupt: 

In [57]:
file_id_test = '1vUgpR0RVdrmSI_YDpR9YTCqTiM8m040a'
output_dir = './'
rar_file_test = os.path.join(output_dir, 'dataset.rar')

#make the directory
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

gdown.download(f'https://drive.google.com/uc?id={file_id_test}', rar_file_test, quiet=False)
extract_dir_test = 'Test2'
if not os.path.exists(extract_dir_test):
    os.makedirs(extract_dir_test)

Downloading...
From (original): https://drive.google.com/uc?id=1vUgpR0RVdrmSI_YDpR9YTCqTiM8m040a
From (redirected): https://drive.google.com/uc?id=1vUgpR0RVdrmSI_YDpR9YTCqTiM8m040a&confirm=t&uuid=4f0ac156-2b8a-48f2-ba30-faa1b8e1a700
To: /content/dataset.rar
100%|██████████| 138M/138M [00:02<00:00, 50.7MB/s]


In [58]:
with rarfile.RarFile(rar_file_test) as rft:
    rft.extractall(extract_dir_test)

In [74]:
test2_folder_path = 'Test2'

test2_frames_dir = 'test2_frames'
make_directory(test2_frames_dir)

test2_faces_dir = 'test2_faces'
make_directory(test2_faces_dir)

with open('datasets.txt', 'r', encoding='utf-8') as file:
    lines = file.readlines()
cleaned_lines = [line.strip() for line in lines]


test2_frames2 = extract_random_frames(test2_folder_path, test2_frames_dir,1)
test2_faces2 = extract_faces_from_frames(test2_frames_dir, test2_faces_dir)

In [76]:
test2_freqs2 = features_dict_test2 = create_frequency_frame_dict(test2_faces_dir)

In [78]:
predictions = combined_model.predict([test2_frames2, test2_faces2, test2_freqs2])
# Save the predictions to a CSV file
results = pd.DataFrame({
    'filename': frame_names,
    'prediction': predictions.flatten()
})

results.to_csv('predictions.csv', index=False)

ValueError: Failed to find data adapter that can handle input: (<class 'list'> containing values of types {'(<class \'dict\'> containing {"<class \'str\'>"} keys and {"<class \'numpy.ndarray\'>"} values)', "<class 'NoneType'>"}), <class 'NoneType'>

In [None]:
import os
import cv2
import numpy as np
from skimage.feature import hog
from sklearn.preprocessing import StandardScaler

# Paths
train_frames_dir = 'train_frames'
train_face_dir = 'train_faces'

# Function to extract HOG features (example for image feature extraction)
def extract_hog_features(image):
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    features, _ = hog(gray_image, pixels_per_cell=(8, 8), cells_per_block=(2, 2), visualize=True)
    return features

# Function to extract frequency features
def extract_frequency_features(image):
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    f_transform = np.fft.fft2(gray_image)
    f_magnitude = np.abs(f_transform)
    return f_magnitude.flatten()

# Load images and extract features
whole_image_features = []
cropped_face_features = []
frequency_features = []
labels = []  # Assuming you have a way to get labels for your images

for frame_file in os.listdir(train_frames_dir):
    # Load whole image
    frame_path = os.path.join(train_frames_dir, frame_file)
    whole_image = cv2.imread(frame_path)
    if whole_image is None:
        continue

    # Extract features from whole image
    whole_image_feature = extract_hog_features(whole_image)
    whole_image_features.append(whole_image_feature)

    # Load cropped face
    face_path = os.path.join(train_face_dir, frame_file)
    if os.path.exists(face_path):
        cropped_face = cv2.imread(face_path)
        if cropped_face is not None:
            cropped_face_feature = extract_hog_features(cropped_face)
        else:
            cropped_face_feature = np.zeros_like(whole_image_feature)  # Handle missing face case
    else:
        cropped_face_feature = np.zeros_like(whole_image_feature)  # Handle missing face case
    cropped_face_features.append(cropped_face_feature)

    # Extract frequency features
    freq_features = extract_frequency_features(whole_image)
    frequency_features.append(freq_features)

    # Append the corresponding label
    # You need to implement your way of getting labels here
    # labels.append(your_label_function(frame_file))

# Convert lists to numpy arrays
whole_image_features = np.array(whole_image_features)
cropped_face_features = np.array(cropped_face_features)
frequency_features = np.array(frequency_features)
labels = np.array(labels)


In [None]:
# Ensure frequency features are scaled to have the same range as other features
scaler = StandardScaler()
frequency_features_scaled = scaler.fit_transform(frequency_features)

# Combine all features
combined_features = np.concatenate((whole_image_features, cropped_face_features, frequency_features_scaled), axis=1)


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Split data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(combined_features, labels, test_size=0.2, random_state=42)

# Train Random Forest Classifier
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X_train, y_train)

# Evaluate the model
y_pred = rf_classifier.predict(X_val)
accuracy = accuracy_score(y_val, y_pred)
print(f"Validation Accuracy: {accuracy:.4f}")
