In [None]:
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
%matplotlib inline
import seaborn as sns
from IPython.display import Audio
from scipy.io import wavfile
from scipy import fft
from scipy import signal
from scipy import optimize
from scipy import stats
import librosa.display as ld
import librosa.feature as lf
import librosa as lb
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from sklearn.manifold import TSNE
from umap import UMAP
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)
palette = ['#000000', 
           '#ff0000', '#00ff00', '#0000ff', 
           '#00ffff', '#ff00ff', '#ffff00', 
           '#ff7777', '#77ff77', '#7777ff', 
           '#77ffff', '#ff77ff', '#ffff77', 
           '#ff7700', '#ff0077', '#77ff00',
           '#00ff77', '#7700ff', '#0077ff',
           '#007777', '#770077', '#777700',
           '#777777']

# Finding Notes in Music

What happens when we apply unsuperised learning to musical recordings? 

Can we find specific notes?

## Data Sources

### MusicNet

https://www.kaggle.com/imsparsh/musicnet-dataset

In [None]:
meta_df = pd.read_csv('musicnet_metadata.csv')
meta_df.head()

In [None]:
meta_df['ensemble'].unique()

In [None]:
solos = [ens for ens in list(meta_df['ensemble'].unique()) if 'Solo' in ens]

In [None]:
mn_instruments_df = meta_df[meta_df['ensemble'].isin(solos)].copy()

In [None]:
mn_instruments_df.groupby(by='ensemble').count()['id']

I want more than 4 instruments.

In [None]:
mn_instruments_df['ensemble'] = mn_instruments_df['ensemble'].apply(lambda x: x.split('Solo ')[1])

In [None]:
mn_instruments_df.rename(columns={'ensemble':'instrument'}, inplace=True)

In [None]:
mn_instruments_df['file_path'] = mn_instruments_df['id'].apply(lambda i: f'musicnet/{i}.wav')

In [None]:
cello_df = mn_instruments_df[mn_instruments_df['instrument'] == 'Cello'].copy()
flute_df = mn_instruments_df[mn_instruments_df['instrument'] == 'Flute'].copy()
piano_df = mn_instruments_df[mn_instruments_df['instrument'] == 'Piano'].copy()
violin_df = mn_instruments_df[mn_instruments_df['instrument'] == 'Violin'].copy()

In [None]:
cfpv_df = pd.concat([cello_df.sample(n=3), flute_df.sample(n=3), piano_df.sample(n=3), violin_df.sample(n=3)])

### URMP

http://www2.ece.rochester.edu/projects/air/projects/URMP.html

In [None]:
ins_abbrev = {
    'vn':'Violin',
    'va':'Viola',
    'vc':'Cello',
    'db':'Double Bass',
    'fl':'Flute',
    'ob':'Oboe',
    'cl':'Clarinet',
    'sax':'Saxophone',
    'bn':'Bassoon',
    'tpt':'Trumpet',
    'hn':'Horn',
    'tbn':'Trombone',
    'tba':'Tuba'}

In [None]:
ls = !ls urmp/Dataset/

In [None]:
ls.remove('Supplementary_Files')

In [None]:
file_path_list = []
for directory in ls:
    files = !ls urmp/Dataset/{directory}
    for f in files:
        if '.wav' in f:
            file_path_list.append(f'urmp/Dataset/{directory}/{f}')

In [None]:
def get_ins(file_path):
    return file_path.split('/')[-1].split('_')[2]
ins_list = [f for f in file_path_list if get_ins(f) in ins_abbrev]

In [None]:
urmp_df = pd.DataFrame()
urmp_df['file_path'] = ins_list
urmp_df['instrument'] = list(map(get_ins, ins_list))

In [None]:
urmp_df.replace(ins_abbrev, inplace=True)

In [None]:
urmp_df.head()

In [None]:
urmp_df.groupby(by='instrument').count()

## Combination

URMP (13 instruments) + MusicNet (4 instuments) - overlap (3 instruments) = unique (14 instruments)

In [None]:
sample_list = []
for i in list(urmp_df['instrument'].unique()):
    i_df = urmp_df[urmp_df['instrument'] == i].copy()
    sample_list.append(i_df.sample(n=3))
sample_list.append(piano_df[['file_path','instrument']].sample(n=3))
equal_df = pd.concat(sample_list)

In [None]:
equal_df.reset_index(drop=True, inplace=True)

In [None]:
equal_df

In [None]:
def reduce_dimension(X, algo, params, show_title=None, xlab=None, ylab=None):
    font_size = 12
    results = algo(**params).fit_transform(X)
    x_label = f'{algo.__name__}_1'
    y_label = f'{algo.__name__}_2'
    res_df = pd.DataFrame(results, columns=[x_label, y_label])
    splot = sns.scatterplot(data=res_df, x=x_label, y=y_label)
    if xlab:
        #splot.set(xlabel=xlab)
        plt.xlabel(xlab, size=font_size)
    else:
        splot.set(xlabel='')
    if ylab:
        #splot.set(ylabel=ylab)
        plt.ylabel(ylab, size=font_size)
    else:
        splot.set(ylabel='')
    if show_title == 'auto':
        title_elements = [f'{par[:min(6,len(par))]}:{str(val)[:min(6,len(par))]}' for par, val in params.items()]
        title = ', '.join(title_elements)
        _ = plt.title(title, size=font_size)
    elif show_title:
        _ = plt.title(show_title, size=font_size)
    splot.tick_params(axis='both', which='both', left=False, right=False, bottom=False, top=False, labelbottom=False, labelleft=False)
    return res_df
    
def parameter_grid(X, algo, params):
    scaling = 3
    all_keys = params.keys()
    grid_keys = [key for key in all_keys if isinstance(params[key], list) and len(params[key]) > 1]
    extra_keys = []
    if len(grid_keys) > 2:
        print(f'Got {len(grid_keys)} parameters with more that one value, I can only handle 2.')
        return
    elif len(grid_keys) == 2: # 2D case
        key1, key2 = grid_keys
        if len(all_keys) > len(grid_keys):
            extra_keys = [key for key in all_keys if key not in grid_keys]
    elif len(grid_keys) == 1: # 1D case
        key1 = grid_keys[0]
        if len(all_keys) > len(grid_keys):
            single_keys = [key for key in all_keys if key not in grid_keys]
            key2 = single_keys[0]
            extra_keys = [key for key in single_keys if key != key2]
        else: # 1D case
            pars1 = params[key1]
            width = len(pars1)
            height = 1
            plt.figure(figsize=(scaling*width, scaling*height))
            for i, par1 in enumerate(pars1):
                plt.subplot(height, width, i+1)
                pars = {key1:par1}
                x_label = f'{key1}:{par1}'
                y_label = ''
                _ = reduce_dimension(X, algo, pars, show_title=x_label, ylab=y_label)
            plt.tight_layout()
            return
    else: # 0D case
        _ = reduce_dimension(X, algo, params)
        return
    # 2D case
    pars1 = params[key1]
    pars2 = params[key2]
    width = len(pars1)
    height = len(pars2)
    plt.figure(figsize=(scaling*width, scaling*height))
    for i1, par1 in enumerate(pars1):
        for i2, par2 in enumerate(pars2):
            i = i1 + width*i2
            plt.subplot(height, width, i+1)
            extra_pars = {key:params[key] for key in extra_keys}
            pars = {**{key1:par1, key2:par2}, **extra_pars}
            if i1 == 0:
                y_label = f'{key2}:{par2}'
            else:
                y_label = ''
            if i2 == 0:
                x_label = f'{key1}:{par1}'
            else:
                x_label = ''
            _ = reduce_dimension(X, algo, pars, show_title=x_label, ylab=y_label, xlab='')
    plt.tight_layout()

## Single Recording Example

1. Fourier Transform
2. Dimensionality Reduction
3. Cluster
4. Identify Clusters
5. Identify Intervals

### 1. Get the Fourier transforms of short samples of an audio file.

In [None]:
_ = plt.figure(figsize=(16,5))
x = np.linspace(0, 2*math.pi, 100)
y1 = np.sin(x)
y2 = 1/2*np.sin(2*x)
y3 = 1/3*np.sin(3*x)
_ = plt.subplot(1, 2, 1)
_ = plt.plot(x, y1+y2+y3)
_ = plt.subplot(1, 2, 2)
_ = plt.plot(x, y1)
_ = plt.plot(x, y2)
_ = plt.plot(x, y3)

In [None]:
def amp_sr_stfts(file_path): # sub_size is amp sample size for each fft, smaller means better time res, lower frequency res, and vice versa
    sr, amp = wavfile.read(file_path) # read in audio file
    if sr < 44000:
        subs = 2048
    else:
        subs = 4096
    stft_results = lb.stft(y=amp.astype(float), n_fft=subs, hop_length=subs//2) # get the Fourier transforms, no overlap between subsamples
    ft = np.abs(stft_results.transpose()) # take the magnitude and transpose
    ft /= ft.sum(axis=0, keepdims=True) # normalize amplitude in frequency domain by integral
    return amp, sr, subs, ft

In [None]:
%%time
amp, sr, sub_size, ft = amp_sr_stfts('urmp/Dataset/35_Rondeau_vn_vn_va_db/AuSep_1_vn_35_Rondeau.wav')

### 2. Dimensionality Reduction

PCA

In [None]:
nonzero_fts = np.nonzero(ft.sum(axis=1))[0]
nz_fts = ft[nonzero_fts]
fts_normalized = nz_fts / nz_fts.sum(axis=1, keepdims=True)
pca = PCA(n_components=2)
pca_xy = pca.fit_transform(fts_normalized)
pca_df = pd.DataFrame(pca_xy, columns=['pc1', 'pc2'])
_ = sns.scatterplot(data=pca_df, x='pc1', y='pc2')

There's not much we can do with this.

t-SNE

In [None]:
parameter_grid(ft, TSNE, {'n_components':[2], 'perplexity':[5, 20, 50], 'n_iter':300})

UMAP

In [None]:
parameter_grid(ft, UMAP, {'n_neighbors':[5, 15, 45], 'min_dist':[0.01, 0.1, 1.0], 'metric':'correlation'})

In [None]:
parameter_grid(ft, UMAP, {'n_neighbors':[5, 10, 20], 'min_dist':[0.005, 0.01, 0.02], 'metric':'correlation'})

smaller n_neighbors allows for finer differentiation, and smaller min_dist of keeps clusters tight.

In [None]:
parameter_grid(ft, UMAP, {'n_neighbors':[5, 10], 'min_dist':[0.005, 0.01], 'metric':'correlation'})

In [None]:
parameter_grid(ft, UMAP, {'n_neighbors':[5], 'min_dist':0.005, 'metric':['correlation', 'manhattan', 'euclidean', 'cosine']})

In [None]:
%%time
_ = plt.figure(figsize=(10,8))
umap_df = reduce_dimension(ft, UMAP, {'n_neighbors':5, 'min_dist':0.005, 'metric':'correlation'})

### 3. Cluster

In [None]:
def cluster_2D(df, algo, params, show_plot=False):
    clusterer = algo(**params) # identify clusters in the UMAP results with DBSCAN
    clusters = clusterer.fit_predict(df)
    n_clusters = len(np.unique(clusters))
    cols = df.columns.to_list()
    if show_plot:
        splot = sns.scatterplot(data=df, x=cols[0], y=cols[1], hue=clusters.astype(str), palette=sns.color_palette(palette[:n_clusters], n_clusters)) # plot the clusters and color them
        splot.legend(bbox_to_anchor=(1.2, 1))
    clustered_df = df.copy()
    clustered_df['cluster'] = clusters
    return clustered_df

In [None]:
_ = plt.figure(figsize=(10,7))
umap_dbscan_df = cluster_2D(umap_df, DBSCAN, {'eps':0.6, 'min_samples':7}, show_plot=True)

The groups are well-separated.

### 4. Identify Clusters

First, I collect the audio sub-samples associated with each cluster and make them playable as a manual check.

In [None]:
times = lb.frames_to_samples(np.arange(ft.shape[0]), hop_length=sub_size//2) # get the start time index for each sample

In [None]:
umap_dbscan_df['i_times'] = times

In [None]:
%%time
def collect_samps(amp, groups_df, group_num): # for a given cluster, return the start and end time indices for all samples in that cluster
    this_group_intervals = []
    interval = []
    group_times = groups_df[groups_df['cluster'] == group_num]['i_times']
    for i in group_times:
        if not interval:
            interval = [i, i]
        sub_interval = [i, i+sub_size]
        if sub_interval[0] <= interval[1]:
            interval[1] = sub_interval[1]
            if sub_interval[0] == group_times.iloc[-1]:
                interval[1] = min(len(amp)-1, interval[1])
                this_group_intervals.append(interval)
        else:
            this_group_intervals.append(interval)
            interval = sub_interval
    return this_group_intervals

group_intervals = {} # fill a dictionary with cluster numbers and their corresponding sample start and end time indices
for g in umap_dbscan_df['cluster']:
    group_intervals[g] = collect_samps(amp, umap_dbscan_df, g)

In [None]:
group_amps = {}
for g in group_intervals: # combine all of the samples for each cluster and make the clusters playable.
    group_amp = np.array([])
    for i in group_intervals[g]:
        group_amp = np.append(group_amp, amp[i[0]:i[1]])
    if len(group_amp) >= sr:
        group_amps[g] = group_amp
        print(g)
        Audio(group_amp, rate=sr)

Each group typically corresponds to a single note, sometimes with an half-step, 5th, or octave above or below.

In [None]:
for g in [0, 1, 8, 11]:
    group_amp = group_amps[g]
    print(g)
    Audio(group_amp, rate=sr)

In [None]:
def max_indices(list_in):
    list_max = max(list_in)
    return [i for i, val in enumerate(list_in) if val == list_max]

def peak_freq_diffs(freq, peaks):
    return freq[peaks[1:]] - freq[peaks[:-1]]

def fundamental_freqs(clusters_to_fit, cluster_intervals, amp, sr, show_plot=False):
    cluster_hrfts = {}
    cluster_peaks = {}
    cluster_peak_seps = {}
    cluster_fundamentals = {}
    n_clusters = len(clusters_to_fit)
    n_rows = n_clusters//2 + n_clusters % 2
    if show_plot:
        _ = plt.figure(figsize=(20, 5*n_rows))
    for i, c in enumerate(clusters_to_fit): # loop over clusters that are playable for at least 1 second
        intervals = cluster_intervals[c] # get the interval indices for this cluster
        interval_lengths = [i[1] - i[0] for i in intervals] # convert to interval lengths
        max_i = max_indices(interval_lengths) # get the indices of all maximum length intervals
        max_length_samples = [amp[intervals[i][0]:intervals[i][1]] for i in max_i] # get the max length interval sample(s)
        fts = [abs(fft.rfft(sample)) for sample in max_length_samples] # get the Fourier transform(s) of max length sample(s)
        fts_normalized = [ft_raw/ft_raw.sum(axis=0, keepdims=True) for ft_raw in fts] # normalize the Fourier transform(s)
        avg_fts = sum(fts_normalized)/len(fts_normalized) # get the average normalized Fourier transform
        hrft = avg_fts/avg_fts.sum() # normalize again, hrft = high-resolution Fourier transform
        freq = lb.fft_frequencies(sr=sr, n_fft=len(max_length_samples[0]))
        cluster_hrfts[c] = [freq, hrft]
        freq_res = freq[1] - freq[0]
        freq_dist = int(round(25 / freq_res)) # distance sets a lower limit on detectable frequencies
        peaks, peak_properties = signal.find_peaks(hrft, distance=freq_dist, prominence=0.002)
        if len(peaks) < 1:
            continue
        if freq[peaks][0] < 25:
            peaks = peaks[1:]
        multiples_flag = False
        if len(peaks) > 1:
            multiples_flag = True
            seps = peak_freq_diffs(freq, peaks)
            cluster_peak_seps[c] = seps
            diff_ratio = seps.max() / seps.min()
            if diff_ratio > 6:
                freq_dist = int(round(2*seps.min() / freq_res))
                peaks, peak_properties = signal.find_peaks(hrft, distance=freq_dist, prominence=0.002)
                if freq[peaks][0] < freq_dist:
                    peaks = peaks[1:]
                    cluster_peak_seps[c] = peak_freq_diffs(freq, peaks)
        plot_frac = 0.6
        plot_end = int(plot_frac*len(freq))
        if show_plot:
            _ = plt.subplot(n_rows, 2, i+1)
            _ = plt.plot(freq[:plot_end], hrft[:plot_end])
            _ = plt.plot(freq[peaks], hrft[peaks], 'rx')
        if multiples_flag:
            peak_freqs = freq[peaks]
            peak_heights = hrft[peaks]
            i_max = peak_heights.argmax()
            i_fund = i_max
            freq_max = peak_freqs[i_max]
            freq_fund = freq_max
            for f in [1/3, 1/2]:
                guess_freq = f*freq_max
                for i in range(i_max):
                    frac_error = abs(peak_freqs[i] - guess_freq)/guess_freq
                    if frac_error < 0.04:
                        i_fund = i
                        freq_fund = peak_freqs[i]
            matches = 0
            for f in range(2,15):
                harmonic = f*freq_fund
                for p in range(i_fund+1, len(peaks)):
                    frac_error = abs(harmonic - peak_freqs[p]) / harmonic
                    if frac_error < 0.04:
                        matches += 1
                if show_plot:
                    _ = plt.axvline(f*freq_fund, c='y')
            if matches >= 1:
                cluster_fundamentals[c] = freq_fund
        if show_plot:
            _ = plt.title(f'{c}')
    if show_plot:
        plt.tight_layout()
    return cluster_fundamentals

cluster_fundamental_freqs = fundamental_freqs(group_amps, group_intervals, amp, sr, show_plot=True)

In [None]:
sample_cluster_amps = {c:group_amps[c] for c in [1, 5, 8, 11]}
sample_cluster_intervals = {c:group_intervals[c] for c in [1, 5, 8, 11]}
sample_fundamental_freqs = fundamental_freqs(sample_cluster_amps, sample_cluster_intervals, amp, sr, show_plot=True)

In [None]:
cluster_fundamental_freqs

### 5. Identify Intervals

In [None]:
def get_fundamental(sample):
    ft_raw = abs(fft.rfft(sample)) # get the Fourier transform of sample
    ft = ft_raw/ft_raw.sum() # normalize the Fourier transform(s)
    freq = lb.fft_frequencies(sr=sr, n_fft=len(sample))
    freq_res = freq[1] - freq[0]
    freq_dist = int(round(25 / freq_res)) # distance sets a lower limit on detectable frequencies (25 Hz)
    peaks, peak_properties = signal.find_peaks(ft, distance=freq_dist, prominence=0.002)
    if freq[peaks][0] < 25: # drop peaks less than 25 Hz
        peaks = peaks[1:]
    multiples_flag = False # we don't know if there are multiple peaks yet
    if len(peaks) > 1:
        multiples_flag = True # there are multiple peaks
        seps = peak_freq_diffs(freq, peaks) # separation distances between the peaks
        #cluster_peak_seps[g] = seps
        diff_ratio = seps.max() / seps.min() # the ratio of the maximum separation to the minimum separation
        if diff_ratio > 6: # if that ratio is greater than 6, we probably found too many little peaks
            freq_dist = int(round(2*seps.min() / freq_res)) # calculate a new minimum distance between adjacent peaks
            peaks, peak_properties = signal.find_peaks(ft, distance=freq_dist, prominence=0.002)
            if freq[peaks][0] < freq_dist: # if the first peak is less than our new minimum separation
                peaks = peaks[1:] # drop the first peak
                #cluster_peak_seps[g] = peak_freq_diffs(freq, peaks)
    if multiples_flag: # find the fundamental frequency if we fit more than one peak
        peak_freqs = freq[peaks]
        peak_heights = ft[peaks]
        i_max = peak_heights.argmax() # the index of the tallest peak
        i_fund = i_max # assume the tallest peak has the fundamental frequency for now
        freq_max = peak_freqs[i_max]
        freq_fund = freq_max
        for f in [1/3, 1/2]: # check for peaks at 1/3 and 1/2 of the peak frequency in case the fundamental is not dominant
            guess_freq = f*freq_max # guesses for the fundamental
            for i in range(i_max): # for each peak before the dominant
                frac_error = abs(peak_freqs[i] - guess_freq)/guess_freq # calculate the fractional error
                if frac_error < 0.04: # if the fractional error is less than 4%
                    i_fund = i # assume we found the actual fundamental
                    freq_fund = peak_freqs[i]
        matches = 0 # initialize a counter for the number of matches between harmonics and peaks
        for f in range(2,15): # for each harmonic from 2x to 15x fundamental frequency
            harmonic = f*freq_fund
            for p in range(i_fund+1, len(peaks)): # for each peak after the fundamental
                frac_error = abs(harmonic - peak_freqs[p]) / harmonic # calculate the fractional error
                if frac_error < 0.04: # if the fractional error is less than 4%
                    matches += 1 # assume a match
        if matches >= 1: # if there was at least one match between harmonics and peaks
            return freq_fund # return the fundamental frequency
        else:
            return 0.0 # 0.0 means we didn't find a convincing fundamental frequency
    else:
        return 0.0

In [None]:
def reduce_dim(X, algo, params):
    results = algo(**params).fit_transform(X)
    xlabel = f'{algo.__name__}_1'
    ylabel = f'{algo.__name__}_2'
    res_df = pd.DataFrame(results, columns=[xlabel, ylabel])
    return res_df

def get_cluster_intervals(amp, clustered_df, cluster_num, sub_size): # for a given cluster, return the start and end time indices for all samples in that cluster
    this_cluster_intervals = []
    cluster_length = 0
    interval = []
    cluster_times = clustered_df[clustered_df['cluster'] == cluster_num]['i_times']
    for i in cluster_times:
        if not interval: # get the first interval of the cluster
            interval = [i, i] # give it zero length for now
        sub_interval = [i, i+sub_size] # sub_interval is the sub_size-sized interval that will extend interval or start a new one
        if sub_interval[0] <= interval[1]: # if the current sub_interval starts where or before the current interval ends
            interval[1] = sub_interval[1] # extend the interval to the end of the current sub_interval
            if sub_interval[0] == cluster_times.iloc[-1]: # if the current sub_interval is the last
                interval[1] = min(len(amp)-1, interval[1]) # end the current interval at the end of the amp data if that comes first.
                cluster_length += (interval[1] - interval[0])
                this_cluster_intervals.append(interval) # add this interval to the list for this cluster
        else:
            cluster_length += (interval[1] - interval[0])
            this_cluster_intervals.append(interval)
            interval = sub_interval
    return this_cluster_intervals, cluster_length

In [None]:
def process_recording(file_path):
    amp, sr, sub_size, ft = amp_sr_stfts(file_path)
    umap_df = reduce_dim(ft, UMAP, {'n_neighbors':5, 'min_dist':0.005, 'metric':'correlation'}) # 3 parameters
    umap_dbscan_df = cluster_2D(umap_df, DBSCAN, {'eps':0.6, 'min_samples':7}) # 2 more parameters
    times = lb.frames_to_samples(np.arange(ft.shape[0]), hop_length=sub_size//2) # get the start time index for each sample
    umap_dbscan_df['i_times'] = times
    cluster_intervals = {} # fill a dictionary with cluster numbers and their corresponding sample start and end time indices
    clusters_to_fit = []
    for c in umap_dbscan_df['cluster'].unique():
        cluster_intervals[c], cluster_length = get_cluster_intervals(amp, umap_dbscan_df, c, sub_size)
        if cluster_length >= sr: # keep cluster if the total duration is at least 1 second
            clusters_to_fit.append(c)
    #print(f'{clusters_to_fit}, {cluster_intervals}')
    return fundamental_freqs(clusters_to_fit, cluster_intervals, amp, sr), cluster_intervals

In [None]:
%%time
frequencies, intervals = process_recording('urmp/Dataset/35_Rondeau_vn_vn_va_db/AuSep_1_vn_35_Rondeau.wav')

In [None]:
def pitch_plot(frequencies, intervals, file_path): # for frequency determined by cluster
    sr, amp = wavfile.read(file_path)
    recording_length = len(amp)
    x = np.linspace(0, recording_length/sr, recording_length)
    y = np.zeros(recording_length)
    for c in frequencies:
        for interval in intervals[c]:
            y[interval[0]:interval[1]] = frequencies[c] * np.ones(interval[1] - interval[0])
    plt.figure(figsize=(16,6))
    plt.scatter(x[:recording_length//6], y[:recording_length//6])

In [None]:
def pitch_plot_2(frequencies, intervals, file_path): # for frequency determined by interval
    sr, amp = wavfile.read(file_path)
    recording_length = len(amp)
    x = np.linspace(0, recording_length/sr, recording_length)
    y = np.zeros(recording_length)
    all_intervals = [interval for interval_list in list(intervals.values()) for interval in interval_list]
    clustered_intervals = [interval for interval_list in [intervals[c] for c in frequencies] for interval in interval_list]
    for i in all_intervals:
        fund = get_fundamental(amp[i[0]:i[1]])
        if fund > 0.0:
            y[i[0]:i[1]] = fund * np.ones(i[1] - i[0])
        else:
            if i in clustered_intervals:
                for c in frequencies:
                    if i in intervals[c]:
                        y[i[0]:i[1]] = frequencies[c] * np.ones(i[1] - i[0])
    plt.figure(figsize=(16,6))
    plt.scatter(x[:recording_length//6], y[:recording_length//6])

In [None]:
pitch_plot(frequencies, intervals, 'urmp/Dataset/35_Rondeau_vn_vn_va_db/AuSep_1_vn_35_Rondeau.wav')

In [None]:
pitch_plot_2(frequencies, intervals, 'urmp/Dataset/35_Rondeau_vn_vn_va_db/AuSep_1_vn_35_Rondeau.wav')

In [None]:
sr, amp = wavfile.read('urmp/Dataset/35_Rondeau_vn_vn_va_db/AuSep_1_vn_35_Rondeau.wav')
Audio(amp, rate=sr)

## Collecting Notes by Instrument

In [None]:
results_dict = {}
ins_list = list(equal_df['instrument'].unique())
for ins in ins_list:
    file_list = equal_df[equal_df['instrument'] == ins]['file_path'].to_list()
    for i, file_path in enumerate(file_list):
        frequencies, intervals = process_recording(file_path)
        results_dict[f'{ins}_{i}'] = {'frequencies':frequencies, 'intervals':intervals}
    print(f'{ins} done')

In [None]:
results_dict.keys()

In [None]:
done_list = [key.split('_')[0] for key in list(results_dict.keys())]
done_list = list(set(done_list))
remainder_list = [item for item in ins_list if item not in done_list]

file_list = equal_df[equal_df['instrument'] == 'Piano']['file_path'].to_list()
for i, file_path in enumerate(file_list):
    frequencies, intervals = process_recording(file_path)
    results_dict[f'{ins}_{i}'] = {'frequencies':frequencies, 'intervals':intervals}
print(f'{ins} done')

In [None]:
results_dict.keys()

In [None]:
equal_df['key'] = list(results_dict.keys())
equal_df

In [None]:
import pickle
a_file = open("results_dict.pkl", "wb")
pickle.dump(results_dict, a_file)
a_file.close()

In [None]:
freqs_by_ins_dict = {}
for ins in ins_list:
    ins_df = equal_df[equal_df['instrument'] == ins]
    ins_freqs = []
    for key in ins_df['key']:
        freqs = list(results_dict[key]['frequencies'].values())
        ins_freqs.append(freqs)
    ins_freqs = [f for f_list in ins_freqs for f in f_list]
    freqs_by_ins_dict[ins] = list(set(ins_freqs))

In [None]:
ins_freq_res = []
for ins, freqs in freqs_by_ins_dict.items():
    freqs.sort()
    freq_diffs = [freqs[i+1] - freqs[i] for i in range(len(freqs)-1)]
    freq_res = min([diff for diff in freq_diffs if diff > 0.001])
    ins_freq_res.append(freq_res)
freq_res = min(ins_freq_res)

In [None]:
freqs_flat = list(set([freq for sublist in list(freqs_by_ins_dict.values()) for freq in sublist]))
freqs_flat.sort()
freq_min = freqs_flat[0]
freq_max = freqs_flat[-1]

In [None]:
pitches_df = pd.DataFrame()
freq_range = freq_max - freq_min
arb_freq_res = 1.0
n_bins = int(freq_range // arb_freq_res) + 1
bins = np.linspace(freq_min-arb_freq_res, freq_max+arb_freq_res, n_bins+1)
for ins, freqs in freqs_by_ins_dict.items():
    fill = np.zeros(n_bins)
    inds = np.digitize(freqs, bins)
    for i in inds:
        fill[i-1] += 1
    pitches_df[ins] = fill
_ = plt.figure(figsize=(12,10))
_ = sns.heatmap(pitches_df)
hide_code_in_slideshow()

## Conclusion

Data processing outline:

1. Select solo instrument recordings
2. For each recording:

    a. Do short-time Fourier transforms
    
    b. Apply UMAP
    
    c. Apply DBSCAN to UMAP results, this defines the clusters
    
    d. For each cluster:
        i. Take the longest sample (average) FT
        ii. Find the most prominent peaks at least 25 Hz apart
        iii. Find the dominant frequency, check for peaks at 1/2 and 1/3 of this frequency
        iv. Identify the fundamental frequency (lowest peak frequency within 4% of these three (dom, 1/2 dom, 1/3 dom)
        v. Count the number of peaks within 4% of the harmonics of the fundamental frequency
        vi. If there is at least 1 peak matching a harmonic, save that fundamental frequency for that cluster
    
    e. Return the fundamental frequency for each cluster or interval

# Failed Attempts

In [None]:
# freq = lb.fft_frequencies(sr=sr, n_fft=sub_size)

In [None]:
# def plot_cluster_ffts(ft, freq, clustered_df, cluster_num):
#     fts_0 = ft[np.where(clustered_df['cluster']==cluster_num)]
#     for ft_0 in fts_0:
#         _ = plt.plot(freq, ft_0, alpha=0.3)

In [None]:
# def plot_cluster_avg_fft(ft, freq, clustered_df, cluster_num):
#     avg_ft_0 = cluster_avg_fft(ft, freq, clustered_df, cluster_num)
#     _ = plt.plot(freq, avg_ft_0)

In [None]:
#for c in cluster_peaks:
#    peaks = cluster_peaks[c]
#    freq, hrft = cluster_hrfts[c]
#    peak_heights = hrft[peaks]
#    peak_freqs = freq[peaks]
#    if peak_heights[0] = max(peak_heights):
#        # take multiples
#        for i in range(1, len(peaks)):
#            ratio = peak_freqs[i]/peak_freqs[0]
#            abs_error = abs(ratio-round(ratio))/round(ratio)
#            if abs_error < 0.1:
#                print(rounded_frac)
#    else:
#        # take fractions and multiples        

In [None]:
#for c in cluster_peak_seps:
#    seps = cluster_peak_seps[c]
#    print(c)
#    s_max = max(seps)
#    for s in seps:
#        s_ratio = s_max/s
#        if s_ratio > 1.5:
#            round_abs_diff = abs(round(s_ratio) - s_ratio)
#            if round_abs_diff < 0.05:
#                print(f's_ratio: {s_ratio}')

In [None]:
#n_clusters = umap_dbscan_df['cluster'].nunique()
#_ = plt.figure(figsize=(20, 5*(n_clusters//2 + n_clusters % 2)))
#for i in umap_dbscan_df['cluster'].unique():
#    i_plot = i + 1 if i > -1 else n_clusters
#    _ = plt.subplot(n_clusters//2 + n_clusters % 2, 2, i_plot)
#    plot_cluster_ffts(ft, freq, umap_dbscan_df, i)
#    _ = plt.title(f'{i}')
#_ = plt.tight_layout()

In [None]:
#_ = plt.figure(figsize=(16,7))
#for i, c in enumerate([0, 11, 1, 5]):
#    _ = plt.subplot(2, 2, i+1)
#    plot_cluster_ffts(ft, freq, umap_dbscan_df, c)
#    _ = plt.title(f'{c}')
#_ = plt.tight_layout()

In [None]:
# equal_tempered_pitches = [27.5*2**(i/12) for i in range(88)]
# note_pitches = equal_tempered_pitches
# notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
# note_names = ['A0', 'A#0', 'B0'] + [f'{note}{i}' for i in range(1, 8) for note in notes] + ['C8']
# notes_df = pd.DataFrame()
# notes_df['name'] = note_names
# notes_df['pitch'] = note_pitches
# notes_df['lower_pitch'] = list(map(lambda x: x / 1.01, note_pitches))
# notes_df['upper_pitch'] = list(map(lambda x: x * 1.01, note_pitches))

# def cluster_note(ft, freqs, clustered_df, cluster_num):
#     caf = cluster_avg_fft(ft, freqs, clustered_df, cluster_num)
#     peaks = prominent_peaks(caf)
#     if len(peaks) < 2: # If there aren't enough prominent peaks, assume a rest in the music.
#         return 'rest'
#     diffs = peak_freq_diffs(freqs, peaks)
#     cm = centralize(diffs).mean()
#     return find_note(cm, notes_df)

In [None]:
# def cluster_avg_fft(ft, freq, clustered_df, cluster_num): # return the average Fourier Transform of a cluster
#     fts_0 = ft[np.where(clustered_df['cluster']==cluster_num)]
#     return fts_0.mean(axis=0)

# def prominent_peaks(caf):
#     peaks, peak_properties = signal.find_peaks(caf, prominence=0.002)
#     return peaks

# def centralize(a, sigmas=1):
#     upper = a.mean() + sigmas*a.std()
#     lower = a.mean() - sigmas*a.std()
#     a_below = a[a <= upper]
#     a_central = a_below[a_below >= lower]
#     return a_central

# def find_note(freq, note_df):
#     if note_df['lower_pitch'].iloc[0] <= freq <= note_df['upper_pitch'].iloc[-1]: # check if freq is in overall range of notes
#         for i, row in note_df.iterrows():
#             if row['lower_pitch'] <= freq <= row['upper_pitch']:
#                 return row['name']
#     return freq

In [None]:
# cluster_central_freqs = {}
# cluster_diffs_central = {}
# _ = plt.figure(figsize=(20,20))
# for i, c in enumerate(cluster_peak_seps):
#     diffs = cluster_peak_seps[c]
#     _ = plt.subplot(len(cluster_peak_seps) // 2 + len(cluster_peak_seps) % 2, 2, i+1)
#     _ = plt.title(f'{c}')
#     _ = sns.histplot(diffs, bins=20, kde=True)
#     _ = plt.axvline(diffs.mean(), c='g')
#     upper = diffs.mean() + diffs.std()
#     _ = plt.axvline(upper, c='r')
#     lower = diffs.mean() - diffs.std()
#     _ = plt.axvline(lower, c='r')
#     diffs_central = centralize(diffs)
#     cluster_diffs_central[c] = diffs_central
#     diffs_central_mean = diffs_central.mean()
#     _ = plt.axvline(diffs_central_mean, c='k')
# plt.tight_layout()

In [None]:
#n_clusters = umap_dbscan_df['cluster'].nunique()
#cluster_avg_fft_peaks = {}
#cluster_diffs = {}
#_ = plt.figure(figsize=(20, 5*(n_clusters//2 + n_clusters % 2)))
#for i in umap_dbscan_df['cluster'].unique():
#    i_plot = i + 1 if i > -1 else n_clusters
#    caf = cluster_avg_fft(ft, freq, umap_dbscan_df, i)
#    peaks, peak_properties = signal.find_peaks(caf, prominence=0.0008)
#    if len(peaks) > 1:
#        cluster_diffs[i] = peak_freq_diffs(freq, peaks)
#    _ = plt.subplot(n_clusters//2 + n_clusters % 2, 2, i_plot)
#    _ = plt.plot(freq, caf)
#    _ = plt.plot(freq[peaks], caf[peaks], 'rx')
#    _ = plt.title(f'{i}')
#_ = plt.tight_layout()

In [None]:
#_ = plt.figure(figsize=(16,7))
#for i, c in enumerate([0, 11, 1, 5]):
#    caf = cluster_avg_fft(ft, freq, umap_dbscan_df, c)
#    peaks, peak_properties = signal.find_peaks(caf, prominence=0.0008)
#    if len(peaks) > 1:
#        cluster_diffs[i] = peak_freq_diffs(freq, peaks)
#    _ = plt.subplot(2, 2, i+1)
#    _ = plt.plot(freq, caf)
#    _ = plt.plot(freq[peaks], caf[peaks], 'rx')
#    _ = plt.title(f'{i}')
#plt.tight_layout()

In [None]:
#_ = plt.figure(figsize=(16,4))
#for i, c in enumerate([1, 5]):
#    diffs = cluster_diffs[c]
#    _ = plt.subplot(1, 2, i+1)
#    _ = plt.title(f'{c}')
#    _ = sns.histplot(diffs)
#     _ = plt.axvline(diffs.mean(), c='g')
#     upper = diffs.mean() + diffs.std()
#     _ = plt.axvline(upper, c='r')
#     lower = diffs.mean() - diffs.std()
#     _ = plt.axvline(lower, c='r')
#     diffs_central = centralize(diffs)
#     cluster_diffs_central[c] = diffs_central
#     diffs_central_mean = diffs_central.mean()
#     diffs_central_central_mean = centralize(diffs_central).mean()
#     cluster_central_freqs[c] = diffs_central_mean
#     _ = plt.axvline(diffs_central_mean, c='k')
#     _ = plt.axvline(diffs_central_central_mean, c='m')
# plt.tight_layout()

In [None]:
#def remove_outliers(a):
#    mode_info = stats.mode(a)
#    if mode_info[1] > 2:
#        reference = mode_info[0]
#    else:
#        reference = a.mean()
#    factor = 1.5
#    lower = reference / factor
#    upper = reference * factor
#    return a[(a > lower) & (a < upper)]

#_ = plt.figure(figsize=(20,40))
#for i, c in enumerate(cluster_diffs):
#    diffs = cluster_diffs[c]
#    mode_info = stats.mode(diffs)
#    mode = mode_info[0]
#    mean = diffs.mean()
#    _ = plt.subplot(21, 2, i+1)
#    _ = plt.plot(diffs)
#    _ = plt.axhline(mode, c='r')
#    _ = plt.axhline(mean, c='g')
#plt.tight_layout()

In [None]:
#_ = plt.figure(figsize=(20,40))
#for i, c in enumerate(cluster_diffs):
#    inliers = remove_outliers(cluster_diffs[c])
#    _ = plt.subplot(21, 2, i+1)
#    _ = plt.plot(inliers)
#plt.tight_layout()

In [None]:
#_ = plt.figure(figsize=(20,10))
#for i, c in enumerate(cluster_diffs_central):
#    central_diffs = cluster_diffs_central[c]
#    _ = plt.subplot(len(cluster_diffs_central) // 2 + len(cluster_diffs_central) % 2, 2, i+1)
#    _ = sns.histplot(central_diffs)
#    _ = plt.axvline(central_diffs.mean(), c='g')
#    sigmas = 1.5
#    upper = central_diffs.mean() + sigmas*central_diffs.std()
#    _ = plt.axvline(upper, c='r')
#    lower = central_diffs.mean() - sigmas*central_diffs.std()
#    _ = plt.axvline(lower, c='r')

In [None]:
#equal_temps = np.array([55, 110, 164.8, 220, 329.6, 440, 659.26, 880, 1318.5, 1760])
#stretched = np.array([54.9 / 55, 109.8 / 110, 164.6 / 164.8, 220 / 220, 329.5 / 329.6, 440 / 440, 660.5 / 659.26, 881.9 / 880, 1324.8 / 1318.5 , 1772.5 / 1760])
#def stretch_fit(x, c1, c2):
#    return c1*np.exp(c2*x)
#popt, pcov = optimize.curve_fit(stretch_fit, equal_temps, stretched, [0.1, 0.001])
#_ = plt.plot(equal_temps, stretched)
#_ = plt.plot(equal_temps, stretch_fit(equal_temps, popt[0], popt[1]))
#plt.xscale('log')

In [None]:
#for cf in cluster_central_freqs:
#    print(f'{cf}: {find_note(cluster_central_freqs[cf], notes_df)}')

In [None]:
def reduce_to_3_dimensions(X, algo, params, show_title=None, xlab=None, ylab=None): # in progress
    #font_size = 12
    results = algo(**params).fit_transform(X)
    #n_dim = results.shape[1] # TODO: account for number of dimensions, probably separate 2D and 3D functions
    x_label = f'{algo.__name__}_1'
    y_label = f'{algo.__name__}_2'
    z_label = f'{algo.__name__}_3'
    res_df = pd.DataFrame(results, columns=[x_label, y_label, z_label])
    #splot = sns.scatterplot(data=res_df, x=x_label, y=y_label)
    #if xlab:
        #splot.set(xlabel=xlab)
        #plt.xlabel(xlab, size=font_size)
    #else:
    #    splot.set(xlabel='')
    #if ylab:
    #    #splot.set(ylabel=ylab)
    #    plt.ylabel(ylab, size=font_size)
    #else:
    #    splot.set(ylabel='')
    #if show_title == 'auto':
    #    title_elements = [f'{par[:min(6,len(par))]}:{str(val)[:min(6,len(par))]}' for par, val in params.items()]
    #    title = ', '.join(title_elements)
    #    _ = plt.title(title, size=font_size)
    #elif show_title:
    #    _ = plt.title(show_title, size=font_size)
    #splot.tick_params(axis='both', which='both', left=False, right=False, bottom=False, top=False, labelbottom=False, labelleft=False)
    return res_df

In [None]:
#def cluster_3D(df, algo, params): # in progress
#    clusterer = algo(**params) # identify clusters in the UMAP results with DBSCAN
#    clusters = clusterer.fit_predict(df)
#    unique_clusters = np.unique(clusters)
#    n_clusters = len(unique_clusters)
#    color_dict = {uc:color for uc, color in zip(unique_clusters, palette[:n_clusters])}
#    colors = [color_dict[cluster] for cluster in clusters]
#    cols = df.columns.to_list()
#    ipv.quickscatter(df[cols[0]], df[cols[1]], df[cols[2]], 
    #fig = plt.figure()
    #ax = fig.add_subplot(111, projection='3d')
    #ax = Axes3D(fig, auto_add_to_figure=False)
    #fig.add_axes(ax)
    #sc = ax.scatter(df[cols[0]], df[cols[1]], df[cols[2]], c=colors, s=100)
    #plt.legend(*sc.legend_elements(), bbox_to_anchor=(1.2, 1), loc=2)
#    clustered_df = df.copy()
#    clustered_df['cluster'] = clusters
#    return clustered_df

In [None]:
#umap_3D_df = reduce_to_3_dimensions(ft, UMAP, {'n_components':3, 'n_neighbors':5, 'min_dist':0.005, 'metric':'correlation'})

In [None]:
#umap_dbscan_3D_df = cluster_3D(umap_3D_df, DBSCAN, {'eps':0.6, 'min_samples':7})

## FFT: PCA, t-SNE, UMAP

Functions for hyperparameter optimization.

Getting ffts of 0.5 second samples.

In [None]:
fts = np.array([])
ins = []
indices = []
begins = []
ends = []
for index, row in equal_df.iterrows(): # for each file, cut into pieces, and apply fft
    sr, amp = wavfile.read(row['file_path'])
    amp = amp / max(abs(amp))
    duration = len(amp) // sr
    instrument = row['instrument']
    begin = list(range(sr*2, sr*min(duration, 22), sr // 2)) # don't take more than 20 seconds of any file, start a couple seconds in, 0.5-sec samples
    end = [i + sr for i in begin]
    for j, k in zip(begin, end):
        ft = abs(fft.fft(amp[j:k]))[:5000]
        fts = np.append(fts, ft)
        ins.append(instrument)
        indices.append(index)
    begins.append(begin)
    ends.append(end)

Formatting outputs.

In [None]:
n_fts = fts.shape[0]//len(ft)
fts_shaped = fts.reshape((n_fts,len(ft)))
print(fts_shaped.shape)
subsamples_df = pd.DataFrame()
subsamples_df['instrument'] = ins
subsamples_df['equal_df_index'] = indices
subsamples_df['i_begin'] = [i for sublist in begins for i in sublist]
subsamples_df['i_end'] = [i for sublist in ends for i in sublist]

Filtering empty samples.

In [None]:
empties = np.where(fts_shaped.mean(axis=1) == 0)[0]
fts_nz = np.delete(fts_shaped, empties, axis=0)
subsamples_nz_df = subsamples_df.drop(index=empties)

Normalizing ffts.

In [None]:
fts_normalized = fts_nz / fts_nz.mean(axis=1, keepdims=True)

### PCA

In [None]:
plt.figure(figsize=(10,7))
pca = PCA(n_components=2)
pca_xy = pca.fit_transform(fts_normalized)
pca_df = pd.DataFrame(pca_xy, columns=['pc1', 'pc2'])
pca_df['instrument'] = subsamples_nz_df['instrument']
splot = sns.scatterplot(data=pca_df, x='pc1', y='pc2', hue='instrument')
splot.legend(bbox_to_anchor=(1.2, 1))

There is some structure, but the separation is not great.

### t-SNE

In [None]:
_ = plt.figure(figsize=(10,7))
tsne = TSNE(n_components=2, perplexity=15, n_iter=500)
tsne_res = tsne.fit_transform(fts_normalized)
tsne_res_df = pd.DataFrame(tsne_res, columns=['tsne1', 'tsne2'])
tsne_res_df['instrument'] = subsamples_nz_df['instrument']
splot = sns.scatterplot(data=tsne_res_df, x='tsne1', y='tsne2', hue='instrument')
_ = splot.legend(bbox_to_anchor=(1.2, 1))

The separation is better with t-SNE, but still messy.

### UMAP

In [None]:
_ = plt.figure(figsize=(10,7))
umap_results = UMAP(n_neighbors=15, min_dist=0.9, metric='correlation').fit_transform(fts_normalized)
umap_df = pd.DataFrame(umap_results, columns=['umap1', 'umap2'])
umap_df['instrument'] = subsamples_nz_df['instrument']
_ = plt.figure(figsize=(10,7))
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue='instrument', palette=sns.color_palette(palette, 14))
_ = splot.legend(bbox_to_anchor=(1.2, 1))

In [None]:
parameter_grid(fts_normalized, UMAP, {'n_neighbors':[5,15,45], 'min_dist':[0.1, 0.3, 0.9], 'metric':'correlation'})

In [None]:
parameter_grid(fts_normalized, UMAP, {'n_neighbors':[9,12,15], 'min_dist':[0.2, 0.5, 0.8], 'metric':'correlation'})

## STFT: t-SNE and UMAP

In [None]:
ins = []
stfts = np.array([])
for index, row in equal_df.iterrows(): # for each file, cut into pieces, and apply stft
    sr, amp = wavfile.read(row['file_path'])
    amp = amp / max(abs(amp)) # normalize the amplitude, cast to float
    ft = np.abs(lb.stft(y=amp[sr*7:sr*12], n_fft=8192))
    n_rows = ft.shape[1]
    stfts = np.append(stfts, ft)
    ins += [row['instrument']]*n_rows

In [None]:
ld.specshow(lb.amplitude_to_db(ft[:,0:10], ref=np.max), y_axis='log')

In [None]:
stfts_shaped = stfts.reshape((stfts.shape[0] // ft.shape[0], ft.shape[0]))
stfts_shaped.shape

In [None]:
ld.specshow(lb.amplitude_to_db(stfts_shaped[:,0:208], ref=np.max), y_axis='log')

In [None]:
plt.figure(figsize=(10,10))
tsne = TSNE(n_components=2, perplexity=50, n_iter=500)
tsne_res = tsne.fit_transform(stfts_shaped)
tsne_res_df = pd.DataFrame(tsne_res, columns=['tsne1', 'tsne2'])
tsne_res_df['instrument'] = ins
splot = sns.scatterplot(data=tsne_res_df, x='tsne1', y='tsne2', hue='instrument', palette=sns.color_palette(palette, 14))
splot.legend(bbox_to_anchor=(1.2, 1))

In [None]:
parameter_grid(stfts_shaped, TSNE, {'perplexity':[5, 20, 50], 'n_iter':[500]})

In [None]:
plt.figure(figsize=(10,7))
umap_results = UMAP(n_neighbors=7, min_dist=0.1, metric='correlation').fit_transform(stfts_shaped)
umap_df = pd.DataFrame(umap_results, columns=['umap1', 'umap2'])
umap_df['instrument'] = ins
plt.figure(figsize=(10,7))
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue='instrument', palette=sns.color_palette(palette, 14))
splot.legend(bbox_to_anchor=(1.2, 1))

## MFCC: t-SNE and UMAP

In [None]:
mfccs = np.array([])
ins = []
for index, row in equal_df.iterrows(): # for each file, cut into pieces, and apply mfcc
    sr, amp = wavfile.read(row['file_path'])
    amp = amp / max(abs(amp)) # normalize the amplitude, cast to float
    duration = len(amp) // sr
    mfcc = lb.feature.mfcc(y=amp[sr*7:sr*12], sr=sr, n_mfcc=40)
    n_rows = mfcc.shape[1]
    mfccs = np.append(mfccs, mfcc)
    ins += [row['instrument']]*n_rows

In [None]:
mfccs_shaped = mfccs.reshape((mfccs.shape[0]//40, 40))

### t-SNE

In [None]:
plt.figure(figsize=(10,7))
tsne = TSNE(n_components=2, perplexity=25, n_iter=500)
tsne_res = tsne.fit_transform(mfccs_shaped)
tsne_res_df = pd.DataFrame(tsne_res, columns=['tsne1', 'tsne2'])
tsne_res_df['instrument'] = instruments
splot = sns.scatterplot(data=tsne_res_df, x='tsne1', y='tsne2', hue='instrument')
splot.legend(bbox_to_anchor=(1.2, 1))

Using MFCCs doesn't seem to help.

In [None]:
plt.figure(figsize=(10,7))
umap_results = UMAP(n_neighbors=5, min_dist=0.001, metric='correlation').fit_transform(mfccs_shaped)
umap_df = pd.DataFrame(umap_results, columns=['umap1', 'umap2'])
umap_df['instrument'] = ins
plt.figure(figsize=(10,7))
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue='instrument', palette=sns.color_palette(palette, 14))
splot.legend(bbox_to_anchor=(1.2, 1))

In [None]:
lb.display.specshow(mfccs[0])

## MusicNet Experimentation

In [None]:
fts = np.array([])
ins = []
for i in cfpv_df['id']: # for each file, cut into pieces, apply fft, and filter
    sr, amp = wavfile.read(f'musicnet/{i}.wav')
    instrument = cfpv_df[cfpv_df['id'] == i]['instrument']
    for j in range(0, sr*100, sr):
        ft = abs(fft.fft(amp[j:j+sr]))[:5000]
        fts = np.append(fts, ft)
        ins.append(instrument)
print(fts.shape)

In [None]:
n_fts = fts.shape[0]//len(ft)
fts_shaped = fts.reshape((n_fts,len(ft)))

In [None]:
pca = PCA(n_components=2)
pca_xy = pca.fit_transform(fts_shaped)
pca_df = pd.DataFrame(pca_xy, columns=['pc1', 'pc2'])
sns.scatterplot(data=pca_df, x='pc1', y='pc2')

There is some structure, but no clear groups.
I should check for empty rows and normalize first though.

In [None]:
nonzero_fts = np.nonzero(fts_shaped.sum(axis=1))[0]
nz_fts = fts_shaped[nonzero_fts]
fts_normalized = nz_fts / nz_fts.sum(axis=1, keepdims=True)
ins_normalized = np.array(ins)[nonzero_fts]

In [None]:
pca = PCA(n_components=2)
pca_xy = pca.fit_transform(fts_normalized)
pca_df = pd.DataFrame(pca_xy, columns=['pc1', 'pc2'])
pca_df['instrument'] = ins_normalized
sns.scatterplot(data=pca_df, x='pc1', y='pc2', hue='instrument')

Even looking at nothing but the Fourier transform, pca tends to group instrument samples near each other. This will need to verified with a more diverse data set though.

In [None]:
tsne = TSNE(n_components=2, perplexity=25, n_iter=500)
tsne_res = tsne.fit_transform(fts_normalized)
tsne_res_df = pd.DataFrame(tsne_res, columns=['tsne1', 'tsne2'])
tsne_res_df['instrument'] = ins_normalized
sns.scatterplot(data=tsne_res_df, x='tsne1', y='tsne2', hue='instrument')

t-SNE generates cleaner separations between groups than pca, as expected. Some groups are split into subgroups.

In [None]:
umap_results = UMAP(n_neighbors=7, min_dist=0.05, metric='correlation').fit_transform(fts_normalized)
umap_df = pd.DataFrame(umap_results, columns=['umap1', 'umap2'])
umap_df['instrument'] = ins_normalized
plt.figure(figsize=(10,7))
#dbscan = DBSCAN(eps=0.45, min_samples=9)
#umap_clusters = dbscan.fit_predict(umap_df)
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue='instrument')
splot.legend(bbox_to_anchor=(1.2, 1))

As usual, UMAP creates the most clearly-separated groups, but there are more of them than I expected.

I will use UMAP over PCA or t-SNE because of this ability to clearly separate.

Let's find out what characterizes each group.

In [None]:
plt.figure(figsize=(10,7))
dbscan = DBSCAN(eps=0.9, min_samples=7)
umap_clusters = dbscan.fit_predict(umap_df[['umap1','umap2']])
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue=umap_clusters.astype(str))
splot.legend(bbox_to_anchor=(1.2, 1))

In [None]:
group0 = fts_normalized[np.where(umap_clusters == 0)[0]]
plt.plot(group0[7])

In [None]:
def trim_zeros(amplitude):
    nonzeros = np.nonzero(amplitude)[0]
    zero_spans = nonzeros[1:] - nonzeros[:-1]
    adj_nonzeros = np.where(zero_spans == 1, np.arange())[0]
    i_adj_begin = adj_nonzeros[0]
    i_adj_end = adj_nonzeros[-1]
    i_begin = zero_spans[i_adj_begin]
    i_end = zero_spans[i_adj_end]
    i_start = nonzeros[0]
    i_end = nonzeros[-1]
    subsample = amplitude[i_start:i_end+1]
    #print(f'i_start: {i_start}, i_end:{i_end-len(amplitude)+1}')
    return amplitude[i_start:i_end+1]

fts = np.array([])
for i in cfpv_df['id']: # for each file, cut into pieces, then filter the pieces.
    sr, amp = wavfile.read(f'musicnet/{i}.wav')
    samp = trim_zeros(amp)
    print(samp)
    #for j in range(100):
    #    samp[j:j+sr]
    #samp = trim_intro(amp, sr)[0:sr]
    #ft = np.abs(fft.fft(samp))[0:4000]
    #fts = np.append(fts, ft)
#print(fts.shape)

In [None]:
sr1, data1 = wavfile.read('musicnet/2298.wav')

In [None]:
def real_start(amp_data, sr): # do a fft on 1-sec intervals in 0.1 sec steps until the peak of the fft occurs at > 101 Hz 
    i = np.nonzero(amp_data)[0][0]
    while i < len(amp_data):
        ft = np.abs(fft.fft(amp_data[i:i+sr]))
        ft_max = ft.max()
        peaks, peak_dict = signal.find_peaks(ft[0:4000], height=ft_max/10)
        heights = peak_dict['peak_heights']
        if len(heights) < 2:
            print(peaks)
            print(heights)
            plt.figure(figsize=(16,6))
            plt.supblot(1, 2, 1)
            plt.plot(amp_data[i:i+sr])
            plt.subplot(1, 2, 2)
            plt.plot(ft[0:4000])
        i_max = heights.argmax() # index of max frequency
        f_max = peaks[i_max]
        if f_max in [99, 100, 101] or f_max in [49, 50, 51]:
            i += sr//10
        else:
            return i + 3 * sr // 4
    return 0

def trim_intro(amp_data, sample_rate):
    i_start = real_start(amp_data, sample_rate)
    return amp_data[i_start:]

plt.figure(figsize=(10,5))
sample1 = trim_intro(data1, sr1)[0:sr1]
plt.plot(sample1)

In [None]:
ft1733 = fft.fft(sample1)

In [None]:
def rolling_mean(x, w=501):
    return np.convolve(x, np.ones(w), 'valid') / w

ft_limit = 4000
plt.figure(figsize=(18,9))
ft_abs = np.abs(ft1733)[0:ft_limit]
plt.plot(ft_abs)
#ft_mean = ft_abs.mean()
rmean_width = ft_limit//8 +1
rmean = rolling_mean(ft_abs, w=rmean_width)
pad = np.ones(rmean_width//2)
rmean = 1.5*np.append(np.insert(rmean, 0, pad*rmean[0]), pad*rmean[-1])
ft_max = ft_abs.max()
peaks, _ = signal.find_peaks(ft_abs, height=rmean, distance=25, prominence=ft_max/100, wlen=21)
plt.plot(peaks, ft_abs[peaks], 'rx')
plt.plot(rmean, 'g')
print(peaks)

In [None]:
def dominant_freq(ft):
    aft = np.abs(ft)[:5000]
    ft_max = max(aft)
    pks, _ = signal.find_peaks(aft, distance=25, prominence=ft_max/100, wlen=21)
    i_peak_max = aft[pks].argmax()
    return pks[i_peak_max]
dominant_freq(ft1733)

In [None]:
fts = np.array([])
for i in mn_instruments_df['id']:
    sr, amp = wavfile.read(f'musicnet/{i}.wav')
    samp = trim_intro(amp, sr)[0:sr]
    ft = np.abs(fft.fft(samp))[0:4000]
    fts = np.append(fts, ft)
print(fts.shape)

In [None]:
ftsize = fts.shape[0]//len(ft)
fts = fts.reshape((ftsize,len(ft)))

In [None]:
sampsize = samps.shape[0]//len(samp)
samps = samps.reshape((sampsize,len(samp)))

I can't use StandardScaler here. I chose to normalize each signal by dividing by its mean absolute amplitude.

In [None]:
#norm_samps = samps / abs(samps).mean(axis=0, keepdims=True)
norm_samps = samps / 215

In [None]:
mn_instruments_df = pd.concat([mn_instruments_df, pca_df], axis=1)

## Scaled FFTs

In [None]:
def normal(x, mu, sigma):
    return np.exp(-0.5*((x-mu)/sigma)**2)/(sigma*math.sqrt(2*math.pi))

def bounded_normal(x, mu, sigma):
    y = normal(x, mu, sigma)
    return y/sum(y)

def bounded_normal_convolution(aft, freq, sigma):
    running_bounded_normal_mean = []
    for i in freq:
        bn = bounded_normal(freq, i, sigma)
        running_bounded_normal_mean.append(np.dot(bn, aft))
    return np.array(running_bounded_normal_mean)

In [None]:
n_freqs = len(freq)
bounded_normal_array = np.zeros((n_freqs, n_freqs))
for i in range(n_freqs):
    bounded_normal_array[i,:] = bounded_normal(freq, i, len(freq) // 10)

In [None]:
bncs = np.zeros((n_freqs,))
for i in freq:
    bncs[i] = np.dot(bounded_normal_array[i,:], aft)

In [None]:
plt.figure(figsize=(12,8))
i = 0
plt.plot(freq, ft_abs_array[i])
bncs = []
bnc = bounded_normal_convolution(ft_abs_array[i], freq, freq[-1]//10)
bncs.append(bnc)
plt.plot(freq, bncs[i])

In [None]:
ft_abs_shaped = ft_abs_array.reshape((len(ft_abs_array) // len(ft_abs), len(ft_abs)))

In [None]:
fts = np.array([])
ins = []
for index, row in equal_df.iterrows(): # for each file, cut into pieces, and apply fft
    sr, amp = wavfile.read(row['file_path'])
    for j in range(0, sr*30, sr):
        ft = abs(fft.fft(amp[j:j+sr]))[:5000]
        fts = np.append(fts, ft)
        ins.append(row['instrument'])
print(fts.shape)

In [None]:
n_fts = fts.shape[0]//len(ft)
fts_shaped = fts.reshape((n_fts,len(ft)))

In [None]:
nonzero_fts = np.nonzero(fts_shaped.sum(axis=1))[0]
nz_fts = fts_shaped[nonzero_fts]
fts_normalized = nz_fts / nz_fts.sum(axis=1, keepdims=True)
#fts_normalized = nz_fts / nz_fts.max(axis=1, keepdims=True)
ins_normalized = np.array(ins)[nonzero_fts]

In [None]:
umap_results = UMAP(n_neighbors=18, min_dist=0.5, metric='correlation').fit_transform(fts_normalized)
umap_df = pd.DataFrame(umap_results, columns=['umap1', 'umap2'])
umap_df['instrument'] = ins_normalized
plt.figure(figsize=(10,7))
splot = sns.scatterplot(data=umap_df, x='umap1', y='umap2', hue='instrument', palette=sns.color_palette(palette, 14))
splot.legend(bbox_to_anchor=(1.2, 1))