WARNING - Notebook takes a very long time to run! Will override the existing saved compressed data. 

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import wiener_sampling as ws

In [None]:
# functions

def Snn_trial(w, N0=1e-2, A=5, b=1.8):
    return N0 + A / (np.abs(w)**b) 

a_b_trial = (0, 0.9)

def Sxx_trial(w, peak_amp=0.9, f_c=10.0, f_halfwidth=1.0):
    wc = 2*np.pi*f_c
    gamma = 2*np.pi*f_halfwidth
    peak = peak_amp * (gamma**2) / (((np.abs(w) - wc)**2) + gamma**2)
    return peak

In [3]:
def Sxx(w):
    return Sxx_trial(w)
def Snn(w):
    return Snn_trial(w)
def Sxy(w):
    return Sxx(w)
def Syy(w): 
    return Sxx(w) + Snn(w)

# preconditioning values
alpha, beta = a_b_trial

def F_factor(w):
    wc = np.asarray(w, dtype=np.complex128)
    # return np.exp(-np.pi*1j * alpha/2) * (1j + wc)**(alpha - beta) * (wc**beta)
    return np.exp(1j * alpha/2) * (wc**beta) * (1j + wc)**(alpha - beta)

In [4]:
# Generate data to construct filter
seed = 12345
rng = np.random.default_rng(seed)
fs = 512
Nt = 16384*50
w_master = 2*np.pi*np.fft.fftfreq(Nt, d=1.0/fs)  
w = np.sort(w_master) 
Gamma = 0.1

In [5]:
x, n, y = ws.generate_xy(Sxx, Snn, fs, Nt, margin_sec=3.0, rng=rng)
Syy_est = ws.welch_on_grid(x, y, fs, Gamma, w_master, Nt)
# generate data to test filter
x, n, y = ws.generate_xy(Sxx, Snn, fs, Nt, margin_sec=3.0, rng=np.random.default_rng(123))

  return N0 + A / (np.abs(w)**b)


In [6]:
# Use preconditioning
F_FACTOR = True
# Use eigenabsis of 100, 10.000 points for FFT integral. 
K, N = 100, 10000
tan_freq = 5

# Compute filter H_est
if F_FACTOR:
    H_est  = F_factor(w)*ws.eigen_solve(K, N, 
                                        lambda w: Sxy(w)*np.conj(F_factor(w)), 
                                        lambda w: Syy_est(w)*np.abs(F_factor(w))**2, w, tan_freq)
    H_no_sampling = F_factor(w)*ws.eigen_solve(K, N, 
                                        lambda w: Sxy(w)*np.conj(F_factor(w)), 
                                        lambda w: Syy(w)*np.abs(F_factor(w))**2, w, tan_freq)
    H_no_preconditioning = ws.eigen_solve(K, N, Sxy, Syy, w, tan_freq)
    
else:
    H_est  = ws.eigen_solve(K, N, Sxy, Syy_est, w)

# non causal solution (fully analytic)
H_non_causal = Sxx(w)/(Sxx(w)+Snn(w))

# compute x_hat, x_hat_non_causal
Y = np.fft.fft(y, n=w.size)
x_hat      = np.fft.ifft(np.fft.ifftshift(H_est) *Y, n=w_master.size).real
x_hat_non_causal = np.fft.ifft(np.fft.ifftshift(H_non_causal)*Y, n=w_master.size).real
t = np.arange(Nt)/fs

  return N0 + A / (np.abs(w)**b)


In [7]:
# generate sample and test data for fully causal real-time filter (tested on same data as used to generate)
x_sample, n_sample, y_sample = ws.generate_xy(Sxx, Snn, fs, Nt, margin_sec=3.0, rng=np.random.default_rng(123))
Syy_est = ws.welch_on_grid(x_sample, y_sample, fs, Gamma, w_master, Nt)
x_test, n_test, y_test = ws.generate_xy(Sxx, Snn, fs, Nt, margin_sec=3.0, rng=np.random.default_rng(123))

  return N0 + A / (np.abs(w)**b)


In [None]:
# Generation of real-time filter. Calculates new Syy and new filter h(w) every second of data using all previous data. Applies it to the interval of one second. Saves to a file. 
seconds = Nt // fs

nfft = w.size  

xhat_blocks = []

Syy_snaps = []  
H_snaps   = []  

for sec in range(seconds):
    # if sec % 20 == 0: 
    #     print(sec)
    a = sec * fs
    b = a + fs

    end_est = b
    Syy_est = ws.welch_on_grid(x_sample[:end_est], y_sample[:end_est],
                               fs, Gamma, w_master, end_est)

    H_est = F_factor(w) * ws.eigen_solve(
        K, N,
        lambda ww: Sxy(ww) * np.conj(F_factor(ww)),
        lambda ww: Syy_est(ww) * np.abs(F_factor(ww))**2,
        w, tan_freq
    )

    pad = fs  # 1 second of extra data on each side to avoid circular artifacts
    a_ctx = max(0, a - pad)
    b_ctx = min(len(y_test), b + pad)

    y_ctx = y_test[a_ctx:b_ctx]

    Y_ctx = np.fft.fft(y_ctx, n=nfft)
    Xhat_ctx_full = np.fft.ifft(np.fft.ifftshift(H_est) * Y_ctx, n=nfft).real

    start_in_ctx = a - a_ctx
    xhat_blocks.append(Xhat_ctx_full[start_in_ctx:start_in_ctx + fs])

    # remember snapshots
    if sec % 50 == 0:
        Syy_snaps.append(Syy_est(w)) 
        H_snaps.append(H_est.copy())

# full output of real-time filter
x_hat_stream = np.concatenate(xhat_blocks)
t = np.arange(x_hat_stream.size) / fs

Syy_arr = np.vstack(Syy_snaps)   
H_arr   = np.vstack(H_snaps)

# zoom in on 150s (in order to not save the whole of Syy, H arrays)
a = 150
b = a + 1
m = (t > a) & (t < b)
H_causal = H_arr[a//50]
Syy_causal = Syy_arr[a//50]



In [None]:
# save everything in one compressed file
np.savez_compressed(
    "filter_snaps.npz",
    Syy=Syy_causal,
    H=H_causal,
    w=w,
    t=t,
    xhat=x_hat_stream,
    x=x, 
    x_hat=x_hat,
    y=y,
    H_est=H_est,
    H_no_sampling = H_no_sampling,
    H_no_preconditioning = H_no_preconditioning,
    H_non_causal = H_non_causal
    )

In [None]:
# supplementary graphs data generation. Very long runtime, extrenely high memory utilisation. Saves to a file. 

def h_grid(w, Sxy, Syy, step, n_extra=10):
    o = np.argmin(np.abs(w))
    dw = np.abs(w[1] - w[0])
    step_dw = step * dw
    # build core slices (your original logic)
    w_left_core = w[o - step//2::-step][::-1] 
    w_right_core = w[o + step//2::step]    

    # extend beyond the original array
    left_extra  = w_left_core[0] - step_dw * np.arange(n_extra, 0, -1)
    right_extra = w_right_core[-1] + step_dw * np.arange(1, n_extra + 1)

    w_left  = np.concatenate([left_extra,  w_left_core])
    w_right = np.concatenate([w_right_core, right_extra])
    w_sub = np.concatenate([w_left, w_right])
    h_grid_solve = ws.grid_solve(w_sub, Sxy, Syy)

    start = left_extra.size
    end   = start + w_left_core.size + w_right_core.size
    w_sub = w_sub[start:end]
    h_grid_solve = h_grid_solve[start:end]
    return w_sub, h_grid_solve

w_sub, h_grid_solve = h_grid(w, Sxy, Syy, 20, 2000)
w_sub_conditioned, h_grid_conditioned = h_grid(w, lambda w: Sxy(w)*np.conj(F_factor(w)), lambda w: Syy(w)*np.abs(F_factor(w))**2, 20, 2000)
h_grid_conditioned = h_grid_conditioned * F_factor(w_sub_conditioned)



  H = dw/np.pi * np.where(diff == 0.0, 0.0, 1.0/diff)


In [9]:
np.savez_compressed(
    "filter_supplementary.npz",
    w_sub = w_sub,
    h_grid_solve = h_grid_solve,
    w_sub_conditioned = w_sub_conditioned,
    h_grid_conditioned = h_grid_conditioned
    )