In [1]:
import os
import glob
import torch
import re
import pandas as pd
import numpy as np
import torch.nn as nn
import torchvision.transforms as transforms
import cv2
from PIL import Image


# Dataset root. The original Windows location stays the default; set the
# MUSE_DATA_DIR environment variable to point the notebook at another folder
# without editing this cell.
DATA_DIR = os.environ.get("MUSE_DATA_DIR", r"D:\Documents\datasets\AIST4010\muse")
# Folder of spectrogram images (the default folder read by load_imgs).
SPEC_DIR = os.path.join(DATA_DIR, "spectrograms_jpg")
# CSV file that is read into `songs_data`.
songs_data_fp = os.path.join(DATA_DIR, "extracted_data.csv")


def load_imgs(fp=SPEC_DIR, transform=None):
    """Load every ``.jpg`` image found directly inside a folder as RGB data.

    Args:
        fp: Directory containing the images.
        transform: Optional callable applied to each RGB ``numpy`` array right
            after loading. ``None`` (the default) keeps the raw arrays. The
            transformed items must still be stackable by ``np.array``.

    Returns:
        A ``(images, ids)`` pair of ``numpy`` arrays, both ordered by id. An
        id is the image's file name up to its first ``.``.
    """
    def image_id(path):
        # Derive the id from the base name so the function does not depend on
        # the path separator of the host OS.
        return os.path.basename(path).split(".", 1)[0]

    # Restrict the listing to JPEG files so stray files in the folder are
    # skipped instead of crashing the id extraction.
    paths = sorted(glob.glob(os.path.join(fp, "*.jpg")), key=image_id)

    imgs, img_ids = [], []
    for path in paths:
        # The context manager closes the file handle as soon as the pixel data
        # has been copied into an array.
        with Image.open(path) as img:
            pixels = np.asarray(img.convert("RGB"))
        if transform is not None:
            pixels = transform(pixels)
        imgs.append(pixels)
        img_ids.append(image_id(path))
    return np.array(imgs), np.array(img_ids)

In [2]:
# Read the song table and key it by the "spotify_id" column in one expression.
songs_data = pd.read_csv(songs_data_fp).set_index("spotify_id")

# Image arrays together with the id of each image.
data, ids = load_imgs()

# Look up the three tag columns for every image id, keeping the order of `ids`
# so that row i of `labels` belongs to `data[i]`.
labels = songs_data.loc[
    ids,
    ["valence_tags", "arousal_tags", "dominance_tags"],
].to_numpy()

In [3]:
from torch.utils.data import DataLoader, Dataset


class ImageDataset(Dataset):
    """In-memory dataset that pairs each sample with its label.

    Args:
        data: Indexable collection of samples.
        labels: Indexable collection of labels aligned with ``data``.
        transform: Optional callable applied to a sample when it is accessed
            through ``__getitem__``.
    """

    def __init__(self, data, labels, transform=None):
        super().__init__()
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``, transforming the sample if a transform is set."""
        if torch.is_tensor(idx):
            # Tensors expose tolist(); there is no to_list() method, so the
            # previous spelling raised AttributeError for tensor indices.
            idx = idx.tolist()
        sample = self.data[idx]
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, self.labels[idx]

    def get_data(self, idx=None):
        """Return the untransformed sample(s) at ``idx``, or all data when ``idx`` is None."""
        # Compare with None explicitly: a truthiness check would treat index 0
        # as "no index" and hand back the whole dataset.
        if idx is None:
            return self.data
        return self.data[idx]

    
def spectrum_transform(size=(224, 224),
                       mean=(0.485, 0.456, 0.406),
                       std=(0.229, 0.224, 0.225)):
    """Build the preprocessing pipeline applied to each spectrogram image.

    The pipeline converts the image to a tensor, resizes it to ``size`` and
    normalizes each channel with ``mean`` / ``std``.

    Args:
        size: Target ``(height, width)`` passed to ``transforms.Resize``.
        mean: Per-channel means passed to ``transforms.Normalize``. The
            defaults are the values commonly quoted as ImageNet statistics.
        std: Per-channel standard deviations passed to
            ``transforms.Normalize``.

    Returns:
        A ``transforms.Compose`` holding the three steps in order.
    """
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(size),
        transforms.Normalize(mean=list(mean), std=list(std)),
    ])


# Dataset over the loaded arrays; each sample is preprocessed when accessed.
spectrum_ds = ImageDataset(
    data=data,
    labels=labels,
    transform=spectrum_transform(),
)
# Loader that yields reshuffled mini-batches of 32 samples.
spectrum_loader = DataLoader(
    dataset=spectrum_ds,
    batch_size=32,
    shuffle=True,
)

In [5]:
class PlainCNN(nn.Module):
    """Plain CNN: five unpadded 3x3 convolutions, three 2x2 max-pools and a 3-layer MLP head.

    Args:
        in_dim: Number of input channels.
        out_dim: Number of values predicted per sample.
        input_size: Spatial size of the inputs, either an int (square images)
            or a ``(height, width)`` pair. The default ``(224, 224)`` yields
            the previously hard-coded flattened size of 73728.

    Raises:
        ValueError: If ``input_size`` is too small for the conv/pool stack.
    """

    def __init__(self, in_dim, out_dim, input_size=(224, 224)):
        super().__init__()
        if isinstance(input_size, int):
            input_size = (input_size, input_size)
        # Size of the first linear layer, derived from the input size instead
        # of being fixed to one resolution. Computed arithmetically so that no
        # random numbers are consumed before the layers are initialised.
        flat_features = self._flat_features(128, input_size)
        conv_stack = [
            nn.Conv2d(in_dim, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout2d(0.5),
            nn.Flatten(),
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, out_dim)
        ]
        self.conv_stack = nn.Sequential(*conv_stack)

    @staticmethod
    def _flat_features(channels, input_size):
        """Return the flattened feature count after the conv/pool layers for ``input_size``."""
        def reduce_side(side):
            side = (side - 2) // 2  # one 3x3 conv (no padding), then 2x2 pool
            side = (side - 4) // 2  # two 3x3 convs, then 2x2 pool
            side = (side - 4) // 2  # two 3x3 convs, then 2x2 pool
            return side

        height, width = (reduce_side(side) for side in input_size)
        if height < 1 or width < 1:
            raise ValueError(
                f"input_size {tuple(input_size)} is too small for PlainCNN"
            )
        return channels * height * width

    def forward(self, x):
        """Run ``x`` through the whole stack and return the ``out_dim`` predictions per sample."""
        return self.conv_stack(x)

In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
# The model is cast to float16 as a whole, so every batch fed to it must be
# cast with .half() as well.
model = PlainCNN(3, 3).half().to(device)


# training settings
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.01)


EPOCHS = 100
num_samples = len(spectrum_loader.dataset)
model.train()  # make sure dropout is active even if the model was put in eval mode before
for epoch in range(EPOCHS):
    # Running sums are plain Python floats: accumulating the loss tensors
    # themselves would sum in float16 and keep autograd history alive.
    running_val_loss, running_aro_loss, running_dom_loss = 0.0, 0.0, 0.0
    # The batch labels are named `targets` so the notebook-level `labels`
    # array used to build the dataset is not overwritten by the loop.
    for inputs, targets in spectrum_loader:
        optimizer.zero_grad()

        inputs, targets = inputs.half().to(device), targets.half().to(device)
        outputs = model(inputs)
        # Columns 0/1/2 follow the label order: valence, arousal, dominance
        # ("val" means valence here, not validation). Each term is divided by 3
        # so the three terms add up to the mean over all three outputs.
        val_loss = criterion(outputs[:, 0], targets[:, 0]) / 3
        aro_loss = criterion(outputs[:, 1], targets[:, 1]) / 3
        dom_loss = criterion(outputs[:, 2], targets[:, 2]) / 3

        batch_size = len(inputs)
        running_val_loss += val_loss.item() * batch_size
        running_aro_loss += aro_loss.item() * batch_size
        running_dom_loss += dom_loss.item() * batch_size

        # One backward pass over the summed loss accumulates the same gradients
        # as three separate passes, without having to retain the graph.
        (val_loss + aro_loss + dom_loss).backward()
        optimizer.step()

    epoch_val_loss = running_val_loss / num_samples
    epoch_aro_loss = running_aro_loss / num_samples
    epoch_dom_loss = running_dom_loss / num_samples
    print(f"train loss - {epoch_val_loss:.5f}\t{epoch_aro_loss:.5f}\t{epoch_dom_loss:.5f}")

train loss - 2.03711	1.18359	1.55664
train loss - 0.87451	0.49316	0.52881
train loss - 0.87109	0.48828	0.52344
train loss - 0.84424	0.47119	0.50146
train loss - 0.84326	0.47314	0.49878


KeyboardInterrupt: 

In [7]:
# Print the model's predictions for one batch next to the matching targets.
was_training = model.training
model.eval()  # disable dropout so the printed predictions are deterministic
with torch.no_grad():  # inspection only: no autograd graph is needed
    for x, y in spectrum_loader:
        x, y = x.half().to(device), y.half().to(device)
        outputs = model(x)
        print(outputs)
        print(y)
        break
# Put the model back in the mode it was in before this cell ran.
model.train(was_training)

tensor([[4.6133, 3.5527, 4.4336],
        [6.1016, 4.1133, 5.6094],
        [8.2500, 5.4727, 7.2656],
        [4.6680, 4.3242, 4.5234],
        [4.4258, 3.9531, 4.3789],
        [5.2656, 3.4160, 5.0664],
        [6.9141, 4.1992, 6.1992],
        [3.1992, 3.2480, 3.3398],
        [3.6309, 3.2793, 3.7227],
        [6.2109, 4.6016, 5.7734],
        [4.0625, 3.2188, 4.0625],
        [5.6445, 4.1719, 5.2266],
        [4.8516, 3.0137, 4.5703],
        [6.4883, 4.6133, 5.9805],
        [6.6484, 4.7500, 6.0508],
        [6.2266, 5.3203, 5.9023],
        [6.1680, 3.9980, 5.6758],
        [4.1680, 3.9082, 4.0859],
        [3.8516, 3.5762, 3.8691],
        [5.8750, 4.6523, 5.4922],
        [6.1914, 3.9746, 5.7461],
        [4.7734, 4.2578, 4.7031],
        [5.2773, 4.6289, 5.0664],
        [5.5430, 4.4141, 5.1836],
        [5.2266, 3.6758, 5.0664],
        [6.2539, 4.6211, 5.8477],
        [5.7500, 5.0625, 5.4336],
        [6.6328, 5.2383, 6.1250],
        [7.1367, 4.7031, 6.4336],
        [4.949