In [1]:
from glob import glob
import os
import random
from collections import namedtuple
import json
import av
import numpy as np
import librosa
import cv2
import pickle
from IPython.display import Video
import IPython.display as ipd

from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

def video2numpy(path):
    container = av.open(path)
    fps = round(float(container.streams.video[0].average_rate))
    frames = np.stack([frame.to_image() for frame in container.decode(video=0)])
    container.close()
    return frames, fps


class DataSample:
    def __init__(self, DATASET_DIR='..', database=None, dataset='train', uttid=None):
        self.DATASET_DIR = DATASET_DIR
        if database is None:
            self.database = random.choice(['MELD', 'IEMOCAP', 'AFEW', 'CAER'])
        else:
            self.database = database
        self.dataset = dataset
        self.uttid = uttid            

        if uttid is None:
            # choose one random utternace id
            _, __, ___, ____, _____, filename = random.choice(
                glob(f'{self.DATASET_DIR}/{self.database}/face-videos/{self.dataset}/*/*.mp4')).split('/')
            self.uttid = '_'.join(filename.split('_')[:-1]).split('.')[0]
        else:
            self.uttid = uttid

        self.label_path = os.path.join(self.DATASET_DIR, self.database, 'labels.json')

        with open(self.label_path, 'r') as stream:
            self.labels = json.load(stream)

        self.label = self.labels[self.dataset][self.uttid]
            
    def get_paths(self):
        self.vid_path = os.path.join(self.DATASET_DIR, self.database, 'raw-videos', self.dataset, self.uttid +'.avi')
        if not os.path.isfile(self.vid_path):
            self.vid_path = self.vid_path.replace('.avi', '.mp4')
        if not os.path.isfile(self.vid_path):
            self.vid_path = None
            
        self.audio_path = os.path.join(self.DATASET_DIR, self.database, 'raw-audios', self.dataset, self.uttid + '.wav')
        if not os.path.isfile(self.audio_path):
            self.audio_path = self.audio_path.replace('.wav', '.mp3')
        if not os.path.isfile(self.audio_path):
            self.audio_path = None

        self.text_path = os.path.join(self.DATASET_DIR, self.database, 'raw-texts', self.dataset, self.uttid + '.json')
        if not os.path.isfile(self.text_path):
            self.text_path = None
            
        self.face_path = os.path.join(self.DATASET_DIR, self.database, 'faces', self.dataset, self.uttid + '.pkl')
        if not os.path.isfile(self.face_path):
            self.face_path = None

        self.face_videos_path = glob(os.path.join(self.DATASET_DIR, self.database, 
                                                  'face-videos', self.dataset, self.uttid,'*.mp4'))
                
    def load_data(self, SAMPLING_RATE=22050):
        self.sr = SAMPLING_RATE
        if self.vid_path is not None:
            self.vid, self.fps = video2numpy(self.vid_path)
        else:
            self.vid = None
        
        if self.audio_path is not None:
            self.audio = librosa.core.load(self.audio_path, sr=self.sr)[0]
        else:
            self.audio = None
            
        if self.text_path is not None:
            with open(self.text_path, 'r') as stream:
                self.text = json.load(stream)['Utterance']
        else:
            self.text = None
            
        if self.face_path is not None:
            with open(self.face_path, 'rb') as stream:
                self.faces = pickle.load(stream)

            assert len(self.faces) == len(self.vid)
            
        if len(self.face_videos_path) !=0 :
            self.face_videos = [video2numpy(path)[0] for path in self.face_videos_path]
            
            for fv in self.face_videos:
                assert len(fv) == len(self.vid)

            self.faces_concat = np.zeros((len(self.vid), fv.shape[1], fv.shape[2]*len(self.face_videos), 3),
                                        dtype=np.uint8)
            for i, fv in enumerate(self.face_videos):
                self.faces_concat[:,:,fv.shape[2]*i:fv.shape[2]*(i+1),:] = fv
            
    def draw_box_landmark(self, box_color=(0, 0, 255), box_size=2, landmark_color=(255, 0, 0), landmark_size=2):
        for idx, (frame, faces) in enumerate(self.faces.items()):
            for face in faces:
                box = [int(num) for num in face['bbox']]
                landmark = face['landmark'].astype(np.int)
                cv2.rectangle(self.vid[idx], (box[0], box[1]), (box[2], box[3]), box_color, box_size)
                for l in landmark:
                    cv2.circle(self.vid[idx], (l[0], l[1]), landmark_size, landmark_color)

In [2]:
%matplotlib inline

ds = DataSample(database='CAER')
ds.get_paths()
print(ds.vid_path)
print(ds.audio_path)
print(ds.text_path)
print(ds.face_path)
print(ds.face_videos_path)
print()
print(f"labeled emotion: {ds.label}")

ds.load_data()

print()
print(ds.text)

ipd.Audio(ds.audio_path) # load a local WAV file

../CAER/raw-videos/train/train-disgust-0076.avi
../CAER/raw-audios/train/train-disgust-0076.mp3
None
../CAER/faces/train/train-disgust-0076.pkl
['../CAER/face-videos/train/train-disgust-0076/train-disgust-0076_001.mp4', '../CAER/face-videos/train/train-disgust-0076/train-disgust-0076_000.mp4', '../CAER/face-videos/train/train-disgust-0076/train-disgust-0076_002.mp4']

labeled emotion: disgust





None


In [3]:
def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

# np array with shape (frames, height, width, channels)
ds.draw_box_landmark()
video = ds.vid
# video = ds.face_videos[1]

fig = plt.figure(figsize=(10,10))
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image


anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=1/ ds.fps * 1000)
HTML(anim.to_html5_video())

In [4]:
def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

# np array with shape (frames, height, width, channels)
ds.draw_box_landmark()
video = ds.faces_concat

fig = plt.figure(figsize=(10,10))
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image


anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=1/ ds.fps * 1000)
HTML(anim.to_html5_video())