In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the /content/drive/MyDrive/... paths
# used by the following cells depend on this mount.
drive.mount('/content/drive')


In [None]:
import os

# Root of the pipeline workspace on Drive.
base_dir = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline"

# Sub-directories (relative to base_dir) that make up the project layout.
folders = [
    "data", "data/processed", "data/splits",
    "models",
    "results",
    "scripts", "scripts/preprocessing", "scripts/training",
    "scripts/inference", "scripts/utils",
    "notebooks",
]

# Resolve every entry against base_dir, then create whatever is missing.
target_paths = [os.path.join(base_dir, name) for name in folders]
for path in target_paths:
    os.makedirs(path, exist_ok=True)

# Display the last path that was created.
path


In [None]:
import os
# Show the entries of the raw training-set directory on Drive.
os.listdir("/content/drive/MyDrive/biovid_dual_auth/dataset/train")


In [None]:
!pip install opencv-python-headless librosa ffmpeg-python numpy tqdm


**Preprocessing Code**

In [None]:
import os
import cv2
import librosa
import numpy as np
import ffmpeg
from pathlib import Path
from tqdm import tqdm

def extract_audio(video_path, output_wav, sr=16000):
    """Decode the audio of a media file to mono and save it as a WAV file.

    Args:
        video_path: Path of the media file passed to ``librosa.load``.
        output_wav: Destination path of the WAV file.
        sr: Sample rate the audio is resampled to (default 16000).
    """
    # ``librosa.output.write_wav`` was removed in librosa 0.8, so the samples
    # are written with soundfile instead. Imported locally so this function
    # does not depend on another cell having imported it first.
    import soundfile as sf

    y, _ = librosa.load(video_path, sr=sr, mono=True)
    sf.write(output_wav, y, sr)

def extract_frames(video_path, output_npy, num_frames=30, size=(96, 96)):
    """Sample evenly spaced frames from a video and save them as one array.

    The saved array has shape ``[3, num_frames, size[1], size[0]]`` (channels
    first, RGB order) with pixel values divided by 255.

    Args:
        video_path: Path of the video opened with ``cv2.VideoCapture``.
        output_npy: Destination passed to ``np.save``.
        num_frames: Number of frames in the saved array.
        size: Target size handed to ``cv2.resize``.

    Raises:
        ValueError: If ``num_frames`` is not positive, the video cannot be
            opened, it reports no frames, or no frame can be decoded.
    """
    if num_frames <= 0:
        raise ValueError(f"num_frames must be positive, got {num_frames}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            raise ValueError(f"Video reports no frames: {video_path}")

        frame_idxs = np.linspace(0, total_frames - 1, num=num_frames, dtype=int)
        # When the clip has fewer frames than num_frames, the integer indices
        # repeat. Count how often each source frame is wanted so it can be
        # emitted that many times and the output keeps num_frames entries.
        repeats = np.bincount(frame_idxs, minlength=total_frames).tolist()

        frames = []
        for idx in range(total_frames):
            ret, frame = cap.read()
            if not ret:
                break  # the container may report more frames than can be decoded
            if repeats[idx]:
                frame = cv2.resize(frame, size)
                frame = frame[..., ::-1]  # BGR to RGB
                frame = frame / 255.0  # normalize
                frames.extend([frame.transpose(2, 0, 1)] * repeats[idx])  # CxHxW
    finally:
        # Release the capture even when decoding raises.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be decoded from: {video_path}")

    # If decoding stopped early, repeat the last good frame so every saved
    # array has the same number of frames.
    if len(frames) < num_frames:
        frames.extend([frames[-1]] * (num_frames - len(frames)))

    frames = np.stack(frames, axis=0)  # [num_frames, 3, H, W]
    frames = frames.transpose(1, 0, 2, 3)  # [3, num_frames, H, W]
    np.save(output_npy, frames)

def preprocess_folder(input_folder, output_folder, skip_existing=False):
    """Extract frames and audio for every ``.mp4`` found under a folder.

    For each video, a sub-directory named after the file stem is created under
    ``output_folder`` and filled with ``frames.npy`` and ``audio.wav``.
    A failure on one video is printed and does not stop the run.

    Args:
        input_folder: Directory searched recursively for ``*.mp4`` files.
        output_folder: Directory that receives one sub-directory per video.
        skip_existing: When True, videos whose ``frames.npy`` and ``audio.wav``
            both already exist are not processed again, so an interrupted run
            can be resumed. Defaults to False (always reprocess).
    """
    output_root = Path(output_folder)
    output_root.mkdir(parents=True, exist_ok=True)
    # Sorted so the processing order does not depend on filesystem listing order.
    video_files = sorted(Path(input_folder).rglob("*.mp4"))

    failed = []
    for video_path in tqdm(video_files):
        uid = video_path.stem
        out_video_dir = output_root / uid
        out_video_dir.mkdir(exist_ok=True, parents=True)

        frames_path = out_video_dir / "frames.npy"
        audio_path = out_video_dir / "audio.wav"

        # Both outputs must be present; a half-finished clip is redone.
        if skip_existing and frames_path.exists() and audio_path.exists():
            continue

        try:
            extract_frames(str(video_path), str(frames_path))
            extract_audio(str(video_path), str(audio_path))
        except Exception as e:
            # Best effort: report and keep going with the remaining videos.
            print(f"Error with {video_path}: {e}")
            failed.append(video_path)

    if failed:
        print(f"{len(failed)} of {len(video_files)} videos failed preprocessing")


Set These Paths in Colab Before Running Preprocessing

In [None]:
# Raw dataset folders on Drive.
train_input = "/content/drive/MyDrive/biovid_dual_auth/dataset/train"
test_input = "/content/drive/MyDrive/biovid_dual_auth/dataset/test-set"

# Where the extracted frames/audio are written.
train_output = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train"
test_output  = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/test"

# Same preprocessing pass for each (source, destination) pair, train first.
for src_dir, dst_dir in ((train_input, train_output), (test_input, test_output)):
    preprocess_folder(src_dir, dst_dir)


In [None]:
# NOTE(review): this cell repeats the previous cell verbatim; running both
# performs the full train and test preprocessing pass twice.
train_input = "/content/drive/MyDrive/biovid_dual_auth/dataset/train"
test_input = "/content/drive/MyDrive/biovid_dual_auth/dataset/test-set"

train_output = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train"
test_output  = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/test"

preprocess_folder(train_input, train_output)
preprocess_folder(test_input, test_output)


In [None]:
import os

# NOTE(review): this path has no "updated_pipeline" component, unlike the
# train_output the preprocessing cells above write to — confirm which
# processed directory is meant to be inspected here.
processed_path = "/content/drive/MyDrive/biovid_dual_auth/data/processed/train"
print("Sample folders found:")
print(os.listdir(processed_path))



In [None]:
# Sanity check: load the saved frame array of one clip and print its shape.
# `np` is expected to be bound by an earlier cell's `import numpy as np`.
frames_path = f"/content/drive/MyDrive/biovid_dual_auth/data/processed/train/5_01_T/frames.npy"
frames = np.load(frames_path)
print("Shape of frames:", frames.shape)


In [None]:
import glob

# Count every frames.npy under the processed training directory.
# `recursive=True` only has an effect when the pattern contains "**"; with a
# literal file path the glob could match at most one file, so the reported
# "total" was never more than 1.
all_npy = glob.glob("/content/drive/MyDrive/biovid_dual_auth/data/processed/train/**/frames.npy", recursive=True)
print("Total .npy files found:", len(all_npy))
print("Example path:", all_npy[0] if all_npy else "None found")


In [None]:
import subprocess
import librosa
import soundfile as sf

def extract_audio(video_path, output_wav, sr=16000):
    """Extract a video's audio track as a mono WAV file using ffmpeg.

    ffmpeg first writes the audio (resampled to ``sr``, one channel) to a
    temporary WAV file; that file is then loaded with librosa and written to
    ``output_wav`` with soundfile.

    Args:
        video_path: Path of the input video given to ffmpeg.
        output_wav: Destination path of the WAV file.
        sr: Target sample rate (default 16000).

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    # Imported locally so the function does not rely on another cell's imports.
    import os
    import tempfile

    # A unique temporary file instead of a fixed "temp.wav" in the working
    # directory, so concurrent or interrupted runs cannot clobber each other.
    fd, tmp_wav = tempfile.mkstemp(suffix=".wav")
    os.close(fd)

    # Step 1: extract raw audio from video using ffmpeg
    command = [
        "ffmpeg",
        "-i", video_path,
        "-ar", str(sr),
        "-ac", "1",
        "-y", tmp_wav
    ]
    try:
        result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        if result.returncode != 0:
            # Surface the real cause instead of a later "file not found" from librosa.
            detail = (result.stderr or b"").decode(errors="replace").strip()[-500:]
            raise RuntimeError(
                f"ffmpeg failed with exit code {result.returncode} for {video_path}: {detail}"
            )

        # Step 2: load with librosa to ensure it's in correct shape
        y, _ = librosa.load(tmp_wav, sr=sr)
        sf.write(output_wav, y, sr)
    finally:
        # Clean up the temporary file on success and on failure alike.
        if os.path.exists(tmp_wav):
            os.remove(tmp_wav)


In [None]:
# NOTE(review): `models` is imported here, but the pipeline directory is only
# appended to sys.path in a later cell; on a fresh kernel run top to bottom
# this import may fail — confirm the intended cell order.
from models.audio_encoder import AudioEncoder

# Embed one preprocessed clip's audio and print the embedding shape.
audio_path = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train/10_00_T/audio.wav"
audio_encoder = AudioEncoder()
embedding = audio_encoder(audio_path)
print("Audio embedding shape:", embedding.shape)


# New Section

In [None]:
# NOTE(review): this writes to .../biovid_dual_auth/data/processed/train, whereas
# the earlier preprocessing cells write to .../updated_pipeline/data/processed/train.
# Confirm which location the rest of the pipeline reads from.
train_input = "/content/drive/MyDrive/biovid_dual_auth/dataset/train"
train_output = "/content/drive/MyDrive/biovid_dual_auth/data/processed/train"

preprocess_folder(train_input, train_output)


In [None]:
!pip install torch torchvision


In [None]:
import sys
# Make the pipeline directory importable so `models.*` imports in other cells resolve.
sys.path.append("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline")

In [None]:
from models.visual_encoder import VisualEncoder


In [None]:
import torch
import numpy as np
from models.visual_encoder import VisualEncoder

# Load a sample frames.npy
# NOTE(review): this path has no "updated_pipeline" component, unlike the audio
# sample path used elsewhere in this notebook — confirm the intended directory.
frames_path = "/content/drive/MyDrive/biovid_dual_auth/data/processed/train/5_01_T/frames.npy"
frames = np.load(frames_path)
# unsqueeze(0) adds a leading batch dimension.
frames_tensor = torch.tensor(frames, dtype=torch.float32).unsqueeze(0)  # [1, 3, 30, 96, 96]

# Run through model
model = VisualEncoder()
embedding = model(frames_tensor)  # Output: [1, 256]
print("Visual embedding shape:", embedding.shape)


# Audio Encoder – ECAPA-TDNN

Audio Encoder Setup

Extract 192-dimensional speaker embeddings from each audio.wav file using a pretrained ECAPA-TDNN model.

| Component     | Description                                          |
| ------------- | ---------------------------------------------------- |
| **Framework** | [`speechbrain`](https://speechbrain.readthedocs.io/) |
| **Model**     | `ECAPA-TDNN` pretrained on VoxCeleb2                 |
| **Input**     | `.wav` audio (16 kHz mono)                           |
| **Output**    | `[192]` speaker embedding                            |


 Install SpeechBrain

In [None]:
!pip install speechbrain


Use the Audio Encoder in Colab

In [None]:
import glob

# Count every audio.wav under the processed training directory.
# `recursive=True` only has an effect when the pattern contains "**"; with a
# literal file path the glob could match at most one file, so the reported
# "total" was never more than 1.
all_audio = glob.glob("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train/**/audio.wav", recursive=True)
print(f"Total audio files found: {len(all_audio)}")
print("Example file:", all_audio[0] if all_audio else "None found")


Gated Multimodal Unit (GMU) Fusion



In [None]:
# NOTE(review): this cell relies on names bound in other cells — torch,
# VisualEncoder, AudioEncoder, frames_tensor and audio_path — and on GMUFusion,
# whose import only appears in later cells of this notebook. Confirm the
# intended execution order.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

visual_encoder = VisualEncoder().to(device)
fusion_model = GMUFusion().to(device)
audio_encoder = AudioEncoder(device=device)

frames_tensor = frames_tensor.to(device)

# Inference only: gradients are not tracked inside this block.
with torch.no_grad():
    v_emb = visual_encoder(frames_tensor)
    a_emb = audio_encoder(audio_path)
    a_emb = a_emb.to(device)  # move audio embedding to same device
    score, joint_emb = fusion_model(a_emb, v_emb)

print("Score:", score.item())
print("Joint Embedding Shape:", joint_emb.shape)


#  Fusion + Output Head

In [None]:
import torch
import numpy as np
from models.visual_encoder import VisualEncoder
from models.audio_encoder import AudioEncoder
from models.fusion_head import GMUFusion

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Load models on correct device
visual_encoder = VisualEncoder().to(device)
fusion_model = GMUFusion().to(device)
audio_encoder = AudioEncoder(device=device)  # SpeechBrain uses its own device

# Inference only: switch the torch modules to eval mode so layers such as
# dropout or batch norm (if present) behave deterministically.
visual_encoder.eval()
fusion_model.eval()

# Load input
frames_path = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train/10_00_T/frames.npy"
audio_path = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/data/processed/train/10_00_T/audio.wav"

frames = np.load(frames_path)
frames_tensor = torch.tensor(frames, dtype=torch.float32).unsqueeze(0).to(device)  # [1, 3, 30, 96, 96]

# Run inference
with torch.no_grad():
    v_emb = visual_encoder(frames_tensor)  # [1, 256]
    a_emb = audio_encoder(audio_path)      # [1, 192]
    a_emb = a_emb.to(device)               # Ensure same device
    score, joint_emb = fusion_model(a_emb, v_emb)

# The check-mark emoji was stored mis-encoded ("âœ…"); restored here.
print("✅ Score:", score.item())
print("✅ Joint Embedding Shape:", joint_emb.shape)



🎯 Goal:
Fuse the 256-dim visual embedding and 192-dim audio embedding into a single 256-dim joint representation.

GMU Fusion Module Design

Equation:
Let:

- $v$ = visual embedding, $v \in \mathbb{R}^{256}$
- $a$ = audio embedding, $a \in \mathbb{R}^{192}$

We first project $a$ → 256 dimensions, then compute:

$$z = \sigma(W_{z,v}\, v + W_{z,a}\, a)$$

$$h = z \odot \tanh(W_v\, v) + (1 - z) \odot \tanh(W_a\, a)$$


Test in Colab: run the cross-validation training script

In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/train_crossval.py


Test-set inference


In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/test_inference_vote.py


In [None]:
import os
# Evaluates to True when the submission file exists at this path.
os.path.exists("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/submission/submission.json")


In [None]:
import torch
import sys

sys.path.append("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline")
# NOTE(review): this GMUFusion comes from models.gmu_fusion and is called as
# fuser(v, a) with a single return value, while another cell in this notebook
# imports GMUFusion from models.fusion_head and unpacks two return values from
# fusion_model(a_emb, v_emb). Importing it here rebinds the name — confirm
# which class the other cells expect.
from models.gmu_fusion import GMUFusion

# Dummy embeddings
v = torch.randn(1, 256)  # visual
a = torch.randn(1, 192)  # audio

fuser = GMUFusion()
fused = fuser(v, a)

print("Fused embedding shape:", fused.shape)  # [1, 256]


Output Head (Classification + Embedding)



Build a head that:

Outputs a binary prediction (genuine vs impostor)
Produces a normalized 256-dim embedding for triplet loss

Test the Head

In [None]:
from models.output_head import OutputHead

# Smoke test: pass a random batch through the head and print both output shapes.
# `torch` is expected to be bound by an earlier cell's import.
head = OutputHead()
x = torch.randn(4, 256)  # batch of 4 fused embeddings
logits, emb = head(x)

print("Logits shape:", logits.shape)      # [4, 2]
print("Embedding shape:", emb.shape)      # [4, 256]


Create 3-Fold Splits Without Leakage

In [None]:
from sklearn.model_selection import KFold
from pathlib import Path

def create_3fold_user_split(root_dir):
    """Build three train/validation splits in which no user appears on both sides.

    Every directory under ``root_dir`` whose name matches ``*_*_*`` is treated
    as one clip, and the text before the first underscore is taken as its user
    id. Users (not clips) are partitioned with a seeded 3-fold ``KFold``.

    Args:
        root_dir: Directory containing one sub-directory per clip.

    Returns:
        A tuple ``(folds, user_to_label)`` where ``folds`` is a list of three
        ``(train_videos, val_videos)`` pairs of path strings and
        ``user_to_label`` maps each user id to its index in the sorted user list.

    Raises:
        ValueError: If fewer than three distinct users are found.
    """
    root = Path(root_dir)
    # Sorted so the clip lists are reproducible regardless of filesystem
    # listing order; plain files that happen to match the pattern are ignored.
    all_dirs = sorted(p for p in root.glob("*_*_*") if p.is_dir())  # each video folder
    user_of = {p: p.name.split("_")[0] for p in all_dirs}
    all_users = sorted(set(user_of.values()))

    if len(all_users) < 3:
        raise ValueError(
            f"Need at least 3 users for a 3-fold split, found {len(all_users)} in {root_dir}"
        )

    user_to_label = {u: i for i, u in enumerate(all_users)}
    folds = []

    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    for train_idx, val_idx in kf.split(all_users):
        train_users = {all_users[i] for i in train_idx}
        val_users = {all_users[i] for i in val_idx}

        # Assign each clip to the side its user belongs to.
        train_videos = [str(p) for p in all_dirs if user_of[p] in train_users]
        val_videos = [str(p) for p in all_dirs if user_of[p] in val_users]

        folds.append((train_videos, val_videos))

    return folds, user_to_label


In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/train_crossval.py


In [None]:
import sys
# Same sys.path entry that earlier cells already append; each run of this
# cell adds another copy of it to sys.path.
sys.path.append("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline")


In [None]:
import os

# Debugging aid: print what is in the pipeline root and in its `datasets` folder.
print("Folder contents:")
print(os.listdir("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline"))

print("\nDatasets folder contents:")
print(os.listdir("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/datasets"))


In [None]:
import sys
# Same sys.path entry that earlier cells already append; each run of this
# cell adds another copy of it to sys.path.
sys.path.append("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline")


In [None]:
import importlib.util
import sys

# Point to the full path of your biovid_dataset.py file
file_path = "/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/datasets/biovid_dataset.py"
module_name = "biovid_dataset"

# Load as a module
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec is None or spec.loader is None:
    # Fail with a clear message instead of an AttributeError on None below.
    raise ImportError(f"Cannot create an import spec for {file_path}")
biovid = importlib.util.module_from_spec(spec)
# Register the module before executing it, as the importlib recipe for
# importing a source file directly does, so lookups by module name succeed.
sys.modules[module_name] = biovid
spec.loader.exec_module(biovid)

# Now access the classes/functions
# Note: this rebinds create_3fold_user_split, which an earlier cell of this
# notebook also defines inline.
BiovidDataset = biovid.BiovidDataset
create_3fold_user_split = biovid.create_3fold_user_split

# The check-mark emoji was stored mis-encoded ("âœ…"); restored here.
print("✅ Import successful via importlib")


In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/train_crossval.py


In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/evaluate_eer.py


In [None]:
!python /content/drive/MyDrive/biovid_dual_auth/updated_pipeline/test_inference_vote.py


In [None]:
!pip install pydot
!apt-get install graphviz


In [None]:
from keras.models import Model
from keras.layers import (Input, Conv2D, TimeDistributed, GlobalAveragePooling2D,
                          Bidirectional, GRU, Dense, Concatenate, Lambda, Multiply, Add)
from keras.utils import plot_model
import tensorflow as tf

# Builds a Keras graph of the two-stream architecture and renders it to
# "biovid_model.png" with plot_model; the model is not compiled or trained
# in this cell.

# --- Visual Stream (per-frame Conv2D + GRU) ---
video_input = Input(shape=(30, 96, 96, 3), name='video_input')
x = TimeDistributed(Conv2D(32, (3, 3), padding='same', activation='relu'))(video_input)
x = TimeDistributed(GlobalAveragePooling2D())(x)
x = Bidirectional(GRU(128))(x)
visual_embedding = Dense(256, activation='relu', name='visual_fc')(x)

# --- Audio Stream ---
audio_input = Input(shape=(192,), name='audio_input')
audio_proj = Dense(256, activation='relu', name='audio_fc')(audio_input)

# --- GMU Fusion ---
# NOTE(review): the gate `z` is computed from the raw 192-dim `audio_input`,
# while `a_transform` below uses the 256-dim `audio_proj`. The GMU description
# in this notebook says the audio embedding is projected to 256 first —
# confirm which tensor the gate is meant to see.
concat = Concatenate(name='concat_audio_visual')([visual_embedding, audio_input])
z = Dense(256, activation='sigmoid', name='z_gate')(concat)

v_trans = Dense(256, activation='tanh', name='v_transform')(visual_embedding)
a_trans = Dense(256, activation='tanh', name='a_transform')(audio_proj)

# fused = z * v_trans + (1 - z) * a_trans
z_inv = Lambda(lambda x: 1.0 - x, name='1_minus_z')(z)
zv = Multiply(name='zv')([z, v_trans])
za = Multiply(name='1mz_a')([z_inv, a_trans])
fused = Add(name='gmu_output')([zv, za])

# --- Output Head ---
# NOTE(review): the classifier here is a single sigmoid unit, whereas the
# OutputHead test cell in this notebook annotates its logits as [4, 2] —
# confirm the diagram matches the trained head.
classification_output = Dense(1, activation='sigmoid', name='classification')(fused)
triplet_embedding = Lambda(lambda x: tf.math.l2_normalize(x, axis=1), name='triplet_norm')(fused)

model = Model(inputs=[video_input, audio_input], outputs=[classification_output, triplet_embedding])

# Save the model diagram
plot_model(model, show_shapes=True, show_layer_names=True, to_file="biovid_model.png", dpi=96)


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv("/content/drive/MyDrive/biovid_dual_auth/updated_pipeline/results/biovid_results_20250620_0738.csv")  # use your actual path

# One line per metric, all drawn against the fold number on a single axes.
fig, ax = plt.subplots(figsize=(10, 5))
metric_series = (
    ("accuracy", "Accuracy"),
    ("eer", "EER"),
    ("apcer", "APCER"),
    ("bpcer", "BPCER"),
)
for column, legend_label in metric_series:
    ax.plot(df["fold"], df[column], label=legend_label)

ax.set_xlabel("Fold")
ax.set_ylabel("Metric")
ax.set_title("Performance Across 3-Fold Cross-Validation")
ax.legend()
ax.grid()

# Write the figure to disk before showing it.
fig.savefig("training_metrics_curve.png")
plt.show()
