In [1]:
# initial data loading cell

# !wget http://emodb.bilderbar.info/download/download.zip
# !mkdir -p data/berlin && unzip download.zip -d data/berlin

# # !aria2c -x 16 https://zenodo.org/record/1188976/files/Audio_Speech_Actors_01-24.zip
# !wget https://zenodo.org/record/1188976/files/Audio_Speech_Actors_01-24.zip
# !mkdir -p data/ravdess && unzip Audio_Speech_Actors_01-24.zip -d data/ravdess
# !rm download.zip Audio_Speech_Actors_01-24.zip
%load_ext lab_black

In [2]:
import glob
import os
import re

import librosa
import numpy as np

RAVDESS = "../data/ravdess/"
BERLIN = "../data/berlin/wav/"
RAVDESS_CLEAN = "clean_data/ravdess_clean.npy"
BERLIN_CLEAN = "clean_data/berlin_clean.npy"
# !rm clean_data/ravdess_clean.npy
# !rm clean_data/berlin_clean.npy
DEV_MODE = True
DEV_LIMIT = 15
BATCH_SIZE = 1

# AUDIO = 0
# SAMPLING_RATE = 44100//2
# EMOTION = 2
# ACTOR = 3
# GENDER = 4
MFCC = 20

# Possible Emotions
#  - Neutral
#  - Calm
#  - Happy
#  - Sad
#  - Angry
#  - Fearful
#  - Disgust
#  - Surprised
#  - Boredom

# Emotion = {'01': neutral, '02': calm, '03': happy, '04': sad, '05': angry, '06': fearful, '07': disgust, '08': surprised}

# Possible genders
#  - male
#  - female

# 34 possible actors


class Dataset:
    def __init__(self):
        self.berlin_data = None
        self.ravdess_data = None
        self.feature_names = ["audio", "sampling_rate", "emotion", "actor", "gender"]

    def load(self):
        try:
            self.ravdess_data = np.load(RAVDESS_CLEAN, allow_pickle=True)
            self.berlin_data = np.load(BERLIN_CLEAN, allow_pickle=True)
        except:  # no file found, so do the normal loading stuff
            print("Manually rebuilding the dataset!")
            self.ravdess_data = self.fetch(RAVDESS, RAVDESS_CLEAN)
            self.berlin_data = self.fetch(BERLIN, BERLIN_CLEAN)

    def fetch(self, path, save_path):
        #         n = Dataset._get_num_files(path)
        output = []  # np.zeros(n, dtype=object)
        for i, filename in enumerate(glob.iglob(path + "**", recursive=True)):
            if DEV_MODE and i > DEV_LIMIT:
                continue
            if ".wav" in filename:
                element = {
                    "audio": None,
                    "sampling_rate": None,
                    "emotion": None,
                    "actor": None,
                    "gender": None,
                    "mfcc": None,
                }

                audio, sampling_rate = librosa.load(filename)
                mfcc = librosa.feature.mfcc(y=audio, sr=sampling_rate, n_mfcc=MFCC)
                emotion = Dataset._get_emotion(filename, path)
                actor = Dataset._get_actor(filename, path)
                gender = Dataset._get_gender(filename, path)

                element["audio"] = audio
                element["sampling_rate"] = sampling_rate
                element["emotion"] = emotion
                element["actor"] = actor
                element["gender"] = gender
                element["mfcc"] = mfcc
                output.append(element)
        #                 output[i] = element

        output = np.array(output, dtype=object)
        np.save(save_path, output)
        return output

    @staticmethod
    def _get_num_files(path):
        total = 0
        for root, dirs, files in os.walk(path):
            total += len(files)
        return total

    @staticmethod
    def _get_emotion(filename, dataset_type):
        if dataset_type == RAVDESS:
            return re.findall("[0-9][0-9]", filename)[2]
        elif dataset_type == BERLIN:
            emotion = re.search("[A-Z]", filename)

    @staticmethod
    def _get_actor(filename, dataset_type):
        if dataset_type == RAVDESS:
            return re.findall("[0-9][0-9]", filename)[6]
        elif dataset_type == BERLIN:
            actor = re.search("[a-z][0-9][0-9]", filename)

    @staticmethod
    def _get_gender(filename, dataset_type):
        if dataset_type == RAVDESS:
            actor = re.findall("[0-9][0-9]", filename)[6]
            if int(actor) % 2 == 0:
                return "female"
            else:
                return "male"
        elif dataset_type == BERLIN:
            actor = re.findall("[0-9][0-9]", filename)[0]
            return Dataset._get_berlin_gender_from_actor(actor)

    @staticmethod
    def _get_berlin_gender_from_actor(actor):
        switch = {
            "03": "male",
            "08": "female",
            "09": "female",
            "10": "male",
            "11": "male",
            "12": "male",
            "13": "female",
            "14": "female",
            "15": "male",
            "16": "female",
        }
        return switch.get(actor, "Invalid actor!")


dataset = Dataset()
dataset.load()

In [3]:
import torch
from torch import nn
from torchvision import models, transforms
from typing import Dict, Tuple, List
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def pad(to_pad, r=MFCC, c=230):
    z = np.zeros((r, c))
    z[: to_pad.shape[0], : to_pad.shape[1]] = to_pad
    return z


# X = (lambda x: pad(x['mfcc']))(dataset.ravdess_data)
def prep_Xy(data):
    #     padded = pad_sequence([torch.from_numpy(x['mfcc']) for x in dataset.ravdess_data]) #https://pytorch.org/docs/stable/nn.html#pad-sequence
    X = torch.tensor([pad(x["mfcc"]) for x in data.ravdess_data], dtype=torch.float32)
    y = torch.tensor(
        [int(x["emotion"]) for x in data.ravdess_data], dtype=torch.float32
    )
    return X, y


class PaddedDataset(Dataset):
    def __init__(self, X: torch.tensor, y: torch.tensor):
        super(PaddedDataset).__init__()
        self.X = X
        self.y = y

    def __getitem__(self, index):
        return (
            self.X[index].unsqueeze(0).to(device),
            self.y[index].unsqueeze(0).to(device),
        )

    def __len__(self):
        return self.X.shape[0]


paddedDataset = PaddedDataset(*prep_Xy(dataset))

train_size = int(0.7 * len(paddedDataset))
valid_size = int(0.15 * len(paddedDataset))
test_size = len(paddedDataset) - (train_size + valid_size)
train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(
    paddedDataset, [train_size, valid_size, test_size]
)

trainloader = DataLoader(train_dataset, batch_size=BATCH_SIZE)  # , shuffle=True)
validloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)  # , shuffle=True)
testloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)  # , shuffle=True)

In [10]:
import torch.nn as nn
import torch.nn.functional as F

class Network(nn.Module):
    def __init__(self):
        super().__init__()

        # define layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=5)

        self.fc1 = nn.Linear(in_features=12*2*54, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=60)
        self.out = nn.Linear(in_features=60, out_features=8)

    #comes in as 20x230. 20 = MFCCs, 230 is padded length that doesnt truncate any data
    def forward(self, t):
        # conv 1
        t = self.conv1(t) #output 6x16x226
        print(t.shape)

        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2) #output 6x8x113
        print(t.shape)

        # conv 2
        t = self.conv2(t) #output 12x4x109
        print(t.shape)

        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2) #output 12x2x54

        print(t.shape)
        
        # fc1
        t = t.reshape(-1, 12*2*54) #flatten to 1x1296
        print(t.shape)
        t = self.fc1(t) #output 1x120
        print(t.shape)
        t = F.relu(t)

        # fc2
        t = self.fc2(t) #output 1x60
        print(t.shape)
        t = F.relu(t)

        t = self.out(t) #output 1x8 (8 classes)
        # don't need softmax here since we'll use cross-entropy as activation.

        return t


def train(model, train_loader, optimizer, epochs=10, log_interval=100, device=device):
    model.to(device)
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        print(data.dtype, target.dtype)
        optimizer.zero_grad()
        output = model(data)
        loss = F.binary_cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epochs, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, test_loader, device=device):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.binary_cross_entropy(output, target).item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
net = Network()
print(net)
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
train(net, trainloader, optimizer)

test(net, testloader)


Network(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(6, 12, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=1296, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=60, bias=True)
  (out): Linear(in_features=60, out_features=8, bias=True)
)
torch.float32 torch.float32
torch.Size([1, 6, 16, 226])
torch.Size([1, 6, 8, 113])
torch.Size([1, 12, 4, 109])
torch.Size([1, 12, 2, 54])
torch.Size([1, 1296])
torch.Size([1, 120])
torch.Size([1, 60])




ValueError: Target and input must have the same number of elements. target nelement (1) != input nelement (8)