<a href="https://colab.research.google.com/github/linzhe001/tutorial_notebooks/blob/main/Video_ViT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip -q install av

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m39.5/39.5 MB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os

# Point the Hugging Face home directory (and with it the download cache) at /content
HF_HOME_DIR = '/content'
os.environ['HF_HOME'] = HF_HOME_DIR

Video Classification (default 32 frames)

In [None]:
import av
import numpy as np
import torch

from transformers import VivitImageProcessor, VivitForVideoClassification
from huggingface_hub import hf_hub_download

np.random.seed(0)


def read_video_pyav(container, indices):
    '''
    Decode selected frames of a video with the PyAV decoder.
    The container is rewound to the start, frames of video stream 0 are decoded in order,
    and decoding stops as soon as the frame counter passes the last requested index.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): Frame indices to decode; the first and last entries are
            used as the lower and upper bound of the scan.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    container.seek(0)
    first_wanted = indices[0]
    last_wanted = indices[-1]

    selected = []
    for position, frame in enumerate(container.decode(video=0)):
        # Nothing past the last requested index is needed
        if position > last_wanted:
            break
        # Frames before the first requested index are decoded but not kept
        if position < first_wanted:
            continue
        if position in indices:
            selected.append(frame)

    arrays = [kept.to_ndarray(format="rgb24") for kept in selected]
    return np.stack(arrays)


def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    '''
    Sample a given number of frame indices from the video.
    A window of `clip_len * frame_sample_rate` frames is placed at a random position
    (drawn from NumPy's global random state) and `clip_len` evenly spaced indices are
    taken from that window.
    Args:
        clip_len (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Exclusive upper bound for the end of the sampled window; must be
            larger than `clip_len * frame_sample_rate`.
    Returns:
        indices (`np.ndarray`): int64 array with `clip_len` sampled frame indices, in non-decreasing order.
    Raises:
        ValueError: If `seg_len` is not larger than `clip_len * frame_sample_rate`.
    '''
    converted_len = int(clip_len * frame_sample_rate)
    # np.random.randint(low, high) needs low < high; fail with a readable message instead of "low >= high"
    if seg_len <= converted_len:
        raise ValueError(
            f"Video too short to sample from: seg_len={seg_len}, but clip_len * frame_sample_rate = "
            f"{converted_len}; seg_len must be larger."
        )
    end_idx = np.random.randint(converted_len, seg_len)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # Keep every index strictly below end_idx before truncating to integers
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices


# Fetch the demo clip (300 frames: 10 seconds at 30 FPS) and open it with PyAV
file_path = hf_hub_download(
    repo_id="nielsr/video-demo",
    filename="eating_spaghetti.mp4",
    repo_type="dataset",
)
container = av.open(file_path)

# sample 32 frames (frame_sample_rate=4) and decode them
total_frames = container.streams.video[0].frames
indices = sample_frame_indices(clip_len=32, frame_sample_rate=4, seg_len=total_frames)
video = read_video_pyav(container=container, indices=indices)

# Processor and classifier are loaded from the same pretrained checkpoint
checkpoint = "google/vivit-b-16x2-kinetics400"
image_processor = VivitImageProcessor.from_pretrained(checkpoint)
model = VivitForVideoClassification.from_pretrained(checkpoint)

inputs = image_processor(list(video), return_tensors="pt")
print('input video:', inputs['pixel_values'].shape)

# Forward pass without gradient tracking
with torch.no_grad():
    outputs = model(**inputs)
logits = outputs.logits

# model predicts one of the 400 Kinetics-400 classes
predicted_label = logits.argmax(-1).item()
print(model.config.id2label[predicted_label])
print('Predicted Label:', predicted_label)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


eating_spaghetti.mp4:   0%|          | 0.00/1.01M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/401 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/18.6k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/356M [00:00<?, ?B/s]

input video: torch.Size([1, 32, 3, 224, 224])
LABEL_116
Predicted Label: 116


Video Classification (customized 5 frames)

In [None]:
from transformers import VivitImageProcessor, VivitForVideoClassification, VivitConfig
from safetensors.torch import load_file
from transformers import AutoConfig

from collections import OrderedDict


def partial_loading(model, state_dict):
    '''
    Load only the checkpoint tensors that fit the model.
    A checkpoint entry is used when its key exists in the model's state dict and its shape
    equals the shape of the model's tensor under that key. Every other entry is skipped
    silently, so the corresponding model tensors keep the values they already had.
    Args:
        model (`torch.nn.Module`): Model to load weights into (modified in place).
        state_dict (`Dict[str, torch.Tensor]`): Checkpoint mapping parameter names to tensors.
    Returns:
        model (`torch.nn.Module`): The same model instance that was passed in.
    '''
    # Build the model's state dict once; calling model.state_dict() per key rebuilds the whole mapping each time
    model_tensors = model.state_dict()

    new_state_dict = OrderedDict()
    for k, v in state_dict.items():
        if k in model_tensors and model_tensors[k].shape == v.shape:
            new_state_dict[k] = v

    # strict=False: model keys missing from new_state_dict are left untouched instead of raising
    model.load_state_dict(new_state_dict, strict=False)
    return model

# sample 5 frames
num_frames = 5
indices = sample_frame_indices(clip_len=num_frames, frame_sample_rate=4, seg_len=container.streams.video[0].frames)
video = read_video_pyav(container=container, indices=indices)

image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")

# Start from the pretrained configuration, but size the model for `num_frames` input frames
config = AutoConfig.from_pretrained("google/vivit-b-16x2-kinetics400")
config.num_frames = num_frames
config.video_size = [num_frames, 224, 224]

# Build the model from the modified config (not from_pretrained), then copy over the checkpoint tensors that still fit
model = VivitForVideoClassification(config)

# Resolve the checkpoint through the Hub cache instead of a hard-coded absolute path, so this cell
# does not depend on where HF_HOME points. The revision is the snapshot the original path referred to.
model_weight = hf_hub_download(
    repo_id="google/vivit-b-16x2-kinetics400",
    filename="pytorch_model.bin",
    revision="8a7171a57f79b9aaa58bc8d977c002a0ea0f0d42",
)
state_dict = torch.load(model_weight, map_location="cpu", weights_only=True)
# NOTE: partial_loading skips every tensor whose shape differs from the resized model, so those
# parameters stay at their fresh initialisation (presumably the frame-count-dependent position
# embeddings -- TODO confirm). Predictions are therefore not expected to match the 32-frame model.
model = partial_loading(model, state_dict)
# A model constructed directly from a config starts in training mode; switch to eval for inference
model.eval()

inputs = image_processor(list(video), return_tensors="pt")
print('input video:', inputs['pixel_values'].shape)
with torch.no_grad():
    outputs = model(**inputs)
    logits = outputs.logits

# model predicts one of the 400 Kinetics-400 classes
predicted_label = logits.argmax(-1).item()
print('Predicted Label:', predicted_label)

input video: torch.Size([1, 5, 3, 224, 224])
Predicted Label: 7


Video Classification (customized 2 frames)

In [None]:
from transformers import VivitImageProcessor, VivitForVideoClassification, VivitConfig
from safetensors.torch import load_file
from transformers import AutoConfig

from collections import OrderedDict

import av
import numpy as np
import torch

from transformers import VivitImageProcessor, VivitForVideoClassification
from huggingface_hub import hf_hub_download

np.random.seed(0)

def read_video_pyav(container, indices):
    '''
    Decode selected frames of a video with the PyAV decoder.
    The container is rewound to the start, frames of video stream 0 are decoded in order,
    and decoding stops as soon as the frame counter passes the last requested index.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): Frame indices to decode; the first and last entries are
            used as the lower and upper bound of the scan.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    container.seek(0)
    first_wanted = indices[0]
    last_wanted = indices[-1]

    selected = []
    for position, frame in enumerate(container.decode(video=0)):
        # Nothing past the last requested index is needed
        if position > last_wanted:
            break
        # Frames before the first requested index are decoded but not kept
        if position < first_wanted:
            continue
        if position in indices:
            selected.append(frame)

    arrays = [kept.to_ndarray(format="rgb24") for kept in selected]
    return np.stack(arrays)

def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    '''
    Sample a given number of frame indices from the video.
    A window of `clip_len * frame_sample_rate` frames is placed at a random position
    (drawn from NumPy's global random state) and `clip_len` evenly spaced indices are
    taken from that window.
    Args:
        clip_len (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Exclusive upper bound for the end of the sampled window; must be
            larger than `clip_len * frame_sample_rate`.
    Returns:
        indices (`np.ndarray`): int64 array with `clip_len` sampled frame indices, in non-decreasing order.
    Raises:
        ValueError: If `seg_len` is not larger than `clip_len * frame_sample_rate`.
    '''
    converted_len = int(clip_len * frame_sample_rate)
    # np.random.randint(low, high) needs low < high; fail with a readable message instead of "low >= high"
    if seg_len <= converted_len:
        raise ValueError(
            f"Video too short to sample from: seg_len={seg_len}, but clip_len * frame_sample_rate = "
            f"{converted_len}; seg_len must be larger."
        )
    end_idx = np.random.randint(converted_len, seg_len)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # Keep every index strictly below end_idx before truncating to integers
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices


def partial_loading(model, state_dict):
    '''
    Load only the checkpoint tensors that fit the model.
    A checkpoint entry is used when its key exists in the model's state dict and its shape
    equals the shape of the model's tensor under that key. Every other entry is skipped
    silently, so the corresponding model tensors keep the values they already had.
    Args:
        model (`torch.nn.Module`): Model to load weights into (modified in place).
        state_dict (`Dict[str, torch.Tensor]`): Checkpoint mapping parameter names to tensors.
    Returns:
        model (`torch.nn.Module`): The same model instance that was passed in.
    '''
    # Build the model's state dict once; calling model.state_dict() per key rebuilds the whole mapping each time
    model_tensors = model.state_dict()

    new_state_dict = OrderedDict()
    for k, v in state_dict.items():
        if k in model_tensors and model_tensors[k].shape == v.shape:
            new_state_dict[k] = v

    # strict=False: model keys missing from new_state_dict are left untouched instead of raising
    model.load_state_dict(new_state_dict, strict=False)
    return model

# video clip consists of 300 frames (10 seconds at 30 FPS)
file_path = hf_hub_download(
    repo_id="nielsr/video-demo", filename="eating_spaghetti.mp4", repo_type="dataset"
)
container = av.open(file_path)

# sample 2 frames
num_frames = 2
indices = sample_frame_indices(clip_len=num_frames, frame_sample_rate=4, seg_len=container.streams.video[0].frames)
video = read_video_pyav(container=container, indices=indices)

image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")

# Start from the pretrained configuration, but size the model for `num_frames` input frames
config = AutoConfig.from_pretrained("google/vivit-b-16x2-kinetics400")
config.num_frames = num_frames
config.video_size = [num_frames, 224, 224]

# Build the model from the modified config (not from_pretrained), then copy over the checkpoint tensors that still fit
model = VivitForVideoClassification(config)

# Resolve the checkpoint through the Hub cache instead of a hard-coded absolute path, so this cell
# does not depend on where HF_HOME points. The revision is the snapshot the original path referred to.
model_weight = hf_hub_download(
    repo_id="google/vivit-b-16x2-kinetics400",
    filename="pytorch_model.bin",
    revision="8a7171a57f79b9aaa58bc8d977c002a0ea0f0d42",
)
state_dict = torch.load(model_weight, map_location="cpu", weights_only=True)
# NOTE: partial_loading skips every tensor whose shape differs from the resized model, so those
# parameters stay at their fresh initialisation (presumably the frame-count-dependent position
# embeddings -- TODO confirm). Predictions are therefore not expected to match the 32-frame model.
model = partial_loading(model, state_dict)
# A model constructed directly from a config starts in training mode; switch to eval for inference
model.eval()

inputs = image_processor(list(video), return_tensors="pt")
print('input video:', inputs['pixel_values'].shape)
with torch.no_grad():
    outputs = model(**inputs)
    logits = outputs.logits

# model predicts one of the 400 Kinetics-400 classes
predicted_label = logits.argmax(-1).item()
print('Predicted Label:', predicted_label)

input video: torch.Size([1, 2, 3, 224, 224])
pixel_values: torch.Size([1, 2, 3, 224, 224])
self.embeddings: VivitEmbeddings(
  (patch_embeddings): VivitTubeletEmbeddings(
    (projection): Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))
  )
  (dropout): Dropout(p=0.0, inplace=False)
)
embedding_output: torch.Size([1, 197, 768])
Predicted Label: 379


In [None]:
import torch.nn as nn
# Stand-alone (randomly initialised) Conv3d with the same kernel and stride as the model's tubelet projection
emb = nn.Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))

# Swap axes 1 and 2 so the size-3 channel axis comes second, as Conv3d(3, ...) expects
pixel_values = inputs['pixel_values'].permute(0, 2, 1, 3, 4)

# Project, then merge all axes after the channel axis into one and move it in front of the channels
out = emb(pixel_values).flatten(2).transpose(1, 2)
out.shape


torch.Size([1, 196, 768])

In [None]:
# Display the model's embedding sub-module to inspect the tubelet projection (kernel size and stride)
model.vivit.embeddings

VivitEmbeddings(
  (patch_embeddings): VivitTubeletEmbeddings(
    (projection): Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))
  )
  (dropout): Dropout(p=0.0, inplace=False)
)

In [None]:
# List the entries returned by the image processor
inputs.keys()

dict_keys(['pixel_values'])