# 添加依赖

In [4]:
# 添加依赖，主要是总依赖，包含EfficientFaceTemporal的初始化
import time
import math
import re
import sys
import os
import argparse

import numpy as np
from numpy.lib.function_base import _quantile_unchecked
import cv2
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.backends import cudnn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from PIL import Image
# matplotlib.use('TkAgg')

from models import multimodalcnn
import utils

from models.modulator import Modulator
from models.efficientface import LocalFeatureExtractor, InvertedResidual
from models.transformer_timm import AttentionBlock, Attention
import torchvision.models as models

def conv1d_block(in_channels, out_channels, kernel_size=3, stride=1, padding='same'):
    """Build a Conv1d -> BatchNorm1d -> ReLU(inplace) stack.

    Args:
        in_channels: Number of channels of the input sequence.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Width of the 1-D convolution kernel.
        stride: Stride of the convolution.
        padding: Padding argument forwarded unchanged to ``nn.Conv1d``.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    conv = nn.Conv1d(in_channels, out_channels,
                     kernel_size=kernel_size, stride=stride, padding=padding)
    norm = nn.BatchNorm1d(out_channels)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(conv, norm, activation)

class EfficientFaceTemporal(nn.Module):
    """Per-frame CNN features followed by 1-D temporal convolutions and a linear head.

    ``forward`` runs ``forward_features`` on a flat batch of frames, regroups
    the per-frame vectors into clips of ``im_per_sample`` frames, applies four
    ``conv1d_block`` stages along the time axis, averages over time and
    classifies with a single linear layer.

    A torchvision ResNet-50 is also instantiated (with pretrained weights) and
    exposed through ``forward_features_resnet``; ``forward`` does not use it.
    """

    def __init__(self, stages_repeats, stages_out_channels, num_classes=7, im_per_sample=25):
        """Create all sub-modules.

        Args:
            stages_repeats: Sequence of 3 ints, number of InvertedResidual
                units in stage2, stage3 and stage4.
            stages_out_channels: Sequence of 5 ints, output widths of conv1,
                stage2, stage3, stage4 and conv5.
            num_classes: Size of the classifier output.
            im_per_sample: Number of frames that make up one clip.

        Raises:
            ValueError: If either sequence has the wrong length.
        """
        super().__init__()

        if len(stages_repeats) != 3:
            raise ValueError('expected stages_repeats as list of 3 positive ints')
        if len(stages_out_channels) != 5:
            raise ValueError('expected stages_out_channels as list of 5 positive ints')
        self._stage_out_channels = stages_out_channels

        # Stem: strided 3x3 convolution on a 3-channel image, then max-pooling.
        stem_width = self._stage_out_channels[0]
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, stem_width, 3, 2, 1, bias=False),
            nn.BatchNorm2d(stem_width),
            nn.ReLU(inplace=True),
        )
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # stage2..stage4: one stride-2 unit followed by (repeats - 1) stride-1 units.
        prev_width = stem_width
        stage_widths = self._stage_out_channels[1:]
        for stage_idx, (repeats, width) in enumerate(zip(stages_repeats, stage_widths), start=2):
            units = [InvertedResidual(prev_width, width, 2)]
            units.extend(InvertedResidual(width, width, 1) for _ in range(repeats - 1))
            setattr(self, 'stage{}'.format(stage_idx), nn.Sequential(*units))
            prev_width = width

        # Local branch and modulator use fixed channel counts (29 -> 116).
        self.local = LocalFeatureExtractor(29, 116, 1)
        self.modulator = Modulator(116)

        # 1x1 convolution head on top of stage4.
        head_width = self._stage_out_channels[-1]
        self.conv5 = nn.Sequential(
            nn.Conv2d(prev_width, head_width, 1, 1, 0, bias=False),
            nn.BatchNorm2d(head_width),
            nn.ReLU(inplace=True),
        )

        # Temporal 1-D convolutions applied over the frame axis of each clip.
        self.conv1d_0 = conv1d_block(head_width, 64)
        self.conv1d_1 = conv1d_block(64, 64)
        self.conv1d_2 = conv1d_block(64, 128)
        self.conv1d_3 = conv1d_block(128, 128)

        # Alternative backbone, only reachable through forward_features_resnet.
        # self.resnet50 = models.resnet50(pretrained=False)
        self.resnet50 = models.resnet50(pretrained=True)
        self.adaptive_avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.classifier_1 = nn.Sequential(
            nn.Linear(128, num_classes),
        )
        self.im_per_sample = im_per_sample

    def forward_features(self, x):
        """Return one feature vector per input frame.

        The shape notes below are the original author's, recorded for a
        (1200, 3, 224, 224) input batch.
        """
        x = self.conv1(x)    # (1200, 29, 112, 112)
        x = self.maxpool(x)  # (1200, 29, 56, 56)
        modulated = self.modulator(self.stage2(x))
        local_feats = self.local(x)
        x = modulated + local_feats  # (1200, 116, 28, 28)
        x = self.stage3(x)   # (1200, 232, 14, 14)
        x = self.stage4(x)   # (1200, 464, 7, 7)
        x = self.conv5(x)    # (1200, 1024, 7, 7)
        # Global average pooling over both spatial axes -> (1200, 1024).
        return x.mean([2, 3])

    def forward_features_resnet(self, x):
        """Return per-frame features from ResNet-50 (conv1 through layer3)."""
        for layer_name in ('conv1', 'bn1', 'relu', 'maxpool', 'layer1', 'layer2', 'layer3'):
            x = getattr(self.resnet50, layer_name)(x)

        x = self.adaptive_avgpool(x)
        # Global average pooling over both spatial axes.
        return x.mean([2, 3])

    def forward_stage1(self, x):
        """Regroup per-frame features into clips and run the first two conv1d blocks.

        ``x`` is indexed as (frames, features); its first dimension must be a
        multiple of ``im_per_sample``.
        """
        total_frames = x.shape[0]
        assert total_frames % self.im_per_sample == 0, "Batch size is not a multiple of sequence length."
        n_samples = total_frames // self.im_per_sample
        # (frames, features) -> (clips, frames_per_clip, features) -> (clips, features, frames_per_clip)
        clips = x.view(n_samples, self.im_per_sample, x.shape[1]).permute(0, 2, 1)
        return self.conv1d_1(self.conv1d_0(clips))

    def forward_stage2(self, x):
        """Run the last two conv1d blocks."""
        return self.conv1d_3(self.conv1d_2(x))

    def forward_classifier(self, x):
        """Average over the last (temporal) axis and apply the linear classifier."""
        pooled = x.mean([-1])
        return self.classifier_1(pooled)

    def forward(self, x):
        """Full pipeline: frame features -> temporal convolutions -> class scores."""
        frame_feats = self.forward_features(x)
        temporal = self.forward_stage2(self.forward_stage1(frame_feats))
        return self.forward_classifier(temporal)

# 处理图片序列

In [5]:
# 处理图片序列，（可以简单认为是处理视频），处理完应该主要得到的是numpy_video
# -*- coding: utf-8 -*-
import os
import numpy as np          
import cv2
from tqdm import tqdm
import torch
from facenet_pytorch import MTCNN
# Run face detection on the GPU when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = 'cpu'

mtcnn = MTCNN(image_size=(720, 1280), device=device)
#mtcnn.to(device)

# Sampling configuration: keep `save_frames` frames out of a `save_length`
# second window of a video assumed to run at `input_fps` frames per second.
save_frames = 15
input_fps = 30
save_length = 3.6 #seconds
save_avi = False # True

failed_videos = []
# root = '/lustre/scratch/chumache/RAVDESS_or/'
root = '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_03/'


def select_distributed(m, n):
    """Return ``m`` evenly spread frame indices out of ``n`` frames.

    The range ``[0, n)`` is divided into ``m`` equal segments and one index is
    taken from each: the segment start ``i*n//m`` plus the half-segment offset
    ``n//(2*m)``. The cell below uses these indices to decide which frames are
    passed to face detection and cropping.
    """
    half_segment = n // (2 * m)
    return [i * n // m + half_segment for i in range(m)]


n_processed = 0
filename = root + "01-01-03-02-01-01-03.mp4"
           
if filename.endswith('.mp4'):

    # First pass: count the frames by decoding the whole file once.
    cap = cv2.VideoCapture(filename)
    framen = 0
    while True:
        ret, _ = cap.read()
        if not ret:
            break
        framen += 1
    cap.release()

    # Second pass: sample, detect and crop.
    cap = cv2.VideoCapture(filename)
    try:
        # Number of frames in the window we want to keep.
        target_frames = int(save_length*input_fps)

        # When the video is LONGER than the window, drop the same number of
        # frames from both ends so the window is centred. The original test
        # was inverted (window > video), which made skip_begin negative and
        # therefore never skipped anything.
        if framen > target_frames:
            skip_begin = (framen - target_frames) // 2
            for _ in range(skip_begin):
                # Reading without processing is how a frame is skipped.
                cap.read()

        framen = target_frames
        frames_to_select = select_distributed(save_frames, framen)
        save_fps = save_frames // (framen // input_fps)
        if save_avi:
            out = cv2.VideoWriter(filename[:-4]+'_facecroppad.avi',cv2.VideoWriter_fourcc('M','J','P','G'), save_fps, (224,224))

        blank_frame = np.zeros((224,224,3), dtype=np.uint8)
        numpy_video = []
        frame_ctr = 0
        # Most recent face box; reused for frames where detection fails.
        last_box = None

        # Stop as soon as every selected index has been consumed.
        while frames_to_select:
            ret, im = cap.read()
            if not ret:
                break
            if frame_ctr not in frames_to_select:
                frame_ctr += 1
                continue
            frames_to_select.remove(frame_ctr)
            frame_ctr += 1

            # MTCNN is fed RGB, OpenCV decodes to BGR. A frame that cannot be
            # converted marks the whole video as failed.
            try:
                im_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
            except cv2.error:
                failed_videos.append(filename)
                break

            im_rgb = torch.tensor(im_rgb).to(device)

            boxes, _ = mtcnn.detect(im_rgb)
            if boxes is not None:
                x1, y1, x2, y2 = (int(round(float(v))) for v in boxes[0])
                # Clamp to the image: a negative start would otherwise be
                # interpreted as an index counted from the far edge.
                last_box = (max(x1, 0), max(y1, 0), x2, y2)

            face = None
            if last_box is not None:
                x1, y1, x2, y2 = last_box
                face = im[y1:y2, x1:x2, :]

            if face is None or face.size == 0:
                # No usable face box yet (or an empty crop): emit a blank
                # frame, the same filler used for missing frames below.
                im = blank_frame.copy()
            else:
                im = cv2.resize(face, (224,224))

            if save_avi:
                out.write(im)
            numpy_video.append(im)
    finally:
        cap.release()

    # Pad with blank frames for every selected index that was never reached.
    for _ in range(len(frames_to_select)):
        if save_avi:
            out.write(blank_frame)
        numpy_video.append(blank_frame.copy())
    if save_avi:
        out.release()
    np.save(filename[:-4]+'_facecroppad.npy', np.array(numpy_video))
    if len(numpy_video) != save_frames:
        print('Error', filename)


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


# 处理音频序列

In [6]:
# 处理音频序列，这里的主要输出应该是y和sr
# -*- coding: utf-8 -*-

import librosa
import os
import soundfile as sf
import numpy as np

#audiofile = 'E://OpenDR_datasets//RAVDESS//Actor_19//03-01-07-02-01-02-19.wav'
## This cell brings the audio clip to a fixed duration of `target_time`
## seconds: a shorter clip is zero-padded at the end, a longer one is cropped
## equally from both sides.

# root = '/lustre/scratch/chumache/RAVDESS_or/'
root = '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech'
target_time = 3.6 #sec
# The audio track is read from the same file the video cell processed.
audiofile = filename

# if not audiofile.endswith('.wav') or 'croppad' in audiofile:
#     continue

audios = librosa.core.load(audiofile, sr=22050)
y, sr = audios

target_length = int(sr * target_time)
if len(y) < target_length:
    # np.pad keeps y's dtype; the previous list-based padding mixed Python
    # ints into the data and promoted the array to float64.
    y = np.pad(y, (0, target_length - len(y)), mode='constant')
else:
    # Centre crop with an explicit end index. The previous
    # y[a:-(remain - a)] form became y[0:-0] == y[0:0] (an empty array) when
    # the clip already had exactly target_length samples.
    start = (len(y) - target_length) // 2
    y = y[start:start + target_length]

# sf.write(audiofile[:-4]+'_croppad.wav', y, sr)



# 获取模型输出结果

## 加载模型

In [7]:
# 加载模型
num_classes = 8
seq_length = 15
pretrain_state_path = '/home/ubuntu/work_space/multimodal-emotion-recognition-experiment/best_results/1678717134.4151022lr_0.00017341600515462103seed_42optimizer_AdamWweight_decay_0.001_best_complete/RAVDESS_multimodalcnn_15_best0.pth'
pretrain_path = '/home/ubuntu/work_space/EfficientFace-master/checkpoint/Pretrained_EfficientFace.tar'
model = multimodalcnn.MultiModalCNN(fusion = 'iaLSTM', pretr_ef = pretrain_path)

# Load onto the CPU first so the checkpoint can be read on a machine that has
# no GPU (or fewer GPUs than the one it was saved on); the model is moved to
# its final device later.
pretrained_state = torch.load(pretrain_state_path, map_location='cpu')
pretrained_state_dict = pretrained_state['state_dict']

# The checkpoint keys carry a leading "module." that the bare model does not
# have. Strip it only as a prefix: str.replace would also eat the substring
# inside longer names such as "submodule.".
_wrapper_prefix = "module."
pretrained_state_dict = {
    (key[len(_wrapper_prefix):] if key.startswith(_wrapper_prefix) else key): value
    for key, value in pretrained_state_dict.items()
}
model.load_state_dict(pretrained_state_dict)

print(type(model))
# print(model)

Initializing efficientnet
<class 'models.multimodalcnn.MultiModalCNN'>


## 处理音视频数据

In [8]:
# 处理音视频数据
import transforms
# `transforms` here is the module imported on the line above, not
# torchvision's: its ToTensor accepts a positional argument (255).
# NOTE(review): 255 is presumably the normalisation divisor - confirm in the
# project's transforms module.
_to_tensor = transforms.ToTensor(255)
video_transform = transforms.Compose([_to_tensor])
# video_transform = transforms.Compose([
#     transforms.ToTensor()]) # opt.video_norm_value
# test_data = get_test_set(opt, spatial_transform=video_transform)
def video_loader(video_dir_path):
    """Load a saved frame array and return its frames as PIL images.

    Args:
        video_dir_path: Path handed to ``np.load``; the loaded array is
            indexed as ``video[i, :, :, :]``, i.e. it must be 4-dimensional
            with the frame index first.

    Returns:
        A list with one ``PIL.Image`` per frame, in file order.
    """
    video = np.load(video_dir_path)
    frame_count = np.shape(video)[0]
    return [Image.fromarray(video[idx, :, :, :]) for idx in range(frame_count)]

import functools
def get_default_video_loader():
    """Return the default clip loader: a ``functools.partial`` around ``video_loader``."""
    loader = functools.partial(video_loader)
    return loader

self_loader = get_default_video_loader()
video_path = '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_20/02-01-03-01-02-02-20_facecroppad.npy'
clip = self_loader(video_path)
self_spatial_transform = video_transform
self_spatial_transform.randomize_parameters()
# NOTE: the next assignment discards the clip that was just loaded from disk;
# the frames actually used are the ones extracted in the video cell above.
clip = numpy_video
# Transform every frame, then stack them along a new leading (frame) axis.
clip = torch.stack([self_spatial_transform(frame) for frame in clip], 0)
print(type(clip))
print(clip.shape)

import librosa
# def load_audio(audiofile, sr):
#     audios = librosa.core.load(audiofile, sr)
#     y = audios[0]
#     return y, sr
# audio_path = '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_20/03-01-03-01-02-02-20_croppad.wav'
# y, sr = load_audio(audio_path, sr=22050)

def get_mfccs(y, sr):
    """Return the 10-coefficient MFCC matrix of waveform ``y`` sampled at ``sr`` Hz."""
    return librosa.feature.mfcc(y=y, sr=sr, n_mfcc=10)
# MFCCs of the cropped/padded waveform, wrapped as a tensor.
audio_features = torch.from_numpy(get_mfccs(y, sr))
print(type(audio_features))
print(audio_features.shape)

# Prepend a batch axis of size 1: the 2-D MFCC matrix becomes 3-D
# (the recorded run printed (10, 156) -> (1, 10, 156)).
audio_features = audio_features.unsqueeze(0)
print(type(audio_features))
print(audio_features.shape)

# All-zero stand-in tensor; it is not what the model call below is fed.
audio_features_tmp = torch.zeros((64, 10, 3))
print(type(audio_features_tmp))
print(audio_features_tmp.shape)

# audio_features and clip is next input

<class 'torch.Tensor'>
torch.Size([15, 3, 224, 224])
<class 'torch.Tensor'>
torch.Size([10, 156])
<class 'torch.Tensor'>
torch.Size([1, 10, 156])
<class 'torch.Tensor'>
torch.Size([64, 10, 3])


In [37]:
# 测试代码，正式demo中实际上不需要这个
# import torch
# audio_features = torch.rand((1, 10, 13))
# print(audio_features)

tensor([[[0.4506, 0.7349, 0.7496, 0.2442, 0.2085, 0.6537, 0.4141, 0.8010,
          0.6745, 0.9483, 0.3453, 0.5775],
         [0.5649, 0.2819, 0.6138, 0.2782, 0.0668, 0.2279, 0.3788, 0.2516,
          0.7972, 0.1908, 0.0291, 0.6928],
         [0.2877, 0.8548, 0.7394, 0.1457, 0.2828, 0.0222, 0.9719, 0.8331,
          0.3450, 0.0873, 0.4397, 0.0760],
         [0.0965, 0.4549, 0.6765, 0.8307, 0.5361, 0.6237, 0.4570, 0.3490,
          0.5878, 0.2956, 0.1151, 0.7451],
         [0.2248, 0.9463, 0.5156, 0.1240, 0.5064, 0.6489, 0.7283, 0.2686,
          0.7721, 0.3729, 0.1039, 0.8908],
         [0.1624, 0.6752, 0.8542, 0.3142, 0.2070, 0.1302, 0.3621, 0.5767,
          0.5807, 0.3117, 0.1114, 0.8456],
         [0.2697, 0.1161, 0.3301, 0.5103, 0.1458, 0.4043, 0.5708, 0.8177,
          0.8311, 0.9241, 0.7423, 0.7344],
         [0.8604, 0.5618, 0.4336, 0.5197, 0.7332, 0.6024, 0.2580, 0.6312,
          0.3810, 0.4059, 0.7945, 0.3454],
         [0.6144, 0.6788, 0.2994, 0.3755, 0.8658, 0.1374, 0.4623

## 模型输出

In [38]:
# 模型输出
testResource = ['/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_20/02-01-03-01-02-02-20_facecroppad.npy',
                '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_20/03-01-03-01-02-02-20_croppad.wav']

# Keep the original preference for the second GPU, but fall back to GPU 0 on
# a single-GPU host and to the CPU when CUDA is unavailable.
if torch.cuda.is_available():
    gpu_index = 1 if torch.cuda.device_count() > 1 else 0
    device = torch.device("cuda:{}".format(gpu_index))
else:
    device = torch.device("cpu")

# Tensor.to() returns a new tensor, so the result has to be re-bound;
# Module.to() moves the parameters in place. The former extra model.cuda(1)
# call was redundant and raised on hosts without a second GPU.
audio_features = audio_features.to(device)
clip = clip.to(device)
model.to(device)

# Test the Model
model.eval()  # Change model to 'eval' mode (BN uses moving mean/var).

# Pure inference: no autograd graph is needed.
with torch.no_grad():
    # result = model(audio_features_tmp, clip)
    result = model(audio_features, clip)
# Debug helpers to check where the model and inputs live (CPU or GPU):
# for param in model.parameters():
#     print(param.device)
# print(audio_features.device)
# print(clip.device)

print(result.shape)
print(result)
# Sum over all elements (same value as the former sum(sum(result)) for a 2-D result).
print(result.sum())

RuntimeError: Given input size: (128x1x1). Calculated output size: (128x1x0). Output size is too small

# 处理原视频，提取人脸位置

In [8]:
# 提取识别到的表情
# Human-readable label for each output index of the classifier.
# NOTE(review): the order presumably follows the RAVDESS emotion codes the
# model was trained with - confirm against the training label mapping.
expression_list = ["中性", "平静", "快乐", "悲伤", "愤怒", "恐惧", "厌恶", "惊讶"]
# Index of the highest score in the first row of the model output.
max_index = torch.argmax(result[0])
expression = expression_list[max_index]
print(expression)

快乐


In [9]:
import time
import math
import re
import sys
import os
import argparse

import numpy as np
from numpy.lib.function_base import _quantile_unchecked
import cv2
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.backends import cudnn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
# from face_detection import RetinaFace
# import retinaface as RetinaFace
import matplotlib
from matplotlib import pyplot as plt
from PIL import Image
# matplotlib.use('TkAgg')

import utils

def _str2bool(value):
    """Convert a command-line token such as 'true' / 'False' / '0' to a bool."""
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if token in ('0', 'false', 'f', 'no', 'n', 'off', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {!r}'.format(value))


def parse_args(argv=None):
    """Parse input arguments.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the previous behaviour.

    Returns:
        The ``argparse.Namespace`` with ``gpu_id``, ``video``, ``cam_id``,
        ``snapshot`` and ``save_viz``.
    """
    parser = argparse.ArgumentParser(
        description='Head pose estimation using the 6DRepNet.')
    parser.add_argument('--gpu',
                        dest='gpu_id', help='GPU device id to use [0]',
                        default=1, type=int)
    # Try using a video file as the input source.
    parser.add_argument("--video", type=str, default='/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_03/01-01-03-02-01-01-03.mp4',
                        help="Path of video to process i.e. /path/to/vid.mp4")
    parser.add_argument('--cam',
                        dest='cam_id', help='Camera device id to use [0]',
                        default=None, type=int)  # the default was 0 in the original file
    parser.add_argument('--snapshot',
                        dest='snapshot', help='Name of model snapshot.',
                        default='', type=str)
    # type=bool would turn any non-empty string - including "False" - into
    # True. Parse the token explicitly instead; a bare "--save_viz" with no
    # value enables the flag.
    parser.add_argument('--save_viz',
                        dest='save_viz', help='Save images with pose cube.',
                        default=False, type=_str2bool, nargs='?', const=True)

    args = parser.parse_args(argv)
    return args


# Per-image preprocessing: resize to 224, centre-crop to 224, convert to a
# tensor and normalise each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics -
# confirm that they match what the model was trained with.
_norm_mean = [0.485, 0.456, 0.406]
_norm_std = [0.229, 0.224, 0.225]
transformations = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=_norm_mean, std=_norm_std),
])

# if __name__ == '__main__':
# args = parse_args()
cudnn.enabled = True
# gpu = args.gpu_id
gpu = 1
# cam = args.cam_id if args.cam_id is not None else args.video
cam = '/home/ubuntu/work_space/datasets/RAVDESS_autido_speech/Actor_03/01-01-03-02-01-01-03.mp4'
if cam is None:
    print('Camera or video not specified as argument, selecting default camera node (0) as input...')
    cam = 0
# snapshot_path = args.snapshot

print('Loading data.')

# detector = RetinaFace(gpu_id=gpu)

cap = cv2.VideoCapture(cam)

# Check that the source opened correctly BEFORE creating the writer, so a bad
# input does not leave an empty output file behind.
if not cap.isOpened():
    raise IOError("Cannot open webcam")

# The annotated video is written with the source's FPS, width and height.
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Video codec.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# Writer for the annotated output video.
out = cv2.VideoWriter('../datasets/test/output_video_multimodal_expression.mp4', fourcc, fps, (width, height))

# cv2.putText's built-in Hershey fonts cannot draw CJK glyphs, so the
# predicted label is mapped to an English caption for the overlay. The
# prediction used to be overwritten with a hard-coded 'happy'; it is now
# actually shown. Unknown labels are drawn as-is.
overlay_labels = {
    "中性": "neutral",
    "平静": "calm",
    "快乐": "happy",
    "悲伤": "sad",
    "愤怒": "angry",
    "恐惧": "fearful",
    "厌恶": "disgust",
    "惊讶": "surprised",
}
text = overlay_labels.get(expression, expression)

# Caption font settings.
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 2
color = (0, 255, 0)
thickness = 2

frame_count = 0   # frames successfully read
time_consume = 0  # accumulated face-detection time in milliseconds

try:
    while True:
        ret, frame = cap.read()
        if not ret or frame is None:
            break
        frame_count += 1

        # OpenCV decodes to BGR; the detector is fed RGB, as in the
        # frame-extraction cell.
        start = time.time()
        boxes, _ = mtcnn.detect(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        end = time.time()
        time_consume += (end - start)*1000.
        # print("Processed frame per second: %2f fps" % (1000. / (time_consume / frame_count)))

        if boxes is not None:
            # Only the first detected face is annotated.
            x1, y1, x2, y2 = (int(round(float(v))) for v in boxes[0])

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Caption just above the box. The baseline is kept at least 50 px
            # from the top so the scale-2 glyphs are not pushed off the frame
            # (50 is an approximate glyph height, not a measured one).
            position = (max(x1, 0), max(y1 - 20, 50))
            cv2.putText(frame, text, position, font, font_scale, color, thickness)

        out.write(frame)
        # cv2.imshow("Demo", frame)
        # cv2.waitKey(5)
finally:
    # Always release both handles so the output file is finalised.
    cap.release()
    out.release()

Loading data.


# 保存视频

In [None]:
# 上面的代码已经保存了视频了
# 我能否做出一个实时效果的展示视频呢？