In [5]:
import pandas as pd
import numpy as np

import torchaudio
import torch

from torch.fft import rfft as fft
from torch.fft import irfft as ifft

from tqdm.auto import tqdm
import soundfile as sf

import IPython

import collections
import os

from filters.frequency_domain import mono_FDAF

In [6]:
# Root directory of the dataset; the directory index dir_list.txt is read from here.
path = './data/midi_noise_cut'

In [7]:
# One entry per directory listed in dir_list.txt; '#' starts a comment there.
# ndmin=1 keeps the result at least 1-D: for a file with a single entry
# np.loadtxt would otherwise return a 0-d array, which cannot be iterated
# by the `for line in lines` loop that consumes it.
lines = np.loadtxt(os.path.join(path, "dir_list.txt"), comments="#", unpack=False, dtype=str, ndmin=1)

In [8]:
# Map every listed directory to the filename lists that os.walk yields for it,
# one list per visited folder (the walked root itself comes first).
dirs = collections.defaultdict(list)
for line in lines:
    walk_root = os.path.join(path, line)
    dirs[line] = [filenames for _dirpath, _dirnames, filenames in os.walk(walk_root)]

In [9]:
# Inspect the top-level file listing of the directory at position 15.
dirs[list(dirs)[15]][0]

['fdaf.wav',
 'meta.json',
 'nlms.wav',
 'raw_mic_0.wav',
 'raw_mic_1.wav',
 'raw_mic_2.wav',
 'raw_mic_3.wav',
 'raw_spk_0.wav',
 'raw_spk_1.wav',
 'vqe_0.wav']

In [10]:
def get_names(ind, path):
    """Build the audio file paths for the ``ind``-th dataset directory.

    The directory is chosen by position among the keys of the module-level
    ``dirs`` mapping. The two files are chosen by position, counted from the
    end, in the sorted top-level listing of that directory.

    Args:
        ind: Position of the directory among the keys of ``dirs``.
        path: Root directory that the keys of ``dirs`` are relative to.

    Returns:
        A tuple ``(input_path, response_path, dir_path)``: the file seventh
        from the end of the sorted listing, the file second from the end, and
        the directory itself, each joined onto ``path``.

    Raises:
        IndexError: If ``ind`` is out of range, the directory has no recorded
            listing, or the listing holds fewer than seven files.
    """
    dir_name = list(dirs)[ind]
    # os.walk reports filenames in arbitrary (file-system dependent) order, so
    # sort before selecting by position to make the choice deterministic.
    filenames = sorted(dirs[dir_name][0])
    dir_path = os.path.join(path, dir_name)
    return (
        os.path.join(dir_path, filenames[-7]),
        os.path.join(dir_path, filenames[-2]),
        dir_path,
    )

In [None]:
# Batch pass: for every directory in `dirs`, load the two signals selected by
# get_names, filter them with `single_FDAF`, and store the result as fdaf.wav
# inside that directory.
# NOTE(review): `single_FDAF` is neither defined nor imported in the visible
# part of this notebook (only `mono_FDAF` is imported), so this cell presumably
# raises NameError on a fresh kernel — confirm which function is intended.
for i in tqdm(range(len(dirs))):
    input_seq, response_seq, save_path = get_names(i, path)
    x, x_sr = torchaudio.load(input_seq)
    d, d_sr = torchaudio.load(response_seq)
    # Drop size-1 dimensions of the loaded tensors before filtering.
    x, d = x.squeeze(), d.squeeze()

    y, e = single_FDAF(x, d, 256, 0.1)
    audio_path = os.path.join(save_path,'fdaf.wav')
    # Written as 16-bit PCM at the sample rate of the input file.
    sf.write(audio_path, y, x_sr, subtype='PCM_16')
    #IPython.display.Audio(audio_path, rate=x_sr)

In [76]:
def mono_FDKalman(x, d, M, ff=0.95, delta=1e-2, gamma=1e-6):
    """Block frequency-domain adaptive filter with a per-bin Kalman-style gain.

    The signals are processed in blocks of ``M`` samples. For each block the
    previous and current input blocks are concatenated (length ``2 * M``),
    transformed with a real FFT, multiplied by the current weights, and the
    last ``M`` samples of the inverse transform are kept as the output. The
    weights are then updated from the windowed, zero-padded error spectrum,
    and the second half of their time-domain representation is zeroed.

    Args:
        x: Input signal. Expected to be a 1-D tensor: blocks are sliced along
            the first dimension while the length is read from the last one.
        d: Desired signal, sliced in the same blocks as ``x``.
        M: Block length in samples.
        ff: Smoothing factor of the running error-power estimate ``R``.
        delta: Initial value of every bin of ``R``; also used as the scalar
            ``Q`` added (scaled by ``|w_fft|``) to ``P`` in each step.
        gamma: Initial value of every bin of ``P``.

    Returns:
        A 1-D tensor with the filter output for all complete blocks, i.e.
        ``(min(len(x), len(d)) // M) * M`` samples.
    """
    # Only process blocks for which BOTH signals provide M samples. Counting
    # blocks from x alone fails with a shape mismatch in `d_t - y_t` as soon as
    # d is shorter than x.
    n_blocks = min(x.shape[-1], d.shape[-1]) // M
    hann_window = torch.hann_window(M)
    zero_pad = torch.zeros(M)  # loop-invariant padding for the error block
    w_fft = torch.zeros(M + 1)
    x_prev = torch.zeros(M)

    Q = delta
    R = torch.full([M + 1], delta)
    P = torch.full([M + 1], gamma)

    y = torch.zeros(n_blocks * M)

    for k in tqdm(range(n_blocks)):
        x_block = x[k * M:(k + 1) * M]
        d_t = d[k * M:(k + 1) * M]

        # Previous + current block; only the last M output samples are kept.
        x_cur = torch.cat((x_prev, x_block))
        x_prev = x_block

        x_fft = fft(x_cur)
        y_t = ifft(w_fft * x_fft)[M:]
        e_t = d_t - y_t

        # Error spectrum of the windowed error, zero-padded at the front to 2 * M.
        e_fft = fft(torch.cat((zero_pad, e_t * hann_window)))

        # Recursive estimate of the per-bin error power.
        R = ff * R + (1.0 - ff) * (torch.abs(e_fft) ** 2)
        # Predict step, per-bin gain, and update of P.
        P_t = P + Q * torch.abs(w_fft)
        K = P_t * x_fft.conj() / ((x_fft * P_t * x_fft.conj()) + R)
        P = (1.0 - K * x_fft) * P_t

        # Weight update, then zero the second half of the time-domain weights.
        w_fft = w_fft + (K * e_fft)
        w = ifft(w_fft)
        w[M:] = 0
        w_fft = fft(w)

        y[k * M:(k + 1) * M] = y_t

    return y

In [77]:
# Run the Kalman filter on the first dataset directory with a block length of 256.
input_seq, response_seq, save_path = get_names(0, path)

x, x_sr = torchaudio.load(input_seq)
x = x.squeeze()

d, d_sr = torchaudio.load(response_seq)
d = d.squeeze()

y = mono_FDKalman(x, d, 256)


  0%|          | 0/802 [00:00<?, ?it/s]

In [79]:
# Save and play back the filter output `y` rather than the unfiltered input `x`.
# The result goes to its own file so that the dataset's existing nlms.wav is
# not overwritten.
audio_path = os.path.join(save_path, 'fdkalman.wav')
sf.write(audio_path, y, x_sr, subtype='PCM_16')
IPython.display.Audio(audio_path, rate=x_sr)