# Creation of Pseudo Multi-channel Data

In [1]:
import librosa
import numpy as np
import numpy as np
from scipy.signal import butter, lfilter
from glob import glob
from tqdm import tqdm
import soundfile as sf
import json
import os

In [2]:
from audio_utils import SpecViewer

In [3]:
# Viewer object from the local audio_utils module; its visualize() method is used
# further down to inspect the synthesized and mixed audio.
specviewer = SpecViewer()

# Download Zebra Finch Data

In [6]:
from huggingface_hub import snapshot_download
# Fetch the two Hugging Face dataset repos used by this notebook into local folders:
#   - the zebra finch recordings/annotations -> orig_data/zebra-finch
#   - the wing-flap noise examples           -> noise_audios
snapshot_download('nccratliri/vad-zebra-finch', local_dir = "orig_data/zebra-finch", repo_type="dataset" )
snapshot_download('nccratliri/wing-flap-noise-audio-examples', local_dir = "noise_audios", repo_type="dataset" )

# Create pseudo accelerometer audio

In [5]:
class PseudoAccAudioSynthesizer:
    """Turn a 1-D audio recording into a pseudo accelerometer recording.

    The pipeline (see ``synthesize``) low-pass filters the audio segment by
    segment with randomly drawn cutoff frequencies, mixes excerpts of
    pre-recorded noise into a random subset of segments, and finally adds
    low-level white noise.  All randomness is drawn from the global
    ``np.random`` state, so seed it beforehand for reproducible output.
    """

    def __init__(self, noise_audio_file_pattern ):
        """Load and concatenate every noise file matching the glob pattern.

        Args:
            noise_audio_file_pattern: glob pattern of the noise audio files.

        Raises:
            ValueError: if no file matches ``noise_audio_file_pattern``.
        """
        self.noise_audio_sr = 48000
        # Sorted so the concatenated noise bank does not depend on directory listing order.
        noise_fnames = sorted( glob(noise_audio_file_pattern) )
        if not noise_fnames:
            raise ValueError( "No noise audio files match pattern: %r" % (noise_audio_file_pattern,) )
        self.noise_audio = np.concatenate( [ librosa.load(fname, sr = self.noise_audio_sr)[0] for fname in noise_fnames ],
                              axis = 0
                            )
        # target sample rate -> noise bank resampled to that rate (filled lazily).
        self._resampled_noise_cache = {}

    def _noise_for_rate(self, sr):
        """Return the noise bank at sample rate ``sr``, resampling at most once per rate.

        The cache assumes ``self.noise_audio`` is not reassigned after construction.
        """
        if sr == self.noise_audio_sr:
            return self.noise_audio
        cache = self.__dict__.setdefault( "_resampled_noise_cache", {} )
        if sr not in cache:
            cache[sr] = librosa.resample( self.noise_audio, orig_sr = self.noise_audio_sr, target_sr = sr )
        return cache[sr]

    def _random_segments(self, num_samples, sr, dur_min, dur_max):
        """Yield consecutive ``(start, stop)`` sample ranges that tile ``[0, num_samples)``.

        Each segment length is drawn uniformly between ``dur_min`` and ``dur_max``
        seconds (converted to samples); the last segment is cut at ``num_samples``.
        """
        low, high = int(dur_min * sr), int(dur_max * sr)
        start = 0
        while start < num_samples:
            # At least one sample per segment, otherwise the loop would never advance.
            seg_len = max( 1, int(np.random.uniform(low=low, high=high)) )
            stop = min( start + seg_len, num_samples )
            yield start, stop
            start = stop

    def butter_lowpass(self, cutoff, fs, order=5):
        """Design a digital Butterworth low-pass filter.

        Args:
            cutoff: cutoff frequency in Hz; scipy requires 0 < cutoff < fs / 2.
            fs: sample rate in Hz.
            order: filter order.

        Returns:
            The ``(b, a)`` numerator/denominator coefficient arrays.
        """
        nyq = 0.5 * fs
        normal_cutoff = cutoff / nyq
        b, a = butter(order, normal_cutoff, btype='low', analog=False)
        return b, a

    def butter_lowpass_filter(self, data, cutoff, fs, order=5):
        """Low-pass filter ``data`` with a Butterworth filter and return the result."""
        b, a = self.butter_lowpass(cutoff, fs, order=order)
        y = lfilter(b, a, data)
        return y

    def apply_low_pass_filter(self,  audio, sr, f_low, f_high, dur_min, dur_max ):
        """Low-pass filter ``audio`` piecewise with a random cutoff per segment.

        The audio is cut into consecutive segments of random duration in
        ``[dur_min, dur_max]`` seconds; each segment is filtered independently
        with a cutoff drawn uniformly from ``[f_low, f_high]`` Hz.

        Returns:
            A float array with the same length as ``audio`` (empty for empty input).
        """
        audio = np.asarray(audio)
        if len(audio) == 0:
            return np.zeros(0)

        filtered_clips = []
        for start, stop in self._random_segments( len(audio), sr, dur_min, dur_max ):
            cutoff = np.random.uniform(low=f_low, high=f_high)
            filtered_clips.append( self.butter_lowpass_filter(audio[start:stop], cutoff, sr) )
        # The segments tile the input exactly, so the result already has len(audio) samples.
        return np.concatenate( filtered_clips, axis = 0 )

    def add_pseudo_wing_flap_noise(self,  audio, sr, dur_min, dur_max, noise_ratio, noise_audio, noise_audio_sr ):
        """Add random excerpts of ``noise_audio`` to a random subset of segments.

        The audio is cut into consecutive segments of random duration in
        ``[dur_min, dur_max]`` seconds.  Each segment receives, with probability
        ``noise_ratio``, an equally long excerpt taken at a random offset of the
        noise.  The noise is resampled to ``sr`` first when
        ``noise_audio_sr`` differs, and is repeated when it is shorter than a segment.

        Returns:
            An array with the same length as ``audio`` (empty for empty input).
        """
        audio = np.asarray(audio)
        if len(audio) == 0:
            return np.zeros(0)

        if sr != noise_audio_sr:
            noise_audio = librosa.resample( noise_audio, orig_sr = noise_audio_sr, target_sr = sr )
        noise_audio = np.asarray(noise_audio)

        processed_clips = []
        for start, stop in self._random_segments( len(audio), sr, dur_min, dur_max ):
            clip = audio[start:stop]
            if np.random.rand() < noise_ratio and len(noise_audio) > 0:
                if len(noise_audio) < len(clip):
                    # Repeat the noise so an excerpt of the clip's length always exists.
                    repeats = -(-len(clip) // len(noise_audio))
                    noise_audio = np.tile( noise_audio, repeats )
                # Inclusive upper bound: the excerpt may end exactly at the end of the noise.
                max_offset = len(noise_audio) - len(clip)
                offset = np.random.randint( 0, max_offset + 1 )
                clip = clip + noise_audio[offset:offset + len(clip)]
            processed_clips.append( clip )

        return np.concatenate( processed_clips, axis = 0 )

    def add_white_noise(self,  audio ):
        """Add Gaussian white noise scaled to 0.5% of the peak absolute amplitude."""
        audio = np.asarray(audio)
        if audio.size == 0:
            return audio.astype(np.float64)
        # Peak of |audio|, so a signal whose largest excursion is negative is scaled correctly.
        noise_amp = 0.005 * np.amax( np.abs(audio) )
        noise = noise_amp * np.random.normal(size=audio.shape)
        return audio + noise

    def synthesize( self, audio, sr ):
        """Run the full pipeline on ``audio`` sampled at ``sr`` Hz.

        Steps: piecewise random low-pass filtering (cutoff 400-2000 Hz, segments
        of 0.1-1.0 s), noise excerpts added to about 10% of the segments, then
        white noise.

        Returns:
            The processed audio, same length as ``audio``.
        """
        processed_audio = self.apply_low_pass_filter( audio, sr, f_low = 400, f_high = 2000, dur_min = 0.1, dur_max = 1.0 )
        # Hand over noise that is already at `sr` (cached), so it is not resampled on every call.
        processed_audio = self.add_pseudo_wing_flap_noise( processed_audio, sr, dur_min = 0.1, dur_max = 1.0, noise_ratio = 0.1,
                             noise_audio = self._noise_for_rate(sr), noise_audio_sr = sr )
        processed_audio = self.add_white_noise(processed_audio)
        assert len(processed_audio) == len(audio)
        return processed_audio

In [6]:
# Build the synthesizer; its constructor loads every .wav in noise_audios/ as the noise bank.
audio_synthesizer = PseudoAccAudioSynthesizer( "noise_audios/*.wav" )

In [7]:
# Original recordings only. The synthesis loop below writes "<name>_acc.wav" into this same
# folder, so those outputs are filtered out here; otherwise re-running the notebook would
# synthesize "<name>_acc_acc.wav" from them. Sorted for a stable processing order.
orig_audio_fname_list = sorted(
    fname for fname in glob( "orig_data/zebra-finch/train/*.wav" )
    if not fname.endswith( "_acc.wav" )
)

In [16]:
# Create a pseudo accelerometer track for every original recording and store it
# next to the source file as "<name>_acc.wav".
for fname in tqdm(orig_audio_fname_list):
    acc_save_name = fname[:-4] + "_acc.wav"
    # sr = None: load at the file's own sample rate, which is reused when writing the result.
    audio, sr = librosa.load(fname, sr=None)
    acc_audio = audio_synthesizer.synthesize(audio, sr)
    sf.write(acc_save_name, acc_audio, sr)

100%|████████████████████████████████████████████████████████████████████████████████████████| 2606/2606 [13:11<00:00,  3.29it/s]


In [22]:
# Inspect the pseudo accelerometer audio of the last file processed by the loop above.
specviewer.visualize( acc_audio, sr )

interactive(children=(FloatSlider(value=0.0, description='offset', max=0.0, step=0.25), Output()), _dom_classe…

<function ipywidgets.widgets.interaction._InteractFactory.__call__.<locals>.<lambda>(*args, **kwargs)>

# Mix audio to create multi-channel dataset

In [7]:
# Path stems (".json" stripped) of every annotation file in the training folder.
basename_list = list( map( lambda label_path: label_path[:-len(".json")],
                           glob(  "orig_data/zebra-finch/train/*.json" ) ) )

In [8]:
# Output folder for the multi-channel dataset; created if it does not exist yet.
save_folder = "mc_data/zebra-finch/train/"
os.makedirs( save_folder, exist_ok=True )

In [9]:
def _fit_to_length( audio, length ):
    """Trim or zero-pad a 1-D signal to exactly ``length`` samples and return it as float32."""
    audio = audio[:length]
    padding = np.zeros( length - len(audio) )
    return np.concatenate( [audio, padding], axis = 0 ).astype(np.float32)


# For every annotated recording, write three aligned 16 kHz files plus its label:
#   <name>_1.wav  pseudo accelerometer audio of the target recording
#   <name>_2.wav  sum of the pseudo accelerometer audio of 1-3 other recordings
#   <name>_3.wav  microphone mix: target recording plus the same other recordings
#   <name>_1.json the target's annotation with "sr" set to 16000
for count, basename in enumerate(tqdm(basename_list)):
    with open( basename + ".json" ) as label_file:
        label = json.load( label_file )
    label["sr"] = 16000
    target_clue_audio, _ = librosa.load( basename + "_acc.wav", sr = label["sr"] )

    # Pick 1-3 *distinct* recordings other than the target (fewer if not enough exist).
    num_non_targets = min( np.random.choice( [1,2,3] ), len(basename_list) - 1 )
    non_target_basename_list = []
    if num_non_targets > 0:
        picks = np.random.choice( len(basename_list) - 1, size = num_non_targets, replace = False )
        # Indices are drawn from a list with the target removed; shift those at or past
        # the target's position by one to map them back onto basename_list.
        non_target_basename_list = [ basename_list[idx + 1 if idx >= count else idx] for idx in picks ]

    # Start from silence so the channels are well defined even without any non-target.
    non_target_audio = np.zeros( len(target_clue_audio), dtype = np.float32 )
    mix_recording_audio, _ = librosa.load( basename + ".wav", sr = label["sr"] )
    for non_target_basename in non_target_basename_list:
        non_target_acc, _ = librosa.load( non_target_basename + "_acc.wav", sr = label["sr"] )
        non_target_audio = non_target_audio + _fit_to_length( non_target_acc, len(target_clue_audio) )

        non_target_mic, _ = librosa.load( non_target_basename + ".wav", sr = label["sr"] )
        mix_recording_audio += _fit_to_length( non_target_mic, len(mix_recording_audio) )

    save_basename = os.path.basename( basename )
    save_prefix = os.path.join( save_folder, save_basename )
    # Context manager guarantees the label file is flushed and closed.
    with open( save_prefix + "_1.json", "w" ) as label_file:
        json.dump( label, label_file )
    sf.write( save_prefix + "_1.wav", target_clue_audio, samplerate = label["sr"] )
    sf.write( save_prefix + "_2.wav", non_target_audio, samplerate = label["sr"] )
    sf.write( save_prefix + "_3.wav", mix_recording_audio, samplerate = label["sr"] )


100%|████████████████████████████████████████████████████████████████████████████████████████| 2606/2606 [00:41<00:00, 63.27it/s]


In [17]:
# Rewrite every saved label in place: generic species name and a single "vocal" cluster
# for all annotated segments.
label_fname_list = glob(  "mc_data/zebra-finch/train/*.json" )
for fname in label_fname_list:
    with open( fname ) as label_file:
        # Deliberately not named `label`: that notebook-level variable still belongs to the
        # audio arrays left over from the mixing loop and is used with them when visualizing.
        relabeled = json.load( label_file )
    relabeled["species"] = "animal"
    relabeled["cluster"] = [ "vocal" ] * len( relabeled["cluster"] )
    # Reopen for writing only after the read handle is closed.
    with open( fname, "w" ) as label_file:
        json.dump( relabeled, label_file )

In [97]:
# Channel 1: pseudo accelerometer audio of the target recording, shown with the current `label`.
specviewer.visualize(target_clue_audio, label["sr"], label = label )

interactive(children=(FloatSlider(value=0.0, description='offset', max=0.0, step=0.25), Output()), _dom_classe…

<function ipywidgets.widgets.interaction._InteractFactory.__call__.<locals>.<lambda>(*args, **kwargs)>

In [98]:
# Channel 2: summed pseudo accelerometer audio of the non-target recordings.
specviewer.visualize(non_target_audio, label["sr"], label = label )

interactive(children=(FloatSlider(value=0.0, description='offset', max=0.0, step=0.25), Output()), _dom_classe…

<function ipywidgets.widgets.interaction._InteractFactory.__call__.<locals>.<lambda>(*args, **kwargs)>

In [99]:
# Channel 3: microphone mix of the target and the non-target recordings.
specviewer.visualize(mix_recording_audio, label["sr"], label = label )

interactive(children=(FloatSlider(value=0.0, description='offset', max=0.0, step=0.25), Output()), _dom_classe…

<function ipywidgets.widgets.interaction._InteractFactory.__call__.<locals>.<lambda>(*args, **kwargs)>