In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory recursively and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Description
This pipeline extracts and processes audio, visual and text features from videos using deep learning models.

# Importing required libraries 

In [2]:
import os
import shlex
import subprocess
from typing import Dict, Optional, Tuple

import numpy as np
import torch
import torchaudio
import torch.nn as nn
from torchvision import transforms
from torchvision.models import resnet18
from PIL import Image
import cv2

In [3]:
from transformers import (
    WhisperProcessor, WhisperForConditionalGeneration,
    BertTokenizer, BertModel
)
import librosa

2025-05-08 16:28:16.278819: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746721696.541422      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746721696.618885      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Setting up the parameters

In [13]:
VIDEO_PATH = "/kaggle/input/abcdefg/SampleVideo_1280x720_1mb.mp4" # Input video to process; point this at your own file
SAMPLE_RATE = 16000  # Sample rate (Hz) requested from ffmpeg (-ar) and passed to the MFCC and Whisper front ends
N_MFCC = 5           # Cepstral coefficients per audio frame (num_ceps); also n_mels of the mel transform
VISION_FEATURES = 20 # Output size of the linear layer that replaces ResNet18's fc head
MAX_TIMESTEPS = 50   # Fixed sequence length: default max_len of align_features and max_length of the BERT tokenizer
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # Use the GPU when PyTorch reports one, otherwise the CPU

# FFmpeg Command Execution
This helper function runs an FFmpeg command as a subprocess (no shell is involved) and returns `True` if the command succeeds, or `False` if it fails. It also prints an error message if something goes wrong.

**Function Purpose:**  To safely execute FFmpeg commands and handle errors 

In [5]:
def run_ffmpeg_command(cmd: str) -> bool:
    """Execute an ffmpeg command line and report whether it succeeded.

    The command is started directly as a subprocess (no shell is involved).
    A string command is tokenised with shell-like rules, so an argument that
    contains spaces stays a single argument when the caller quotes it (for
    example with ``shlex.quote``). A pre-split sequence of arguments is also
    accepted and used as-is.

    Args:
        cmd: Full command line, e.g. ``"ffmpeg -y -i in.mp4 out.wav"``.

    Returns:
        True if the process exited with status 0, False otherwise. Failures
        are reported with ``print`` and are never raised to the caller.
    """
    try:
        if isinstance(cmd, str):
            try:
                args = shlex.split(cmd)
            except ValueError:
                # Malformed quoting (e.g. a lone apostrophe in a file name):
                # fall back to plain whitespace splitting.
                args = cmd.split()
        else:
            args = [str(part) for part in cmd]

        if not args:
            print("Error running FFmpeg: empty command")
            return False

        subprocess.run(
            args,
            check=True,
            stdin=subprocess.DEVNULL,  # ffmpeg reads stdin by default; never let it block on it
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        return True
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes from the child; tolerate non-UTF-8 output
        stderr_text = e.stderr.decode(errors="replace") if e.stderr else ""
        print(f"FFmpeg failed: {stderr_text}")
        return False
    except Exception as e:
        # e.g. executable not found or permission problems
        print(f"Error running FFmpeg: {str(e)}")
        return False

# Feature Alignment

In [6]:
def align_features(features: np.ndarray, max_len: int = MAX_TIMESTEPS) -> np.ndarray:
    """Align features to a fixed length along the first axis.

    Sequences longer than ``max_len`` are truncated; shorter ones are padded
    with zeros at the end. Trailing dimensions and the input dtype are kept.

    Args:
        features: Array whose first axis is the time/sequence axis. ``None``
            is treated as "no data".
        max_len: Target length of the first axis.

    Returns:
        Array of shape ``(max_len, *features.shape[1:])``. For ``None`` input
        the feature dimension is unknown, so a 1-D zero vector of length
        ``max_len`` is returned.
    """
    if features is None:
        # Nothing to read a shape from; previously this path raised AttributeError.
        return np.zeros(max_len)

    features = np.atleast_1d(np.asarray(features))
    trailing_shape = features.shape[1:]

    if features.size == 0:
        # Empty input: all-zero block with the expected trailing dimensions
        return np.zeros((max_len,) + trailing_shape, dtype=features.dtype)

    current_len = features.shape[0]
    if current_len > max_len:
        return features[:max_len]
    if current_len < max_len:
        # Pad with the input dtype so padding and truncation return the same dtype
        padding = np.zeros((max_len - current_len,) + trailing_shape, dtype=features.dtype)
        return np.concatenate([features, padding], axis=0)
    return features

# Feature Normalization
This function min-max normalizes each feature tensor in the input dictionary to the range `[0, 1]`, using the minimum and maximum taken over the whole tensor. This puts all modalities on a similar scale, which helps a downstream model train more efficiently.

In [7]:
def normalize_features(features: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    """Min-max scale every tensor in ``features`` into the [0, 1] range.

    The minimum and maximum are taken over the whole tensor of each modality.
    Empty tensors are left untouched. The dictionary is updated in place and
    the same object is returned.
    """
    eps = 1e-8  # keeps the division finite when a tensor is constant
    for name, tensor in list(features.items()):
        if tensor.nelement() != 0:
            low, high = tensor.min(), tensor.max()
            features[name] = (tensor - low) / (high - low + eps)
    return features

# Audio Processing
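Extracts MFCC features from the audio track of a video file (the track is first decoded to a temporary WAV file with FFmpeg). Note that the returned array holds `N_MFCC` columns per frame, i.e. the MFCCs themselves; delta features are not part of the default output.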

In [8]:
class AudioProcessor:
    """Extracts MFCC features (optionally with delta features) from a video's audio track."""
    def __init__(self):
        # NOTE: extract_features() does not use this transform (it relies on the
        # Kaldi-compatible MFCC routine); the attribute is kept so existing
        # code that accesses it keeps working.
        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=SAMPLE_RATE,
            n_mels=N_MFCC,
            n_fft=400,
            hop_length=160
        ).to(DEVICE)

    def extract_features(self, video_path: str, include_deltas: bool = False) -> Optional[np.ndarray]:
        """Extract MFCC features from the audio track of a video file.

        The audio is decoded by ffmpeg into a temporary 16-bit PCM WAV file at
        SAMPLE_RATE, loaded, and converted to MFCCs. The temporary file is
        removed afterwards.

        Args:
            video_path: Path of the video whose audio track is analysed.
            include_deltas: When True, delta features computed along the time
                axis are appended, giving ``2 * N_MFCC`` columns. The default
                (False) returns the ``N_MFCC`` MFCC columns only.

        Returns:
            Array of shape ``(num_frames, N_MFCC)`` (or
            ``(num_frames, 2 * N_MFCC)`` with deltas), or None if decoding or
            feature extraction failed (the reason is printed).
        """
        audio_temp = "temp_audio.wav"
        # NOTE: video_path is interpolated unquoted, so a path that contains
        # whitespace does not survive the command splitting done by
        # run_ffmpeg_command.
        cmd = f"ffmpeg -y -i {video_path} -vn -acodec pcm_s16le -ar {SAMPLE_RATE} {audio_temp}"
        if not run_ffmpeg_command(cmd):
            return None

        if not os.path.exists(audio_temp):
            print(f"Audio file {audio_temp} not created!")
            return None

        try:
            waveform, _ = torchaudio.load(audio_temp)
            if waveform.nelement() == 0:
                raise ValueError("Empty audio file")
            waveform = waveform.to(DEVICE)

            # MFCCs come back as (num_frames, N_MFCC): time is the FIRST axis
            mfcc = torchaudio.compliance.kaldi.mfcc(
                waveform,
                sample_frequency=SAMPLE_RATE,
                num_ceps=N_MFCC
            )
            features = mfcc
            if include_deltas:
                # compute_deltas differentiates along the LAST axis, so put
                # time last, take the deltas, and restore (num_frames, N_MFCC).
                deltas = torchaudio.functional.compute_deltas(
                    mfcc.transpose(0, 1)
                ).transpose(0, 1)
                features = torch.cat([mfcc, deltas], dim=-1)

            # No squeeze here: a single-frame result must stay 2-D so that
            # later alignment pads along the time axis.
            return features.cpu().numpy()

        except Exception as e:
            print(f"Audio processing failed: {str(e)}")
            return None
        finally:
            # Always clean up the temporary WAV file
            if os.path.exists(audio_temp):
                os.remove(audio_temp)

# Vision Feature Extraction
Extracts visual features from video frames using a pretrained ResNet18 model.

In [9]:
class VisionProcessor:
    """Extracts visual features from video frames using ResNet18."""
    def __init__(self):
        self.model = resnet18(weights="DEFAULT")  # Use latest weights
        # NOTE: the replacement head is a newly created layer that is not part
        # of the pretrained checkpoint, so its outputs differ between runs
        # unless the RNG is seeded before constructing this class.
        self.model.fc = nn.Linear(512, VISION_FEATURES)
        self.model = self.model.to(DEVICE).eval()

        # Standard ImageNet preprocessing for torchvision classification models
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

    def extract_features(self, video_path: str) -> Optional[np.ndarray]:
        """Extract visual features from roughly one frame per second of video.

        Every ``int(fps)``-th frame (at least every frame) is converted to RGB,
        preprocessed and passed through the network.

        Args:
            video_path: Path of the video file to read.

        Returns:
            Array of shape ``(num_sampled_frames, VISION_FEATURES)``, or None
            if the file is missing, cannot be opened, yields no frames, or any
            other error occurs (the reason is printed).
        """
        if not os.path.exists(video_path):
            print(f"Video file {video_path} not found!")
            return None

        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError("Could not open video file")

            fps = cap.get(cv2.CAP_PROP_FPS)
            # The reported frame rate may be 0 or NaN; the chained comparison
            # is False for NaN, so such files are sampled at every frame.
            frame_interval = max(1, int(fps)) if 0 < fps < float("inf") else 1

            features = []
            frame_count = 0
            with torch.no_grad():
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    if frame_count % frame_interval == 0:
                        # OpenCV decodes to BGR; the model expects RGB input
                        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        tensor = self.transform(Image.fromarray(rgb)).unsqueeze(0).to(DEVICE)
                        # Run the frame right away instead of keeping every
                        # decoded image in memory; drop only the batch axis.
                        features.append(self.model(tensor).squeeze(0).cpu().numpy())
                    frame_count += 1

            if not features:
                raise ValueError("No frames extracted")
            return np.array(features)

        except Exception as e:
            print(f"Vision processing failed: {str(e)}")
            return None
        finally:
            # Release the capture handle on failure paths as well
            if cap is not None:
                cap.release()

# Text Feature Extraction
Handles **speech-to-text transcription** (using Whisper ASR) and **text embedding generation** (using BERT).

In [10]:
class TextProcessor:
    """Transcribes audio to text (ASR) and extracts BERT embeddings."""
    def __init__(self):
        # ASR (Whisper)
        self.asr_processor = WhisperProcessor.from_pretrained("openai/whisper-small")
        self.asr_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small").to(DEVICE)
        # BERT Embeddings
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.bert = BertModel.from_pretrained('bert-base-uncased').to(DEVICE)

    def extract_features(self, video_path: str) -> Optional[np.ndarray]:
        """Transcribe the audio track and return per-token BERT embeddings.

        The audio is decoded by ffmpeg into a temporary WAV file at
        SAMPLE_RATE, transcribed with Whisper, and the transcript is encoded
        with BERT using a fixed length of MAX_TIMESTEPS tokens. The temporary
        file is removed afterwards.

        Args:
            video_path: Path of the video whose speech is transcribed.

        Returns:
            Array of shape ``(MAX_TIMESTEPS, hidden_size)`` in which rows that
            correspond to padding tokens are zero, or None if decoding,
            transcription or embedding failed (the reason is printed).
        """
        audio_temp = "temp_audio_asr.wav"
        # NOTE: video_path is interpolated unquoted, so a path that contains
        # whitespace does not survive the command splitting done by
        # run_ffmpeg_command.
        cmd = f"ffmpeg -y -i {video_path} -vn -acodec pcm_s16le -ar {SAMPLE_RATE} {audio_temp}"
        if not run_ffmpeg_command(cmd):
            return None

        if not os.path.exists(audio_temp):
            print(f"ASR audio file {audio_temp} not created!")
            return None

        try:
            # Load at the same rate that is declared to the ASR processor below
            audio, _ = librosa.load(audio_temp, sr=SAMPLE_RATE)
            if len(audio) == 0:
                raise ValueError("Empty audio for ASR")

            # NOTE(review): the Whisper feature extractor presumably pads or
            # truncates to a single 30 s window, so longer audio would only be
            # transcribed in part - confirm.
            asr_inputs = self.asr_processor(
                audio,
                sampling_rate=SAMPLE_RATE,
                return_tensors="pt"
            ).input_features.to(DEVICE)

            with torch.no_grad():
                predicted_ids = self.asr_model.generate(asr_inputs)
            text = self.asr_processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

            if not text.strip():
                raise ValueError("No text transcribed")

            encoded = self.tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding='max_length',
                max_length=MAX_TIMESTEPS
            ).to(DEVICE)

            with torch.no_grad():
                outputs = self.bert(**encoded)

            hidden = outputs.last_hidden_state
            # Padding positions still receive non-zero hidden states; zero them
            # so that padded timesteps are all-zero rows.
            pad_mask = encoded["attention_mask"].unsqueeze(-1).to(hidden.dtype)
            hidden = hidden * pad_mask
            return hidden.squeeze(0).cpu().numpy()

        except Exception as e:
            print(f"Text processing failed: {str(e)}")
            return None
        finally:
            # Always clean up the temporary WAV file
            if os.path.exists(audio_temp):
                os.remove(audio_temp)

# Multimodal Feature Pipeline
Integrates audio, visual, and text processing to create aligned, normalized multimodal features from video inputs.

In [11]:
class MultimodalPipeline:
    """Runs the audio, vision and text extractors on a video and packages the results."""
    def __init__(self):
        self.audio_processor = AudioProcessor()
        self.vision_processor = VisionProcessor()
        self.text_processor = TextProcessor()

    def process_video(self, video_path: str) -> Optional[Dict[str, torch.Tensor]]:
        """Return aligned, normalized feature tensors keyed by modality.

        Each modality is extracted, aligned to a fixed length, converted to a
        float tensor with a leading batch axis of size 1, and finally
        normalized. Returns None (after printing the reason) when the file is
        missing, any extractor yields None, or an exception is raised.
        """
        if not os.path.exists(video_path):
            print(f"Video file {video_path} not found!")
            return None

        try:
            # Run the extractors one after another: audio, vision, text
            raw = {
                name: getattr(self, f"{name}_processor").extract_features(video_path)
                for name in ("audio", "vision", "text")
            }

            if any(value is None for value in raw.values()):
                print("Feature extraction failed for one or more modalities")
                return None

            # Fix the sequence length, then add the batch axis
            batched = {
                name: torch.tensor(align_features(value)).unsqueeze(0).float()
                for name, value in raw.items()
            }
            return normalize_features(batched)

        except Exception as e:
            print(f"Pipeline failed: {str(e)}")
            return None

# Main Execution Block
Runs the entire multimodal feature extraction pipeline on the input video. Note that it works only if internet access is enabled, because the pretrained models are downloaded on first use.

In [15]:
if __name__ == "__main__":
    # Build all three extractors, then process the configured video.
    pipeline = MultimodalPipeline()
    features = pipeline.process_video(VIDEO_PATH)

    if features is None:
        print("Pipeline failed to process video")
    else:
        # Each tensor is (batch, timesteps, feature_dim)
        print(f"Audio features shape: {features['audio'].shape}")  # (1, 50, 5)
        print(f"Vision features shape: {features['vision'].shape}")  # (1, 50, 20)
        print(f"Text features shape: {features['text'].shape}")  # (1, 50, 768)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 67.3MB/s]


preprocessor_config.json:   0%|          | 0.00/185k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/283k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/836k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.48M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/494k [00:00<?, ?B/s]

normalizer.json:   0%|          | 0.00/52.7k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/34.6k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.19k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.97k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/967M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/3.87k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Audio features shape: torch.Size([1, 50, 5])
Vision features shape: torch.Size([1, 50, 20])
Text features shape: torch.Size([1, 50, 768])
