In [None]:
!pip3 install matplotlib --break-system-packages

In [None]:
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import scipy.io.wavfile as wav
import scipy.signal

from IPython.display import Audio


# Utility functions

In [None]:
def lin_to_db(l):
    return 20 * np.log10(l)

def db_to_lin(d):
    return np.pow(10.0, d / 20.0)

In [None]:
print(np.reshape(np.tile(np.array([[1,2,3,4],[5,6,7,8]])[:,:, np.newaxis], [1, 1, 2]), shape=(2, 8)))

# I think replicate_last is not currently used.

def replicate_last(x, num_reps):
    """Replicate the last dimension of an array num_reps times."""
    shape_in = x.shape
    shape_out = np.array(shape_in)
    shape_out[-1] *= num_reps
    return np.reshape(np.tile(x[..., np.newaxis], list(np.ones_like(shape_in)) + [num_reps]), shape=shape_out)

print(replicate_last(np.array([[1,2,3,4],[5,6,7,8]]), 5))

In [None]:
w = np.hanning(256)
x = np.random.rand(1000)
print(np.convolve(w, x, 'same').shape)
print(scipy.signal.convolve(x, w, 'same').shape)
# "same" means "same as 1st arg" for scipy, but "same as longer arg" for numpy.
#convolve = np.convolve
convolve = scipy.signal.convolve

In [None]:
def old_slinterp(x, interp, truncate=False):
    """Linear interpolation of vector.  Ramps down to implicit zero at end unless truncate."""
    x_shape = list(x.shape)
    x_out_shape = list(x.shape)
    x_out_shape[-1] *= interp
    x_in_flat = np.reshape(x, (np.prod([1] + x_shape[:-1]), x_shape[-1]))
    n_rows, n_cols = x_in_flat.shape
    x_out_flat = np.zeros((n_rows, interp * n_cols))
    interpolation_window = np.hstack([np.linspace(0, 1, 1 + interp)[1:], np.linspace(1, 0, 1 + interp)[1:]])
    for i in range(x_in_flat.shape[0]):
        x_atrous = np.reshape(np.hstack([x_in_flat[[i], :].T, np.zeros((n_cols, interp - 1))]), (n_cols * interp,))
        x_out_flat[i] = convolve(x_atrous, interpolation_window, 'same')
    if truncate:
        if interp > 1:  # Skip for degenerate interp == 1; indexing by [:, :-(1 - 1)] deletes the entire array.
            x_out_flat = x_out_flat[:, :-(interp - 1)]
        x_out_shape[-1] -= (interp - 1)
    #print(f'{x_out_flat.shape=}, {x_out_shape=}')
    return np.reshape(x_out_flat, x_out_shape)

def slinterp(x, interp, truncate=False):
    """Linear interpolation of vector.  Ramps down to implicit zero at end unless truncate. CONV-FREE VERSION."""
    x_shape = list(x.shape)
    x_out_shape = list(x.shape)
    x_out_shape[-1] *= interp
    x_in_flat = np.reshape(x, (np.prod([1] + x_shape[:-1]), x_shape[-1]))
    n_rows, n_cols = x_in_flat.shape
    x_out_flat = np.zeros((n_rows, interp * (n_cols + 1)))
    interpolation_window = np.expand_dims(np.hstack([np.linspace(0, 1, 1 + interp)[1:], np.linspace(1, 0, 1 + interp)[1:]]), 0)
    for i in range(x_in_flat.shape[-1]):
        #x_atrous = np.reshape(np.hstack([x_in_flat[[i], :].T, np.zeros((n_cols, interp - 1))]), (n_cols * interp,))
        #x_out_flat[i] = convolve(x_atrous, interpolation_window, 'same')
        x_out_flat[..., i * interp : (i + 2) * interp] += x_in_flat[..., i] * interpolation_window
    x_out_flat = x_out_flat[..., (interp - 1):-1]
    if truncate:
        if interp > 1:  # Skip for degenerate interp == 1; indexing by [:, :-(1 - 1)] deletes the entire array.
            x_out_flat = x_out_flat[:, :-(interp - 1)]
        x_out_shape[-1] -= (interp - 1)
    #print(f'{x_out_flat.shape=}, {x_out_shape=}')
    return np.reshape(x_out_flat, x_out_shape)

print(slinterp(np.arange(4), 4, truncate=True).shape)
print(slinterp(np.arange(4), 4, truncate=False).shape)
print(slinterp(np.arange(4), 1, truncate=True).shape)
print(slinterp(np.arange(4), 1, truncate=False).shape)

plt.plot(np.arange(25), slinterp(np.array([[1, 1, 0, 2, 2]]), 5).T, '.',
         np.arange(21), slinterp(np.array([[1, 1, 0, 2, 2]]), 5, truncate=True).T, '.')
plt.grid()

In [None]:
from scipy import fft

def spectrogram(x, fft_len=4096, win_len=None, hop_len=None, window_fn=np.hanning, sr=1, f_max=None, title=None):
    if win_len is None:
        win_len = fft_len
    if hop_len is None:
        hop_len = win_len // 2
    # Window.
    prepad_len = (fft_len - win_len) // 2
    win = np.hstack([np.zeros(prepad_len), window_fn(win_len), np.zeros(fft_len - win_len - prepad_len)])
    # Frame.
    frame_indices = np.arange(0, len(x) - win_len, hop_len)[:, np.newaxis] + np.arange(win_len)[np.newaxis, :]
    x_chunks_windowed = x[frame_indices] * win[np.newaxis, :]
    # Transform.
    stft_mag_db = 20 * np.log10(np.abs(fft.fft(x_chunks_windowed, n=fft_len)[:, :(fft_len // 2 + 1)]))
    num_frames, num_bins = stft_mag_db.shape
    t_base = np.arange(num_frames) * hop_len / sr
    f_base = np.arange(num_bins) * sr / fft_len
    plt.imshow(stft_mag_db.T, aspect='auto', origin='lower', extent=[t_base[0], t_base[-1], f_base[0], f_base[-1]])
    plt.clim(np.max(stft_mag_db) + [-80, 0])
    #plt.ylim([0, f_max / sr * fft_len])
    if f_max:
        plt.ylim([0, f_max])
    #x_times = np.arange(0, len(x) / sr, 0.25)
    #y_freqs = np.arange(0, 10000, 1000)
    #plt.xticks(x_times * sr / hop_len, x_times)
    #plt.yticks(y_freqs / sr * fft_len, y_freqs)
    plt.colorbar()
    if title:
        plt.title(title)


In [None]:
def synth_partial(mag, freq, frame_rate, sr):
    """Reconstruct audio waveform at sr from mag and freq 1-row arrays at frame_rate."""
    # Linear interpolation of both amplitude and frequency.
    samps_per_frame = int(round(sr / frame_rate))
    num_partials, num_frames = mag.shape
    output_samples = num_frames * samps_per_frame
    t = np.arange(output_samples) / sr
    dt = 1 / sr
    phase = np.cumsum(slinterp(2 * np.pi * freq * dt, samps_per_frame))
    carrier = np.cos(phase)
    return slinterp(mag, samps_per_frame) * carrier

x = synth_partial(np.array([[1, 2, 3, 2]]), np.array([[40, 80, 120, 20]]), 10, 1000)
plt.figure(figsize=(18, 4))
plt.plot(x.T)
plt.grid()
print(x.shape)

In [None]:
# Synthesize from (possibly downsampled) mag and freq contours.

def synth_harmonic(freq, mag, interp_factor=1, sample_rate=44100, mag_is_db=True, truncate=True):
    """Convert a single pair of frequency and magnitude vectors to an audio waveform."""
    # As a special case, freq can be a scalar indicating a constant frequency.
    carrier = None
    if isinstance(freq, (list, np.ndarray)):
        if len(freq) == 1:
            freq = freq[0]
        else:
            assert len(freq) == len(mag)
            # Ignore the final freq value, which tends to go crazy.  Replicate the preceding one.
            #freq[-1] = freq[-2]
            if interp_factor > 1:
                freq = slinterp(freq, interp_factor, truncate=truncate)
            carrier = np.cos(np.cumsum(2 * np.pi / sample_rate * freq))
    if carrier is None:
        # freq must be a scalar
        #freq = freq * np.ones(1 + (len(mag) - 1) * interp_factor)
        carrier = np.cos(2 * np.pi * freq / sample_rate * np.arange(truncate + (len(mag) - truncate) * interp_factor))

    # Can now expand mag to match.
    if interp_factor > 1:
        mag = slinterp(mag, interp_factor, truncate=truncate)
    # freq is cycles per sec.  We want radians per sample
    if mag_is_db:
        mag = db_to_lin(mag)
    return mag * carrier

x1 = synth_harmonic(40, np.array([1, 2, 3, 2]), 100, 1000, truncate=False, mag_is_db=False)
x2 = synth_harmonic(np.array([40, 80, 120, 20]), np.array([1, 2, 3, 2]), 100, 1000, truncate=False, mag_is_db=False)
plt.figure(figsize=(18, 4))
plt.plot(x2.T)
plt.plot(x.T)
plt.plot(x1.T)
plt.grid()
print(x2.shape)

def synth_harmonics(freqs, mags, interp_factor=1, sample_rate=44100):
    """Convert arrays where each row is a freq or mag vector to a waveform."""
    n_harms, n_frames = mags.shape
    n_samps = 1 + interp_factor * (n_frames - 1)
    result = np.zeros(n_samps)
    for freq, mag in zip(freqs, mags):
        result += synth_harmonic(freq, mag, interp_factor, sample_rate)
    return result
    

In [None]:
# Helpers from piano-partials.ipynb

def lin_fit(x):
    """Least-squares linear fit to points."""
    # xhat = a + b.index
    # err = x[i] - (a + b.i)
    # err^2 = x[i]^2 - 2 x[i] (a + b.i) + (a^2 + 2 a b i +b^2 i^2)
    # Define axes around midpoint.
    lenx = len(x)
    if not lenx:
        return []
    index = np.arange(lenx) - (lenx - 1) / 2
    # Make x zero mean
    a = np.mean(x)
    x_z = x - a
    b = 0
    if lenx > 1:
        # Linear fit is normalized inner product
        b = np.sum(x_z * index) / np.sum(index * index)
    return a + b * index

def expand_params(params):
    """Convert [val, npts, val, npts, val] sequence to points."""
    val0 = params[0]
    outpts = [[val0]]
    npts_val_pairs = np.vstack([params[1::2], params[2::2]])
    for npts, val in npts_val_pairs.transpose():
        outpts.append(np.linspace(val0, val, int(npts + 1))[1:])
        val0 = val
    return np.concatenate(outpts)

plt.plot(expand_params([0, 1, 1, 2, 0, 4, 1]), '.-')


# Read waveforms

In [None]:
def wavread(filename):
  """Read in audio data from a wav file.  Return d, sr."""
  # Read in wav file.
  file_handle = open(filename, 'rb')
  samplerate, wave_data = wav.read(file_handle)
  # Normalize short ints to floats in range [-1..1).
  data = np.asarray(wave_data, dtype=np.float32) / 32768.0
  return data, samplerate

def wavwrite(data, samplerate, filename):
  """Write a waveform to a WAV file."""
  wav.write(filename, samplerate, (32768.0 * data).astype(np.int16))

def read_and_trim(filename, duration=2.0, rel_threshold=0.005, abs_threshold=0, sr=44100, channel=None, do_plot=False, pre_time=0.010):
    d, file_sr = wavread(filename)
    #print("d.shape=", d.shape)
    if len(d.shape) > 1:  # Stereo file
        if channel is None:
            d = np.mean(d, axis=-1)
        else:
            d = d[:, channel]
    assert file_sr == sr
    d = d - np.mean(d)
    threshold = np.maximum(np.max(np.abs(d)) * rel_threshold, abs_threshold)
    # Tight window at start?
    drop_initial_samps = np.maximum(0, -int(round(pre_time * sr)) + np.min(np.flatnonzero(np.abs(d) > threshold)))
    d_return = d[int(round(drop_initial_samps)) + np.arange(int(round(duration * sr)))]
    if do_plot:
        t = np.arange(len(d)) / sr
        plt.figure(figsize=(12, 4))
        plt.subplot(121)
        plt.plot(t[:50000], d[:50000])
        plt.ylim(threshold * np.array([-1, 1]))
        plt.subplot(122)
        plt.plot(t[:len(d_return)], d_return)
        plt.xlim([0, 0.1])
        print("drop initial", drop_initial_samps / sr)
    return d_return, sr

def filename_for_note(note, octave, strike='ff', directory=None):
    filename = '.'.join(['Piano', strike, note + str(octave), 'wav'])
    if directory:
        filename = os.path.join(directory, filename)
    return filename

#filename = 'Piano.ff.D4.wav'
directory = '/Users/dpwe/Downloads/uiowa-piano'
#note = 'C'
#octave = 3
note = 'Ab'
octave = 1
strike = 'pp'  # ['pp', 'mf', 'ff']:
filename = filename_for_note(note, octave, strike, directory)
print(filename)
waveform, sr = read_and_trim(filename, channel=0, duration=5.0, rel_threshold=0.015, abs_threshold=0.0007, do_plot=True, pre_time=0.002)

Audio(data=waveform.T, rate=sr)

# Fundamental estimation from autocorrelation

In [None]:
def freq_from_autoco(d, sr=44100, max_duration=1.0, drop_initial_sec=0.05, do_plot=False, prop_max_thresh=0.75):
    # Estimate fundamental by autocorrelation.
    min_period = int(round(sr / 4000.0))
    max_period = int(round(sr / 20.0))
    drop_initial_samples = int(round(drop_initial_sec * sr))
    d = d[drop_initial_samples : drop_initial_samples + int(round(sr * max_duration))]
    #acr = np.correlate(d, d, 'full')[len(d) - 1:]
    # Using FFT makes it 1000x faster, seconds to milliseconds.  Identical result.
    acr = np.real(np.fft.ifft(np.abs(np.fft.fft(d, len(d) * 2)**2)))[:len(d)]
    acr = acr[:10 * max_period]
    # Find Nth peak, divide by N
    maxima = scipy.signal.argrelextrema(acr, np.greater)[0]
    # Keep only peaks at least 75% of largest peak below 4kHz.
    maxima = maxima[np.flatnonzero(maxima >= min_period)]
    max_acr_peak = np.max(acr[maxima])
    maxima = maxima[np.flatnonzero(acr[maxima] > prop_max_thresh * max_acr_peak)]
    npeaks = np.minimum(30, len(maxima))
    # Assume first peak corresponds to fundamental, keep peaks that are close to integer multiples.
    est_period = maxima[0]
    #print(sr / est_period)
    multiples = maxima / est_period
    #print(multiples)
    #print(np.abs((multiples - np.round(multiples)) / multiples))
    # Max 2% relative error
    good_multiples = np.flatnonzero(np.abs((multiples - np.round(multiples)) / multiples) < 0.02)
    if do_plot:
        plt.plot(acr)
        plt.plot(maxima[:npeaks], acr[maxima[:npeaks]], 'or')
        plt.plot([0, len(acr)], prop_max_thresh * max_acr_peak * np.array([1, 1]), '--')
        plt.plot(maxima[good_multiples], acr[maxima[good_multiples]], 'ob')
        plt.xlim([0, 10*est_period])
    #freqs = sr * np.arange(1, npeaks + 1) / maxima[:npeaks]
    freqs = (sr / (maxima / np.round(multiples)))[good_multiples]
    #print(freqs)
    freq = np.mean(freqs[-40:])
    return freq


%time freq = freq_from_autoco(waveform[8000:32000], do_plot=True, prop_max_thresh=0.81)
print(freq)
# Nominal: D5= 587.3295
# C2 = 65.89466739451501
# Db6 = 1108.73 Hz (we get 1113.37 for ff, 1114.50 for pp)

# Find frequency peaks in long Fourier transform

In [None]:
from scipy import fft

def localmax(x, npts):
    """Return indices of x where value is largest among nearest npts."""
    maxima = []
    padval = np.array([np.min(x) - 1])
    padded_x = np.hstack([padval, x, padval])
    localmaxes = np.flatnonzero(np.logical_and(padded_x[1:-1]> padded_x[:-2], padded_x[1:-1] >= padded_x[2:]))
    #for i in np.arange(1, len(x)):
    for i in localmaxes:  
        locality = x[np.maximum(0, i - npts // 2) : np.minimum(len(x), i + npts // 2)]
        if x[i] == np.max(locality) and x[i] > x[i - 1]:
            maxima.append(i)
    return np.array(maxima)

np.random.seed(57730)
print(localmax(np.random.rand(32), npts=6))


def peak_freqs(x, n_fft=65536, f_max=10000.0, depth_db=20.0, local_smooth_points=2047, local_max_points=100, sr=44100, est_f0=None, do_plot=False):
    """Return frequencies of local maxima from a long FFT."""
    # Provided f0 estimate is solely used to set the exclusion window for local max picking.
    if est_f0:
        # Use expected f_0 to set local_max_points.
        f0_in_bins = est_f0 / (sr / n_fft)
        local_max_points = int(round(f0_in_bins / 1))
        if do_plot:
            print("local_max_points=", local_max_points)
    long_fft = fft.fft(x[:n_fft])
    long_fft = long_fft[:(n_fft // 2)]
    freqs = np.arange(n_fft // 2) * sr / n_fft
    n_bins = np.sum(freqs < f_max)
    X_spec = 20 * np.log10(np.abs(long_fft[:n_bins]))
    # Local average
    smoo_X_spec = convolve(X_spec, np.ones(local_smooth_points) / local_smooth_points, 'same')
    unsmoo_X_spec = X_spec - smoo_X_spec

    #peaks = scipy.signal.argrelextrema(np.maximum(np.max(unsmoo_X_spec) - 30, unsmoo_X_spec), np.greater)[0]
    peaks = localmax(np.maximum(np.max(unsmoo_X_spec) - depth_db, unsmoo_X_spec), local_max_points)

    # Remove peaks more than 1 octave below expected F0.
    peaks = peaks[peaks > f0_in_bins / 2.5]

    if do_plot:
        plt.figure(figsize=(20, 6))
        plt.plot(freqs[:n_bins], unsmoo_X_spec)
        plt.plot(freqs[:n_bins], smoo_X_spec)
        plt.plot(freqs[peaks], unsmoo_X_spec[peaks], 'or')
        plt.xlim((0, 4000))
    
    return peaks * sr / n_fft

est_f0 = freq_from_autoco(waveform, drop_initial_sec=0.1)
print('f0_from_autoco=', est_f0)
#pk_frqs = peak_freqs(waveform[8000:], n_fft=32768, depth_db=30, do_plot=True, est_f0=est_f0)

pk_frqs = peak_freqs(waveform[2000:80000], n_fft=32768, depth_db=40, do_plot=True, est_f0=est_f0)

plt.grid()
plt.xlabel('freq / Hz')
plt.ylabel('level / dB')
plt.title('Spectral peaks for ' + os.path.basename(filename))

print(pk_frqs[:6])
print(pk_frqs[:6] / est_f0)



# Inharmonicity estimation

In [None]:
# Manual search for lowest-error f0 and beta
# Error is (measured_freq / pred_harm_freq) - 1.
avg_freqs = pk_frqs
hn = 1 + np.arange(len(avg_freqs))
beta = 0.0003128
f0 = est_f0 # 262.2118
print(f0, avg_freqs)
prop_err = avg_freqs / (f0 * hn * np.sqrt(1 + beta * hn * hn)) - 1
#plt.plot(hn, avg_freqs / hn, hn, f0 * np.sqrt(1 + beta * hn * hn))
plt.plot(hn, prop_err)
print("rms prop err =", np.sqrt(np.mean(prop_err ** 2)))
f0= 253.68557562939364
beta= 0.0008292878905710716
#plt.plot(hn, f0 * np.sqrt(1 + beta * hn * hn))

In [None]:
def est_inharmonicity(freqs, f0=None, beta=0, tolerance=0.2, verbose=False):
    """Estimate an inharmonicity beta for a series of harmonic frequencies."""
    freq_order = np.argsort(freqs)
    if not f0:
        f0 = freqs[freq_order[0]]
    # Find harmonic numbers sequentially.
    h_nums = np.zeros_like(freqs, dtype=int)
    valid = np.zeros_like(freqs, dtype=bool)
    f0s = np.zeros_like(freqs)
    freq_errs = np.zeros_like(freqs)
    last_freq = 0
    last_hnum = 0
    # Try every harmonic up to the largest candidate.
    last_harm_freq = 0
    for cand_hnum in 1 + np.arange(np.ceil(np.max(freqs) / f0)):
        stretched_df = f0 * np.sqrt(1 + beta * cand_hnum * cand_hnum)
        expected_freq = last_harm_freq + stretched_df
        # Find actual freq closest to this.
        best_freq_index = np.argmin(np.abs(freqs - expected_freq))
        found_freq = freqs[best_freq_index]
        delta_freq = expected_freq - found_freq
        if np.abs(delta_freq / f0) > 0.1:
            if verbose:
                print("Closest harmonic to %d (%.1f) is %.1f Hz - reject" % (cand_hnum, expected_freq, freqs[best_freq_index]))
            last_harm_freq = expected_freq
        else:
            if verbose:
                print("Found harmonic %d expected %.1f Hz got %.1f Hz" % (cand_hnum, expected_freq, found_freq))
            last_harm_freq = found_freq
            h_nums[best_freq_index] = cand_hnum
            valid[best_freq_index] = True
    if verbose and np.sum(valid == False):
        print(np.sum(valid == False), "values invalid");
    #print("h_nums=", h_nums)
    #print("freq_errs=", freq_err)
    # We expect (f / h.f0) = sqrt(1 + beta h^2)
    betas = (((freqs / (f0 * h_nums + (h_nums==0))) ** 2) - 1) / (h_nums ** 2 + (h_nums == 0))
    #print("betas=", betas)
    weights = h_nums * h_nums * valid * (betas > 0) # Increase weighting in proportion to harmonic num for valid harmonics
    #print("weights=", weights)
    # Weights can be all zero if all the calculated betas are <= 0.
    if np.sum(weights):
        beta = np.sum(weights * betas) / np.sum(weights)
    # Now figure implied f0s
    valid_ixs = np.flatnonzero(valid)
    f0s = freqs / (h_nums * np.sqrt(1 + beta * (h_nums ** 2)) + (h_nums==0))
    #print("f0s=", f0s)
    weights = valid / ((h_nums ** 2) + (valid == False))  # Downweight high harmonics for f0.
    if np.sum(weights):
        f0 = np.sum(weights * f0s) / np.sum(weights)
    #f0 = np.median(f0s[valid])

    if verbose:
      plt.subplot(211)
      plt.plot(h_nums, betas, '.')
      plt.subplot(212)
      plt.plot(h_nums, f0s, '.')
    
    return f0, beta, h_nums * valid
    
    # We are fitting freqs[h_num] = f0 * h_num * sqrt(1 + beta * (h_num ^ 2))
    # Given beta, what is the (non-integral) h such that 
    # freq = f0 * h * sqrt(1 + beta * h^2)
    # => freq/f0 = h sqrt(1 + beta h^2)
    # => (f/f0)^2 = h^2 (1 + beta h^2)   => (f/(h.f0))^2 = 1 + beta h^2 => beta = ((f/(h.f0))^2 - 1) / h^2
    # => (f/f0)^2 = h^2 + beta h^4
    # => beta h^4 + h^2 - (f/f0)^2 = 0
    # => h^2 = (-1 +/- sqrt(1 - 4 * beta * (-(f/f0)^2))) / (2 * beta)
    # => h = sqrt( 1/(2 beta) * ( - 1))
    # => freq^2 = f0^2 * h^2 * (1 + beta h^2) => beta h^4  + h^2 - (freq / f0)^2 = 0
    # => h^2 = [-1 +/- sqrt(1 + 4 beta ((freq/f0) ^ 2) )] / 2 beta
    # => h = sqrt( 1/(2 beta) sqrt(1 + 4 beta ((freq/f0) ^ 2)) )
    #h_nums = np.sqrt( (1 / (2 * beta)) * (np.sqrt(1 + 4 * beta * ((freqs / f0) ** 2)) - 1))
    #h_nums_q = np.round(h_nums)
    #valid_wts = (np.abs(h_nums - h_nums_q) < 0.2) * (0 + np.arange(len(h_nums)))
    #betas = (((freqs / (f0 * h_nums_q)) ** 2) - 1) / (h_nums_q ** 2)
    #new_f0s = freqs / (h_nums_q * np.sqrt(1 + beta * (h_nums_q ** 2)))

est_f0 = freq_from_autoco(waveform, sr, drop_initial_sec=0.1)
# Trim transient based on fundamental frequency duration
skip_initial_cycles = 10
skip_initial_time = skip_initial_cycles / est_f0
skip_initial_samples = int(round(skip_initial_time *sr))

f0 = est_f0  # avg_freqs[0]
beta = 0
#f0 = 262.2118
#beta = 0.0003128
#freqs = avg_freqs[:25]
#freqs = peak_freqs(d[8000:80000], n_fft=32768, depth_db=32, est_f0=est_f0)[:25]
freqs = peak_freqs(waveform[skip_initial_samples:], n_fft=32768, depth_db=40, est_f0=est_f0)[:25]
print(freqs)
#del freqs[13]
for _ in range(5):
  print(f0, beta)
  f0, beta, h_nums = est_inharmonicity(np.array(freqs), f0=f0, beta=beta, verbose=0)
#for h_num, freq in zip(h_nums, avg_freqs):
#    print(freq, h_num)
print(h_nums)

In [None]:
#f0 = 32.4
#beta = 0.00025
valid = np.flatnonzero(h_nums > 0)
plt.plot(h_nums[valid], freqs[valid] / h_nums[valid], '.')

_ = plt.plot(h_nums, f0 * np.sqrt(1 + beta * h_nums * h_nums), '.')
#print(freqs[12])

plt.legend(['Measured equivalent fundamentals of harmonics', 'Fundamentals for  inharmonicity parameter %f' % beta])
plt.xlabel('harmonic number')
plt.ylabel('fundamental frequency / Hz for ' + os.path.basename(filename))
plt.grid()

# Calculate fundamental and inharmonicity across whole range

In [None]:
def expected_f0_hz(note, octave):
    semis = {'C': 0, 'C#': 1, 'Db': 1, 'D': 2, 'D#': 3, 'Eb': 3, 'E': 4, 'F': 5, 'F#': 6, 'Gb': 6, 'G': 7, 
             'G#': 8, 'Ab': 8, 'A': 9, 'A#': 10, 'Bb': 10, 'B': 11}[note[0].upper() + note[1:]]
    semis += (octave + 1) * 12  # semis is midi note number, C4 = 60.
    return 440 * (2 ** ((semis - 69) / 12))

print(expected_f0_hz('C', 4))
print(expected_f0_hz('A', 4))
print(expected_f0_hz('C', 5))

In [None]:
directory = '/Users/dpwe/Downloads/uiowa-piano'
strike = 'ff'  # ['pp', 'mf', 'ff']:

filenames = []
est_f0s = []
f0s = []
betas = []

for octave in [1, 2, 3, 4, 5, 6]:
  #for note in ['C', 'D', 'E', 'Gb', 'Ab', 'Bb']:
  for note in ['C', 'Db', 'D', 'Eb', 'E', 'F', 'Gb', 'G', 'Ab', 'A', 'Bb', 'B']:
      for strike in ['pp', 'mf', 'ff']:
        filename =  filename_for_note(note, octave, strike)
        #print(filename)
        #d, sr = read_and_trim(os.path.join(directory, filename), channel=0, duration=5.0, 
        #                      rel_threshold=0.010, abs_threshold=0.0001, pre_time=0.01)
        d, sr = read_and_trim(os.path.join(directory, filename), channel=0, duration=5.0, 
                              rel_threshold=0.015, abs_threshold=0.0007, pre_time=0.002)
        drop_initial_samps = 8000  # int(round(drop_initial_sec * sr))
        est_f0 = freq_from_autoco(d[drop_initial_samps : 32000], sr, prop_max_thresh=0.81)
        est_f0s.append(est_f0)
        filenames.append(filename)
        freqs = peak_freqs(d[drop_initial_samps : 80000], n_fft=32768, depth_db=32, est_f0=est_f0)[:25]
        f0 = est_f0
        beta = 0
        for _ in range(10):
          f0, beta, h_nums = est_inharmonicity(np.array(freqs), f0=f0, beta=beta)
        f0s.append(f0)
        betas.append(beta)

plt.plot(1200 * (np.log2(np.array(est_f0s) / (261.626 * 2)) - replicate_last(np.linspace(-4, 2, 72, endpoint=False), 3)), '.')
plt.ylabel('tuning error / cents')
_ = plt.xlabel('key number')

In [None]:
for octave in [1, 2, 3, 4, 5, 6]:
  #for note in ['C', 'D', 'E', 'Gb', 'Ab', 'Bb']:
  for note in ['C', 'Db', 'D', 'Eb', 'E', 'F', 'Gb', 'G', 'Ab', 'A', 'Bb', 'B']:
      for strike in ['pp', 'mf', 'ff']:
        filename =  filename_for_note(note, octave, strike)
        #print(filename)
        d, sr = read_and_trim(os.path.join(directory, filename), channel=0, duration=5.0, 
                              rel_threshold=0.015, abs_threshold=0.0007, pre_time=0.002)
        est_f0 = freq_from_autoco(d[8000:32000], sr, prop_max_thresh=0.81)
        if np.abs(np.log2(est_f0 / expected_f0_hz(note, octave))) > 0.5/12:
            print(filename, 'expected f0 %.1f, observed %.1f (%.1f cents)' % (
                expected_f0_hz(note, octave), est_f0, 1200 * np.log2(est_f0 / expected_f0_hz(note, octave))))

# Piano.pp.C1.wav was the same recording as Piano.pp.B0.wav, it's a data error.  
# The error is present on the UIowa site, but I can't find any mention.
# I resampled it up 100 cents with sox and rewrote it.

In [None]:
#plt.plot(np.log2(np.array(est_f0s) / 440), '.')
#print(len(est_f0s))
#print(est_f0s)
plt.plot(betas, '.-')
print(filenames[5], est_f0s[5], betas[5])
print(filenames[6], est_f0s[6], betas[6])

# Extract magnitude and frequency contours for a single harmonic

`heterodyne_extract` extracts sample-by-sample magnitude and frequency contours for a single harmonic in the neighborhood of a specified input frequency (specified by a fundamental and a real-valued harmonic number, so the actual center frequency is simply the product of the two).  This is done by 'heterodyne extraction' - frequency-shifting the signal by multiplying it by the conjugate complex sinusoid at the center frequency (which shifts the center frequency down to 0 Hz), then low-pass filtering with a hanning window whose length is `smooth_cycles` times the fundamental period corresponding to `fundamental_freq`.  This window has zeros at the fundamental, so modulation at the fundamental rate is completely canceled.  This gives smooth contours for true harmonics of a fundamental.

In [None]:
#convolve = scipy.signal.convolve

def heterodyne_extract(waveform, fundamental_freq, harmonic_number=1.0, sr=44100, smooth_cycles=2.0):
    """Return a full-sample-rate amplitude and frequency envelope near the specified frequency."""
    t = np.arange(len(waveform)) / sr
    h_freq = harmonic_number * fundamental_freq
    # Conjugate complex exponential at center freq.
    complex_exp = np.exp(-1j * 2 * np.pi * h_freq * t)
    f_period = sr / fundamental_freq
    # Smoothing over e.g. 2 cycles of fundamental cancels periodic components, leaving only the DC shifted down from h_freq.
    smooth_len = int(round(smooth_cycles * f_period))
    #print("f_period=", f_period, "sm_len=", smooth_len)
    smooth_win = np.hanning(smooth_len)
    smooth_win = smooth_win / np.sum(smooth_win)
    smoothed_heterodyne_waveform = convolve(waveform * complex_exp, smooth_win, 'same')
    heterodyne_magnitude = np.abs(smoothed_heterodyne_waveform)
    # Unwrap the phase of the near-DC component, then also smooth at ~fundamental to get freq deviation.
    unwrapped_phase = np.hstack([[0], np.unwrap(np.angle(smoothed_heterodyne_waveform))])
    smoothed_dphi_dt = np.diff(convolve(unwrapped_phase, smooth_win, 'same'))
    inst_freq = h_freq + smoothed_dphi_dt / (2 * np.pi) * sr
    #plt.plot(unwrapped_phase[-10000:])
    #plt.plot(np.angle((waveform * complex_exp)[-1000:]))
    #plt.plot(np.imag(smoothed_heterodyne_waveform)[-1000:])
    # If it moves more than 0.5 rad/sample, it's not tracking well.
    dphi_dt_valid = (np.abs(smoothed_dphi_dt) < 0.5)
    dphi_dt_weighting = heterodyne_magnitude * dphi_dt_valid
    weighted_mean_dphi_dt = np.sum(dphi_dt_weighting * smoothed_dphi_dt) / np.sum(dphi_dt_weighting)   # radians / sample
    avg_freq = h_freq + weighted_mean_dphi_dt / (2 * np.pi) * sr   # cycles / second
    reconstructed_waveform = 2 * np.real(smoothed_heterodyne_waveform * np.conj(complex_exp))  # Smoothed complex waveform remodulated.
    return lin_to_db(heterodyne_magnitude), inst_freq, avg_freq, reconstructed_waveform

#f_freq = freq_from_autoco(waveform)  # 589.232
#harmonic_num = 1
#sm_cyc = 2.0
#mag, freq, avg_freq, recons_waveform = heterodyne_extract(waveform, f_freq, harmonic_num, sr, sm_cyc)


In [None]:
# Heterodyne
# h_num  delta_f
# 1      -0.63
# 2      -6.5
# 3       4.1
# 4      18.7
# 5      35.3
# 6      61.4
# 7     104     4229
# 8.2    31.4   4863
# 9.3    97.1   5518
# 10.2          6003
# 10.8          6222
# 11.2          6593
# 11.7          6890
# 13.3          7830
# 14.15         8343
# 15.45         9097
# 16.7          9845

f_freq = freq_from_autoco(waveform, drop_initial_sec=0.18)  # 589.232
harmonic_num = 1
sm_cyc = 2.0

mag, freq, avg_freq, recons_waveform = heterodyne_extract(waveform, f_freq, harmonic_num, sr, sm_cyc)

t = np.arange(len(waveform)) / sr
ii = np.arange(22000) # 
#ii = np.arange(len(mag)-1000,len(mag))

print(mag.shape)
print(freq.shape)

plt.subplot(211)
#plt.plot(t[ii], db_to_lin(mag[ii]), '.')
plt.plot(t[ii], mag[ii], '.')
plt.grid()
#plt.ylim([-60, -20])
plt.title(f'harmonic {harmonic_num} - mag / dB')

plt.subplot(212)
plt.plot(t[ii], freq[ii], '.')
plt.ylim([0.9 * avg_freq, 1.1 * avg_freq])
plt.grid()
plt.title(f'harmonic {harmonic_num} - freq / Hz')

print("pre_freq=%.2f" % (f_freq * harmonic_num), "post_freq=%.2f" % avg_freq, "diff=%.2f" % (avg_freq - f_freq * harmonic_num))

#Audio(data=synth_partial(np.array([db_to_lin(mag)]), np.array([freq]), sr, sr), rate=sr)
#Audio(data=synth_partial(np.array([db_to_lin(mag)]), avg_freq * np.ones((1, len(freq))), sr, sr), rate=sr)

In [None]:
interp_factor = 44 * 10  # 10 ms sample period for parameters
f_down = freq[::interp_factor]
m_down = mag[::interp_factor]
print(f'{f_down.shape=} {m_down.shape=}')
h_recon = synth_harmonic(f_down, m_down, interp_factor=interp_factor, sample_rate=sr)
plt.subplot(311)
plt.plot(f_down)
plt.subplot(312)
plt.plot(m_down)
plt.subplot(313)
spectrogram(h_recon, sr=sr, f_max=4 * avg_freq)  #, fft_len=fft_len, f_max=f_max, title='original')
#plt.plot(h_recon[-1000:])
Audio(data=(h_recon).T, rate=sr)

# Parent function extracts all harmonic envelopes for a note

`extract_harmonics` goes through multiple steps to model a fixed-frequency note as a set of harmonics.
* It makes an initial estimate of the fundamental from the autocorrelation of the whole note (with `freq_from_autoco`)
* It finds all the most prominent peaks in the Fourier transform of the entire note.  The estimated fundamental is used here to set a local-dominance region so only the largest peak within one harmonics-spacing is returned.
* It finds a best-fit for the inharmonicity of the upper harmonics by repeated application of `est_inharmonicity`.
* It calls `heterodyne_extract` for up to the lowest `num_harms` harmonics, calculating a magnitude and (time-varying) frequency estimate for each one, then, for harmonics above `lowest_retry_harmonic`, extracts again based on the average of the extracted frequency.
* The estimated harmonic is subtracted from the residual then goes on to extract the next harmonic.  The estimated harmonics are also accumulated to return a sinusoidal-model estimate of the input.
* The final residual (aspects of the waveform not captured by the harmonics) is also returned.

In [None]:
def extract_harmonics(d, sr=44100, num_harms=8, skip_initial_cycles=10, verbose=False, lowest_retry_harmonic=0, est_f0=None):
    """Model a waveform with a set of harmonics."""

    recons_d = np.zeros_like(d)
    recons_d_flat_f = np.zeros_like(d)

    sm_cyc = 2.0

    d_remaining = np.array(d)

    mags = []
    freqs = []
    avg_freqs = []

    # Initial F0 estimation from autocorrelation unless one was passed in.
    if est_f0 is None:
        est_f0 = freq_from_autoco(d, sr)
    if verbose:
        print('est_f0=', est_f0)
    # Trim transient based on fundamental frequency duration
    skip_initial_time = skip_initial_cycles / est_f0
    skip_initial_samples = int(round(skip_initial_time * sr))
    # Find all the prominent spectral peaks.  est_f0 is just for local max exclusion region.
    f_peaks = peak_freqs(d[skip_initial_samples:], n_fft=32768, depth_db=32, est_f0=est_f0)[:num_harms]
    if verbose:
        print("f_peaks=", f_peaks)

    # We will extract tracks for each peak, but we will reference them to the independent f0 estimate.
    #h_num_list = f_peaks / est_f0
    max_inharm_peaks = 25  # Too many peaks messes up inharmonicity estimation for lower freqs.
    f0 = est_f0
    beta = 0
    for _ in range(10):
      # Iterate estimaing f0 and beta multiple times for covergence.
      #print(f0, beta)
      f0, beta, h_num_list = est_inharmonicity(f_peaks[:max_inharm_peaks], f0=f0, beta=beta)
    # Use the fundamental extraction from the inharmonicity estimation.
    est_f0 = f0
    if verbose:
        print('inh est_f0=', est_f0, 'beta=', beta)
    
    # Clip num_harmonics at 0.8 F_nyq
    h_nums = (1 + np.arange(num_harms))
    h_freqs = est_f0 * h_nums * np.sqrt(1 + beta * h_nums * h_nums)
    num_harmonics_below_nyq = np.sum(h_freqs < 0.8 * sr / 2)
    if num_harms > num_harmonics_below_nyq:
        if verbose:
            print('Clamping num_harms from', num_harms, 'to', num_harmonics_below_nyq)
        num_harms = num_harmonics_below_nyq
        h_nums = h_nums[:num_harms]
    
    for h_num in h_nums:
        # Extract one harmonic from the residual so far.
        stretched_hnum = h_num * np.sqrt(1 + beta * h_num * h_num)
        # Oblige the search to be at the predicted frequency, incorporating inharmonic stretching
        mag, freq, avg_freq, recons_waveform = heterodyne_extract(d_remaining, est_f0, stretched_hnum, sr, sm_cyc)
        if verbose:
            print(' h_num={:.3f}, imp_f0={:.1f}, f={:.1f} err_f={:.1f}, max_mag={:.1f} dB'.format(
                h_num, avg_freq / stretched_hnum, avg_freq, avg_freq - est_f0 * stretched_hnum, max(mag)))

        # Repeat with estimated frequency for higher harmonics.
        if h_num > lowest_retry_harmonic:
            mag, freq, avg_freq, recons_waveform = heterodyne_extract(d_remaining, avg_freq / h_num, h_num, sr, sm_cyc)
            if verbose:
                print('*h_num={:.3f}, imp_f0={:.1f}, f={:.1f} err_f={:.1f}, max_mag={:.1f} dB'.format(
                    h_num, avg_freq / h_num, avg_freq, avg_freq - est_f0 * h_num, max(mag)))

        recons_d += recons_waveform
        d_remaining -= recons_waveform
        #recons_d_flat_f += synth_partial(np.array([db_to_lin(mag)]), avg_freq * np.ones((1, len(freq))), sr, sr)[0]
        recons_d_flat_f += synth_harmonic(avg_freq, np.array([db_to_lin(mag)]), 1, sr)[0]
        mags.append(mag)
        freqs.append(freq)
        avg_freqs.append(avg_freq)

    mags = np.array(mags)
    freqs = np.array(freqs)
    if verbose:
        print(mags.shape)

    return mags, recons_d, freqs, avg_freqs, est_f0

#filename = filename_for_note('D', 4, 'ff')
filename = filename_for_note('Ab', 1, 'pp')
waveform, sr = read_and_trim(os.path.join(directory, filename), channel=0, duration=5.0, rel_threshold=0.015, abs_threshold=0.0007, pre_time=0.002)

d = waveform

mags, recons_d, freqs, avg_freqs, est_f0 = extract_harmonics(d, num_harms=40, lowest_retry_harmonic=0, verbose=1)
#                                          extract_harmonics(d, num_harms=num_harms, est_f0=est_f0)
print(f'{mags.shape=}, {freqs.shape=}')
print(f'{avg_freqs=}')
resid = d - recons_d

t = np.arange(len(d)) / sr
ii = np.arange(14000, 14200)
plt.plot(t[ii], d[ii], '.', t[ii], recons_d[ii], '.', t[ii], resid[ii])
#Audio(data=d.T, rate=sr)
#Audio(data=recons_d.T, rate=sr)
Audio(data=(resid).T, rate=sr)
#Audio(data=recons_d_flat_f.T, rate=sr)

In [None]:
plt.figure(figsize=(20, 5))
plt.imshow(mags, aspect='auto', origin='lower', interpolation='nearest')
plt.colorbar()

In [None]:
wavwrite(resid, sr, 'resid.wav')
wavwrite(recons_d, sr, 'recons.wav')

In [None]:
f_max = 2000
fft_len = 2 * 4096

plt.figure(figsize=(18, 12))
plt.subplot(311)
spectrogram(d, sr=sr, fft_len=fft_len, f_max=f_max, title='original')
plt.subplot(312)
spectrogram(recons_d, sr=sr, fft_len=fft_len, f_max=f_max, title='partials')
plt.subplot(313)
spectrogram(resid, sr=sr, fft_len=fft_len, f_max=f_max, title='residual')
#spectrogram(recons_d_flat_f, sr=sr, f_max=10000, title='flat_f partials')


In [None]:
# Resynthesize downsampled mags and freqs
interp_factor = 44 * 10  # 10 ms sample period for parameters
fs_down = freqs[:, ::interp_factor]
ms_down = mags[:, ::interp_factor]
print(f'{ms_down.shape=}')
# Variable frequencies
#print('frame-rate frequencies')
#audio = synth_harmonics(fs_down, ms_down, interp_factor)
# Fixed frequency per harmonic
print('Constant frequencies')
audio = synth_harmonics(avg_freqs, ms_down, interp_factor)

spectrogram(audio, sr=sr, f_max=8000)  #, fft_len=fft_len, f_max=f_max, title='original')
#plt.plot(h_recon[-1000:])
Audio(data=(audio).T, rate=sr)

In [None]:
f = avg_freqs[0]
ms = mags[0]
%timeit a = synth_harmonic(f, ms, 1)
plt.plot(synth_harmonic(f, ms[::2], 2))

# Fixed log-time sampling

We sample the magnitude contours at 21 log-spaced times (in milliseconds):
```
0, 4, 8, 12, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096
```

In [None]:
sample_times_ms = np.array([0, 4, 8, 12, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096])


# Adaptive magnitude sampling as convex hull

In [None]:
# New approach to linseg fitting - take all extrema, then filter down.
filt_len = 31  # for D5
#filt_len = 127  # for D4
#filt_len = 1023
filt_win = np.hanning(filt_len)/np.sum(np.hanning(filt_len))

def lin_hull(x, ends=None):
    """Return a linear interpolation between the first and last values of x (or ends[0], ends[1]), of the same length as x."""
    if ends is None:
        ends = (x[0], x[-1])
    npts = len(x)
    return np.linspace(ends[0], ends[1], npts)

def max_prominence(x, ends=None, polarity=1):
    """Return index of point in x that is furthest above (below if polarity=-1) a line connecting the end points of x."""
    prominence = polarity * (x - lin_hull(x, ends))
    argmax_prominence = np.argmax(prominence)
    if np.max(prominence) > 0:
        return argmax_prominence
    return None

def err_from_line(x, ends=None, abs_fn=np.abs):
    """Return sum(abs(error)) between x and line seg defined by ends."""
    return np.sum(abs_fn(x - lin_hull(x, ends)))

def convex_hull(x, points=None, polarity=1):
    """Attempt to insert one new convex hull point in every interval defined by points."""
    if points is None:
        points = [0, len(x) - 1]
    new_points = [0]
    for start in range(len(points) - 1):
        new_point = max_prominence(x[points[start]:points[start + 1] + 1], polarity)
        if new_point:
            new_points.append(points[start] + int(new_point))
        new_points.append(points[start + 1])
    return new_points

def hull(x, npoints):
    """Return npoints indices into x which attempt to provide a convex hull."""
    points = [0, len(x) - 1]
    while len(points) < npoints:
        max_err = 0
        for i in range(len(points) - 1):
            start = points[i]
            end = points[i + 1]
            err = err_from_line(x[start : end]) / np.sqrt((end - start))
            #print(points, start, end, err, max_err)
            if abs(err) > max_err:
                max_err = abs(err)
                worst_seg_ix = i
        worst_start = points[worst_seg_ix]
        worst_end = points[worst_seg_ix + 1]
        polarity = 1 if err_from_line(x[worst_start : worst_end], abs_fn=np.asarray) > 0 else -1
        points.insert(worst_seg_ix + 1, int(worst_start + max_prominence(x[worst_start : worst_end], polarity=polarity)))
    return np.array(points)

#h = convex_hull(mag_trim)
#print(h)
#h = convex_hull(mag_trim, h, -1)
#print(h)
#h = convex_hull(mag_trim, h)
#print(h)
#h = convex_hull(mag_trim, h, -1)
#print(h)
#h = convex_hull(mag_trim, h)
#print(h)

trim_start_samps = 128
trim_end_samps = 1024

hnum = 1
mag_filt = lin_to_db(convolve(db_to_lin(mags[hnum]), filt_win, 'same'))
mag_filt_trim = mag_filt[trim_start_samps:-trim_end_samps]
max_times = scipy.signal.argrelmax(mag_filt_trim)[0]
max_vals = mag_filt_trim[max_times]
t = np.arange(len(mag_filt_trim)) / sr
mmag = np.maximum(-130.0, mag_filt_trim)

# Adaptive timing
#h = hull(mmag, 12)
# Fixed log-spaced timing
h = np.round(sample_times_ms * 44.1).astype(np.int32)

plt.figure(figsize=(16, 5))
plt.subplot(121)
#plt.plot(t, mmag, t[max_times], max_vals, 'o-')
plt.plot(1000 * t, mmag)
plt.plot(1000 * h / sr, mmag[h], 'o-')
ylims = [-80, -20]
t_lim = 0.15
plt.xlim([0, 1000 * t_lim])
#plt.legend(['mag', 'max_vals', 'hull'])
plt.legend(['mag', 'hull'])
plt.xlabel('time / ms')
plt.ylabel('level / dB')
plt.title('Envelope for harmonic %d of %s (%.1f Hz)' % (hnum, filename, est_f0))
plt.ylim(ylims)
plt.grid()
plt.subplot(122)
plt.plot(t, mmag)
#plt.plot(t[max_times], max_vals, 'o-')
plt.plot(h / sr, mmag[h], 'o-')
# Line showing limit of left plot
plt.plot([t_lim, t_lim], ylims, ':k')
plt.xlim([0, 4.1])
plt.ylim(ylims)
plt.grid()
plt.xlabel('time / s')
print(h / 44.1)

In [None]:
# Convert sinusoids into bp sets

# harms_params is a list of harmonic descriptions
# each harmonic description is [const_freq, [(delta_time_ms, lin_mag), (delta_time_ms, lin_mag), ...]]

def make_harms_params(mags, avg_freqs, max_harmonic=None, sr=44100, mag_floor=-100.0, terminal_slope=20.0, 
                      filt_len=31, adaptive_timing=True, verbose=False):
  if not max_harmonic:
      max_harmonic = mags.shape[0]
    
  dt = 1 / sr
  sm_cyc = 2.0

  #trim_ends_sec = 0.014
  #trim_ends_samps = int(round(trim_ends_sec * sr))
  trim_start_samps = 0 # 128
  trim_end_samps = 1024

  #filt_len = 31
  filt_win = np.hanning(filt_len)/np.sum(np.hanning(filt_len))

  harms_params = []

  for hnum in range(max_harmonic):
    mag_filt = lin_to_db(convolve(db_to_lin(mags[hnum]), filt_win, 'same'))
    mag_filt_trim = mag_filt[trim_start_samps : -trim_end_samps]
    #mag_filt_trim = mag_filt
    mmag = np.maximum(mag_floor, mag_filt_trim)
    if adaptive_timing:
      # Adaptive timing
      anchor_times = hull(mmag, 12)
    else:
      # Fixed log-spaced timing
      sample_times_ms = np.array([0, 4, 8, 12, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096])
      anchor_times = np.round(sample_times_ms * 44.1).astype(np.int32)
    anchor_vals = mmag[anchor_times]
    nvals = list(zip([0] + list(np.diff(anchor_times)), anchor_vals))
    bp_list = [(int(round(n * dt * 1000)), float(db_to_lin(val))) 
               for n, val in nvals]
    last_time, last_mag = bp_list[-1]
    last_mag_db = lin_to_db(last_mag)
    final_mag_db = mag_floor
    if adaptive_timing:
      if last_mag_db > final_mag_db:
        terminal_slope = terminal_slope  # -db/sec
        final_dur = (last_mag_db - final_mag_db) / terminal_slope
        bp_list.append((int(round(1000 * final_dur)), float(db_to_lin(final_mag_db))))
      if adaptive_timing and bp_list[-1][1] == bp_list[-2][1]:  # Final points have same mag
        bp_list = bp_list[:-1]
    bp_list_pair = [float(avg_freqs[hnum]), bp_list]
    if verbose:
      print("h", hnum + 1, bp_list_pair)
    harms_params.append(bp_list_pair)
  return harms_params

harms_params = make_harms_params(mags, avg_freqs, mag_floor=-130, verbose=False, adaptive_timing=False)
for hp in harms_params:
    print(hp[0], len(hp[1]))


In [None]:
mags_dict = {}
harms_dict = {}

In [None]:
# Filename to parameters set
#filenames = ['Piano.mf.C5.wav', 'Piano.ff.C5.wav', 'Piano.mf.D5.wav', 'Piano.ff.D5.wav']
#filenames = ['Piano.pp.D4.wav', 'Piano.pp.D5.wav', 'Piano.mf.D4.wav', 'Piano.mf.D5.wav', 'Piano.ff.D4.wav', 'Piano.ff.D5.wav']

# Original U Iowa piano samples from https://theremin.music.uiowa.edu/mispiano.html converted to wav format.
# Note: The file called Piano.pp.C0.wav was actually just a copy of Piano.pp.B1.wav (with different cropping).
# I fixed its pitch by resampling:
# dpwe@dpwe-macbookpro:~/Downloads$ sox uiowa-piano/Piano.pp.C1.wav piano.wav speed 100c 
# dpwe@dpwe-macbookpro:~/Downloads$ cp piano.wav uiowa-piano/Piano.pp.C1.wav 

directory = '/Users/dpwe/Downloads/uiowa-piano'

num_harms = 40  # was 20

filenames = []

for octave in range(1, 8):
    for note in ['C', 'E', 'Ab']:
        est_f0 = None
        for strike in ['ff', 'mf', 'pp']:
            filename = filename = filename_for_note(note, octave, strike)
            filenames.append(filename)
            d, sr = read_and_trim(os.path.join(directory, filename), 
                                  channel=0, rel_threshold=0.015, abs_threshold=0.001, duration=5.0, pre_time=0.002)
            #print(d.shape)
            mags_dict[filename], recons_d, freqs, avg_freqs, est_f0 = extract_harmonics(d, num_harms=num_harms, est_f0=est_f0)
            print(time.ctime(), filename, '%.1f' % est_f0, ['%.1f' % float(v) for v in avg_freqs[:4]])
            #resid = d - recons_d
            harms_dict[filename] = make_harms_params(mags_dict[filename], avg_freqs, mag_floor=-130, adaptive_timing=False)

In [None]:
# Describe note as initial amp + dB slope for single linear fit.
filt_len = 31
filt_win = np.hanning(filt_len)/np.sum(np.hanning(filt_len))

plt.figure(figsize=(12, 6))
colors = plt.cm.rainbow(np.linspace(0, 1, 21))
avg_h_decay = np.zeros(len(filenames))
avg_h_mag = np.zeros(len(filenames))
for i, filename in enumerate(filenames):
    #print(filename)
    #plt.subplot(1, 3, (i % 3) + 1)
    mags = mags_dict[filename]

    num_harmonics = min(12, mags.shape[0])
    h_init_amp = np.zeros(num_harmonics)
    h_decay = np.zeros(num_harmonics)
    h_nums = range(num_harmonics)
    for h in h_nums:
        mag_filt = lin_to_db(convolve(db_to_lin(mags[h]), filt_win, 'same'))
        mag_filt_trim = mag_filt[trim_end_samps:-trim_end_samps]
        t = np.arange(len(mag_filt_trim)) / sr
        linfit = lin_fit(mag_filt_trim)
        h_init_amp[h] = linfit[0]
        h_decay[h] = (linfit[-1] - linfit[0]) / (t[-1] - t[0])
    plt.plot(h_nums, h_decay, color=colors[i % len(colors)])
    plt.ylim([-20, 10])
    #plt.plot(h_nums, h_init_amp, color=colors[i])
    #
    #plt.plot(h_nums, lin_fit(h_decay), '--', h_nums, lin_fit(h_init_amp), '--', color=colors[i])
    #plt.plot(h_nums, h_decay, color=colors[i])
    #plt.plot(h_nums, lin_fit(h_decay), '--', color=colors[i])
    #plt.legend(['decay dB/sec', 'init amp/dB'])
    if i < 3:
        plt.grid()
        _ = plt.title(filename + ' avg dB/sec decay lin_fit harmonics')
    avg_h_decay[i] = np.mean(h_decay)
    avg_h_mag[i] = np.mean(h_init_amp)
plt.legend(filenames)

In [None]:
note_indices = np.arange(len(avg_h_decay))
plt.figure(figsize=(16, 4))
plt.plot(note_indices, avg_h_decay, '.-')
plt.plot(note_indices, avg_h_mag, '.-')
plt.legend(['avg_h_decay', 'avg_h_mag'])
plt.grid()

In [None]:
# Mapping here is 1/8 shrunk after the first 250 ms.
def time_mapper(t, boundary=0.25, magnification=8.0):
        """Convert time in secs to a nonlinear projection."""
        #return np.log10(0.01 + t)
        t = np.asarray(t)
        magnified = (t < boundary)
        return magnified * t + (1 - magnified) * (boundary + (t - boundary) / magnification)

def plot_mags(mags, filename, nharms=None, hop_len=64, sr=44100, mag_floor=-130.0, mag_ceil=-20.0, show_lin_fit=False):
    if not nharms:
        nharms = mags.shape[0]
    nfrm = mags[:nharms, ::hop_len].shape[-1]
    frmTime = np.arange(nfrm) * hop_len / sr
    colors = plt.cm.rainbow(np.linspace(0, 1, max(12, nharms)))

    boundary = 0.25
    magnification = 8.0

    for i in range(nharms):
        plt.plot(time_mapper(frmTime[:nfrm]), mags[i, ::hop_len].T, '.', color=colors[i])
        if show_lin_fit:
            lin_fit_mag = lin_fit(mags[i, ::hop_len])
            plt.plot(time_mapper(frmTime[:nfrm]), lin_fit_mag, '--', color=colors[i])

    plt.plot(time_mapper([boundary, boundary]), [mag_floor, mag_ceil], '--k')
    plt.ylim([mag_floor, mag_ceil])
    xtimes = np.hstack([np.arange(0, 0.25, 0.0625), np.arange(0.5, 5.0, 0.5)])
    plt.xlim([0, time_mapper(5.0)])
    plt.xticks(time_mapper(xtimes), xtimes)
    plt.legend(1 + np.arange(nharms), loc='upper right')
    _ = plt.title((filename + ' - ' + str(nharms) + ' harmonics'))
    

def plot_harms(harms_params, filename, nharms=None, hop_len=64, sr=44100, mag_floor=-130.0, mag_ceil=-20.0):
    if not nharms:
        nharms = len(harms_params)
    nfrm = mags[:nharms, ::hop_len].shape[-1]
    frmTime = np.arange(nfrm) * hop_len / sr
    colors = plt.cm.rainbow(np.linspace(0, 1, max(12, nharms)))

    # Mapping here is 1/8 shrunk after the first 250 ms.
    boundary = 0.25
    magnification = 8.0

    for i, hp in enumerate(harms_params):
        times, vals = zip(*np.array(hp[1]))
        times = np.cumsum(times) / 1000
        vals = np.array(vals)
        plt.fill_between(time_mapper(times), mag_floor, lin_to_db(vals), alpha=0.8, color=colors[i])
    plt.plot(time_mapper([boundary, boundary]), [mag_floor, -mag_ceil], '--k')
    plt.ylim([mag_floor, mag_ceil])
    plt.legend(1 + np.arange(nharms))
    xtimes = np.hstack([np.arange(0, 0.25, 0.0625), np.arange(0.5, 5.0, 0.5)])
    plt.xticks(time_mapper(xtimes), xtimes)
    plt.xlim([0, time_mapper(5.0)])
    _ = plt.title((filename + ' - ' + str(nharms) + ' harmonics'))


In [None]:
plt.figure(figsize=(15, 15))
for i, filename in enumerate(filenames[:9]):
    plt.subplot(3, 3, i + 1)
    plot_mags(mags_dict[filename], filename, show_lin_fit=True)
    #plot_harms(harms_dict[filename], filename)

In [None]:
plt.figure(figsize=(15, 15))
for i, filename in enumerate(filenames[:9]):
    plt.subplot(3, 3, i + 1)
    #plot_mags(mags_dict[filename], filename)
    plot_harms(harms_dict[filename], filename)

In [None]:
def time_mapper(t, boundary=0.25, magnification=8.0):
    """Convert time in secs to a nonlinear projection."""
    return np.log10(0.01 + t)
    #t = np.asarray(t)
    #magnified = (t < boundary)
    #return magnified * t + (1 - magnified) * (boundary + (t - boundary) / magnification)

def inv_time_mapper(mapped_t, boundary=0.25, magnification=8.0):
    magnified = mapped_t < boundary
    return magnified * mapped_t + (1 - magnified) * (boundary + magnification * (mapped_t - boundary))

from matplotlib.collections import PolyCollection

def polygon_under_graph(x, y):
    return [(x[0], 0.), *zip(x, y), (x[-1], 0.)]

#filename = filename_for_note(note='C', octave=4, strike='ff')

def harmonics_waterfall_plot(filename, axes=None):
    global harms_dict
    if axes == None:
        axes = fig.add_subplot(projection='3d')
    verts = []
    mag_floor = -130
    for i, hp in list(enumerate(harms_dict[filename])):
            times, vals = zip(*np.array(hp[1]))
            times = np.cumsum(times) / 1000
            vals = np.array(vals)
            times = np.hstack([[times[0]], times])
            vals = np.hstack([[db_to_lin(mag_floor)], vals])
            #plt.plot(np.log10(0.01 + times), lin_to_db(vals), zs=-i, zdir='y', alpha=0.8, color=colors[i])
            #poly = axes.add_collection3d(plt.fill_between(2 + np.log10(0.01 + times), -100, lin_to_db(vals), alpha=0.8, color=colors[i]), 
            #                           zs=(i + 1), zdir='y')
            verts.append(polygon_under_graph(time_mapper(times), -mag_floor + lin_to_db(vals)))
    facecolors = plt.colormaps['rainbow'](np.linspace(0, 1, len(verts)))
    poly = PolyCollection(verts, facecolors=facecolors, alpha=.7)
    axes.add_collection3d(poly, zs=np.arange(len(harms_dict[filename]), 0, -1), zdir='y')
    axes.set_zlim(0, 120)
    axes.set_zlabel('level / dB')
    
    plt.title(filename)
    plt.xlabel('time / s')
    #tick_times = np.array([0, 0.1, 0.2, 1.0, 2.0, 3.0, 4.0])
    tick_times = np.array([0, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0])
    plt.xticks(time_mapper(tick_times), tick_times)
    #plt.xlim([-0.05, 0.8])
    plt.ylabel('harm #')
    plt.yticks(np.arange(2, 22, 2), np.arange(19, -1, -2))
    #plt.zlabel('level / dB')

    #plt.zlim([-100, -20])
    #plt.legend(1 + np.arange(nharms))
    #plt.xticks(np.log10(0.01 + xtimes), xtimes)
    #plt.xlim([0, 3000])
    #_ = plt.title((filename + ' - ' + str(nharms) + ' harmonics'))
    axes.view_init(elev=20., azim=-115, roll=0)

#fig = plt.figure(figsize=(8, 6))

plt.figure(figsize=(18, 6))
axes = plt.subplot(131, projection='3d')
note = 'C'
octave = 4
harmonics_waterfall_plot(filename_for_note(note, octave, 'pp'), axes=axes)
axes = plt.subplot(132, projection='3d')
harmonics_waterfall_plot(filename_for_note(note, octave, 'mf'), axes=axes)
axes = plt.subplot(133, projection='3d')
harmonics_waterfall_plot(filename_for_note(note, octave, 'ff'), axes=axes)

In [None]:
import json
print(filename.split('.'))
params_file = '.'.join(filename.split('.', -1)[:-1]) + '.json'
print(params_file)
with open(params_file, 'w') as f:
    f.write(json.dumps(harms_params))
print('saved to', params_file)

In [None]:
import itertools

hps = harms_params[0]
bps = np.array(hps[1])
print(bps[:, 0])
plt.plot(expand_params(list(itertools.chain.from_iterable(bps))[1:]))
plt.plot(np.cumsum(bps[:, 0]), bps[:, 1], '.')
_ = plt.xlim(0, 10000)

In [None]:
# Synthesize various versions.
# Original magnitudes, fixed frequencies

def synth_harm_param(harm_param, sr=44100):
    """Synthesize a single harmonic described by a harm_param structure."""
    # harm_param is [freq, [(dtime_ms, mag), (dtime_ms, mag), ...]]
    freq, breakpoints = harm_param
    breakpoints = np.array(breakpoints)
    breakpoints[:, 1] = lin_to_db(breakpoints[:, 1])
    # magnitudes interpolated on millisecond scale in log domain.
    # First value is assumed to be zero, so dropped.
    mags = expand_params(list(itertools.chain.from_iterable(breakpoints))[1:])
    return synth_harmonic(freq, mags, interp_factor=int(round(sr / 1000)), sample_rate=sr, mag_is_db=True)


def synth_params(harms_params, verbose=False):
    """Synthesize a note described by a list of harm_param structures."""
    upper_bound_len_samples = 44100 * 10
    result = np.zeros(upper_bound_len_samples)
    max_harm_len_samples = 0
    for hp in harms_params:
        harm_audio = synth_harm_param(hp)
        harm_len_samples = len(harm_audio)
        result[:harm_len_samples] += harm_audio
        max_harm_len_samples = np.maximum(max_harm_len_samples, harm_len_samples)
    return result[:max_harm_len_samples], harm_audio


def synth_params_orig(harms_params, verbose=False):
    # Older version does not use the common synth_harmonic function.
    result = np.zeros(0)
    for f0, hp in harms_params:
        # ignore first time, start list with first value.
        params = [lin_to_db(hp[0][1])]
        # Step through remaining pairs, converting times in ms to samples
        for bpp in hp[1:]:
            params.extend([int(round(bpp[0] * sr/1000)), lin_to_db(bpp[1])])
        # Expand (val, npoints, val, npoints..., val) list
        ep = expand_params(params)
        carrier = np.cos(2 * np.pi * f0 * np.arange(len(ep)) / sr)
        ep_len = len(ep)
        if ep_len > len(result):
            result = np.hstack([result, np.zeros(ep_len - len(result))])
        result[:ep_len] += db_to_lin(ep) * carrier
        if verbose:
            print(f0)
    return result, ep

#audio, ep = synth_params([harms_params[i] for i in 1 + np.arange(10)])
#filename = 'Piano.ff.C4.wav'
filename = 'Piano.ff.C3.wav'
#hd = harms_dict[filename][:40]
# How does it sound if we skip some of the higher harmonics?
# In particular, above the 16th harmonic, skip harmonics that are 2.n.f0 and 3.n.f0.  This lets us get higher, for brighter sounds, 
# and the missing harmonics aren't particularly noticeable in those regions.
hd = [harms_dict[filename][i] for i in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18, 22, 24, 28, 30, 34, 36]]
h_audio, ep = synth_params(hd)
#h_audio, ep = synth_params(harms_params)

#f0 = harms_params[0][0]
#carrier = np.cos(2 * np.pi * f0 * np.arange(len(ep)) / sr)

#ii = np.arange(20000)
ii = np.arange(4000)
#plt.plot(t[ii], db_to_lin(mags[h_num - 1][ii]))
#plt.plot(t[ii], db_to_lin(ep[ii]))
#plt.title(filename)
npts = len(resid)
audio = h_audio[:npts]  # + resid
Audio(data=audio, rate=sr)
print(audio.shape, resid.shape, sr)
#old_audio = audio
start_pt = 950
npts = 100
plt.plot((start_pt + np.arange(npts)) / sr, audio[start_pt : start_pt + npts])
#         (start_pt + np.arange(npts)) / sr, old_audio[start_pt : start_pt + npts])


In [None]:
print(filename, len(audio), sr)
Audio(data=audio, rate=sr)

In [None]:
print(filename, len(audio), sr)
Audio(data=audio, rate=sr)

In [None]:
waveform, sr = read_and_trim(os.path.join(directory, filename), channel=0, duration=5.0, rel_threshold=0.010, abs_threshold=0.001, do_plot=False, pre_time=0.01)
Audio(data=waveform, rate=sr)

In [None]:
def interp_harm_param(hp0, hp1, alpha):
    """Return harm_param list that is alpha of the way to hp1 from hp0."""
    hp = []
    for h0, h1 in zip(hp0, hp1):
        f_a = np.exp(np.log(h0[0]) + alpha * (np.log(h1[0]) - np.log(h0[0])))
        #if (abs(h0[0] - h1[0]) / (0.5 * (h0[0] + h1[0]))) > 0.1:
        #    print("warn: interpolating", h0[0], "and", h1[0])
        # Don't warn since we're interpolating between notes too.
        bps = []
        eps = 1e-9
        for bp0, bp1 in zip(h0[1], h1[1]):
            bp = (bp0[0] + alpha * (bp1[0] - bp0[0]), np.exp(-eps + np.log(bp0[1] + eps) + alpha * (np.log(bp1[1] + eps) - np.log(bp0[1] + eps))))
            bps.append(bp)
        hp.append((f_a, bps))
    return hp

def print_hp(harm_params, max_harms=8):
    print('***')
    for harm_param in harm_params[:max_harms]:
        s = '%.1f ' % harm_param[0]
        for bp in harm_param[1][:8]:
            s += '%d, %.1f / ' % (bp[0], 1000 * bp[1])
        print(s)

alpha = 0.9
#ns = ['Piano.mf.E3.wav', 'Piano.mf.Ab3.wav', 'Piano.mf.C4.wav']
ns = ['Piano.pp.C4.wav', 'Piano.mf.C4.wav', 'Piano.ff.C4.wav']

#print_hp(harms_dict[n1])
#print_hp(harms_dict[n2])
#print_hp(interp_harm_param(harms_dict[n1], harms_dict[n2], alpha))
#Audio(data=synth_params(interp_harm_param(harms_dict[n1], harms_dict[n2], alpha))[0], rate=sr)
w = []
for start in [0, 1]:
    n1 = ns[start]
    n2 = ns[start + 1]
    for alpha in np.linspace(0, 1, 4, endpoint=False):
        w.append(synth_params(interp_harm_param(harms_dict[n1], harms_dict[n2], alpha))[0][:sr])
w = np.concat(w)
Audio(data=w, rate=sr)

In [None]:
wavwrite(w, sr, '/tmp/tmp.wav')

In [None]:
# Compare to pure scaling
audio = synth_params(harms_dict[ns[1]])[0][:sr]
ww = []
for alpha in np.linspace(0, np.log(100/7), 10):
    ww.append(np.exp(alpha) * 0.2 * audio)
ww = np.concat(ww)
wavwrite(ww, sr, '/tmp/tmp2.wav')

# New plan:
- Each note is described by a conformal set of parameters
  - exactly one param set per harmonic (so harmonics of different notes are always in the same place)
  - harmonic envelopes are sampled at a fixed set of log-spaced times (so we can interpolate notes without changing timing)
We write the analysis out as a big blob:
 - A single vector of harmonic envelope time intervals (odd-numbered values of BP lists, e.g. 20 of them)
 - An array giving the base note, base velocity, and # harmonics for all the described notes.  For octaves 1-7 with 3 notes/oct ('C', 'E', 'Ab') and 3 velocities ('pp', 'mf', 'ff'), that's 63 notes.  At up to 20 harmonics per ote, that's 1260 harmonics.
 - A big array giving the 20 target values per harmonic per note.  Maybe these should be in dB, and 8 bit integers?  With 1260 harmonics, that's a total of 25,200 values (100 kB in float32).

In [None]:
# Construct the piano-params.json file.
# Contents are:
#   sample_times_ms - single vector of fixed log-spaced envelope sample times (in int16 integer ms)
#   notes - Array of MIDI notes
#   velocities - Array of strike velocities available for each note, the same for all notes
#   num_harmonics - Array of (num_notes * num_velocities) counts of how many harmonics are defined for each note.
#   harmonics_freq - Vector of (total_num_harmonics) int16s giving freq for each harmonic in cents 6900 = 440 Hz.
#   harmonics_mags - Array of (total_num_harmonics, 20) uint8s giving 20 envelope samples for each harmonic.  In dB re: -130.

# sample_times_ms is defined upscript.
#print(f'{len(sample_times_ms)=}')
#print(sample_times_ms)

def filename_of_note_vel(note, vel):
    return ('Piano.' + 
            {40: 'pp', 80: 'mf', 120: 'ff'}[velocity] + 
            '.' +
            {0: 'C', 4: 'E', 8: 'Ab'}[note % 12] + str((note - 12) // 12) + 
            '.wav')

def note_vel_of_filename(filename):
    _, vel, note, _ = filename.split('.')
    vel_val = {'pp': 40, 'mf': 80, 'ff': 120}[vel]
    note_name = note[:-1]
    note_oct = int(note[-1])
    note_val = {'C': 0, 'E': 4, 'Ab': 8}[note_name] + 12 * note_oct + 12  # C4 is midi note 60.
    return note_val, vel_val

# Extract notes from filenames.
assert set(filenames) == set(harms_dict.keys())
velocities = set()
notes = set()
for filename in filenames:
    note_val, strike_val = note_vel_of_filename(filename)
    velocities.add(strike_val)
    notes.add(note_val)

notes = sorted(notes)
velocities = sorted(velocities)
#print(notes)

num_harmonics = []

for note in notes:
    for velocity in velocities:
        filename = filename_of_note_vel(note, velocity)
        num_harmonics.append(len(harms_dict[filename]))

total_num_harmonics = sum(num_harmonics)

harmonics_freq = np.zeros(total_num_harmonics, dtype=int)
harmonics_mags = np.zeros((total_num_harmonics, len(sample_times_ms) - 1), dtype=int)
harmonic_index = 0

def hz_to_cents(freq_hz):
    """Convert freq in hz to cents: 6000 = C4 = 261.63, 6900 = A4 = 440.0."""
    return np.round(1200 * np.log2(freq_hz / 440.0) + 6900).astype(int)

def cents_to_hz(cents):
    """Convert freq cents back to Hzin hz to real-valued midi: 60 = C4 = 261.63, 69 = A4 = 440.0."""
    return 440 * np.exp2((cents - 6900) / 1200)


print(hz_to_cents(440))
print(hz_to_cents(444))
print(cents_to_hz(hz_to_cents(444)))

for note in notes:
    for velocity in velocities:
        filename = filename_of_note_vel(note, velocity)
        harm_data = harms_dict[filename]
        print(filename, ":", len(harm_data), "harmonics")
        for harm in harm_data:
            # harm is [freq, [(time, mag), (time, mag), ...]]
            # Round to 1 dp.
            #harmonics_freq[harmonic_index] = np.round(10 * harm[0]) / 10.0
            # Store freq in "midi cents" (a440 = 6900)
            harmonics_freq[harmonic_index] = hz_to_cents(harm[0])
            mag_info = np.array(harm[1])
            # mag_info may include a final value which is not at a fixed time if it needed a "run out" to get down to -60dB.
            len_mag_info = mag_info.shape[0]
            if len_mag_info > len(sample_times_ms):
                # print(f'{len_mag_info=} {len(sample_times_ms) + 1=}')
                mag_info = mag_info[:-1]
            #print(mag_info.T)
            assert np.all(np.cumsum(mag_info[:, 0]) == sample_times_ms)
            # Offset all harmonics mags as rounded to dB re: -130, the min from above.
            harmonics_mags[harmonic_index] = 130 + np.round(lin_to_db(mag_info[1:, 1])).astype(int)
            harmonic_index += 1

# 3 notes per octave; 20 harmonics per note (for low notes), so C4 = 3 octaves in = 180 harmonics in.
print(harmonics_freq[178:190])
print(harmonics_mags[178:190])
print(harmonic_index, "total harmonics in", len(notes) * len(velocities), "notes")

In [None]:
# Save out the json file.
data = {
    # Drop the initial zero in sample_times_ms.  Initial sample is always 0.
    'sample_times_ms': sample_times_ms[1:].tolist(),
    'notes': notes,
    'velocities': velocities,
    'num_harmonics': num_harmonics,
    'harmonics_freq': harmonics_freq.tolist(),
    'harmonics_mags': harmonics_mags.tolist(),
}
json_file = 'piano-params.json'
with open(json_file, 'w') as f:
    f.write(json.dumps(data))
print('saved to', json_file)