# Beats widget

Input from Novelty + Tempogram (or set `NOVELTY_FOR_BEATS` and `TEMPOGRAM_FOR_BEATS` below). By default runs novelty and tempogram with default params. Preceding params = re-run **novelty_widget.ipynb** / **tempogram_widget.ipynb** or set overrides. This widget's params = live or Recompute.

In [1]:
import sys
from pathlib import Path

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML, Javascript, display

widget_dir = Path.cwd()
for candidate in [Path.cwd(), Path.cwd() / "widget", Path.cwd() / "scratch" / "widget", Path.cwd() / "notebooks" / "scratch" / "widget"]:
    if (candidate / "temporal_widget_helpers.py").exists():
        widget_dir = candidate
        break
sys.path.insert(0, str(widget_dir))
project_root = widget_dir
for _ in range(6):
    if (project_root / "src" / "dijon").exists():
        break
    project_root = project_root.parent
sys.path.insert(0, str(project_root))

from temporal_widget_helpers import (
    FS_NOV,
    NOVELTY_DEFAULTS,
    build_audio_with_waveform_html,
    build_beat_sonification_audio,
    eager_compute_beats,
    eager_compute_tempogram,
    get_head_in_time,
    get_novelty_for_method,
    load_audio_and_markers,
    novelty_params_key,
    run_beat_tracking,
    run_beats_and_bars,
    zoom_level_to_half_sec,
)

%matplotlib inline

In [2]:
TRACK = "YTB-005"
MARKER = ""

x, sr, seg_start, seg_end, marker_label = load_audio_and_markers(TRACK, MARKER or None)
duration_s = len(x) / sr
print(f"Track: {TRACK} | Marker: {marker_label} | Duration: {duration_s:.2f}s @ {sr} Hz")

Track: YTB-005 | Marker: FULL | Duration: 175.48s @ 22050 Hz


In [3]:
novelty_cache = {}
tempogram_cache = {}
print("Caches initialized. Default upstream novelty + tempogram + eager beats run in next cell.")

Caches initialized. Default upstream novelty + tempogram + eager beats run in next cell.


**Overrides (optional)** Set `NOVELTY_FOR_BEATS` (dict with `nov_100`) and/or `TEMPOGRAM_FOR_BEATS` (dict with `tempo_bpm`, etc.) to use custom inputs. Leave `None` to use default novelty + tempogram below.

In [4]:
def _is_empty(v):
    return v is None or v == "" or (isinstance(v, dict) and len(v) == 0)

NOVELTY_FOR_BEATS = None
TEMPOGRAM_FOR_BEATS = None

In [5]:
# Default upstream novelty + tempogram are derived once here (unless overrides), then beats are eagerly computed once.
nm_default = "spectral"
d_nov = NOVELTY_DEFAULTS[nm_default]
N, H, gamma, M = d_nov["N"], d_nov["H"], d_nov.get("gamma") or 100.0, d_nov.get("M", 0)
nov, nov_100 = get_novelty_for_method(novelty_cache, x, sr, nm_default, N, H, gamma, M)
nov_key = novelty_params_key(nm_default, N, H, gamma, M)
_nov_state = {"nov_100": nov_100, "nm": nm_default, "novelty_key": nov_key}

_nov_for_temp = _nov_state if _is_empty(NOVELTY_FOR_BEATS) else NOVELTY_FOR_BEATS
nm_src = _nov_for_temp.get("nm", "spectral")
d_src = NOVELTY_DEFAULTS[nm_src]
nov_key_src = _nov_for_temp.get("novelty_key") or novelty_params_key(
    nm_src,
    d_src["N"],
    d_src["H"],
    d_src.get("gamma") or 100.0,
    d_src.get("M", 0),
)
_default_temp_state, tempogram_cache, temp_init_time, _ = eager_compute_tempogram(
    _nov_for_temp["nov_100"],
    nov_key_src,
    tempogram_method="fourier",
    window_sec=5.0,
    hop_sec=0.1,
)
_temp_state = _default_temp_state if _is_empty(TEMPOGRAM_FOR_BEATS) else TEMPOGRAM_FOR_BEATS

head_in = get_head_in_time(TRACK)
_nov_for_beats = _nov_state if _is_empty(NOVELTY_FOR_BEATS) else NOVELTY_FOR_BEATS
_beat_state, beat_init_time, _ = eager_compute_beats(
    _nov_for_beats["nov_100"],
    _temp_state["tempo_bpm"],
    head_in,
    x,
    sr,
    factor=1.0,
    beats_per_bar_override=None,
)
print(
    f"Eager init | Tempogram: {temp_init_time:.2f}s ({_temp_state['tempo_bpm']:.1f} BPM) | "
    f"Beats: {beat_init_time:.2f}s ({len(_beat_state['B'])} beats)"
)

Eager init | Tempogram: 0.96s (244.0 BPM) | Beats: 3.03s (702 beats)


In [6]:
_beat_compact = dict(layout=widgets.Layout(margin="0 2px 2px 2px"))

beat_w_factor = widgets.FloatSlider(value=1.0, min=0.5, max=2.0, step=0.1, description="", **_beat_compact)
beat_w_tempo_override = widgets.FloatText(value=0, description="", placeholder="0=auto", **_beat_compact)
beat_w_beats_per_bar = widgets.Dropdown(options=[("Auto", 0), ("2", 2), ("3", 3), ("4", 4)], value=0, description="", **_beat_compact)
beat_w_sonify = widgets.Checkbox(value=False, description="Sonify beats", **_beat_compact)
beat_w_zoom_enabled = widgets.Checkbox(value=False, description="Zoom on playhead", **_beat_compact)
beat_w_zoom_level = widgets.IntSlider(value=1, min=1, max=10, step=1, description="", **_beat_compact)

def _beat_label(txt):
    return widgets.HTML(f'<div style="font-size:9px;margin-top:2px;margin-bottom:0;line-height:1.2">{txt}</div>')

BEAT_FACTORY = {"factor": 1.0, "tempo_override": 0, "beats_per_bar": 0, "sonify_toggle": False, "zoom_enabled": False, "zoom_level": 1}
beat_defaults = {
    "factor": widgets.FloatText(value=BEAT_FACTORY["factor"], **_beat_compact),
    "tempo_override": widgets.FloatText(value=BEAT_FACTORY["tempo_override"], **_beat_compact),
    "beats_per_bar": widgets.Dropdown(options=[("Auto", 0), ("2", 2), ("3", 3), ("4", 4)], value=BEAT_FACTORY["beats_per_bar"], **_beat_compact),
    "sonify_toggle": widgets.Checkbox(value=BEAT_FACTORY["sonify_toggle"], **_beat_compact),
    "zoom_enabled": widgets.Checkbox(value=BEAT_FACTORY["zoom_enabled"], **_beat_compact),
    "zoom_level": widgets.IntSlider(value=BEAT_FACTORY["zoom_level"], min=1, max=10, **_beat_compact),
}

# Keep eager state from cell 6 if present; otherwise widget starts empty until Recompute
_beat_state = _beat_state if isinstance(_beat_state, dict) and "B" in _beat_state else {}

def beat_w_run(fast_only=False):
    _nov_src = _nov_state if _is_empty(NOVELTY_FOR_BEATS) else NOVELTY_FOR_BEATS
    _temp_src = _temp_state if _is_empty(TEMPOGRAM_FOR_BEATS) else TEMPOGRAM_FOR_BEATS
    if not _nov_src or not _temp_src:
        with beat_w_status_out:
            beat_w_status_out.clear_output()
            print("Run default-novelty+tempogram cell above or set overrides.")
        return
    import time
    t0 = time.perf_counter()
    nov_100 = _nov_src["nov_100"]
    tempo_bpm = float(beat_w_tempo_override.value) if beat_w_tempo_override.value and beat_w_tempo_override.value > 0 else _temp_src["tempo_bpm"]
    B, D, P = run_beat_tracking(nov_100, tempo_bpm, factor=beat_w_factor.value)
    beat_times = B / FS_NOV
    head_in = get_head_in_time(TRACK)
    bpb = beat_w_beats_per_bar.value if beat_w_beats_per_bar.value != 0 else None
    labels, beats_per_bar, _, _ = run_beats_and_bars(beat_times, head_in, x, sr, beats_per_bar_override=bpb)
    t_compute = time.perf_counter() - t0
    _beat_state.update({"B": B, "beat_times": beat_times, "labels": labels, "beats_per_bar": beats_per_bar, "nov_100": nov_100})
    beat_w_redraw(0)
    with beat_w_status_out:
        beat_w_status_out.clear_output()
        print(f"Beats: {t_compute:.3f}s | {len(B)} beats | {beats_per_bar} b/bar")

def beat_w_do_recompute(_):
    beat_w_run(fast_only=False)

def beat_w_redraw(center_time=None, update_audio=True):
    if not _beat_state:
        return
    half = zoom_level_to_half_sec(beat_w_zoom_level.value, duration_s) if beat_w_zoom_enabled.value else duration_s
    xlo = max(0, (center_time or 0) - half) if beat_w_zoom_enabled.value and center_time is not None else 0
    xhi = min(duration_s, (center_time or 0) + half) if beat_w_zoom_enabled.value and center_time is not None else duration_s
    if not beat_w_zoom_enabled.value:
        xlo, xhi = 0, duration_s
    B, beat_times, nov_100 = _beat_state["B"], _beat_state["beat_times"], _beat_state["nov_100"]
    t_nov = np.arange(len(nov_100)) / FS_NOV
    with beat_w_plot_out:
        beat_w_plot_out.clear_output(wait=True)
        fig, axes = plt.subplots(2, 1, figsize=(12, 4), sharex=True)
        axes[0].plot(t_nov, nov_100, color="#333", linewidth=0.7)
        if len(B) > 0:
            axes[0].stem(beat_times, nov_100[B], linefmt="r-", markerfmt="ro", basefmt=" ")
        axes[0].set_ylabel("Novelty")
        axes[0].set_title("Beats")
        axes[0].set_xlim(xlo, xhi)
        axes[1].plot(beat_times, np.arange(len(beat_times)), "k.", markersize=2)
        axes[1].set_ylabel("Beat index")
        axes[1].set_xlabel("Time (s)")
        axes[1].set_title("Bars")
        axes[1].set_xlim(xlo, xhi)
        plt.tight_layout()
        plt.show()
    if update_audio:
        with beat_w_audio_out:
            beat_w_audio_out.clear_output(wait=True)
            x_play = build_beat_sonification_audio(x, sr, beat_times, _beat_state["labels"], _beat_state["beats_per_bar"], get_head_in_time(TRACK)) if beat_w_sonify.value else x
            h = build_audio_with_waveform_html(x, x_play, sr, max_duration_sec=max(60, duration_s))
            display(HTML('<div style="font-size:10px">Waveform: original. Audio: ' + ('sonified' if beat_w_sonify.value else 'original') + '</div>'))
            display(HTML('<div id="beat-w-audio-container">' + h + '</div>'))
            display(Javascript("(function(){ var c = document.getElementById('beat-w-audio-container'); var a = c ? c.querySelector('audio') : null; if (!a) return; var last = -1; a.addEventListener('timeupdate', function(){ var t = a.currentTime; if (Math.abs(t - last) < 0.25) return; last = t; if (typeof Jupyter !== 'undefined' && Jupyter.notebook && Jupyter.notebook.kernel) Jupyter.notebook.kernel.execute('beat_w_zoom_from_playhead(' + t + ')'); }); })();"))
def beat_w_zoom_from_playhead(t):
    if beat_w_zoom_enabled.value and t is not None:
        beat_w_redraw(float(t), update_audio=False)

beat_w_zoom_enabled.observe(lambda _: beat_w_redraw(0) if _beat_state else None, names="value")
beat_w_zoom_level.observe(lambda _: beat_w_redraw(0) if _beat_state else None, names="value")
beat_w_factor.observe(lambda _: beat_w_run(fast_only=True) if _beat_state else None, names="value")
beat_w_tempo_override.observe(lambda _: beat_w_run(fast_only=True) if _beat_state else None, names="value")
beat_w_beats_per_bar.observe(lambda _: beat_w_run(fast_only=True) if _beat_state else None, names="value")
beat_w_sonify.observe(lambda _: beat_w_redraw(0, update_audio=True) if _beat_state else None, names="value")

_beat_param_widgets = {"factor": beat_w_factor, "tempo_override": beat_w_tempo_override, "beats_per_bar": beat_w_beats_per_bar, "sonify_toggle": beat_w_sonify, "zoom_enabled": beat_w_zoom_enabled, "zoom_level": beat_w_zoom_level}
def beat_w_reset(_):
    for k, w in beat_defaults.items():
        if k in ("zoom_enabled", "zoom_level"): continue
        try:
            v = w.value
            if v is None: continue
            if k in ("factor", "tempo_override"): v = float(v)
            elif k in ("beats_per_bar", "zoom_level"): v = int(v)
            _beat_param_widgets[k].value = v
        except (TypeError, ValueError): pass
    beat_w_run()
def beat_w_set_default(_):
    for k in beat_defaults:
        beat_defaults[k].value = _beat_param_widgets[k].value
def beat_w_factory(_):
    for k, w in beat_defaults.items(): w.value = BEAT_FACTORY[k]
    beat_w_reset(None)

beat_w_recompute_btn = widgets.Button(description="Recompute", button_style="primary")
beat_w_recompute_btn.on_click(beat_w_do_recompute)
beat_w_status_out = widgets.Output()
beat_w_plot_out = widgets.Output()
beat_w_audio_out = widgets.Output()

beat_w_controls = widgets.VBox([
    widgets.HTML("<b>Beats / Bars</b>"), _beat_label("Factor"), beat_w_factor, _beat_label("Tempo override"), beat_w_tempo_override,
    _beat_label("Beats per bar"), beat_w_beats_per_bar, beat_w_sonify,
    widgets.HTML("<b>Zoom</b>"), beat_w_zoom_enabled, _beat_label("Zoom (1=out, 10=in)"), beat_w_zoom_level,
    beat_w_recompute_btn,
    widgets.HTML("<b>Defaults</b>"), _beat_label("Factor"), beat_defaults["factor"], _beat_label("Tempo ov"), beat_defaults["tempo_override"],
    _beat_label("B/bar"), beat_defaults["beats_per_bar"], beat_defaults["sonify_toggle"], _beat_label("Zoom on"), beat_defaults["zoom_enabled"], _beat_label("Zoom level"), beat_defaults["zoom_level"],
    widgets.HBox([widgets.Button(description="RESET"), widgets.Button(description="SET TO DEFAULT")]), widgets.Button(description="Reset to factory", button_style="warning"),
], layout=widgets.Layout(width="280px"))
beat_w_controls.children[-2].children[0].on_click(beat_w_reset)
beat_w_controls.children[-2].children[1].on_click(beat_w_set_default)
beat_w_controls.children[-1].on_click(beat_w_factory)

beat_w_outputs = widgets.VBox([beat_w_status_out, beat_w_plot_out, widgets.HTML("<b>Audio</b>"), beat_w_audio_out], layout=widgets.Layout(flex="1"))
beat_widget_box = widgets.HBox([beat_w_controls, beat_w_outputs], layout=widgets.Layout(width="100%"))
with beat_w_status_out:
    beat_w_status_out.clear_output()
    if _beat_state:
        print(
            f"Eager beats: {beat_init_time:.3f}s | {len(_beat_state['B'])} beats | "
            f"{_beat_state['beats_per_bar']} b/bar"
        )
    else:
        print("Run cell above for default novelty+tempogram+beats, or use Recompute.")
display(beat_widget_box)
beat_w_redraw(0)

HBox(children=(VBox(children=(HTML(value='<b>Beats / Bars</b>'), HTML(value='<div style="font-size:9px;margin-â€¦