In [None]:
!pip install -U --upgrade tensorflow

In [2]:
# from tensorflow_docs.vis import embed
from tensorflow import keras
#from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os
import time

In [None]:
DATA_FOLDER = '/kaggle/input/deepfake-detection-challenge'
TRAIN_SAMPLE_FOLDER = '/kaggle/input/deepfake-detection-challenge/train_sample_videos'
TEST_FOLDER = '/kaggle/input/deepfake-detection-challenge/test_videos'

print(f"Train samples: {len(os.listdir(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER)))}")
print(f"Test samples: {len(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER)))}")

In [None]:
train_sample_metadata = pd.read_json('../input/deepfake-detection-challenge/train_sample_videos/metadata.json').T
train_sample_metadata.head()

In [None]:
train_sample_metadata.groupby('label')['label'].count().plot(figsize=(15, 5), kind='bar', title='Distribution of Labels in the Training Set')
plt.show()

In [None]:
train_sample_metadata.shape

In [None]:
fake_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.label=='FAKE'].sample(10).index)
fake_train_sample_video

In [8]:
def display_image_from_video(video_path):
    
    capture_image = cv2.VideoCapture(video_path) 
    ret, frame = capture_image.read()
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(111)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    ax.imshow(frame)

In [None]:
for video_file in fake_train_sample_video:
    display_image_from_video(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [None]:
real_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.label=='REAL'].sample(3).index)
real_train_sample_video

In [None]:
for video_file in real_train_sample_video:
    display_image_from_video(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER, video_file))

In [None]:
train_sample_metadata['original'].value_counts()[0:5]

In [13]:
def display_image_from_video_list(video_path_list, video_folder=TRAIN_SAMPLE_FOLDER):
    
    plt.figure()
    fig, ax = plt.subplots(2,3,figsize=(16,8))
    # we only show images extracted from the first 6 videos
    for i, video_file in enumerate(video_path_list[0:6]):
        video_path = os.path.join(DATA_FOLDER, video_folder,video_file)
        capture_image = cv2.VideoCapture(video_path) 
        ret, frame = capture_image.read()
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        ax[i//3, i%3].imshow(frame)
        ax[i//3, i%3].set_title(f"Video: {video_file}")
        ax[i//3, i%3].axis('on')

In [None]:
same_original_fake_train_sample_video = list(train_sample_metadata.loc[train_sample_metadata.original=='atvmxvwyns.mp4'].index)
display_image_from_video_list(same_original_fake_train_sample_video)

In [15]:
test_videos = pd.DataFrame(list(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER))), columns=['video'])

In [None]:
test_videos.head()

In [None]:
display_image_from_video(os.path.join(DATA_FOLDER, TEST_FOLDER, test_videos.iloc[2].video))

In [18]:
fake_videos = list(train_sample_metadata.loc[train_sample_metadata.label=='FAKE'].index)

In [None]:
from IPython.display import HTML
from base64 import b64encode

def play_video(video_file, subset=TRAIN_SAMPLE_FOLDER):
    
    video_url = open(os.path.join(DATA_FOLDER, subset,video_file),'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(video_url).decode()
    return HTML("""<video width=500 controls><source src="%s" type="video/mp4"></video>""" % data_url)

play_video(fake_videos[10])

In [20]:
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 10

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048

In [21]:
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)

In [None]:
def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()

In [23]:
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = list(df.index)
    labels = df["label"].values
    labels = np.array(labels=='FAKE').astype(int)

    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # For each video.
    for idx, path in enumerate(video_paths):
        # Gather all its frames and add a batch dimension.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # Extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels

In [None]:
from sklearn.model_selection import train_test_split

Train_set, Test_set = train_test_split(train_sample_metadata,test_size=0.1,random_state=42,stratify=train_sample_metadata['label'])

print(Train_set.shape, Test_set.shape )

In [None]:
train_data, train_labels = prepare_all_videos(Train_set, "train")
test_data, test_labels = prepare_all_videos(Test_set, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

In [None]:
from keras.layers import Bidirectional, BatchNormalization

frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

# Increase model complexity
x = Bidirectional(keras.layers.GRU(128, return_sequences=True))(frame_features_input, mask=mask_input)
x = BatchNormalization()(x)
x = Bidirectional(keras.layers.GRU(64))(x)
x = keras.layers.Dropout(0.5)(x)  # Increased dropout rate

# Add more dense layers
x = keras.layers.Dense(64, activation="relu")(x)
x = BatchNormalization()(x)
x = keras.layers.Dense(32, activation="relu")(x)

# Output layer
output = keras.layers.Dense(1, activation="sigmoid")(x)

# Model definition
model = keras.Model([frame_features_input, mask_input], output)

# Compile the model with a modified optimizer
optimizer = keras.optimizers.Adam(learning_rate=0.001)
model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])

# Model summary
model.summary()


In [None]:
frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

# Increase model complexity
x = keras.layers.GRU(64, return_sequences=True)(frame_features_input, mask=mask_input)
x = keras.layers.GRU(32)(x)

# Dropout for regularization (adjust dropout rate)
x = keras.layers.Dropout(0.2)(x)

# Increase units in dense layer
x = keras.layers.Dense(32, activation="relu")(x)

# Output layer
output = keras.layers.Dense(1, activation="sigmoid")(x)

# Model definition
model = keras.Model([frame_features_input, mask_input], output)

# Compile the model
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Model summary
model.summary()

In [None]:
checkpoint = keras.callbacks.ModelCheckpoint('./', save_weights_only=True, save_best_only=True)
history = model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_data=([test_data[0], test_data[1]],test_labels),
        callbacks=[checkpoint],
        epochs=10,
        batch_size=2
    )

In [None]:
import matplotlib.pyplot as plt

# Plot training & validation accuracy values
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [30]:
def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask

def sequence_prediction(path):
    frames = load_video(os.path.join(DATA_FOLDER, TEST_FOLDER,path))
    frame_features, frame_mask = prepare_single_video(frames)
    return model.predict([frame_features, frame_mask])[0]
    
# This utility is for visualization.
# Referenced from:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    converted_images = images.astype(np.uint8)
    imageio.mimsave("animation.gif", converted_images, fps=30)
    return embed.embed_file("animation.gif")

In [None]:
test_video = "/kaggle/input/deepfake-detection-challenge/train_sample_videos/aelfnikyqj.mp4"
print(f"Test video path: {test_video}")

if(sequence_prediction(test_video)<=0.5):
    print(f'The predicted class of the video is FAKE')
else:
    print(f'The predicted class of the video is REAL')

play_video(test_video,TEST_FOLDER)

In [None]:
test_video = "/kaggle/input/deepfake-detection-challenge/train_sample_videos/aagfhgtpmv.mp4"
print(f"Test video path: {test_video}")

if(sequence_prediction(test_video)<=0.5):
    print(f'The predicted class of the video is FAKE')
else:
    print(f'The predicted class of the video is REAL')
    

play_video(test_video,TEST_FOLDER)

In [None]:
from sklearn.model_selection import KFold
from tensorflow import keras
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
import numpy as np

def build_model():
    from keras.layers import Bidirectional, BatchNormalization

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Increase model complexity
    x = Bidirectional(keras.layers.GRU(128, return_sequences=True))(frame_features_input, mask=mask_input)
    x = BatchNormalization()(x)
    x = Bidirectional(keras.layers.GRU(64))(x)
    x = keras.layers.Dropout(0.5)(x)  # Increased dropout rate

    # Add more dense layers
    x = keras.layers.Dense(64, activation="relu")(x)
    x = BatchNormalization()(x)
    x = keras.layers.Dense(32, activation="relu")(x)

    # Output layer
    output = keras.layers.Dense(1, activation="sigmoid")(x)

    # Model definition
    model = keras.Model([frame_features_input, mask_input], output)

    # Compile the model with a modified optimizer
    optimizer = keras.optimizers.Adam(learning_rate=0.001)
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])

    # Model summary
    return model


def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1)

kf = KFold(n_splits=5, shuffle=True, random_state=42)

all_features = np.random.randn(100, MAX_SEQ_LENGTH, NUM_FEATURES)  
all_labels = np.random.randint(2, size=(100, 1))                  

fold_no = 1
for train_index, test_index in kf.split(all_features):
    train_features, test_features = all_features[train_index], all_features[test_index]
    train_labels, test_labels = all_labels[train_index], all_labels[test_index]
    
    train_mask = np.ones((train_features.shape[0], MAX_SEQ_LENGTH), dtype=bool)
    test_mask = np.ones((test_features.shape[0], MAX_SEQ_LENGTH), dtype=bool)
    
    model = build_model()

    lr_scheduler = LearningRateScheduler(scheduler)
    early_stopping = EarlyStopping(monitor='val_loss', patience=5)

    print(f'Training for fold {fold_no}...')
    history = model.fit(
        [train_features, train_mask], 
        train_labels,
        validation_data=([test_features, test_mask], test_labels),
        epochs=30,
        batch_size=10,
        callbacks=[lr_scheduler, early_stopping]
    )
    
    fold_no += 1


In [None]:
model.summary()

In [None]:
test_video = "/kaggle/input/srkiisfake/WhatsApp Video 2024-05-10 at 16.50.47_5f4b00c2.mp4"
print(f"Test video path: {test_video}")

if(sequence_prediction(test_video)<=0.5):
    print(f'The predicted class of the video is FAKE')
else:
    print(f'The predicted class of the video is REAL')
    

play_video(test_video,TEST_FOLDER)

In [None]:
from time import time
start_time=time()
test_video = "/kaggle/input/yashraj/final_WhatsApp_Video_2024-05-16_at_1.24.11_PM-1715846270537-1735-945ab71d-dffd-4200-ba1a-43548d8b6a74-9455.mp4"
print(f"Test video path: {test_video}")

if(sequence_prediction(test_video)<=0.5):
    print(f'The predicted class of the video is FAKE')
    print(time()-start_time)
else:
    print(f'The predicted class of the video is REAL')
    print(time()-start_time)
    

play_video(test_video,TEST_FOLDER)

In [37]:
model.save('my_model.h5')