In [69]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import dlib
import matplotlib.pyplot as plt
 
metadata = {'Video Name':[], 'label': []}


<h1>Our main program and how will it work</h2>
I am going to use the face forensics ++ videos at least 100 videos real and fake. 
--> Videos pre processing: Crop out the faces and save those videos in a new file reduce the video lengths and make them of same sizes
--> How am i going to crop only the faces part? -> load the open cv pre-trained model for this 
--> these videos will be stored in a seperate folder
processed/real
processed/fake
--> face feature extraction using Haar Casades

<h1>Demo Representation of the code</h1>

<h3>Cropped Data Folder Structure</h3>

dataset/
│
├── train/
│   ├── real/
│   └── fake/
│
├── val/
│   ├── real/
│   └── fake/
│
└── test/
    ├── real/
    └── fake/


In [70]:
import tensorflow as tf
tf.config.list_physical_devices('GPU')


[]

In [71]:
#ok so first i need to take one video and perfrom manipulation on it
#process the videos -> generate uuid for each fake and real --> same them in dataset folder

#NOTE: Helper function 
def display_video(video_path):
    # Open the video file
    cap = cv2.VideoCapture(video_path)

    # Check if the video file was opened successfully
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Loop through each frame in the video
    while cap.isOpened():
        # Read a frame from the video
        ret, frame = cap.read()

        # If the frame was not read successfully, break the loop
        if not ret:
            break

        # Display the frame
        cv2.imshow('Video', frame)

        # Wait for a short duration and check for the 'q' key to quit
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break

    # Release the video capture object and close all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()

In [50]:
def process_and_compile_faces_to_video(video_path, output_video_path, frame_limit=200, process_every_n_frames=30, skip_frames=30):
    # Initialize dlib's face detector
    detector = dlib.get_frontal_face_detector()
    
    # Open the video file
    cap = cv2.VideoCapture(video_path)
    
    processed_frames_count = 0
    total_frames_processed = 0  # To keep track of the total number of processed frames
    faces_images = []  # List to hold cropped faces for display
    
    # Initialize the VideoWriter object later once we have the face dimensions
    video_writer = None
    
    while cap.isOpened() and total_frames_processed < frame_limit:
        ret, frame = cap.read()
        if not ret:
            break  # End of video or error reading frame
        
        # Check if the current frame should be processed based on the skipping logic
        if processed_frames_count % (process_every_n_frames + skip_frames) < process_every_n_frames:
            # Convert the frame to grayscale to speed up detection
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Detect faces
            faces = detector(gray, 0)

            for face in faces:
                x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
                
                face_crop = frame[y1:y2, x1:x2]
                face_crop_rgb = cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB)
                faces_images.append(face_crop_rgb)
                
                # Initialize video writer with the first cropped face dimensions
                if video_writer is None:
                    height, width, _ = face_crop.shape
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Adjust codec if necessary
                    video_writer = cv2.VideoWriter(output_video_path, fourcc, 10, (width, height))
                
                video_writer.write(face_crop)

            total_frames_processed += 1  # Only increment if a frame was processed

        # Increment processed_frames_count for every frame, processed or skipped
        processed_frames_count += 1

    cap.release()  # Release the video capture object
    
    if video_writer is not None:
        video_writer.release()  # Release the VideoWriter object
  
  #NOTE: testing by matplot lib
    # num_faces = len(faces_images)
#     cols = 5
#     rows = num_faces // cols + (1 if num_faces % cols > 0 else 0)
#     fig, axes = plt.subplots(rows, cols, figsize=(15, 3*rows))
#     axes = axes.flatten() if rows * cols > 1 else [axes]
    
#     for ax, face_img in zip(axes, faces_images):
#         if face_img is not None:
#             ax.imshow(face_img)
#             ax.axis('off')
    
#     # Hide any unused subplot spaces
#     for ax in axes[num_faces:]:
#         ax.axis('off')
    
#     plt.tight_layout()
#     plt.show()

# # Example usage
# video_path = 'dataset/fake_videos/fake/000_003.mp4'
# output_video_path = 'compiled_faces_video.mp4'
# process_and_compile_faces_to_video(video_path, output_video_path)

In [72]:
import uuid
metadata = dict()
metadata['video_name'] = []
metadata['label'] = []

def processing_videos(parent_path):
    """
    NOTE:
        In this function i will perform manipulations on it 
        -> using dlib extract the faces
        -> then using uuid create unique id's for them
    """
    global metadata 
    fake_videos_path = parent_path + '\\fake_videos\\fake\\' 
    real_videos_path = parent_path + '\\real_videos\\real\\' 
    
    '''we are going to retreive the first 10 videos for easiness '''
    real_vids=os.listdir(real_videos_path)[:100]
    fake_vids=os.listdir(fake_videos_path)[:100]
  
    '''now performing dlib faces extraction
       we will send the video paths by os.joining them
       NOTE: 
            We need to loop through each of the files in real-fake folders and then using the lists
            stores in real and fake vids we can create new paths and then finally, call the extract faces function on it:
            new argument in extract_faces_from_video will be label to specify which folder to save the video
    '''
    ids = []
    
    for vid_count,r_vid in enumerate(real_vids):
        unique_id = str(uuid.uuid4())
        vid_path = r'dataset\real_videos\real' + fr'\{r_vid}'
        outpath = fr'cropped_videos_dataset\real_videos_cropped\real_vid_{vid_count}.mp4'
        process_and_compile_faces_to_video(vid_path,outpath)
        ids.append(unique_id)
        metadata['video_name'].append(outpath)
        metadata['label'].append(1)
        
        
    for vid_count,f_vid in enumerate(fake_vids):
        vid_path = r'dataset\fake_videos\fake' + fr'\{f_vid}'
        outpath = fr'cropped_videos_dataset\fake_videos_cropped\fake_vid_{vid_count}.mp4'
        process_and_compile_faces_to_video(vid_path , outpath)
        metadata['video_name'].append(outpath)
        metadata['label'].append(0)
        
        
    

In [73]:
dir_path = 'dataset'
processing_videos(dir_path)
df = pd.DataFrame(metadata)
df

Unnamed: 0,video_name,label
0,cropped_videos_dataset\real_videos_cropped\rea...,1
1,cropped_videos_dataset\real_videos_cropped\rea...,1
2,cropped_videos_dataset\real_videos_cropped\rea...,1
3,cropped_videos_dataset\real_videos_cropped\rea...,1
4,cropped_videos_dataset\real_videos_cropped\rea...,1
...,...,...
195,cropped_videos_dataset\fake_videos_cropped\fak...,0
196,cropped_videos_dataset\fake_videos_cropped\fak...,0
197,cropped_videos_dataset\fake_videos_cropped\fak...,0
198,cropped_videos_dataset\fake_videos_cropped\fak...,0


In [74]:
# display_video(r'cropped_videos_dataset\fake_videos_cropped\f1882333-7282-4fa5-8032-0e72b4581a23.mp4')
df.to_csv('annotations.csv')

In [75]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling3D, Flatten, Dense, Dropout, Conv3D
from tensorflow.keras.utils import to_categorical

In [76]:
def preprocess_video(video_path, frame_count=60, target_size=(64, 64)):
    cap = cv2.VideoCapture(str(video_path))
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret or len(frames) == frame_count:
                break
            frame = cv2.resize(frame, target_size)  # Resize frame
            frame = frame / 255.0  # Normalize pixel values
            frames.append(frame)
    finally:
        cap.release()
    # Only return if we have enough frames
    if len(frames) == frame_count:
        return np.array(frames)
    else:
        return None


In [77]:
def get_frames(df = pd.read_csv('annotations.csv')):
    X, y = [], []
    for _, row in df.iterrows():
        frames = preprocess_video(row['video_name'])
        if frames is not None:
            X.append(frames)
            y.append(row['label'])
    return X,y

In [78]:
X,y=get_frames()

In [79]:
X = np.array(X)
y = np.array(y)
y = to_categorical(y, num_classes=2)

In [60]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [80]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Flatten, LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
# model = Sequential([
#     Conv3D(32, kernel_size=(3, 3, 3), activation='relu', input_shape=(30, 64, 64, 3)),
#     MaxPooling3D(pool_size=(2, 2, 2)),
#     Conv3D(64, kernel_size=(3, 3, 3), activation='relu'),
#     MaxPooling3D(pool_size=(2, 2, 2)),
#     Flatten(),
#     Dense(128, activation='relu'),
#     Dropout(0.5),
#     Dense(2, activation='softmax')
# ])

frame_count = 30
frame_height = 64
frame_width = 64
channels = 3

model = Sequential([
    # Apply CNN across each frame independently
    TimeDistributed(Conv2D(32, (3, 3), activation='relu'), input_shape=(frame_count, frame_height, frame_width, channels)),
    TimeDistributed(MaxPooling2D((2, 2))),
    TimeDistributed(Conv2D(64, (3, 3), activation='relu')),
    TimeDistributed(MaxPooling2D((2, 2))),
    TimeDistributed(Flatten()),

    # Use LSTM to understand temporal dynamics
    LSTM(64, return_sequences=False),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(2, activation='softmax')
])

optimizer = Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()

  super().__init__(**kwargs)


In [81]:
#checking the LTSM model 
model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), batch_size=5)

Epoch 1/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 1s/step - accuracy: 0.6533 - loss: 0.7284 - val_accuracy: 0.6111 - val_loss: 0.6873
Epoch 2/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - accuracy: 0.4172 - loss: 0.7404 - val_accuracy: 0.3889 - val_loss: 0.7124
Epoch 3/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - accuracy: 0.5677 - loss: 0.7143 - val_accuracy: 0.5556 - val_loss: 0.6902
Epoch 4/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - accuracy: 0.4542 - loss: 0.7303 - val_accuracy: 0.6111 - val_loss: 0.6910
Epoch 5/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - accuracy: 0.6287 - loss: 0.6714 - val_accuracy: 0.6111 - val_loss: 0.6860
Epoch 6/50
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - accuracy: 0.4592 - loss: 0.6924 - val_accuracy: 0.6111 - val_loss: 0.6907
Epoch 7/50
[1m15/15[0m [32m━━━━━━━━━━

<keras.src.callbacks.history.History at 0x2aeb2d9e2a0>

In [37]:
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])


In [65]:
# model.fit(X_train, y_train, epochs=10, validation_split=0.1)


In [82]:
history_df=pd.DataFrame(model.history.history)

In [83]:
eval_result = model.evaluate(X_test, y_test)
print(f"Test Loss: {eval_result[0]}, Test Accuracy: {eval_result[1]}")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 224ms/step - accuracy: 0.5000 - loss: 1.8260
Test Loss: 1.8259607553482056, Test Accuracy: 0.5


In [84]:
# Assuming X_test is your preprocessed test data ready for prediction
predictions = model.predict(X_test)

# If your model outputs one-hot encoded predictions, find the class with the highest probability
predicted_classes = np.argmax(predictions, axis=1)

# If y_test is also one-hot encoded, convert it back to labels for comparison
true_classes = np.argmax(y_test, axis=1)

# Compare predictions with the true labels
correct_predictions = np.sum(predicted_classes == true_classes)
total_predictions = len(true_classes)

# Print out accuracy or any other performance metric
print(f"Correct Predictions: {correct_predictions}")
print(f"Total Predictions: {total_predictions}")
print(f"Accuracy: {correct_predictions / total_predictions:.2f}")

# Optionally, print the first few predictions
for i in range(5):  # Adjust the range as needed
    print(f"Video {i+1} predicted as {'Real' if predicted_classes[i] == 1 else 'Fake'}, Actual: {'Real' if true_classes[i] == 1 else 'Fake'}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 952ms/step
Correct Predictions: 9
Total Predictions: 18
Accuracy: 0.50
Video 1 predicted as Fake, Actual: Real
Video 2 predicted as Real, Actual: Fake
Video 3 predicted as Real, Actual: Real
Video 4 predicted as Fake, Actual: Real
Video 5 predicted as Real, Actual: Fake


In [86]:
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing import image
base_model = VGG16(weights='imagenet', include_top=True)
# Create a new model that will output the features from the layer before the final classification layer
model = Model(inputs=base_model.input, outputs=base_model.get_layer('fc2').output)  # 'fc2' is the second-to-last layer in VGG16


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
[1m553467096/553467096[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 0us/step


In [88]:
model.summary()

In [89]:
def get_frames_traditonal(df=pd.read_csv('annotations.csv'), frame_count=30, target_size=(224, 224)):
    X, y = [], []
    for _, row in df.iterrows():
        frames = preprocess_video(row['video_name'], frame_count, target_size)
        if frames is not None:
            # Initialize a list to hold the features extracted from each frame
            frame_features = []
            for frame in frames:
                # Preprocess the frame for VGG16
                x = np.expand_dims(frame, axis=0)
                x = preprocess_input(x)
                # Extract features
                features = model.predict(x)
                frame_features.append(features.flatten())
            
            # Aggregate frame features (example: taking the mean across all frames)
            video_features = np.mean(frame_features, axis=0)
            X.append(video_features)
            y.append(row['label'])  # Labeling each feature vector with its corresponding label
    return np.array(X), np.array(y)

In [94]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.85, random_state=5)
# Reshape or flatten the feature vectors
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)

y_train_1d = np.argmax(y_train, axis=1)


In [97]:
from sklearn.metrics import accuracy_score

# Logistic Regression
from sklearn.linear_model import LogisticRegression
clf_lr = LogisticRegression(random_state=0, max_iter=1000)
# Train logistic regression model
clf_lr.fit(X_train_flat, y_train_1d)


# Predict with logistic regression model
y_pred_lr = clf_lr.predict(X_test_flat)

# Calculate accuracy
# Convert probabilities to binary labels
threshold = 0.5
y_pred_binary = (y_pred_lr > threshold).astype(int)
y_test_binary = np.argmax(y_test, axis=1)

print("Logistic Regression Accuracy:", accuracy_score(y_test_binary, y_pred_binary))


Logistic Regression Accuracy: 0.42857142857142855


In [105]:
def preprocess_video_regression(video_path, frame_count=30, target_size=(224, 224)):
    cap = cv2.VideoCapture(str(video_path))
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret or len(frames) == frame_count:
                break
            frame = cv2.resize(frame, target_size)  # Resize frame
            frame = frame / 255.0  # Normalize pixel values
            frames.append(frame)
    finally:
        cap.release()
    # Only return if we have enough frames
    if len(frames) == frame_count:
        return np.array(frames)
    else:
        return None

def extract_features_regression(df=pd.read_csv('annotations.csv'), frame_count=30, target_size=(224, 224)):
    X, y = [], []
    for _, row in df.iterrows():
        frames = preprocess_video(row['video_name'], frame_count, target_size)
        if frames is not None:
            # Initialize a list to hold the features extracted from each frame
            frame_features = []
            for frame in frames:
                # Preprocess the frame for VGG16
                x = np.expand_dims(frame, axis=0)
                x = preprocess_input(x)
                # Extract features
                features = model.predict(x)
                frame_features.append(features.flatten())
            
            # Aggregate frame features (example: taking the mean across all frames)
            video_features = np.mean(frame_features, axis=0)
            X.append(video_features)
            y.append(row['label'])
    return np.array(X), np.array(y)


In [104]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Flatten the feature matrices
X_train_flattened = X_train.reshape(X_train.shape[0], -1)
X_val_flattened = X_val.reshape(X_val.shape[0], -1)

# Initialize logistic regression model
clf_lr = LogisticRegression(max_iter=1000)

best_accuracy = 0  # Initialize variable to track best validation accuracy
no_improvement_count = 0  # Initialize counter for no improvement in validation accuracy

for i in range(1000):  # Manually control maximum number of iterations
    # Train logistic regression model on flattened training data
    clf_lr.fit(X_train_flattened, y_train)
    
    # Predict on flattened validation set
    y_pred_val = clf_lr.predict(X_val_flattened)
    
    # Calculate accuracy on validation set
    accuracy_val = accuracy_score(y_val, y_pred_val)
    print(f"Iteration {i+1}: Validation Accuracy = {accuracy_val}")
    
    # Check for improvement in validation accuracy
    if accuracy_val > best_accuracy:
        best_accuracy = accuracy_val
        no_improvement_count = 0
    else:
        no_improvement_count += 1
    
    # Early stopping criteria: Stop if no improvement in validation accuracy for 10 consecutive iterations
    if no_improvement_count >= 10:
        print("Early stopping due to no improvement in validation accuracy.")
        break

# Predict on test set
X_test_flattened = X_test.reshape(X_test.shape[0], -1)
y_pred_lr = clf_lr.predict(X_test_flattened)

# Calculate accuracy on test set
accuracy_test = accuracy_score(y_test, y_pred_lr)
print("Test Accuracy:", accuracy_test)


ValueError: y should be a 1d array, got an array of shape (21, 2) instead.