In [None]:
import data
import torch
from models import imagebind_model
from models.imagebind_model import ModalityType

# One caption, one image and one audio clip per concept; the i-th entry of
# each list is meant to describe the same thing.
text_list = ["A dog.", "A car", "A bird"]
image_paths = [".assets/dog_image.jpg", ".assets/car_image.jpg", ".assets/bird_image.jpg"]
audio_paths = [".assets/dog_audio.wav", ".assets/car_audio.wav", ".assets/bird_audio.wav"]

# Use the first GPU when one is visible, otherwise stay on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Instantiate the pretrained model, switch it to eval mode and move it to `device`.
model = imagebind_model.imagebind_huge(pretrained=True)
model.eval().to(device)

# Load and transform each modality's raw inputs with the project's `data` helpers.
inputs = {
    ModalityType.TEXT: data.load_and_transform_text(text_list, device),
    ModalityType.VISION: data.load_and_transform_vision_data(image_paths, device),
    ModalityType.AUDIO: data.load_and_transform_audio_data(audio_paths, device),
}

# Inference only: no autograd graph is needed.
with torch.no_grad():
    embeddings = model(inputs)

# Row-wise softmax over the pairwise dot products of two modalities' embeddings.
for pair_label, row_modality, col_modality in (
    ("Vision x Text: ", ModalityType.VISION, ModalityType.TEXT),
    ("Audio x Text: ", ModalityType.AUDIO, ModalityType.TEXT),
    ("Vision x Audio: ", ModalityType.VISION, ModalityType.AUDIO),
):
    pair_logits = embeddings[row_modality] @ embeddings[col_modality].T
    print(pair_label, torch.softmax(pair_logits, dim=-1))

# Expected output:
#
# Vision x Text:
# tensor([[9.9761e-01, 2.3694e-03, 1.8612e-05],
#         [3.3836e-05, 9.9994e-01, 2.4118e-05],
#         [4.7997e-05, 1.3496e-02, 9.8646e-01]])
#
# Audio x Text:
# tensor([[1., 0., 0.],
#         [0., 1., 0.],
#         [0., 0., 1.]])
#
# Vision x Audio:
# tensor([[0.8070, 0.1088, 0.0842],
#         [0.1036, 0.7884, 0.1079],
#         [0.0018, 0.0022, 0.9960]])


In [None]:
# Show the Vision x Audio similarity matrix as the cell's rich output.
# Same expression as the last print in the first cell; needs `embeddings`
# and `ModalityType` from that cell to be in the kernel.
torch.softmax(embeddings[ModalityType.VISION] @ embeddings[ModalityType.AUDIO].T, dim=-1)

In [None]:
from models.multimodal_preprocessors import PatchEmbedGeneric,PadIm2Video
import torch.nn as nn
import numpy as np
from typing import Optional
class PatchEmbedGeneric(nn.Module):
    """
    PatchEmbed from Hydra

    Notebook-local copy that shadows the ``PatchEmbedGeneric`` imported at the
    top of this cell. It prints the tensor shape before and after each step so
    the patch layout can be inspected interactively.

    Args:
        proj_stem: sequence of modules forming the projection stem. More than
            one module is wrapped in ``nn.Sequential``; a single module is used
            directly.
        norm_layer: optional module applied to the flattened patch tokens.
    """

    def __init__(self, proj_stem, norm_layer: Optional[nn.Module] = None):
        super().__init__()

        if len(proj_stem) > 1:
            self.proj = nn.Sequential(*proj_stem)
        else:
            # Special case to be able to load pre-trained models that were
            # trained with a standard stem
            self.proj = proj_stem[0]
        self.norm_layer = norm_layer

    def get_patch_layout(self, img_size):
        """Run a zero image through the stem to discover the patch grid.

        Args:
            img_size: per-sample input shape without the batch dimension.
                Any sequence of ints is accepted (list or tuple).

        Returns:
            ``(patches_layout, num_patches, embed_dim)`` where
            ``patches_layout`` is the stem output shape after the batch and
            channel dimensions, ``num_patches`` is the product of that layout
            and ``embed_dim`` is the stem's output channel count.
        """
        with torch.no_grad():
            # Unpack instead of `[1] + img_size` so a tuple img_size works too
            # (list + tuple raises TypeError).
            dummy_img = torch.zeros([1, *img_size])
            print(dummy_img.shape)
            dummy_out = self.proj(dummy_img)
        print(dummy_out.shape)
        embed_dim = dummy_out.shape[1]
        patches_layout = tuple(dummy_out.shape[2:])
        num_patches = np.prod(patches_layout)
        return patches_layout, num_patches, embed_dim

    def forward(self, x):
        """Project ``x`` with the stem and return tokens of shape B x L x C.

        The shape is printed after every step (notebook shape tracing).
        """
        print(x.shape)
        x = self.proj(x)
        print(x.shape)
        # B C (T) H W -> B (T)HW C
        x = x.flatten(2).transpose(1, 2)
        print(x.shape)
        if self.norm_layer is not None:
            x = self.norm_layer(x)
        return x
    
# Patch size as (time, height, width); also used as the stride, so patches
# do not overlap.
kernel_size = (2, 14, 14)
vision_embed_dim = 1024

# Stem: PadIm2Video (configured with pad_type="repeat", ntimes=2) followed by
# a bias-free 3D convolution that turns each patch into one embedding vector.
proj_stem = [
    PadIm2Video(pad_type="repeat", ntimes=2),
    nn.Conv3d(3, vision_embed_dim, kernel_size, stride=kernel_size, bias=False),
]

# Cell output: (patches_layout, num_patches, embed_dim) for a 3 x 2 x 224 x 224 input.
PatchEmbedGeneric(proj_stem, norm_layer=None).get_patch_layout([3, 2, 224, 224])

In [None]:
from models.helpers import VerboseNNModule
from typing import Tuple, Optional, Callable
from models.helpers import (EinOpsRearrange, LearnableLogitScaling, Normalize,
                            SelectElement, SelectEOSAndProject)
from models.multimodal_preprocessors import (AudioPreprocessor,
                                             IMUPreprocessor, PadIm2Video,
                                             PatchEmbedGeneric,
                                             RGBDTPreprocessor,
                                             SpatioTemporalPosEmbeddingHelper,
                                             TextPreprocessor,
                                             ThermalPreprocessor)
from models.transformer import MultiheadAttention, SimpleTransformer

import logging
import os
from functools import partial
from types import SimpleNamespace
from typing import Dict

import torch
import torch.nn as nn

class RGBDTPreprocessor(VerboseNNModule):
    """Turn vision (RGB/T) and/or depth inputs into a token sequence.

    Notebook-local copy that replaces the ``RGBDTPreprocessor`` imported from
    ``models.multimodal_preprocessors`` earlier in this cell.

    Each provided input goes through its patch-embedding stem. Class tokens
    are then prepended, a positional embedding is added and a type embedding
    is added, each only if configured. When both inputs are given, their
    token tensors are summed.

    Args:
        rgbt_stem: stem used for the ``vision`` input (may be ``None``).
        depth_stem: stem used for the ``depth`` input (may be ``None``).
        img_size: per-sample input shape without the batch dimension; list or
            tuple.
        num_cls_tokens: number of learnable class tokens to prepend.
        pos_embed_fn: optional factory called with ``patches_layout``,
            ``num_cls_tokens``, ``num_patches`` and ``embed_dim``. The object
            it returns must expose ``pos_embed`` and ``get_pos_embedding``.
        use_type_embed: whether to add a learnable type embedding.
        init_param_style: ``"openclip"`` or ``"vit"``.

    Raises:
        ValueError: if both stems are ``None`` or ``init_param_style`` is
            unknown.
    """

    def __init__(
        self,
        rgbt_stem: Optional[PatchEmbedGeneric],
        depth_stem: Optional[PatchEmbedGeneric],
        img_size: Tuple = (3, 224, 224),
        num_cls_tokens: int = 1,
        pos_embed_fn: Optional[Callable] = None,
        use_type_embed: bool = False,
        init_param_style: str = "openclip",
    ) -> None:
        super().__init__()
        stem = rgbt_stem if rgbt_stem is not None else depth_stem
        if stem is None:
            raise ValueError("At least one of rgbt_stem / depth_stem must be provided")
        # get_patch_layout builds the probe shape via list concatenation, so
        # hand it a list even when img_size is a tuple (e.g. the default).
        (
            self.patches_layout,
            self.num_patches,
            self.embed_dim,
        ) = stem.get_patch_layout(list(img_size))
        self.rgbt_stem = rgbt_stem
        self.depth_stem = depth_stem
        self.use_pos_embed = pos_embed_fn is not None
        self.use_type_embed = use_type_embed
        self.num_cls_tokens = num_cls_tokens

        if self.use_pos_embed:
            self.pos_embedding_helper = pos_embed_fn(
                patches_layout=self.patches_layout,
                num_cls_tokens=num_cls_tokens,
                num_patches=self.num_patches,
                embed_dim=self.embed_dim,
            )
        if self.num_cls_tokens > 0:
            self.cls_token = nn.Parameter(
                torch.zeros(1, self.num_cls_tokens, self.embed_dim)
            )
        if self.use_type_embed:
            self.type_embed = nn.Parameter(torch.zeros(1, 1, self.embed_dim))

        self.init_parameters(init_param_style)

    @torch.no_grad()
    def init_parameters(self, init_param_style):
        """Initialize the class token, positional and type embeddings.

        Raises:
            ValueError: if ``init_param_style`` is neither ``"openclip"`` nor
                ``"vit"``.
        """
        if init_param_style == "openclip":
            # OpenCLIP style initialization
            scale = self.embed_dim**-0.5
            if self.use_pos_embed:
                nn.init.normal_(self.pos_embedding_helper.pos_embed)
                self.pos_embedding_helper.pos_embed *= scale

            if self.num_cls_tokens > 0:
                nn.init.normal_(self.cls_token)
                self.cls_token *= scale
        elif init_param_style == "vit":
            # cls_token only exists when at least one class token was requested.
            if self.num_cls_tokens > 0:
                self.cls_token.data.fill_(0)
        else:
            raise ValueError(f"Unknown init {init_param_style}")

        if self.use_type_embed:
            nn.init.normal_(self.type_embed)

    def tokenize_input_and_cls_pos(self, input, stem, mask):
        """Embed ``input`` with ``stem`` and add cls / positional / type terms.

        ``mask`` is accepted for interface compatibility but is not used.
        Returns a tensor of shape B x L x D with D == ``self.embed_dim``.
        """
        # tokens is of shape B x L x D
        tokens = stem(input)
        assert tokens.ndim == 3
        assert tokens.shape[2] == self.embed_dim
        B = tokens.shape[0]
        if self.num_cls_tokens > 0:
            class_tokens = self.cls_token.expand(
                B, -1, -1
            )  # stole class_tokens impl from Phil Wang, thanks
            tokens = torch.cat((class_tokens, tokens), dim=1)
        if self.use_pos_embed:
            pos_embed = self.pos_embedding_helper.get_pos_embedding(input, tokens)
            tokens = tokens + pos_embed
        if self.use_type_embed:
            tokens = tokens + self.type_embed.expand(B, -1, -1)
        return tokens

    def forward(self, vision=None, depth=None, patch_mask=None):
        """Tokenize the provided inputs.

        Returns:
            ``{"trunk": {"tokens": tokens}, "head": {}}``. ``tokens`` is the
            sum of the vision and depth tokens when both inputs are given,
            otherwise the tokens of whichever input was provided.

        Raises:
            NotImplementedError: if ``patch_mask`` is given.
            ValueError: if neither input is given, or an input is given for
                which no stem was configured.
        """
        if patch_mask is not None:
            raise NotImplementedError()

        if vision is None and depth is None:
            raise ValueError("forward() needs at least one of `vision` or `depth`")
        if vision is not None and self.rgbt_stem is None:
            raise ValueError("`vision` input given but no rgbt_stem was configured")
        if depth is not None and self.depth_stem is None:
            raise ValueError("`depth` input given but no depth_stem was configured")

        if vision is not None:
            vision_tokens = self.tokenize_input_and_cls_pos(
                vision, self.rgbt_stem, patch_mask
            )

        if depth is not None:
            depth_tokens = self.tokenize_input_and_cls_pos(
                depth, self.depth_stem, patch_mask
            )

        # aggregate tokens
        if vision is not None and depth is not None:
            final_tokens = vision_tokens + depth_tokens
        else:
            final_tokens = vision_tokens if vision is not None else depth_tokens
        return_dict = {
            "trunk": {
                "tokens": final_tokens,
            },
            "head": {},
        }
        return return_dict

# Patch-embedding stem; `kernel_size` and `vision_embed_dim` come from the
# previous cell.
rgbt_stem = PatchEmbedGeneric(
    proj_stem=[
        PadIm2Video(pad_type="repeat", ntimes=2),
        nn.Conv3d(
            in_channels=3,
            kernel_size=kernel_size,
            out_channels=vision_embed_dim,
            stride=kernel_size,
            bias=False,
        ),
    ]
)

# Vision-only preprocessor: one class token plus a learnable
# SpatioTemporalPosEmbeddingHelper positional embedding.
rgbt_preprocessor = RGBDTPreprocessor(
    img_size=[3, 2, 224, 224],
    num_cls_tokens=1,
    pos_embed_fn=partial(SpatioTemporalPosEmbeddingHelper, learnable=True),
    rgbt_stem=rgbt_stem,
    depth_stem=None,
)

# Build the probe input here: `dummy_img` is only defined by cells further
# down, so referencing it failed on a fresh kernel. The batch of one matches
# the img_size given above.
# NOTE(review): the original passed `dummy_img[0]`; confirm a batched
# 1 x 3 x 2 x 224 x 224 tensor is the intended input.
dummy_vision_batch = torch.zeros([1, 3, 2, 224, 224])
rgbt_preprocessor(vision=dummy_vision_batch)['trunk']['tokens'].shape

In [None]:
from models.imagebind_model import ImageBindModel
# Build an ImageBindModel with its default constructor arguments.
# Note: a later cell binds a different object to the similar name `imagebind`.
imageBind=ImageBindModel()

In [None]:
import torch
# All-zero input of shape 1 x 3 x 2 x 224 x 224, passed to the model's
# `layer_shapes` helper for the vision modality.
dummy_img = torch.zeros([1, 3, 2, 224, 224])
imageBind.layer_shapes(dummy_img, modality_type="vision")

In [None]:
from models.multimodal_preprocessors import PatchEmbedGeneric,PadIm2Video
from torchvision import transforms
from datasets.VideoDataset import resize_pad,VideoDataset
import torch
import pandas as pd
# All-zero 3 x 500 x 500 tensor, kept around for the PadIm2Video probe below.
dummy_img = torch.zeros(3, 500, 500)
# PadIm2Video(pad_type="repeat", ntimes=2)(dummy_img).shape

# First sample of the video dataset (default constructor arguments).
# NOTE(review): `data` is also the name of the module imported in the first cell.
videodataset = VideoDataset()
data = videodataset[0]

In [None]:
# Swap axes 1 and 2 of the sample's first element (B, C, T, H, W -> B, T, C, H, W
# per the original note; assumes that input layout — TODO confirm).
# The result goes to a new name so re-running this cell does not permute an
# already-permuted tensor.
video_btchw = data[0].permute(0, 2, 1, 3, 4)
# visualize first frame (channels moved last for imshow)
import matplotlib.pyplot as plt
plt.imshow(video_btchw[0, 0].permute(1, 2, 0))
plt.show()

In [None]:
import torch
path='.checkpoints/imagebind_huge.pth'
# The checkpoint is only inspected (keys / shapes) in the following cells, so
# map every tensor to the CPU. This also lets the load succeed on machines
# without CUDA if the tensors were saved from a GPU.
state_dict = torch.load(path, map_location='cpu')

In [None]:
# Shape of the tensor stored under the thermal head's `2.weight` key in the checkpoint.
state_dict['modality_heads.thermal.2.weight'].shape

In [None]:
# Every checkpoint key whose name contains "thermal".
[name for name in state_dict if 'thermal' in name]

In [None]:
# Checkpoint keys that belong to the thermal modality preprocessor.
thermal_prefix = "modality_preprocessors.thermal"
[name for name in state_dict if name.startswith(thermal_prefix)]

In [None]:
from models.events import EventModel
# Build the event model with default constructor arguments; the bare `e`
# below displays its repr as the cell output.
e=EventModel()
e

In [None]:
# Inspect the event preprocessor's class-token attribute.
e.event_preprocessor.cls_token

In [None]:
# Same class token, read through the preprocessor's state_dict.
e.event_preprocessor.state_dict()['cls_token']

In [None]:
# Ask the event model to load weights from the ImageBind checkpoint file
# (what gets copied is decided inside EventModel.load_weights, not visible here).
e.load_weights(path='.checkpoints/imagebind_huge.pth')

In [None]:
# Absolute path to one pickled recording on the lab filesystem.
# NOTE(review): this rebinds `path`, which an earlier cell used for the checkpoint file.
path='/tsukimi/datasets/Chiba/finetune_train/HL-HC-Official-2023_12_04_14_10_37-100.pkl'
import pickle as pkl
# pickle.load can execute arbitrary code from the file; only use it on trusted files.
with open(path, 'rb') as f:
    # Rebinds `data` (earlier cells used it for a module and for a dataset sample).
    data = pkl.load(f)
# Helper used by the next cell to build an image from event coordinates and polarities.
from datasets.EventDataset import events_to_image as e2i

In [None]:
# First entry of the recording's event list.
events = data['events'][0]
# Re-code polarity 0 as -1 (written back into the 'polarity' field).
events['polarity'][events['polarity'] == 0] = -1

# Split the events by polarity sign.
is_positive = events['polarity'] == 1
events_positive = events[is_positive]
is_negative = events['polarity'] == -1
events_negative = events[is_negative]

# One frame per polarity, built from the events' x / y / polarity fields.
event_frame_positive = e2i(
    events_positive['x'], events_positive['y'], events_positive['polarity']
)
event_frame_negative = e2i(
    events_negative['x'], events_negative['y'], events_negative['polarity']
)

# Show the negative frame first, then the positive one, in grayscale.
import matplotlib.pyplot as plt
for polarity_frame in (event_frame_negative, event_frame_positive):
    plt.imshow(polarity_frame, cmap='gray')
    plt.show()

In [None]:
# Stack positive, negative and their sum along a new leading axis, then move
# that axis last so imshow receives an H x W x 3 array.
import numpy as np
event_frame = np.stack(
    (
        event_frame_positive,
        event_frame_negative,
        event_frame_positive + event_frame_negative,
    )
)
plt.imshow(event_frame.transpose(1, 2, 0))

In [None]:
from datasets.EventDataset import events_to_image_torch as e2it
import torch
# Same polarity split as the numpy cell above, redone for the torch helper.
events = data['events'][0]
events['polarity'][events['polarity'] == 0] = -1
is_positive = events['polarity'] == 1
events_positive = events[is_positive]
is_negative = events['polarity'] == -1
events_negative = events[is_negative]

# Build the per-polarity frames with the torch helper `e2it`
# (intended for comparison with the frames from `e2i`).
events_positive_frame = e2it(
    events_positive['x'], events_positive['y'], events_positive['polarity']
)
events_negative_frame = e2it(
    events_negative['x'], events_negative['y'], events_negative['polarity']
)

# Three channels: positive, negative, and their sum.
frame_channels = (
    events_positive_frame,
    events_negative_frame,
    events_positive_frame + events_negative_frame,
)
event_frame = torch.stack(frame_channels, dim=0)
event_frame.shape

In [None]:
import pickle

# Split -> list of recording paths, containing a single training file.
dummy_path = {
    'train': [
        '/tsukimi/datasets/Chiba/finetune_train/HL-HC-Official-2023_12_04_14_10_37-100.pkl',
    ],
}

# Persist the mapping to dummy_path.pkl in the working directory.
with open('dummy_path.pkl', 'wb') as f:
    pickle.dump(dummy_path, f)

In [None]:
from datasets.EventDataset import EventDataset
import numpy as np
import torch

# Training-mode dataset over the finetune directory, using the one-file
# path list written to dummy_path.pkl.
eventdataset = EventDataset(
    mode='train',
    data_dir='/tsukimi/datasets/Chiba/finetune_train/',
    path='dummy_path.pkl',
)

In [None]:
# Shape of the 'image_units' entry of the first dataset sample.
eventdataset[0]['image_units'].shape

In [None]:
csv_path='/tsukimi/datasets/Chiba/baseline/datalist_3'
import cv2
import pandas as pd
import torch

# Per-channel running totals over every pixel of every frame.
# float64 keeps the totals accurate over this many pixels (float32 loses
# precision and makes E[x^2] - E[x]^2 unreliable). The names also no longer
# shadow the builtin `sum`.
channel_sum = torch.zeros(3, dtype=torch.float64)
channel_sum_sq = torch.zeros(3, dtype=torch.float64)
num_pixels = 0


for name in ['train','val','test']:
    # The first column of each split CSV holds the video paths.
    split_df = pd.read_csv(csv_path+f'/{name}.csv')
    videos = list(split_df.values[:,0])
    for video in videos:
        # Open video file
        cap = cv2.VideoCapture(video)
        try:
            while True:
                success, frame = cap.read()
                if not success:
                    break

                # Convert frame to RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Convert to a C x H x W tensor and normalize to [0, 1]
                frame = torch.from_numpy(frame).permute(2, 0, 1).double() / 255.0

                # Accumulate sum and sum of squares per channel
                channel_sum += frame.sum([1, 2])
                channel_sum_sq += (frame ** 2).sum([1, 2])
                num_pixels += frame.size(1) * frame.size(2)
        finally:
            # Release the capture handle even if decoding raises.
            cap.release()

if num_pixels == 0:
    raise ValueError("No video frames could be read; cannot compute mean/std")

# Calculate mean and std (population std via E[x^2] - E[x]^2).
mean_f64 = channel_sum / num_pixels
# Clamp tiny negative variances caused by rounding before the square root.
var_f64 = (channel_sum_sq / num_pixels - mean_f64 ** 2).clamp(min=0)
# Report as float32 tensors, like the original cell.
mean = mean_f64.float()
std = var_f64.sqrt().float()
print('Mean:', mean)
print('Std:', std)

In [None]:
from datasets.VideoDataset import VideoDataset
from models import imagebind_model
import torch
# Load the pretrained ImageBind "huge" model.
imagebind=imagebind_model.imagebind_huge(pretrained=True)
# Switch to evaluation mode; as the cell's last expression this also displays
# whatever eval() returns (presumably the module itself — verify).
imagebind.eval()

In [None]:
# All-zero vision input of shape 3 x 2 x 3 x 2 x 224 x 224, i.e. two extra
# leading dims (3, 2) in front of a 3 x 2 x 224 x 224 sample.
dummy_input = torch.zeros([3, 2, 3, 2, 224, 224])

# Forward pass without autograd, then show the vision embedding's shape.
vision_only_inputs = {"vision": dummy_input}
with torch.no_grad():
    embeddings = imagebind(vision_only_inputs)
embeddings['vision'].shape

In [1]:
from train_baseline import VideoTrain
from datasets.VideoDataset import VideoDataModule

# Training module built with default arguments.
train=VideoTrain()
# load batch
# NOTE(review): `dataset` is a VideoDataModule, not a dataset. Unlike the
# later cell, setup(stage='fit') is not called before train_dataloader() —
# confirm that is not required.
dataset=VideoDataModule(csv_path='/tsukimi/datasets/Chiba/baseline/datalist_3')
loader=dataset.train_dataloader()
# The first batch unpacks into (inputs, labels).
batch,labels=next(iter(loader))
# Call the training module on the inputs only.
embed=train(batch)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Display the output of the call in the previous cell.
embed

tensor([[ 0.0780,  0.0870, -0.0244]], grad_fn=<AddmmBackward0>)

In [1]:
from datasets.VideoDataset import VideoDataModule
# Data module over the baseline CSV lists; setup(stage='fit') is called
# explicitly before asking for the train loader.
module=VideoDataModule(csv_path='/tsukimi/datasets/Chiba/baseline/datalist_3')
module.setup(stage='fit')
loader=module.train_dataloader()
# Number of batches in the train loader.
print(len(loader))
# Here `batch` is the whole (inputs, labels) pair; a later cell reads the labels from batch[1].
batch=next(iter(loader))
from train_baseline import VideoTrain
train=VideoTrain()
# Call the training module on the inputs only.
embed=train(batch[0])

  from .autonotebook import tqdm as notebook_tqdm


9856


In [12]:
from eventutils import AccMetric,ConfusionMatrixMetric, multi_label_accuracy, custom_multi_label_pred, ground_truth_decoder
from pytorch_loss import FocalLossV3
# Labels are the second element of the batch fetched in the previous cell.
labels=batch[1]
# Convert the labels with the project's ground_truth_decoder helper.
gt=ground_truth_decoder(labels)
# Focal loss with default arguments.
c=FocalLossV3()
# Both operands are moved to 'cuda' explicitly, so this cell needs a CUDA device.
c(embed.to('cuda'),gt.to('cuda'))

tensor(0.0755, device='cuda:0', grad_fn=<MeanBackward0>)

In [14]:
# Shape of the first element of a freshly drawn batch (builds a new iterator over `loader`).
next(iter(loader))[0].shape

torch.Size([1, 42, 3, 2, 224, 224])

: 