In [1]:
import meb
from meb import utils
from meb import datasets
from meb import core
from meb import models

from functools import partial
from typing import List, Tuple

import numpy as np
import pandas as pd
from numba import jit, njit
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms
import timm
from tqdm import tqdm



pd.set_option("display.max_columns", 50)
%load_ext autoreload
%autoreload 2

In [15]:
c = datasets.CrossDataset(cropped=True, color=True, resize=320)
df = c.data_frame
data = c.data



In [30]:
c_flow = datasets.CrossDataset(optical_flow=True, resize=320)
data_flow = c_flow.data

In [13]:
import dlib
import cv2
import py_evm

In [14]:
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("../../data/shape_predictor_68_face_landmarks.dat")

In [32]:
def mner_preprocess(onset, apex, i):
    onset = (onset * 255.0).astype("uint8")
    apex = (apex * 255.0).astype("uint8")

    onset_g = cv2.cvtColor(onset, cv2.COLOR_RGB2GRAY)
    apex_g = cv2.cvtColor(apex, cv2.COLOR_RGB2GRAY)

    pic_size = onset.shape
    hsv = np.zeros(pic_size)
    hsv[:, :, 1] = cv2.cvtColor(apex, cv2.COLOR_RGB2HSV)[:, :, 1]

    flow = data_flow[i].transpose(1, 2, 0)[..., :2]

    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[:, :, 0] = ang * (180 / np.pi / 2)
    hsv[:, :, 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    hsv = np.asarray(hsv, dtype=np.float32)
    # This line is added to avoid using float[0-255]
    hsv /= 255.0
    rgb_flow = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
    return rgb_flow

In [33]:
flows = np.zeros((df.shape[0], 3, 320, 320))
for i, video in enumerate(tqdm(data, total=df.shape[0])):
    mm_video = py_evm.magnify(video)
    if df.loc[i, "apexf"] < mm_video.shape[0]:
        flow = mner_preprocess(mm_video[0], mm_video[df.loc[i, "apexf"]], i)
    else:
        flow = mner_preprocess(mm_video[0], mm_video[-1], i)
    flows[i] = flow.transpose(2, 0, 1)

100%|███████████████████████████████████████| 2031/2031 [50:11<00:00,  1.48s/it]


In [34]:
def weight_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        nn.init.xavier_normal_(m.weight.data)
        nn.init.constant_(m.bias.data, 0.0)

In [35]:
class NMER(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.meta = {'mean': [0.485, 0.456, 0.406],
                     'std': [0.229, 0.224, 0.225],
                     'imageSize': [224, 224]}
        self.backbone = timm.models.resnet18(pretrained=True)
        self.backbone = nn.Sequential(*(list(self.backbone.children())[:-2]))
        self.features_8 = nn.AvgPool2d(kernel_size=[7, 7], stride=[1, 1], padding=0, ceil_mode=False, count_include_pad=False)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        
        self.fc3_top = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Dropout(0.1)
        )
        self.fc3_bot = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Dropout(0.1)
        )
        self.fc = nn.Linear(64 * 2, num_classes)
        self.fc_top = nn.Linear(64, num_classes)
        self.fc_bot = nn.Linear(64, num_classes)
        
    def forward(self, x):
        features_7 = self.backbone(x)
        
        top = features_7[:, :, :3]
        bottom = features_7[:, :, 3:]
        
        top = self.avgpool(top)
        bottom = self.avgpool(bottom)
        
        top = top.view(top.size(0), -1)
        bottom = bottom.view(bottom.size(0), -1)
        # FC layers
        top = self.fc3_top(top)
        bottom = self.fc3_bot(bottom)
        features = torch.cat((top, bottom), 1)
        #Classification
        output = self.fc(features)
        output_top = self.fc_top(top)
        output_bottom = self.fc_bot(bottom)
        return output, output_top, output_bottom
        
        

In [36]:
from PIL import Image

def img_to_pil(img: np.ndarray):
    img = np.array(img)
    img = (img.transpose(1, 2, 0) * 255).astype("uint8")
    return Image.fromarray(img)

transform = transforms.Compose([
        img_to_pil,
        transforms.Resize((256, 256)),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.RandomCrop((224, 224), pad_if_needed=True),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5],
                             std=[0.5, 0.5, 0.5])
    ])

In [37]:
class Config(core.Config):
    device = torch.device("cuda:0")
    scheduler = None
    evaluation_fn = [
        partial(utils.MultiLabelF1Score, average="macro"),
        partial(utils.MultiLabelF1Score, average="binary"),
    ]
    epochs = 1
    model = partial(NMER, num_classes=len(core.Config.action_units))
    optimizer = partial(optim.Adam, lr=0.0001, weight_decay=0.000001)
    train_transform = {"spatial": transform, "temporal": None}

In [39]:
NMERValidation(Config).validate_n_times(df, flows)

100%|████████████████████████████████████████████| 5/5 [21:08<00:00, 253.80s/it]

MultiLabelF1Score
AUS: ['AU1', 'AU2', 'AU4', 'AU5', 'AU6', 'AU7', 'AU9', 'AU10', 'AU12', 'AU14', 'AU15', 'AU17', 'Average']
48.4 & 45.5 & 53.5 & 43.0 & 41.4 & 44.7 & 42.5 & 35.9 & 44.9 & 44.7 & 37.7 & 43.1 & 43.8

Datasets:  ['casme', 'casme2', 'samm', 'fourd', 'mmew', 'casme3a', 'Average']
38.9 & 41.0 & 40.9 & 41.5 & 42.1 & 38.2 & 40.4
MultiLabelF1Score
AUS: ['AU1', 'AU2', 'AU4', 'AU5', 'AU6', 'AU7', 'AU9', 'AU10', 'AU12', 'AU14', 'AU15', 'AU17', 'Average']
19.1 & 19.2 & 43.3 & 9.3 & 6.3 & 9.1 & 12.5 & 4.8 & 18.1 & 22.6 & 3.3 & 6.4 & 14.5

Datasets:  ['casme', 'casme2', 'samm', 'fourd', 'mmew', 'casme3a', 'Average']
11.3 & 11.5 & 9.2 & 10.2 & 12.3 & 11.9 & 11.1





In [38]:
class NMERValidation(core.CrossDatasetValidation):
    def __init__(self, config: Config, verbose: bool = True):
        super().__init__(config)
        
    def train_one_epoch(self, epoch: int, dataloader: torch.utils.data.DataLoader):
        if epoch % 10 == 0 and epoch != 0:
            if self.optimizer.param_groups[1]['lr'] > 0.00001:
                self.optimizer.param_groups[1]['lr'] = self.optimizer.param_groups[1]['lr'] * 0.5
        num_updates = epoch * len(dataloader)
        for i, (X, y) in enumerate(dataloader):
            X, y = X.to(self.cf.device), y.to(self.cf.device)
            self.optimizer.zero_grad()
            if self.mixup_fn:
                X, y = self.mixup_fn(X.float(), y.float())
            out, out_top, out_bot = self.model(X.float())
            loss = (
                self.criterion(out, y)
                + self.criterion(out_top, y)
                + self.criterion(out_bot, y)
            )
            loss.backward()
            nn.utils.clip_grad_norm_(
                self.model.parameters(), 1, norm_type=2
            )
            self.optimizer.step()
            num_updates += 1
            if self.scheduler:
                self.scheduler.step_update(num_updates=num_updates)
            if self.cf.print_loss_interval:
                if i % self.cf.print_loss_interval == 0:
                    print(
                        f"{datetime.now()} - INFO - Epoch "
                        f"[{epoch + 1}/{self.cf.epochs}][{i + 1}/{len(dataloader)}] "
                        f"lr: {self.optimizer.param_groups[0]['lr']:>6f}, loss: {loss.item():>7f}"
                    )
                    
    def evaluate_model(
        self, dataloader: torch.utils.data.DataLoader, test: bool = False
    ) -> List[float] | Tuple[List[float], torch.tensor]:
        """
        Evaluates the model given a dataloader and an evaluation function. Returns
        the evaluation result and if boolean test is set to true also the
        predictions.
        """
        self.model.eval()
        outputs_list = []
        labels_list = []
        with torch.no_grad():
            for batch in dataloader:
                data_batch = batch[0].to(self.cf.device)
                labels_batch = batch[1]
                outputs, _, _ = self.model(data_batch.float())
                outputs_list.append(outputs.detach().cpu())
                labels_list.append(labels_batch)
        self.model.train()
        predictions = torch.cat(outputs_list)
        labels = torch.cat(labels_list)
        results = self.evaluation_fn(labels, predictions)
        if test:
            return results, predictions
        return results
    
    def setup_training(self) -> None:
        """
        Sets up the training modules, including model, criterion, optimizer, scheduler and mixup.
        """
        self.model = self.cf.model()
        self.model.apply(weight_init)
        self.criterion = self.cf.criterion()
        self.model.to(self.cf.device)
        self.optimizer = self.cf.optimizer(
            [{"params": (
                list(Config.model().fc.parameters())
                + list(Config.model().fc_top.parameters())
                + list(Config.model().fc_bot.parameters())
                + list(Config.model().fc3_top.parameters())
                + list(Config.model().fc3_bot.parameters())
            
            )},
             {"params": self.model.backbone.parameters(), "lr": 0.00001}
            ])
        self.scheduler = (
            self.cf.scheduler(self.optimizer) if self.cf.scheduler else None
        )
        self.mixup_fn = self.cf.mixup_fn() if self.cf.mixup_fn else None