In [12]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Wave (ocean) sound --> slow 'wave curve' (amplitude envelope) extractor.

- Loads audio (mono), optional high-pass to remove DC/rumble
- Extracts Hilbert envelope (amplitude)
- Smooths and downsamples the envelope to a low rate (default 50 Hz)
- Produces:
    * Broad envelope (nice overall loudness)
    * Swell-scale envelopes with cutoffs at 0.3 Hz and 0.1 Hz
    * Mid-scale "splash" band on envelope (1-5 Hz)
- Exports a figure and a CSV with time + curves

Usage:
    python wave_envelope.py input.wav --out-prefix my_wave --swell-1 0.3 --swell-2 0.1

Dependencies:
    pip install numpy scipy soundfile matplotlib
    (optional) librosa for resampling; otherwise SciPy polyphase is used
"""

import argparse
import os
import numpy as np
import soundfile as sf
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfiltfilt, hilbert, resample_poly
from math import gcd
from scipy.signal import resample_poly

try:
    import librosa
    HAVE_LIBROSA = True
except Exception:
    HAVE_LIBROSA = False


# ----------------------- DSP helpers -----------------------

def to_mono(x: np.ndarray) -> np.ndarray:
    return x.mean(axis=1) if x.ndim == 2 else x


def _resample_1d(x: np.ndarray, fs_from: int, fs_to: int) -> np.ndarray:
    if fs_from == fs_to: 
        return x
    g = gcd(int(fs_from), int(fs_to))
    up, down = int(fs_to)//g, int(fs_from)//g
    return resample_poly(x, up, down).astype(np.float32)

def resample_audio(x: np.ndarray, sr: int, target_sr: int) -> tuple[np.ndarray, int]:
    if sr == target_sr:
        return x, sr
    if HAVE_LIBROSA:
        y = librosa.resample(y=x, orig_sr=sr, target_sr=target_sr, res_type="kaiser_best")
        return y.astype(np.float32), target_sr
    # Fallback: polyphase (good quality)
    # Compute rational approximation
    from math import gcd
    g = gcd(sr, target_sr)
    up, down = target_sr // g, sr // g
    y = resample_poly(x, up, down)
    return y.astype(np.float32), target_sr

def butter_sos(cut, fs, btype, order=4):
    nyq = fs * 0.5
    Wn = np.array(cut, ndmin=1) / nyq
    return butter(order, Wn, btype=btype, output="sos")

def highpass(x, fs, cut=20.0, order=2):
    sos = butter_sos(cut, fs, 'highpass', order=order)
    return sosfiltfilt(sos, x)

def lowpass(x, fs, cut=5.0, order=4):
    sos = butter_sos(cut, fs, 'lowpass', order=order)
    return sosfiltfilt(sos, x)

def bandpass(x, fs, lo, hi, order=4):
    sos = butter_sos([lo, hi], fs, 'bandpass', order=order)
    return sosfiltfilt(sos, x)

def envelope_hilbert(x):
    analytic = hilbert(x)
    return np.abs(analytic)

def compress_log(x, eps=1e-8):
    y = np.log10(eps + x)
    # Normalize to [0,1] for nice plotting/CSV comparability
    y = (y - y.min()) / (y.max() - y.min() + 1e-12)
    return y

def downsample_signal(x, fs, target_fs=50):
    """Polyphase resample envelope to a low rate for accurate sub-Hz filtering."""
    if fs == target_fs:
        return x, fs
    from math import gcd
    g = gcd(int(fs), int(target_fs))
    up, down = int(target_fs) // g, int(fs) // g
    y = resample_poly(x, up, down)
    return y.astype(np.float32), target_fs

# ----------------------- Main routine -----------------------

def process(
    wav_path: str,
    out_prefix: str,
    target_sr_audio: int = 22050,
    env_ds_fs: int = 50,                 # keep this low for stable sub-Hz filters
    hp_cut: float = 20.0,
    lpf_broad: float = 10.0,
    swell_cut_1: float = 0.3,
    swell_cut_2: float = 0.1,
    mid_band: tuple[float, float] = (1.0, 5.0),
    min_sec: float | None = None,
    max_sec: float | None = None,
    output_fs: int | None = None         # NEW: resample curves to this rate for CSV
):
    x, sr = sf.read(wav_path, always_2d=False)
    x = to_mono(x).astype(np.float64)     # higher precision helps very-low cutoffs

    if min_sec is not None or max_sec is not None:
        start = int((min_sec or 0) * sr)
        end   = int((max_sec or len(x)/sr) * sr)
        x = x[start:end]

    if hp_cut is not None and hp_cut > 0:
        x = highpass(x, fs=sr, cut=hp_cut, order=2)

    x, sr = resample_audio(x, sr, target_sr_audio)

    # Envelope at audio rate
    env = envelope_hilbert(x)
    env_broad = lowpass(env, fs=sr, cut=lpf_broad, order=4)

    # Downsample envelope to low rate for robust sub-Hz filtering
    env_ds, fs_ds = downsample_signal(env_broad, fs=sr, target_fs=env_ds_fs)

    # Swell + mid bands at low rate (stable)
    env_swell_1 = lowpass(env_ds, fs=fs_ds, cut=swell_cut_1, order=4)
    env_swell_2 = lowpass(env_ds, fs=fs_ds, cut=swell_cut_2, order=4)
    lo, hi = mid_band
    env_mid = bandpass(env_ds, fs=fs_ds, lo=lo, hi=hi, order=4)

    # Defensive clamp before log (avoids NaNs from tiny negatives)
    def safe_log_norm(x):
        x = np.maximum(x, 0.0)
        y = np.log10(1e-8 + x)
        y = (y - y.min()) / (y.max() - y.min() + 1e-12)
        return y.astype(np.float32)

    env_broad_n = safe_log_norm(env_ds)
    swell_1_n   = safe_log_norm(env_swell_1)
    swell_2_n   = safe_log_norm(env_swell_2)
    env_mid_n   = safe_log_norm(np.abs(env_mid))

    # If requested, resample curves to EEG rate for export/merge
    out_fs = output_fs or fs_ds
    if out_fs != fs_ds:
        env_broad_n = _resample_1d(env_broad_n, fs_ds, out_fs)
        swell_1_n   = _resample_1d(swell_1_n,   fs_ds, out_fs)
        swell_2_n   = _resample_1d(swell_2_n,   fs_ds, out_fs)
        env_mid_n   = _resample_1d(env_mid_n,   fs_ds, out_fs)
        fs_save = out_fs
    else:
        fs_save = fs_ds

    # Timebase at the save rate
    t = np.arange(len(env_broad_n), dtype=np.float64) / float(fs_save)

    # Save CSV at fs_save (e.g., 256 Hz)
    csv_path = f"{out_prefix}_curves.csv"
    data = np.column_stack([t, env_broad_n, swell_1_n, swell_2_n, env_mid_n])
    header = "time_s,env_broad,env_swell_0p3hz,env_swell_0p1hz,env_mid_1_5hz"
    np.savetxt(csv_path, data, delimiter=",", header=header, comments="")
    print(f"[OK] Saved curves CSV → {csv_path} at {fs_save} Hz")

    # Plot (downsample for speed if huge)
    fig = plt.figure(figsize=(12,6)); ax = plt.gca()
    ax.plot(t, env_broad_n, label=f"Broad env (~{lpf_broad} Hz LP)", linewidth=1.1, alpha=0.3)
    ax.plot(t, swell_1_n,   label=f"Swell LP @ {swell_cut_1} Hz", linewidth=2.0)
    ax.plot(t, swell_2_n,   label=f"Swell LP @ {swell_cut_2} Hz", linewidth=2.0)
    ax.plot(t, env_mid_n,   label=f"Mid band {mid_band[0]}–{mid_band[1]} Hz (env)", linewidth=1.1, alpha=0.3)
    ax.set_xlabel("Time (s)"); ax.set_ylabel("Normalized amplitude (log)"); ax.grid(True, alpha=0.25)
    ax.legend(loc="upper right", ncols=2); fig.tight_layout()
    fig_path = f"{out_prefix}_curves.png"; plt.savefig(fig_path, dpi=160); plt.close(fig)
    print(f"[OK] Saved plot → {fig_path}")

    return {"csv": csv_path, "png": fig_path, "fs_env": fs_save, "length_s": float(t[-1]) if len(t) else 0.0}

In [14]:
# In a notebook: replace argparse with direct variables
wav_path = "../data/processed/sub-02/audio/TASCAM_1104S12_cut.wav"   # path to your audio file
out_prefix = "sea_60-120"     # prefix for CSV/plot outputs

# Parameters
target_sr_audio = 22050   # resample audio before envelope
env_ds_fs = 50            # envelope downsample rate (Hz)
hp_cut = 20.0             # high-pass cutoff on audio (Hz); set None to disable
lpf_broad = 10.0          # first low-pass on envelope before DS (Hz)
swell_cut_1 = 0.2         # swell envelope cutoff 1 (Hz)
swell_cut_2 = 0.1         # swell envelope cutoff 2 (Hz)
mid_band = (1.0, 5.0)     # mid-band (Hz) on envelope

# Run processing
results = process(
    wav_path=wav_path,
    out_prefix=out_prefix,
    min_sec=None, max_sec=None,
    target_sr_audio=target_sr_audio,
    env_ds_fs=env_ds_fs,
    hp_cut=hp_cut,
    lpf_broad=lpf_broad,
    swell_cut_1=swell_cut_1,
    swell_cut_2=swell_cut_2,
    mid_band=mid_band,
    output_fs=256
)

results


[OK] Saved curves CSV → sea_60-120_curves.csv at 256 Hz
[OK] Saved plot → sea_60-120_curves.png


{'csv': 'sea_60-120_curves.csv',
 'png': 'sea_60-120_curves.png',
 'fs_env': 256,
 'length_s': 2306.4765625}