In [14]:
import csv
import random
import re

import librosa

import dataloader

In [15]:
class EdansaDataloader:
    """Iterate over a class-balanced subset of the clips listed in ``{path}/labels.csv``.

    Rows whose ``Sil`` column equals 0 are tagged ``is_event=True``; every other
    row is tagged ``is_event=False``. Both groups are truncated to the size of
    the smaller one, so the loader holds equally many of each, and the combined
    list is kept in natural ``Clip Path`` order (digit runs compare as integers).
    """

    def __init__(self, path: str):
        """Read ``{path}/labels.csv`` and build the balanced, ordered entry list.

        Args:
            path: Dataset root. Must contain ``labels.csv``; audio is later read
                from ``{path}/data/<Clip Path>`` during iteration.

        Raises:
            ValueError: If ``labels.csv`` has no header row.
            AssertionError: If a data row's field count differs from the header's.
        """
        def try_int(x):
            # Fall back to the raw value only when it is not an integer literal;
            # anything else (e.g. KeyboardInterrupt) must propagate.
            try:
                return int(x)
            except (TypeError, ValueError):
                return x

        def sort_key(entry):
            # Natural sort: splitting on captured digit runs alternates
            # text / number / text ..., so "clip_2" orders before "clip_10".
            return [try_int(part) for part in re.split(r'(\d+)', entry['Clip Path'])]

        def is_event(entry):
            return entry['Sil'] == 0

        # newline='' lets the csv module do its own newline handling, as its
        # documentation requires for files passed to csv.reader.
        with open(f'{path}/labels.csv', newline='') as f:
            r = csv.reader(f)
            header = next(r, None)
            if header is None:
                raise ValueError(f'{path}/labels.csv is empty (no header row)')
            data = []
            for row in r:
                assert len(row) == len(header), (
                    f'labels.csv line {r.line_num}: expected {len(header)} fields, got {len(row)}'
                )
                data.append({name: try_int(value) for name, value in zip(header, row)})

        events = [{'is_event': True, **x} for x in data if is_event(x)]
        not_events = [{'is_event': False, **x} for x in data if not is_event(x)]

        # Sort each group before truncating so the kept subset is deterministic
        # (original intent: order by deployment, then chronologically).
        events.sort(key=sort_key)
        not_events.sort(key=sort_key)

        n = min(len(events), len(not_events))
        events = events[:n]
        not_events = not_events[:n]
        assert len(events) == len(not_events)

        self.__path = path
        self.__data = events + not_events

        # Re-sort the merged list so both groups are interleaved in path order.
        self.__data.sort(key=sort_key)

    @staticmethod
    def _split_clips(samples, target_samples):
        """Cut ``samples`` into as many whole ``target_samples``-long clips as fit.

        The leftover samples are dropped; how many are dropped from the front
        versus the back is chosen at random via the ``random`` module.

        Args:
            samples: Array whose first axis (``samples.shape[0]``) is the sample axis.
            target_samples: Number of samples per clip.

        Returns:
            ``(trimmed, clips)``: the contiguous window that was kept, and the
            list of consecutive ``target_samples``-long slices of that window.
            When ``samples`` is shorter than one clip, ``trimmed`` is empty and
            ``clips`` is ``[]``.
        """
        clip_count, remainder = divmod(samples.shape[0], target_samples)
        # Valid start offsets are 0..remainder inclusive; the upper bound is the
        # window that ends flush with the end of the recording.
        front_split = random.randrange(remainder + 1)
        trimmed = samples[front_split : front_split + clip_count * target_samples]
        assert trimmed.shape[0] // target_samples == clip_count and trimmed.shape[0] % target_samples == 0

        clips = [trimmed[i * target_samples : (i + 1) * target_samples] for i in range(clip_count)]
        return trimmed, clips

    def __iter__(self):
        """Load, resample and split each entry's audio file in stored order.

        Each file is loaded at its native sample rate and resampled to
        ``dataloader.UNIFORM_SAMPLE_RATE``. Files that fail to load or resample
        are reported on stdout and skipped.

        Yields:
            ``(trimmed, clips, entry)``: the trimmed resampled signal, its list
            of fixed-length clips, and the label dict for that file.
        """
        target_samples = dataloader.SAMPLE_DURATION_SECS * dataloader.UNIFORM_SAMPLE_RATE
        for entry in self.__data:
            file = f'{self.__path}/data/{entry["Clip Path"]}'
            try:
                orig_samples, orig_sr = librosa.load(file, sr=None)
                new_samples = librosa.resample(orig_samples, orig_sr=orig_sr, target_sr=dataloader.UNIFORM_SAMPLE_RATE)
            except Exception as e:
                # Deliberately best-effort: one unreadable file must not abort the run.
                print(f'failed to read file \'{file}\':\n{e}\nskipping...\n')
                continue

            trimmed, clips = self._split_clips(new_samples, target_samples)
            yield trimmed, clips, entry

In [17]:
# Smoke-test the loader: pull the first recording and print the shape of the
# trimmed signal, the shape of its first clip, the clip count, and its label row.
# NOTE(review): the dataset root is a machine-specific absolute path — consider
# moving it to a configurable constant.
dataset = iter(EdansaDataloader('/home/devin/Downloads/EDANSA-2019'))
trimmed, clips, entry = next(dataset)
print(trimmed.shape, clips[0].shape, len(clips), entry)

(80000,) (8000,) 10 {'is_event': True, 'batch': 4, 'set': 'test', 'region': 'anwr', 'Site ID': 31, 'Date': '05/04/2019', 'Start Time': '05:37:50.000000', 'End Time': '05:38:00.000000', 'Length': '00:00:10.000000', 'Clip Path': 'anwr/31/S4A10297_20190504_043000_67m_50s__68m_0s.wav', 'Anth': 1, 'Bio': 0, 'Geo': 0, 'Sil': 0, 'Auto': 0, 'Airc': 1, 'Mach': 0, 'Flare': 0, 'Bird': 0, 'Mam': 0, 'Bug': 0, 'Wind': 0, 'Rain': 0, 'Water': 0, 'Truck': 0, 'Car': 0, 'Prop': 0, 'Helo': 0, 'Jet': 0, 'Corv': 0, 'SongB': 0, 'DGS': 0, 'Grous': 0, 'Crane': 0, 'Loon': 0, 'SeaB': 0, 'Owl': 0, 'Hum': 0, 'Rapt': 0, 'Woop': 0, 'ShorB': 0, 'Woof': 0, 'Bear': 0, 'Mous': 0, 'Deer': 0, 'Weas': 0, 'Meow': 0, 'Hare': 0, 'Shrew': 0, 'Mosq': 0, 'Fly': 0}
