In [1]:
import torch
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image, write_jpeg
import torchvision.transforms as transforms
from torchvision.transforms.functional import crop
from torcheval import metrics

import scipy.io as scio

In [2]:
emotions = np.array(['Affection', 'Anger', 'Annoyance', 'Anticipation', 'Aversion', 'Confidence', 'Disapproval', 'Disconnection',
                     'Disquietment', 'Doubt/Confusion', 'Embarrassment', 'Engagement', 'Esteem', 'Excitement', 'Fatigue', 'Fear',
                     'Happiness', 'Pain', 'Peace', 'Pleasure', 'Sadness', 'Sensitivity', 'Suffering', 'Surprise', 'Sympathy', 'Yearning'])

emotion_encode = {e: i for i, e in enumerate(emotions)}
emotion_decode = {i: e for e, i in emotion_encode.items()}

In [3]:
class EmoticDataset(Dataset):
    def __init__(self, data_type, subject_size, context_size, anns_dir, img_dir):
        anns = scio.loadmat(anns_dir)[data_type]
        self.anns = np.fromiter(filter(lambda x: x["folder"].item() != "framesdb/images", iter(anns[0])), dtype=anns.dtype)
        self.img_dir = img_dir
        self.subject_transform = transforms.Resize(subject_size)
        self.context_transform = transforms.Resize(context_size)

    def __len__(self):
        return self.anns.size

    def __getitem__(self, idx):
        ann = self.anns[idx]

        img_loc = "../data/cvpr_emotic/" + ann["folder"].item() + '/' + ann["filename"].item()
        context_img = read_image(img_loc)
        
        bbox =  ann["person"]["body_bbox"][0][0][0].astype(int)
        subject_img = crop(context_img, bbox[1], bbox[0], bbox[3] - bbox[1], bbox[2] - bbox[0])

        label = np.zeros(len(emotions), dtype=np.float32)
        ems = [i.item() for i in ann["person"]["annotations_categories"][0][0][0][0][0][0]]
        for e in ems:
            label[emotion_encode[e]] = 1.

        subject_img = self.subject_transform(subject_img.float())
        context_img = self.context_transform(context_img.float())

        if subject_img.shape[0] == 1:
            subject_img = subject_img.repeat(3, 1, 1)
            
        if context_img.shape[0] == 1:
            context_img = context_img.repeat(3, 1, 1)

        return subject_img, context_img, label

In [4]:
def net_branch():
    return nn.Sequential(
        nn.Conv2d(3, 96, (11, 1), stride=(4, 1)),
        nn.ReLU(),
        nn.BatchNorm2d(96),
        nn.Conv2d(96, 96, (1, 11), stride=(1, 4)),
        nn.ReLU(),
        nn.BatchNorm2d(96),
        nn.MaxPool2d(3, stride=2),
        
        nn.Conv2d(96, 256, (1, 5), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(256),
        nn.Conv2d(256, 256, (5, 1), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(256),
        nn.MaxPool2d(3, stride=2),
        
        nn.Conv2d(256, 384, (1, 3), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(384),
        nn.Conv2d(384, 384, (3, 1), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 384, (1, 3), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(384),
        nn.Conv2d(384, 384, (3, 1), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 256, (1, 3), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(256),
        nn.Conv2d(256, 256, (3, 1), padding="same"),
        nn.ReLU(),
        nn.BatchNorm2d(256),
        nn.MaxPool2d(3, stride=2)
    )

In [5]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()

        self.subject = net_branch()
        self.context = net_branch()
        
        self.fusion = nn.Sequential(
            nn.Linear(12800, 256),
            nn.ReLU(),
            nn.Linear(256, 1024),
            nn.ReLU(),
            nn.Linear(1024, len(emotions)),
            nn.Sigmoid()
        )

    def forward(self, s, c):
        s = self.subject(s)
        s = torch.flatten(s, start_dim=1)
        
        c = self.context(c)
        c = torch.flatten(c, start_dim=1)

        x = torch.cat((s, c), dim=1)
        x = self.fusion(x)
        return x

In [6]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)

In [15]:
subject_size = (224, 224)
context_size = (224, 224)

train_batch_size = 52
val_batch_size = 52
test_batch_size = 52
num_epochs = 500
learning_rate = .01

In [10]:
train_data = EmoticDataset("train", subject_size, context_size, "../data/Annotations/Annotations.mat", "../data/cvpr_emotic/")
train_dataloader = DataLoader(train_data, batch_size=train_batch_size)

val_data = EmoticDataset("val", subject_size, context_size, "../data/Annotations/Annotations.mat", "../data/cvpr_emotic/")
val_dataloader = DataLoader(val_data, batch_size=val_batch_size)

test_data = EmoticDataset("test", subject_size, context_size, "../data/Annotations/Annotations.mat", "../data/cvpr_emotic/")
test_dataloader = DataLoader(test_data, batch_size=test_batch_size)

train_size = len(train_dataloader.dataset)
val_size = len(val_dataloader.dataset)
test_size = len(test_dataloader.dataset)

In [14]:
loss_fn = nn.BCELoss()
metric = metrics.MultilabelAUPRC(num_labels=len(emotions), device=device)

In [15]:
emotic_net = Net().to(device)

In [None]:
optimizer = torch.optim.SGD(emotic_net.parameters(), lr=learning_rate)

In [17]:
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    emotic_net.train()
    for batch, (S, C, y) in enumerate(train_dataloader):
        S, C, y = S.to(device), C.to(device), y.to(device)
        pred = emotic_net(S, C)
        loss = loss_fn(pred, y)
    
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
        loss, current = loss.item(), min((batch + 1) * train_batch_size, train_size)
        print(f"loss: {loss:>7f}  [{current:>5d}/{train_size:>5d}]", end="\r")
        
    print()

    torch.save(emotic_net.state_dict(), "../models/basic.pth")
    
    metric.reset()
    emotic_net.eval()
    for batch, (S, C, y) in enumerate(val_dataloader):
        with torch.no_grad():
            S, C, y = S.to(device), C.to(device), y.to(device)
            pred = emotic_net(S, C)
            metric.update(pred, y)
    
            current = min((batch + 1) * val_batch_size, val_size)
            print(f"calculating metric for:  [{current:>4d}/{val_size:>4d}]", end="\r")
    
    print()
    print(f"metric: {metric.compute() * 100} %\n")

Epoch 1/500
loss: 0.179709  [12915/12915]
calculating metric for:  [1705/1705]
metric: 11.307699203491211 %

Epoch 2/500
loss: 0.175972  [12915/12915]
calculating metric for:  [1705/1705]
metric: 12.197599411010742 %

Epoch 3/500
loss: 0.171243  [12915/12915]
calculating metric for:  [1705/1705]
metric: 12.667115211486816 %

Epoch 4/500
loss: 0.166673  [12915/12915]
calculating metric for:  [1705/1705]
metric: 12.972111701965332 %

Epoch 5/500
loss: 0.161501  [12915/12915]
calculating metric for:  [1705/1705]
metric: 13.146671295166016 %

Epoch 6/500
loss: 0.155500  [12915/12915]
calculating metric for:  [1705/1705]
metric: 13.296778678894043 %

Epoch 7/500
loss: 0.149107  [12915/12915]
calculating metric for:  [1705/1705]
metric: 13.439745903015137 %

Epoch 8/500
loss: 0.142312  [12915/12915]
calculating metric for:  [1705/1705]
metric: 13.447572708129883 %

Epoch 9/500
loss: 0.134930  [12915/12915]
calculating metric for:  [1705/1705]
metric: 13.510565757751465 %

Epoch 10/500
loss: 

KeyboardInterrupt: 

In [11]:
load_emotic_net = Net().to(device)
load_emotic_net.load_state_dict(torch.load("../models/basic.pth", weights_only=True))

<All keys matched successfully>

In [12]:
def compute_metrics(emotic_net, metric, dataloader, batch_size, size):
    metric.reset()
    emotic_net.eval()
    for batch, (S, C, y) in enumerate(dataloader):
        with torch.no_grad():
            S, C, y = S.to(device), C.to(device), y.to(device)
            pred = emotic_net(S, C)
            metric.update(pred, y)
    
            current = min((batch + 1) * batch_size, size)
            print(f"calculating metric for:  [{current:>4d}/{size:>4d}]", end="\r")
    
    print()
    print(f"metric: {metric.compute() * 100} %\n")

In [16]:
compute_metrics(load_emotic_net, metric, test_dataloader, test_batch_size, test_size)

calculating metric for:  [3682/3682]
metric: 13.611191749572754 %



In [91]:
idx = 1325
data = val_data[idx]
data = (data[0].unsqueeze(0), data[1].unsqueeze(0), torch.tensor(data[2]).unsqueeze(0))

In [92]:
out = emotic_net(data[0].to(device), data[1].to(device))

In [93]:
loss_fn(out, data[2].to(device))

tensor(0.0112, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)

In [94]:
emotions[out.cpu()[0] > 0.5]

array(['Anticipation', 'Engagement'], dtype='<U15')

In [95]:
emotions[data[2][0] > 0.5]

array(['Anticipation', 'Engagement'], dtype='<U15')