# Whale Vocalization Parsing

This notebook contains the following functionalities:

- Visualizing Data (time- and frequency-domain representations) 
- Dynamic time warping between two signals
- Cross correlation between two signals
- Monophonic pitch detection (zero-crossing, FFT peak, autocorrelation)
- Signal denoising (on a per signal basis)
- Locating probe signal in query for n-gram analysis

In [None]:
# this cell loads packages:

import numpy as np
import soundfile as sf
import pandas as pd
import random
import os
import sklean
import scipy
import bokeh
import librosa
import fastdtw
import IPython.display as ipd
import matplotlib.pyplot as plt

%matplotlib inline 
output_notebook()

# Audio Loading

In [None]:
# this cell loads audio:

def load_audio(filename):
    
    # read in file and convert to range [-1, 1]:
    
    srate, audio = scipy.io.wavfile.read(filename)
    audio = audio.astype(np.float32) / 32767.0 
    
    # set max to 0.9:
    
    if (len(audio.shape) == 1): 
        audio = (0.9 / max(np.abs(audio)) * audio)
    else: 
        audio[:,0] = (0.9 / max(np.abs(audio[:,0])) * audio[:,0])
        audio[:,1] = (0.9 / max(np.abs(audio[:,1])) * audio[:,1])
        return audio.transpose(), srate
    
    # return audio:
    
    return audio, srate 

# Audio Visualization

In [None]:
# this cell plots the time-domain representation of an audio file:

def plot_time_domain(audio, srate): 
    p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Amplitude')
    time = np.linspace(0, len(audio)/srate, num=len(audio))
    p.line(time, audio)
    show(p)

In [None]:
# this cell plots the frequency-domain representation of an audio file:

def plot_freq_domain(audio, srate):
    f, t, s = scipy.signal.spectrogram(audio, srate)
    s = 10 * np.log10(s + 1e-40)
    p = figure(plot_width=800, plot_height=400, x_axis_label='Time (s)', y_axis_label='Frequency (Hz)')
    p.image(image=[s], x=0, y=0, dw=t[-1], dh=f[-1], palette="Viridis256", level="image")
    show(p)

# Audio Denoising

In [None]:
# this cell denoises an audio file:

def denoise(
    audio, 
    noise, 
    n_grad_freq   = 3,
    n_grad_time   = 4,
    n_fft         = 2048,
    win_length    = 2048,
    hop_length    = 512,
    n_std_thresh  = 1,
    prop_decrease = 1.0
):
    """
    
    Remove noise from audio based upon a clip containing only noise

    Args:
        audio (array)        : audio to denoise
        noise (array)        : noise sample
        n_grad_freq (int)    : how many frequency channels to smooth over with the mask.
        n_grad_time (int)    : how many time channels to smooth over with the mask.
        n_fft (int)          : number audio of frames between STFT columns.
        win_length (int)     : Each frame of audio is windowed by `window()`. The window will be of length `win_length` and then padded with zeros to match `n_fft`..
        hop_length (int)     : number audio of frames between STFT columns.
        n_std_thresh (int)   : how many standard deviations louder than the mean dB of the noise (at each frequency level) to be considered signal

    Returns:
        
        (array) The recovered signal with noise subtracted

    """

    # STFT over noise:
    
    stft_noise    = librosa.stft(y = noise, n_fft = n_fft, hop_length = hop_length, win_length = win_length)
    stft_noise_db = librosa.core.amplitude_to_db(np.abs(stft_noise))
    
    # calculate statistics over noise and noise threshold, over frequency axis:
    
    noise_freq_mean = np.mean(stft_noise_db, axis=1)
    noise_freq_std  = np.std(stft_noise_db, axis=1)
    noise_thresh    = noise_freq_mean + noise_freq_std * n_std_thresh
    
    # STFT over signal:

    stft_audio    = librosa.stft(y = audio, n_fft = n_fft, hop_length = hop_length, win_length = win_length)
    stft_audio_db = librosa.core.amplitude_to_db(np.abs(stft_audio))
    
    # calculate value to mask dB to:
    
    mask_gain_dB = np.min(stft_audio_db)
    
    # create a smoothing filter for the mask in time and frequency:
    
    smoothing_filter = np.outer(
        np.concatenate(
            [
                np.linspace(0, 1, n_grad_freq + 1, endpoint=False),
                np.linspace(1, 0, n_grad_freq + 2),
            ]
        )[1:-1],
        np.concatenate(
            [
                np.linspace(0, 1, n_grad_time + 1, endpoint=False),
                np.linspace(1, 0, n_grad_time + 2),
            ]
        )[1:-1],
    )
    smoothing_filter = smoothing_filter / np.sum(smoothing_filter)
    
    # calculate the threshold for each frequency/time bin:
    
    db_thresh = np.repeat(
        np.reshape(noise_thresh, [1, len(noise_freq_mean)]),
        np.shape(stft_audio_db)[1],
        axis=0,
    ).T
    
    # mask if the signal is above the threshold:
    
    mask_audio = stft_audio_db < db_thresh

    # convolve the mask with a smoothing filter:
    
    mask_audio = scipy.signal.fftconvolve(mask_audio, smoothing_filter, mode = "same")
    mask_audio = mask_audio * prop_decrease

    # mask the signal:
    
    stft_audio_db_masked = (stft_audio_db * (1 - mask_audio) + np.ones(np.shape(mask_gain_dB)) 
                            * mask_gain_dB * mask_audio)
    
    # mask real:
    
    audio_imag_masked = np.imag(stft_audio) * (1 - mask_audio)
    stft_audio_amp = (librosa.core.db_to_amplitude(stft_audio_db_masked) * np.sign(stft_audio)) + (
        1j * audio_imag_masked
    )

    # recover the signal:
    
    recovered_signal = librosa.istft(stft_audio_amp, hop_length = hop_length, win_length = win_length)

    return recovered_signal

# Audio Cross-Correlation

In [None]:
# this cell computes and visualizes the cross-correlation (or autocorrelation) between two signals:

def correlation(audio_probe, audio_query, srate):
    
    xcorrelation = np.correlate(audio_probe, audio_query, mode = 'full')
    
    p = figure(plot_width=800, plot_height=200, x_axis_label='Delay (s)', y_axis_label='Cross-Correlation')
    delay = np.linspace(0, len(audio_query)/srate, num=len(audio_query))
    p.line(delay, xcorrelation)
    show(p)
    
    return xcorrelation

# Audio Monophonic Pitch Detection

In [None]:
# this cell implements different monophonic pitch detection methods:

def pitch_zero_crossings(frame, srate): 
    
    zero_indices   = np.nonzero((frame[1:] >= 0) & (frame[:-1] < 0))[0]
    pitch_estimate = (srate / np.mean(np.diff(indices)))
    
    return pitch_estimate 

def pitch_fft(frame, srate): 
    
    mag            = np.abs(np.fft.fft(frame))
    mag            = mag[0:int(len(mag)/2)]
    pitch_estimate = np.argmax(mag) * (srate / len(frame))
    
    return pitch_estimate 

def pitch_autocorrelation(frame, srate):
    
    xcorrelation   = np.correlate(frame, frame, mode = 'full')
    derivative     = np.diff(xcorrelation[:int(len(xcorrelation)/2)+2])
    peak_indices   = np.nonzero((derivative[:-1] > 0) & (derivative[1:] <= 0))[0] + 1
    peak_values    = xcorrelation[peak_indices]
    peak_indices_sorted = peak_indices[np.argsort(peak_values)[-2:]]
    
    return srate/(peak_indices_sorted[1]-peak_indices_sorted[0])

def pitch_track(signal, hopSize, winSize, extractor, srate): 
    
    offsets = np.arange(0, len(signal), hopSize)
    pitch_track = np.zeros(len(offsets))
    amp_track = np.zeros(len(offsets))
    
    for (m, o) in enumerate(offsets): 
        frame = signal[o:o+winSize] 
        pitch_track[m] = extractor(frame, srate)
        amp_track[m] = np.sqrt(np.mean(np.square(frame)))  

        if (pitch_track[m] > 1500): 
            pitch_track[m] = 0 
    
    return (amp_track, pitch_track)

def sonify(amp_track, pitch_track, srate, hop_size):

    times = np.arange(0.0, float(hop_size * len(pitch_track)) / srate,
                      float(hop_size) / srate)

    # sample locations in time (seconds)                                                      
    sample_times = np.linspace(0, np.max(times), int(np.max(times)*srate-1))

    freq_interpolator = scipy.interpolate.interp1d(times,pitch_track)
    amp_interpolator = scipy.interpolate.interp1d(times,amp_track)
                                                                
    sample_freqs = freq_interpolator(sample_times)
    sample_amps  = amp_interpolator(sample_times)

    audio = np.zeros(len(sample_times));
    T = 1.0 / srate
    phase = 0.0
    
    for i in range(1, len(audio)):
        audio[i] = sample_amps[i] * np.sin(phase)
        phase = phase + (2*np.pi*T*sample_freqs[i])

    return audio

# Audio Dynamic Time Warping

In [None]:
# this cell computes the dynamic time warping between two audio signals

def dtw_table(x, y, distance = None):
    
    if distance is None:
        distance = scipy.spatial.distance.euclidean
    nx = len(x)
    ny = len(y)
    table = np.zeros((nx+1, ny+1))
    
    # compute left column separately, i.e. j=0.
    table[1:, 0] = np.inf
        
    # compute top row separately, i.e. i=0.
    table[0, 1:] = np.inf
        
    # Fill in the rest.
    for i in range(1, nx+1):
        for j in range(1, ny+1):
            d = distance(x[i-1], y[j-1])
            table[i, j] = d + min(table[i-1, j], table[i, j-1], table[i-1, j-1])
    return table

# dtw table traceback function:

def dtw_path(x, y, table):
    
    i = len(x)
    j = len(y)
    path = [(i, j)]
    while i > 0 or j > 0:
        minval = np.inf
        if table[i-1][j-1] < minval:
            minval = table[i-1, j-1]
            step = (i-1, j-1)
        if table[i-1, j] < minval:
            minval = table[i-1, j]
            step = (i-1, j)
        if table[i][j-1] < minval:
            minval = table[i, j-1]
            step = (i, j-1)
        path.insert(0, step)
        i, j = step
    
    return np.array(path)

# plot dtw table and path, along with signals:

def plot_dtw(table, path, signal1, signal2):
    
    %matplotlib widget

    plt.figure(figsize = (10, 10))

    # Bottom right plot.
    ax1 = plt.axes([0.2, 0, 0.8, 0.2])
    ax1.imshow(signal1, origin = 'upper', aspect = 'auto', cmap = 'coolwarm')
    ax1.set_xlabel('Signal 1')
    ax1.set_xticks([])
    ax1.set_yticks([])
    ax1.set_ylim(10)

    # Top left plot.
    ax2 = plt.axes([0, 0.2, 0.20, 0.8])
    ax2.imshow(signal2.T, origin = 'lower', aspect = 'auto', cmap = 'coolwarm')
    ax2.set_ylabel('Signal 2')
    ax2.set_xticks([])
    ax2.set_yticks([])
    ax2.set_ylim(1)

    # Top right plot.
    ax3 = plt.axes([0.2, 0.2, 0.8, 0.8], sharex = ax1, sharey = ax2)
    ax3.imshow(table.T, aspect = 'auto', origin = 'upper', interpolation = 'nearest', cmap = 'gray')
    ax3.set_xticks([])
    ax3.set_yticks([])

    # Path.
    ax3.plot(path[:,0], path[:,1], 'r')

# Audio Isolation

In [None]:
# this cell trims a denoised call file based on a amplitude/pitch track version:

def trim(audio, amplitude_track, srate, binSize = 6, hopSize = 256, threshold = 2.5, sensitivity = 5):

    # get noise at beginning and end of audio clip: 
    
    begin_noise_level = np.mean(amplitude_track[:int(len(amplitude_track)/binSize)])
    end_noise_level   = np.mean(amplitude_track[-int(len(amplitude_track)/binSize):])
        
    index_first = None
    index_last  = None
    
    # if signal > threshold * begin_noise_level many times in a row, trim here (from track start):
    
    for i in range(len(amplitude_track)):
        if amplitude_track[i] > threshold * begin_noise_level:
            if all(amplitude_track[j] > threshold * begin_noise_level for j in range(i, i + sensitivity + 1)):
                index_first = i
                break
    
    # if signal > threshold * end_noise_level many times in a row, trim here (from track end):

    for i in range(len(amplitude_track)):
        if amplitude_track[i] > threshold * end_noise_level:
            if all(amplitude_track[j] > threshold * end_noise_level for j in range(i - sensitivity + 2, i + 1)):
                index_last = i
    
    return audio[index_first * hopSize:(index_last + 10) * hopSize]

# Audio Probe Localization

In [None]:
# this cell takes a probe sequence and a query sequence, and locates the probe in the query:

def probe_localization_xcorrelation_time(probe_audio, query_audio, winSize, srate, threshold, mode):
    
    # get correlation between probe and query, and probe and probe:
    
    correlation_pq = np.correlate(probe_audio, query_audio, mode = 'same')[::-1]
    correlation_pp = np.correlate(probe_audio, probe_audio, mode = 'same')
    
    # mode:
    
    if mode == 'log':
        
        correlation_pq = np.log(correlation_pq)
        correlation_pp = np.log(correlation_pp)
    
    # define localization threshold based on probe-probe correlation:
    
    thresh = threshold * max(correlation_pp)

    offsets = np.arange(0, len(correlation_pq), winSize)
    amp = np.zeros(len(offsets))
    
    for (m, o) in enumerate(offsets): 
        frame = correlation_pq[o:o+winSize] 
        amp[m] = np.max(np.abs(frame))
    
    localizations = np.array([i for i in range(1, len(amp) - 1) 
                              if (amp[i] > amp[i - 1] and amp[i] > amp[i + 1] and amp[i] > thresh)])
    
    return amp, localizations * winSize / srate

def probe_localization_xcorrelation_stft(probe_audio, query_audio, winSize, srate, threshold):
    
    # compute STFT of probe and query audio:
    
    stft_probe = librosa.stft(probe_audio, n_fft=winSize)
    stft_query = librosa.stft(query_audio, n_fft=winSize)

    # calculate magnitude spectra:
    
    mag_probe = np.abs(stft_probe)
    mag_query = np.abs(stft_query)

    # compute cross-correlation in frequency domain:
    
    correlation_pq = np.abs(np.correlate(mag_probe.flatten(), mag_query.flatten(), mode='same'))

    # compute probe-probe correlation:
    
    correlation_pp = np.correlate(mag_probe.flatten(), mag_probe.flatten(), mode='same')

    # define localization threshold based on probe-probe correlation:
    
    thresh = threshold * np.max(correlation_pp)

    # find peaks in the correlation:
    
    offsets = np.arange(0, len(correlation_pq), winSize)
    amp = np.zeros(len(offsets))
    
    for (m, o) in enumerate(offsets): 
        frame = correlation_pq[o:o+winSize] 
        amp[m] = np.max(np.abs(frame))
    
    localizations = np.array([i for i in range(1, len(amp) - 1) 
                              if (amp[i] > amp[i - 1] and amp[i] > amp[i + 1] and amp[i] > thresh)])
    
    return amp, localizations * winSize / (2 * srate)

# Audio Similarity

In [None]:
# this cell measures the similarity between two calls, using a variety of methods:

# cross-correlation peak max:

def similarity_cross_correlation(audio_1, audio_2):
    
    correlation = np.correlate(audio_1, audio_2, mode = "same")
    
    return max(correlation)

# dtw distance:

def similarity_dynamic_time_warping(audio_1, audio_2):
    
    distance, path = fastdtw(audio_1, audio_2)
    
    return distance

# mfcc distance:

def similarity_mfcc(audio_1, audio_2, srate):
    
    mfccs_1 = librosa.feature.mfcc(y = audio_1, sr = srate)
    mfccs_2 = librosa.feature.mfcc(y = audio_2, sr = srate)
    
    distance = np.linalg.norm(mfccs_1 - mfccs_2)
    
    return distance

# Audio Features

In [None]:
def extract_audio_features(audio_file, srate):

    # initialize features dictionary:
    
    features_dict = {}
    
    # diy features:
    
    features_dict['length']    = len(audio_file)/srate
    #features_dict['frequency'] = np.abs(np.fft.fftfreq(len(audio_file), 1 / srate)[np.argmax(np.abs(np.fft.fft(audio_file)))])
    
    # spectral features:
    
    spectral_centroid  = librosa.feature.spectral_centroid(y = audio_file, sr = srate)
    spectral_bandwidth = librosa.feature.spectral_bandwidth(y = audio_file, sr = srate)
    spectral_contrast  = librosa.feature.spectral_contrast(y = audio_file, sr = srate)
    spectral_flatness  = librosa.feature.spectral_flatness(y = audio_file)
    spectral_rolloff   = librosa.feature.spectral_rolloff(y = audio_file, sr = srate)
    
    features_dict['spectral_centroid_mean']  = spectral_centroid.mean()
    features_dict['spectral_bandwidth_mean'] = spectral_bandwidth.mean()
    features_dict['spectral_contrast_mean']  = spectral_contrast.mean()
    features_dict['spectral_flatness_mean']  = spectral_flatness.mean()
    features_dict['spectral_rolloff_mean']   = spectral_rolloff.mean()
     
    features_dict['spectral_centroid_std']  = spectral_centroid.std()
    features_dict['spectral_bandwidth_std'] = spectral_bandwidth.std()
    features_dict['spectral_contrast_std']  = spectral_contrast.std()
    features_dict['spectral_flatness_std']  = spectral_flatness.std()
    features_dict['spectral_rolloff_std']   = spectral_rolloff.std()

    # temporal features:
    
    zero_crossing_rate = librosa.feature.zero_crossing_rate(y = audio_file)
    rmse = librosa.feature.rms(y = audio_file)
    
    features_dict['zero_crossing_rate_mean'] = zero_crossing_rate.mean()
    features_dict['rmse_mean'] = rmse.mean()

    features_dict['zero_crossing_rate_std'] = zero_crossing_rate.std()
    features_dict['rmse_std'] = rmse.std()
    
    # MFCCs:
    
    mfccs = librosa.feature.mfcc(y = audio_file, sr = srate)
    for i in range(mfccs.shape[0]):
        features_dict[f'mfcc_{i+1}'] = mfccs[i].mean()

    # Additional features
    chroma = librosa.feature.chroma_stft(y = audio_file, sr = srate)
    #tempogram = librosa.feature.tempogram(y = audio_file, sr = srate)
    tonnetz = librosa.feature.tonnetz(y = audio_file, sr = srate)
    
    for i in range(chroma.shape[0]):
        features_dict[f'chroma_{i+1}'] = chroma[i].mean()
        
    #for i in range(tempogram.shape[0]):
    #    features_dict[f'tempogram_{i+1}'] = tempogram[i].mean()
        
    for i in range(tonnetz.shape[0]):
        features_dict[f'tonnetz_{i+1}'] = tonnetz[i].mean()

    return features_dict


# Audio Clustering

In [None]:
# this cell takes an array of feature vectors, and clusters them:

def cluster_adaptive_kmeans(data, max_clusters = 50, tolerance = 0.01):
    
    kmeans = sklearn.cluster.Kmeans(n_clusters = 10, random_state = 0).fit(data)
    prev_inertia = kmeans.inertia_
    
    for n_clusters in range(11, max_clusters + 1):
        
        kmeans = KMeans(n_clusters = n_clusters, random_state = 0).fit(data)
        inertia = kmeans.inertia_
        
        if (prev_inertia - inertia) / prev_inertia < tolerance:
            break
        
        prev_inertia = inertia
    
    return kmeans

def cluster_dbscan(data, eps = 0.5, min_samples = 5):
    
    dbscan = sklearn.cluster.DBSCAN(eps = eps, min_samples = min_samples)    
    dbscan.fit(data)
    labels = dbscan.labels_
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise_ = list(labels).count(-1)
    
    return labels, n_clusters_, n_noise_

# Experimentation

In [None]:
# load two calls:

(whale_1, srate) = load_audio('whale1.wav')
(whale_2, srate) = load_audio('whale2.wav')


In [None]:
# call 1 (original):

plot_time_domain(whale_1, srate)
plot_freq_domain(whale_1, srate)
ipd.Audio(whale_1, rate = srate)

In [None]:
# call 1 (denoised):

whale_1_denoised = denoise(whale_1, whale_1[:int(len(whale_1)/10)], n_std_thresh = 1.5)
plot_time_domain(whale_1_denoised, srate)
plot_freq_domain(whale_1_denoised, srate)
ipd.Audio(whale_1_denoised, rate = srate)

In [None]:
# call 2 (original):

plot_time_domain(whale_2, srate)
plot_freq_domain(whale_2, srate)
ipd.Audio(whale_2, rate = srate)

In [None]:
# call 2 (denoised):

whale_2_denoised = denoise(whale_2, whale_2[:int(len(whale_2)/10)], n_std_thresh = 1)
plot_time_domain(whale_2_denoised, srate)
plot_freq_domain(whale_2_denoised, srate)
ipd.Audio(whale_2_denoised, rate = srate)

In [None]:
# call 1 monophonic pitch detection:

whale_1_amp, whale_1_pitch  = pitch_track(whale_1_denoised, 256, 512, pitch_autocorrelation, 44100)
whale_1_pitch_filtered      = signal.medfilt(whale_1_pitch, kernel_size=15)

p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Pitch (Extracted)')
time = np.linspace(0, len(whale_1)/srate, num=len(whale_1_pitch_filtered))
p.line(time, whale_1_pitch_filtered)
show(p)

p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Amplitude (Extracted)')
time = np.linspace(0, len(whale_1)/srate, num=len(whale_1_pitch))
p.line(time, whale_1_amp)
show(p)

whale_1_mpd                 = sonify(whale_1_amp, whale_1_pitch_filtered, srate, 256)
ipd.Audio(whale_1_mpd, rate = srate)

In [None]:
# call 2 monophonic pitch detection:

whale_2_amp, whale_2_pitch  = pitch_track(whale_2_denoised, 256, 512, pitch_autocorrelation, 44100)
whale_2_pitch_filtered      = signal.medfilt(whale_2_pitch, kernel_size=15)

p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Pitch (Extracted)')
time = np.linspace(0, len(whale_2)/srate, num=len(whale_2_pitch_filtered))
p.line(time, whale_2_pitch_filtered)
show(p)

p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Amplitude (Extracted)')
time = np.linspace(0, len(whale_2)/srate, num=len(whale_2_pitch))
p.line(time, whale_2_amp)
show(p)

whale_2_mpd                 = sonify(whale_2_amp, whale_2_pitch_filtered, srate, 256)
ipd.Audio(whale_2_mpd, rate = srate)

In [None]:
# calls 1 and 2 alignment by DTW and visualization:

whale_1_stft = librosa.feature.chroma_stft(y = whale_1_denoised, sr = srate, hop_length = 256)
whale_2_stft = librosa.feature.chroma_stft(y = whale_2_denoised, sr = srate, hop_length = 256)

distance = dtw_table(whale_1_stft.T, whale_2_stft.T, distance = scipy.spatial.distance.cosine)
path     = dtw_path(whale_1_stft.T, whale_2_stft.T, distance)

plot_dtw(distance, path, whale_1_stft, whale_2_stft)

In [None]:
# call 1 denoised and trimmed:

whale_1_denoised_trimmed = trim(whale_1_denoised, whale_1_amp, srate, sensitivity = 10)
plot_time_domain(whale_1_denoised_trimmed, srate)
plot_freq_domain(whale_1_denoised_trimmed, srate)
ipd.Audio(whale_1_denoised_trimmed, rate = srate)

In [None]:
# call 2 denoised and trimmed:

whale_2_denoised_trimmed = trim(whale_2_denoised, whale_2_amp, srate, sensitivity = 10)
plot_time_domain(whale_2_denoised_trimmed, srate)
plot_freq_domain(whale_2_denoised_trimmed, srate)
ipd.Audio(whale_2_denoised_trimmed, rate = srate)

In [None]:
whale_1_trimmed_stft = librosa.feature.chroma_stft(y = whale_1_denoised_trimmed, sr = srate, hop_length = 256)
whale_2_trimmed_stft = librosa.feature.chroma_stft(y = whale_2_denoised_trimmed, sr = srate, hop_length = 256)

distance = dtw_table(whale_1_trimmed_stft.T, whale_2_trimmed_stft.T, distance = scipy.spatial.distance.cosine)
path     = dtw_path(whale_1_trimmed_stft.T, whale_2_trimmed_stft.T, distance)

plot_dtw(distance, path, whale_1_trimmed_stft, whale_2_trimmed_stft)

In [None]:
# build a fake query sequence and search for probe in query:

(whale_1, srate) = load_audio('whale1.wav')
(whale_2, srate) = load_audio('whale2.wav')
(whale_3, srate) = load_audio('whale3.wav')
(whale_probe, srate) = load_audio('whale_probe.wav')

whale_1_denoised = denoise(whale_1, whale_1[:int(len(whale_1)/10)])
whale_2_denoised = denoise(whale_2, whale_2[:int(len(whale_2)/10)])
whale_3_denoised = denoise(whale_3, whale_3[:int(len(whale_3)/10)])
whale_probe_denoised = denoise(whale_probe, whale_probe[:int(len(whale_probe)/10)])

whale_1_denoised_ = whale_1_denoised/np.max(np.abs(whale_1_denoised))
whale_2_denoised_ = whale_2_denoised/np.max(np.abs(whale_2_denoised))
whale_3_denoised_ = whale_3_denoised/np.max(np.abs(whale_3_denoised))
whale_probe_denoised_ = whale_probe_denoised/np.max(np.abs(whale_probe_denoised))

query = np.concatenate((whale_1_denoised_, whale_2_denoised_, whale_3_denoised_))
probe = whale_1_denoised

In [None]:
plot_time_domain(whale_1_denoised, srate)
plot_time_domain(whale)

In [None]:
probe = whale_2_denoised
stft_probe = librosa.stft(probe)
stft_query = librosa.stft(query)
mag_probe = np.abs(stft_probe)
mag_query = np.abs(stft_query)
correlation_pq = np.abs(np.correlate(mag_probe.flatten(), mag_query.flatten(), mode='full'))
correlation_pq

In [None]:
plot_time_domain(correlation_pq, srate) # whale_1_denoised

In [None]:
plot_time_domain(correlation_pq, srate) # whale_2_denoised

In [None]:
plot_time_domain(correlation_pq, srate) # whale_3_denoised

In [None]:
localization = probe_localization_xcorrelation_stft(probe, query, 22050, 44100, 0.1)

p = figure(plot_width=800, plot_height=200, x_axis_label='Time (s)', y_axis_label='Amplitude')
p.line(np.linspace(0, len(query)/44100, len(query)), query)
show(p)

p = figure(plot_width=800, plot_height=200, x_axis_label='Delay (s)', y_axis_label='Cross-Correlation')
p.line(np.linspace(0, int(len(localization[0]) * 22050 / (2 * 44100) ), len(localization[0])), localization[0])
show(p)

print("Calls located at: ", *localization[1], "seconds")

In [None]:
p = figure(plot_width=800, plot_height=200, x_axis_label='Delay (s)', y_axis_label='Cross-Correlation')
p.line(np.linspace(0, int(len(localization[0]) * 22050 / (2 * 44100) ), len(localization[0])), localization[0]**5)
show(p)

## Call Variance

In [None]:
files_according_to_type

In [None]:
# retrieve audio file names, pods, call types:

files = os.listdir('c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data')

pods  = np.array([string.split('-')[0] for string in files])
types = np.array([string.split('-')[1] for string in files])

pods_unique  = np.unique(pods)
types_unique = np.unique(types)

files_according_to_pod  = {x.split('-')[0]: [f for f in files if f.startswith(x.split('-')[0])] for x in files}
files_according_to_type = {x.split('-')[1]: [f for f in files if f.split('-')[1] == x.split('-')[1]] for x in files}

In [None]:
# loop through all files, denoise, and trim:

for i in files:
    
    try: 

        # load audio:

        audio = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/call-catalog/orig/wav/"+i)

        # denoise:

        audio_denoised = denoise(audio[0], audio[0][:int(len(audio[0])/10)])

        # trim:

        audio_denoised_amp, audio_denoised_pitch  = pitch_track(audio_denoised, 256, 512, pitch_autocorrelation, audio[1])
        audio_denoised_trimmed = trim(audio_denoised, audio_denoised_amp, audio[1], sensitivity = 10)

        # output:

        sf.write("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+i, audio_denoised_trimmed, audio[1])
        
    except:
        
        pass

In [None]:
def extract_audio_features(audio_file, srate):

    # initialize features dictionary:
    
    features_dict = {}
    
    # diy features:
    
    features_dict['length']    = len(audio_file)/srate

    # temporal features:
    
    zero_crossing_rate = librosa.feature.zero_crossing_rate(y = audio_file)
    rmse = librosa.feature.rms(y = audio_file)
    
    features_dict['zero_crossing_rate_mean'] = zero_crossing_rate.mean()
    features_dict['rmse_mean'] = rmse.mean()

    features_dict['zero_crossing_rate_std'] = zero_crossing_rate.std()
    features_dict['rmse_std'] = rmse.std()
    
    # MFCCs:
    
    mfccs = librosa.feature.mfcc(y = audio_file, sr = srate)
    for i in range(mfccs.shape[0]):
        features_dict[f'mfcc_{i+1}'] = mfccs[i].mean()

    # Additional features
    chroma = librosa.feature.chroma_stft(y = audio_file, sr = srate)
    #tempogram = librosa.feature.tempogram(y = audio_file, sr = srate)
    tonnetz = librosa.feature.tonnetz(y = audio_file, sr = srate)
    
    for i in range(chroma.shape[0]):
        features_dict[f'chroma_{i+1}'] = chroma[i].mean()
        
    #for i in range(tempogram.shape[0]):
    #    features_dict[f'tempogram_{i+1}'] = tempogram[i].mean()
        
    for i in range(tonnetz.shape[0]):
        features_dict[f'tonnetz_{i+1}'] = tonnetz[i].mean()

    return features_dict


In [None]:
# calculate global call variances across pods:

features = []

for key, values in files_according_to_type.items():
            
    for filename in values:
        
        try:
        
            audio = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+filename)
            stuff = list(extract_audio_features(audio[0], audio[1]).values())
            features.append([key, *stuff])
            
        except:
            
            pass

In [None]:
data = pd.DataFrame(features, columns = ["key", *list(extract_audio_features(audio[0], audio[1]).keys())])
data.to_csv("./data.csv", index = False)

In [None]:
# conduct PCA and look at variance of first two components:

labels_  = [sublist[0] for sublist in features] 
feature_ = [sublist[1:] for sublist in features] 
colors = ['#{:02x}{:02x}{:02x}'.format(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) for _ in range(55)]
y        = [{label: idx for idx, label in enumerate(set(labels_))}[string] for string in labels_]

pca = PCA(n_components = 2)

features_PCA = pca.fit_transform(feature_)

# plot PCA:

source_features_PCA = ColumnDataSource(data=dict(
    x = features_PCA[:, 0],
    y = features_PCA[:, 1],
    color = [colors[i] for i in y]
    #label = [["instrumental", "hiphop", "rock", "folk"][i] for i in y]
))

p = figure(title="PCA of meanMFCC", plot_width = 500, plot_height = 500)
p.scatter(x = 'x', y = 'y', size = 8,  color = 'color', source = source_features_PCA)
p.xaxis.axis_label = 'Principal Component 1'
p.yaxis.axis_label = 'Principal Component 2'
show(p)

In [None]:
df = pd.DataFrame(features_PCA, columns = ["PCA1", "PCA2"])
df['Label'] = labels_

# compute variance across each axis:

df_avg = df.groupby('Label').mean()
df_std = df.groupby('Label', as_index = False).std()

# arrange data according to weighted sum over variances:

weights = {'PCA1': pca.explained_variance_ratio_[0], 'PCA2': pca.explained_variance_ratio_[1]}
weighted_sum = df_std[['PCA1', 'PCA2']].mul(weights).sum(axis=1)
df_std['Weighted_Sum'] = weighted_sum
df_std_sorted = df_std.sort_values(by = 'Weighted_Sum', ascending=False)

In [None]:
# Create a ColumnDataSource

labels = df_std_sorted["Label"]
weighted_sums = df_std_sorted["Weighted_Sum"]

source = ColumnDataSource(data = dict(labels = labels, weighted_sums = weighted_sums))

# Create a figure
p = figure(x_range = labels, plot_height = 400, plot_width = 800, toolbar_location=None, tools="")
p.circle(x='labels', y='weighted_sums', size = 10, color="navy", source=source)
p.xaxis.major_label_orientation = np.pi / 4
p.xaxis.axis_label = "Call Type"
p.yaxis.axis_label = "Weighted 2-Component PCA Standard Deviation"
show(p)

## N-Gram Location

In [None]:
# from each call-type, select one probe:

probes = []

for key, values in files_according_to_type.items():
    
    probes.append([key, load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"
                                  +values[0])[0]])

probes = [["honk", whale_1_denoised], ["N04", whale_2_denoised], ["N09", whale_3_denoised]]
    
# load big audio track:

query = np.concatenate((whale_1_denoised_, whale_2_denoised_, whale_3_denoised_))
#query = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/432A.wav")[0]

In [None]:
timestamps

In [None]:
# collect timestamps per probe:

timestamps = []

for probe in probes:
    
    print(probe[0])
    times  = probe_localization_xcorrelation_stft(probe[1], query, 22050, 44100, 0.25)[1]
    stamps = [[probe[0], value] for value in times]
    
    for stamp in stamps:
        
        timestamps.append(stamp)
    
# order by timestamp:

timestamps_sorted = sorted(timestamps, key = lambda x: x[1])

In [None]:
# build dictionary of n-grams and increase count:

ngrams    = {}
threshold = 10 

for i in range(0, timestamps_sorted - 1):
    
    label = ','.join(sorted([timestamps_sorted[i][0], timestamps_sorted[i+1][0]]))
    
    if timestamps_sorted[i+1][1] - timestamps_sorted[i][1] < 10:
        
        if label in ngrams:
            ngrams[label] += 1
        else:
            ngrams[label] = 1

In [None]:
# plot ngram frequencies:

ngrams_counts = list(ngrams.items())
ngrams_counts_sorted = sorted(ngrams_counts, key = lambda x: x[1], reverse = True)

labels = [pair[0] for pair in ngrams_counts_sorted]
counts = [pair[1] for pair in ngrams_counts_sorted]

source = ColumnDataSource(data = dict(labels = labels, counts = counts))

# Create a figure
p = figure(x_range = labels, plot_height = 400, plot_width = 800, toolbar_location=None, tools="")
p.circle(x='labels', y='counts', size = 10, color="navy", source=source)
p.xaxis.major_label_orientation = np.pi / 4
p.xaxis.axis_label = "Call N-Gram"
p.yaxis.axis_label = "Count"
show(p)

## X-Correlation and DTW

In [None]:
files_N01 = files_according_to_type['N01']
files_N09 = files_according_to_type['N09']

In [None]:
# compute all pairwise similarities for each call type:

similarities_xcorrelation_N01  = []
similarities_dtw_N01           = []
similarities_mfcc_N01          = []

for i in range(0, len(files_N01)):
    
    audio_1 = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+files_N01[i])
    
    for j in range(i+1, len(files_N01)):
        
        audio_2 = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+files_N01[j])
    
        similarities_xcorrelation_N01.append(similarity_cross_correlation(audio_1[0], audio_2[0]))
        similarities_dtw_N01.append(similarity_dynamic_time_warping(audio_1[0], audio_2[0]))
        #similarities_mfcc_N01.append(similarity_mfcc(audio_1[0], audio_2[0], audio_1[1]))

similarities_xcorrelation_N09  = []
similarities_dtw_N09           = []
similarities_mfcc_N09          = []

for i in range(0, len(files_N09)):
    
    audio_1 = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+files_N09[i])
    
    for j in range(i+1, len(files_N09)):
        
        audio_2 = load_audio("c:/users/tsmza/desktop/school/course/csc575/project/whales/data/data/"+files_N09[j])
    
        similarities_xcorrelation_N09.append(similarity_cross_correlation(audio_1[0], audio_2[0]))
        similarities_dtw_N09.append(similarity_dynamic_time_warping(audio_1[0], audio_2[0]))
        #similarities_mfcc_N01.append(similarity_mfcc(audio_1[0], audio_2[0], audio_1[1]))

In [None]:
N01 = [similarities_xcorrelation_N01, similarities_dtw_N01]
N09 = [similarities_xcorrelation_N09, similarities_dtw_N09]

ALL = [N01, N09]

In [None]:
# Extracting data:

N01_XCOR = N01[0]
N01_DTW = N01[1]
N09_XCOR = N09[0]
N09_DTW = N09[1]

# Creating boxplot:

plt.figure(figsize=(5, 5))
plt.boxplot([N01_XCOR, N09_XCOR], labels=['N01', 'N09'])
plt.xlabel('Call Type')
plt.ylabel('Cross-Correlation')
plt.grid(True)
plt.show()

In [None]:
plt.figure(figsize=(5, 5))
plt.boxplot([N01_DTW, N09_DTW], labels=['N01', 'N09'])
plt.xlabel('Call Type')
plt.ylabel('DTW')
plt.grid(True)
plt.show()

In [None]:
from scipy import stats

# Perform Kolmogorov-Smirnov two-sample test
statistic, p_value = stats.ks_2samp(N01_XCOR, N09_XCOR)

# Print the results
print("K-S statistic:", statistic)
print("p-value:", p_value)

# Interpret the p-value
alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: distributions are different.")
else:
    print("Fail to reject the null hypothesis: distributions are the same.")

In [None]:

# Perform Kolmogorov-Smirnov two-sample test
statistic, p_value = stats.ks_2samp(N01_DTW, N09_DTW)

# Print the results
print("K-S statistic:", statistic)
print("p-value:", p_value)

# Interpret the p-value
alpha = 0.05
if p_value < alpha:
    print("Reject the null hypothesis: distributions are different.")
else:
    print("Fail to reject the null hypothesis: distributions are the same.")