In [46]:
# #! /usr/bin/env python
# # -*- coding: utf-8 -*-

# from os import path
# from acoular import __file__ as bpath, MicGeom, WNoiseGenerator, PointSource,\
#  Mixer, WriteH5, TimeSamples, PowerSpectra, RectGrid, SteeringVector,\
#  BeamformerBase, BeamformerFunctional, BeamformerMusic, L_p, SineGenerator
# from pylab import figure, plot, axis, imshow, colorbar, show
# from scipy.io import wavfile

# # set up the parameters
# sfreq = 51200
# duration = 1
# nsamples = duration*sfreq
# micgeofile = 'array_9.xml'
# h5savefile = 'mix_sine_dual.h5'

# # generate test data, in real life this would come from an array measurement
# mg = MicGeom( from_file=micgeofile )

# n1 = SineGenerator(sample_freq=sfreq, numsamples=nsamples, freq=2000)
# n2 = SineGenerator(sample_freq=sfreq, numsamples=nsamples, freq=4000)

# n3 = WNoiseGenerator( sample_freq=sfreq, numsamples=nsamples, seed=3, rms=1 )
# n4 = WNoiseGenerator( sample_freq=sfreq, numsamples=nsamples, seed=6, rms=0.7 )
# n5 = WNoiseGenerator( sample_freq=sfreq, numsamples=nsamples, seed=7, rms=1 )


# p1 = PointSource( signal=n1, mics=mg,  loc=(4,2,0.5) ) # noise mod at right
# p2 = PointSource( signal=n2, mics=mg,  loc=(0,2,0.5) ) # signal mod at left

# pa = Mixer( source=p2, sources=[p1] )
# wh5 = WriteH5( source=pa, name=h5savefile )
# wh5.save()

Load params

In [47]:
from Beamformer_MVDR import beamformer_MVDR
import utils
from acoular import MicGeom
import numpy as np
from scipy.io import wavfile
import os

# Parameters
test_name = "SINR_simu"
MIX_FILENAME = 'mix_mod.h5'
SIG_FILENAME = 'sig_mod.h5'
IAN_FILENAME = 'noise_mod.h5'

OUTPUT_FOLDER = f'test_{test_name}'
if not os.path.exists(OUTPUT_FOLDER):
    os.makedirs(OUTPUT_FOLDER)
FILENAME = 'ideal.wav'
FILENAME1 = 'recon.wav'
FILENAME2 = 'right_mic.wav'
FILENAME3 = 'left_mic.wav'
FILENAME4 = 'smi.wav'
dir_path = '/home/xian/Documents/mvdr_beamformer'
OUTPUT_PATH = os.path.join(dir_path, OUTPUT_FOLDER)
OUTPUT_FILENAME = os.path.join(OUTPUT_PATH, FILENAME)
OUTPUT_FILENAME1 = os.path.join(OUTPUT_PATH, FILENAME1)
OUTPUT_FILENAME2 = os.path.join(OUTPUT_PATH, FILENAME2)
OUTPUT_FILENAME3 = os.path.join(OUTPUT_PATH, FILENAME3)
OUTPUT_FILENAME4 = os.path.join(OUTPUT_PATH, FILENAME4)

mg = MicGeom(from_file='array_9.xml')
number_of_mic = mg.mpos.shape[1]
MIC_POS = []
for i in np.arange(number_of_mic):
    MIC_POS.append(mg.mpos[:,int(i)])

LOOK_POS = [0,2,0.5]
NOISE_POS = [4,2,0.5]
NOISE_CH = 1
SIG_CH = 7
SAMPLING_RATE = 51200
FFT_LENGTH = 8192
FFT_SHIFT = 2048
SOUND_SPEED = 343

mvdr_beamformer = beamformer_MVDR(MIC_POS, sampling_rate=SAMPLING_RATE, fft_length=FFT_LENGTH, fft_shift=FFT_SHIFT, sound_speed=SOUND_SPEED)

f_sig = 4000
f_in = 2000
ind_f_sig = (np.abs(mvdr_beamformer.frequency_grid - f_sig)).argmin()
ind_f_in = (np.abs(mvdr_beamformer.frequency_grid - f_in)).argmin()

Ideal SINR

In [48]:
multi_signal = utils.get_data_from_h5(MIX_FILENAME, skip_last_channel=False)
noise_signal = utils.get_data_from_h5(IAN_FILENAME, skip_last_channel=False)
pure_signal = utils.get_data_from_h5(SIG_FILENAME, skip_last_channel=False)

wavfile.write(OUTPUT_FILENAME3, SAMPLING_RATE, multi_signal[:,SIG_CH].astype(np.float32))
wavfile.write(OUTPUT_FILENAME2, SAMPLING_RATE, multi_signal[:,NOISE_CH].astype(np.float32))

steering_vector = mvdr_beamformer.get_steering_vector_near_field(LOOK_POS)

noise_correlation_matrix = mvdr_beamformer.get_spatial_correlation_matrix(noise_signal) # noise covariance matrix
signal_correlation_matrix = mvdr_beamformer.get_spatial_correlation_matrix(pure_signal) # noise covariance matrix

beamformer = mvdr_beamformer.get_mvdr_beamformer(steering_vector, noise_correlation_matrix)

complex_spectrum = utils.get_spectrogram(multi_signal, FFT_LENGTH, FFT_SHIFT, FFT_LENGTH)

enhanced_spectrum = mvdr_beamformer.apply_beamformer(beamformer, complex_spectrum)

enhanced_audio = utils.spec2wav(enhanced_spectrum, SAMPLING_RATE, FFT_LENGTH, FFT_LENGTH, FFT_SHIFT)

enhanced_audio_ideal = enhanced_audio / np.max(np.abs(enhanced_audio)) * 0.7

wavfile.write(OUTPUT_FILENAME, SAMPLING_RATE, enhanced_audio_ideal.astype(np.float32))

# SINR = []
# for f_ind in np.arange(beamformer.shape[1]):
#     noise_correlation_matrix_cut = np.asmatrix(noise_correlation_matrix[:,:,f_ind])
#     signal_correlation_matrix_cut = np.asmatrix(signal_correlation_matrix[:,:,f_ind])
#     beamformer_cut = np.asmatrix(beamformer[:,f_ind])

#     product_IAN_part1 = np.matmul(np.conjugate(beamformer_cut), noise_correlation_matrix_cut)
#     product_IAN_total = np.absolute(np.matmul(product_IAN_part1, beamformer_cut.T))

#     product_SIG_part1 = np.matmul(np.conjugate(beamformer_cut), signal_correlation_matrix_cut)
#     product_SIG_total = np.absolute(np.matmul(product_SIG_part1, beamformer_cut.T))

#     if product_IAN_total == 0:
#         SINR.append(np.Inf)
#     else:
#         SINR.append(product_SIG_total/product_IAN_total)

# print(np.log10(np.mean(SINR)))

# signal_mean = enhanced_spectrum.mean(0)
# pow_sig = np.absolute(signal_mean[ind_f_sig])
# pow_in = np.absolute(signal_mean[ind_f_in])
# SINR = pow_sig/pow_in
# print(np.log10(SINR))

0.6048727


Recon SINR

In [49]:
### noise spectrogram multiplied with steering vector pointing to its position
multi_signal = utils.get_data_from_h5(MIX_FILENAME, skip_last_channel=False)
noise_signal = mvdr_beamformer.get_augmented_noise(NOISE_POS, multi_signal, NOISE_CH)

noise_correlation_matrix = mvdr_beamformer.get_spatial_correlation_matrix(noise_signal) # noise covariance matrix

steering_vector = mvdr_beamformer.get_steering_vector_near_field(LOOK_POS)

beamformer = mvdr_beamformer.get_mvdr_beamformer(steering_vector, noise_correlation_matrix)

complex_spectrum = utils.get_spectrogram(multi_signal, FFT_LENGTH, FFT_SHIFT, FFT_LENGTH)

enhanced_spectrum = mvdr_beamformer.apply_beamformer(beamformer, complex_spectrum)

enhanced_audio = utils.spec2wav(enhanced_spectrum, SAMPLING_RATE, FFT_LENGTH, FFT_LENGTH, FFT_SHIFT)

enhanced_audio_recon = enhanced_audio / np.max(np.abs(enhanced_audio)) * 0.7

wavfile.write(OUTPUT_FILENAME1, SAMPLING_RATE, enhanced_audio_recon.astype(np.float32))

# # get mean from the 21 frames, then calc w*R_i+n*w for the denominator
# signal_mean = enhanced_spectrum.mean(0)
# SINR_recon = []
# for f_ind in np.arange(beamformer.shape[1]):
#     noise_correlation_matrix_cut = np.asmatrix(noise_correlation_matrix[:,:,f_ind])
#     beamformer_cut = np.asmatrix(beamformer[:,f_ind])

#     product_IAN_part1 = np.matmul(np.conjugate(beamformer_cut), noise_correlation_matrix_cut)
#     product_IAN_total = np.absolute(np.matmul(product_IAN_part1, beamformer_cut.T))

#     product_SIG_total = np.absolute(signal_mean[f_ind])

#     # print(f'IAN:{product_IAN_total}')
#     # print(f'SIG:{product_SIG_total}')

#     if product_IAN_total == 0:
#         SINR_recon.append(np.Inf)
#     else:
#         SINR_recon.append(product_SIG_total/product_IAN_total)
#         # print(f'SINR:{product_SIG_total/product_IAN_total}')

# print(np.log10(np.mean(SINR_recon)))

# signal_mean = enhanced_spectrum.mean(0)
# pow_sig = np.absolute(signal_mean[ind_f_sig])
# pow_in = np.absolute(signal_mean[ind_f_in])
# SINR = pow_sig/pow_in
# print(np.log10(SINR))

0.8573271


SMI

In [50]:
multi_signal = utils.get_data_from_h5(MIX_FILENAME, skip_last_channel=False)

spectral_correlation_matrix = mvdr_beamformer.get_spatial_correlation_matrix(multi_signal)

steering_vector = mvdr_beamformer.get_steering_vector_near_field(LOOK_POS)

beamformer = mvdr_beamformer.get_mvdr_beamformer(steering_vector, spectral_correlation_matrix)

complex_spectrum = utils.get_spectrogram(multi_signal, FFT_LENGTH, FFT_SHIFT, FFT_LENGTH)

enhanced_spectrum = mvdr_beamformer.apply_beamformer(beamformer, complex_spectrum)

enhanced_audio = utils.spec2wav(enhanced_spectrum, SAMPLING_RATE, FFT_LENGTH, FFT_LENGTH, FFT_SHIFT)

enhanced_audio_smi = enhanced_audio / np.max(np.abs(enhanced_audio)) * 0.7

wavfile.write(OUTPUT_FILENAME4, SAMPLING_RATE, enhanced_audio_smi.astype(np.float32))

# signal_mean = enhanced_spectrum.mean(0)
# pow_sig = np.absolute(signal_mean[ind_f_sig])
# pow_in = np.absolute(signal_mean[ind_f_in])
# SINR = pow_sig/pow_in
# print(np.log10(SINR))

0.5582358


In [51]:
# wav_temp = 'modulated_signal.wav'
# _, signal = wavfile.read(wav_temp)
# signal = signal / np.max(np.abs(signal)) * 0.7

SI-SDR

In [52]:
# def si_sdr(estimate, reference, epsilon=1e-8):
#     estimate = estimate - estimate.mean()
#     reference = reference - reference.mean()
#     reference_pow = np.power(reference, 2)
#     reference_pow = reference_pow.mean()
#     mix_pow = (estimate * reference).mean()
#     scale = mix_pow / (reference_pow + epsilon)

#     reference = scale * reference
#     # estimate = scale * estimate
#     error = estimate - reference

#     reference_pow = np.power(reference, 2)
#     error_pow = np.power(error, 2)

#     reference_pow = reference_pow.mean()
#     error_pow = error_pow.mean()

#     sisdr = 10 * np.log10(reference_pow) - 10 * np.log10(error_pow)
#     return sisdr.item()

# def si_sdr2(reference, estimation):
#     """
#     Scale-Invariant Signal-to-Distortion Ratio (SI-SDR)
#     Args:
#         reference: numpy.ndarray, [..., T]
#         estimation: numpy.ndarray, [..., T]
#     Returns:
#         SI-SDR
#     [1] SDR– Half- Baked or Well Done?
#     http://www.merl.com/publications/docs/TR2019-013.pdf
#     >>> np.random.seed(0)
#     >>> reference = np.random.randn(100)
#     >>> si_sdr(reference, reference)
#     inf
#     >>> si_sdr(reference, reference * 2)
#     inf
#     >>> si_sdr(reference, np.flip(reference))
#     -25.127672346460717
#     >>> si_sdr(reference, reference + np.flip(reference))
#     0.481070445785553
#     >>> si_sdr(reference, reference + 0.5)
#     6.3704606032577304
#     >>> si_sdr(reference, reference * 2 + 1)
#     6.3704606032577304
#     >>> si_sdr([1., 0], [0., 0])  # never predict only zeros
#     nan
#     >>> si_sdr([reference, reference], [reference * 2 + 1, reference * 1 + 0.5])
#     array([6.3704606, 6.3704606])
#     """
#     estimation, reference = np.broadcast_arrays(estimation, reference)

#     # assert reference.dtype == np.float64, reference.dtype
#     # assert estimation.dtype == np.float64, estimation.dtype

#     reference_energy = np.sum(reference ** 2, axis=-1, keepdims=True)

#     # This is $\alpha$ after Equation (3) in [1].
#     optimal_scaling = np.sum(reference * estimation, axis=-1, keepdims=True) \
#         / reference_energy

#     # This is $e_{\text{target}}$ in Equation (4) in [1].
#     projection = optimal_scaling * reference

#     # This is $e_{\text{res}}$ in Equation (4) in [1].
#     noise = estimation - projection

#     ratio = np.sum(projection ** 2, axis=-1) / np.sum(noise ** 2, axis=-1)
#     return 10 * np.log10(ratio)

# print("SI-SDR: ", si_sdr(enhanced_audio_recon, signal[:enhanced_audio_recon.size]))
# print("SI-SDR2: ", si_sdr2(signal[:enhanced_audio_recon.size], enhanced_audio_ideal))