In [None]:
import sounddevice as sd

# Print the audio devices reported by sounddevice so the device indices used
# in the following cells can be looked up.
print(sd.query_devices())

In [None]:
# Speak into one mic only; this helps identify which mic is which.
# LEFT MIC HAS WHITE DOT
# VERIFY MIC INDICES

import pyaudio
import numpy as np

# Record a short clip from each candidate device in turn and print its mean
# absolute amplitude; the mic being spoken into should show the larger value.
p = pyaudio.PyAudio()
fs = 44100  # Sample rate
chunk = 1024
seconds = 2  # Short test recording

try:
    for mic_index in [1, 2]:  # Replace with actual device indices
        print(f"Testing Microphone {mic_index}. Speak into it now...")

        stream = p.open(format=pyaudio.paInt16, channels=1, rate=fs, input=True, input_device_index=mic_index, frames_per_buffer=chunk)

        frames = []
        try:
            for _ in range(0, int(fs / chunk * seconds)):
                data = stream.read(chunk, exception_on_overflow=False)
                frames.append(np.frombuffer(data, dtype=np.int16))
        finally:
            # Release the device even if a read fails part-way through.
            stream.stop_stream()
            stream.close()

        # Convert to numpy array and print summary.
        # Widen to int32 first: abs() of the int16 value -32768 wraps back to
        # -32768, which would drag the average down for clipped input.
        audio_data = np.concatenate(frames)
        print(f"Microphone {mic_index} Average Amplitude: {np.mean(np.abs(audio_data.astype(np.int32)))}\n")
finally:
    # Always shut PyAudio down, otherwise a failed run leaves it initialized
    # for the lifetime of the kernel.
    p.terminate()


In [None]:
#Record sound clip from both mics at the same time

import sounddevice as sd
import soundfile as sf
import numpy as np
import time

# Device indices of the two microphones; replace with the values found in
# the device-test cell.
LEFT_MIC_INDEX, RIGHT_MIC_INDEX = 1, 2

# Length of the clip (seconds) and the sample rate used for capture and saving
DURATION = 1.0
SAMPLERATE = 44_100

# Filled in by the recording threads: maps 'left' / 'right' to the samples
recordings = dict()

# Record from both devices using threads
def record_from_device(index, key):
    """Record DURATION seconds of mono float32 audio from one input device.

    The flattened samples are stored in the module-level ``recordings`` dict
    under ``key``.

    Each call opens its own ``sd.InputStream``. The convenience functions
    ``sd.rec()`` / ``sd.wait()`` share a single module-global stream, and every
    ``sd.rec()`` call first stops the previous one, so they cannot be used from
    two threads to capture two devices at the same time.

    Args:
        index: sounddevice input device index to record from.
        key: dictionary key under which the recording is stored.
    """
    frame_count = int(DURATION * SAMPLERATE)
    # The context manager starts the stream on entry and stops/closes it on
    # exit, even if the read raises.
    with sd.InputStream(device=index, channels=1, samplerate=SAMPLERATE, dtype='float32') as stream:
        recording, overflowed = stream.read(frame_count)
    if overflowed:
        print(f"Warning: input overflow on device {index}; some samples were dropped.")
    recordings[key] = recording.flatten()

# Start both recordings nearly simultaneously
from threading import Thread

# One worker thread per microphone so both captures start as close together
# as possible.
t1 = Thread(target=record_from_device, args=(LEFT_MIC_INDEX, 'left'))
t2 = Thread(target=record_from_device, args=(RIGHT_MIC_INDEX, 'right'))

start_time = time.time()
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
elapsed = time.time() - start_time
print(f"Recording finished. Elapsed: {elapsed:.3f}s")

# Write each channel to its own .wav file
for path, key in (('mic_left.wav', 'left'), ('mic_right.wav', 'right')):
    sf.write(path, recordings[key], SAMPLERATE)
print("Saved mic_left.wav and mic_right.wav")


In [None]:
# Determine the correction factor.
# To do so, clap directly between the mics, then run this chunk of code.
# Manually add the correction factor to the rest of the code (this way it won't need to be changed every time you run the code).

import numpy as np
import soundfile as sf
from scipy.signal import correlate

# Load the two recordings written by the recording cell
left, sr = sf.read('mic_left.wav')
right, _ = sf.read('mic_right.wav')

# Ensure same length
min_len = min(len(left), len(right))
if min_len == 0:
    raise ValueError("One of the recordings is empty; record the clap again.")
left = left[:min_len]
right = right[:min_len]

# Normalize both signals (zero mean, unit variance).
# A silent (constant) recording has zero standard deviation; dividing by it
# would fill the signal with NaN and argmax would then silently report a
# meaningless offset, so fail loudly instead.
left_std = np.std(left)
right_std = np.std(right)
if not (left_std > 0 and right_std > 0):
    raise ValueError("One of the recordings is silent or invalid; record the clap again.")
left = (left - np.mean(left)) / left_std
right = (right - np.mean(right)) / right_std

# Cross-correlation to estimate delay.
# Both inputs have min_len samples, so 'full' mode yields 2*min_len - 1 values
# covering lags -(min_len - 1) .. (min_len - 1).
corr = correlate(left, right, mode='full')
lags = np.arange(-(min_len - 1), min_len)
offset = lags[np.argmax(corr)]

# Print the correction factor
print(f"Estimated correction factor (manual_offset_samples) = {offset} samples")

# Positive value means left lags behind right
# Negative value means right lags behind left


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pyaudio
import time
import threading

# --- Capture settings -------------------------------------------------------
fs = 44100          # sample rate passed to every input stream
chunk = 1024        # frames requested per stream.read() call
record_seconds = 1  # length of each analysis window

# Manual timing correction, in samples (value produced by the
# correction-factor cell). Adjust this based on your setup.
manual_offset_samples = 276

# --- Geometry ---------------------------------------------------------------
speed_of_sound = 343.0  # presumably m/s, matching the meters used below
mic_distance = 0.37465  # distance between microphones in meters

# Your actual device indices (update these as needed)
mic_indices = [1, 2]

# --- Open one mono 16-bit input stream per microphone -----------------------
p = pyaudio.PyAudio()
stream_settings = dict(
    format=pyaudio.paInt16,
    channels=1,
    rate=fs,
    input=True,
    frames_per_buffer=chunk,
)
streams = []
for idx in mic_indices:
    stream = p.open(input_device_index=idx, **stream_settings)
    streams.append(stream)

# Flag polled by the localization loop; set to False to make it exit
keep_running = True

def _align_channels(left, right, offset_samples):
    """Drop leading samples from one channel and trim both to equal length.

    A positive ``offset_samples`` discards that many samples from the start of
    ``left``; a negative value discards ``-offset_samples`` samples from the
    start of ``right``; zero leaves both untouched.
    """
    if offset_samples > 0:
        left = left[offset_samples:]
    elif offset_samples < 0:
        right = right[-offset_samples:]
    usable = min(len(left), len(right))
    return left[:usable], right[:usable]


def _peak_index(signal):
    """Return the index of the sample with the largest absolute value."""
    # int16 cannot represent +32768, so abs(-32768) wraps back to -32768 and a
    # fully clipped negative peak could never win argmax. Widen first.
    return np.argmax(np.abs(signal.astype(np.int32)))


def localization_loop():
    """Repeatedly record from both mics and plot which one heard the peak first.

    Runs until the module-level ``keep_running`` flag becomes False. Each
    iteration reads ``record_seconds`` worth of int16 audio from every stream
    in ``streams``, applies ``manual_offset_samples``, locates the loudest
    sample in each channel and converts the index difference into a distance
    difference using ``fs`` and ``speed_of_sound``. A tie (identical peak
    indices) is reported as "Left Mic".
    """
    global keep_running
    while keep_running:
        print("Recording...")
        data = {i: [] for i in mic_indices}
        for _ in range(0, int(fs / chunk * record_seconds)):
            # Streams are read one after the other, chunk by chunk.
            for i, stream in zip(mic_indices, streams):
                audio_chunk = stream.read(chunk, exception_on_overflow=False)
                data[i].append(np.frombuffer(audio_chunk, dtype=np.int16))

        # NOTE(review): "left" is taken from the SECOND device index and
        # "right" from the FIRST. Confirm this matches the physical wiring and
        # the sign convention used when manual_offset_samples was measured.
        left = np.concatenate(data[mic_indices[1]])
        right = np.concatenate(data[mic_indices[0]])

        # Apply correction
        left_corr, right_corr = _align_channels(left, right, manual_offset_samples)
        min_len = len(left_corr)
        if min_len == 0:
            # The offset consumed the whole recording; argmax on an empty
            # array would raise, so skip this window instead.
            print("No samples left after applying manual_offset_samples; skipping.")
            time.sleep(1)
            continue

        # Peaks
        left_peak = _peak_index(left_corr)
        right_peak = _peak_index(right_corr)

        # A later peak on the left means the sound reached the right mic first.
        peak_diff_samples = left_peak - right_peak
        time_diff = peak_diff_samples / fs
        distance_diff = speed_of_sound * time_diff

        closer = "Right Mic" if distance_diff > 0 else "Left Mic"

        # Plot
        x = np.arange(min_len)
        fig = plt.figure(figsize=(14, 5))
        plt.plot(x, left_corr, label='Left Mic', alpha=0.5, color='blue')
        plt.plot(x, right_corr, label='Right Mic', alpha=0.5, color='red')
        plt.scatter(left_peak, left_corr[left_peak], color='cyan', label=f"Left Peak ({left_peak})")
        plt.scatter(right_peak, right_corr[right_peak], color='orange', label=f"Right Peak ({right_peak})")
        plt.title(f"{closer} is closer | Δ Distance: {abs(distance_diff):.4f} m")
        plt.xlabel("Sample Index")
        plt.ylabel("Amplitude")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
        # This loop runs indefinitely; close the figure so figures do not
        # accumulate in memory on backends that keep them open after show().
        plt.close(fig)

        time.sleep(1)

    print("Localization loop ended.")

# Start the loop in a background thread so the notebook cell returns
# immediately; the thread keeps running until keep_running is set to False
# (see stop()).
localization_thread = threading.Thread(target=localization_loop)
localization_thread.start()

# Stop function you can call manually
def stop():
    """Stop the localization thread and release all audio resources.

    Clears the ``keep_running`` flag, waits for ``localization_thread`` to
    finish its current iteration, then stops and closes every stream in
    ``streams`` and terminates PyAudio.

    A failure while closing one stream is reported and does not prevent the
    remaining streams from being closed or PyAudio from being terminated. The
    ``streams`` list is emptied afterwards so calling ``stop()`` again does not
    touch already-closed streams.
    """
    global keep_running
    keep_running = False
    localization_thread.join()
    for stream in streams:
        try:
            try:
                stream.stop_stream()
            finally:
                # Close even if stopping failed, so the device is released.
                stream.close()
        except Exception as exc:
            print(f"Warning: problem while closing a stream: {exc}")
    # Forget the closed streams so a repeated stop() is a harmless no-op.
    streams.clear()
    p.terminate()
    print("All audio streams closed and thread stopped.")


In [None]:
# Run this cell to end the background localization loop and close the audio streams.
stop()