In [44]:
import toml
from collections import defaultdict
def _none_defaulting(value):
    """
    Recursively wraps every table of a parsed TOML document in a ``defaultdict`` that yields ``None``
    for missing keys, so optional settings such as ``config['data']['unit_test']`` can simply be left out.

    :param value: a parsed TOML value (table, array, or scalar)
    :return: the same structure with every dict replaced by a None-defaulting ``defaultdict``
    """
    if isinstance(value, dict):
        return defaultdict(lambda: None, {key: _none_defaulting(item) for key, item in value.items()})
    if isinstance(value, list):
        return [_none_defaulting(item) for item in value]
    return value


# Wrapping only the top-level table would still raise KeyError for a missing key inside a section,
# so the None default is applied at every nesting level.
config = _none_defaulting(toml.load('config.toml'))

# Package Management

In [4]:
%%capture
!pip install pytorch_lightning  # Install pytorch convenience wrapper
!pip install attributedict
!pip install plotly

In [5]:
import json
from torch.utils.data import Dataset
import copy
import glob
import json
import os
import random
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import torch
import pickle
import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go
pio.templates.default = "plotly_dark"
from torch.utils.data import Dataset
import pandas as pd
import copy
import multiprocessing as mp
from argparse import ArgumentParser
from collections import OrderedDict
import pytorch_lightning as pl
import pytorch_lightning.metrics.functional as F
import torch
import torch.nn as nn
import torch.nn.functional as F
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from sklearn import metrics
from sklearn.manifold import TSNE
from torch.utils.data import DataLoader
from pytorch_lightning.metrics.functional import *
from tqdm import tqdm
import seaborn as sns
import numpy as np
from attributedict.collections import AttributeDict
import plotly.express as px

# Data Processing
## Download Dataset

In [52]:
if not os.path.exists(config['data']['chroma_folder']):
    raise Exception('Error Downloading Dataset!')
    if not os.path.exists('chroma.zip'):
        !gsutil -m cp gs://ragaresearch/chroma.zip ./
    !unzip chroma.zip
    !mv n_ftt4096__hop_length2048 hindustani-chroma
    !rm chroma.zip

## Data Processing Classes

In [38]:
class FullChromaDataset(Dataset):
    """
    Dataset object that returns full length songs as chromagrams, accompanied with a "RAGA ID" label which is
    computed dynamically based on the ragas in the given set in alphabetical order.
    """

    def _assign_raga_ids(self):
        """
        Helper function. Creates a raga to raga id mapping using the list of ragas in alphabetical order.

        Only the first raga listed in the metadata of each file in ``self.files`` is considered.
        """

        mbids = [os.path.basename(file_name).split('.')[0] for file_name in self.files]
        raga_names = sorted({self.metadata[mbid]['raags'][0]['common_name'] for mbid in mbids})
        self.raga_ids = {name: raga_id for raga_id, name in enumerate(raga_names)}

    def _get_raga_id(self, file):
        """
        Helper function. Gets the raga id associated with a specific chromagram file.

        The raga id mapping is built lazily on the first call.
        """

        if getattr(self, 'raga_ids', None) is None:
            self._assign_raga_ids()
        mbid = os.path.basename(file).split('.')[0]
        return self.raga_ids[self.metadata[mbid]['raags'][0]['common_name']]

    def __init__(self, json_path, data_folder, include_mbids=None):
        """
        Creates a new dataset object.

        Files whose metadata lists no raga are skipped, because no label can be derived for them.

        :param json_path: path to json file with all raga metadata
        :param data_folder: folder with all of the chromagrams inside
        :param include_mbids: list of song ids to include
        """
        # Chromagrams may sit one directory below data_folder or directly inside it.
        self.files = glob.glob(os.path.join(data_folder, '**/*.pkl'))
        self.files += glob.glob(os.path.join(data_folder, '*.pkl'))
        with open(json_path, 'r') as metadata_file:
            self.metadata = json.load(metadata_file)

        # Remove files not on the "include" list (can easily create a subset of the main dataset)
        if include_mbids is not None:
            wanted = set(include_mbids)
            self.files = [path for path in self.files
                          if os.path.basename(path).split('.pkl')[0] in wanted]

        # A song without any raga would make the label lookup fail with an IndexError further down.
        self.files = [path for path in self.files
                      if len(self.metadata[os.path.basename(path).split('.')[0]]['raags']) >= 1]

        self.X = []
        self.y = []
        for file in tqdm(self.files, desc="Loading Chromagram Files"):
            # NOTE: unpickling can execute arbitrary code - only point data_folder at trusted files.
            with open(file, 'rb') as chroma_file:
                self.X.append(torch.FloatTensor(pickle.load(chroma_file)))
            self.y.append(self._get_raga_id(file))

    @classmethod
    def init_x_y(cls, X, y, raga_ids):
        """
        Helper method. Bypasses default constructor to allow for construction with just X and y objects directly.

        :param X: sequence of chromagrams
        :param y: sequence of raga ids, parallel to X
        :param raga_ids: mapping from raga name to raga id
        :return: a dataset holding exactly these objects (``files`` and ``metadata`` are not set)
        """
        obj = cls.__new__(cls)
        obj.X = X
        obj.y = y
        obj.raga_ids = raga_ids
        return obj

    def __getitem__(self, item):
        return self.X[item], self.y[item]

    def __len__(self):
        return len(self.y)

    def train_test_split(self, test_size=None, train_size=None, random_state=1):
        """
        Creates two new datasets from the original dataset object by splitting the datasets in a stratified fashion.

        :param test_size: size of test set (as a percentage)
        :param train_size: size of the train set (as a percentage)
        :param random_state: random seed used to shuffle the data before splitting
        :return: tuple of (train dataset, test dataset)
        """

        X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, test_size=test_size, train_size=train_size,
                                                            stratify=self.y, random_state=random_state)
        return FullChromaDataset.init_x_y(X_train, y_train, self.raga_ids), FullChromaDataset.init_x_y(X_test, y_test, self.raga_ids)

    def greedy_split(self, train_size=None, test_size=None):
        """
        Creates two new datasets by greedily balancing, raga by raga, the total length of the songs in each split.

        Within every raga the songs are visited from longest to shortest (length = ``len(sample[0])``) and each
        song goes to whichever split is currently behind its target share; ties go to the train split.

        :param train_size: targeted share of the train split, between 0 (exclusive) and 1; ignored when
                           test_size is given
        :param test_size: targeted share of the test split; takes precedence over train_size
        :return: tuple of (train dataset, test dataset)
        :raises ValueError: if neither size is given, or the resulting train share is not in (0, 1]
        """
        if train_size is None and test_size is None:
            raise ValueError('greedy_split requires either train_size or test_size')
        if test_size is None:
            test_size = 1 - train_size
        else:
            train_size = 1 - test_size
        if not 0 < train_size <= 1:
            raise ValueError('train share must be in (0, 1], got {}'.format(train_size))

        # Train lengths are scaled by this factor so that both running totals are directly comparable.
        train_weight = test_size / train_size

        # Split Samples by Raga
        samples_by_raga = [[] for _ in range(len(self.raga_ids))]
        for chroma, raga_id in self:
            samples_by_raga[raga_id].append(chroma)

        X_train, y_train = [], []
        X_test, y_test = [], []
        for raga, samples in enumerate(samples_by_raga):
            train_len, test_len = 0, 0
            for sample in sorted(samples, reverse=True, key=lambda sample: len(sample[0])):
                if train_len <= test_len:
                    X_train.append(sample)
                    y_train.append(raga)
                    train_len += len(sample[0]) * train_weight
                else:
                    X_test.append(sample)
                    y_test.append(raga)
                    test_len += len(sample[0])

        return FullChromaDataset.init_x_y(X_train, y_train, self.raga_ids), FullChromaDataset.init_x_y(X_test, y_test, self.raga_ids)
        


if config['data']['unit_test']:
    # Smoke test: load only the songs listed in the `limit_songs` JSON file, then split them 75/25.
    # The file is read inside a `with` block so its handle is closed right away.
    with open(config['data']['limit_songs']) as _limit_songs_file:
        fcd = FullChromaDataset(json_path=config['data']['metadata'],
                                data_folder=config['data']['chroma_folder'],
                                include_mbids=json.load(_limit_songs_file))
    fcd_train, fcd_val = fcd.greedy_split(train_size=0.75)

Loading Chromagram Files: 100%|██████████| 300/300 [00:00<00:00, 1748.33it/s]


In [47]:
class ChromaChunkDataset(Dataset):
    """
    Dataset of fixed-width chunks cut from the chromagrams of an existing full chroma dataset.

    Every chunk carries the raga id of the song it was cut from.
    """

    def __init__(self, full_chroma_dataset: "FullChromaDataset", chunk_size, augmentation=None):
        """
        Class for chunkifying an existing full chroma dataset

        Each chromagram is split along dimension 1 into pieces of ``chunk_size`` columns; a shorter final
        piece is zero-padded on the right up to ``chunk_size``.

        :param full_chroma_dataset: FullChromaDataset object
        :param chunk_size: size of the chunks to make from the original set
        :param augmentation: function that the chunks are passed through before calling get_item (user defined)
        """

        self.X = []
        self.y = []
        self.augmentation = augmentation
        self.raga_ids = full_chroma_dataset.raga_ids

        chunks = []
        for chroma, raga_id in full_chroma_dataset:
            pieces = chroma.split(chunk_size, dim=1)
            for piece in pieces:
                missing = chunk_size - piece.shape[1]
                if missing != 0:
                    # new_zeros keeps the padding on the same dtype and device as the chunk it extends.
                    padding = piece.new_zeros((piece.shape[0], missing))
                    piece = torch.cat((piece, padding), 1)
                chunks.append(piece.unsqueeze(0))
            self.y += len(pieces) * [raga_id]

        # torch.cat rejects an empty list, so an empty source dataset yields an empty tensor instead.
        self.X = torch.cat(chunks, dim=0) if chunks else torch.empty(0)

    def __getitem__(self, item):
        """Returns ``(chunk, raga_id)``; the chunk is passed through ``augmentation`` when one was given."""
        if self.augmentation is None:
            return self.X[item], self.y[item]
        else:
            return self.augmentation(self.X[item]), self.y[item]

    def __len__(self):
        return len(self.y)

if config['data']['unit_test']:
    # Smoke test: chunking the train split must yield more items than there are songs in it.
    fcd_train_chunks = ChromaChunkDataset(full_chroma_dataset=fcd_train,
                                          chunk_size=config['data']['chunk_size'])
    assert len(fcd_train) < len(fcd_train_chunks)

## Useful Helper Functions

In [48]:
def get_raga_list(dataset):
    """
    Accesses a FullChromaDataset or ChromaChunkDataset object and extracts list of ragas in order of raga id

    :param dataset: any object exposing a ``raga_ids`` mapping from raga name to raga id
    :return: list of raga names, so that ``result[i]`` is the raga whose id is ``i`` when ids run from 0
    """
    # Sort by the id itself: alphabetical order matches id order only when the ids were assigned
    # alphabetically, which is not guaranteed for a mapping supplied by hand.
    return sorted(dataset.raga_ids, key=dataset.raga_ids.get)

if config['data']['unit_test']:
    # Smoke test: 'Bhairav' must be among the raga names of the `fcd` dataset built in an earlier cell.
    assert 'Bhairav' in get_raga_list(fcd)

In [49]:
def transpose_chromagram(x, shift=None):
    """
    Rotates the rows of a chromagram downwards by ``shift`` positions, wrapping around at the bottom.

    :param x: tensor with at least two dimensions; rows (dimension 0) are rotated
              (presumably the 12 pitch classes - TODO confirm)
    :param shift: number of rows to rotate by; any integer is accepted and taken modulo the number of rows.
                  When omitted, a random shift between 0 and 11 (inclusive) is drawn.
    :return: the rotated tensor, or ``x`` itself when the effective shift is zero
    """
    if shift is None:
        shift = random.randint(0, 11)

    rows = x.shape[0]
    if rows == 0:
        return x

    # Without the modulo, a shift whose magnitude exceeds the row count would slice past the tensor
    # bounds and hand back the rows unrotated.
    shift %= rows
    if shift == 0:
        return x
    return torch.cat([x[-shift:, :], x[:-shift, :]], 0)

if config['data']['unit_test']:
    # A zero shift must leave the chromagram untouched.
    assert torch.equal(transpose_chromagram(torch.arange(12).unsqueeze(1), shift=0),
                       torch.arange(12).unsqueeze(1))
    # Shifting by one moves the last row to the top and pushes every other row down.
    assert transpose_chromagram(torch.arange(12).unsqueeze(1), shift=1)[:, 0].tolist() == [11] + list(range(11))
    # A random shift only reorders rows; it never drops or duplicates one.
    assert sorted(transpose_chromagram(torch.arange(12).unsqueeze(1))[:, 0].tolist()) == list(range(12))

### Balanced Dataloader
Cutting songs into chunks can leave the classes imbalanced, so it is valuable to have a data loader that samples each chunk with a probability inversely proportional to the prevalence of its class. The code below is adapted from https://github.com/ufoym/imbalanced-dataset-sampler and takes care of the balanced sampling :)

In [50]:
def dataset_get_label_callback(dataset, idx):
    """Default label getter: returns element 1 of ``dataset[idx]``, i.e. the label of an ``(x, label)`` item."""
    return dataset[idx][1]


class ImbalancedDatasetSampler(torch.utils.data.sampler.Sampler):
    """Samples elements randomly from a given list of indices for imbalanced dataset

    Every index is drawn (with replacement) with a weight of one over the number of considered
    indices that share its label.

    Arguments:
        dataset: the dataset whose items are sampled
        indices (list, optional): a list of indices
        num_samples (int, optional): number of samples to draw
        callback_get_label func: a callback-like function which takes two arguments - dataset and index
    """

    def __init__(self, dataset, indices=None, num_samples=None, callback_get_label=dataset_get_label_callback):

        # if indices is not provided,
        # all elements in the dataset will be considered
        self.indices = list(range(len(dataset))) if indices is None else indices

        # define custom callback
        self.callback_get_label = callback_get_label

        # if num_samples is not provided,
        # draw `len(indices)` samples in each iteration
        self.num_samples = len(self.indices) if num_samples is None else num_samples

        # Look every label up once; it is needed both for the class counts and for the weights.
        labels = [self._get_label(dataset, idx) for idx in self.indices]

        # distribution of classes in the dataset
        label_to_count = {}
        for label in labels:
            label_to_count[label] = label_to_count.get(label, 0) + 1

        # weight for each sample
        weights = [1.0 / label_to_count[label] for label in labels]
        self.weights = torch.DoubleTensor(weights)

    def _get_label(self, dataset, idx):
        """
        Returns the label of ``dataset[idx]``.

        Known torchvision datasets are read through their label attributes; everything else goes through
        ``callback_get_label``.

        :raises NotImplementedError: if no shortcut applies and no callback was configured
        """
        import sys

        # A dataset can only be a torchvision dataset if torchvision has already been imported somewhere,
        # so sys.modules is consulted instead of referencing a module this notebook never imports.
        torchvision = sys.modules.get('torchvision')
        tv_datasets = getattr(torchvision, 'datasets', None)
        if tv_datasets is not None:
            if isinstance(dataset, tv_datasets.MNIST):
                return dataset.train_labels[idx].item()
            if isinstance(dataset, tv_datasets.ImageFolder):
                return dataset.imgs[idx][1]

        if isinstance(dataset, torch.utils.data.Subset) and hasattr(dataset.dataset, 'imgs'):
            # idx addresses the subset, so it is translated to the wrapped dataset's index first.
            return dataset.dataset.imgs[dataset.indices[idx]][1]

        if self.callback_get_label:
            return self.callback_get_label(dataset, idx)
        raise NotImplementedError

    def __iter__(self):
        drawn = torch.multinomial(self.weights, self.num_samples, replacement=True)
        return (self.indices[position] for position in drawn.tolist())

    def __len__(self):
        return self.num_samples