# XYZ location

In [None]:
file = "recordings/recording_20260120_172104.wav"
file2 = "recordings/recording_20260120_172104_logitech.wav"

In [None]:
#!/usr/bin/env python3
"""
3D Multi-source separation, tracking, and velocity estimation for arbitrary sounds
Array: 18 microphones (2 + 16)
Outputs: separated WAVs, tracked positions and velocities
"""

import time
import numpy as np
import soundfile as sf
from scipy.signal import stft, istft, resample_poly
from scipy.linalg import pinv
from tqdm import tqdm

# ======================
# PARAMETERS
# ======================
C = 343.0
FS = 48000
NFFT = 1024
HOP = 256
N_SOURCES = 2

# ======================
# SIMPLE TIMING HELPERS
# ======================
def stage(msg):
    print(f"\n[+] {msg}")
    return time.time()

def done(t0):
    print(f"    done in {time.time() - t0:.2f} s")

# ======================
# MICROPHONE POSITIONS (meters) â€” 18 mics
# ======================
mic_positions = np.array([
    [0.0, 0.0, 0.0],
    [0.1, 0.0, 0.0],
    [0.2, 0.0, 0.0],
    [0.3, 0.0, 0.0],
    [0.0, 0.1, 0.0],
    [0.1, 0.1, 0.0],
    [0.2, 0.1, 0.0],
    [0.3, 0.1, 0.0],
    [0.0, 0.2, 0.0],
    [0.1, 0.2, 0.0],
    [0.2, 0.2, 0.0],
    [0.3, 0.2, 0.0],
    [0.0, 0.3, 0.0],
    [0.1, 0.3, 0.0],
    [0.2, 0.3, 0.0],
    [0.3, 0.3, 0.0],
    [0.15, 0.15, 0.9144],
    [0.15, 0.15, 1.2]
])

MIC_COUNT = mic_positions.shape[0]

# ======================
# LOAD + RESAMPLE AUDIO
# ======================
def load_and_resample(path, target_fs):
    audio, fs = sf.read(path)
    if audio.ndim == 1:
        audio = audio[:, None]

    if fs != target_fs:
        gcd = np.gcd(fs, target_fs)
        up = target_fs // gcd
        down = fs // gcd
        audio = resample_poly(audio, up, down, axis=0)

    return audio, target_fs

t0 = stage("Loading and resampling audio")
audio1, _ = load_and_resample(file2, FS)   # (samples, 2)
audio2, _ = load_and_resample(file, FS)    # (samples, 16)

min_len = min(len(audio1), len(audio2))
audio1 = audio1[:min_len]
audio2 = audio2[:min_len]

audio = np.vstack([audio1.T, audio2.T])
done(t0)

print(f"Audio shape: {audio.shape} (mics, samples)")

# ======================
# STFT
# ======================
def stft_all(audio):
    X = []
    for ch in audio:
        _, _, Z = stft(
            ch,
            FS,
            nperseg=NFFT,
            noverlap=NFFT - HOP,
            boundary=None
        )
        X.append(Z)
    return np.array(X)

t0 = stage("Computing STFT")
X = stft_all(audio)
done(t0)

print(f"STFT shape: {X.shape} (mics, freqs, frames)")

# ======================
# SRP-PHAT LOCALIZATION
# ======================
xg = np.linspace(-2, 2, 40)
yg = np.linspace(-2, 2, 40)
zg = np.linspace(0, 2, 30)
grid = np.array(np.meshgrid(xg, yg, zg)).reshape(3, -1).T

freqs = np.arange(X.shape[1]) * FS / NFFT

def srp_phat(X, mic_pos, grid):
    P = np.zeros(len(grid))
    for gi, g in enumerate(tqdm(grid, desc="SRP-PHAT grid")):
        score = 0.0
        for i in range(len(mic_pos)):
            for j in range(i + 1, len(mic_pos)):
                di = np.linalg.norm(g - mic_pos[i])
                dj = np.linalg.norm(g - mic_pos[j])
                tau = (di - dj) / C

                # FIX: make phase (freqs, 1) so it broadcasts over frames
                phase = np.exp(-2j * np.pi * freqs * tau)[:, None]

                R = X[i] * np.conj(X[j])
                R /= np.abs(R) + 1e-12

                score += np.sum(np.real(R * phase))
        P[gi] = score
    return P

t0 = stage("Running SRP-PHAT localization")
P = srp_phat(X, mic_positions, grid)
done(t0)

top = np.argsort(P)[-5:][::-1]
print("\nTop SRP-PHAT peaks:")
for i in top:
    print(f"  score={P[i]:.2e}, position={grid[i]}")

src_idx = np.argsort(P)[-N_SOURCES:]
src_positions = grid[src_idx]

# ======================
# KALMAN TRACKING
# ======================
class Kalman3D:
    def __init__(self, x0):
        self.x = np.hstack([x0, np.zeros(3)])
        self.P = np.eye(6)
        self.F = np.eye(6)
        self.F[:3, 3:] = np.eye(3)
        self.Q = np.eye(6) * 0.01
        self.H = np.hstack([np.eye(3), np.zeros((3, 3))])
        self.R = np.eye(3) * 0.05

    def update(self, z):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x += K @ y
        self.P = (np.eye(6) - K @ self.H) @ self.P
        return self.x[:3], self.x[3:]

trackers = [Kalman3D(p) for p in src_positions]
tracked_positions, tracked_velocities = [], []

for k, p in zip(trackers, src_positions):
    pos, vel = k.update(p)
    tracked_positions.append(pos)
    tracked_velocities.append(vel)

print("\nInitial tracked states:")
for i, (p, v) in enumerate(zip(tracked_positions, tracked_velocities)):
    print(f"  Source {i}: pos={p}, vel={v}")

# ======================
# MVDR BEAMFORMING
# ======================
def steering_vector(freq, mic_pos, src_pos):
    return np.exp(
        -2j * np.pi * freq *
        np.linalg.norm(mic_pos - src_pos, axis=1) / C
    )

def mvdr(X, mic_pos, src_pos):
    Y = np.zeros((X.shape[1], X.shape[2]), dtype=complex)
    for f in range(X.shape[1]):
        R = X[:, f, :] @ X[:, f, :].conj().T
        Rinv = pinv(R)
        a = steering_vector(freqs[f], mic_pos, src_pos)
        w = Rinv @ a / (a.conj().T @ Rinv @ a + 1e-12)
        Y[f] = w.conj().T @ X[:, f, :]
    return Y

separated = []
for i, pos in enumerate(tracked_positions):
    t0 = stage(f"MVDR beamforming source {i}")
    Y = mvdr(X, mic_positions, pos)
    _, y = istft(Y, FS, nperseg=NFFT, noverlap=NFFT - HOP)
    y /= np.max(np.abs(y)) + 1e-12
    separated.append(y)
    done(t0)

# ======================
# SAVE WAVs
# ======================
for i, sig in enumerate(separated):
    sf.write(f"source_{i}.wav", sig, FS)

# ======================
# DOPPLER VELOCITY ESTIMATION
# ======================
def dominant_freq(Z):
    mag = np.abs(Z)
    idx = np.argmax(mag, axis=0)
    return idx * FS / NFFT

f0_est = np.mean(dominant_freq(X[0]))
delta_f = np.array([
    np.mean(dominant_freq(X[i])) - f0_est
    for i in range(MIC_COUNT)
])

A, b = [], []
src = tracked_positions[0]
for i, mic in enumerate(mic_positions):
    r_hat = mic - src
    r_hat /= np.linalg.norm(r_hat)
    A.append(r_hat)
    b.append(delta_f[i] * C / f0_est)

v_est = np.linalg.lstsq(np.array(A), np.array(b), rcond=None)[0]

print("\nEstimated velocity (m/s):", v_est)
print("Tracked positions (m):", tracked_positions)
