# Beat-Aligned Chunking Demo
Verify audio chunking at beat boundaries before sending to Gemini.

In [None]:
import json
import sqlite3
from pathlib import Path

import IPython.display as ipd
import librosa
import numpy as np
import soundfile as sf
import matplotlib.pyplot as plt

In [None]:
DB_PATH = Path.home() / "Library/Application Support/com.luma.luma/luma.db"
# sqlite3.connect() creates a new, empty database when the file is missing,
# which would only surface later as "no such table"; fail early instead.
if not DB_PATH.is_file():
    raise FileNotFoundError(f"Luma database not found: {DB_PATH}")
conn = sqlite3.connect(DB_PATH)

# List tracks with beat data
cursor = conn.execute("""
    SELECT t.id, t.title, t.artist, tb.bpm
    FROM tracks t
    JOIN track_beats tb ON t.id = tb.track_id
    WHERE t.file_path IS NOT NULL
    ORDER BY t.title
""")
# Iterate the cursor directly; there is no need to materialise every row.
for track_id, track_title, track_artist, track_bpm in cursor:
    # A NULL bpm comes back as None, which the ".0f" format spec rejects.
    bpm_text = "?" if track_bpm is None else f"{track_bpm:.0f}"
    print(f"[{track_id:3d}] {track_title} - {track_artist} ({bpm_text} BPM)")

In [None]:
# Pick a track
TRACK_ID = 20  # <-- Change this

cursor = conn.execute("""
    SELECT t.title, t.artist, t.file_path, tb.bpm, tb.beats_json, tb.downbeats_json
    FROM tracks t
    JOIN track_beats tb ON t.id = tb.track_id
    WHERE t.id = ?
""", (TRACK_ID,))
row = cursor.fetchone()
# fetchone() returns None when no row matches; report that clearly instead of
# failing on the tuple unpacking below.
if row is None:
    raise ValueError(f"No track with beat data found for TRACK_ID={TRACK_ID}")
title, artist, file_path, bpm, beats_json, downbeats_json = row
# Beat and downbeat positions are stored as JSON arrays.
beats = np.array(json.loads(beats_json))
downbeats = np.array(json.loads(downbeats_json))

print(f"Track: {title} - {artist}")
print(f"BPM: {bpm}")
print(f"Beats: {len(beats)}, Downbeats: {len(downbeats)}")
print(f"File: {file_path}")

In [None]:
# Decode the selected track as mono, keeping the file's own sample rate
# (sr=None disables resampling).
y, sr = librosa.load(file_path, mono=True, sr=None)
# y is one-dimensional (mono), so its first axis is the sample count.
duration = y.shape[0] / sr
print(f"Duration: {duration:.1f}s, Sample rate: {sr}")

In [None]:
def get_subdivision_times(beats: np.ndarray, subdivision: float) -> np.ndarray:
    """Return chunk boundary times derived from beat times.

    Args:
        beats: Beat times in ascending order.
        subdivision: Chunks per beat. ``1`` returns ``beats`` unchanged.
            Values above 1 split every beat interval into
            ``int(subdivision)`` equal parts (the fractional part is
            ignored). Values below 1 keep every ``int(1 / subdivision)``-th
            beat, e.g. ``0.5`` keeps every second beat.

    Returns:
        The boundary times. For ``subdivision > 1`` the final beat is
        appended so the last interval is closed.

    Raises:
        ValueError: If ``subdivision`` is zero or negative.
    """
    # Zero would divide by zero below and a negative value would silently
    # reverse the beats through a negative slice step.
    if subdivision <= 0:
        raise ValueError(f"subdivision must be positive, got {subdivision!r}")

    if subdivision == 1:
        return beats

    if subdivision < 1:
        step = int(1 / subdivision)
        return beats[::step]

    beats = np.asarray(beats, dtype=float)
    # Without beats there are no intervals to split and no final beat to add.
    if beats.size == 0:
        return beats

    # Divide by the truncated count so the parts are evenly spaced even when
    # a fractional subdivision such as 2.5 is passed.
    parts = int(subdivision)
    interval_starts = beats[:-1]
    interval_lengths = beats[1:] - beats[:-1]
    # Broadcasting yields one row per interval and one column per part.
    inner = (
        interval_starts[:, None]
        + interval_lengths[:, None] * np.arange(parts) / parts
    )
    return np.concatenate([inner.ravel(), beats[-1:]])

In [None]:
# Try out a subdivision factor and inspect the resulting boundaries
SUBDIVISION = 1  # 1=beat, 2=half-beat, 0.5=2-beats, 0.25=bar

chunk_times = get_subdivision_times(beats, SUBDIVISION)
# One print call, one line per fact (same output as three separate prints).
print(
    f"Subdivision: {SUBDIVISION}",
    f"Chunk boundaries: {len(chunk_times)}",
    f"First 10 chunk times: {chunk_times[:10]}",
    sep="\n",
)

In [None]:
# Visualize chunk boundaries on waveform (first 30 seconds)
# View window in seconds, plus the metre used for the bar.beat labels.
VIEW_START = 0
VIEW_END = 30
BEATS_PER_BAR = 4

def beat_to_bar_beat(beat_index: int, beats_per_bar: int = 4) -> str:
    """Format a 0-based beat index as a 1-based ``bar.beat`` label."""
    bar_offset, beat_offset = divmod(beat_index, beats_per_bar)
    return f"{bar_offset + 1}.{beat_offset + 1}"

# Clamp the view window to the samples that actually exist. A track shorter
# than VIEW_END would otherwise give plt.plot() x and y arrays of different
# lengths.
start_sample = max(0, int(VIEW_START * sr))
end_sample = min(len(y), int(VIEW_END * sr))
segment = y[start_sample:end_sample]
# Derive each timestamp from its sample index so the axis is exact; a
# linspace that includes VIEW_END stretches the axis by one sample period.
times = np.arange(start_sample, end_sample) / sr

plt.figure(figsize=(16, 4))
plt.plot(times, segment, alpha=0.7, linewidth=0.5)

# Mark chunk boundaries
visible_chunks = chunk_times[(chunk_times >= VIEW_START) & (chunk_times <= VIEW_END)]
# The label height is the same for every boundary, so compute it once; an
# empty segment has no maximum.
label_height = segment.max() * 0.9 if segment.size else 0.0
for i, t in enumerate(visible_chunks):
    plt.axvline(x=t, color='red', alpha=0.5, linewidth=1)
    if i < 20:  # Label first 20
        # Position of this boundary within the full chunk list.
        chunk_idx = np.searchsorted(chunk_times, t)
        # NOTE(review): treating the chunk index as a beat index looks correct
        # only for one chunk per beat with chunk 0 on beat 1 of bar 1 - confirm.
        label = beat_to_bar_beat(chunk_idx, BEATS_PER_BAR)
        plt.text(t, label_height, label,
                 fontsize=8, ha='center', color='red')

# Mark downbeats
visible_downbeats = downbeats[(downbeats >= VIEW_START) & (downbeats <= VIEW_END)]
for t in visible_downbeats:
    plt.axvline(x=t, color='blue', alpha=0.7, linewidth=2, linestyle='--')

plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title(f'{title} - Chunk boundaries (red, bar.beat) and downbeats (blue dashed)')
plt.tight_layout()
plt.show()

In [None]:
# Listen to a specific chunk (using bar.beat notation)
BAR = 18   # <-- Change this
BEAT = 1   # <-- Change this (1-4)
BEATS_PER_BAR = 4

# Convert bar.beat to chunk index
chunk_idx = (BAR - 1) * BEATS_PER_BAR + (BEAT - 1)

# Reject a beat outside the bar and a negative index as well as an index past
# the end: a negative index would silently wrap around to the end of the
# array, and an out-of-bar beat would alias a chunk in a different bar.
beat_in_bar = 1 <= BEAT <= BEATS_PER_BAR
if beat_in_bar and 0 <= chunk_idx < len(chunk_times) - 1:
    start_sec = chunk_times[chunk_idx]
    end_sec = chunk_times[chunk_idx + 1]

    # Convert boundary times to sample offsets into the loaded audio.
    start_sample = int(start_sec * sr)
    end_sample = int(end_sec * sr)
    chunk_audio = y[start_sample:end_sample]

    print(f"[{BAR}.{BEAT}]: {start_sec:.3f}s - {end_sec:.3f}s ({(end_sec - start_sec) * 1000:.0f}ms)")
    ipd.display(ipd.Audio(chunk_audio, rate=sr))
else:
    print(f"Chunk {BAR}.{BEAT} out of range")

In [None]:
# Listen to a range of bars
START_BAR = 1    # <-- Change this
END_BAR = 4      # <-- Change this (inclusive)
BEATS_PER_BAR = 4

# The first chunk of START_BAR and the boundary that closes END_BAR.
start_idx = (START_BAR - 1) * BEATS_PER_BAR
end_idx = END_BAR * BEATS_PER_BAR
last_idx = len(chunk_times) - 1

# Guard the start index too: past the end it raises IndexError, below zero it
# wraps around, and on the final boundary it leaves no audio to play.
if START_BAR < 1 or END_BAR < START_BAR or start_idx >= last_idx:
    print(f"Bars {START_BAR}-{END_BAR} out of range")
else:
    start_sec = chunk_times[start_idx]
    # Clamp the end so a range running past the track stops at the last boundary.
    end_sec = chunk_times[min(end_idx, last_idx)]

    start_sample = int(start_sec * sr)
    end_sample = int(end_sec * sr)

    # Use BEATS_PER_BAR for the closing beat so the label follows the metre.
    print(f"Bars {START_BAR}-{END_BAR} ({START_BAR}.1 to {END_BAR}.{BEATS_PER_BAR}): {start_sec:.2f}s - {end_sec:.2f}s")
    ipd.display(ipd.Audio(y[start_sample:end_sample], rate=sr))

In [None]:
# Preview what will be sent to Gemini
MAX_CHUNKS = 50    # cap on the number of chunks in the preview
BEATS_PER_BAR = 4  # hard-coded here; the original note says it comes from the track_beats table

def beat_to_bar_beat(beat_index: int, beats_per_bar: int = 4) -> str:
    """Return the 1-based ``bar.beat`` label for a 0-indexed beat."""
    whole_bars, beat_in_bar = divmod(beat_index, beats_per_bar)
    bar_number = whole_bars + 1
    beat_number = beat_in_bar + 1
    return f"{bar_number}.{beat_number}"

# N boundaries delimit N - 1 chunks. Compute the capped count once and clamp
# it at zero so an empty boundary list does not report "-1" chunks.
n_chunks = max(0, min(len(chunk_times) - 1, MAX_CHUNKS))
# Only the first few chunks are listed; the rest are summarised on one line.
n_shown = min(n_chunks, 10)

print("Content structure that will be sent to Gemini:")
print("="*60)
print(f"\"I'm going to play you a song split into {n_chunks} consecutive chunks.")
print("Each chunk is labeled with bar.beat notation (e.g., 18.3 means bar 18, beat 3).\"")
print()
for i in range(n_shown):
    start_sec = chunk_times[i]
    end_sec = chunk_times[i + 1]
    duration_ms = (end_sec - start_sec) * 1000
    label = beat_to_bar_beat(i, BEATS_PER_BAR)
    print(f"[{label}] <audio {duration_ms:.0f}ms>")
if n_chunks > n_shown:
    print(f"... ({n_chunks - n_shown} more chunks)")
print()
print("\"List ALL the drops. Format: - X.Y: description\"")

In [None]:
# Estimate total payload size
import io

MAX_CHUNKS = 200
# Compute the capped chunk count once and clamp it at zero so an empty
# boundary list is reported as 0 chunks rather than -1.
n_chunks = max(0, min(len(chunk_times) - 1, MAX_CHUNKS))
total_bytes = 0

for i in range(n_chunks):
    start_sec = chunk_times[i]
    end_sec = chunk_times[i + 1]
    start_sample = int(start_sec * sr)
    end_sample = int(end_sec * sr)
    chunk_audio = y[start_sample:end_sample]

    # Encode each chunk as WAV into memory purely to measure its size;
    # the context manager releases the buffer right afterwards.
    with io.BytesIO() as buf:
        sf.write(buf, chunk_audio, sr, format='WAV')
        total_bytes += len(buf.getvalue())

print(f"Chunks: {n_chunks}")
print(f"Total WAV size: {total_bytes / 1024 / 1024:.2f} MB")
print(f"Under 20MB limit: {total_bytes < 20 * 1024 * 1024}")

In [None]:
# Close the SQLite connection opened at the start of the notebook.
conn.close()