### Dump flash

In [None]:
import serial

SERIAL_PORT = '/dev/cu.usbmodem0010577524853'  # DK-1
# SERIAL_PORT = '/dev/cu.usbmodem0010577286653'  # DK-3
# SERIAL_PORT = '/dev/cu.usbmodem0010577391403'  # DK-4
BAUDRATE = 1000000

# Capture everything the device prints into raw.txt until the end-of-dump
# marker has been seen.
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser, open('raw.txt', 'w') as f:

    # Rolling tail of the received text (newlines removed), used to detect the
    # end marker even when it is split across two reads.
    end_str = ""

    while True:
        # Ask for everything that is pending, but for at least one byte, so
        # the call blocks (up to the port timeout) instead of spinning the
        # CPU while the device is idle.
        data = ser.read(max(1, ser.in_waiting))

        if not data:
            # Timeout without any data: keep waiting.
            continue

        try:
            data_str = data.decode('utf-8')
        except UnicodeDecodeError:
            # The chunk is still skipped, but no longer silently: missing
            # data shifts everything that follows in the dump.
            print(f"WARNING: dropped {len(data)} undecodable bytes.")
            continue

        end_str = (end_str + data_str)[-50:].replace('\n', '')
        f.write(data_str)

        if "Dumping flash finished." in end_str:
            print("Done writing to raw.txt.")
            break

### Convert to .wav and readable .txt files

In [None]:
import re
import struct
import wave

SAMPLE_RATE = 8000
CHANNELS = 1
SAMPLE_WIDTH = 3
OUTPUT_NAME = "output"

# Layout of one flash block as decoded below: two little-endian 64-bit
# timestamps followed by 1020 little-endian 32-bit words (16 + 1020 * 4 = 4096).
BLOCK_SIZE = 4096
HEADER_SIZE = 16
SAMPLES_PER_BLOCK = 1020

with open("raw.txt", "r") as f:
    raw = f.read()

# The hex dump sits between the start and end markers printed by the device.
match = re.search(r'Dumping flash...\s*(.*?)\s*Dumping flash finished.', raw, re.DOTALL)

if match:
    byte_data = bytes.fromhex(match.group(1).replace('\n', ''))
    # Trailing bytes that do not fill a whole block are ignored.
    n_blocks = len(byte_data) // BLOCK_SIZE
    # bytearray grows in place; repeated "+=" on immutable bytes would copy
    # the whole buffer for every sample.
    wav_data = bytearray()

    with open(f"{OUTPUT_NAME}.txt", "w") as f:
        for block in range(n_blocks):
            base = block * BLOCK_SIZE
            local_ts, central_ts = struct.unpack_from('<QQ', byte_data, base)
            f.write("local: " + str(local_ts) + " central: " + str(central_ts) + "\n")

            words = struct.unpack_from(f'<{SAMPLES_PER_BLOCK}i', byte_data, base + HEADER_SIZE)
            for word in words:
                # Drop the lowest byte of the 32-bit word, keeping a signed
                # 24-bit sample.
                sample = word >> 8
                f.write(str(sample) + "\n")
                # The low three little-endian bytes are the 24-bit WAV frame.
                wav_data += struct.pack('<i', sample)[:3]

    with wave.open(f"{OUTPUT_NAME}.wav", 'wb') as wav_file:
        wav_file.setnchannels(CHANNELS)
        wav_file.setsampwidth(SAMPLE_WIDTH)
        wav_file.setframerate(SAMPLE_RATE)
        wav_file.writeframes(wav_data)

    print(f"Done writing to {OUTPUT_NAME}.txt and {OUTPUT_NAME}.wav from {len(byte_data) / (1024 * 1024)} MB of data, resulting in {n_blocks * SAMPLES_PER_BLOCK} samples.")

else:
    print("No flash dump found")

### Initialize microphone objects and calculate timestamps

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import correlate, correlation_lags

%matplotlib widget


class Mic:
    """Microphone at a fixed position together with its timestamped recording.

    The recording is read from a text file in which every line starting with
    ``local:`` is a block header of the form ``local: <int> central: <int>``
    and every other line holds one integer sample (lines that do not parse as
    an integer count as a sample of 0). Each header is used as the timestamp
    of the sample that follows it, and a new header is expected every
    ``SAMPLES_PER_BLOCK`` samples.

    Attributes:
        pos: Position as a NumPy array.
        timestamps: Per-sample timestamps interpolated from the ``central``
            header values, in the header's units (the drift report labels
            them as microseconds).
        data: Array of shape ``(n_samples, 2)`` with the columns
            ``[timestamp * 1e-6, sample]``.
    """

    # Number of samples between two consecutive block headers.
    SAMPLES_PER_BLOCK = 1020
    # Speed of sound in m/s, used to bound physically possible delays.
    SPEED_OF_SOUND = 343

    def __init__(self, pos, file):
        """Load the recording in ``file`` for a microphone located at ``pos``.

        Prints the number of samples, a warning when the spacing of the local
        block timestamps varies, and the clock drift between the local and the
        central timestamps.

        Raises:
            ValueError: Raised by ``np.interp`` when the file has no block
                header or the number of headers does not match the number of
                samples.
        """
        self.pos = np.array(pos)

        local_times = []
        central_times = []
        samples = []

        with open(file, 'r') as f:
            for line in f:
                if line.startswith('local:'):
                    fields = line.split()
                    local_times.append(int(fields[-3]))
                    central_times.append(int(fields[-1]))
                else:
                    try:
                        sample = int(line.strip())
                    except ValueError:
                        sample = 0
                    samples.append(sample)

        offsets = np.array(local_times) - np.array(central_times)

        # with synchronization
        timestamps = self._interpolate_timestamps(len(samples), central_times)

        # without synchronization
        # timestamps = self._interpolate_timestamps(len(samples), local_times) - offsets[0]

        print(f"Calculated {len(timestamps)} data points for {file}.")
        if np.std(np.diff(local_times)) > 1:
            print("\033[31mWARNING: Timestamps vary more than expected. Data collection was likely flawed.\033[0m")

        print(f"Drift: {offsets[-1] - offsets[0]} us -> {(offsets[-1] - offsets[0]) * 1e6 / (central_times[-1] - central_times[0])} PPM\n")

        self.timestamps = timestamps
        self.data = np.column_stack([timestamps * 1e-6, samples])

    @classmethod
    def _interpolate_timestamps(cls, n_samples, block_times):
        """Return one timestamp per sample, interpolated between block headers.

        ``block_times[k]`` is the timestamp of sample ``k * SAMPLES_PER_BLOCK``.
        ``np.interp`` holds the last value constant beyond the final header,
        which would give every sample of the last block the same timestamp;
        to avoid that, a virtual header one block interval after the last real
        one is appended whenever at least two headers are available.
        """
        xp = list(range(0, n_samples, cls.SAMPLES_PER_BLOCK))
        fp = [float(t) for t in block_times]

        if len(fp) >= 2 and len(fp) == len(xp):
            xp.append(xp[-1] + cls.SAMPLES_PER_BLOCK)
            fp.append(2 * fp[-1] - fp[-2])

        return np.interp(np.arange(0, n_samples), xp, fp)

    @staticmethod
    def _first_index_after(times, t):
        """Return the index of the first entry of ``times`` greater than ``t``.

        Returns ``len(times)`` when no entry is greater; a bare ``np.argmax``
        would return 0 in that case.
        """
        later = times > t
        if not later.any():
            return len(times)
        return int(np.argmax(later))

    def get_sliced_data(self, t1, t2):
        """Return the rows of ``data`` from the first time above ``t1`` up to
        (and excluding) the first time above ``t2``.

        A ``t2`` beyond the end of the recording selects everything up to the
        last sample.
        """
        times = self.data[:, 0]
        start_index = self._first_index_after(times, t1)
        end_index = self._first_index_after(times, t2)
        return self.data[start_index: end_index, :]

    @staticmethod
    def calculate_measured_tdoa(mic1, mic2, source, visualize=False):
        """Estimate the time difference of arrival between two microphones.

        Both recordings are cut to the window ``[source.t1, source.t2]`` and
        cross-correlated. The returned value is the time shift that, added to
        the time axis of ``mic1``, aligns its signal with ``mic2`` at the
        correlation peak. It is clamped to just under the distance between
        the microphones divided by the speed of sound.

        Args:
            mic1: First microphone.
            mic2: Second microphone.
            source: Object providing the window bounds ``t1`` and ``t2``.
            visualize: When true, plot both windows and the shifted signal.

        Raises:
            ValueError: If either microphone has no samples in the window.
        """
        sd1 = mic1.get_sliced_data(source.t1, source.t2)
        sd2 = mic2.get_sliced_data(source.t1, source.t2)
        if len(sd1) == 0 or len(sd2) == 0:
            raise ValueError(
                f"No samples between t1={source.t1} and t2={source.t2} for at least one microphone."
            )

        t1, s1 = sd1[:, 0], sd1[:, 1]
        t2, s2 = sd2[:, 0], sd2[:, 1]

        corr = correlate(s1, s2, mode='full')
        lags = correlation_lags(len(s1), len(s2), mode='full')
        lag = lags[np.argmax(corr)]

        # Convert the lag (in samples) into a time difference using the actual
        # timestamps of the two recordings.
        if lag >= 0:
            tdoa = t2[0] - t1[lag]
        else:
            tdoa = t2[-lag] - t1[0]

        if visualize:
            fig, ax = plt.subplots(figsize=(10, 2.5))
            ax.plot(t1, s1, linewidth=0.2)
            ax.plot(t2, s2, linewidth=1)
            ax.plot(t1 + tdoa, s1, linewidth=1, c='tab:blue')
            plt.show()

        # A delay cannot exceed the travel time between the two microphones;
        # the small margin keeps the value strictly inside that bound.
        limit = (np.linalg.norm(mic1.pos - mic2.pos) - 0.00001) / Mic.SPEED_OF_SOUND
        return max(min(tdoa, limit), -limit)


class Source:
    """A sound source: its position and the time window of its signal.

    Attributes:
        pos: Position, stored as a new NumPy array built from ``pos``.
        t1: Start of the time window.
        t2: End of the time window.
    """

    def __init__(self, pos, t1, t2):
        self.pos = np.array(pos)
        self.t1, self.t2 = t1, t2


# Two-microphone array; each Mic loads its recording from the named file.
mics_small = [
    Mic(pos=[-0.5, 0.0], file="small_mic0.txt"),
    Mic(pos=[0.5, 0.0], file="small_mic1.txt"),
]
# Three microphones placed on a line.
mics_linear = [
    Mic(pos=[-1.0, 0.0], file="linear_mic0.txt"),
    Mic(pos=[0.0, 0.0], file="linear_mic1.txt"),
    Mic(pos=[1.0, 0.0], file="linear_mic2.txt"),
]

### Plot timestamped microphone data

In [None]:
def plot_mics(mics):
    """Plot every microphone's samples over time, one stacked panel per mic.

    All panels share both axes; column 0 of each mic's ``data`` is used as
    the x values and column 1 as the y values.
    """
    count = len(mics)
    fig, axes = plt.subplots(count, 1, figsize=(15, 2 * count + 2), sharex=True, sharey=True)

    # With a single row plt.subplots returns a bare Axes rather than an array.
    panels = axes if count > 1 else [axes]
    for panel, mic in zip(panels, mics):
        panel.plot(mic.data[:, 0], mic.data[:, 1], linewidth=0.2)

    fig.supxlabel("Time (s)")


# One figure per microphone array.
for mic_group in (mics_small, mics_linear):
    plot_mics(mic_group)

### Plot direction estimations (2 mics)

In [None]:
def plot_direction(mics, sources, title="", visualize=False):
    """Plot, for each source, the curve of positions matching the measured delay.

    Only the first two microphones are used. For every source the measured
    time difference of arrival is converted to a range difference (times 343)
    and the zero contour of "range difference on the grid minus measured
    range difference" is drawn.

    Args:
        mics: Microphones; ``mics[0]`` and ``mics[1]`` form the pair.
        sources: Sources providing ``pos`` and the window used for the
            delay measurement.
        title: Axes title.
        visualize: Forwarded to ``Mic.calculate_measured_tdoa``.
    """
    fig, ax = plt.subplots(figsize=(6, 6))

    mic_xy = np.array([m.pos for m in mics])
    source_xy = np.array([s.pos for s in sources])

    ax.scatter(source_xy[:, 0], source_xy[:, 1], c='red', label='Source')
    ax.scatter(mic_xy[:, 0], mic_xy[:, 1], c='blue', label='Microphones')

    X, Y = np.meshgrid(np.linspace(-4.5, 4.5, 1000), np.linspace(-0.5, 10.5, 1000))

    # The range difference at every grid point depends only on the microphone
    # pair, so it is evaluated once for all sources.
    first, second = mics[0], mics[1]
    d1s = np.sqrt((X - first.pos[0])**2 + (Y - first.pos[1])**2)
    d2s = np.sqrt((X - second.pos[0])**2 + (Y - second.pos[1])**2)
    delta_ds = d2s - d1s

    for src in sources:
        measured_delta_d = Mic.calculate_measured_tdoa(first, second, src, visualize=visualize) * 343
        ax.contour(
            X, Y, delta_ds - measured_delta_d,
            levels=[0], colors='darkseagreen', linewidths=1.5, linestyles='dashed',
        )

    ax.set_title(title)
    ax.set_xlabel('X Position (m)')
    ax.set_ylabel('Y Position (m)')
    ax.legend()
    ax.set_aspect('equal')
    ax.set_axisbelow(True)
    ax.grid(True)
    

# One entry per source: its position and the [t1, t2] window that is cut out
# of the recordings for the delay measurement.
sources = [
    Source(pos=[-4.0, 4.0], t1=24.112, t2=24.122),
    Source(pos=[-2.0, 4.0], t1=27.024, t2=27.034),
    Source(pos=[0.0, 4.0], t1=30.116, t2=30.126),
    Source(pos=[2.0, 4.0], t1=33.133, t2=33.143),
    Source(pos=[4.0, 4.0], t1=36.078, t2=36.088),
]
plot_direction(mics=mics_small, sources=sources, title="Two Microphones", visualize=True)

### Plot location estimations (3 mics)

In [None]:
def plot_location(mics, sources, title="", visualize=False):
    """Estimate and plot source positions from pairwise delays.

    For every source and every unordered microphone pair, the measured time
    difference of arrival is converted to a range difference (times 343) and
    compared with the range difference at every grid point. The zero contour
    of each pair is drawn, and the grid point with Y >= 0 that minimises the
    summed squared mismatch over all pairs is marked as the estimate. The
    average and maximum distance between estimates and true positions are
    printed.

    Args:
        mics: Two or more microphones; all unordered pairs are used (for
            three microphones these are (0, 1), (0, 2) and (1, 2)).
        sources: Sources providing ``pos`` and the window used for the
            delay measurement.
        title: Axes title.
        visualize: Forwarded to ``Mic.calculate_measured_tdoa``.
    """
    fig, ax = plt.subplots(figsize=(12, 9))

    mic_positions = np.array([mic.pos for mic in mics])
    source_positions = np.array([source.pos for source in sources])

    ax.scatter(source_positions[:, 0], source_positions[:, 1], c='red', label='Source')
    ax.scatter(mic_positions[:, 0], mic_positions[:, 1], c='blue', label='Microphones')

    x_vals = np.linspace(-6.5, 6.5, 1000)
    y_vals = np.linspace(-0.5, 12.5, 1000)
    X, Y = np.meshgrid(x_vals, y_vals)

    # Distance from every grid point to each microphone. These do not depend
    # on the source, so they are computed once instead of per source and pair.
    distances = [np.sqrt((X - mic.pos[0])**2 + (Y - mic.pos[1])**2) for mic in mics]

    # All unordered pairs (i < j), in lexicographic order.
    pairs = [(i, j) for i in range(len(mics)) for j in range(i + 1, len(mics))]

    errors = []

    for source_index, source in enumerate(sources):
        total_error = np.zeros_like(X)

        for i, j in pairs:
            actual_delta_d = Mic.calculate_measured_tdoa(mics[i], mics[j], source, visualize=visualize) * 343

            Z = (distances[j] - distances[i]) - actual_delta_d
            ax.contour(X, Y, Z, levels=[0], colors='gray', linewidths=0.5, linestyles='dashed', zorder=0)

            total_error += Z**2

        # Only grid points with Y >= 0 are candidates for the estimate.
        masked_error = np.ma.masked_where(Y < 0, total_error)
        min_idx = np.unravel_index(np.argmin(masked_error), total_error.shape)
        x_est = X[min_idx]
        y_est = Y[min_idx]
        errors.append(np.linalg.norm(source.pos - np.array([x_est, y_est])))

        # Label only the first estimate so the legend shows a single entry.
        ax.plot(x_est, y_est, 'gx', markersize=8, markeredgewidth=2, label='Estimated Source' if source_index == 0 else "")

    ax.set_title(title)
    ax.set_xlabel('X Position (m)')
    ax.set_ylabel('Y Position (m)')
    ax.legend()
    ax.set_aspect('equal')
    ax.set_axisbelow(True)
    ax.grid(True)

    print("Average error:", np.average(errors), "m")
    print("Maximum error:", np.max(errors), "m")


# One entry per source: its position and the [t1, t2] window that is cut out
# of the recordings for the delay measurements.
sources = [
    Source(pos=[-4.0, 4.0], t1=44.585, t2=44.605),
    Source(pos=[-2.0, 4.0], t1=46.555, t2=46.575),
    Source(pos=[0.0, 4.0], t1=48.610, t2=48.630),
    Source(pos=[2.0, 4.0], t1=50.638, t2=50.658),
    Source(pos=[4.0, 4.0], t1=52.590, t2=52.610),
]
plot_location(mics=mics_linear, sources=sources, title="Linear Array", visualize=False)