In [1]:
CACHE_DIR = "../cache"

In [2]:
from transformers import VideoMAEConfig, VideoMAEModel, VideoMAEFeatureExtractor, AutoImageProcessor, VideoMAEForPreTraining, VideoMAEForVideoClassification
import numpy as np
import torch

2023-11-21 09:33:18.207413: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-11-21 09:33:18.381475: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
processor = AutoImageProcessor.from_pretrained("MCG-NJU/videomae-base", cache_dir=CACHE_DIR)
videomae = VideoMAEModel.from_pretrained("MCG-NJU/videomae-base", cache_dir=CACHE_DIR)

Could not find image processor class in the image processor config or the model config. Loading based on pattern matching with the model's feature extractor configuration.


In [17]:
num_frames = 32
video = list(np.random.randint(0, 256, (num_frames, 3, 1024, 512)))

model_input = processor([video, video], return_tensors="pt", padding=True)
pixel_values = model_input.pixel_values
print(pixel_values.shape)

torch.Size([2, 32, 3, 224, 224])


In [None]:
outputs = videomae(pixel_values)
print(outputs.keys())
print(outputs.last_hidden_state.shape)

In [12]:
sequence_output = outputs[0]
batch_size, seq_len, hidden_size = sequence_output.shape
print(sequence_output.shape)

torch.Size([2, 1568, 768])


In [6]:
video_sequence_output = sequence_output.reshape(batch_size, num_frames, -1, hidden_size)
print(video_sequence_output.shape)

torch.Size([2, 16, 98, 768])


In [7]:
# torch.Size([2, 16, 98, 768]) to torch.Size([2, 16, 768]) by compute mean
video_mean_output = torch.mean(video_sequence_output, dim=2)
print(video_mean_output.shape)

torch.Size([2, 16, 768])


In [8]:
pretrain_video_mae = VideoMAEForPreTraining.from_pretrained("MCG-NJU/videomae-base", cache_dir=CACHE_DIR)
pretrain_video_mae

VideoMAEForPreTraining(
  (videomae): VideoMAEModel(
    (embeddings): VideoMAEEmbeddings(
      (patch_embeddings): VideoMAEPatchEmbeddings(
        (projection): Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))
      )
    )
    (encoder): VideoMAEEncoder(
      (layer): ModuleList(
        (0-11): 12 x VideoMAELayer(
          (attention): VideoMAEAttention(
            (attention): VideoMAESelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=False)
              (key): Linear(in_features=768, out_features=768, bias=False)
              (value): Linear(in_features=768, out_features=768, bias=False)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): VideoMAESelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): VideoMAEIntermediate(
            (dense): Linear(in_f

## Transformer encoder

In [13]:
import torch
import torch.nn as nn
from torch.nn import Transformer, TransformerEncoderLayer, TransformerEncoder
import math

emb_size = 128
nhead = 8
num_encoder_layers = 6
num_decoder_layers = 6
dim_feedforward = emb_size * 2
dropout = 0.1

encoder_layer = TransformerEncoderLayer(
    emb_size, nhead, dim_feedforward, dropout, batch_first=False
)
transformer_encoder = TransformerEncoder(encoder_layer, num_encoder_layers)
predictor = nn.Sequential(
    nn.Linear(emb_size, 1),
    nn.Sigmoid()
)

In [21]:
# if batch_first=True, input shape should be (batch_size, seq_len, 2)
# else, input shape should be (seq_len, batch_size, emb_size)
seq_len = 19
batch_size = 1
inputs = torch.randn(seq_len, batch_size, emb_size)
outputs = transformer_encoder(inputs)
print(f'outputs.shape: {outputs.shape}')

merged_output = outputs.mean(0)
print(f'merged_output.shape: {merged_output.shape}')

predictions = predictor(merged_output)
print(f'predictions.shape: {predictions.shape}')
predictions.squeeze(-1)

outputs.shape: torch.Size([19, 1, 128])
merged_output.shape: torch.Size([1, 128])
predictions.shape: torch.Size([1, 1])


tensor([0.3660], grad_fn=<SqueezeBackward1>)

In [22]:

import torch
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F
import math


class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout: float, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(-torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer("pos_embedding", pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(
            token_embedding + self.pos_embedding[: token_embedding.size(0), :]
        )


class Encoder(nn.Module):
    def __init__(
        self, emb_size, nhead, num_encoder_layers, dim_feedforward, dropout=0.1
    ):
        super().__init__()
        encoder_layer = nn.TransformerEncoderLayer(
            emb_size, nhead, dim_feedforward, dropout
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        return self.encoder(src, src_mask, src_key_padding_mask)

position_encoder = PositionalEncoding(emb_size, dropout)
encoder = Encoder(emb_size, nhead, num_encoder_layers, dim_feedforward, dropout)

src = torch.randn(seq_len, batch_size, emb_size)
src = position_encoder(src)
print(f'src.shape: {src.shape}')

hidden_states = encoder(src)
print(f'hidden_states.shape: {hidden_states.shape}')

src.shape: torch.Size([19, 1, 128])
hidden_states.shape: torch.Size([19, 1, 128])


In [23]:
classifier = nn.Linear(
    emb_size, 1
)

outputs = classifier(hidden_states)
print(f'outputs.shape: {outputs.shape}')

outputs = outputs.squeeze(-1)
print(f'outputs.shape: {outputs.shape}')

outputs.shape: torch.Size([19, 1, 1])
outputs.shape: torch.Size([19, 1])
