In [1]:
# import the built-in functions
from tensorflow import keras
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import cv2
import os
import face_recognition


In [2]:
DATA_FOLDER = './deepfake_detection/'
TRAIN_SAMPLE_FOLDER = 'train_sample_videos'
TEST_FOLDER = 'test_videos'

print(f"Train samples: {len(os.listdir(os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER)))}")
print(f"Test samples: {len(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER)))}")

Train samples: 401
Test samples: 400


In [3]:
train_sample_metadata = pd.read_json('./deepfake_detection/train_sample_videos/metadata.json').T
train_sample_metadata.head()

Unnamed: 0,label,split,original
aagfhgtpmv.mp4,FAKE,train,vudstovrck.mp4
aapnvogymq.mp4,FAKE,train,jdubbvfswz.mp4
abarnvbtwb.mp4,REAL,train,
abofeumbvv.mp4,FAKE,train,atvmxvwyns.mp4
abqwwspghj.mp4,FAKE,train,qzimuostzz.mp4


In [4]:
test_videos = pd.DataFrame(list(os.listdir(os.path.join(DATA_FOLDER, TEST_FOLDER))), columns=['video'])
test_videos.head()

Unnamed: 0,video
0,aassnaulhq.mp4
1,aayfryxljh.mp4
2,acazlolrpz.mp4
3,adohdulfwb.mp4
4,ahjnxtiamx.mp4


In [5]:
# create a function to display video
from IPython.display import HTML
from base64 import b64encode

def play_video(video_file, subset=TRAIN_SAMPLE_FOLDER):

    video_url = open(os.path.join(DATA_FOLDER, subset,video_file),'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(video_url).decode()
    return HTML("""<video width=500 controls><source src="%s" type="video/mp4"></video>""" % data_url)

In [6]:
# Modeling

IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 10

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048

In [57]:
# pretreatment of videos
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

def face_loc_img (frame):
    face_loc = face_recognition.face_locations(frame)
    for face_location in face_loc:
        top, right, bottom, left = face_location
        face_image = frame[top:bottom, left:right]
        face_image = cv2.resize(face_image, (IMG_SIZE, IMG_SIZE))
        return face_image

def load_facess(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = face_loc_img(frame)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    except Exception:
        pass
    else:
        cap.release()
    return np.array(frames)

In [None]:
# check the original frame of video
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)
    
plt.imshow(load_video("deepfake_detection/train_sample_videos/adohikbdaz.mp4")[1])

In [None]:
# check the facial frame of video

plt.imshow(face_loc_img(load_facess("deepfake_detection/train_sample_videos/adohikbdaz.mp4")[1]))

In [None]:
# show frames at intervals of 10 

videoframedetect = load_video("deepfake_detection/train_sample_videos/adohikbdaz.mp4")

fig, axs = plt.subplots(4, 4, figsize=(15, 15))
axs = np.array(axs)
axs = axs.reshape(-1)
j = 0
for i in [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]:
    ax = videoframedetect[i]
    axs[j].set_title(f"frame:{i}")
    axs[j].imshow(videoframedetect[i])
    j +=1
plt.show()

In [None]:
# show the first 30 frames of video
videotest = load_facess("deepfake_detection/train_sample_videos/adohikbdaz.mp4")

fig, axs = plt.subplots(6, 5, figsize=(15, 20))
axs = np.array(axs)
axs = axs.reshape(-1)
for i in range (len(videotest)//10):
    ax = axs[i]
    ax.title.set_text("frame:"+str(i))
    ax.imshow(videotest[i])
    
plt.show()

In [None]:
# show facial frames at intervals of 10 
facialdetect = load_facess("deepfake_detection/train_sample_videos/adohikbdaz.mp4")

fig, axs = plt.subplots(4, 4, figsize=(15, 15))
axs = np.array(axs)
axs = axs.reshape(-1)
j = 0
for i in [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150]:
    ax = facialdetect[i]
    axs[j].set_title(f"frame:{i}")
    axs[j].imshow(facialdetect[i])
    j +=1
plt.show()

In [15]:
# create a feature extraction function with Inception V3 model
def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()

In [16]:
# pretreatment of training videos
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = list(df.index)
    labels = df["label"].values
    labels = np.array(labels=='FAKE').astype(np.int)
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        frames = load_facess(os.path.join(root_dir, path))
        frames = frames[None, ...]
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            temp_frame_mask[i, :length] = 1

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels

In [17]:
# split training videos into training set and testing set
from sklearn.model_selection import train_test_split

Train_set, Test_set = train_test_split(train_sample_metadata,test_size=0.05,random_state=42,stratify=train_sample_metadata['label'])

print(Train_set.shape, Test_set.shape )

(380, 3) (20, 3)


In [18]:
train_data, train_labels = prepare_all_videos(Train_set, "train")
test_data, test_labels = prepare_all_videos(Test_set, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  labels = np.array(labels=='FAKE').astype(np.int)


Frame features in train set: (380, 20, 2048)
Frame masks in train set: (380, 20)


In [19]:
# model training
frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

x = keras.layers.GRU(16, return_sequences=True)(
    frame_features_input, mask=mask_input
)
x = keras.layers.GRU(8)(x)
x = keras.layers.Dropout(0.4)(x)
x = keras.layers.Dense(8, activation="relu")(x)
output = keras.layers.Dense(1, activation="sigmoid")(x)

model = keras.Model([frame_features_input, mask_input], output)

model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 20, 2048)]   0           []                               
                                                                                                  
 input_4 (InputLayer)           [(None, 20)]         0           []                               
                                                                                                  
 gru (GRU)                      (None, 20, 16)       99168       ['input_3[0][0]',                
                                                                  'input_4[0][0]']                
                                                                                                  
 gru_1 (GRU)                    (None, 8)            624         ['gru[0][0]']                

In [20]:
checkpoint = keras.callbacks.ModelCheckpoint('./', save_weights_only=True, save_best_only=True)
history = model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_data=([test_data[0], test_data[1]],test_labels),
        callbacks=[checkpoint],
        epochs=EPOCHS,
        batch_size=8
    )

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [24]:
# pretreatment of testing videos
def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1

    return frame_features, frame_mask

def sequence_prediction(path):
    frames = load_facess(os.path.join(DATA_FOLDER, TEST_FOLDER,path))
    frame_features, frame_mask = prepare_single_video(frames)
    return model.predict([frame_features, frame_mask])[0]


In [30]:
# read the document for videos' information
dff = pd.read_csv('./metadata_img.csv')
df = pd.DataFrame(dff)
df.head()

Unnamed: 0,videoname,original_width,original_height,label,original
0,aznyksihgl.mp4,129,129,FAKE,xnojggkrxt.mp4
1,gkwmalrvcj.mp4,129,129,FAKE,hqqmtxvbjj.mp4
2,lxnqzocgaq.mp4,223,217,FAKE,xjzkfqddyk.mp4
3,itsbtrrelv.mp4,186,186,FAKE,kqvepwqxfe.mp4
4,ddvgrczjno.mp4,155,155,FAKE,pluadmqqta.mp4


In [None]:
# pick videos randomly to test
test_video = np.random.choice(test_videos["video"].values.tolist())
test_video_str = str(test_video)

print(f"Test video path: {test_video}")
df.loc[df['videoname'] == test_video_str]
if(sequence_prediction(test_video)>=0.6):
    c = "FAKE"
    print(f'The predicted class of the video is',c)
elif(sequence_prediction(test_video)<0.6):
    c = "REAL"
    print(f'The predicted class of the video is',c)


try:
    a = df.loc[df['videoname'] == test_video_str]
    b = a.iloc[0,3]
    print("The video ("+test_video_str+") is actually:",b)
    e = "CORRECT!!"
    f = "WRONG"
    if c == b:
        print("The prediction is",e)
            
    else:
        print("The prediction is",f)
except Exception:
     print("There is no existed data for the test data")


play_video(test_video,TEST_FOLDER)


In [None]:
# accuracy test function

ass = {}
def try_video_with_times(times):
    for test_video in range (times):
        test_video = np.random.choice(test_videos["video"].values.tolist())
        test_video_str = str(test_video)
        print(f"Test video path: {test_video}")
        df.loc[df['videoname'] == test_video_str]
        if(sequence_prediction(test_video)>=0.6):
            c = "FAKE"
            print('The predicted class of the video is',c)
        elif(sequence_prediction(test_video)<0.6):
            c = "REAL"
            print('The predicted class of the video is',c)

        try: 
            a = df.loc[df['videoname'] == test_video_str]
            b = a.iloc[0,3]
            print("The video ("+test_video_str+") is actually:",b)
            e = "CORRECT!!"
            f = "WRONG"
            if c == b:
                print("The prediction is",e)
                ass.update({test_video_str:"correct"})
            else:
                print("The prediction is",f)
                ass.update({test_video_str:"incorrect"})
        except Exception:
            print("There is no existed data for the test data")

    return ass

try_video_with_times(20)
value = list(ass.values())
accuracy = value.count("correct")/len(value)
print("dict:", ass)
print("accuracy", accuracy)

In [55]:
# test function to check the value
def check_video_value_test(videoname):
    test_video_str = str(videoname)
    df.loc[df['videoname'] == test_video_str]
    print("Probability:", sequence_prediction(videoname))
    facialdetect_1 = load_facess(f"deepfake_detection/test_videos/{videoname}")
    fig, axs = plt.subplots(5, 5, figsize=(20, 20))
    axs = np.array(axs)
    axs = axs.reshape(-1)
    j = 0
    for i in [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240]:
        ax = facialdetect_1[i]
        axs[j].set_title(f"frame:{i}")
        axs[j].imshow(facialdetect_1[i])
        j +=1
    plt.show()
    if(sequence_prediction(videoname)>=0.6):
        c = "FAKE"
        print('The predicted class of the video is',c)
    elif(sequence_prediction(videoname)<0.6):
        c = "REAL"
        print('The predicted class of the video is',c)
    try:
        a = df.loc[df['videoname'] == videoname]
        b = a.iloc[0,3]
        print("The video ("+videoname+") is actually:",b)
        if c == b:
            print("The prediction is CORRECT!!")
        else:
            print("The prediction is WRONG")
    except Exception:
        print("There is no existed data for the test data")
    
    return(play_video(videoname,TEST_FOLDER))

In [None]:
check_video_value_test("aktnlyqpah.mp4")

In [30]:
################################### Videos test (10 sec videos on the Internet or input videos) ###############################################

In [59]:
# create functions for videos 
def play_other_video(video_file):
    video_url = open(os.path.join(video_file),'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(video_url).decode()
    return HTML("""<video width=500 controls><source src="%s" type="video/mp4"></video>""" % data_url)

def prepare_single_video_other(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1

    return frame_features, frame_mask

def sequence_prediction_other(video_file):
    frames = load_facess(os.path.join(video_file))
    frame_features, frame_mask = prepare_single_video_other(frames)
    return model.predict([frame_features, frame_mask])[0]

# video test function
def check_other_video_value(videoname):
    if(sequence_prediction_other(videoname)>=0.6):
        c = "FAKE"
        print('The predicted class of the video is',c)
    elif(sequence_prediction_other(videoname)<0.6):
        c = "REAL"
        print('The predicted class of the video is',c)
    print("prediction:",sequence_prediction_other(videoname),c)
    # print the file name
    print("The video is actually:",videoname[22:-5])
    a= load_facess(videoname)
    fig, axs = plt.subplots(3, 3, figsize=(11, 11))
    axs = np.array(axs)
    axs = axs.reshape(-1)
    j = 0
    for i in [0, 25, 50, 75, 100, 125, 150, 175, 200]:
        ax = a[i]
        axs[j].set_title(f"frame:{i}")
        axs[j].imshow(a[i])
        j +=1
    video = play_other_video(videoname)
    return(video)

In [None]:
# fake video test
check_other_video_value("other_video_test/fake/fake1.mp4")

In [None]:
check_other_video_value("other_video_test/fake/fake2.mp4")

In [None]:
check_other_video_value("other_video_test/fake/fake3.mp4")

In [None]:
check_other_video_value("other_video_test/fake/fake4.mp4")

In [None]:
# real video test
check_other_video_value("other_video_test/real/real1.mp4")

In [None]:
check_other_video_value("other_video_test/real/real2.mp4")