In [7]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from transformers import BertModel, BertConfig
import cv2
import os
import subprocess
from PIL import Image

def extract_keyframes(input_video, output_dir):
    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # ffmpeg command to extract keyframes
    cmd = [
        'ffmpeg',
        '-i', input_video,
        '-vf', "select='eq(pict_type\,I)'",
        '-vsync', 'vfr',
        os.path.join(output_dir, 'frame_%03d.jpg')
    ]
    
    # Run ffmpeg command
    try:
        subprocess.run(cmd, check=True)
        print(f"Keyframes extracted successfully to {output_dir}.")
    except subprocess.CalledProcessError as e:
        print(f"Error extracting keyframes: {e}")

video_path = 'medical3.mp4'
output_dir = '/Users/krisanusarkar/Documents/ML/videototext/frames'

if not os.path.exists(output_dir):
    os.makedirs(output_dir)
extract_keyframes(video_path, output_dir)



ffmpeg version 7.0.1 Copyright (c) 2000-2024 the FFmpeg developers
  built with Apple clang version 15.0.0 (clang-1500.3.9.4)
  configuration: --prefix=/usr/local/Cellar/ffmpeg/7.0.1 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --ena

Keyframes extracted successfully to /Users/krisanusarkar/Documents/ML/videototext/frames.


[out#0/image2 @ 0x7f871e804180] video:583KiB audio:0KiB subtitle:0KiB other streams:0KiB global headers:0KiB muxing overhead: unknown
frame=   53 fps= 48 q=6.6 Lsize=N/A time=00:03:41.16 bitrate=N/A speed= 200x    


In [35]:
import torchvision.transforms as transforms
import torchvision.models as models
resnet = models.resnet50(pretrained=True)
resnet.fc = nn.Identity()  # Remove the final classification layer
resnet.eval()



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [50]:

# Function to extract features using a pre-trained ResNet model
def extract_features(frame):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    input_tensor = transform(frame).unsqueeze(0)
    with torch.no_grad():
        features = resnet(input_tensor)
    return features.squeeze(0)

# Define Transformer-based video embedding model
class VideoEmbeddingModel(nn.Module):
    def __init__(self, config):
        super(VideoEmbeddingModel, self).__init__()
        self.bert = BertModel(config)

    def forward(self, frame_features):
        # Assuming frame_features shape is (seq_len, batch_size, input_size)
        inputs_embeds = frame_features.permute(1, 0, 2) 
        outputs = self.bert(inputs_embeds)
        pooled_output = outputs.pooler_output  # Take pooled output as video embedding
        return pooled_output



In [52]:
# Example usage:
if __name__ == "__main__":
    # Parameters



    # Step 2: Initialize the Transformer-based model
    config = BertConfig(
        hidden_size=2048,  # Increase hidden size as needed
        num_hidden_layers=12,
        num_attention_heads=16,
        intermediate_size=8192,
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
    )

    model = VideoEmbeddingModel(config)

    # Step 3: Process each frame and aggregate into a video embedding
    frame_features_list = []
    frames_dir = sorted(os.listdir(output_dir))
    for frame_file in frames_dir:
        frame_path = os.path.join(output_dir, frame_file)
        frame = cv2.imread(frame_path)
        image = Image.fromarray(frame)
        frame_features = extract_features(image)
        frame_features_list.append(frame_features)
    frame_features_tensor = torch.stack(frame_features_list)  # Concatenate along the sequence length
    frame_features_tensor = frame_features_tensor.unsqueeze(0)     # Add batch dimension

    # Step 4: Obtain the video embedding
    with torch.no_grad():
        video_embedding = model.forward(frame_features_tensor)

    print("Video embedding shape:", video_embedding.shape)

ValueError: too many values to unpack (expected 2)

In [49]:
frame_features_tensor.unsqueeze(0).shape

torch.Size([1, 53, 2048])

In [62]:
import torch
import torchvision.transforms as transforms
from torchvision.models.video import r3d_18

# Load pre-trained model
model = r3d_18(pretrained=True)
model.eval()

# Define a transform to preprocess video frames
transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.ToTensor(),
])

# Example usage: preprocess and pass video frames to the model
# Assuming 'video_frames' is a list of PIL images representing frames from a video
preprocessed_frames = []
for frame_file in frames_dir:
    frame_path = os.path.join(output_dir, frame_file)
    frame = cv2.imread(frame_path)
    image = Image.fromarray(frame)
    preprocessed_frames.append(transform(image))
video_tensor = torch.stack(preprocessed_frames).unsqueeze(2)
print(video_tensor.shape ) # Add batch dimension
video_tensor = video_tensor.permute(2, 1, 0 , 3 , 4 ) 
print(video_tensor.shape )
# Extract features
with torch.no_grad():
    features = model(video_tensor)


torch.Size([53, 3, 1, 112, 112])
torch.Size([1, 3, 53, 112, 112])


In [63]:
features.shape

torch.Size([1, 400])

In [66]:
import torch
import torchvision.transforms as transforms
import timm

# Load pre-trained Vision Transformer (e.g., TimeSformer)
model = timm.create_model('vit_base_patch16_224', pretrained=True)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize frames to match Vision Transformer input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize as per Vision Transformer requirements
])





In [73]:
# Example usage: preprocess and pass video frames to the model
preprocessed_frames = []
for frame_file in frames_dir:
    frame_path = os.path.join(output_dir, frame_file)
    frame = cv2.imread(frame_path)
    image = Image.fromarray(frame)
    preprocessed_frames.append(transform(image))
video_tensor = torch.stack(preprocessed_frames , dim = 0)
print(video_tensor.shape ) # Add batch dimension
video_tensor = video_tensor.unsqueeze(2)
video_tensor = video_tensor.permute(2, 1, 0 , 3 , 4 ) 
print(video_tensor.shape )  # Add batch dimension

# Extract features
with torch.no_grad():
    features = model(video_tensor[])

# 'features' now contains the extracted temporal features from the video
print(features.shape)  # Should print something like torch.Size([1, 768])

torch.Size([53, 3, 224, 224])
torch.Size([1, 3, 53, 224, 224])


ValueError: too many values to unpack (expected 4)

In [77]:

import av
import numpy as np
def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): List of frame indices to decode.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

In [75]:
from transformers import VivitImageProcessor, VivitModel


image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
model = VivitModel.from_pretrained("google/vivit-b-16x2-kinetics400")

# prepare video for the model
inputs = image_processor(video_path, return_tensors="pt")

preprocessor_config.json:   0%|          | 0.00/401 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/18.6k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/356M [00:00<?, ?B/s]

Some weights of VivitModel were not initialized from the model checkpoint at google/vivit-b-16x2-kinetics400 and are newly initialized: ['vivit.pooler.dense.bias', 'vivit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


ValueError: Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, torch.Tensor, tf.Tensor or jax.ndarray.

In [78]:
import av
import numpy as np

from transformers import VivitImageProcessor, VivitModel
from huggingface_hub import hf_hub_download

np.random.seed(0)


def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): List of frame indices to decode.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])


def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    '''
    Sample a given number of frame indices from the video.
    Args:
        clip_len (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Maximum allowed index of sample's last frame.
    Returns:
        indices (`List[int]`): List of sampled frame indices
    '''
    converted_len = int(clip_len * frame_sample_rate)
    end_idx = np.random.randint(converted_len, seg_len)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices


# video clip consists of 300 frames (10 seconds at 30 FPS)
file_path = video_path
container = av.open(file_path)

# sample 32 frames
indices = sample_frame_indices(clip_len=32, frame_sample_rate=1, seg_len=container.streams.video[0].frames)
video = read_video_pyav(container=container, indices=indices)

image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
model = VivitModel.from_pretrained("google/vivit-b-16x2-kinetics400")

# prepare video for the model
inputs = image_processor(list(video), return_tensors="pt")

# forward pass
outputs = model(**inputs)
last_hidden_states = outputs.last_hidden_state
list(last_hidden_states.shape)

Some weights of VivitModel were not initialized from the model checkpoint at google/vivit-b-16x2-kinetics400 and are newly initialized: ['vivit.pooler.dense.bias', 'vivit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  return torch.tensor(value)


[1, 3137, 768]

In [79]:
last_hidden_states

tensor([[[-0.2769,  0.4040, -0.0855,  ..., -0.9227,  0.3589,  1.2062],
         [-0.5515,  0.9327, -0.2063,  ...,  0.7791,  1.2606,  0.4091],
         [-1.1194,  0.7339,  0.0346,  ...,  0.4897,  1.0453,  0.4511],
         ...,
         [-0.6217,  0.5936,  0.2449,  ..., -0.9312, -0.1055,  0.9701],
         [-0.8812,  0.8009,  0.0260,  ..., -0.4803,  0.5180,  0.5747],
         [-0.9221,  0.1696, -0.0112,  ..., -0.5728, -0.5307,  0.5443]]],
       grad_fn=<NativeLayerNormBackward0>)

In [80]:
indices

array([2732, 2733, 2734, 2735, 2736, 2737, 2738, 2739, 2740, 2741, 2742,
       2743, 2744, 2745, 2746, 2747, 2748, 2749, 2750, 2751, 2752, 2753,
       2754, 2755, 2756, 2757, 2758, 2759, 2760, 2761, 2762, 2763])

In [83]:
np.random.randint(7 , 90)

71

In [84]:
import torch
import clip
from PIL import Image

# Load the CLIP model and the preprocessing method
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Load and preprocess the image
image_path = "/Users/krisanusarkar/Documents/ML/videototext/frames/frame_045.jpg"
image = Image.open(image_path)
image_input = preprocess(image).unsqueeze(0).to(device)  # Add batch dimension and move to device

# Encode the image
with torch.no_grad():
    image_features = model.encode_image(image_input)

# Normalize the features
image_features = image_features / image_features.norm(dim=-1, keepdim=True)

# 'image_features' now contains the encoded features for the image
print(image_features.shape)  # Should print something like torch.Size([1, 512])


100%|███████████████████████████████████████| 338M/338M [00:58<00:00, 6.09MiB/s]


torch.Size([1, 512])


In [85]:
model.eval

<bound method Module.eval of CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): Sequential(
        (0): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_featur

In [None]:
image_features

In [87]:
import torch
import clip
import cv2
from PIL import Image
from transformers import CLIPProcessor, CLIPModel

# Load the CLIP model and the preprocessing method
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch16").to(device)
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch16")

def extract_frames(video_path):
    cap = cv2.VideoCapture(video_path)
    frames = []
    success, frame = cap.read()
    while success:
        # Convert the frame from BGR to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Convert to PIL Image
        frame_pil = Image.fromarray(frame_rgb)
        frames.append(frame_pil)
        success, frame = cap.read()
    cap.release()
    return frames

video_path = "medical3.mp4"
video_frames = extract_frames(video_path)

# Preprocess each frame
inputs = processor(images=video_frames, return_tensors="pt", padding=True)
pixel_values = inputs["pixel_values"].to(device)

# Encode the video frames
with torch.no_grad():
    video_features = model.get_image_features(pixel_values)

# Normalize the features
video_features = video_features / video_features.norm(dim=-1, keepdim=True)

# 'video_features' now contains the encoded features for the video clip
print(video_features.shape)  # Expected output shape: (num_frames, 512)


config.json:   0%|          | 0.00/4.10k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/599M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/905 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/961k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

KeyboardInterrupt: 