In [1]:
import scipy
import soundfile as sf
import numpy as np
import matplotlib.pyplot as plt
import os

def active_rms(clean, noise, fs=16000, energy_thresh=-50):
    '''Return the RMS of `clean` and `noise` over the noise-active windows only.

    Both signals are scanned in consecutive 100 ms windows (the last window may
    be shorter). A window counts as active when the RMS of its noise samples
    exceeds `energy_thresh` dB relative to the peak absolute amplitude of the
    whole `noise` signal. The samples from all active windows are pooled and a
    single RMS value is computed for each signal.

    Parameters
    ----------
    clean : numpy array, sliced with the same window bounds as `noise`.
    noise : numpy array; its length drives the windowing and its content
        decides which windows are active.
    fs : sampling rate in Hz, used to convert the window length to samples.
    energy_thresh : activity threshold in dB relative to the peak of `noise`.

    Returns
    -------
    (clean_rms, noise_rms). Each value falls back to the float machine epsilon
    when no samples were selected for that signal.

    Raises
    ------
    ValueError if `fs` is so small that a window would hold zero samples.
    '''
    eps = np.finfo(float).eps
    window_size = 100  # in ms
    window_samples = int(fs * window_size / 1000)

    # No noise samples means no windows to scan; both values fall back to eps.
    if len(noise) == 0:
        return eps, eps

    # The threshold depends only on the peak of the whole noise signal, so it is
    # computed once here instead of once per window.
    # (An earlier revision compared the window RMS against `energy_thresh` directly.)
    active_thresh = 10 ** (energy_thresh / 20) * (np.max(np.abs(noise)) + eps)

    # Collect the active windows in lists and join them once at the end;
    # growing an array with np.append would re-copy it for every window.
    noise_active_segs = []
    clean_active_segs = []
    for sample_start in range(0, len(noise), window_samples):
        sample_end = min(sample_start + window_samples, len(noise))
        noise_win = noise[sample_start:sample_end]
        noise_seg_rms = (noise_win**2).mean()**0.5

        # Keep only windows whose noise RMS clears the activity threshold
        if noise_seg_rms > active_thresh:
            noise_active_segs.append(noise_win)
            clean_active_segs.append(clean[sample_start:sample_end])

    # The leading empty float64 array keeps the result dtype promoted to at
    # least float64 and yields an empty array when nothing was selected.
    noise_active = np.concatenate([np.empty(0)] + noise_active_segs)
    clean_active = np.concatenate([np.empty(0)] + clean_active_segs)

    if len(noise_active) != 0:
        noise_rms = (noise_active**2).mean()**0.5
    else:
        noise_rms = eps

    if len(clean_active) != 0:
        clean_rms = (clean_active**2).mean()**0.5
    else:
        clean_rms = eps

    return clean_rms, noise_rms


def active_snr(clean, noise):
    '''Return the SNR in dB of `clean` relative to `noise`.

    Both levels are the RMS values reported by `active_rms` with its default
    sampling rate and energy threshold, i.e. they are measured only over the
    windows in which the noise is active.
    '''
    speech_level, noise_level = active_rms(clean, noise)
    level_ratio = speech_level / noise_level
    return 20 * np.log10(level_ratio)


# Float64 machine epsilon; added to denominators so a zero level never divides by zero.
EPS = np.finfo(np.float64).eps

def normalize_segmental_rms(audio, rms, target_level=-25):
    '''Scale `audio` so that a signal whose RMS is `rms` ends up at `target_level` dB.

    The caller supplies `rms` (for example a segmental / active RMS); this
    function only derives the gain from it and applies that gain to `audio`.
    Returns the scaled signal.
    '''
    gain = 10 ** (target_level / 20) / (rms + EPS)
    return audio * gain

def is_clipped(audio, clipping_threshold=0.99):
    '''Return True if any sample magnitude in `audio` exceeds `clipping_threshold`.

    The check is a single vectorised numpy reduction, so it also accepts
    multi-dimensional (e.g. multi-channel) arrays. The result is a plain
    Python bool; an empty input yields False.
    '''
    return bool(np.any(np.abs(audio) > clipping_threshold))

def snr_mixer(clean, noisy, SNR=-5):
    '''Mix a clean signal with a noise signal at the requested SNR.

    Processing steps:
      1. Peak-normalize both inputs.
      2. Measure the RMS of both over the noise-active windows (`active_rms`)
         and normalize each to -25 dB with that RMS.
      3. Scale the noise by 1 / 10**(SNR/20) and add it to the clean signal.
      4. Rescale the mixture to a random integer level drawn with
         `np.random.randint(-35, -15)` (upper bound exclusive) and apply the
         same gain to the clean and noise components.
      5. If the mixture clips, rescale all three signals so the mixture peak
         sits just under 0.99 and recompute the reported level.

    Parameters
    ----------
    clean : numpy array with the clean signal.
    noisy : numpy array with the noise signal to mix in. The parameter name is
        kept as-is so existing positional and keyword callers keep working.
    SNR : target signal-to-noise ratio in dB.

    Returns
    -------
    (clean, noisenewlevel, noisyspeech, noisy_rms_level): the scaled clean
    signal, the scaled noise, their sum, and the level in dB that the mixture
    was normalized to.
    '''
    # First normalization: bring both signals to unit peak.
    # The noise arrives in the `noisy` parameter. The previous body read an
    # unassigned local `noise` here, which raised UnboundLocalError on every call.
    clean = clean / (np.max(np.abs(clean)) + EPS)
    noise = noisy / (np.max(np.abs(noisy)) + EPS)

    # After the peak normalization, measure the RMS over only the windows where
    # the noise is active (needed for SNR-based mixing)
    rmsclean, rmsnoise = active_rms(clean=clean, noise=noise)

    # Second normalization, based on the active RMS, to the target level in dB
    # (-25 dB)
    clean = normalize_segmental_rms(clean, rms=rmsclean, target_level=-25)
    noise = normalize_segmental_rms(noise, rms=rmsnoise, target_level=-25)

    # At this point clean and noise are both normalized to -25 dB, so the noise
    # gain for a given SNR no longer depends on the two RMS values.
    noisescalar = 1 / (10**(SNR/20))
    noisenewlevel = noise * noisescalar

    # Mix noise and clean speech
    noisyspeech = clean + noisenewlevel

    # Randomly pick an integer RMS level and normalize the mixture to it.
    # randint excludes the upper bound, so the level lies in -35 .. -16 dB.
    # Clipping can still happen with low probability; it is handled below.
    noisy_rms_level = np.random.randint(-35, -15)
    rmsnoisy = (noisyspeech**2).mean()**0.5

    # Scale the noisy speech to the drawn level, then scale clean and noise by
    # the same ratio so the three signals stay consistent
    scalarnoisy = 10 ** (noisy_rms_level / 20) / (rmsnoisy+EPS)

    noisyspeech = noisyspeech * scalarnoisy
    clean = clean * scalarnoisy
    noisenewlevel = noisenewlevel * scalarnoisy

    # Final check for amplitudes above the clipping threshold. If there are
    # any, rescale all the signals by the same factor.
    if is_clipped(noisyspeech):
        noisyspeech_maxamplevel = np.max(np.abs(noisyspeech))/(0.99-EPS)
        noisyspeech = noisyspeech/noisyspeech_maxamplevel
        clean = clean/noisyspeech_maxamplevel
        noisenewlevel = noisenewlevel/noisyspeech_maxamplevel
        # Clipping occurred, so report the corrected level instead of the drawn one
        noisy_rms_level = int(20*np.log10(scalarnoisy/noisyspeech_maxamplevel*(rmsnoisy+EPS)))

    return clean, noisenewlevel, noisyspeech, noisy_rms_level



In [None]:
# Re-mix every clean/noise pair of the train split at -5 dB SNR and write the
# clean, noise and noisy signals to the VBD_SNR-5 train tree.
clean_src = './VBD/train/clean'
noise_src = './VBD/train/noise'

clean_dst = './VBD_SNR-5/train/clean'
noise_dst = './VBD_SNR-5/train/noise'
noisy_dst = './VBD_SNR-5/train/noisy'

for root, _dirs, files in os.walk(clean_src):
    # Directory currently being walked, relative to clean_src ('.' at the top
    # level). Using it keeps files in sub-directories paired with the matching
    # noise file and mirrors the layout in the output trees.
    rel_dir = os.path.relpath(root, clean_src)
    for file in files:
        rel_path = os.path.normpath(os.path.join(rel_dir, file))

        # Keep the sample rate of the clean file so the outputs are written at
        # the rate the data actually has instead of a hard-coded 16000.
        # NOTE(review): assumes the noise file shares this rate and length.
        clean, fs = sf.read(os.path.join(clean_src, rel_path))
        noise, _ = sf.read(os.path.join(noise_src, rel_path))

        # Current SNR in dB; scaling the noise by 10**((snr + 5) / 20) moves
        # the pair to an SNR of -5 dB.
        snr = active_snr(clean, noise)
        noise_snrm5 = noise * np.power(10.0, (snr+5)/20)
        noisy_snrm5 = clean + noise_snrm5

        # If the mixture clips, rescale all three signals by the same factor
        # so the mixture peak sits just under 0.99.
        if is_clipped(noisy_snrm5):
            noisyspeech_maxamplevel = np.max(np.abs(noisy_snrm5))/(0.99-EPS)
            noisy_snrm5 = noisy_snrm5/noisyspeech_maxamplevel
            clean = clean/noisyspeech_maxamplevel
            noise_snrm5 = noise_snrm5/noisyspeech_maxamplevel

        for dst_root, signal in ((clean_dst, clean), (noisy_dst, noisy_snrm5), (noise_dst, noise_snrm5)):
            dst_path = os.path.join(dst_root, rel_path)
            # sf.write does not create missing directories
            os.makedirs(os.path.dirname(dst_path), exist_ok=True)
            sf.write(dst_path, signal, fs)

In [None]:
# Re-mix every clean/noise pair found under ./VBD_SNR-5/valid at -5 dB SNR and
# write the clean, noise and noisy signals to the ./VBD_SNR-5/valid2 tree.
clean_src = './VBD_SNR-5/valid/clean'
noise_src = './VBD_SNR-5/valid/noise'

clean_dst = './VBD_SNR-5/valid2/clean'
noise_dst = './VBD_SNR-5/valid2/noise'
noisy_dst = './VBD_SNR-5/valid2/noisy'

for root, _dirs, files in os.walk(clean_src):
    # Directory currently being walked, relative to clean_src ('.' at the top
    # level). Using it keeps files in sub-directories paired with the matching
    # noise file and mirrors the layout in the output trees.
    rel_dir = os.path.relpath(root, clean_src)
    for file in files:
        rel_path = os.path.normpath(os.path.join(rel_dir, file))

        # Keep the sample rate of the clean file so the outputs are written at
        # the rate the data actually has instead of a hard-coded 16000.
        # NOTE(review): assumes the noise file shares this rate and length.
        clean, fs = sf.read(os.path.join(clean_src, rel_path))
        noise, _ = sf.read(os.path.join(noise_src, rel_path))

        # Current SNR in dB; scaling the noise by 10**((snr + 5) / 20) moves
        # the pair to an SNR of -5 dB.
        snr = active_snr(clean, noise)
        noise_snrm5 = noise * np.power(10.0, (snr+5)/20)
        noisy_snrm5 = clean + noise_snrm5

        # If the mixture clips, rescale all three signals by the same factor
        # so the mixture peak sits just under 0.99.
        if is_clipped(noisy_snrm5):
            noisyspeech_maxamplevel = np.max(np.abs(noisy_snrm5))/(0.99-EPS)
            noisy_snrm5 = noisy_snrm5/noisyspeech_maxamplevel
            clean = clean/noisyspeech_maxamplevel
            noise_snrm5 = noise_snrm5/noisyspeech_maxamplevel

        for dst_root, signal in ((clean_dst, clean), (noisy_dst, noisy_snrm5), (noise_dst, noise_snrm5)):
            dst_path = os.path.join(dst_root, rel_path)
            # sf.write does not create missing directories
            os.makedirs(os.path.dirname(dst_path), exist_ok=True)
            sf.write(dst_path, signal, fs)