In [None]:
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_formats = ['svg']

import db
import fetcher
from recordings import Recording
from trim_recordings import detect_utterances

import librosa
import librosa.display
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
import numpy as np
import pydub
import scipy.optimize
from tqdm import tqdm

import io
import multiprocessing

%matplotlib inline
plt.rcParams['svg.fonttype'] = 'none'

In [None]:
session = db.create_session('master.db')
recordings_fetcher = fetcher.Fetcher('recordings', pool_size=8)

In [None]:
# Load recordings from the database and filter them according to some selection criteria:
# right species, contains the data we need, good quality, not too short and not too long.

import hashlib

def md5(string):
    m = hashlib.md5()
    m.update(string.encode('utf-8'))
    return m.digest()

recordings = [
    r for r in session.query(Recording).filter(
        #Recording.genus == 'Turdus', Recording.species == 'merula', # Merel
        #Recording.genus == 'Passer', Recording.species == 'domesticus', # Huismus
        #Recording.genus == 'Parus', Recording.species == 'major', # Koolmees
        Recording.genus == 'Acrocephalus', Recording.species == 'palustris', # Bosrietzanger
    )
    if r.url and r.audio_url and not r.background_species and r.quality == 'A' and 10 <= r.length_seconds <= 120
]
recordings.sort(key=lambda recording: md5(recording.recording_id))
print(f'Found {len(recordings)} recordings')
recordings = recordings[:12]
#recordings = recordings[12:24]
recordings = {r.recording_id: r for r in recordings}

In [None]:
# Download (cached) and decode the MP3s. 

sr = 44100

def load_recording(recording):
    data = recordings_fetcher.fetch_cached(recording.audio_url)
    sound = pydub.AudioSegment.from_file(io.BytesIO(data), 'mp3')\
        .set_channels(1)\
        .set_frame_rate(sr)\
        .set_sample_width(2)
    return (recording.recording_id, sound)

pool = multiprocessing.pool.Pool(8)
sounds = dict(tqdm(pool.imap(load_recording, recordings.values(), 1), total=len(recordings)))
pool.close()

In [None]:
def show_spectrogram(sound, recording_id):
    y = np.array(sound.get_array_of_samples()) / 0x8000
    D = librosa.stft(y)
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    fig, ax = plt.subplots(figsize=(20, 5))
    img = librosa.display.specshow(S_db, x_axis='time', y_axis='linear', ax=ax, sr=sr)
    ax.set(title=recording_id)
    fig.colorbar(img, ax=ax, format="%+2.f dB")
    
    return fig, ax

def show_utterance(ax, utterance):
    start, end = utterance[0] / 1000, utterance[1] / 1000
    ax.add_patch(Rectangle((start, 0), end - start, sr / 2, edgecolor='none', facecolor='#00ff0040'))

def loudness_histogram(sound, num_bins=64):
    loudnesses = np.array([ms.dBFS for ms in sound])
    min_loudness = -90
    max_loudness = 0
    loudnesses[loudnesses == -np.inf] = min_loudness
    # It's important that we use fixed bins here.
    bin_edges = min_loudness + np.arange(num_bins + 1) / (num_bins + 1) * (max_loudness - min_loudness)
    hist, bin_edges = np.histogram(loudnesses, bins=bin_edges)
    return hist, bin_edges

loudness_histograms = {}
for recording_id, sound in sounds.items():
    loudness_histograms[recording_id] = loudness_histogram(sound)

In [None]:
# Bimodal distribution detection: fit two Gaussians to the curve.
# To find the initial parameters for the optimizer, first fit one Gaussian,
# subtract it from the histogram, then fit another Gaussian to the remaining curve.
#
# This often fails because one of the bumps isn't exactly Gaussian shaped,
# and it uses the other Gaussian to compensate for that.

def gaussian(x, mu, sigma, scale):
    # Coded such that we can use an optimizer that does not require bounds (Levenberg-Marquardt),
    # because it appears to be more robust that way.
    if sigma == 0:
        sigma = 1e-9
    scale = abs(scale)
    return scale * np.exp(-(x - mu)**2 / sigma**2)

def fit_gaussian(xs, ys):
    init_params = [np.average(xs), np.max(xs) - np.min(xs), np.max(ys)]
    min_params = [np.min(xs), np.min(xs[1:] - xs[:1]), 0]
    max_params = [np.max(xs), np.max(xs) - np.min(xs), np.max(ys)]
    
    popt, pcov = scipy.optimize.curve_fit(gaussian, xs, ys, p0=init_params, bounds=(min_params, max_params),
                                         method='dogbox', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    return popt

def two_gaussians(x, *params):
    return gaussian(x, *params[0:3]) + gaussian(x, *params[3:6])

def fit_two_gaussians(xs, ys):
    init_params1 = fit_gaussian(xs, ys)
    init_params2 = fit_gaussian(xs, np.clip(ys - gaussian(xs, *init_params1), 0, np.inf))
    init_params = np.append(init_params1, init_params2)
    
    min_params = [np.min(xs), np.min(xs[1:] - xs[:1]), 0] * 2
    max_params = [np.max(xs), np.max(xs) - np.min(xs), np.max(ys)] * 2
    
    popt, pcov = scipy.optimize.curve_fit(two_gaussians, xs, ys, p0=init_params, bounds=(min_params, max_params),
                                          method='dogbox', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    params = np.reshape(popt, (2, 3))
    if params[0][0] < params[1][0]:
        return params[0], params[1]
    else:
        return params[1], params[0]
    
fig, axs = plt.subplots((len(sounds) + 2) // 3, 3, figsize=(10, 8))
for ax, (recording_id, (hist, bin_edges)) in zip(axs.flat, loudness_histograms.items()):
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2
    
    params1, params2 = fit_two_gaussians(bin_centers, hist)
    
    ax.plot(bin_centers, hist, 'green')
    ax.plot(bin_centers, gaussian(bin_centers, *params1), 'red')
    ax.plot(bin_centers, gaussian(bin_centers, *params2), 'orange')
    ax.plot(bin_centers, two_gaussians(bin_centers, *params1, *params2), 'blue')
    ax.axvline(params1[0], color='red')
    ax.axvline(params2[0], color='orange')
    ax.set_title(recording_id)
    ax.title.set_url('https://' + recordings[recording_id].url)

plt.tight_layout()

In [None]:
# Bimodal distribution detection: same as above, but
# use the max of the two Gaussians instead of the sum.

def gaussian(x, mu, sigma, scale):
    # Coded such that we can use an optimizer that does not require bounds (Levenberg-Marquardt),
    # because it appears to be more robust that way.
    if sigma == 0:
        sigma = 1e-9
    scale = abs(scale)
    return scale * np.exp(-(x - mu)**2 / sigma**2)

def fit_gaussian(xs, ys):
    init_params = [np.average(xs), np.max(xs) - np.min(xs), np.max(ys)]
    min_params = [np.min(xs), np.min(xs[1:] - xs[:1]), 0]
    max_params = [np.max(xs), np.max(xs) - np.min(xs), np.max(ys)]
    
    popt, pcov = scipy.optimize.curve_fit(gaussian, xs, ys, p0=init_params, bounds=(min_params, max_params),
                                         method='dogbox', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    return popt

def two_gaussians(x, *params):
    return np.maximum(gaussian(x, *params[0:3]), gaussian(x, *params[3:6]))

def fit_two_gaussians(xs, ys):
    init_params1 = fit_gaussian(xs, ys)
    init_params2 = fit_gaussian(xs, np.clip(ys - gaussian(xs, *init_params1), 0, np.inf))
    init_params = np.append(init_params1, init_params2)
    
    min_params = [np.min(xs), np.min(xs[1:] - xs[:1]), 0] * 2
    max_params = [np.max(xs), np.max(xs) - np.min(xs), np.max(ys)] * 2
    
    popt, pcov = scipy.optimize.curve_fit(two_gaussians, xs, ys, p0=init_params, bounds=(min_params, max_params),
                                          method='dogbox', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    params = np.reshape(popt, (2, 3))
    if params[0][0] < params[1][0]:
        return params[0], params[1]
    else:
        return params[1], params[0]

fig, axs = plt.subplots((len(sounds) + 2) // 3, 3, figsize=(10, 8))
for ax, (recording_id, (hist, bin_edges)) in zip(axs.flat, loudness_histograms.items()):
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2
    
    params1, params2 = fit_two_gaussians(bin_centers, hist)
    
    ax.plot(bin_centers, hist, 'green')
    ax.plot(bin_centers, gaussian(bin_centers, *params1), 'red')
    ax.plot(bin_centers, gaussian(bin_centers, *params2), 'orange')
    ax.plot(bin_centers, two_gaussians(bin_centers, *params1, *params2), 'blue')
    ax.axvline(params1[0], color='red')
    ax.axvline(params2[0], color='orange')
    ax.set_title(recording_id)
    ax.title.set_url('https://' + recordings[recording_id].url)

plt.tight_layout()

In [None]:
# Bimodal distribution detection, attempt two: still fit two Gaussians to the curve.
# This time we fit one centered on the highest peak, and the second one
# to the highest peak in the remaining curve, both with some slight leeway for the solver.
#
# This is starting to look good, but maybe a Gaussian isn't quite the right function.

def gaussian(x, mu, sigma, scale):
    # Coded such that we can use an optimizer that does not require bounds (Levenberg-Marquardt),
    # because it appears to be more robust that way.
    if sigma == 0:
        sigma = 1e-9
    scale = abs(scale)
    return scale * np.exp(-(x - mu)**2 / sigma**2)

def fit_gaussian(xs, ys):
    x_range = np.max(xs) - np.min(xs)
    x_step = np.min(xs[1:] - xs[:1])
    peak_index = np.argmax(ys)
    mu = xs[peak_index]
    scale = ys[peak_index]
    delta_mu = 0.05 * x_range
    delta_scale = 0.1 * scale
    
    if scale <= 0:
        return [mu, x_range, 0]
    
    init_params = [mu, np.max(xs) - np.min(xs), scale]
    min_params = [mu - delta_mu, x_step, scale - delta_scale]
    max_params = [mu + delta_mu, x_range, scale]
    
    popt, pcov = scipy.optimize.curve_fit(gaussian, xs, ys, p0=init_params, bounds=(min_params, max_params),
                                          method='dogbox', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    return popt

def two_gaussians(x, *params):
    return gaussian(x, *params[0:3]) + gaussian(x, *params[3:6])

def residual(xs, ys, *params):
    return np.clip(ys - gaussian(xs, *params), 0, np.inf)

def fit_two_gaussians(xs, ys):
    params1 = fit_gaussian(xs, ys)
    params2 = fit_gaussian(xs, residual(xs, ys, *params1))
    popt = [*params1, *params2]
    params = np.reshape(popt, (2, 3))
    if params[0][0] < params[1][0]:
        return params[0], params[1]
    else:
        return params[1], params[0]
    
fig, axs = plt.subplots((len(sounds) + 2) // 3, 3, figsize=(10, 8))
for ax, (recording_id, (hist, bin_edges)) in zip(axs.flat, loudness_histograms.items()):
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2
    
    params1, params2 = fit_two_gaussians(bin_centers, hist)
    
    ax.plot(bin_centers, two_gaussians(bin_centers, *params1, *params2), color='blue')
    ax.plot(bin_centers, gaussian(bin_centers, *params1), color='red', linewidth=0.5)
    ax.plot(bin_centers, gaussian(bin_centers, *params2), color='orange', linewidth=0.5)
    ax.plot(bin_centers, hist, color='green')
    ax.plot(bin_centers, residual(bin_centers, hist, *params1), '--', color='green', linewidth=0.5)
    ax.axvline(params1[0], color='red')
    ax.axvline(params2[0], color='orange')
    ax.set_title(f'{recording_id} - snr: {params2[0] - params1[0]:.1f} dB')
    ax.title.set_url('https://' + recordings[recording_id].url)

plt.tight_layout()

In [None]:
# Same as the last one, except we fit only to the part of the curve that is within 50% of the peak.

def gaussian(x, mu, sigma, scale):
    # Coded such that we can use an optimizer that does not require bounds (Levenberg-Marquardt),
    # because it appears to be more robust that way.
    if sigma == 0:
        sigma = 1e-9
    scale = abs(scale)
    return scale * np.exp(-(x - mu)**2 / sigma**2)

def fit_gaussian(xs, ys):
    x_range = np.max(xs) - np.min(xs)
    x_step = np.min(xs[1:] - xs[:1])
    
    peak_index = np.argmax(ys)
    threshold = ys[peak_index] * 0.5
    min_index = max(0, peak_index - 2)
    while min_index > 0 and ys[min_index - 1] > threshold:
        min_index -= 1
    max_index = min(peak_index + 3, len(ys))
    while max_index < len(ys) and ys[max_index] > threshold:
        max_index += 1
    
    mu = xs[peak_index]
    scale = ys[peak_index]
    
    init_params = [mu, np.max(xs) - np.min(xs), scale]
    min_params = [xs[min_index], x_step, 0]
    max_params = [xs[max_index - 1], x_range, 2.0 * scale]
    
    popt, pcov = scipy.optimize.curve_fit(gaussian, xs[min_index:max_index], ys[min_index:max_index],
                                          p0=init_params, bounds=(min_params, max_params),
                                          method='trf', ftol=1e-3, xtol=1e-3, gtol=1e-3)
    
    return popt

def two_gaussians(x, *params):
    return gaussian(x, *params[0:3]) + gaussian(x, *params[3:6])

def residual(xs, ys, *params):
    return np.clip(ys - gaussian(xs, *params), 0, np.inf)

def fit_two_gaussians(xs, ys):
    params1 = fit_gaussian(xs, ys)
    params2 = fit_gaussian(xs, residual(xs, ys, *params1))
    popt = [*params1, *params2]
    params = np.reshape(popt, (2, 3))
    if params[0][0] < params[1][0]:
        return params[0], params[1]
    else:
        return params[1], params[0]
    
fig, axs = plt.subplots((len(sounds) + 2) // 3, 3, figsize=(10, 8))
for ax, (recording_id, (hist, bin_edges)) in zip(axs.flat, loudness_histograms.items()):
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2
    
    params1, params2 = fit_two_gaussians(bin_centers, hist)
    
    ax.plot(bin_centers, two_gaussians(bin_centers, *params1, *params2), color='blue')
    ax.plot(bin_centers, gaussian(bin_centers, *params1), color='red', linewidth=0.5)
    ax.plot(bin_centers, gaussian(bin_centers, *params2), color='orange', linewidth=0.5)
    ax.plot(bin_centers, hist, color='green')
    ax.plot(bin_centers, residual(bin_centers, hist, *params1), '--', color='green', linewidth=0.5)
    ax.axvline(params1[0], color='red')
    ax.axvline(params2[0], color='orange')
    ax.set_title(f'{recording_id} - snr: {params2[0] - params1[0]:.1f} dB')
    ax.title.set_url('https://' + recordings[recording_id].url)

plt.tight_layout()