# This is a notebook to create my sample dataset

In [33]:
import scipy
import librosa
import os
import numpy as np
import glob2
import joblib

In [43]:
from spatial_two_mics.labels_inference.tf_label_estimator import TFMaskEstimator
from spatial_two_mics.utils.audio_mixture_constructor import AudioMixtureConstructor

ground_estimator = TFMaskEstimator('duet_kmeans')
duet_estimator = TFMaskEstimator('ground_truth')

audio_mixer = AudioMixtureConstructor()

def transform_sample(sample_path, tf_sample_path, target_sr=16000):
    """Takes a sample that is not ready for use in the model, 
        and transforms it to a sample that the model can use.
        @param sample_path - the path to the sample, a String
        @param tf_sample_path - the path for saving the proccessed sample
        @returns: a path to the transformed sample"""
    signal, _ = librosa.core.load(sample_path, sr=target_sr, dtype=np.float32)
    if len(signal.shape) > 1:
        signal = signal[:,0]    
    scipy.io.wavfile.write(tf_sample_path, target_sr, signal)
    return tf_sample_path

# a sound to mix with all of the other sounds
mixer_wav = os.path.join('samples', 'mixer.wav')
orig_wav = os.path.join('samples', '4kHz_sin_wave.wav')
transform_sample(orig_wav, mixer_wav)

'samples\\mixer.wav'

In [46]:
def get_info(file_path):
    return ({'positions':
                      {'amplitudes': np.array([0.73382382, 0.26617618]),
                       'd_thetas': np.array([1.06829948]),
                       'distances': {'m1m1': 0.0,
                                     'm1m2': 0.03,
                                     'm1s1': 3.015,
                                     'm1s2': 3.0072529608785676,
                                     'm2m1': 0.03,
                                     'm2m2': 0.0,
                                     'm2s1': 2.985,
                                     'm2s2': 2.9928046426867034,
                                     's1m1': 3.015,
                                     's1m2': 2.985,
                                     's1s1': 0.0,
                                     's1s2': 3.054656422155759,
                                     's2m1': 3.0072529608785676,
                                     's2m2': 2.9928046426867034,
                                     's2s1': 3.054656422155759,
                                     's2s2': 0.0},
                       'taus': np.array([1.39941691, 0.67397403]),
                       'thetas': np.array([0., 1.06829948]),
                       'xy_positons': np.array([[3., 0.],
                                             [1.44484569, 2.62914833]])},
         'sources_ids': [{'gender': 'f',
                         'sentence_id': 'sa1',
                         'speaker_id': 'flbw0',
                         'wav_path': file_path},
                        {'gender': 'm',
                         'sentence_id': 'sa2',
                         'speaker_id': 'mbns0',
                         'wav_path': mixer_wav}
    ]})

def get_mixture_info(file_path):
    file_path = transform_sample(file_path, file_path.split('.')[0] + 'X.wav')
    ex = get_info(file_path)
    return audio_mixer.construct_mixture(mixture_info=ex)

def get_masks(file_path):
    T = get_mixture_info(file_path)
    ground_mask = ground_estimator.infer_mixture_labels(T)
    duet_mask = duet_estimator.infer_mixture_labels(T)
    return ground_mask, duet_mask

In [47]:
def transform_sample_tf(sample_path, tf_path, target_sr=16000):
    """Takes a sample that is not ready for use in the model, 
        and transforms it to a sample that the model can use.
        @param sample_path - the path to the sample, a String
        @param tf_sample_path - the directory to save the information of the sample
        @returns: a path to the transformed sample"""
    signal, _ = librosa.core.load(sample_path, sr=target_sr, dtype=np.float32)
    # take only the first channel
    if len(signal.shape) > 1:
        signal = signal[:,0]
    # calculate the real part of the stft and the complex part of the stft
    tf = librosa.core.stft(signal)
    real = np.real(tf)
    imag = np.imag(tf)
    # write it to the file, seperate it to real and imag parts
    real_p = os.path.join(tf_path, 'real_tfs')
    imag_p = os.path.join(tf_path, 'imag_tfs')
    joblib.dump(real, real_p, compress=0)
    joblib.dump(imag, imag_p, compress=0)
    # another thing for this to work:
    # save the raw wavs in the name 'wavs'
    wavs_p = os.path.join(tf_path, 'wavs')
    joblib.dump(signal, wavs_p, compress=0)
    # save the 'soft_labeled_mask' and 'ground_truth_mask'
    ground_masks, duet_mask = get_masks(sample_path)
    soft_p = os.path.join(tf_path, 'soft_labeled_mask')
    ground_p = os.path.join(tf_path, 'ground_truth_mask')
    joblib.dump(ground_masks, ground_p, compress=0)
    joblib.dump(duet_mask, soft_p, compress=0)
    return tf_path


In [48]:
unprocessed_path = "samples"
target_path = os.path.join('tf_samples', 'my_dataset', 'train')

In [49]:
def sep():
    sep = '/'
    windows = True
    if windows:
        sep = '\\'
    return sep

def extract_name(path):
    # a file with the path - './.../folder/<name>.wav', returs <name>
    file_name = path.split(sep())[-1]
    return file_name.split('.')[0]

# test:
extract_name(".\\some folder\\example.wav")

'example'

In [50]:
sample_list = glob2.glob(unprocessed_path + f'{sep()}*')
sample_list

['samples\\4kHz_sin_wave.wav',
 'samples\\bass1.wav',
 'samples\\bass2.wav',
 'samples\\drums1.wav',
 'samples\\drums2.wav',
 'samples\\mixer.wav',
 'samples\\vocals1.wav']

In [51]:
for sample_path in sample_list:
    name = extract_name(sample_path)
    tf_path = os.path.join(target_path, name)
    if not os.path.isdir(tf_path):
        os.mkdir(tf_path)
    transform_sample_tf(sample_path, tf_path)

### Also for *val* and for *test* 

In [52]:
target_path = os.path.join('tf_samples', 'my_dataset', 'val')
sample_list = glob2.glob(unprocessed_path + f'{sep()}*')
for sample_path in sample_list:
    name = extract_name(sample_path)
    tf_path = os.path.join(target_path, name)
    if not os.path.isdir(tf_path):
        os.mkdir(tf_path)
    transform_sample_tf(sample_path, tf_path)

In [53]:
target_path = os.path.join('tf_samples', 'my_dataset', 'test')
sample_list = glob2.glob(unprocessed_path + f'{sep()}*')
for sample_path in sample_list:
    name = extract_name(sample_path)
    tf_path = os.path.join(target_path, name)
    if not os.path.isdir(tf_path):
        os.mkdir(tf_path)
    transform_sample_tf(sample_path, tf_path)