In [1]:
import numpy as np
import librosa
import io
import soundfile as sf
from moviepy.editor import VideoFileClip
from tqdm import tqdm
import pickle as pk
import os
import tensorflow as tf
# from tensorflow.keras.saving import register_keras_serializable
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

In [2]:
# real_audio_dir = (
#     r"H:\.shortcut-targets-by-id\1jH_pc6mMj0Iu8wLS1r0vggMWpVElJvOU\SIH2024_DATASET\REAL"
# )
# fake_audio_dir = (
#     r"H:\.shortcut-targets-by-id\1jH_pc6mMj0Iu8wLS1r0vggMWpVElJvOU\SIH2024_DATASET\FAKE"
# )

In [3]:
# with open(
#     r"H:\.shortcut-targets-by-id\1jH_pc6mMj0Iu8wLS1r0vggMWpVElJvOU\SIH2024_DATASET\real_files.pkl",
#     "rb",
# ) as f:
#     real_files = pk.load(f)

# with open(
#     r"H:\.shortcut-targets-by-id\1jH_pc6mMj0Iu8wLS1r0vggMWpVElJvOU\SIH2024_DATASET\fake_files.pkl",
#     "rb",
# ) as f:
#     fake_files = pk.load(f)

In [4]:
# len(real_files), len(fake_files)

In [5]:
# real_files = real_files[:2000]
# fake_files = fake_files[:2000]

In [6]:
# fake_files = fake_files[: len(real_files)]

In [7]:
# len(real_files), len(fake_files)

In [8]:
# def extract_features(file_path):
#     try:
#         # Load the video file
#         video_clip = VideoFileClip(file_path)
#         audio = video_clip.audio
#         fps = audio.fps
#         audio_samples = np.array(
#             list(audio.iter_frames(fps=fps, dtype="float32"))
#         ).flatten()
#         buffer = io.BytesIO()
#         sf.write(buffer, audio_samples, fps, format="wav")
#         buffer.seek(0)
#         x, sr = librosa.load(buffer, sr=None)
#         mfccs = librosa.feature.mfcc(y=x, sr=sr, n_mfcc=20)

#         return mfccs

#     except Exception as e:
#         print(f"Error encountered while parsing file: {file_path}, {e}")
#         return None


# def load_data(real_dir, fake_dir):
#     labels = []
#     features = []

#     # Load real audios
#     for file_name in real_files:
#         file_path = os.path.join(real_dir, file_name)
#         mfccs = extract_features(file_path)
#         if mfccs is not None:
#             features.append(mfccs)
#             labels.append(0)  # 0 for REAL

#     # Load fake audios
#     for file_name in fake_files:
#         file_path = os.path.join(fake_dir, file_name)
#         mfccs = extract_features(file_path)
#         if mfccs is not None:
#             features.append(mfccs)
#             labels.append(1)  # 1 for FAKE

#     return np.array(features), np.array(labels)

In [9]:
def extract_frame_features(file_path, frame_duration=1.0):
    try:
        video_clip = VideoFileClip(file_path)
        audio = video_clip.audio
        fps = audio.fps
        audio_samples = np.array(
            list(audio.iter_frames(fps=fps, dtype="float32"))
        ).flatten()
        buffer = io.BytesIO()
        sf.write(buffer, audio_samples, fps, format="wav")
        buffer.seek(0)
        x, sr = librosa.load(buffer, sr=None)

        # Split audio into frames of 'frame_duration' seconds
        frame_length = int(frame_duration * sr)
        frames = [
            librosa.feature.mfcc(y=x[i : i + frame_length], sr=sr, n_mfcc=20)
            for i in range(0, len(x), frame_length)
            if i + frame_length <= len(x)
        ]

        return frames  # Returns list of MFCCs for each frame

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        return None

In [10]:
def load_data(real_dir, fake_dir, real_files, fake_files):
    labels, features = [], []

    # Load real audio frames with progress bar
    print("Loading real audio files:")
    for file_name in tqdm(real_files, desc="Processing Real Files"):
        file_path = os.path.join(real_dir, file_name)
        frame_features = extract_frame_features(file_path)
        if frame_features:
            features.extend(frame_features)
            labels.extend([0] * len(frame_features))  # Label 0 for REAL

    # Load fake audio frames with progress bar
    print("Loading fake audio files:")
    for file_name in tqdm(fake_files, desc="Processing Fake Files"):
        file_path = os.path.join(fake_dir, file_name)
        frame_features = extract_frame_features(file_path)
        if frame_features:
            features.extend(frame_features)
            labels.extend([1] * len(frame_features))  # Label 1 for FAKE

    # Convert to numpy arrays
    features = np.array(features)
    labels = np.array(labels)

    # Shuffle the data
    indices = np.arange(len(features))
    np.random.shuffle(indices)
    features = features[indices]
    labels = labels[indices]

    return features, labels

In [11]:
# X, y = load_data(real_audio_dir, fake_audio_dir, real_files, fake_files)
# X = X[..., np.newaxis]

In [12]:
# with open("X_for_dl_2000.pkl", "wb") as f:
#     pk.dump(X, f)
# with open("y_for_dl_2000.pkl", "wb") as f:
#     pk.dump(y, f)

In [13]:
with open("X_for_dl_2000.pkl", "rb") as f:
    X = pk.load(f)
with open("y_for_dl_2000.pkl", "rb") as f:
    y = pk.load(f)

In [14]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=30
)

In [15]:
model = models.Sequential(
    [
        layers.Conv1D(
            64,
            kernel_size=3,
            dilation_rate=1,
            padding="causal",
            activation="relu",
            input_shape=(X.shape[1], X.shape[2]),
        ),
        layers.Conv1D(
            128, kernel_size=3, dilation_rate=2, padding="causal", activation="relu"
        ),
        layers.Conv1D(
            256, kernel_size=3, dilation_rate=4, padding="causal", activation="relu"
        ),
        layers.GlobalAveragePooling1D(),
        layers.Dropout(0.5),
        layers.Dense(64, activation="relu"),
        layers.Dense(2, activation="softmax"),
    ]
)

In [16]:
model.compile(
    optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

In [17]:
checkpoint = ModelCheckpoint(
    "model/best_model.keras", monitor="val_loss", save_best_only=True
)
early_stopping = EarlyStopping(monitor="val_loss", patience=3)

In [18]:
model.summary()

In [19]:
history = model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=16,
    validation_data=(X_test, y_test),
    callbacks=[checkpoint],
)

Epoch 1/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3ms/step - accuracy: 0.5424 - loss: 0.9849 - val_accuracy: 0.6379 - val_loss: 0.6318
Epoch 2/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 3ms/step - accuracy: 0.6276 - loss: 0.6486 - val_accuracy: 0.6644 - val_loss: 0.6187
Epoch 3/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 3ms/step - accuracy: 0.6522 - loss: 0.6277 - val_accuracy: 0.6531 - val_loss: 0.6211
Epoch 4/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 3ms/step - accuracy: 0.6602 - loss: 0.6141 - val_accuracy: 0.6815 - val_loss: 0.5839
Epoch 5/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 3ms/step - accuracy: 0.6765 - loss: 0.5985 - val_accuracy: 0.6912 - val_loss: 0.5788
Epoch 6/100
[1m3998/3998[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 3ms/step - accuracy: 0.6851 - loss: 0.5915 - val_accuracy: 0.7016 - val_loss: 0.5689
Epoc

In [29]:
# model = tf.keras.models.load_model("model/best_model.keras")
y_pred = model.predict(X_test)
y_pred_labels = np.argmax(y_pred, axis=1)

# Print classification report
print(classification_report(y_test, y_pred_labels, target_names=["REAL", "FAKE"]))

[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step
              precision    recall  f1-score   support

        REAL       0.88      0.57      0.69      7920
        FAKE       0.69      0.92      0.79      8072

    accuracy                           0.75     15992
   macro avg       0.78      0.75      0.74     15992
weighted avg       0.78      0.75      0.74     15992



In [30]:
def test_on_video(file_path, frame_duration=1.0):
    # Load the trained model
    model = tf.keras.models.load_model("model/best_model.keras")

    # Extract features for each frame in the new video
    frames = extract_frame_features(file_path, frame_duration)

    if frames is None:
        print("No frames extracted.")
        return

    # Reshape frames for model input
    frames = np.array(frames)[..., np.newaxis]

    # Predict on each frame
    predictions = model.predict(frames)
    pred_labels = np.argmax(predictions, axis=1)

    # Store deepfake frames
    deepfake_frames = []
    deepfake_indices = []

    # Identify deepfake frames
    for i, label in enumerate(pred_labels):
        if label == 1:  # If the label is FAKE
            deepfake_frames.append(frames[i])
            deepfake_indices.append(i)

    if not deepfake_frames:
        print("No deepfake frames detected in the video.")
        return

    # Analyze deepfake frames
    print(f"Found {len(deepfake_frames)} deepfake frames:")
    for i in deepfake_indices:
        status = "FAKE"
        print(f"Frame {i + 1}: {status}")


# Example usage
test_video_path = r"REAL\eudeqjhdfd.mp4"  # Replace with your test video path
test_on_video(test_video_path)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 89ms/step
No deepfake frames detected in the video.


In [22]:
# def test_on_video(file_path, frame_duration=1.0):
#     # Load the trained model
#     model = tf.keras.models.load_model("model/best_model.keras")

#     # Extract features for each frame in the new video
#     frames = extract_frame_features(file_path, frame_duration)

#     if frames is None:
#         print("No frames extracted.")
#         return

#     # Reshape frames for model input
#     frames = np.array(frames)[..., np.newaxis]

#     # Predict on each frame
#     predictions = model.predict(frames)
#     pred_labels = np.argmax(predictions, axis=1)

#     # Output results for each frame
#     for i, label in enumerate(pred_labels):
#         status = "REAL" if label == 0 else "FAKE"
#         print(f"Frame {i+1}: {status}")


# # Example usage
# test_video_path = r"REAL\bddjdhzfze.mp4"  # Replace with your test video path
# test_on_video(test_video_path)

In [23]:
# @register_keras_serializable()
# class AudioModel(tf.keras.Model):
#     def __init__(self, input_shape):
#         super(AudioModel, self).__init__()
#         self.input_shape = input_shape  # Store the input shape
#         # Define the model layers
#         self.conv1 = layers.Conv2D(
#             32, kernel_size=(3, 3), activation="relu", input_shape=input_shape
#         )
#         self.conv2 = layers.Conv2D(64, kernel_size=(3, 3), activation="relu")
#         self.pool = layers.MaxPooling2D(pool_size=(2, 2))
#         self.dropout1 = layers.Dropout(0.25)

#         self.reshape = layers.Reshape((64, -1))
#         self.gru = layers.Bidirectional(layers.GRU(128, return_sequences=False))

#         self.dense1 = layers.Dense(128, activation="relu")
#         self.dropout2 = layers.Dropout(0.5)
#         self.dense2 = layers.Dense(2, activation="softmax")

#     def call(self, inputs):
#         # Forward pass through the layers
#         x = self.conv1(inputs)
#         x = self.conv2(x)
#         x = self.pool(x)
#         x = self.dropout1(x)

#         x = self.reshape(x)
#         x = self.gru(x)

#         x = self.dense1(x)
#         x = self.dropout2(x)
#         return self.dense2(x)

#     def get_config(self):
#         config = super(AudioModel, self).get_config()
#         config.update(
#             {"input_shape": self.input_shape}  # Include input shape in config
#         )
#         return config

#     @classmethod
#     def from_config(cls, config):
#         # Create a model instance from the config
#         input_shape = config.pop("input_shape")  # Extract input_shape from config
#         return cls(input_shape)  # Create an instance of the model


# # Function to create and compile the model
# def create_model(input_shape):
#     model = AudioModel(input_shape)
#     model.compile(
#         optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
#     )
#     return model


# # Example usage
# input_shape = (
#     64,
#     40,
#     1,
# )  # Adjust based on your data (e.g., (n_mfccs, time_steps, channels))

In [24]:
# model = create_model(input_shape)
# model.summary()

In [25]:
# checkpoint = ModelCheckpoint(r"models/dl_model.keras", monitor="val_loss", save_best_only=True, verbose=1)
# early_stopping = EarlyStopping(monitor="val_loss", patience=5, verbose=1)

# history = model.fit(
#     X_train, y_train, epochs=10, batch_size=16, validation_data=(X_test, y_test), callbacks=[checkpoint, early_stopping]
# )

In [26]:
# model.save(r"models/dl_model.keras", overwrite=True)
# print("Model saved successfully.")

In [27]:
# # Ensure to import keras properly
# import tensorflow as tf
# from tensorflow import keras


# # Function to load the model
# def load_model(model_path):
#     try:
#         # Load the model from the specified path
#         model = keras.models.load_model(model_path)
#         print("Model loaded successfully.")
#         return model
#     except Exception as e:
#         print(f"Error loading model: {e}")
#         return None

In [28]:
# model_path = r"models/dl_model.keras"

# # Load the model
# loaded_model = load_model(model_path)