In [None]:
import numpy as np

from simul.vis.signals import get_vis_df, vis_signals, vis_signals2d, get_fft_df


In [None]:
# Reference signal: a 1 Hz complex exponential sampled at 1000 samples/s
# (one sample every 1 ms) over the half-open interval [0, 20) s.
sample_rate_full = 1000  # samples per second
sin_freq = 1  # Hz
sin_omega = (2 * np.pi) * sin_freq  # angular frequency, rad/s
sin_ts = np.arange(0, 20, 1 / sample_rate_full)  # sample timestamps, s

signal_vals_full = np.exp(1j * sin_omega * sin_ts)

# The full signal is passed in both signal slots of get_vis_df.
# NOTE(review): freqs=[0] presumably selects the first (only) row - confirm
# against get_vis_df.
df = get_vis_df(
    sin_ts,
    np.array([signal_vals_full]),
    np.array([signal_vals_full]),
    freqs=[0],
)
vis_signals2d(df)

In [None]:
def prune_normal(signal, rate=0.05, std=0.1):
    """Keep samples near a regular grid and replace all others with NaN.

    A grid of positions with spacing ``1/rate`` is built over the first axis
    of ``signal``; each position is shifted by zero-mean Gaussian jitter drawn
    from NumPy's global RNG and rounded to the nearest index. Samples at those
    indices are copied to the result, every other entry is NaN.

    Args:
        signal: array of samples, indexed along its first axis.
        rate: reciprocal of the grid spacing, in samples (0.05 -> every 20th).
        std: standard deviation of the jitter, in samples.

    Returns:
        A new complex array with the same shape as ``signal``.
    """
    n_samples = signal.shape[0]
    # np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0.
    res = np.full(signal.shape, np.nan, dtype=complex)

    grid_ids = np.arange(0, n_samples, 1 / rate)
    grid_ids = grid_ids + np.random.normal(0, std, grid_ids.shape)
    # Keep positions strictly inside (0, n_samples - 1) so the rounded index
    # always stays within the array (this also drops the grid point at 0).
    grid_ids = grid_ids[(grid_ids > 0) & (grid_ids < n_samples - 1)]

    keep = np.rint(grid_ids).astype(int)
    res[keep] = signal[keep]
    return res

def prune_uniform(signal, interval, spread, sample_rate=1000):
    """Keep samples near a regular time grid and replace all others with NaN.

    Grid positions are spaced ``interval * sample_rate`` samples apart along
    the first axis of ``signal``. Each position is shifted by jitter drawn
    uniformly from ``[-spread/2, spread/2)`` seconds (converted to samples)
    using NumPy's global RNG, then rounded to the nearest index.

    Args:
        signal: array of samples, indexed along its first axis.
        interval: nominal spacing between kept samples, in seconds.
        spread: total width of the uniform jitter window, in seconds.
        sample_rate: samples per second of ``signal``.

    Returns:
        A new complex array with the same shape as ``signal``; entries that
        were not selected are NaN.
    """
    n_samples = signal.shape[0]
    # np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0.
    res = np.full(signal.shape, np.nan, dtype=complex)

    grid_ids = np.arange(0, n_samples, interval * sample_rate)
    half_width = sample_rate * spread / 2
    grid_deltas = np.random.uniform(-half_width, half_width, grid_ids.shape)
    # Out-of-place addition: an in-place += fails when the grid is an integer
    # array (integer interval * sample_rate) and the jitter is float.
    grid_ids = grid_ids + grid_deltas
    # Keep positions strictly inside (0, n_samples - 1) so the rounded index
    # always stays within the array (this also drops the grid point at 0).
    grid_ids = grid_ids[(grid_ids > 0) & (grid_ids < n_samples - 1)]

    keep = np.rint(grid_ids).astype(int)
    res[keep] = signal[keep]
    return res


In [None]:

# Randomly prune the reference sine and compare it with the full signal.
# The global RNG is seeded first so that this cell (and every later cell that
# reuses signal_vals_pruned) gives the same result after Restart & Run All;
# the seed value matches the one run_test() uses.
np.random.seed(1)

# Alternative pruning scheme:
# signal_vals_pruned = prune_normal(signal_vals_full, rate=0.1, std=2)
signal_vals_pruned = prune_uniform(signal_vals_full, interval=0.1, spread=0.1)
# Same pruned signal with the NaN gaps replaced by zeros.
signal_vals_zero_filled = np.nan_to_num(signal_vals_pruned, copy=True, nan=0)

df = get_vis_df(
    sin_ts,
    np.array([signal_vals_full]),
    np.array([signal_vals_pruned]),
    freqs=[0],
)
vis_signals2d(df)

In [None]:
# Fill the NaN gaps of the pruned signal with the mean of its observed samples.
m = np.nanmean(signal_vals_pruned)
a = np.where(np.isnan(signal_vals_pruned), m, signal_vals_pruned)
a

In [None]:
from scipy import signal
import itertools

def interp_prev(vals_pruned):
    """Forward-fill NaN entries with the most recent non-NaN value, in place.

    Entries before the first non-NaN sample are set to zero. The input array
    is modified and also returned, so callers that need the original must pass
    a copy.

    Args:
        vals_pruned: 1-D real or complex array in which NaN marks a gap.

    Returns:
        ``vals_pruned`` itself, with every NaN replaced.
    """
    n_samples = vals_pruned.shape[0]
    if n_samples == 0:
        return vals_pruned

    missing = np.isnan(vals_pruned)
    # For each position, the index of the latest valid sample at or before it
    # (-1 while no valid sample has been seen yet).
    last_valid = np.where(missing, -1, np.arange(n_samples))
    last_valid = np.maximum.accumulate(last_valid)

    # Fancy indexing copies, so the source values are not disturbed while the
    # filled sequence is being built.
    filled = vals_pruned[np.maximum(last_valid, 0)]
    # Writing a plain 0 (not 0j) keeps this working for real-valued arrays.
    filled[last_valid < 0] = 0

    vals_pruned[...] = filled
    return vals_pruned


def iir_lowpass_interp(vals_pruned, order=5, ftype="butter", rp=5, rs=40, doublepass=True, sr_max=True):
    """Fill the NaN gaps of a pruned signal, optionally smoothing with an IIR low-pass.

    The input is copied, its gaps are forward-filled with ``interp_prev``, and
    then, depending on ``ftype``, the result is returned as is, zero-filled,
    or low-pass filtered. The output is rescaled so that its mean magnitude
    equals that of the forward-filled signal.

    Args:
        vals_pruned: 1-D array in which NaN marks a missing sample. Not modified.
        order: filter order passed to the SciPy design function.
        ftype: "butter", "ellip", "cheby1" or "cheby2" for a filter,
            "prev" for plain forward fill, "zero" for zero fill.
        rp: pass-band ripple passed to ``signal.ellip`` / ``signal.cheby1``.
        rs: stop-band attenuation passed to ``signal.ellip`` / ``signal.cheby2``.
        doublepass: use ``signal.filtfilt`` if True, ``signal.lfilter`` otherwise.
        sr_max: if True, the effective sample rate is the reciprocal of the
            longest run of consecutive NaNs; otherwise it is the fraction of
            non-NaN samples.

    Returns:
        Array of shape ``(1, len(vals_pruned))`` with the interpolated signal.

    Raises:
        ValueError: if ``ftype`` is not one of the supported names.
    """
    signal_vals = np.copy(vals_pruned)
    missing = np.isnan(signal_vals)

    if sr_max:
        gap_lengths = [
            sum(1 for _ in run)
            for is_gap, run in itertools.groupby(missing)
            if is_gap
        ]
        # default=1: with no gaps at all the rate is 1, which matches what the
        # sr_max=False branch yields for a fully observed signal.
        sample_rate = 1 / max(gap_lengths, default=1)
    else:
        sample_rate = np.count_nonzero(~missing) / signal_vals.shape[0]
    # Half the effective sample rate, passed to the design functions as Wn.
    cutoff = sample_rate / 2

    # interp_prev fills its argument in place; signal_vals is a private copy.
    vals_filled = interp_prev(signal_vals)

    if ftype == "zero":
        # Observed samples are unchanged by the fill, so zeroing the former
        # gaps gives the zero-filled signal.
        vals_interp = np.where(missing, 0, vals_filled)
    elif ftype == "prev":
        vals_interp = vals_filled
    else:
        if ftype == "butter":
            b, a = signal.butter(order, cutoff)
        elif ftype == "ellip":
            b, a = signal.ellip(order, rp, rs, cutoff)
        elif ftype == "cheby1":
            b, a = signal.cheby1(order, rp, cutoff)
        elif ftype == "cheby2":
            b, a = signal.cheby2(order, rs, cutoff)
        else:
            raise ValueError(
                f"Unknown ftype {ftype!r}; expected one of "
                "'butter', 'ellip', 'cheby1', 'cheby2', 'prev', 'zero'"
            )
        if doublepass:
            vals_interp = signal.filtfilt(b, a, vals_filled)
        else:
            vals_interp = signal.lfilter(b, a, vals_filled)

    # NOTE(review): the reference magnitude is taken from the forward-filled
    # signal (which contains no NaN any more), not from the observed samples
    # only. This reproduces the previous behaviour - confirm it is intended.
    norm_coeff = np.nanmean(np.abs(vals_filled)) / np.mean(np.abs(vals_interp))
    return np.array([vals_interp * norm_coeff])


def windowed_interp(vals_pruned, w_size=1500, w_prev=500, w_next=50, **kwargs):
    """Apply ``iir_lowpass_interp`` window by window to each row of a signal.

    Every window of ``w_size`` output samples is computed from a longer chunk
    that also contains ``w_prev`` samples before and ``w_next`` samples after
    it; only the central part of the result is kept. Whatever remains after
    the last full window is processed as one final chunk with ``w_prev``
    samples of leading context and no trailing context.

    The first ``w_prev`` samples of each row are never written and stay zero.
    NOTE(review): that matches the previous behaviour; confirm it is intended.

    Args:
        vals_pruned: 1-D array (treated as a single row) or 2-D array of
            shape ``(rows, samples)`` in which NaN marks a missing sample.
        w_size: number of output samples produced per window.
        w_prev: samples of context before each window.
        w_next: samples of context after each window (may be 0).
        **kwargs: forwarded unchanged to ``iir_lowpass_interp``.

    Returns:
        complex128 array of shape ``(rows, samples)``.
    """
    signal_vals = vals_pruned[np.newaxis, :] if vals_pruned.ndim == 1 else vals_pruned
    n_rows, n_samples = signal_vals.shape
    res = np.zeros(signal_vals.shape, dtype=np.complex128)

    window_starts = range(w_prev, n_samples - w_next - w_size, w_size)
    # The tail starts right after the last full window. If no full window
    # fits, it starts at w_prev so that a short signal is still interpolated.
    tail_start = window_starts[-1] + w_size if len(window_starts) else w_prev

    for row_i in range(n_rows):
        row = signal_vals[row_i]
        for start in window_starts:
            chunk = row[start - w_prev:start + w_size + w_next]
            # Explicit end index: a negative "-w_next" stop would select
            # nothing when w_next == 0.
            res[row_i, start:start + w_size] = iir_lowpass_interp(
                chunk, **kwargs)[0, w_prev:w_prev + w_size]
        if tail_start < n_samples:
            res[row_i, tail_start:] = iir_lowpass_interp(
                row[tail_start - w_prev:], **kwargs)[0, w_prev:]
    return res

# Windowed interpolation of the pruned sine: forward fill vs. Butterworth,
# plotted against the sample index rather than time.
window_opts = {"w_prev": 1000, "w_size": 5000}
prev_windowed = windowed_interp(signal_vals_pruned, ftype="prev", **window_opts)
butter_windowed = windowed_interp(signal_vals_pruned, ftype="butter", **window_opts)

vis_signals2d(
    get_vis_df(
        np.arange(signal_vals_full.shape[0]),
        np.array([signal_vals_full]),
        np.array([signal_vals_pruned]),
        (prev_windowed, "prev_w"),
        (butter_windowed, "butter_w"),
        freqs=[0],
    )
)

In [None]:
# Same comparison with a shorter Butterworth window (1322 samples) that has
# 500 samples of context on each side; the forward-fill trace is unchanged.
prev_windowed = windowed_interp(
    signal_vals_pruned, w_prev=1000, w_size=5000, ftype="prev")
butter_windowed = windowed_interp(
    signal_vals_pruned, w_prev=500, w_size=1322, w_next=500, ftype="butter")

vis_signals2d(
    get_vis_df(
        np.arange(signal_vals_full.shape[0]),
        np.array([signal_vals_full]),
        np.array([signal_vals_pruned]),
        (prev_windowed, "prev_w"),
        (butter_windowed, "butter_w"),
        freqs=[0],
    )
)

In [None]:
import plotly.express as px
import matplotlib.pyplot as plt

def show_filter_reponse(b, a, cutoff):
    """Show the magnitude response of the analog filter ``b / a``.

    The response is computed with ``scipy.signal.freqs`` and drawn with a
    logarithmic frequency axis; a vertical line marks ``cutoff``.
    """
    ang_freqs, response = signal.freqs(b, a)
    fig = px.line(x=ang_freqs, y=np.abs(response), log_x=True)
    fig.add_vline(x=cutoff)
    fig.show()

# Magnitude responses of four 5th-order analog low-pass designs sharing one
# cutoff (for analog=True SciPy interprets Wn as an angular frequency).
cutoff = 10
analog_designs = [
    signal.butter(5, cutoff, analog=True),
    signal.ellip(5, 5, 40, cutoff, analog=True),   # rp=5, rs=40
    signal.cheby1(5, 5, cutoff, analog=True),      # rp=5
    signal.cheby2(5, 40, cutoff, analog=True),     # rs=40
]
for num_coeffs, den_coeffs in analog_designs:
    show_filter_reponse(num_coeffs, den_coeffs, cutoff)

In [None]:

# Response of a truncated sinc used as numerator coefficients.
# Earlier attempt and the MATLAB snippet this was adapted from:
# show_filter_reponse(np.sinc(np.arange(1, 10, 1)), np.array([1]+[0]*9), cutoff)
# D = 5;
# f0 = 1e3;
# fm = 40e3;
# t = 0:1:2*fm*D-1;
# x = cos(2*pi*t*f0/fm);
# h = sinc(-D*pi:1/10:D*pi);
#
# NOTE(review): show_filter_reponse evaluates signal.freqs, i.e. it treats the
# sinc samples as polynomial coefficients of an analog filter - confirm that
# this, rather than a digital FIR response, is what is wanted.
t = np.arange(0, 5 * np.pi, 0.1)
sinc_taps = np.sinc(t)
show_filter_reponse(sinc_taps, np.array([1]), cutoff)


In [None]:
import plotly.io as pio
import plotly.graph_objects as go

def plot_signal(x, t):
    """Return a plotly figure of signal ``x`` against time ``t`` (lines and markers)."""
    trace = go.Scatter(y=x, x=t, mode="lines+markers")
    fig = go.Figure([trace])
    fig.update_layout(xaxis_title="Time, s", yaxis_title="Signal")
    return fig


def fft(vals, ts, pad_factor=13.74):
    """Return the zero-padded FFT magnitude of ``vals`` and its frequency axis.

    The sample rate is derived from the time axis as the number of sampling
    intervals divided by the covered time span, i.e. ``(N - 1) / (ts[-1] - ts[0])``
    (dividing by ``N`` would overestimate the rate by one interval).

    Args:
        vals: samples; the transform is taken along the last axis.
        ts: 1-D array of uniformly spaced, increasing timestamps. Only its
            length and end points are used.
        pad_factor: the FFT length is ``int(len(vals) * pad_factor)``; values
            above 1 zero-pad the signal.

    Returns:
        Tuple ``(magnitudes, freqs)``; ``freqs`` is in cycles per unit of ``ts``.

    Raises:
        ValueError: if ``ts`` has fewer than two samples or spans no time.
    """
    n_ts = ts.shape[-1]
    if n_ts < 2:
        raise ValueError("ts must contain at least two timestamps")
    span = ts[-1] - ts[0]
    if span == 0:
        raise ValueError("ts must span a non-zero time interval")
    sample_rate = (n_ts - 1) / span

    n = int(vals.shape[-1] * pad_factor)
    return np.abs(np.fft.fft(vals, n=n)), np.fft.fftfreq(n) * sample_rate

def scatter_freqs(vals, ts, f_max=10, name=""):
    """Return a plotly line trace of the spectrum of ``vals`` for ``|f| < f_max``."""
    magnitudes, freqs = fft(vals, ts)
    in_band = np.abs(freqs) < f_max
    return go.Scatter(
        y=magnitudes[in_band],
        x=freqs[in_band],
        mode="lines",
        name=name,
    )

def plot_freqs(vals, ts, f_max=10):
    """Return a plotly figure of the spectrum of ``vals`` for ``|f| < f_max``."""
    magnitudes, freqs = fft(vals, ts)
    in_band = np.abs(freqs) < f_max
    spectrum = go.Scatter(y=magnitudes[in_band], x=freqs[in_band], mode="lines")
    fig = go.Figure(spectrum)
    fig.update_layout(xaxis_title="Frequency, Hz", yaxis_title="Strength")
    return fig



In [None]:
def run_test(signal_vals, interval, spread, order=5, cut_edges=False, sr_max=True, ts=None):
    """Prune a signal, interpolate it back and plot spectra and waveforms.

    The signal is pruned with ``prune_uniform`` (global RNG seeded with 1 so
    repeated runs are identical), reconstructed with a Butterworth low-pass
    applied both globally and window by window, and compared with the original
    in the frequency domain (shown immediately) and in the time domain.

    Args:
        signal_vals: 1-D array with the full signal.
        interval: nominal spacing of kept samples in seconds (see prune_uniform).
        spread: width of the sampling jitter in seconds (see prune_uniform).
        order: filter order.
        cut_edges: if True, only the central 80 % of each interpolated signal
            is used for its spectrum; otherwise the whole signal is used.
        sr_max: forwarded to the interpolators; also selects whether the
            marked frequency is ``0.5/(interval+spread)`` or ``0.5/interval``.
        ts: timestamps matching ``signal_vals``. Defaults to the notebook-level
            ``sin_ts``.

    Returns:
        Whatever ``vis_signals2d`` returns for the time-domain comparison.
    """
    if ts is None:
        ts = sin_ts

    np.random.seed(1)
    f_niq = 0.5 / (interval + spread) if sr_max else 0.5 / interval

    n_samples = signal_vals.shape[0]
    if cut_edges:
        edges = slice(int(n_samples * 0.1), int(n_samples * 0.9))
    else:
        # Whole signal; slice(0, -1) would silently drop the last sample.
        edges = slice(None)

    signal_vals_pruned = prune_uniform(signal_vals, interval, spread)
    default_params = {"order": order, "rp": 0.1, "rs": 40, "sr_max": sr_max}
    interp_signals = [
        (iir_lowpass_interp(signal_vals_pruned, ftype="butter", **default_params), "Butter"),
        (windowed_interp(signal_vals_pruned, ftype="butter", **default_params), "windowed"),
        # Other variants that can be enabled for comparison:
        # (iir_lowpass_interp(signal_vals_pruned, ftype="ellip", **default_params), "Ellip"),
        # (iir_lowpass_interp(signal_vals_pruned, ftype="cheby1", **default_params), "cheby1"),
        # (iir_lowpass_interp(signal_vals_pruned, ftype="cheby2", **default_params), "cheby2"),
        # (iir_lowpass_interp(signal_vals_pruned, ftype="prev", **default_params), "prev"),
        # (iir_lowpass_interp(signal_vals_pruned, ftype="zero", **default_params), "zero"),
    ]

    f_max = f_niq * 4
    freq_plot = plot_freqs(signal_vals, ts, f_max)
    for interp, name in interp_signals:
        # Slice the timestamps together with the values so both describe the
        # same stretch of the signal.
        freq_plot.add_trace(scatter_freqs(interp[0][edges], ts[edges], f_max, name=name))

    freq_plot.add_vline(x=f_niq, annotation_text=f"f_niq:{f_niq:.2}", line_dash="dot", line_color="red")
    freq_plot.add_vline(x=1/(interval), annotation_text=f"mirror:{1/(interval)}", line_dash="dash", line_color="red")
    freq_plot.show()

    return vis_signals2d(
        get_vis_df(
            ts,
            np.array([signal_vals]),
            np.array([signal_vals_pruned]),
            *interp_signals,
            freqs=[0],
        ))


# Dense sampling with small jitter: one sample per 0.1 s, 0.02 s spread.
run_test(signal_vals_full, interval=0.1, spread=0.02, order=5)


In [None]:
# Jitter window (0.4 s) much wider than the nominal interval (0.05 s).
run_test(signal_vals_full, interval=0.05, spread=0.4, order=5)
# run_test(signal_vals_full, interval=0.05, spread=0.4, order=5, sr_max=False)


In [None]:
# Same pruning with both ways of estimating the effective sample rate.
# Only the second call is the cell's last expression, so only its return
# value can be rendered as cell output.
run_test(signal_vals_full, interval=0.3, spread=0.2, order=5, sr_max=True)
run_test(signal_vals_full, interval=0.3, spread=0.2, order=5, sr_max=False)


In [None]:
# Sampling interval (1.1 s) longer than the 1 s period of the sine.
# NOTE(review): the fifth positional argument of run_test is cut_edges, so
# these calls toggle edge trimming. The neighbouring cells toggle sr_max by
# keyword - confirm which of the two was meant here.
run_test(signal_vals_full, interval=1.1, spread=0.05, order=5, cut_edges=True)
run_test(signal_vals_full, interval=1.1, spread=0.05, order=5, cut_edges=False)


In [None]:
# Strictly regular sampling (no jitter) every 0.33 s.
# NOTE(review): the fifth positional argument of run_test is cut_edges -
# confirm that edge trimming, not sr_max, was meant.
run_test(signal_vals_full, interval=0.33, spread=0.0, order=5, cut_edges=True)


In [None]:
from distance_determination import estimate_dist, simulate_signals
from run_experiment import experiments
from simul.vis.signals import get_vis_df, vis_signals, vis_signals2d, get_fft_df

# Simulate two configured experiments at vel = 3.6.
# The result of "default_full" is kept as the reference signal set and the
# result of "default_random" as the pruned one.
# NOTE(review): presumably "default_random" is the randomly sampled variant of
# the same scene - confirm against run_experiment.experiments.
#
# Previously tried alternatives: "full_no_walls" / "no_walls_random" /
# "default" / "no_walls" as experiment names, vel = 1, and cutting the
# simulated signals to [exp_start, exp_start + exp_size).
#
# After the loop exp_name, params and dist refer to the last experiment
# ("default_random"); setting params.vel modifies the entry stored in
# `experiments`.
simulated = {}
for exp_name in ("default_full", "default_random"):
    params = experiments[exp_name]
    params.vel = 3.6
    dist, simulated[exp_name] = simulate_signals(params)

signals_data = simulated["default_full"]
signals_data_pruned = simulated["default_random"]

In [None]:
# Full vs. pruned simulated signals, followed by the spectrum of the first
# row of the full simulation.
df = get_vis_df(
    params.tss,
    signals_data,
    signals_data_pruned,
    freqs=[0],
)
# NOTE(review): this is not the last expression of the cell, so if
# vis_signals2d only returns a figure (instead of showing it) nothing is
# rendered here - confirm.
vis_signals2d(df)
# The full simulation is stored in `signals_data`; `signals_data_full` is not
# assigned anywhere in the visible cells and raised NameError here.
plot_freqs(signals_data[0, :], params.tss, 30).show()
# plot_freqs(signals_data[:, 0], np.array(range(0, 40)), 30).show()


In [None]:

# Interpolate the first row of the pruned simulation with each filter type
# and compare the spectra and waveforms with the full simulation.
# interp = iir_lowpass_interp_fill_means(signals_data_pruned[0,:], order=5)

pruned_row = signals_data_pruned[0, :]
# (ftype, extra keyword arguments); the ftype doubles as the trace label.
filter_specs = [
    ("butter", {"sr_max": False}),
    ("ellip", {"sr_max": False, "rp": 0.1, "rs": 100}),
    ("cheby1", {"sr_max": False, "rp": 0.1, "rs": 100}),
    ("cheby2", {"sr_max": False, "rs": 40}),
    ("prev", {}),
    # ("zero", {}),
]
interp_signals = [
    (iir_lowpass_interp(pruned_row, ftype=ftype, **opts), ftype)
    for ftype, opts in filter_specs
]
edges = slice(0, -1)

freq_plot = plot_freqs(signals_data[0, :], params.tss, 40)
for interp, name in interp_signals:
    freq_plot.add_trace(scatter_freqs(interp[0][edges], params.tss, 40, name=name))

# NOTE(review): both marker lines carry the label "12.1" although they are
# drawn at 12.6 and 25.2 - confirm which value is correct.
for marker_x in (12.6, 12.6 * 2):
    freq_plot.add_vline(x=marker_x, annotation_text="12.1", line_dash="dot", line_color="red")

freq_plot.show()

vis_signals2d(
    get_vis_df(
        params.tss,
        signals_data,
        signals_data_pruned,
        *interp_signals,
        freqs=[0],
    )
)

In [None]:
# NOTE(review): in the visible cells `signal_vals`, `f_niq` and `interval`
# exist only as locals/parameters of run_test(), so on a fresh kernel this
# cell raises NameError. It repeats the frequency-plot part of run_test() and
# mixes the sine timestamps (sin_ts) with `interp_signals`/`edges` from the
# simulation cell - presumably a leftover; confirm and remove or wire up.
freq_plot = plot_freqs(signal_vals, sin_ts, f_niq*4);
for interp, name in interp_signals:
    freq_plot.add_trace(scatter_freqs(interp[0][edges],sin_ts, f_niq*4, name=name))

# Dotted line at f_niq, dashed line at 1/interval.
freq_plot.add_vline(x=f_niq, annotation_text=f"f_niq:{f_niq:.2}", line_dash="dot", line_color="red")
freq_plot.add_vline(x=1/(interval), annotation_text=f"mirror:{1/(interval)}", line_dash="dash", line_color="red")

freq_plot.show()

In [None]:
def test_dist_prediction(signal_data, params, exp_start=0, step=8):
    """Show a heat map of the distance estimates produced by ``estimate_dist``.

    The x axis holds sample indices ``exp_start, exp_start + step, ...`` up to
    the number of columns of ``signal_data``; the y axis holds candidate
    distances from 0 to 20 in steps of 0.02. Colour is ``log(estimate + 1)``.

    NOTE(review): this assumes estimate_dist returns one row per ``step``
    samples and one column per candidate distance - confirm against
    distance_determination.estimate_dist.

    Args:
        signal_data: 2-D array of signals; only its second dimension is read here.
        params: experiment parameters forwarded to ``estimate_dist``.
        exp_start: index of the first sample on the x axis. Previously read
            from an undefined global; 0 is the last value it was set to in
            the (commented-out) setup code.
        step: spacing of the x axis in samples.
    """
    estimated_dists = estimate_dist(signal_data, params)
    # real_dists = dist[exp_start:exp_start+exp_size:8, 0]
    real_dists = np.arange(exp_start, signal_data.shape[1], step)

    dist_ideal = np.arange(0, 20, 0.02)
    px.imshow(np.log(estimated_dists.T + 1), aspect="auto", y=dist_ideal, x=real_dists).show()


test_dist_prediction(signals_data, params)


In [None]:
# Distance estimation on the windowed-interpolated pruned simulation:
# single-pass (lfilter) vs. double-pass (filtfilt) filtering.
# NOTE(review): the ftype string of the first call was split across two lines
# (an unterminated literal, so the cell did not parse). "cheby1" is assumed
# because the second call uses it with otherwise identical arguments - confirm.
test_dist_prediction(
    windowed_interp(signals_data_pruned, w_size=3000, w_prev=1000, w_next=100,
                    ftype="cheby1", sr_max=False, doublepass=False, rp=0.1, rs=100, order=5),
    params)
test_dist_prediction(
    windowed_interp(signals_data_pruned, w_size=3000, w_prev=1000, w_next=100,
                    ftype="cheby1", sr_max=False, doublepass=True, rp=0.1, rs=100, order=5),
    params)

In [None]:
# Spectrum across the first axis of the simulated signals at one time index,
# using 0..39 as the "time" axis.
# NOTE(review): `interp_signals_f` is not assigned in any visible cell, so
# this cell raises NameError on a fresh kernel - presumably it should hold an
# interpolated version of signals_data_pruned; confirm.
# This also rebinds the notebook-level name `t` used by the sinc cell.
t = 20000
# print(real_dists[t])
freq_plot = plot_freqs(signals_data[:, t], np.array(range(40)), 40);
freq_plot.add_trace(scatter_freqs(interp_signals_f[:,t], np.array(range(40)), 40, name="butter"))

freq_plot.show()

In [None]:
x = 