# fftboost: Ensemble-of-Experts Boosting (EEB) — Visual Demo

This Colab-ready notebook demonstrates the full EEB pipeline:
- Deterministic training with Huber loss + early stopping
- Ensemble of experts (fft_bin + sk_band)
- Explainable descriptors (bin and band ranges)
- Visualizations: signal, spectrogram, selected bins/bands, stage contributions
- Deterministic artifact save/load with stable SHA256


In [1]:
# ruff: noqa: I001
# If running on Colab or clean environment, uncomment to install from GitHub
!pip install git+https://github.com/pinballsurgeon/fftboost.git --quiet
import importlib
import matplotlib.pyplot as plt
import numpy as np

from fftboost import FFTBoost, BoosterConfig

io_mod = None
for module_name in ('fftboost.io', 'src.fftboost.io'):
    try:
        io_mod = importlib.import_module(module_name)
        break
    except ModuleNotFoundError:
        continue
if io_mod is None:
    raise ImportError('Unable to import load_model')
load_model = io_mod.load_model


ImportError: cannot import name 'BoosterConfig' from 'fftboost' (c:\Users\pinba\code\fftboost\.venv\Lib\site-packages\fftboost\__init__.py)

## 1) Generate a synthetic signal (AM 50 Hz tone + noise)

In [None]:
rng = np.random.default_rng(123)
fs = 1000.0
duration_s = 10.0
n = int(fs * duration_s)
t = np.arange(n) / fs
env = 0.7 + 0.3 * np.sin(2 * np.pi * 0.25 * t)
x = env * np.sin(2 * np.pi * 50.0 * t) + 0.4 * rng.standard_normal(n)

win_s, hop_s = 0.5, 0.25
win_len, hop = int(win_s * fs), int(hop_s * fs)
y_true_signal = env * np.sin(2 * np.pi * 50.0 * t)
n_win = (n - win_len) // hop + 1
windows = np.lib.stride_tricks.as_strided(
    y_true_signal,
    shape=(n_win, win_len),
    strides=(y_true_signal.strides[0] * hop, y_true_signal.strides[0]),
)
y = np.sqrt(np.mean(windows ** 2, axis=1)).astype(np.float64)
x = x.astype(np.float64)

fig, ax = plt.subplots(1, 1, figsize=(10, 3))
ax.plot(t[:5000], x[:5000], lw=0.8)
ax.set_title('Signal (first 5s)')
ax.set_xlabel('seconds')
ax.set_ylabel('amplitude')
plt.show()


## 2) Configure fftboost (EEB) and activate sk_band

In [None]:
# Log-spaced band edges from 10 Hz to Nyquist (fs/2)
nyq = fs/2.0
edges = np.logspace(np.log10(10.0), np.log10(nyq), 11)
bands = [(float(edges[i]), float(edges[i+1])) for i in range(len(edges)-1)]
config = BoosterConfig(
    n_stages=32, nu=0.5, ridge_alpha=1e-3, early_stopping_rounds=8,
    loss='huber', huber_delta=1.0, k_fft=6, min_sep_bins=3,
    default_band_edges_hz=bands, sk_n_select=2, sk_kurtosis_boost=0.2
)
model = FFTBoost(config)


## 3) Train with contiguous holdout early stopping

In [None]:
yhat = model.predict(x, fs=fs, window_s=win_s, hop_s=hop_s)
fig, ax = plt.subplots(1, 1, figsize=(10, 3))
ax.plot(y, label='target', lw=1.5)
ax.plot(yhat, label='prediction', lw=1.25)
ax.legend()
ax.set_title('Target vs Prediction')
plt.show()
print('best_iteration:', model._booster.best_iteration_)


## 4) Visualize selected FFT bins and band proposals

In [None]:
artifact = model._booster.artifact
freqs = artifact.freqs
bins = []
bands_sel = []
for stage in artifact.stages:
    for desc in stage.descriptors:
        if desc.get('type') == 'fft_bin':
            bins.append(int(desc['bin']))
        elif desc.get('type') == 'sk_band':
            bands_sel.append(tuple(desc['band_hz']))

fig, ax = plt.subplots(1, 1, figsize=(10, 3))
ax.hist([freqs[b] for b in bins], bins=20, color='tab:blue')
ax.set_title('Selected FFT bin frequencies')
ax.set_xlabel('Hz')
plt.show()

if bands_sel:
    lows = [band[0] for band in bands_sel]
    highs = [band[1] for band in bands_sel]
    fig, ax = plt.subplots(1, 1, figsize=(10, 3))
    ax.scatter(lows, highs, s=20, c='tab:orange')
    ax.set_title('Selected band ranges (lo vs hi)')
    ax.set_xlabel('lo (Hz)')
    ax.set_ylabel('hi (Hz)')
    plt.show()


## 5) Artifact determinism (save + load)

In [None]:
# ruff: noqa: I001
import os
import tempfile

from fftboost.booster import Booster as BoosterLoaded

tmp = tempfile.mkdtemp()
info = model.save(os.path.join(tmp, 'model'))
artifact2 = load_model(os.path.join(tmp, 'model'))
cfg_dict = artifact2.config or {}
cfg_params = cfg_dict.get('cfg', cfg_dict)
model2 = FFTBoost(BoosterConfig(**cfg_params))
booster2 = BoosterLoaded(model2.config)
booster2.stages = artifact2.stages
booster2.freqs = artifact2.freqs
booster2.best_iteration_ = len(artifact2.stages) - 1
model2._booster = booster2
model2.is_fitted = True
yhat2 = model2.predict(x, fs=fs, window_s=win_s, hop_s=hop_s)
identical = np.allclose(yhat, yhat2) and yhat.shape == yhat2.shape
print('SHA256:', info['sha256'])
print('bit-identical predictions:', identical)


## 6) Spectrogram (reference)

In [None]:
# Compute spectrogram using the same window/hop
psd_full = np.abs(np.fft.rfft(windows, axis=1))[:, 1:]
freqs_spec = np.fft.rfftfreq(win_len, d=1.0 / fs)[1:]
times = (np.arange(n_win) * hop + win_len / 2) / fs
fig, ax = plt.subplots(1, 1, figsize=(10, 4))
im = ax.imshow(20*np.log10(psd_full.T + 1e-12),
                extent=[times[0], times[-1], freqs_spec[0], freqs_spec[-1]],
                origin='lower', aspect='auto', cmap='magma')
ax.set_title('Spectrogram (dB)')
ax.set_xlabel('seconds'); ax.set_ylabel('Hz')
plt.colorbar(im, ax=ax, label='dB')
plt.show()


## 7) Stage contributions

In [None]:
# Recompute PSD on the same grid and break down per-stage contributions
def nearest_index(fgrid: np.ndarray, f: float) -> int:
    i = int(np.searchsorted(fgrid, f, side='left'))
    if i <= 0: 
        return 0
    if i >= fgrid.size: 
        return int(fgrid.size - 1)
    return int(i - 1 if abs(f - fgrid[i - 1]) <= abs(fgrid[i] - f) else i)

win_len = int(win_s * fs); hop = int(hop_s * fs)
n = x.shape[0]; n_win = (n - win_len) // hop + 1
shape = (n_win, win_len); strides = (x.strides[0] * hop, x.strides[0])
windows_sig = np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
psd = np.abs(np.fft.rfft(windows_sig, axis=1))[:, 1:]
freqs = np.fft.rfftfreq(win_len, d=1.0 / fs)[1:]

steps = []
for rec in artifact.stages:
    cols = []
    for d in rec.descriptors:
        if d.get('type') == 'fft_bin':
            f = float(d.get('freq_hz'))
            idx = nearest_index(freqs, f)
            cols.append(psd[:, idx])
        elif d.get('type') == 'sk_band':
            lo, hi = map(float, d['band_hz'])
            mask = (freqs >= lo) & (freqs < hi)
            cols.append(psd[:, mask].sum(axis=1) if mask.any() else np.zeros(psd.shape[0]))
    H = np.column_stack(cols) if cols else np.zeros((psd.shape[0], 0))
    if H.shape[1] == 0:
        step = np.zeros(psd.shape[0])
    else:
        Z = (H - rec.mu) / (rec.sigma + 1e-12)
        step = rec.nu * rec.gamma * (Z @ rec.weights)
    steps.append(step)

magnitudes = [float(np.linalg.norm(s)) for s in steps]
fig, ax = plt.subplots(1, 1, figsize=(10, 3))
ax.bar(np.arange(len(magnitudes)), magnitudes, color='tab:green')
ax.set_title('Stage contribution magnitudes (L2 per stage)')
ax.set_xlabel('stage'); ax.set_ylabel('L2 magnitude')
plt.show()
