# Online Hardware Validation - ADS1299

Live acquisition and analysis using the acquisition and analysis modules.

**Architecture:**
- `acquisition.HardwareClient` - ESP32/ADS1299 communication
- `acquisition.capture_live_stream()` - WebSocket frame streaming
- `analysis.parse_framestream_bin()` - Binary parsing
- `analysis.preprocess` - Signal processing

## Setup

In [None]:
import asyncio
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal

# Apply nest_asyncio for Jupyter compatibility
import nest_asyncio
nest_asyncio.apply()

# Import from our modules
from acquisition import HardwareClient, capture_live_stream
from analysis import parse_framestream_bin, counts_to_uv, preprocess_channel_uv, FS_HZ, VREF_V, GAIN
from analysis.timing_integrity import check_timing_integrity

plt.style.use("default")

# Configuration
ESP32_IP = "node.local"
CAPTURE_DURATION_S = 5.0
FS = 16000  # Hz

# Channel definitions
EEG_CHANNELS = [1, 2, 3]
OAE_CHANNELS = [4, 5]
N_CHANNELS = 8

print(f"ESP32 IP: {ESP32_IP}")
print(f"Capture duration: {CAPTURE_DURATION_S}s")
print(f"Sample rate: {FS} Hz")

## Utility Functions

In [None]:
def psd_welch(x_uv, fs, nperseg=512):
    """Compute Welch PSD."""
    return signal.welch(x_uv, fs, nperseg=nperseg)

def fft_dbfs(x_uv, fs, vref=VREF_V, gain=GAIN):
    """FFT magnitude in dBFS."""
    from scipy.fft import rfft, rfftfreq
    x_uv = np.asarray(x_uv, dtype=float)
    n = len(x_uv)
    volts = x_uv * 1e-6
    spec = rfft(volts)
    amp = (2.0 / n) * np.abs(spec)
    amp[0] /= 2.0
    fullscale = vref / gain
    dbfs = 20 * np.log10(amp / fullscale + 1e-20)
    freqs = rfftfreq(n, 1/fs)
    return freqs, np.minimum(dbfs, 0)

def saturation_report(counts: np.ndarray, adc_bits: int = 24) -> None:
    """Check for ADC saturation/railing."""
    lo = -2**(adc_bits-1)
    hi = 2**(adc_bits-1) - 1
    print(f"\n{adc_bits}-bit ADC rails: [{lo}, {hi}]")
    for ch in range(counts.shape[0]):
        x = counts[ch]
        sat_lo = int(np.sum(x == lo))
        sat_hi = int(np.sum(x == hi))
        print(f"Ch{ch+1:<2}  min={int(x.min()):>9}  max={int(x.max()):>9}  sat_lo={sat_lo:>6}  sat_hi={sat_hi:>6}")

In [None]:
def plot_validation_summary(uv, channels, fs_hz, title="Hardware Validation"):
    """
    Plot validation summary: time series, histogram, PSD, FFT.
    """
    n_ch, n_samples = uv.shape
    t = np.arange(n_samples) / fs_hz
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    fig.suptitle(f"{title} | {n_samples} samples @ {fs_hz} Hz", fontsize=14)
    ax_ts, ax_hist, ax_psd, ax_fft = axes.flatten()
    
    # Time series
    for i in range(n_ch):
        ax_ts.plot(t, uv[i], label=f"Ch{channels[i]}")
    ax_ts.set_title("Time Series")
    ax_ts.set_xlabel("Time (s)")
    ax_ts.set_ylabel("uV")
    ax_ts.legend()
    ax_ts.grid(alpha=0.3)
    
    # Histogram
    for i in range(n_ch):
        ax_hist.hist(uv[i], bins=100, alpha=0.5, label=f"Ch{channels[i]}")
    ax_hist.set_title("Amplitude Distribution")
    ax_hist.set_xlabel("uV")
    ax_hist.set_ylabel("Count")
    ax_hist.legend()
    ax_hist.grid(alpha=0.3)
    
    # PSD
    for i in range(n_ch):
        f, psd = psd_welch(uv[i], fs_hz)
        ax_psd.semilogy(f, psd, label=f"Ch{channels[i]}")
    ax_psd.set_title("PSD (Welch)")
    ax_psd.set_xlabel("Frequency (Hz)")
    ax_psd.set_ylabel("PSD (uV^2/Hz)")
    ax_psd.set_xlim(0, min(200, fs_hz/2))
    ax_psd.legend()
    ax_psd.grid(alpha=0.3)
    
    # FFT dBFS
    for i in range(n_ch):
        f, dbfs = fft_dbfs(uv[i], fs_hz)
        ax_fft.plot(f, dbfs, label=f"Ch{channels[i]}")
    ax_fft.set_title("FFT (dBFS)")
    ax_fft.set_xlabel("Frequency (Hz)")
    ax_fft.set_ylabel("dBFS")
    ax_fft.set_xlim(0, min(200, fs_hz/2))
    ax_fft.set_ylim(-120, 5)
    ax_fft.axhline(0, color='black', linestyle='--')
    ax_fft.legend()
    ax_fft.grid(alpha=0.3)
    
    plt.tight_layout()
    plt.show()

## Live Acquisition

Single function using `acquisition.capture_live_stream()` and `analysis.parse_framestream_bin()`.

In [None]:
def acquire_and_parse(
    duration_s: float = CAPTURE_DURATION_S,
    channels: list = None,
    apply_notch: bool = False
):
    """
    Acquire live data and parse using modules.
    
    Returns:
        counts: (n_channels, n_samples) raw ADC counts
        uv: (len(channels), n_samples) preprocessed uV
        info: capture info dict with timing headers
    """
    if channels is None:
        channels = list(range(1, N_CHANNELS + 1))
    
    # Create hardware client
    client = HardwareClient(ESP32_IP)
    
    # Detect sample rate from registers
    sample_rate = client.detect_sample_rate()
    if sample_rate is None:
        print(f"Could not detect sample rate, using default: {FS}")
        sample_rate = FS
    print(f"Sample rate: {sample_rate} Hz")
    
    # Capture live stream
    sample_bytes, raw_frames_bytes, info = asyncio.run(
        capture_live_stream(client, duration_s, sample_rate)
    )
    
    # Parse using analysis module
    counts, meta = parse_framestream_bin(raw_frames_bytes)
    print(f"Parsed counts shape: {counts.shape}")
    
    # Check timing integrity
    if 'hdr' in info:
        print("\n=== Timing Integrity ===")
        n_frames = len(info['hdr']['t1_first_drdy_us'])
        print(f"Frames received: {n_frames}")
        print(f"Samples parsed: {counts.shape[1]}")
    
    # Saturation check
    print("\n=== Saturation Check ===")
    saturation_report(counts)
    
    # Preprocess selected channels
    ch_indices = [ch - 1 for ch in channels]
    counts_selected = counts[ch_indices, :]
    
    uv = np.vstack([
        preprocess_channel_uv(counts_selected[i], sample_rate, apply_notch=apply_notch)
        for i in range(len(channels))
    ])
    
    return counts_selected, uv, channels, sample_rate, info

## Validation Tests

### A2: Internal ADC Noise (Inputs Shorted)

In [None]:
def run_internal_noise(channels=None, apply_notch=False):
    """Internal ADC noise test with inputs shorted."""
    if channels is None:
        channels = EEG_CHANNELS
    
    print("=" * 60)
    print("A2: Internal ADC Noise (Inputs Shorted)")
    print("=" * 60)
    
    counts, uv, used_channels, fs_hz, info = acquire_and_parse(
        duration_s=CAPTURE_DURATION_S,
        channels=channels,
        apply_notch=apply_notch
    )
    
    if counts.shape[1] == 0:
        print("No samples received - device not ready")
        return
    
    plot_validation_summary(uv, used_channels, fs_hz, title="A2: Internal ADC Noise")
    print("Internal noise test complete.")

# Run:
run_internal_noise()

### A1: Environmental Noise (Floating Inputs)

In [None]:
def run_floating_noise(channels=None, apply_notch=False):
    """Environmental noise test with floating inputs."""
    if channels is None:
        channels = EEG_CHANNELS
    
    print("=" * 60)
    print("A1: Environmental Noise (Floating Inputs)")
    print("=" * 60)
    
    counts, uv, used_channels, fs_hz, info = acquire_and_parse(
        duration_s=CAPTURE_DURATION_S,
        channels=channels,
        apply_notch=apply_notch
    )
    
    if counts.shape[1] == 0:
        print("No samples received - device not ready")
        return
    
    plot_validation_summary(uv, used_channels, fs_hz, title="A1: Floating Inputs")
    print("Floating noise test complete.")

# Run:
run_floating_noise()

### B1: Injected Signal Test

In [None]:
def run_injected_signal(channels=None, apply_notch=False):
    """Injected signal test."""
    if channels is None:
        channels = EEG_CHANNELS
    
    print("=" * 60)
    print("B1: Injected Signal Test")
    print("=" * 60)
    
    counts, uv, used_channels, fs_hz, info = acquire_and_parse(
        duration_s=CAPTURE_DURATION_S,
        channels=channels,
        apply_notch=apply_notch
    )
    
    if counts.shape[1] == 0:
        print("No samples received - device not ready")
        return
    
    plot_validation_summary(uv, used_channels, fs_hz, title="B1: Injected Signal")
    print("Injected signal test complete.")

# Run:
# run_injected_signal()

### Functional EEG: Eyes Open vs Eyes Closed

In [None]:
def run_functional_eeg(channels=None, apply_notch=True):
    """Functional EEG test: Eyes Open vs Eyes Closed."""
    if channels is None:
        channels = EEG_CHANNELS
    
    print("=" * 60)
    print("Functional EEG: Eyes Open vs Eyes Closed")
    print("=" * 60)
    
    # Eyes Open
    print("\n--- Eyes OPEN ---")
    input("Press Enter when ready for Eyes Open recording...")
    counts_eo, uv_eo, ch_eo, fs_eo, info_eo = acquire_and_parse(
        duration_s=CAPTURE_DURATION_S,
        channels=channels,
        apply_notch=apply_notch
    )
    
    # Eyes Closed
    print("\n--- Eyes CLOSED ---")
    input("Press Enter when ready for Eyes Closed recording...")
    counts_ec, uv_ec, ch_ec, fs_ec, info_ec = acquire_and_parse(
        duration_s=CAPTURE_DURATION_S,
        channels=channels,
        apply_notch=apply_notch
    )
    
    if uv_eo.shape[1] == 0 or uv_ec.shape[1] == 0:
        print("No samples received - device not ready")
        return
    
    # Compute PSDs for first channel
    f_eo, psd_eo = psd_welch(uv_eo[0], fs_eo)
    f_ec, psd_ec = psd_welch(uv_ec[0], fs_ec)
    
    # Plot comparison
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # PSD comparison
    axes[0].semilogy(f_eo, psd_eo, label='Eyes Open')
    axes[0].semilogy(f_ec, psd_ec, label='Eyes Closed')
    axes[0].set_xlim(0, 50)
    axes[0].set_xlabel('Frequency (Hz)')
    axes[0].set_ylabel('PSD (uV^2/Hz)')
    axes[0].set_title(f'PSD Comparison - Ch{channels[0]}')
    axes[0].legend()
    axes[0].grid(alpha=0.3)
    
    # Alpha band power comparison
    alpha_mask_eo = (f_eo >= 8) & (f_eo <= 13)
    alpha_mask_ec = (f_ec >= 8) & (f_ec <= 13)
    alpha_power_eo = np.trapz(psd_eo[alpha_mask_eo], f_eo[alpha_mask_eo])
    alpha_power_ec = np.trapz(psd_ec[alpha_mask_ec], f_ec[alpha_mask_ec])
    
    axes[1].bar(['Eyes Open', 'Eyes Closed'], [alpha_power_eo, alpha_power_ec])
    axes[1].set_ylabel('Alpha Power (8-13 Hz) uV^2')
    axes[1].set_title('Alpha Band Power Comparison')
    axes[1].grid(axis='y', alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print(f"\nAlpha power (8-13 Hz):")
    print(f"  Eyes Open:   {alpha_power_eo:.2f} uV^2")
    print(f"  Eyes Closed: {alpha_power_ec:.2f} uV^2")
    print(f"  Ratio (EC/EO): {alpha_power_ec/alpha_power_eo:.2f}x")
    print("\nFunctional EEG test complete.")

# Run:
# run_functional_eeg()