<a href="https://colab.research.google.com/github/rohan-mrrobot/cvpr/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

In [None]:
# Mount Google Drive under /content/drive so the dataset and cache paths
# used later in this notebook resolve.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import cv2
import glob
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (Dense, LSTM, GRU, GlobalAveragePooling2D, TimeDistributed,
                                     Input, Dropout, RepeatVector)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import pickle


Categorize videos by filename prefix

In [None]:
def load_videos_from_folder(folder_path):
    """Group the ``.mp4`` files directly inside *folder_path* by name prefix.

    The first character of each file name selects the class
    (case-sensitive): ``i`` -> insertion, ``d`` -> deletion,
    ``r`` -> replication; any other name is treated as original.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        dict mapping "original", "insertion", "deletion" and "replication"
        to lists of matching file paths, in the order ``glob`` yields them.
    """
    prefix_to_label = {"i": "insertion", "d": "deletion", "r": "replication"}
    video_classes = {
        label: []
        for label in ("original", "insertion", "deletion", "replication")
    }
    pattern = os.path.join(folder_path, "*.mp4")
    for path in glob.glob(pattern):
        first_char = os.path.basename(path)[:1]
        label = prefix_to_label.get(first_char, "original")
        video_classes[label].append(path)
    return video_classes

In [None]:
def split_video_into_clips(video_path, clip_length=16, overlap=8):
    """Decode a video and cut it into fixed-length, overlapping clips.

    Every frame is resized to 224x224 before being windowed. Consecutive
    clips start ``clip_length - overlap`` frames apart; a trailing partial
    clip is dropped.

    Args:
        video_path: Path of the video file to read with OpenCV.
        clip_length: Number of frames per clip (must be positive).
        overlap: Number of frames shared by consecutive clips (must be
            smaller than ``clip_length``).

    Returns:
        ``np.array`` built from the list of clips. It is empty when the
        video has fewer than ``clip_length`` readable frames, so callers
        should check ``len(result)`` before using it.

    Raises:
        ValueError: If ``clip_length`` is not positive or ``overlap`` is not
            smaller than ``clip_length``. Previously this surfaced as an
            opaque ``range()`` error (equal values) or silently produced no
            clips (overlap larger than the clip).
    """
    stride = clip_length - overlap
    if clip_length <= 0 or stride <= 0:
        raise ValueError(
            "clip_length must be positive and overlap must be smaller than "
            f"clip_length (got clip_length={clip_length}, overlap={overlap})"
        )

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.resize(frame, (224, 224)))
    finally:
        # Release the decoder even if resize/read raises part-way through.
        cap.release()

    last_start = len(frames) - clip_length
    clips = [
        frames[start:start + clip_length]
        for start in range(0, last_start + 1, stride)
    ]
    return np.array(clips)

In [None]:
def compute_stp(clips):
    """Collapse each clip into a single frame by averaging over time.

    Args:
        clips: Iterable of clips; each clip is array-like with the frame
            index on axis 0.

    Returns:
        ``np.array`` holding one temporally averaged frame per clip
        (an empty array when ``clips`` is empty).
    """
    # axis=0 of a clip is its frame index, so the mean runs across frames.
    averaged = [np.mean(single_clip, axis=0) for single_clip in clips]
    return np.array(averaged)

In [None]:
def build_feature_extractor():
    """Build the per-frame feature extractor.

    Uses VGG16 with ImageNet weights and no classifier head
    (``include_top=False``) on 224x224x3 inputs, followed by global average
    pooling of its final output.

    Returns:
        A Keras ``Model`` mapping an image batch to one pooled feature
        vector per image.
    """
    backbone = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    pooled = GlobalAveragePooling2D()(backbone.output)
    return Model(inputs=backbone.input, outputs=pooled)

In [None]:
def save_progress(X, y, save_path):
    """Pickle the features and labels to *save_path*.

    The file holds a single dict with keys ``'X'`` and ``'y'``; an existing
    file at that path is overwritten.

    Args:
        X: Feature data to store.
        y: Label data to store.
        save_path: Destination file path.
    """
    payload = {'X': X, 'y': y}
    with open(save_path, 'wb') as sink:
        pickle.dump(payload, sink)

def load_progress(load_path):
    """Load the features and labels stored in a pickle file.

    The file must contain a dict with the keys ``'X'`` and ``'y'``.

    NOTE: ``pickle.load`` can execute arbitrary code embedded in the file;
    only load files that you created yourself.

    Args:
        load_path: Path of the pickle file to read.

    Returns:
        Tuple ``(X, y)`` taken from the stored dict.
    """
    with open(load_path, 'rb') as source:
        payload = pickle.load(source)
    features, labels = payload['X'], payload['y']
    return features, labels

In [None]:
def extract_features(feature_extractor, stp_frames):
    """Run *stp_frames* through the extractor and return its predictions.

    Args:
        feature_extractor: Object exposing ``predict(batch)``.
        stp_frames: Batch of frames handed to ``predict`` unchanged.

    Returns:
        Whatever ``feature_extractor.predict`` returns for the batch.
    """
    features = feature_extractor.predict(stp_frames)
    return features

In [None]:
def build_autoencoder(input_dim, reduced_dim):
    """Build a dense autoencoder and expose its encoder half.

    Encoder: Dense(256, relu) -> Dense(reduced_dim, relu).
    Decoder: Dense(256, relu) -> Dense(input_dim, relu).
    The full model is compiled with Adam (learning rate 1e-3) and MSE loss.

    Args:
        input_dim: Length of the input feature vectors.
        reduced_dim: Length of the encoded representation.

    Returns:
        Tuple ``(autoencoder, encoder)``. The encoder shares its weights
        with the autoencoder, so fitting the autoencoder trains it.
    """
    encoder = Sequential()
    encoder.add(Dense(256, activation='relu'))
    encoder.add(Dense(reduced_dim, activation='relu'))

    decoder = Sequential()
    decoder.add(Dense(256, activation='relu'))
    decoder.add(Dense(input_dim, activation='relu'))

    features_in = Input(shape=(input_dim,))
    reconstruction = decoder(encoder(features_in))

    autoencoder = Model(inputs=features_in, outputs=reconstruction)
    autoencoder.compile(optimizer=Adam(learning_rate=1e-3), loss='mse')
    return autoencoder, encoder

In [None]:
from tensorflow.keras.layers import Masking
from tensorflow.keras.layers import BatchNormalization

def build_lstm_classifier(input_dim, num_classes):
    """Build and compile the stacked-LSTM sequence classifier.

    Layers: Masking (all-zero timesteps are skipped) -> LSTM(64, returning
    sequences) -> BatchNormalization -> LSTM(32) -> Dense(32, relu) ->
    Dropout(0.3) -> Dense(num_classes, softmax). Compiled with Adam
    (learning rate 1e-3), categorical cross-entropy and accuracy.

    Args:
        input_dim: Number of features per timestep.
        num_classes: Number of output classes (labels must be one-hot).

    Returns:
        The compiled Keras ``Sequential`` model; it accepts sequences of
        any length.
    """
    model = Sequential([
        # Declare the input with an explicit Input layer: passing
        # `input_shape=` to the first layer makes Keras emit a warning.
        Input(shape=(None, input_dim)),
        Masking(mask_value=0.0),  # Ignore padded zeros
        LSTM(64, return_sequences=True),
        BatchNormalization(),
        LSTM(32),
        Dense(32, activation='relu'),
        Dropout(0.3),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

In [None]:
# Dataset directory on the mounted Drive; the feature cache lives inside it.
folder_path = "/content/drive/MyDrive/videos"
save_path = f"{folder_path}/progress.pkl"

In [None]:
# Build (or reload) the padded feature tensor X and the one-hot labels y.
# NOTE: a cache file written by an earlier version of this cell holds
# features from an untrained encoder; delete it to regenerate.
if os.path.exists(save_path):
    print("Loading saved progress...")
    X, y = load_progress(save_path)
else:
    print("Starting feature extraction...")
    # The file lists were never built before being iterated; build them here.
    video_classes = load_videos_from_folder(folder_path)
    feature_extractor = build_feature_extractor()
    autoencoder, encoder = build_autoencoder(input_dim=512, reduced_dim=128)

    # Pass 1: VGG16 features for every video, kept so the autoencoder can
    # be fitted before any feature is encoded.
    per_video_features, y = [], []
    for label, videos in video_classes.items():
        for video_path in videos:
            print(f"Processing {video_path}...")
            clips = split_video_into_clips(video_path)
            if len(clips) == 0:
                # Shorter than one clip: nothing to feed to the network.
                print(f"Skipping {video_path}: not enough frames for one clip.")
                continue
            stp_frames = compute_stp(clips)
            per_video_features.append(extract_features(feature_extractor, stp_frames))
            y.append(label)

    if not per_video_features:
        raise RuntimeError(f"No usable .mp4 videos found in {folder_path}")

    # Train the autoencoder; without this step the encoder only applies
    # its random initial weights.
    all_features = np.concatenate(per_video_features, axis=0)
    autoencoder.fit(all_features, all_features, epochs=50, batch_size=256, shuffle=True)

    # Pass 2: encode the cached features with the trained encoder.
    X = [encoder.predict(features) for features in per_video_features]

    # Convert labels to numerical values
    label_mapping = {"original": 0, "insertion": 1, "deletion": 2, "replication": 3}
    y = np.array([label_mapping[label] for label in y])

    # One-hot encode the labels
    y = to_categorical(y, num_classes=4)

    # Zero-pad every sequence to the length of the longest one
    max_clips = max(x.shape[0] for x in X)
    X_padded = np.zeros((len(X), max_clips, X[0].shape[1]))  # Shape: (num_samples, max_clips, reduced_dim)
    for i, x in enumerate(X):
        X_padded[i, :x.shape[0], :] = x
    X = X_padded

    # Save progress
    save_progress(X, y, save_path)
    print("Progress saved.")

Loading saved progress...


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

labels = np.argmax(y, axis=1)

# Stratified split on sample indices, done before normalisation so the
# statistics below come from training data only.
train_idx, test_idx = train_test_split(
    np.arange(len(X)), test_size=0.2, random_state=42, stratify=labels
)

# Timesteps with any non-zero feature are real data; all-zero rows are padding.
real_steps = np.any(X != 0, axis=-1)

# Standardise with training-set statistics from real timesteps only.
train_values = X[train_idx][real_steps[train_idx]]
mean = train_values.mean()
std = train_values.std()
if std == 0:
    std = 1.0  # constant features: avoid dividing by zero

# Keep padded timesteps at exactly 0.0; otherwise Masking(mask_value=0.0)
# stops recognising them and the LSTM trains on the padding.
Xn = np.where(real_steps[..., np.newaxis], (X - mean) / std, 0.0)

X_train, X_test = Xn[train_idx], Xn[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

# Balanced class weights from the training labels, keyed by the real class
# index so that a missing class cannot shift the mapping.
train_labels = labels[train_idx]
present_classes = np.unique(train_labels)
balanced = compute_class_weight(class_weight='balanced', classes=present_classes, y=train_labels)
class_weights = {int(cls): float(weight) for cls, weight in zip(present_classes, balanced)}

# Build and train the LSTM classifier
lstm_model = build_lstm_classifier(input_dim=X.shape[-1], num_classes=y.shape[1])
# NOTE: the test split doubles as validation data, so the accuracy printed
# below is not an independent estimate if these curves guide model choices.
lstm_model.fit(X_train, y_train, epochs=100, batch_size=16, validation_data=(X_test, y_test), class_weight=class_weights)

# Evaluate the model
loss, accuracy = lstm_model.evaluate(X_test, y_test)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

  super().__init__(**kwargs)


Epoch 1/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 120ms/step - accuracy: 0.2881 - loss: 1.4110 - val_accuracy: 0.2286 - val_loss: 1.3976
Epoch 2/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step - accuracy: 0.2576 - loss: 1.3927 - val_accuracy: 0.2286 - val_loss: 1.3923
Epoch 3/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step - accuracy: 0.2700 - loss: 1.3849 - val_accuracy: 0.2571 - val_loss: 1.3857
Epoch 4/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step - accuracy: 0.1852 - loss: 1.4071 - val_accuracy: 0.2286 - val_loss: 1.3854
Epoch 5/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step - accuracy: 0.2943 - loss: 1.3802 - val_accuracy: 0.2571 - val_loss: 1.3858
Epoch 6/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step - accuracy: 0.1577 - loss: 1.3895 - val_accuracy: 0.2571 - val_loss: 1.3842
Epoch 7/100
[1m9/9[0m [32m━━━━━━━━━━

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; main() reads the videos from
# /content/drive/MyDrive/videos.
drive.mount('/content/drive')

import os
import cv2
import glob
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (Dense, LSTM, GRU, GlobalAveragePooling2D,
                                     Input, Masking, BatchNormalization, Dropout)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import pickle
import cupy as cp

# ========== Configuration Parameters ==========
# Module-level settings read by the functions defined below.
CLIP_LENGTH = 2        # Frames per clip; as per paper Section 5.1
OVERLAP = 1            # Frames shared by consecutive clips: 50% overlap for 2-frame clips
AUTOENCODER_DIM = 128  # Reduced feature dimension (width of the encoder output)
LSTM_UNITS = 15        # Units in the single LSTM layer; optimal from paper Section 5.3
DROPOUT_RATE = 0.3     # Dropout rate applied after the LSTM layer
LEARNING_RATE = 1e-3   # Adam learning rate for both the autoencoder and the LSTM model
BATCH_SIZE = 32        # Batch size used when fitting the LSTM model
EPOCHS = 100           # Number of epochs used when fitting the LSTM model

# ========== Video Processing Functions ==========
def load_videos(folder_path):
    """Group the ``.mp4`` files directly inside *folder_path* into four categories.

    The lower-cased file name decides the category: names starting with
    ``i`` -> insertion, ``d`` -> deletion, ``r`` -> replication; everything
    else is authentic.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        dict mapping "authentic", "insertion", "deletion" and "replication"
        to lists of file paths.
    """
    prefix_labels = (("i", "insertion"), ("d", "deletion"), ("r", "replication"))
    categories = {
        name: []
        for name in ("authentic", "insertion", "deletion", "replication")
    }
    for video_path in glob.glob(os.path.join(folder_path, "*.mp4")):
        lowered = os.path.basename(video_path).lower()
        label = next(
            (name for prefix, name in prefix_labels if lowered.startswith(prefix)),
            "authentic",
        )
        categories[label].append(video_path)
    return categories
'''
def process_clips(video_path):
    """Split video into 2-frame clips with 1-frame overlap and compute STP"""
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (224, 224))
        frames.append(frame)
    cap.release()

    # Create clips with 2-frame length and 1-frame overlap
    clips = [frames[i:i+CLIP_LENGTH] for i in range(0, len(frames)-1, OVERLAP)]

    # Compute STP with 3x3 averaging filter as per paper Section 3.2.1
    stp_frames = []
    for clip in clips:
        filtered = [cv2.blur(frame, (3,3)) for frame in clip]  # 3x3 averaging
        stp = np.mean(filtered, axis=0).astype(np.float32)
        stp_frames.append(stp)
    return np.array(stp_frames)
'''
def process_clips(video_path):
    """Decode a video and return one averaged frame per sliding clip.

    Each frame is resized to 224x224 and smoothed with a 3x3 box filter.
    A clip is ``CLIP_LENGTH`` consecutive frames; clips start
    ``CLIP_LENGTH - OVERLAP`` frames apart and each one is reduced to the
    per-pixel mean of its smoothed frames.

    Runs entirely on the CPU with NumPy/OpenCV. The earlier CuPy variant
    copied every frame to the GPU and straight back to run ``cv2.blur`` on
    the host, which cost transfers without accelerating anything.

    Args:
        video_path: Path of the video file to read with OpenCV.

    Returns:
        float32 NumPy array with one averaged frame per clip. It has
        length 0 when the video yields fewer than ``CLIP_LENGTH`` frames.

    Raises:
        ValueError: If ``OVERLAP`` is not smaller than ``CLIP_LENGTH``.
    """
    step = CLIP_LENGTH - OVERLAP
    if step <= 0:
        raise ValueError("OVERLAP must be smaller than CLIP_LENGTH")

    cap = cv2.VideoCapture(video_path)
    blurred = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (224, 224))
            # Blur once per frame; overlapping clips reuse the result.
            blurred.append(cv2.blur(frame, (3, 3)))
    finally:
        # Release the decoder even if read/resize/blur raises.
        cap.release()

    if len(blurred) < CLIP_LENGTH:
        # Assumes 3-channel frames, which the 224x224x3 VGG16 input requires.
        return np.empty((0, 224, 224, 3), dtype=np.float32)

    # Only full-length windows: the last valid start is len - CLIP_LENGTH.
    stp_frames = [
        np.mean(blurred[start:start + CLIP_LENGTH], axis=0).astype(np.float32)
        for start in range(0, len(blurred) - CLIP_LENGTH + 1, step)
    ]
    return np.array(stp_frames)

# ========== Model Architecture ==========
def create_feature_extractor():
    """Build the VGG16-based feature extractor (author's note: paper Section 3.2.2).

    VGG16 is loaded with ImageNet weights and without its classifier head
    for 224x224x3 inputs; its output is reduced with global average pooling.

    Returns:
        A Keras ``Model`` mapping an image batch to one pooled feature
        vector per image.
    """
    vgg = VGG16(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    pooled_output = GlobalAveragePooling2D()(vgg.output)
    extractor = Model(inputs=vgg.input, outputs=pooled_output)
    return extractor

def build_autoencoder(input_dim=512, reduced_dim=None):
    """Build a single-hidden-layer autoencoder (author's note: paper Section 3.2.3).

    The encoder is one relu Dense layer of width ``reduced_dim`` and the
    decoder one relu Dense layer of width ``input_dim``. The combined model
    is compiled with Adam at ``LEARNING_RATE`` and MSE loss.

    Args:
        input_dim: Length of the input feature vectors (default 512, the
            value that used to be hard-coded).
        reduced_dim: Length of the encoded representation; ``None`` means
            ``AUTOENCODER_DIM``.

    Returns:
        Tuple ``(autoencoder, encoder)``. The encoder shares its weights
        with the autoencoder, so fitting the autoencoder trains it.
    """
    if reduced_dim is None:
        reduced_dim = AUTOENCODER_DIM

    encoder = Sequential([
        # Explicit Input layer: passing `input_shape=` to Dense makes Keras
        # emit a warning.
        Input(shape=(input_dim,)),
        Dense(reduced_dim, activation='relu'),
    ])
    decoder = Sequential([
        Dense(input_dim, activation='relu'),
    ])
    autoencoder = Sequential([encoder, decoder])
    autoencoder.compile(optimizer=Adam(LEARNING_RATE), loss='mse')
    return autoencoder, encoder

def create_lstm_model(input_shape, num_classes):
    """Build and compile the single-layer LSTM classifier (author's note: paper Section 5.3).

    Layers: Masking (all-zero timesteps are skipped) -> LSTM(LSTM_UNITS) ->
    Dropout(DROPOUT_RATE) -> Dense(num_classes, softmax). Compiled with
    Adam at ``LEARNING_RATE``, categorical cross-entropy and accuracy.

    Args:
        input_shape: Per-sample input shape, e.g. ``(None, feature_dim)``
            for variable-length sequences.
        num_classes: Number of output classes (labels must be one-hot).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential([
        # Explicit Input layer: passing `input_shape=` to Masking makes
        # Keras emit a warning.
        Input(shape=input_shape),
        Masking(mask_value=0.0),
        LSTM(LSTM_UNITS),
        Dropout(DROPOUT_RATE),
        Dense(num_classes, activation='softmax'),
    ])
    model.compile(optimizer=Adam(LEARNING_RATE),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# ========== Main Pipeline ==========
def main():
    """Run the pipeline: features, autoencoder, LSTM classifier.

    Phase 1 turns every video into averaged clip frames, extracts VGG16
    features from them and trains the autoencoder on all features.
    Phase 2 encodes each video's features, zero-pads the sequences and
    one-hot encodes the labels. Phase 3 trains the LSTM on an 80/20
    stratified split and prints the test accuracy.

    Raises:
        RuntimeError: If no video yields any clip.
    """
    # Load dataset
    video_data = load_videos("/content/drive/MyDrive/videos")
    print("loaded videos")
    # Initialize models
    feature_extractor = create_feature_extractor()
    print("created feature extractor")
    autoencoder, feature_encoder = build_autoencoder()
    print("created autoencoder")
    print(len(video_data))

    # Every category load_videos can return needs an index; "replication"
    # was missing, so any r* file raised KeyError after phase 1.
    label_map = {"authentic": 0, "insertion": 1, "deletion": 2, "replication": 3}

    # Phase 1: Feature extraction and autoencoder training.
    # Per-video features are kept so phase 2 can reuse them; decoding each
    # video and running VGG16 a second time would double the cost.
    video_features, video_labels = [], []
    for category, paths in video_data.items():
        print(len(paths), category)
        for video_path in paths:
            stp_frames = process_clips(video_path)
            if len(stp_frames) == 0:
                continue
            video_features.append(feature_extractor.predict(stp_frames))
            video_labels.append(label_map[category])
    print("phase 1 feature extraction complete")

    if not video_features:
        raise RuntimeError("No usable videos found; nothing to train on.")

    # Train autoencoder
    all_features = np.concatenate(video_features, axis=0)
    autoencoder.fit(all_features, all_features,
                    epochs=50,
                    batch_size=256,
                    shuffle=True)
    print("phase 1 complete")

    # Phase 2: encode the cached features with the trained encoder
    X = [feature_encoder.predict(features) for features in video_features]

    # Pad sequences and prepare labels
    max_length = max(len(seq) for seq in X)
    X_padded = np.zeros((len(X), max_length, AUTOENCODER_DIM))
    for i, seq in enumerate(X):
        X_padded[i, :len(seq)] = seq

    # Three classes unless replication videos are present.
    num_classes = max(3, max(video_labels) + 1)
    y = to_categorical(video_labels, num_classes=num_classes)

    # Split dataset
    X_train, X_test, y_train, y_test = train_test_split(
        X_padded, y, test_size=0.2, stratify=y, random_state=42
    )
    print("phase 2 complete")
    # Build and train LSTM.
    # NOTE: the test split doubles as validation data, so the final
    # accuracy is not independent if these curves guide model choices.
    model = create_lstm_model((None, AUTOENCODER_DIM), num_classes)
    history = model.fit(X_train, y_train,
                        epochs=EPOCHS,
                        batch_size=BATCH_SIZE,
                        validation_data=(X_test, y_test),
                        verbose=1)
    print("phase 3 complete")
    # Evaluate
    loss, acc = model.evaluate(X_test, y_test)
    print(f"\nFinal Test Accuracy: {acc*100:.2f}%")

if __name__ == "__main__":
    main()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
loaded videos
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m58889256/58889256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 0us/step
created feature extractor
created autoencoder
3
88 authentic


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 270ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step
[1m47/47[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 317ms/step
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 331ms/step
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 362ms/step
[1m105/105[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 262ms/step
[1m56/56[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 392ms/step
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 358ms/step
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 612ms/step
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 238ms/step
[1m43/43[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 422ms/step
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 261ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 230ms/step
[1m98/98[0m [3

  super().__init__(**kwargs)


Epoch 1/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 262ms/step - accuracy: 0.3708 - loss: 1.1366 - val_accuracy: 0.2286 - val_loss: 1.1552
Epoch 2/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 130ms/step - accuracy: 0.4150 - loss: 1.1209 - val_accuracy: 0.3429 - val_loss: 1.1170
Epoch 3/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 135ms/step - accuracy: 0.4245 - loss: 1.0896 - val_accuracy: 0.4571 - val_loss: 1.1014
Epoch 4/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 132ms/step - accuracy: 0.4749 - loss: 1.0303 - val_accuracy: 0.4857 - val_loss: 1.1102
Epoch 5/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 132ms/step - accuracy: 0.4922 - loss: 1.0241 - val_accuracy: 0.4857 - val_loss: 1.1295
Epoch 6/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 132ms/step - accuracy: 0.5234 - loss: 1.0127 - val_accuracy: 0.4857 - val_loss: 1.1440
Epoch 7/100
[1m5/5[0m [32m━━━━━