In [1]:
from frames import Frames
from audio import Audio
from label_map import label_map

import torch
# import torchvision.transforms as transforms
from torch.utils.data import Dataset

import pandas as pd
from omegaconf import OmegaConf
from pathlib import Path


class OriginalMultimodalDataset(Dataset):
    """
    Dataset that serves the raw (un-encoded) multimodal sample of each video.

    Every item is a dict with the keys ``title``, ``emotion_name``, ``emotion``,
    ``scenes`` and ``subtitles``. When ``is_need_audio`` is true the dict also
    carries ``audio_waveform`` and ``audio_sample_rate``.
    """

    def __init__(self, is_need_audio=True, path_config_path_str='../configs/path.yaml'):
        """
        Args:
            is_need_audio: Whether ``__getitem__`` should also load the audio data.
            path_config_path_str: Path of the YAML file (loaded with OmegaConf)
                whose ``datasets`` section holds the directory and index-file paths.
        """
        # Load the path configuration.
        self.path_config = OmegaConf.load(path_config_path_str)
        datasets_config = self.path_config['datasets']

        # Base directories for the dataset, videos, subtitles and audio.
        self.base_dir = Path(datasets_config['base_dir'])
        self.base_video_dir = Path(datasets_config['base_video_dir'])
        self.base_subtitle_dir = Path(datasets_config['base_subtitle_dir'])
        self.base_audio_dir = Path(datasets_config['base_audio_dir'])

        # Load the master index file; video_id is forced to str so that pandas
        # does not coerce it to another dtype.
        self.is_need_audio = is_need_audio
        self.all_data = pd.read_json(datasets_config['base_all_data'], dtype={'video_id': str})

    def __len__(self):
        """Return the number of rows in the master index."""
        return len(self.all_data)

    def _value_at(self, idx, column):
        """Return the value of ``column`` in the row at *position* ``idx``."""
        # Column-first access avoids the dtype coercion that materialising a whole
        # mixed-type row as a Series can cause. ``iloc`` is positional, so it is
        # independent of the DataFrame's index labels and raises IndexError when
        # ``idx`` is out of range (which is what lets plain iteration terminate).
        return self.all_data[column].iloc[idx]

    def __getitem__(self, idx):
        """
        Build the raw sample at position ``idx``.

        Returns:
            dict with ``title``, ``emotion_name``, ``emotion`` (looked up in
            ``label_map``), ``scenes`` (list) and ``subtitles`` (list), plus the
            audio entries when ``self.is_need_audio`` is true.

        Raises:
            IndexError: If ``idx`` is outside ``range(len(self))``.
        """
        video_id = self._value_at(idx, 'video_id')
        emotion_name = self._value_at(idx, 'emotion')
        frames_data = self.get_frames_data(video_id)
        result = {
            'title': self._value_at(idx, 'title'),
            'emotion_name': emotion_name,
            'emotion': label_map[emotion_name],
            'scenes': frames_data['images'],  # This is a list.
            'subtitles': frames_data['subtitles'],  # This is a list.
        }
        if self.is_need_audio:
            result.update(self.get_audio_data_dict(video_id))
        return result

    def get_frames_data(self, video_id):
        """
        Load the frame images and frame subtitles of ``video_id`` via ``Frames``.

        Returns:
            dict with the keys ``images`` and ``subtitles``.
        """
        frames = Frames(video_id)
        # The returned info is not used here; the call is retained in case it has
        # side effects inside Frames -- TODO confirm and drop it if it has none.
        frames.get_video_info()
        frames_image = frames.get_frame_image_by_time()
        frames_subtitle = frames.get_frame_subtitle_by_time()
        return {
            'images': frames_image,
            'subtitles': frames_subtitle,
        }

    def get_audio_data_dict(self, video_id):
        """
        Load the audio of ``video_id`` via ``Audio``.

        Returns:
            dict with the keys ``audio_waveform`` and ``audio_sample_rate``.
        """
        audio = Audio(video_id)
        waveform, sample_rate = audio.load_audio()
        return {
            'audio_waveform': waveform,
            'audio_sample_rate': sample_rate
        }



In [8]:
# from .original_dataset import OriginalMultimodalDataset

from embedding import TextEncoder, Captioner, ImageEncoder, FaceExtractor, AudioEncoder

import torch
import torchvision.transforms as transforms

from pathlib import Path
from omegaconf import OmegaConf


class ProcessedMultimodalDataset(OriginalMultimodalDataset):
    """
    Dataset that turns each raw multimodal sample into embeddings.

    Every item is a dict with ``emotion`` (long tensor), ``title_embedding``,
    ``scene_embedding_list``, ``face_embedding_list`` (one
    ``(num_faces, face_embedding)`` tuple per scene) and ``text_embedding_list``.
    When ``is_need_audio`` is true it also carries ``audio_embedding``.
    """

    def __init__(self, is_need_caption=True, is_need_audio=True, path_config_path_str='../configs/path.yaml'):
        """
        Args:
            is_need_caption: Whether a generated caption is appended to each
                subtitle before text encoding.
            is_need_audio: Whether the audio is loaded and encoded.
            path_config_path_str: Path of the YAML path configuration, forwarded
                to the parent class.
        """
        super().__init__(is_need_audio, path_config_path_str)
        self.is_need_caption = is_need_caption
        # self.image_transform = image_transform

        # The processors and encoders used to build the embeddings.
        self.text_encoder = TextEncoder()
        self.captioner = Captioner()
        self.image_encoder = ImageEncoder()
        self.face_extractor = FaceExtractor()
        self.audio_encoder = AudioEncoder()

    def __getitem__(self, idx):
        """Load the raw sample at ``idx`` and encode every modality."""
        data = super().__getitem__(idx)
        scenes = data['scenes']
        result = {
            'emotion': torch.tensor(data['emotion'], dtype=torch.long),
            # 'title': data['title'],
            'title_embedding': self.get_title_embedding(data['title']),
            'scene_embedding_list': self.get_scene_embedding_list(scenes),
            'face_embedding_list': self.get_face_embedding_list(scenes),
            'text_embedding_list': self.get_text_embedding_list(scenes, data['subtitles']),
            # 'audio': data['audio'],
        }
        if self.is_need_audio:
            audio = (data['audio_waveform'], data['audio_sample_rate'])
            result.update(self.get_audio_embedding(audio))
        return result

    def get_scene_embedding_list(self, scenes):
        """Encode every scene image; returns one embedding per scene."""
        return [self.get_scene_embedding(scene) for scene in scenes]

    def get_face_embedding_list(self, scenes):
        """Return one ``(num_faces, face_embedding)`` tuple per scene."""
        return [self.get_face_embedding(scene) for scene in scenes]

    def get_text_embedding_list(self, scenes, subtitles):
        """
        Encode the text of every frame.

        The i-th subtitle is paired with the i-th scene; the result has one
        entry per subtitle.
        """
        # Index into ``scenes`` (rather than zip) so that a ``scenes`` list that
        # is shorter than ``subtitles`` still raises instead of being truncated.
        return [
            self.get_text_embedding(scenes[i], subtitle)
            for i, subtitle in enumerate(subtitles)
        ]

    def transform_image(self):
        """Process/transform an image (mainly resize). Not implemented yet."""
        pass

    def get_title_embedding(self, title):
        """Return the embedding of the title."""
        return self.text_encoder.encode(title)

    def get_text_embedding(self, scene, subtitle, is_need_caption=True):
        """
        Return the embedding of the text part (fed to the conditioned text encoder).

        When ``self.is_need_caption`` is true, the text is the subtitle and a
        caption generated from ``scene``, joined by a newline; otherwise it is
        the subtitle alone.

        NOTE(review): the ``is_need_caption`` argument is not consulted -- the
        instance attribute ``self.is_need_caption`` decides. The parameter is
        kept only so existing callers keep working.
        """
        if self.is_need_caption:
            # The text part is built by concatenation. The caption is generated
            # only here, so no captioner call is made when captions are disabled.
            caption = self.captioner.generate(scene)
            text = subtitle + '\n' + caption
        else:
            text = subtitle
        return self.text_encoder.encode(text)

    def generate_caption(self, np_array_image):
        """Take an ndarray image and return its text caption."""
        return self.captioner.generate(np_array_image)

    # def from_image_get_caption_list(self, np_array_image_list):
    #     """Following this dataset's design: take a scene list, return the matching caption list."""
    #     return [self.generate_caption(np_array_image) for np_array_image in np_array_image_list]

    # def get_image_embedding(self):
    #     """Return the embedding of the image part (fed to the conditioned image encoder)."""

    def get_faces_and_ratios_list(self, np_array_image):
        """
        Take an np_array image and return a list of
        ``(face_pil_image, face_area_ratio)`` tuples.
        The input is assumed to be a scene.
        """
        faces_with_ratios_list = self.face_extractor.extract_face(np_array_image)
        return faces_with_ratios_list

    def get_face_embedding(self, scene, is_need_norm=False):
        """
        Aggregate the semantic information of all faces found in ``scene``.

        Each face is encoded and weighted by its area ratio; the weighted
        embeddings are summed.

        Args:
            scene: The scene image to extract faces from.
            is_need_norm: If true, area ratios are divided by their total before
                weighting.

        Returns:
            ``(num_faces, face_embedding)``. When no face is found, ``num_faces``
            is 0 and the embedding is all zeros.
        """
        faces_with_ratios_list = self.get_faces_and_ratios_list(scene)
        num_faces = len(faces_with_ratios_list)

        if num_faces == 0:
            # torch.stack rejects an empty list, so a face-less scene would
            # crash. Return zeros shaped like the encoder output instead.
            # Assumes the encoder's output shape does not depend on the input
            # image -- TODO confirm against ImageEncoder.
            return 0, torch.zeros_like(self.get_scene_embedding(scene))

        total_ratio = sum(face_area_ratio for _, face_area_ratio in faces_with_ratios_list)
        # Skip normalisation when the total is zero to avoid dividing by zero.
        should_normalize = is_need_norm and total_ratio > 0

        weighted_embeddings = []
        for face_image, face_area_ratio in faces_with_ratios_list:
            # Encode the face image first.
            face_embedding = self.image_encoder.encode(face_image)
            if should_normalize:
                # Rescale the face's area ratio by the total.
                face_area_ratio = face_area_ratio / total_ratio
            weighted_embeddings.append(face_embedding * face_area_ratio)

        face_embedding = torch.stack(weighted_embeddings).sum(dim=0)

        return num_faces, face_embedding

    def get_scene_embedding(self, scene):
        """Encode a single scene image with the image encoder."""
        return self.image_encoder.encode(scene)

    def get_audio_embedding(self, audio):
        """
        Return the embedding of the audio part (fed directly to the final
        decision module) as ``{'audio_embedding': ...}``.

        ``audio`` is the ``(waveform, sample_rate)`` tuple built in ``__getitem__``.
        TODO: the original note ended with an unfinished "need to check whether ...".
        """
        return {'audio_embedding': self.audio_encoder.encode(audio)}

    def build_image_transform(self):
        """Default self-built image transform pipeline. Not implemented yet."""



In [9]:
# Build the dataset with its default arguments (captions and audio enabled).
dataset = ProcessedMultimodalDataset()

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at D:\dcmt\model\hf\facebook\wav2vec2-base-960h and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [11]:
# Fetch the first sample; this runs the encoders on that single item.
data1 = dataset[0]

In [12]:
# Display the whole sample dict (note: this renders every tensor in it).
data1

{'emotion': tensor(7),
 'title_embedding': tensor([[[ 0.1563,  0.2086,  0.0874,  ..., -0.1670,  0.1266,  0.0060],
          [-0.0169, -0.0160,  0.0104,  ...,  0.2914,  0.0567,  0.0134],
          [ 0.0462,  0.1899,  0.0844,  ...,  0.0784, -0.0644,  0.1923],
          ...,
          [ 0.1621,  0.2183, -0.0066,  ..., -0.3488,  0.0164,  0.0850],
          [ 0.1621,  0.2183, -0.0066,  ..., -0.3488,  0.0164,  0.0850],
          [ 0.1621,  0.2183, -0.0066,  ..., -0.3488,  0.0164,  0.0850]]]),
 'scene_embedding_list': [tensor([[ 2.7576e-01,  2.3710e-01, -5.4151e-01,  7.5937e-01, -5.9688e-01,
           -2.3822e-01, -3.2324e-02, -1.0319e-01,  2.7173e-01,  3.5827e-01,
           -6.7644e-03, -1.2480e-01, -5.8395e-02,  2.6306e-02, -4.2684e-01,
            7.7804e-01, -3.8899e-01, -3.7860e-01,  2.2451e-01,  4.7585e-01,
            4.1474e-01, -4.3052e-01, -3.2304e-01,  2.7460e-01,  5.4479e-01,
            5.2682e-01,  4.3586e-02, -5.0043e-02, -3.9665e-01, -8.0432e-01,
            1.8042e-02,  1.2

In [13]:
# Shape of the title embedding.
data1['title_embedding'].shape

torch.Size([1, 128, 768])

In [17]:
# Compare the lengths of the three per-scene embedding lists.
len(data1['scene_embedding_list']), len(data1['face_embedding_list']), len(data1['text_embedding_list'])

(8, 8, 8)

In [16]:
# Shape of the first scene's embedding.
data1['scene_embedding_list'][0].shape

torch.Size([1, 768])

In [19]:
# Each entry is a (num_faces, face_embedding) tuple; [0] picks the first scene, [1] its embedding.
data1['face_embedding_list'][0][1].shape

torch.Size([1, 768])

In [20]:
# Shape of the first frame's text embedding.
data1['text_embedding_list'][0].shape

torch.Size([1, 128, 768])

In [23]:
# Shape of the audio embedding; this key exists only when the dataset was built with is_need_audio=True.
data1['audio_embedding'].shape

torch.Size([1, 394, 768])