In [1]:
import sys
sys.path.append('..')
from layers.dynamic_rnn import DynamicLSTM
from layers.attention_vis import Attention
# from layers.attention import Attention
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.models as models
import numpy as np
from einops import rearrange
from models.visual_net import resnet18
import argparse
import os

from PIL import Image
import cv2



In [9]:
class AVQA_Fusion_Net(nn.Module):
    def __init__(self, args):
        super(AVQA_Fusion_Net, self).__init__()
        self.device = 'cpu'
        self.qst_vocab_size = 93
        self.word_embed_size = 512
        self.embed_dim_audio = 128
        self.embed_dim_video = 512  # or 2048
        self.hidden_dim = 256
        self.num_classes = 42  # size of answer vocab
        self.stage1_hops = 2
        self.stage2_hops = 2
        self.num_heads = 4
        self.lstm_num_layers = 1
        self.que_max_len = 10
        # get the feature from [-2] layer of resnet18
        # self.img_extractor = nn.Sequential(*list(resnet18(pretrained=True, modal="vision").children())[:-1])
        # img_extractor = models.video.r2plus1d_18(pretrained=True)
        # self.img_extractor = nn.Sequential(*list(img_extractor.children())[:-1])
        # for p in self.img_extractor.parameters():
        #     p.requires_grad = False
        self.word2vec = nn.Embedding(self.qst_vocab_size, self.word_embed_size)
        self.bi_lstm_question = DynamicLSTM(self.word_embed_size,self.hidden_dim,num_layers=self.lstm_num_layers,batch_first=True,bidirectional=True,)
        self.bi_lstm_audio = DynamicLSTM(self.embed_dim_audio,self.hidden_dim,num_layers=self.lstm_num_layers,batch_first=True,bidirectional=True,)
        self.bi_lstm_video = DynamicLSTM(self.embed_dim_video,self.hidden_dim,num_layers=self.lstm_num_layers,batch_first=True,bidirectional=True,)
        
        self.tanh = nn.Tanh()
        self.EAVF_fusion = nn.Linear(self.hidden_dim * 6, self.hidden_dim * 2)
        self.MF_fusion = nn.Linear(self.hidden_dim * 2, self.hidden_dim * 2)
        self.LAF_fusion = nn.Linear(self.hidden_dim * 2, self.hidden_dim * 2)
        self.fc_ans = nn.Linear(self.hidden_dim * 2 * self.que_max_len, self.num_classes)

    # (self, audio, visual_posi, visual_nega, question)
    def forward(self, audio_posi, video_posi, video_nega, question):
        '''
        question      [B, C]
        audio         [B, T, C]
        video_posi    [B, T, C, H, W]
        video_nega    [B, T, C, H, W]
        '''
        B, T, C = video_posi.size()
        # question_memory_len = torch.sum(question != 0, dim=-1).to(self.device)
        question_memory_len = torch.tensor([self.que_max_len for i in range(B)]).to(
            self.device
        )
        # print(question_memory_len)
        audio_memory_len = torch.tensor([T for i in range(B)]).to(self.device)
        video_memory_len = torch.tensor([T for i in range(B)]).to(self.device)
        # nonzeros_question = torch.tensor(question_memory_len).to(self.device)

        question = self.word2vec(question)  # [B, maxseqlen, C] [B, 14, 512]

        # question_memory [B, 14, 512], audio_memory [B, T, 512], video_*_memory [B, T, 512]
        question_memory, (_, _) = self.bi_lstm_question(question, question_memory_len)
        audio_memory, (_, _) = self.bi_lstm_audio(audio_posi, audio_memory_len)
        video_posi_memory, (_, _) = self.bi_lstm_video(video_posi, video_memory_len)
        video_nega_memory, (_, _) = self.bi_lstm_video(video_nega, video_memory_len)
        # print('question_memory: ', question_memory.shape)
        
        # EAVF
        av_feat = torch.cat((audio_memory, video_posi_memory),dim=-1,)
        qav_feat = torch.cat((question_memory, av_feat),dim=-1,)
        EAVF_feat = self.tanh(qav_feat)
        EAVF_feat = self.EAVF_fusion(EAVF_feat)
        EAVF_feat = self.tanh(EAVF_feat)
        
        # MF
        qav_feat = audio_memory * video_posi_memory * question_memory
        MF_feat = self.tanh(qav_feat)
        MF_feat = self.MF_fusion(MF_feat)
        MF_feat = self.tanh(MF_feat)
        
        # LAF
        qav_feat = audio_memory * video_posi_memory * question_memory
        LAF_feat = self.tanh(qav_feat)
        LAF_feat = self.LAF_fusion(LAF_feat)
        LAF_feat = self.tanh(LAF_feat)
        
        averaged_feature = (EAVF_feat + MF_feat + LAF_feat)/3
        # print('averaged_feature: ', averaged_feature.shape)
        
        combined_feature = rearrange(averaged_feature, 'b t c -> b (t c)')
        out = self.fc_ans(combined_feature)  # [batch_size, ans_vocab_size]

        return out

In [11]:
if __name__ == "__main__":
    # parser = argparse.ArgumentParser()
    # args = parser.parse_args()
    # args.device = 'cpu'
    args = ''
    model = AVQA_Fusion_Net(args)
    model.eval()
    audio = torch.randn(2, 10, 128)
    video_posi = torch.randn(2, 10, 512)
    video_nega = torch.randn(2, 10, 512)
    question = np.array([np.random.randint(0, 93, 14), np.random.randint(0, 93, 14)])
    question = torch.from_numpy(question).long()
    out_qa = model(audio, video_posi, video_nega, question)
    print('\nout_qa feature dimension ----- ', out_qa.size())
    # print('loc_et_posi_audio feature dimension ----- ', loc_et_posi_audio.size())
    # print('loc_et_posi_video feature dimension ----- ', loc_et_posi_video.size())
    # print('loc_et_nega_video feature dimension ----- ', loc_et_nega_video.size())
    # print('glo_et_posi_audio feature dimension ----- ', glo_et_posi_audio.size())
    # print('loc_et_nega_video feature dimension ----- ', glo_et_posi_video.size())
    # print('glo_et_posi_video feature dimension ----- ', glo_et_nega_video.size())
    # print('loc_att_map feature dimension ----- ', loc_att_map.size())
    print('-- model constructing successfully --')
   

averaged_feature:  torch.Size([2, 10, 512])

out_qa feature dimension -----  torch.Size([2, 42])
-- model constructing successfully --


# trihard loss

In [21]:
import torch.nn as nn
import torch

class TripletLoss(nn.Module):
    """Triplet loss with hard positive/negative mining.
    Reference:
    Hermans et al. In Defense of the Triplet Loss for Person Re-Identification. arXiv:1703.07737.
    Code imported from https://github.com/Cysu/open-reid/blob/master/reid/loss/triplet.py.
    Args:
        margin (float): margin for triplet.
    """
    def __init__(self, margin=0.3):
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.ranking_loss = nn.MarginRankingLoss(margin=margin)

    def forward(self, inputs, targets):
        """
        Args:
            inputs: feature matrix with shape (batch_size, feat_dim)
            targets: ground truth labels with shape (num_classes)
        """
        n = inputs.size(0)
        # Compute pairwise distance, replace by the official when merged
        dist = torch.pow(inputs, 2).sum(dim=1, keepdim=True).expand(n, n)
        print('\ndist ----- 0', dist.shape)
        print(dist)
        dist = dist + dist.t()
        print('\ndist ----- 1', dist.shape)
        print(dist)
        # dist.addmm_(1, -2, inputs, inputs.t())
        dist = torch.addmm(1, dist, -2, inputs, inputs.t())
        dist = dist.clamp(min=1e-12).sqrt()  # for numerical stability
        # For each anchor, find the hardest positive and negative
        mask = targets.expand(n, n).eq(targets.expand(n, n).t())
        dist_ap, dist_an = [], []
        for i in range(n):
            dist_ap.append(dist[i][mask[i]].max().unsqueeze(0))
            dist_an.append(dist[i][mask[i] == 0].min().unsqueeze(0))
        dist_ap = torch.cat(dist_ap)
        dist_an = torch.cat(dist_an)
        # Compute ranking hinge loss
        y = torch.ones_like(dist_an)
        loss = self.ranking_loss(dist_an, dist_ap, y)
        return loss

In [22]:
if __name__ == '__main__':
    target = [1,1,1,1,2,2,2,2,3,3,3,3,4,4,4,4,5,5,5,5,6,6,6,6,7,7,7,7,8,8,8,8]
    target = torch.tensor(target)
    features = torch.randn(32, 512)
    a = TripletLoss()
    loss = a.forward(features, target)
    print(loss)


dist ----- 0 torch.Size([32, 32])
tensor([[519.9790, 519.9790, 519.9790,  ..., 519.9790, 519.9790, 519.9790],
        [529.4755, 529.4755, 529.4755,  ..., 529.4755, 529.4755, 529.4755],
        [503.3029, 503.3029, 503.3029,  ..., 503.3029, 503.3029, 503.3029],
        ...,
        [514.5906, 514.5906, 514.5906,  ..., 514.5906, 514.5906, 514.5906],
        [560.9006, 560.9006, 560.9006,  ..., 560.9006, 560.9006, 560.9006],
        [477.0782, 477.0782, 477.0782,  ..., 477.0782, 477.0782, 477.0782]])

dist ----- 1 torch.Size([32, 32])
tensor([[1039.9580, 1049.4546, 1023.2820,  ..., 1034.5696, 1080.8796,
          997.0573],
        [1049.4546, 1058.9510, 1032.7784,  ..., 1044.0662, 1090.3762,
         1006.5538],
        [1023.2820, 1032.7784, 1006.6059,  ..., 1017.8936, 1064.2036,
          980.3812],
        ...,
        [1034.5696, 1044.0662, 1017.8936,  ..., 1029.1812, 1075.4912,
          991.6688],
        [1080.8796, 1090.3762, 1064.2036,  ..., 1075.4912, 1121.8013,
         1037

# net test

In [10]:
import sys
sys.path.append('..')
import torch
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

from models.visual_net_vis import resnet18


class AVQA_AVatt_Grounding(nn.Module):

    def __init__(self):
        super(AVQA_AVatt_Grounding, self).__init__()

        # for features
        self.fc_a1 =  nn.Linear(128, 512)
        self.fc_a2=nn.Linear(512,512)

        # visual
        self.visual_net = resnet18(pretrained=True)

        # combine
        self.fc1 = nn.Linear(1024, 512)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(512, 256)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(256, 128)
        self.relu3 = nn.ReLU()
        self.fc4 = nn.Linear(128, 2)
        self.relu4 = nn.ReLU()
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.fc_gl=nn.Linear(1024,512)
        self.tanh = nn.Tanh()


    def forward(self, video_id, audio, visual):

        ## audio features
        audio_feat = F.relu(self.fc_a1(audio))
        audio_feat=self.fc_a2(audio_feat)                      # [16, 20, 512]
        (B, T, C) = audio_feat.size()
        audio_feat = audio_feat.view(B*T, C)                # [320, 512]

        ## visual, input: [16, 20, 3, 224, 224]
        (B, T, C, H, W) = visual.size()
        visual = visual.view(B * T, C, H, W)                # [320, 3, 224, 224]

        v_feat_out_res18 = self.visual_net(visual)                    # [320, 512, 14, 14]
        print('v_feat_out_res18 ----- ', v_feat_out_res18.size())
        v_feat=self.avgpool(v_feat_out_res18)
        visual_feat_before_grounding=v_feat.squeeze()     # 320 512
        
        (B, C, H, W) = v_feat_out_res18.size()
        v_feat = v_feat_out_res18.view(B, C, H * W)
        v_feat = v_feat.permute(0, 2, 1)  # B, HxW, C
        visual = nn.functional.normalize(v_feat, dim=2)
             
        ## audio-visual grounding
        audio_feat_aa = audio_feat.unsqueeze(-1)            # [320, 512, 1]
        audio_feat_aa = nn.functional.normalize(audio_feat_aa, dim=1)
        visual_feat = visual
        print('visual_feat ----- ', visual_feat.size())
        print('audio_feat_aa ----- ', audio_feat_aa.size())
        x2_va = torch.matmul(visual_feat, audio_feat_aa).squeeze()

        x2_p = F.softmax(x2_va, dim=-1).unsqueeze(-2)       # [320, 1, 196]
        visual_feat_grd = torch.matmul(x2_p, visual_feat)
        visual_feat_grd = visual_feat_grd.squeeze()         # [320, 512]   

        visual_gl=torch.cat((visual_feat_before_grounding,visual_feat_grd),dim=-1)
        visual_feat_grd=self.tanh(visual_gl)
        visual_feat_grd=self.fc_gl(visual_feat_grd)

        # combine a and v
        feat = torch.cat((audio_feat, visual_feat_grd), dim=-1)     # [320, 1024]

        feat = F.relu(self.fc1(feat))   # (1024, 512)
        feat = F.relu(self.fc2(feat))   # (512, 256)
        feat = F.relu(self.fc3(feat))   # (256, 128)
        feat = self.fc4(feat)   # (128, 2)

        return  x2_p, feat

In [11]:
if __name__ == "__main__":
    # parser = argparse.ArgumentParser()
    # args = parser.parse_args()
    # args.device = 'cpu'
    args = ''
    model = AVQA_AVatt_Grounding()
    model.eval()
    audio = torch.randn(32, 10, 128)
    video_posi = torch.randn(32, 10, 3, 224, 224)
    question = np.array([np.random.randint(0, 93, 14), np.random.randint(0, 93, 14)])
    att_map, feat = model(1, audio, video_posi)
    print('att_map ----- ', att_map.size())
    print('feat ----- ', feat.size())
    print('-- model constructing successfully --')

---------- >>> load pretrained res-18 <<< ----------

v_feat_out_res18 -----  torch.Size([320, 512, 14, 14])
visual_feat -----  torch.Size([320, 196, 512])
audio_feat_aa -----  torch.Size([320, 512, 1])
att_map -----  torch.Size([320, 1, 196])
feat -----  torch.Size([320, 2])
-- model constructing successfully --
