In [1]:
relative_base_path = './'

fake_dataset_dir = f"{relative_base_path}dataset/manipulated_sequences"
real_dataset_dir = f"{relative_base_path}dataset/original_sequences"

fake_output_dir = f"{relative_base_path}out/fake"
real_output_dir = f"{relative_base_path}out/real"

In [2]:
import os
import glob
import numpy as np
from tqdm import tqdm
from models.blink_detection.DetectBlinking import DetectBlinking
from sklearn.decomposition import PCA

In [5]:
data = np.load("./out/fake/DeepFakeDetection_c40_videos_01_02__exit_phone_room__YVGY8LOK.npz")
data["features"].shape

(210, 53)

In [7]:
def apply_pca(features, n_components=50):
    pca = PCA(n_components=n_components)
    reduced_features = pca.fit_transform(features)

    return reduced_features

In [8]:
def save(path, output_filename, features):
    if not (os.path.exists(f"{path}{output_filename}")):
        os.makedirs(path, exist_ok=True)
        ds = {"ORE_MAX_GIORNATA": 5}
        np.savez_compressed(os.path.join(path, output_filename), ds)

    print(f"Video Processed | Features: ", features.shape)
    np.savez_compressed(f"{path}/{output_filename}", features=features)

In [9]:
def extract_features_and_save(video_paths, output_dir, output_files):
    saved_useless_files = np.load("out/useless_files.npy")

    useless_files = saved_useless_files.tolist()
    print("saved_useless_files: ", len(useless_files))
    for idx, p in enumerate(video_paths):
        filePath, tail = os.path.split(p)
        orig_name = tail.split(".")[0]

        splitted_path = filePath.split("/")
        name = f"{splitted_path[3]}_{splitted_path[4]}_{splitted_path[5]}_{orig_name}"

        np_path = output_dir + f"/{name}.npz"
        output_filename = f"{name}.npz"
        path = output_dir

        if p not in saved_useless_files:
            if np_path in output_files:
                print(f"{idx} => File already processed: ", np_path)
            else:
                try:
                    detect_blinking = DetectBlinking(
                        p, 0.3, 4,
                        crop_face=True,
                        return_features=True,
                        process=True,
                        logs=False,
                    )
                    # print(f"Path: {p}")
                    video_features, ear_features = detect_blinking.process_video()

                    if video_features is not None and len(video_features) > 0:
                        video_features = np.array(video_features)
                        ear_features = np.array(ear_features)
                        reduced_features = apply_pca(video_features, n_components=50)
                        final_features = np.concatenate((reduced_features, ear_features), axis=1)
                        print(f"{idx} Final Features: ", final_features.shape)
                        save(path, output_filename, final_features)
                    else:
                        useless_files.append(p)
                        print(f"{idx} Video Skipped...", len(useless_files))
                        np.save("./out/useless_files.npy", useless_files)
                except Exception as e:
                    print(f"Found error is path: {p}")
                    print(f"Error: {e}")
                    useless_files.append(p)
                    print(f"{idx} Video Skipped...", len(useless_files))
                    np.save("./out/useless_files.npy", useless_files)

        else:
            print(f"File found in useless list: {p}")

In [10]:
extracted_fake_paths_npy = np.array(glob.glob(fake_output_dir + "/*.npz"))
extracted_real_paths_npy = np.array(glob.glob(real_output_dir + "/*.npz"))

print("extracted_fake_paths_npy: ", extracted_fake_paths_npy.shape)
print("extracted_real_paths_npy: ", extracted_real_paths_npy.shape)

fake_mp4_paths = glob.glob(fake_dataset_dir + "/*/*/*/*.mp4")
real_mp4_paths = glob.glob(real_dataset_dir + "/*/*/*/*.mp4")
print("fake_mp4_paths: ", len(fake_mp4_paths))
print("real_mp4_paths: ", len(real_mp4_paths))

extracted_fake_paths_npy:  (3238,)
extracted_real_paths_npy:  (1015,)
fake_mp4_paths:  1000
real_mp4_paths:  1363


In [11]:
useless_paths = np.load("out/useless_files.npy")
print(len(useless_paths))

2464


In [None]:
# Extract features and save them as .npy files

extract_features_and_save(fake_mp4_paths, fake_output_dir, extracted_fake_paths_npy)
# extract_features_and_save(real_mp4_paths, real_output_dir, extracted_real_paths_npy)
print("ITS DONE")
print("ITS DONE")
print("ITS DONE")
print("ITS DONE")

In [5]:
def pad_to_max_length(array, max_length, pad_value = 0):
    if array.ndim == 2:
        padded = np.pad(array, ((0, max_length - len(array)), (0, 0)), mode="constant", constant_values=pad_value)
    else:
        padded = np.pad(array, (0, max_length - len(array)), mode='constant', constant_values=pad_value)

    return padded

In [6]:
extracted_fake_paths_npy = np.array(glob.glob(fake_output_dir + "/*.npz"))
extracted_real_paths_npy = np.array(glob.glob(real_output_dir + "/*.npz"))

print("extracted_fake_paths_npy: ", len(extracted_fake_paths_npy), extracted_fake_paths_npy.shape)
print("extracted_real_paths_npy: ", len(extracted_real_paths_npy), extracted_real_paths_npy.shape)

extracted_fake_paths_npy:  3238 (3238,)
extracted_real_paths_npy:  1015 (1015,)


In [7]:
for idx, path in enumerate(extracted_fake_paths_npy):
    features = np.load(path)
    try:
        print(f"{idx} --- {features["features"][0][0]}")
    except Exception as e:
        print(f"{idx} --- {path}")
        print(f"Error: {e}")


0 --- 20.14983558654785
1 --- 5.076423168182373
2 --- 3.8185863494873047
3 --- -2.4431378841400146
4 --- -0.7283381223678589
5 --- -2.130507707595825
6 --- 7.9461989402771
7 --- 3.238757610321045
8 --- 1.1928497552871704
9 --- -2.7353718280792236
10 --- -4.196608066558838
11 --- 1.8785152435302734
12 --- 0.7933956980705261
13 --- -4.984035968780518
14 --- 0.847224235534668
15 --- 6.581628799438477
16 --- -2.55808424949646
17 --- 1.1671017408370972
18 --- -3.501603841781616
19 --- -5.601445198059082
20 --- -0.9172372817993164
21 --- -0.8826799392700195
22 --- -12.448854446411133
23 --- -6.915759086608887
24 --- -0.7525575160980225
25 --- -0.6487594246864319
26 --- 3.57362699508667
27 --- 3.497943878173828
28 --- 4.238090515136719
29 --- -9.469429016113281
30 --- -0.8067854046821594
31 --- -9.207867622375488
32 --- -10.441001892089844
33 --- -3.1694846153259277
34 --- 3.616042375564575
35 --- -0.32208701968193054
36 --- -2.0773205757141113
37 --- -2.6327414512634277
38 --- -3.48003864288

In [13]:
# Load the features and pad them to the same length
fake_features = []
real_features = []

# Load the features, 
for idx, path in enumerate(extracted_fake_paths_npy):
    features = np.load(path)["features"]
    fake_features.append(features)

for idx, path in enumerate(extracted_real_paths_npy):
    features = np.load(path)["features"]
    real_features.append(features)

# fake_features = np.array(fake_features)
# real_features = np.array(real_features)

print("fake_features: ", len(fake_features))
print("real_features: ", len(real_features))

fake_features:  3238
real_features:  1015


In [8]:
# Get the max length of the features
max_length = max(max(len(features) for features in fake_features), max(len(features) for features in real_features))
print("Max length of features: ", max_length)

Max length of features:  1814


In [9]:
# Get the max length of the features
min_length = min(min(len(features) for features in fake_features), min(len(features) for features in real_features))
print("Min length of features: ", min_length)

Min length of features:  61


In [14]:
# len(fake_features):  3238
# len(real_features):  1015

# Pad the features
fake_features_padded = []
real_features_padded = []


for idx, features in enumerate(fake_features):
    if len(features) > 200:
        padded_arr = pad_to_max_length(features, max_length)
        fake_features_padded.append(padded_arr)

fake_features_padded = np.array(fake_features_padded)

for idx, features in enumerate(real_features):
    if len(features) > 200:
        padded_arr = pad_to_max_length(features, max_length)
        real_features_padded.append(padded_arr)

real_features_padded = np.array(real_features_padded)

print("fake_features_padded: ", fake_features_padded.shape)
print("real_features_padded: ", real_features_padded.shape)

NameError: name 'pad_to_max_length' is not defined

In [15]:
import numpy as np

def sliding_window(features, window_size=200, stride=100):
    """Generates sliding windows over the feature sequence."""
    windows = []
    for start in range(0, len(features) - window_size + 1, stride):
        window = features[start:start + window_size]
        windows.append(window)
    return windows

# Parameters
window_size = 200
stride = 100

# Process fake features with sliding window
fake_features_windowed = []
for features in fake_features:
    if len(features) >= window_size:
        windows = sliding_window(features, window_size, stride)
        fake_features_windowed.extend(windows)

fake_features_windowed = np.array(fake_features_windowed)

# Process real features with sliding window
real_features_windowed = []
for features in real_features:
    if len(features) >= window_size:
        windows = sliding_window(features, window_size, stride)
        real_features_windowed.extend(windows)

real_features_windowed = np.array(real_features_windowed)

print("fake_features_windowed: ", fake_features_windowed.shape)
print("real_features_windowed: ", real_features_windowed.shape)


fake_features_windowed:  (14821, 200, 53)
real_features_windowed:  (4077, 200, 53)


In [16]:
all_fake_features_windowed = fake_features_windowed
fake_features_windowed = fake_features_windowed[:10000]

print("fake_features_windowed: ", fake_features_windowed.shape)
print("real_features_windowed: ", real_features_windowed.shape)

fake_features_windowed:  (10000, 200, 53)
real_features_windowed:  (4077, 200, 53)


In [17]:
import numpy as np
from sklearn.utils import resample

# Assuming real_features_windowed and fake_features_windowed are NumPy arrays
real_videos_upsampled = resample(
    real_features_windowed, 
    replace=True,        # Allow duplicates
    n_samples=len(fake_features_windowed),  # Match the fake video count
    random_state=42
)

print("Balanced real_features_windowed shape:", real_videos_upsampled.shape)

Balanced real_features_windowed shape: (10000, 200, 53)


In [19]:
# Save the fake and real features in a single .npy file with the respective targets i.e. 0 for fake and 1 for real
# fake_targets = np.zeros(fake_features_padded.shape[0])
# real_targets = np.ones(real_features_padded.shape[0])
fake_targets = np.zeros(fake_features_windowed.shape[0])
real_targets = np.ones(real_videos_upsampled.shape[0])


all_features = np.concatenate((fake_features_windowed, real_videos_upsampled), axis=0)
all_targets = np.concatenate((fake_targets, real_targets), axis=0)

print("X_dataset: ", all_features.shape)
print("Y_dataset: ", all_targets.shape)

X_dataset:  (20000, 200, 53)
Y_dataset:  (20000,)


In [20]:
np.savez_compressed(f"{relative_base_path}out/pca_features_{len(all_features)}", X_dataset=all_features, Y_dataset=all_targets)