In [None]:
import numpy as np
from scipy import signal
from scipy.io import wavfile
import matplotlib.pyplot as plt
from multiprocessing import Pool

def ACF(f, W, t, lag):
    """Return the windowed autocorrelation of ``f`` at offset ``t`` and ``lag``.

    Computes ``sum(f[t + j] * f[t + lag + j])`` for ``j`` in ``[0, n)``,
    where ``n`` is ``W`` shortened so that neither slice runs past the end
    of ``f``.

    Args:
        f: 1-D NumPy array of samples.
        W: Requested window length in samples.
        t: Start index of the window.
        lag: Shift, in samples, applied to the second factor.

    Returns:
        The sum of products over the usable window; zero when the two
        slices have no usable overlap.
    """
    total = len(f)
    # Clamp at zero: a negative length would turn into a negative slice stop,
    # which wraps around and yields two slices of different sizes (and a
    # broadcasting error) instead of an empty window.
    valid_length = max(0, min(W, total - t, total - (lag + t)))
    return np.sum(f[t:t + valid_length] * f[lag + t:lag + t + valid_length])

def DF(f, W, t, max_lag):
    """Return the difference function of ``f`` at ``t`` for lags ``0..max_lag-1``.

    Each entry is assembled from three autocorrelation terms:
    ``ACF(f, W, t, 0) + ACF(f, W, t + lag, 0) - 2 * ACF(f, W, t, lag)``.

    Args:
        f: 1-D array of samples, passed through to ``ACF``.
        W: Window length in samples, passed through to ``ACF``.
        t: Start index of the analysis window.
        max_lag: Number of lags to evaluate (exclusive upper bound).

    Returns:
        A float array of length ``max_lag``.
    """
    result = np.zeros(max_lag)
    # The energy of the unshifted window does not depend on the lag.
    window_energy = ACF(f, W, t, 0)
    result[:] = [
        window_energy + ACF(f, W, t + shift, 0) - 2 * ACF(f, W, t, shift)
        for shift in range(max_lag)
    ]
    return result

def CMNDF(df):
    """Return the cumulative-mean-normalized difference function of ``df``.

    Entry 0 is defined as 1. For ``lag >= 1`` the value is
    ``df[lag] * lag / sum(df[1:lag + 1])``, i.e. ``df[lag]`` divided by the
    mean of ``df[1..lag]``.

    Args:
        df: 1-D array of difference-function values indexed by lag.

    Returns:
        A floating-point array with the same length as ``df``. Entries whose
        cumulative sum is zero (for example an all-zero frame) are set to 1
        instead of NaN. An empty input yields an empty array.
    """
    df = np.asarray(df)
    # Integer input would make the output integer too and silently truncate
    # the normalized ratios, so promote it to float first.
    if not np.issubdtype(df.dtype, np.floating):
        df = df.astype(np.float64)

    # Start from all ones: this is both the lag-0 definition and the value
    # kept wherever the denominator is zero.
    cmndf = np.ones_like(df)
    if df.size < 2:
        return cmndf

    cumulative_sum = np.cumsum(df[1:])
    scaled = df[1:] * np.arange(1, len(df))
    # Divide only where the denominator is non-zero; other entries keep 1.
    np.divide(scaled, cumulative_sum, out=cmndf[1:], where=cumulative_sum != 0)
    return cmndf

def detect_pitch(f, W, t, sample_rate, bounds, thresh=0.1):
    """Estimate the pitch of ``f`` at sample offset ``t``.

    Builds the difference function with ``DF``, normalizes it with ``CMNDF``
    and picks a lag inside ``bounds``; the result is ``sample_rate / lag``.

    Args:
        f: 1-D array of samples.
        W: Window length in samples, passed through to ``DF``.
        t: Start index of the analysis window.
        sample_rate: Sampling rate used to convert the chosen lag to Hz.
        bounds: Two-element sequence ``(min_lag, max_lag)`` in samples; lags
            in ``[min_lag, max_lag)`` are searched.
        thresh: Threshold on the normalized difference below which a dip is
            accepted as the period.

    Returns:
        The estimated pitch, ``sample_rate`` divided by the selected lag.
    """
    lo, hi = bounds[0], bounds[1]
    df = DF(f, W, t, hi)
    cmndf = CMNDF(df)
    candidates = cmndf[lo:hi]

    below = np.flatnonzero(candidates < thresh)
    if below.size:
        lag = int(below[0]) + lo
        # The first sample under the threshold sits on the falling edge of
        # the dip; walk down to the bottom of that dip so the estimate is
        # not biased toward shorter lags (higher pitch).
        while lag + 1 < hi and cmndf[lag + 1] < cmndf[lag]:
            lag += 1
        return sample_rate / lag

    # No dip is deep enough: fall back to the global minimum in the range.
    return sample_rate / (np.argmin(candidates) + lo)

def main():
    """Plot the detected pitch over the first second of a WAV file.

    Reads ``test_audio/male-C_major.wav``, keeps the first second, mixes
    multi-channel audio down to mono, runs ``detect_pitch`` on consecutive
    windows, prints the window count and shows a scatter plot of the
    per-window pitch estimates.
    """
    sample_rate, f = wavfile.read('test_audio/male-C_major.wav')

    # Keep only the first second of audio.
    f = f[:sample_rate]

    if f.ndim > 1:
        f = np.mean(f, axis=1)  # Convert stereo to mono

    data = f.astype(np.float64)
    # Window of 5/2000 s, derived from the file's actual sample rate rather
    # than assuming 44100 Hz.
    window_size = int(5 / 2000 * sample_rate)
    bounds = [20, 2000]

    # NOTE(review): the divisor is window_size + 3 while the hop between
    # windows is window_size; the reason for the +3 is not evident here.
    num_windows = data.shape[0] // (window_size + 3)
    print(f"{num_windows} windows")
    pitches = [
        detect_pitch(data, window_size, i * window_size, sample_rate, bounds)
        for i in range(num_windows)
    ]

    plt.scatter(range(len(pitches)), pitches)
    plt.xlabel("Window Index")
    plt.ylabel("Pitch (Hz)")
    plt.title("Detected Pitch Over Time")
    plt.show()


# Guard the entry point so that importing this module (as spawned
# multiprocessing workers do) does not re-run the whole analysis.
if __name__ == "__main__":
    main()


6280 windows


Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/miniconda3/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/miniconda3/lib/python3.12/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/opt/miniconda3/lib/python3.12/multiprocessing/queues.py", line 389, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'detect_pitch' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Process SpawnPoolWorker-6:
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/miniconda3/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
 