In [6]:
from enum import Enum

import random


class DatasetType(Enum):
    TRAIN = "train"
    DEV = "dev"
    TEST = "test"

def shuffle_in_groups(a, b):
    assert len(a) == len(b)
    zipped = list(zip(a, b))
    random.shuffle(zipped)
    return zip(*zipped)

class Singleton(type):
    _instances = {}
    def __call__(self, *args, **kwargs):
        if self not in self._instances:
            self._instances[self] = super().__call__(*args, **kwargs)
        return self._instances[self]

In [7]:
import hashlib
import os
import random
import re
from pathlib import Path

import librosa
import numpy as np
from tqdm import tqdm
from torch.utils.data import Dataset, IterableDataset, get_worker_info

LABEL_SILENCE = "__silence__"
LABEL_UNKNOWN = "__unknown__"

class GSCDatasetPreprocessor(metaclass=Singleton):
    def __init__(self, config):
        super().__init__()

        # organize audio files by class
        unknown_class_name = "_UNKNOWN_"

        audio_files_by_class = {}
        audio_counts_by_class = {}

        for class_path in Path(config["data_dir"]).iterdir():

            if not class_path.is_dir():
                continue

            class_name = class_path.name

            if class_name not in config["target_class"] and class_name != "_background_noise_":
                class_name = unknown_class_name

            if class_name not in audio_files_by_class:
                audio_files_by_class[class_name] = []
                audio_counts_by_class[class_name] = 0

            count = 0
            for file_path in class_path.iterdir():

                if ".wav" != file_path.suffix:
                    continue

                count += 1
                audio_files_by_class[class_name].append(file_path.as_posix())

            audio_counts_by_class[class_name] += count

        noise_files = audio_files_by_class.pop("_background_noise_")

        # split the dataset into trian/dev/test
        self.bucket_size = 2**27 - 1
        self.dev_pct = config["dev_pct"]
        self.test_pct = config["test_pct"]

        self.audio_files_by_dataset = {
            DatasetType.TRAIN: [],
            DatasetType.DEV: [],
            DatasetType.TEST: []
        }
        self.labels_by_dataset = {
            DatasetType.TRAIN: [],
            DatasetType.DEV: [],
            DatasetType.TEST: []
        }
        self.label_mapping = {}

        # target class
        for class_name in config["target_class"]:
            audio_list = audio_files_by_class[class_name]

            label = config["target_class"].index(class_name)
            self.label_mapping[label] = class_name

            for audio_file in audio_list:
                bucket = self.get_bucket_from_file_name(audio_file, config["group_speakers_by_id"])
                self.distribute_to_dataset(bucket, audio_file, label)

        # unknown class
        if config["unknown_class"]:
            unknown_label = len(config["target_class"])
            for dataset in DatasetType:
                unknown_size = int(len(self.labels_by_dataset[dataset]) / len(self.label_mapping.keys()))
                self.audio_files_by_dataset[dataset] += random.sample(audio_files_by_class[unknown_class_name], unknown_size)
                self.labels_by_dataset[dataset] += ([unknown_label] * unknown_size)
            self.label_mapping[unknown_label] = LABEL_UNKNOWN

        # silence class
        if config["silence_class"]:
            silence_label = len(config["target_class"]) + 1
            for dataset in DatasetType:
                silence_size = int(len(self.labels_by_dataset[dataset]) / len(self.label_mapping.keys()))
                self.audio_files_by_dataset[dataset] += ([LABEL_SILENCE] * silence_size)
                self.labels_by_dataset[dataset] += ([silence_label] * silence_size)
            self.label_mapping[silence_label] = LABEL_SILENCE

        # noise samples
        self.noise_samples_by_dataset = {
            DatasetType.TRAIN: [],
            DatasetType.DEV: [],
            DatasetType.TEST: []
        }

        sample_rate = config["sample_rate"]
        for file_name in noise_files:
            full_noise = librosa.core.load(file_name, sr=sample_rate)[0]
            for i in range(0, len(full_noise)-sample_rate, sample_rate):
                noise_sample = full_noise[i:i + sample_rate] * random.random()

                bucket = random.random()
                if bucket < self.test_pct:
                    self.noise_samples_by_dataset[DatasetType.TEST].append(noise_sample)
                elif bucket < self.dev_pct + self.test_pct:
                    self.noise_samples_by_dataset[DatasetType.DEV].append(noise_sample)
                else:
                    self.noise_samples_by_dataset[DatasetType.TRAIN].append(noise_sample)


    def get_bucket_from_file_name(self, audio_file, group_speakers_by_id):
        if group_speakers_by_id:
            hashname_search = re.search(r"(\w+)_nohash_.*$", audio_file, re.IGNORECASE)
            if hashname_search:
                hashname = hashname_search.group(1)

            sha = int(hashlib.sha1(hashname.encode()).hexdigest(), 16)
            bucket = (sha % (self.bucket_size + 1)) / self.bucket_size
        else:
            bucket = random.random()

        return bucket

    def distribute_to_dataset(self, bucket, audio_file, label):
        if bucket < self.test_pct:
            self.audio_files_by_dataset[DatasetType.TEST].append(audio_file)
            self.labels_by_dataset[DatasetType.TEST].append(label)
        elif bucket < self.dev_pct + self.test_pct:
            self.audio_files_by_dataset[DatasetType.DEV].append(audio_file)
            self.labels_by_dataset[DatasetType.DEV].append(label)
        else:
            self.audio_files_by_dataset[DatasetType.TRAIN].append(audio_file)
            self.labels_by_dataset[DatasetType.TRAIN].append(label)


In [12]:
from itertools import islice

In [41]:
def window_iter(seq, window_size, step_size):
    it = iter(seq)
    result = tuple(islice(it, window_size, step_size))
    if len(result) == window_size:
        print('r', result)
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

In [42]:
def moving_averages(values, size, step):
    for selection in window_iter(values, size, step):
        print(selection)
        yield sum(selection) / size

In [43]:
y = ['1', '2', '3', '4','5','6','7','8','9','10']
y2 = ['21', '22', '23', '24','25','26','27','28','29','210']
for p1,p2 in zip(moving_averages(map(int, y), 5, 1), moving_averages(map(int, y2), 5, 1)):
    print(p1,p2)

(6,)
(26,)
1.2 5.2
(7,)
(27,)
1.4 5.4
(8,)
(28,)
1.6 5.6
(9,)
(29,)
1.8 5.8
(10,)
(210,)
2.0 42.0
