In [1]:
from vq_clip import VQCLIPModel
from transformers import CLIPProcessor
from PIL import Image
import torch

model = VQCLIPModel.from_pretrained_clip(clip_path="openai/clip-vit-large-patch14", vision_vq_adapter_path="adams-story/vq-ViT-L-14-k64-d32-ema", )

processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")

model.to('cuda')
model.eval()

  from .autonotebook import tqdm as notebook_tqdm
`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["id2label"]` will be overriden.


Could not do vq-clip lazy init


`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["id2label"]` will be overriden.


VQCLIPModel(
  (clip_model): CLIPModel(
    (text_model): CLIPTextTransformer(
      (embeddings): CLIPTextEmbeddings(
        (token_embedding): Embedding(49408, 768)
        (position_embedding): Embedding(77, 768)
      )
      (encoder): CLIPEncoder(
        (layers): ModuleList(
          (0-11): 12 x CLIPEncoderLayer(
            (self_attn): CLIPAttention(
              (k_proj): Linear(in_features=768, out_features=768, bias=True)
              (v_proj): Linear(in_features=768, out_features=768, bias=True)
              (q_proj): Linear(in_features=768, out_features=768, bias=True)
              (out_proj): Linear(in_features=768, out_features=768, bias=True)
            )
            (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (mlp): CLIPMLP(
              (activation_fn): QuickGELUActivation()
              (fc1): Linear(in_features=768, out_features=3072, bias=True)
              (fc2): Linear(in_features=3072, out_features=768, bias=True

In [2]:
def _is_tensor_video_clip(clip):
    if not torch.is_tensor(clip):
        raise TypeError("clip should be Tensor. Got %s" % type(clip))

    if not clip.ndimension() == 4:
        raise ValueError("clip should be 4D. Got %dD" % clip.dim())

    return True


def center_crop_arr(pil_image, image_size):
    """
    Center cropping implementation from ADM.
    https://github.com/openai/guided-diffusion/blob/8fb3ad9197f16bbc40620447b2742e13458d2831/guided_diffusion/image_datasets.py#L126
    """
    while min(*pil_image.size) >= 2 * image_size:
        pil_image = pil_image.resize(
            tuple(x // 2 for x in pil_image.size), resample=Image.BOX
        )

    scale = image_size / min(*pil_image.size)
    pil_image = pil_image.resize(
        tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC
    )

    arr = np.array(pil_image)
    crop_y = (arr.shape[0] - image_size) // 2
    crop_x = (arr.shape[1] - image_size) // 2
    return Image.fromarray(arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size])


def crop(clip, i, j, h, w):
    """
    Args:
        clip (torch.tensor): Video clip to be cropped. Size is (T, C, H, W)
    """
    if len(clip.size()) != 4:
        raise ValueError("clip should be a 4D tensor")
    return clip[..., i: i + h, j: j + w]


def resize(clip, target_size, interpolation_mode):
    if len(target_size) != 2:
        raise ValueError(f"target size should be tuple (height, width), instead got {target_size}")
    return torch.nn.functional.interpolate(clip, size=target_size, mode=interpolation_mode, align_corners=True, antialias=True)


def resize_scale(clip, target_size, interpolation_mode):
    if len(target_size) != 2:
        raise ValueError(f"target size should be tuple (height, width), instead got {target_size}")
    H, W = clip.size(-2), clip.size(-1)
    scale_ = target_size[0] / min(H, W)
    return torch.nn.functional.interpolate(clip, scale_factor=scale_, mode=interpolation_mode, align_corners=True, antialias=True)


def resized_crop(clip, i, j, h, w, size, interpolation_mode="bilinear"):
    """
    Do spatial cropping and resizing to the video clip
    Args:
        clip (torch.tensor): Video clip to be cropped. Size is (T, C, H, W)
        i (int): i in (i,j) i.e coordinates of the upper left corner.
        j (int): j in (i,j) i.e coordinates of the upper left corner.
        h (int): Height of the cropped region.
        w (int): Width of the cropped region.
        size (tuple(int, int)): height and width of resized clip
    Returns:
        clip (torch.tensor): Resized and cropped clip. Size is (T, C, H, W)
    """
    if not _is_tensor_video_clip(clip):
        raise ValueError("clip should be a 4D torch.tensor")
    clip = crop(clip, i, j, h, w)
    clip = resize(clip, size, interpolation_mode)
    return clip


def center_crop(clip, crop_size):
    if not _is_tensor_video_clip(clip):
        raise ValueError("clip should be a 4D torch.tensor")
    h, w = clip.size(-2), clip.size(-1)
    th, tw = crop_size
    if h < th or w < tw:
        raise ValueError("height and width must be no smaller than crop_size")

    i = int(round((h - th) / 2.0))
    j = int(round((w - tw) / 2.0))
    return crop(clip, i, j, th, tw)


def center_crop_using_short_edge(clip):
    if not _is_tensor_video_clip(clip):
        raise ValueError("clip should be a 4D torch.tensor")
    h, w = clip.size(-2), clip.size(-1)
    if h < w:
        th, tw = h, h
        i = 0
        j = int(round((w - tw) / 2.0))
    else:
        th, tw = w, w
        i = int(round((h - th) / 2.0))
        j = 0
    return crop(clip, i, j, th, tw)

class CenterCropResizeVideo:
    '''
    First use the short side for cropping length,
    center crop video, then resize to the specified size
    '''

    def __init__(
            self,
            size,
            interpolation_mode="bilinear",
    ):
        if isinstance(size, tuple):
            if len(size) != 2:
                raise ValueError(f"size should be tuple (height, width), instead got {size}")
            self.size = size
        else:
            self.size = (size, size)

        self.interpolation_mode = interpolation_mode

    def __call__(self, clip):
        """
        Args:
            clip (torch.tensor): Video clip to be cropped. Size is (T, C, H, W)
        Returns:
            torch.tensor: scale resized / center cropped video clip.
                size is (T, C, crop_size, crop_size)
        """
        clip_center_crop = center_crop_using_short_edge(clip)
        clip_center_crop_resize = resize(clip_center_crop, target_size=self.size,
                                         interpolation_mode=self.interpolation_mode)
        return clip_center_crop_resize

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(size={self.size}, interpolation_mode={self.interpolation_mode}"

In [18]:
from decord import VideoReader, cpu
from decord import bridge

bridge = bridge.set_bridge("torch")

video_path = "/cvdata1/jihwan/minecraft/train/1_2/000002.mp4"

vr = VideoReader(video_path, ctx=cpu(0))
video = vr.get_batch(range(len(vr)))
video = video.permute(0, 3, 1, 2)
video.shape

torch.Size([301, 3, 128, 128])

In [19]:
center_crop_resize = CenterCropResizeVideo(size=128)
resized_video = center_crop_resize(video)
resized_video.shape

torch.Size([301, 3, 128, 128])

In [20]:

embeddings = []
with torch.no_grad():
    for i in range(1, len(resized_video)):
        image = resized_video[i]
        image = Image.fromarray(image.permute(1, 2, 0).numpy())
        inputs = processor(images=image, return_tensors="pt")
        inputs = inputs.to(model.device)
        embedding = model.get_image_features(**inputs)
        
        embeddings.append(embedding)


In [21]:
cosine_sim = torch.nn.CosineSimilarity(dim=1)

print("consecutive frames")
for i in range(1, 18):
    print(cosine_sim(embeddings[0], embeddings[i]))

consecutive frames
tensor([0.8687], device='cuda:0')
tensor([0.8255], device='cuda:0')
tensor([0.8702], device='cuda:0')
tensor([0.8546], device='cuda:0')
tensor([0.8461], device='cuda:0')
tensor([0.8447], device='cuda:0')
tensor([0.8723], device='cuda:0')
tensor([0.9206], device='cuda:0')
tensor([0.8982], device='cuda:0')
tensor([0.8687], device='cuda:0')
tensor([0.8400], device='cuda:0')
tensor([0.8530], device='cuda:0')
tensor([0.8482], device='cuda:0')
tensor([0.7813], device='cuda:0')
tensor([0.7504], device='cuda:0')
tensor([0.7977], device='cuda:0')
tensor([0.7753], device='cuda:0')


In [22]:
print("consecutive chunks")
for i in range(17, len(embeddings), 17):
    print(cosine_sim(embeddings[0], embeddings[i]))

consecutive chunks
tensor([0.7753], device='cuda:0')
tensor([0.7953], device='cuda:0')
tensor([0.7641], device='cuda:0')
tensor([0.7816], device='cuda:0')
tensor([0.8147], device='cuda:0')
tensor([0.7802], device='cuda:0')
tensor([0.7830], device='cuda:0')
tensor([0.7938], device='cuda:0')
tensor([0.7787], device='cuda:0')
tensor([0.7576], device='cuda:0')
tensor([0.8617], device='cuda:0')
tensor([0.9019], device='cuda:0')
tensor([0.7302], device='cuda:0')
tensor([0.7915], device='cuda:0')
tensor([0.7933], device='cuda:0')
tensor([0.8073], device='cuda:0')
tensor([0.7488], device='cuda:0')


## Comparison btw the first frames of different videos

In [25]:
center_crop_resize = CenterCropResizeVideo(size=128)
video_path_1 = "/cvdata1/jihwan/minecraft/train/1_2/000000.mp4"
video_path_2 = "/cvdata1/jihwan/minecraft/train/1_2/000002.mp4"

vr1 = VideoReader(video_path_1, ctx=cpu(0))
video1 = vr1.get_batch(range(len(vr1)))
video1 = video1.permute(0, 3, 1, 2)
resized_video1 = center_crop_resize(video1)


vr2 = VideoReader(video_path_2, ctx=cpu(0))
video2 = vr2.get_batch(range(len(vr2)))
video2 = video2.permute(0, 3, 1, 2)
resized_video2 = center_crop_resize(video2)


image1 = resized_video1[i]
image1 = Image.fromarray(image1.permute(1, 2, 0).numpy())
inputs1 = processor(images=image1, return_tensors="pt")
inputs1 = inputs1.to(model.device)
embedding1 = model.get_image_features(**inputs1)

image2 = resized_video2[i]
image2 = Image.fromarray(image2.permute(1, 2, 0).numpy())
inputs2 = processor(images=image2, return_tensors="pt")
inputs2 = inputs2.to(model.device)
embedding2 = model.get_image_features(**inputs2)

cosine_sim(embedding1, embedding2)

tensor([0.8118], device='cuda:0', grad_fn=<SumBackward1>)