In [1]:
import os
import sys
import array
import random
import wave
import shutil
import glob

import numpy as np
import matplotlib.pyplot as plt
import librosa
from pystoi import stoi

repos_dir = r'/home/takkan/repos'
#repos_dir = r'/home/akikun/repos'
sys.path.append(repos_dir)

imgan_dir = os.path.join(repos_dir, 'Intelligibility-MetricGAN')
sys.path.append(imgan_dir)

from sak import display as dp
from sak import signal_processing as sp
from sak import nele

In [2]:
# Signal-to-Noise
snr = -10.0

# open iMetricGAN sample files.
# wav_dir = os.path.join(imgan_dir, 'JR_database')
# clean_file = os.path.join(wav_dir, 'Train/Clean/Train_001.wav')
# noise_file = os.path.join(wav_dir, 'Train/Noise/Train_001.wav')
wav_dir = os.path.join(imgan_dir, 'database')
clean_file = os.path.join(wav_dir, 'Train', 'Clean', 'Train_1.wav')
noise_file = os.path.join(wav_dir, 'Train', 'Noise', 'Train_1.wav')
clean_wav = wave.open(clean_file, "r")
noise_wav = wave.open(noise_file, "r")

## by adding noise to a clean audio, make a new wav file with a given SNR.  

In [None]:
'''
Reference: https://github.com/Sato-Kunihiko/audio-SNR
'''
def cal_amp(wf):
    buffer = wf.readframes(wf.getnframes())
    amptitude = (np.frombuffer(buffer, dtype="int16")).astype(np.float64)
    return amptitude    

def cal_rms(amp):
    return np.sqrt(np.mean(np.square(amp), axis=-1))

def cal_adjusted_rms(clean_rms, snr):
    a = float(snr) / 20
    noise_rms = clean_rms / (10**a) 
    return noise_rms


In [None]:
# Calculation of amp
clean_amp = cal_amp(clean_wav)
noise_amp = cal_amp(noise_wav)
print(clean_amp, noise_amp)

In [None]:
# Calculation of RMS
start = random.randint(0, len(noise_amp)-len(clean_amp))
clean_rms = cal_rms(clean_amp)
split_noise_amp = noise_amp[start: start + len(clean_amp)]
noise_rms = cal_rms(split_noise_amp)

print(clean_rms, noise_rms)

In [None]:
# Synthesize waveforms of any size
adjusted_noise_rms = cal_adjusted_rms(clean_rms, snr)
        
adjusted_noise_amp = split_noise_amp * (adjusted_noise_rms / noise_rms) 
mixed_amp = (clean_amp + adjusted_noise_amp)

# Normalized so as not to crack the sound
if (mixed_amp.max(axis=0) > 32767): 
    mixed_amp = mixed_amp * (32767/mixed_amp.max(axis=0))
    clean_amp = clean_amp * (32767/mixed_amp.max(axis=0))
    adjusted_noise_amp = adjusted_noise_amp * (32767/mixed_amp.max(axis=0))


In [None]:
# Save the waveform as a wav file
noisy_wave = wave.Wave_write('output_noisy_file.wav')
noisy_wave.setparams(clean_wav.getparams())
noisy_wave.writeframes(array.array('h', mixed_amp.astype(np.int16)).tostring() )
noisy_wave.close()


In [None]:
dp.disp_wav('output_noisy_file.wav')

## Check sak/nele/add_noise function.

In [3]:
nele.add_noise(clean_file, noise_file, 'output_noisy_file_2.wav', -10.0)

  """Entry point for launching an IPython kernel.


In [4]:
dp.disp_wav('output_noisy_file_2.wav')

## implement the same function using librosa and sak.

In [None]:
def add_noise2(wav_clean_path, wav_noise_path, wav_mixed_path, snr=0, sampling_frequency=44100):
    # load signal from wav files.
    signal_clean = sp.load_wav(wav_clean_path)
    signal_noise = sp.load_wav(wav_noise_path)

    # calculate average rms. 
    rms_clean = np.mean(sp.calc_rms(signal_clean))
    rms_noise = np.mean(sp.calc_rms(signal_noise))

    # energy_noise = energy_signal / exp(SNR / 20)
    a = float(snr) / 20
    rms_noise_desired = rms_clean / (10**a)

    # adjust rms of noise and add to signal_clean.
    signal_mixed = signal_clean + signal_noise * rms_noise_desired / rms_noise

    # output the signal.
    librosa.output.write_wav(wav_mixed_path, signal_mixed, sampling_frequency)

In [6]:
wav_clean_path = clean_file
wav_noise_path = noise_file
wav_mixed_path = 'mixed.wav'
sampling_frequency = 44100

nele.add_noise2(
    wav_clean_path, 
    wav_noise_path, 
    wav_mixed_path, 
    snr=-10, 
    sampling_frequency=sampling_frequency)

In [7]:
dp.disp_wav(clean_file)
dp.disp_wav(noise_file)
dp.disp_wav(wav_mixed_path)

## Taku's work.
Recursive processing of add noise.

In [None]:
# Signal-to-Noise
snrs = [-10, -5, 0, 5, 10]

# open iMetricGAN sample files.
wav_dir = os.path.join(imgan_dir, 'test_wav')
clean_dir = os.path.join(wav_dir, 'wavs_ssdrc')
noise_dir = os.path.join(wav_dir, 'noise')
file_names = os.listdir(clean_dir)


In [None]:
for snr in snrs:
    for file_name in file_names:
        clean_file = os.path.join(clean_dir, file_name)
        noise_file = os.path.join(noise_dir, file_name)
        output_name = os.path.splitext(os.path.basename(clean_file))[0] + '_' + str(snr) + '.wav'
        output_file = os.path.join(wav_dir, 'output', output_name)
        #print(clean_file, noise_file, output_file)
        nele.add_noise(clean_file, noise_file, output_file, snr)


## Aki's work.
Same task as above with add_noise2.

In [4]:
#samples_dir = r'/home/akikun/experiments/jr/samples/ssdrc'
samples_dir = r'/home/takkan/experiments/jr/samples'

nele_dir = r'/home/common/db/audio_corpora/nele'
wav_clean_dir    = os.path.join(nele_dir, 'imgan', 'train', 'clean')
wav_enhanced_dir = os.path.join(nele_dir, 'imgan', 'train', 'enhanced')
wav_train_noise_jr_dir = os.path.join(nele_dir, 'train_noise', 'JR')
# noise to be used.
wav_noise_path = os.path.join(wav_train_noise_jr_dir, 'wavs_normalized', 'train_noise1.wav')


In [5]:
wav_clean_paths = glob.glob(os.path.join(wav_clean_dir, '*.wav'))
wav_clean_paths.sort()
for wav_clean_path in wav_clean_paths[0:3]:
    for snr in [-15, -13, -10, -8, -5, 0]:
        # original. 
        print(wav_clean_path)
        #dp.disp_wav_file(wav_clean_path)
        shutil.copy(
            wav_clean_path, 
            os.path.join(samples_dir, os.path.basename(wav_clean_path).replace('.wav', '_tts.wav')))
        shutil.copy(
            wav_clean_path.replace('clean', 'enhanced'), 
            os.path.join(samples_dir, os.path.basename(wav_clean_path).replace('.wav', '_ssdrc.wav')))
         
        
        # tts
        wav_mixed_path = os.path.join(
            samples_dir, 
            os.path.basename(wav_clean_path).replace('.wav', '') + '_tts_train_noise1_snr' + str(snr) + '.wav')
        print('processing ... {}'.format(os.path.basename(wav_mixed_path)))
        nele.add_noise2(
            wav_clean_path, 
            wav_noise_path,
            wav_mixed_path, 
            wav_noise_out_path=None, 
            snr=snr, 
            sampling_frequency=44100)

        # ssdrc
        wav_mixed_path = os.path.join(
            samples_dir, 
            os.path.basename(wav_clean_path).replace('.wav', '') + '_ssdrc_train_noise1_snr' + str(snr) + '.wav')
        print('processing ... {}'.format(os.path.basename(wav_mixed_path)))
        nele.add_noise2(
            wav_clean_path.replace('clean', 'enhanced'), 
            wav_noise_path,
            wav_mixed_path, 
            wav_noise_out_path=None, 
            snr=snr, 
            sampling_frequency=44100)


/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
processing ... hikari-001_001_tts_train_noise1_snr-15.wav
processing ... hikari-001_001_ssdrc_train_noise1_snr-15.wav
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
processing ... hikari-001_001_tts_train_noise1_snr-13.wav
processing ... hikari-001_001_ssdrc_train_noise1_snr-13.wav
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
processing ... hikari-001_001_tts_train_noise1_snr-10.wav
processing ... hikari-001_001_ssdrc_train_noise1_snr-10.wav
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
processing ... hikari-001_001_tts_train_noise1_snr-8.wav
processing ... hikari-001_001_ssdrc_train_noise1_snr-8.wav
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
processing ... hikari-001_001_tts_train_noise1_snr-5.wav
processing ... hikari-001_001_ssdrc_train_noise1_snr-5.wav
/home/common/db/audio_corpora/nele/imgan/train/clean/h

In [6]:
for wav_path in glob.glob(os.path.join(samples_dir, 'hikari-001_001*.wav')):
    print(os.path.basename(wav_path))
    dp.disp_wav_file(wav_path)


hikari-001_001_ssdrc_train_noise1_snr-8.wav


hikari-001_001_tts_train_noise1_snr-10.wav


hikari-001_001_tts_train_noise1_snr0.wav


hikari-001_001_ssdrc_train_noise1_snr0.wav


hikari-001_001_tts_train_noise1_snr-5.wav


hikari-001_001_ssdrc_train_noise1_snr-13.wav


hikari-001_001_tts_train_noise1_snr-8.wav


hikari-001_001_ssdrc_train_noise1_snr-5.wav


hikari-001_001_ssdrc_train_noise1_snr-15.wav


hikari-001_001_ssdrc_train_noise1_snr-10.wav


hikari-001_001_tts_train_noise1_snr-15.wav


hikari-001_001_tts_train_noise1_snr-13.wav


hikari-001_001_ssdrc.wav


hikari-001_001_tts.wav


## Test of calculation stoi.

In [7]:
for wav_clean_path in wav_clean_paths[0:3]:
    for snr in [-15, -13, -10, -8, -5, 0]:
        # original. 
        print(wav_clean_path)
        clean, fs = librosa.load(wav_clean_path, sr=44100)
        
        # tts
        wav_mixed_tts_path = os.path.join(
            samples_dir, 
            os.path.basename(wav_clean_path).replace('.wav', '') + '_tts_train_noise1_snr' + str(snr) + '.wav')
        print(wav_mixed_tts_path)
        tts_mixed, fs = librosa.load(wav_mixed_tts_path, sr=44100)
        
        tts_mixed_intelligibility = stoi(clean, tts_mixed, fs, extended=False)
        print(tts_mixed_intelligibility)
        
        # ssdrc
        wav_mixed_ssdrc_path = os.path.join(
            samples_dir, 
            os.path.basename(wav_clean_path).replace('.wav', '') + '_ssdrc_train_noise1_snr' + str(snr) + '.wav')
        print(wav_mixed_ssdrc_path)
        ssdrc_mixed, fs = librosa.load(wav_mixed_ssdrc_path, sr=44100)
        
        ssdrc_mixed_intelligibility = stoi(clean, ssdrc_mixed, fs, extended=False)
        print(ssdrc_mixed_intelligibility)
        print('-----')


/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
/home/takkan/experiments/jr/samples/hikari-001_001_tts_train_noise1_snr-15.wav
0.39972756780761803
/home/takkan/experiments/jr/samples/hikari-001_001_ssdrc_train_noise1_snr-15.wav
0.4141632076683262
-----
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
/home/takkan/experiments/jr/samples/hikari-001_001_tts_train_noise1_snr-13.wav
0.44177596560086
/home/takkan/experiments/jr/samples/hikari-001_001_ssdrc_train_noise1_snr-13.wav
0.4891136695560832
-----
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
/home/takkan/experiments/jr/samples/hikari-001_001_tts_train_noise1_snr-10.wav
0.514504570534607
/home/takkan/experiments/jr/samples/hikari-001_001_ssdrc_train_noise1_snr-10.wav
0.5168214870968179
-----
/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_001.wav
/home/takkan/experiments/jr/samples/hikari-001_001_tts_train_noise1_snr-8.wav
0.5600696802827715
/home

## Check sak/nele/calc_stoi function.

In [9]:
test_clean_path = '/home/common/db/audio_corpora/nele/imgan/train/clean/hikari-001_003.wav'
test_mixed_path = '/home/takkan/experiments/jr/samples/hikari-001_003_tts_train_noise1_snr-8.wav'
test_intelligibility = nele.calc_stoi(test_clean_path, test_mixed_path, 44100)
print(test_intelligibility)


0.5344962776890629
