In [None]:
import sys
sys.path.append('..')

In [None]:
import numpy as np
from rtmha.elevenband import elevenband_taps_min
import plotly.graph_objects as go

In [None]:
# Select the band-0 tap set from the 11-band multirate filter tables
band = 0
taps = elevenband_taps_min[band]

## Plotting the Predicted Response

In [None]:
# Band 0 runs at 1/16 of the full sample rate, so derive the decimated
# rate and its Nyquist frequency from the full-rate value.
sample_rate = 32000
decimation = 16
down_rate = sample_rate / decimation
nyq_rate = down_rate / 2

In [None]:
# Predict the filter's frequency response with scipy's freqz, evaluated at
# 8000 frequency points (a=1 because the taps describe an FIR filter).
from scipy.signal import freqz
w, h = freqz(b=taps, a=1, worN=8000)

In [None]:
# Rescale freqz's frequency axis (w / pi runs over [0, 1)) to Hz using the
# Nyquist rate of the decimated band, and plot the magnitude of the response.
freq_hz = (w/np.pi)*nyq_rate
gain = np.abs(h)

fig = go.Figure(data=[go.Scatter(x=freq_hz, y=gain, opacity=.5, mode='lines')])
fig.update_layout(
    title='Frequency Response',
    xaxis_title='Frequency (Hz)',
    yaxis_title='Gain',
    template='plotly_dark',
)
fig.show()

In [None]:
def generate_sine_waves(freq_list, duration=1, sample_rate=32000):
    """Generates a signal that is the average of several sine waves.

    Args:
        freq_list: frequencies in Hz. May be a list/array or a single
            number. An empty list yields an all-zero (silent) signal.
        duration: signal length in seconds (default 1)
        sample_rate: sample rate in Hz (default 32000)

    Returns:
        (t, y): t holds the sample times in seconds, starting at 0 and
        excluding `duration`. y holds the signal, normalized by the number
        of tones so it stays within [-1, 1].
    """
    # atleast_1d lets a bare scalar frequency through; ravel flattens nesting.
    freqs = np.atleast_1d(np.asarray(freq_list, dtype=float)).ravel()

    n_samples = int(sample_rate * duration)
    t = np.linspace(0, duration, n_samples, endpoint=False)
    y = np.zeros(n_samples)

    # With no tones there is nothing to average; dividing by zero below
    # would turn the whole signal into NaN, so return silence instead.
    if freqs.size == 0:
        return t, y

    for freq in freqs:
        y += np.sin((2 * np.pi) * (t * freq))
    y /= freqs.size   # normalize so the sum of tones stays in [-1, 1]
    return t, y

In [None]:
def plot_fft(title, output, sample_rate=32000 / 16):
    """Plots the magnitude of the first half of the FFT of a signal.

    Args:
        title: figure title
        output: 1-D sequence of samples to analyse
        sample_rate: sample rate of `output` in Hz. Defaults to 2000
            (32000 / 16), the value that used to be hard-coded here.

    Returns:
        None. The figure is displayed with fig.show().
    """
    n_samples = len(output)
    n_bins = n_samples // 2

    # Keep only the first half of the spectrum so the magnitude array has
    # exactly one value per plotted frequency (previously the full-length
    # FFT was paired with a half-length frequency axis).
    magnitudes = np.abs(np.fft.fft(output)[:n_bins]) / n_samples

    # Bin k corresponds to k * sample_rate / n_samples Hz.
    frequencies = np.arange(n_bins) * sample_rate / n_samples

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=frequencies,
                  y=magnitudes))

    fig.update_layout(title=title,
                    xaxis_title='Frequency',
                    yaxis_title='Amplitude',
                    template='plotly_dark')
    fig.show()

In [None]:
# Ten evenly spaced tones from 50 to 500, generated at the decimated rate
tone_freqs = np.linspace(50, 500, 10)
x, y = generate_sine_waves(tone_freqs, sample_rate=down_rate)

In [None]:
from rtmha.filter import FirFilter

In [None]:
# Build a filter from the band-0 taps. The second argument is the length of
# the signal about to be filtered (presumably a maximum block/buffer size —
# TODO confirm against rtmha.filter.FirFilter).
f = FirFilter(taps, len(y))
# Run the whole test signal through the filter in a single call.
res = f.filter(y)
# Show the output shape as the cell result.
res.shape

In [None]:
# Plot the spectrum before and after filtering. Each figure gets its own
# title so the input and output plots can be told apart.
plot_fft('FFT of input signal', y)
plot_fft('FFT of filtered output', res)

In [None]:
fig = go.Figure()

# Overlay the input and the filtered output on the shared time axis x
# returned by generate_sine_waves.
fig.add_trace(go.Scatter(x=x,
                y=y,
                opacity=.5,
                mode='lines',
                name='input'))

fig.add_trace(go.Scatter(x=x,
                y=res,
                opacity=.5,
                mode='lines',
                name='output'))

# x spans [0, duration) with duration given in seconds, so the axis is
# labelled in seconds (it was previously mislabelled as milliseconds).
fig.update_layout(title='Filtered Output',
                xaxis_title='Time (s)',
                yaxis_title='Amplitude',
                template='plotly_dark')
fig.show()

In [None]:
# Build a much longer signal (duration=1000) for the timing cells below,
# and a filter constructed for that signal's length.
tone_freqs = np.linspace(50, 500, 10)
x, y = generate_sine_waves(tone_freqs, duration=1000, sample_rate=down_rate)
f = FirFilter(taps, len(y))

In [None]:
%%timeit
# Time one f.filter call on the long signal built in the previous cell.
res = f.filter(y)

In [None]:
# Cast the samples to single precision to see whether filtering gets faster;
# the dtype is shown as the cell result to confirm the cast.
y = y.astype("float32")
y.dtype

In [None]:
%%timeit
# Time the same f.filter call now that y has been cast to float32.
res = f.filter(y)

In [None]:
# Switch to a short 32-sample input of ones for the next timing run
y = np.ones(32, dtype=np.float64)

In [None]:
%%timeit
# Time one f.filter call on the 32-sample array of ones.
res = f.filter(y)

In [None]:
# Repeat the single-precision comparison for the short array; the dtype is
# shown as the cell result to confirm the cast.
y = y.astype("float32")
y.dtype

In [None]:
%%timeit
# Time one f.filter call on the 32-sample array after the float32 cast.
res = f.filter(y)

## Testing Splitting the Data into Small Packets

In [None]:
# Reference result: regenerate the default-length test signal and filter it
# in one call, to compare against the packet-by-packet run.
tone_freqs = np.linspace(50, 500, 10)
x, y = generate_sine_waves(tone_freqs, sample_rate=down_rate)
f = FirFilter(taps, len(y))
res = f.filter(y)

In [None]:
# Feed the same signal through a fresh filter in small packets.
packet_len = 10
f = FirFilter(taps, 32)

# Collect the per-packet outputs and join them once at the end; calling
# np.append inside the loop re-copies the whole accumulated result every time.
# np.array(...) copies each packet in case filter() returns a buffer it reuses
# on the next call, and ravel() keeps np.append's flattening behaviour.
packets = [np.array(f.filter(y[start:start + packet_len])).ravel()
           for start in range(0, len(y), packet_len)]

# The leading empty float32 array reproduces the original accumulator's dtype
# promotion and keeps the result well-defined when there are no packets.
res2 = np.concatenate([np.empty((0), dtype=np.float32)] + packets)

In [None]:
# Packet-wise filtering should reproduce the single-call result exactly.
# array_equal checks the shapes first, so a length mismatch yields False
# instead of a broadcasting error or an accidental broadcasted comparison.
np.array_equal(res, res2)