In [118]:
from torch.utils.data import Dataset, DataLoader
import torch
from torchvision import transforms

import numpy as np
from pathlib import Path
import pandas as pd

class DCASE(Dataset):
    """
    Dataset of pre-computed DCASE spectrograms stored as .npy files.

    `root_dir` must contain a header-less `labels.csv` with two columns
    (file name, scene label) and an `audio/` folder holding the listed files.
    Each item is the full spectrogram cut into equal-length clips along the
    time axis, paired with the integer code of its scene label.
    """

    def __init__(self, root_dir: str, clip_duration: int):
        """
        :param root_dir: folder containing labels.csv and the audio/ directory
        :param clip_duration: length in seconds of each clip the audio is split into
        :raises ValueError: if clip_duration is not within (0, 30]
        """
        self._total_duration = 30 #DCASE audio length is 30s
        # Reject durations that would give zero clips (and a division by zero when trimming).
        if not 0 < clip_duration <= self._total_duration:
            raise ValueError(
                f'clip_duration must be in (0, {self._total_duration}], got {clip_duration}'
            )
        self._root_dir = Path(root_dir)
        self._labels = pd.read_csv((self._root_dir / 'labels.csv'), names=['file', 'label'])
        self.label_dict()
        self._clip_duration = clip_duration
        self._num_clips = int(self._total_duration // clip_duration)
        self._data_len = len(self._labels)

    def __getitem__(self, index):
        """
        Loads one spectrogram and splits it into clips
        :param index: positional row index into the labels table
        :return: tuple (clips, label) where clips is the stacked tensor produced by
                 __trim__ and label is the integer category code of the scene label
        """
        #reading spectrograms
        # label_dict adds extra columns to every row, so fields are read by name
        # instead of unpacking the whole row into two variables.
        row = self._labels.iloc[index]
        filepath = self._root_dir / 'audio' / row['file']
        spec = torch.from_numpy(np.load(filepath))

        #splitting spec
        return self.__trim__(spec), int(row['label_cat'])

    def __trim__(self, spec: torch.Tensor) -> torch.Tensor:
        """
        Trims spectrogram into self._num_clips clips of equal length along the last (time) axis.
        Frames left over after the last full clip are dropped.
        :param spec: tensor containing spectrogram of full audio signal, time on the last axis
                     (e.g. shape [60, 1501])
        :return: tensor containing stacked spectrograms of shape [num_clips, ..., clip_length]
                 ([10, 60, 150] for a [60, 1501] input with 3s clips)
        """
        time_steps = spec.size(-1)
        clip_length = time_steps // self._num_clips
        # Ellipsis indexing keeps the slice on the time axis whatever the number of leading axes.
        all_clips = [
            spec[..., clip_idx * clip_length:(clip_idx + 1) * clip_length]
            for clip_idx in range(self._num_clips)
        ]
        return torch.stack(all_clips)

    def label_dict(self):
        """
        Adds derived columns to the labels table in place:
        label_cat   - integer category code of the scene label
        clip_no     - tuple (first character, integer of characters 1-3) of the file name
        start_frame - second '_'-separated field of the file name (kept as a string)
        stop_frame  - third '_'-separated field with the 4-character extension removed (string)
        """
        files = self._labels['file']
        self._labels['label_cat'] = self._labels['label'].astype('category').cat.codes.astype('int')
        self._labels['clip_no'] = files.apply(lambda file: (file[0], int(file[1:4])))
        self._labels['start_frame'] = files.apply(lambda file: file.split('_')[1])
        self._labels['stop_frame'] = files.apply(lambda file: file[:-4].split('_')[2])

    def get_num_clips(self) -> int:
        """
        Gets number of clips the raw audio has been split into
        :return: self._num_clips of type int
        """
        return self._num_clips

    def __len__(self):
        """
        :return: number of rows in the labels table
        """
        return self._data_len

    
class NF_TRAIN_DCASE(Dataset):
    """
    Dataset of pre-computed DCASE spectrograms restricted to a given set of files.

    `root_dir` must contain a header-less `labels.csv` with two columns
    (file name, scene label) and an `audio/` folder holding the listed files.
    Only rows whose file name appears in `clip_filter` are kept. Each item is the
    full spectrogram cut into equal-length clips along the time axis, paired with
    the integer code of its scene label.
    """

    def __init__(self, root_dir: str, clip_duration: int, clip_filter):
        """
        :param root_dir: folder containing labels.csv and the audio/ directory
        :param clip_duration: length in seconds of each clip the audio is split into
        :param clip_filter: list-like of file names (as written in labels.csv) to keep
        :raises ValueError: if clip_duration is not within (0, 30]
        """
        self._total_duration = 30 #DCASE audio length is 30s
        # Reject durations that would give zero clips (and a division by zero when trimming).
        if not 0 < clip_duration <= self._total_duration:
            raise ValueError(
                f'clip_duration must be in (0, {self._total_duration}], got {clip_duration}'
            )
        self._root_dir = Path(root_dir)
        self._labels = pd.read_csv((self._root_dir / 'labels.csv'), names=['file', 'label'])
        # Category codes are assigned before filtering so that a label keeps the same
        # integer code whichever subset of files is selected.
        self.label_dict()
        self._labels = self._labels[self._labels['file'].isin(clip_filter)]
        self._clip_duration = clip_duration
        self._num_clips = int(self._total_duration // clip_duration)
        self._data_len = len(self._labels)

    def __getitem__(self, index):
        """
        Loads one spectrogram and splits it into clips
        :param index: positional row index into the filtered labels table
        :return: tuple (clips, label) where clips is the stacked tensor produced by
                 __trim__ and label is the integer category code of the scene label
        """
        #reading spectrograms
        # label_dict adds extra columns to every row, so fields are read by name
        # instead of unpacking the whole row into two variables.
        row = self._labels.iloc[index]
        filepath = self._root_dir / 'audio' / row['file']
        spec = torch.from_numpy(np.load(filepath))

        #splitting spec
        return self.__trim__(spec), int(row['label_cat'])

    def __trim__(self, spec: torch.Tensor) -> torch.Tensor:
        """
        Trims spectrogram into self._num_clips clips of equal length along the last (time) axis.
        Frames left over after the last full clip are dropped.
        :param spec: tensor containing spectrogram of full audio signal, time on the last axis
                     (e.g. shape [60, 1501])
        :return: tensor containing stacked spectrograms of shape [num_clips, ..., clip_length]
                 ([10, 60, 150] for a [60, 1501] input with 3s clips)
        """
        time_steps = spec.size(-1)
        clip_length = time_steps // self._num_clips
        # Ellipsis indexing keeps the slice on the time axis whatever the number of leading axes.
        all_clips = [
            spec[..., clip_idx * clip_length:(clip_idx + 1) * clip_length]
            for clip_idx in range(self._num_clips)
        ]
        return torch.stack(all_clips)

    def label_dict(self):
        """
        Adds derived columns to the labels table in place:
        label_cat   - integer category code of the scene label
        clip_no     - tuple (first character, integer of characters 1-3) of the file name
        start_frame - second '_'-separated field of the file name (kept as a string)
        stop_frame  - third '_'-separated field with the 4-character extension removed (string)
        """
        files = self._labels['file']
        self._labels['label_cat'] = self._labels['label'].astype('category').cat.codes.astype('int')
        self._labels['clip_no'] = files.apply(lambda file: (file[0], int(file[1:4])))
        self._labels['start_frame'] = files.apply(lambda file: file.split('_')[1])
        self._labels['stop_frame'] = files.apply(lambda file: file[:-4].split('_')[2])

    def get_num_clips(self) -> int:
        """
        Gets number of clips the raw audio has been split into
        :return: self._num_clips of type int
        """
        return self._num_clips

    def __len__(self):
        """
        :return: number of rows left in the labels table after filtering
        """
        return self._data_len


In [74]:
# Root of the DCASE development split; the dataset reads labels.csv and audio/ below it.
root_dir = Path('./ADL_DCASE_DATA/development/')

# Every spectrogram is split into clips of 3 seconds.
dataset = DCASE(root_dir=root_dir, clip_duration=3)

In [11]:
# Unshuffled, one recording per batch, so batches follow the order of the labels table.
loader = iter(DataLoader(dataset=dataset, batch_size=1, shuffle=False))

In [12]:
# Pull the first batch with the builtin next(); DataLoader iterators implement the
# iterator protocol (__next__), while the legacy .next() method is not available
# in current PyTorch releases.
data, label = next(loader)

In [14]:
# Dimensions of the first batch.
data.size()

torch.Size([1, 10, 60, 150])

In [15]:
# Label returned alongside the first batch.
label

tensor([12])

In [75]:
# All label rows whose file name maps to clip_no ('a', 1).
dataset._labels.loc[dataset._labels['clip_no'] == ('a', 1)]


Unnamed: 0,file,label,label_cat,clip_no,start_frame,stop_frame
0,a001_0_30.npy,residential_area,12,"(a, 1)",0,30
1,a001_120_150.npy,residential_area,12,"(a, 1)",120,150
2,a001_150_180.npy,residential_area,12,"(a, 1)",150,180
3,a001_30_60.npy,residential_area,12,"(a, 1)",30,60
4,a001_60_90.npy,residential_area,12,"(a, 1)",60,90
5,a001_90_120.npy,residential_area,12,"(a, 1)",90,120


In [119]:
import numpy as np

N_TEST_RECORDINGS_PER_LABEL = 3  # recordings (clip_no values) held out per scene label
SPLIT_SEED = 0  # fixed so that Restart & Run All reproduces the same split

# NOTE(review): the original cell iterated over a `labels` mapping that is not defined in
# any visible cell. It is rebuilt here from the labels table as
# {scene label: [clip_no, ...]} -- confirm this matches the mapping originally used.
recordings_by_label = {}
files_by_recording = {}
for scene, recording, file_name in zip(
    dataset._labels['label'], dataset._labels['clip_no'], dataset._labels['file']
):
    # Inner dict is used as an ordered set: first-seen order, no duplicates.
    recordings_by_label.setdefault(scene, {})[recording] = None
    files_by_recording.setdefault(recording, []).append(file_name)

rng = np.random.default_rng(SPLIT_SEED)
train = []
test = []

# Split by recording rather than by file, so that all files sharing a clip_no
# land on the same side of the split.
# Labels with N_TEST_RECORDINGS_PER_LABEL or fewer recordings contribute nothing to train.
for label, clips in recordings_by_label.items():
    clips = list(clips)
    total = rng.permutation(len(clips))
    train_idx = total[N_TEST_RECORDINGS_PER_LABEL:]
    test_idx = total[:N_TEST_RECORDINGS_PER_LABEL]
    train.extend(clips[i] for i in train_idx)
    test.extend(clips[j] for j in test_idx)

# Expand the selected recordings back into the file names listed in labels.csv.
train_clips = [file_name for recording in train for file_name in files_by_recording[recording]]
test_clips = [file_name for recording in test for file_name in files_by_recording[recording]]

# The filtered dataset class defined in this notebook is NF_TRAIN_DCASE
# (the original cell referred to an undefined name, NF_DCASE).
alie = NF_TRAIN_DCASE(root_dir, 3, train_clips)
fam = NF_TRAIN_DCASE(root_dir, 3, test_clips)

In [120]:
# Combined size of the two filtered datasets.
sum(map(len, (alie, fam)))

1170

In [None]:
# Filtered dataset restricted to the held-out file names, with 3 second clips.
NF_TRAIN_DCASE(root_dir=root_dir, clip_duration=3, clip_filter=test_clips)