In [None]:
import numpy as np
from IPython.display import Audio
import scipy.signal as sig
import soundfile as sf
import matplotlib.pyplot as plt
import mat73
import os
from masp import shoebox_room_sim as srs
from os.path import join as pjoin
import copy
import spaudiopy as spa

In [None]:
# import my modules (helpers.py where I stored all the functions):
import helpers as hlp
import importlib
importlib.reload(hlp);

In [None]:
# --- Configuration: data locations and global simulation parameters ---
speech_path = 'sounds'  # directory holding the mono speech recordings
# Location of the ARTE noise database. The ARTE_PATH environment variable
# overrides the original machine-specific default, so the notebook can run on
# other machines without editing this cell.
arte_path = os.environ.get('ARTE_PATH', '/Users/joanna.luberadzka/Data/ARTE')
output_path = 'generated-sounds/'  # directory the rendered wav files are written to
maxlim = 2  # upper bound applied to rt60 when computing the echogram time limits
ambi_order = 10  # spherical-harmonic (ambisonic) order passed to the echogram computation
fs=48000  # common sampling rate: RIR rendering, resampling target, playback and wav output
target_snr = -5  # desired speech-to-noise ratio (dB) of the binaural mixture

In [None]:
def mono2biSH(mono_sig, sh_rir):
    """Convolve a mono signal with the SH-domain RIRs of two receivers (ears).

    Args:
        mono_sig: array of shape (1, D); only the first row is used.
        sh_rir: array of shape (M, nSH, 2, 1), a slice of RIRs laid out as
            (M, nSH, nRec, nSrc). Receiver 0 yields the left output and
            receiver 1 the right output; source index 0 is used.

    Returns:
        Array of shape (2, D + M - 1, nSH) holding [left, right] SH signals.
    """
    # Read the channel count from the RIRs instead of hard-coding 121
    # (order 10), so any ambisonic order works.
    n_sh = sh_rir.shape[1]
    # Replicate the mono signal into one column per SH channel -> (D, nSH).
    mono_cols = np.tile(mono_sig[0], (n_sh, 1)).T
    # 'full' convolution along the time axis (axis 0), channel by channel.
    left = sig.fftconvolve(mono_cols, sh_rir[:, :, 0, 0], 'full', 0)
    right = sig.fftconvolve(mono_cols, sh_rir[:, :, 1, 0], 'full', 0)
    return np.array([left, right])

def mono2sh(mono_sig, sh_rir):
    """Convolve a mono signal with the SH-domain RIR of a single receiver.

    Args:
        mono_sig: array of shape (1, D); only the first row is used.
        sh_rir: array of shape (M, nSH, 1, 1), a slice of RIRs laid out as
            (M, nSH, nRec, nSrc). Receiver index 0 and source index 0 are used.

    Returns:
        Array of shape (D + M - 1, nSH) with the SH-domain signal.
    """
    # Read the channel count from the RIRs instead of hard-coding 121
    # (order 10), so any ambisonic order works.
    n_sh = sh_rir.shape[1]
    # Replicate the mono signal into one column per SH channel -> (D, nSH).
    mono_cols = np.tile(mono_sig[0], (n_sh, 1)).T
    # 'full' convolution along the time axis (axis 0), channel by channel.
    head = sig.fftconvolve(mono_cols, sh_rir[:, :, 0, 0], 'full', 0)
    return head

def biSH2bin(sh_sig, decoder):
    """Decode per-ear SH signals to a two-channel binaural signal.

    sh_sig[0] / sh_sig[1] are the SH signals for the left / right ear, each
    indexed (time, channel). decoder is indexed (taps, channel, ear). Each SH
    channel is filtered with the matching decoder channel along time and the
    filtered channels are summed. Returns an array [left, right].
    """
    ears = []
    for ear in (0, 1):
        # Channel-wise 'full' convolution along the time axis.
        filtered = sig.fftconvolve(sh_sig[ear], decoder[:, :, ear], mode='full', axes=0)
        # Collapse the SH channels into a single ear signal.
        ears.append(filtered.sum(axis=1))
    return np.array(ears)

def sh2bin(sh_sig, decoder):
    """Decode a single-receiver SH signal to a two-channel binaural signal.

    sh_sig is indexed (time, channel) and decoder (taps, channel, ear). The
    same SH signal is filtered with each ear's decoder filters along time and
    the filtered channels are summed. Returns an array [left, right].
    """
    ears = []
    for ear in (0, 1):
        # Channel-wise 'full' convolution along the time axis.
        filtered = sig.fftconvolve(sh_sig, decoder[:, :, ear], mode='full', axes=0)
        # Collapse the SH channels into a single ear signal.
        ears.append(filtered.sum(axis=1))
    return np.array(ears)


# DESIGN A SCENE

In [None]:
# # --------------- DESIGN PARTY SCENE ----------------
# scene_tag="party"
# # --- Room: ----
# room = np.array([15., 10., 3.5]) 
# rt60 = np.array([.4])
# # --- Receivers: ----
# head_orient = np.array([0, 0])
# head_pos= np.array([room[0]/2, room[1]/2, 1.6]) # Listener coordinates
# ears_pos=hlp.head_2_ku_ears(head_pos,head_orient)
# mics = np.array([ears_pos[0], ears_pos[1], list(head_pos)]) #two positions for the binaural, one for the SH
# # --- Sources: ----
# TARGET1_ANGLE=20
# TARGET2_ANGLE=-20
# src1_pos= hlp.place_on_circle(head_pos,1,TARGET1_ANGLE)
# src2_pos= hlp.place_on_circle(head_pos,1,TARGET2_ANGLE)
# srcs=np.array(src1_pos+src2_pos)
# # --- Audio: ----
# src1_sig_mono, fs_src1 = sf.read(pjoin(speech_path, 'dial2p1.wav'))
# src2_sig_mono, fs_src2 = sf.read(pjoin(speech_path, 'dial2p2.wav'))
# noise_sig_sh, fs_noise = sf.read(pjoin(arte_path,'09_Dinner_party_MOA_31ch/09_Dinner_party_MOA_31ch.wav'))
# chunk_len = 25 #seconds

# =============== RESTAURANT SCENE (currently active) ===============
scene_tag = "restaurant"  # becomes part of the output file names

# Room: dimensions [x, y, z] and target reverberation time (one band)
room = np.array([28.0, 17.0, 4.2])
rt60 = 0.5 * np.array([1.1])

# Receivers: listener at the centre of the floor plan
head_orient = np.array([0, 0])
head_pos = np.array([room[0] / 2, room[1] / 2, 1.3])  # listener coordinates
ears_pos = hlp.head_2_ku_ears(head_pos, head_orient)
# Receiver order: the two ear positions first (binaural), head centre last (SH)
mics = np.array([ears_pos[0], ears_pos[1], list(head_pos)])

# Sources: two talkers placed around the head with hlp.place_on_circle
TARGET1_ANGLE = 20
TARGET2_ANGLE = -20
src1_pos = hlp.place_on_circle(head_pos, 1, TARGET1_ANGLE)
src2_pos = hlp.place_on_circle(head_pos, 1, TARGET2_ANGLE)
# NOTE(review): `+` presumably concatenates two lists of positions into
# (nSrc, 3) -- confirm against hlp.place_on_circle's return type.
srcs = np.array(src1_pos + src2_pos)

# Audio: two dialogue parts plus the multichannel ARTE noise recording
src1_sig_mono, fs_src1 = sf.read(pjoin(speech_path, 'dial3p1.wav'))
src2_sig_mono, fs_src2 = sf.read(pjoin(speech_path, 'dial3p2.wav'))
noise_sig_sh, fs_noise = sf.read(
    pjoin(arte_path, '08_Cafe_2_MOA_31ch/08_Cafe_2_MOA_31ch.wav')
)
chunk_len = 25  # seconds


# # --------------- DESIGN MEETING SCENE ----------------
# scene_tag="meeting"
# # --- Room: ----
# room = np.array([5., 2., 2.5]) 
# rt60 = np.array([0.2]) * 0.6
# # --- Receivers: ----
# head_orient = np.array([0, 0]) 
# head_pos= np.array([room[0]/2, room[1]/2, 1.3]) # Listener coordinates
# ears_pos=hlp.head_2_ku_ears(head_pos,head_orient)
# mics = np.array([ears_pos[0], ears_pos[1], list(head_pos)]) #two positions for the binaural, one for the SH
# # --- Sources: ----
# TARGET1_ANGLE=30
# TARGET2_ANGLE=-30
# src1_pos= hlp.place_on_circle(head_pos,1,TARGET1_ANGLE)
# src2_pos= hlp.place_on_circle(head_pos,1,TARGET2_ANGLE)
# srcs=np.array(src1_pos+src2_pos)
# # --- Audio: ----
# src1_sig_mono, fs_src1 = sf.read(pjoin(speech_path, 'dial1p1.wav'))
# src2_sig_mono, fs_src2 = sf.read(pjoin(speech_path, 'dial1p2.wav'))
# noise_sig_sh, fs_noise = sf.read(pjoin(arte_path,'02_Office_MOA_31ch/02_Office_MOA_31ch.wav'))
# chunk_len = 25 #seconds


# -------- Room parameter adjustments - same for all the scenes: ----------
# Compute absorption coefficients for desired rt60 and room dimensions.
# find_abs_coeffs_from_rt returns the wall absorption coefficients together
# with the reverberation time it reports for them (rt60_true).
abs_walls,rt60_true = srs.find_abs_coeffs_from_rt(room, rt60)
# Small correction for sound absorption coefficients:
# re-run the search only if, in at least one band, rt60_true exceeds the
# requested rt60 by more than 5% of rt60_true (the check is one-sided).
# NOTE(review): the second call asks for rt60_true + |rt60 - rt60_true|, i.e. a
# target further above rt60; this mirrors the upstream masp example -- confirm
# it is the intended correction.
if sum(rt60_true-rt60>0.05*rt60_true)>0 :
    abs_walls,rt60_true = srs.find_abs_coeffs_from_rt(room, rt60_true + abs(rt60-rt60_true))
# Generally, we simulate up to RT60:
# (capped at maxlim, so long reverberation times do not extend the echogram computation)
limits = np.minimum(rt60, maxlim)

In [None]:
# Shape summary of the simulation inputs, one line per array.
print(
    f"{room.shape=} -> Room dimensions in cartesian coordinates. Dimension = (3) [x, y, z].",
    f"{abs_walls.shape=} -> Wall absorption coefficients per band. Dimension = (nBands, 6).",
    f"{limits.shape=} -> Maximum echogram computation time per band.  Dimension = (nBands).",
    f"{srcs.shape=} -> Source position in cartesian coordinates. Dimension = (nSrc, 3) [[x, y, z]].",
    f"{mics.shape=} -> Receiver position in cartesian coordinates. Dimension = (nRec, 3) [[x, y, z]].",
    sep="\n",
)

In [None]:
# =============== 1) COMPUTE ECHOGRAMS ===============
# Echograms describe the reflections: how many, at what time and from which
# coordinates, for every source/receiver pair.
abs_echograms = srs.compute_echograms_sh(
    room, srcs, mics, abs_walls, limits, ambi_order
)

# =============== 2) RENDER ECHOGRAMS ===============
# From the echograms, RIRs in spherical harmonics are generated for each
# receiver. There are 3 receivers, in the order given by `mics`:
# index 0 and 1 are the two ears, index 2 is the head centre.
band_centerfreqs = np.array([1000])
rirs_sh = srs.render_rirs_sh(abs_echograms, band_centerfreqs, fs)

In [None]:
# Sanity check: shapes of the echogram array and of the rendered SH RIRs.
print(
    f"{abs_echograms.shape=} -> Rendered echograms. Dimension = (nSrc, nRec, nBands)",
    f"{rirs_sh.shape=} -> Rendered RIR in SH. Dimension = (M, maxSH, nRec, nSrc)",
    sep="\n",
)

In [None]:
# Visual check of the scene geometry (room, receivers, sources) using the
# "xy" perspective of the helper's plotting routine.
hlp.plot_scene_raw(room,mics,srcs,perspective="xy")

In [None]:
# -------------- SIGNALS IN SPHERICAL HARMONICS -----------------
# Each mono speech signal is convolved with the SH-domain RIRs rendered above.
# NOTE: this cell overwrites its own inputs (src*_sig_mono, noise_sig_sh), so it
# is not idempotent -- re-run the scene-design cell before re-running this one.

# make sure both loaded mono files have the same level
src1_sig_mono = hlp.set_level(src1_sig_mono, -30)
src2_sig_mono = hlp.set_level(src2_sig_mono, -30)

# resample all audio signals to the common rate fs:
src1_sig_mono = sig.resample_poly(src1_sig_mono, fs, fs_src1)
src2_sig_mono = sig.resample_poly(src2_sig_mono, fs, fs_src2)
noise_sig_sh = sig.resample_poly(noise_sig_sh, fs, fs_noise)

# crop all audio signals to one common length. If a speech file is shorter than
# chunk_len seconds, use the shortest available length so that all SH signals
# can still be summed sample by sample later on.
n_samples = min(chunk_len * fs, src1_sig_mono.shape[0], src2_sig_mono.shape[0])
src1_sig_mono = src1_sig_mono[:n_samples]
src2_sig_mono = src2_sig_mono[:n_samples]
# length of a 'full' convolution of the cropped speech with the RIRs
L_after_convol = n_samples + rirs_sh.shape[0] - 1
if noise_sig_sh.shape[0] < L_after_convol:
    raise ValueError(
        f"Noise recording is too short: {noise_sig_sh.shape[0]} samples, "
        f"but {L_after_convol} are needed to match the convolved speech."
    )
noise_sig_sh = noise_sig_sh[:L_after_convol]  # to make them all equal in SH domain


# ---> Note! Functions mono2sh and mono2biSH expect a 2-dim array with a signal and a
# 4-dim array with RIRs. Range slicing is therefore used to pick the RIRs without
# dropping dimensions: rirs_sh[:, :, 2:3, :] selects the third receiver (the centre
# of the head) and keeps the array 4-dimensional.

# change shape from (D,) to (1,D) because this is what mono2sh / mono2biSH expect:
src1_sig_mono = np.array(src1_sig_mono, ndmin=2)
src2_sig_mono = np.array(src2_sig_mono, ndmin=2)

# from mono to spherical harmonics (sh for the head center)
src1_sig_sh = mono2sh(src1_sig_mono, rirs_sh[:, :, 2:3, 0:1])  # rirs_sh -> (M, maxSH, nRec, nSrc)
src2_sig_sh = mono2sh(src2_sig_mono, rirs_sh[:, :, 2:3, 1:2])  # rirs_sh -> (M, maxSH, nRec, nSrc)

# from mono to binaural spherical harmonics (sh for each ear)
src1_sig_sh_bi = mono2biSH(src1_sig_mono, rirs_sh[:, :, 0:2, 0:1])
src2_sig_sh_bi = mono2biSH(src2_sig_mono, rirs_sh[:, :, 0:2, 1:2])

# The source signals have as many SH channels as the rendered RIRs (121 for
# order 10), the noise files fewer (31). To sum them, zero-pad the noise up to
# the RIR channel count. Both counts are read from the data, not hard-coded.
# NOTE(review): padding assumes the noise channels map onto the first SH channels
# in the same ordering/normalisation as the rendered RIRs -- confirm this for
# the mixed-order (MOA) ARTE recordings.
n_sh = rirs_sh.shape[1]
n_pad = n_sh - noise_sig_sh.shape[1]
if n_pad < 0:
    raise ValueError(
        f"Noise has {noise_sig_sh.shape[1]} channels, more than the {n_sh} SH channels of the RIRs."
    )
zeropads = np.zeros([noise_sig_sh.shape[0], n_pad])
noise_sig_sh = np.concatenate((noise_sig_sh, zeropads), axis=1)


# create a binaural version of the noise for the purpose of decoding later
# (the same SH noise duplicated for both ears -> shape (2, samples, channels))
noise_sig_sh_bi = np.tile(noise_sig_sh, (2, 1, 1))


In [None]:
# Sanity check: shapes of all SH-domain signals (sources, per-ear sources, noise).
print(
    f"{src1_sig_sh.shape=}",
    f"{src2_sig_sh.shape=}",
    f"{src1_sig_sh_bi.shape=}",
    f"{src2_sig_sh_bi.shape=}",
    f"{noise_sig_sh.shape=}",
    f"{noise_sig_sh_bi.shape=}",
    sep="\n",
)

In [None]:
# --------------- LOAD AVAILABLE DECODERS ----------------

# load bimagls decoders created in matlab (stored under the 'hnm' key)
# bimagls takes RIRs in SH of 2 receivers
# NOTE(review): the files live in 'decoders_ord10', so they presumably only
# match ambi_order == 10 -- confirm before changing ambi_order.
decoder_ku_bimag = mat73.loadmat(pjoin('decoders_ord10', 'Ku100_ALFE_Window_sinEQ_bimag.mat'))['hnm']
decoder_ric_bimag = mat73.loadmat(pjoin('decoders_ord10', 'RIC_Front_Omni_ALFE_Window_SinEQ_bimag.mat'))['hnm']

# create a magls decoder with spaudiopy using a sofa file
# magls takes RIRs in SH of 1 receiver
hrirs = spa.io.load_sofa_hrirs('sofas/RIC_Front_Omni_48000Hz.sofa')
# Resample the HRIRs to the simulation rate fs. The trailing 8 is the fifth
# positional argument of resample_hrirs (presumably the number of parallel
# jobs -- confirm against the installed spaudiopy version).
left, right, new_fs = spa.process.resample_hrirs(hrirs.left, hrirs.right, hrirs.fs, fs, 8)
hrirs.fs = new_fs
hrirs.update_hrirs(left, right)
# Use the configured ambisonic order rather than a hard-coded 10, so the decoder
# always matches the channel count of the rendered SH signals.
decoder_oldb_mag = spa.decoder.magls_bin(hrirs, ambi_order)
# Transpose (reverses the axis order) so the last axis selects the ear, which is
# how sh2bin indexes the decoder (decoder[:, :, ear]).
decoder_oldb_mag = decoder_oldb_mag.T

print(f"{decoder_ku_bimag.shape=}")
print(f"{decoder_ric_bimag.shape=}")
print(f"{decoder_oldb_mag.shape=}")


In [None]:
# --------------- SET SNR AND DECODE FROM SH TO BINAURAL ----------------

# Choose 1 from the loaded decoders:
decoder=decoder_ku_bimag

# Important -> the decoder type selects both the input signals and the decoding function:
#   "...bimagls": per-ear SH signals (src*_sig_sh_bi, noise_sig_sh_bi), decoded with biSH2bin
#   "...magls"  : head-centre SH signals (src*_sig_sh, noise_sig_sh), decoded with sh2bin
dec_type="ku100bimagls"

# "bimagls" MUST be tested first: "magls" is a substring of "bimagls", so testing
# "magls" first would send every bimagls decoder down the single-receiver path
# and the bimagls branch would never run.
if "bimagls" in dec_type:
    # add speech sources in the SH domain:
    added_sources_sh=src1_sig_sh_bi + src2_sig_sh_bi
    # decode to binaural
    added_sources_bin=biSH2bin(added_sources_sh, decoder)
    # compute the initial snr (dB) between added sources and noise
    ini_snr = 10 * np.log10(hlp.power(added_sources_bin) / hlp.power(biSH2bin(noise_sig_sh_bi, decoder)))
    # scale noise to achieve a desired snr:
    # raising the noise by (ini_snr - target_snr) dB moves the SNR to target_snr
    noise_gain_db = ini_snr - target_snr
    noise_sig_sh_bi_scaled = noise_sig_sh_bi * np.power(10, noise_gain_db/20)
    # check if snr is correct
    snr_check = 10 * np.log10(hlp.power(added_sources_bin) / hlp.power(biSH2bin(noise_sig_sh_bi_scaled, decoder)))
    print(f"{snr_check=}")
    # add all signals in the SH domain:
    mixture_sh_bi=src1_sig_sh_bi + src2_sig_sh_bi + noise_sig_sh_bi_scaled
    # decode to binaural
    mixture_bin=biSH2bin(mixture_sh_bi, decoder)
    # normalize SH mixture so that the binaural mixture is in [-1, 1] and it doesn't clip
    # (the cells below play and save the un-normalized mixture_bin)
    norm_fact = np.max(np.abs(mixture_bin))
    mixture_sh_bi_norm = mixture_sh_bi/norm_fact
    mixture_bin_norm=biSH2bin(mixture_sh_bi_norm, decoder)
elif "magls" in dec_type:
    # add speech sources in the SH domain:
    added_sources_sh=src1_sig_sh + src2_sig_sh
    # decode to binaural
    added_sources_bin=sh2bin(added_sources_sh, decoder)
    # compute the initial snr (dB) between added sources and noise
    ini_snr = 10 * np.log10(hlp.power(added_sources_bin) / hlp.power(sh2bin(noise_sig_sh, decoder)))
    # scale noise to achieve a desired snr:
    # raising the noise by (ini_snr - target_snr) dB moves the SNR to target_snr
    noise_gain_db = ini_snr - target_snr
    noise_sig_sh_scaled = noise_sig_sh * np.power(10, noise_gain_db/20)
    # check if snr is correct
    snr_check = 10 * np.log10(hlp.power(added_sources_bin) / hlp.power(sh2bin(noise_sig_sh_scaled, decoder)))
    print(f"{snr_check=}")
    # add all signals in the SH domain:
    mixture_sh=src1_sig_sh + src2_sig_sh + noise_sig_sh_scaled
    # decode to binaural
    mixture_bin=sh2bin(mixture_sh, decoder)
    # normalize SH mixture so that the binaural mixture is in [-1, 1] and it doesn't clip
    # (the cells below play and save the un-normalized mixture_bin)
    norm_fact = np.max(np.abs(mixture_bin))
    mixture_sh_norm = mixture_sh/norm_fact
    mixture_bin_norm=sh2bin(mixture_sh_norm, decoder)
else:
    # Fail loudly instead of leaving mixture_bin / added_sources_bin undefined.
    raise ValueError(f"Unknown decoder type {dec_type!r}: expected a name containing 'magls' or 'bimagls'.")



In [None]:
# Listen to the decoded binaural speech only (both talkers, no noise).
Audio(added_sources_bin, rate=fs)

In [None]:
# Listen to the decoded binaural mixture (both talkers plus scaled noise).
Audio(mixture_bin, rate=fs)

In [None]:
# Create the output directory if needed, so sf.write does not fail on a fresh checkout.
os.makedirs(output_path, exist_ok=True)
# File names encode scene, target SNR, decoder type and sampling rate.
# Signals are transposed to (samples, channels) and stored as 32-bit float wav.
sf.write(pjoin(output_path,f"dialog_{scene_tag}_bin_snr{int(target_snr)}_mixture_dec_{dec_type}_{fs}hz.wav"), mixture_bin.T, fs, subtype='FLOAT')
sf.write(pjoin(output_path,f"dialog_{scene_tag}_bin_snr{int(target_snr)}_speech_dec_{dec_type}_{fs}hz.wav"), added_sources_bin.T, fs, subtype='FLOAT')
