In [24]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
from microphone import record_audio
from typing import Tuple
import librosa
from random import randint
from collections import Counter
import os

# For peak finding:
from scipy.ndimage.filters import maximum_filter
from scipy.ndimage.morphology import generate_binary_structure, binary_erosion
from scipy.ndimage.morphology import iterate_structure
from typing import Tuple, Callable, List

%matplotlib notebook

In [25]:
#Create functions for converting all variety of audio recordings into a NumPy-array of digital samples.

def from_mp3(local_song_path: str, length: float, sr: int):
    #Gets MP3 from disk
    samples, sample_rate = librosa.load(local_song_path, sr=sr, mono=True, duration=length)
    
    #Turns MP3 into .npy file and saves to disk
    samples = np.hstack([np.frombuffer(i, np.int16) for i in samples])
    array = np.hstack((sample_rate, samples)) #sample rate is first
    
    return array

def from_recording(length: float, sr: int):
    samples, sample_rate = record_audio(length)
    
    #Turns MP3 into .npy file and saves to disk
    samples = np.hstack([np.frombuffer(i, np.int16) for i in samples])
    array = np.hstack((sample_rate, samples)) #sample rate is first
    
    return array


In [26]:
from numba import njit

# `@njit` "decorates" the `_peaks` function. This tells Numba to
# compile this function using the "low level virtual machine" (LLVM)
# compiler. The resulting object is a Python function that, when called,
# executes optimized machine code instead of the Python code
# 
# The code used in _peaks adheres strictly to the subset of Python and
# NumPy that is supported by Numba's jit. This is a requirement in order
# for Numba to know how to compile this function to more efficient
# instructions for the machine to execute

@njit

def _peaks(
    data_2d: np.ndarray, rows: np.ndarray, cols: np.ndarray, amp_min: float
) -> List[Tuple[int, int]]:
    """
    A Numba-optimized 2-D peak-finding algorithm.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected.

    rows : numpy.ndarray, shape-(N,)
        The 0-centered row indices of the local neighborhood mask
    
    cols : numpy.ndarray, shape-(N,)
        The 0-centered column indices of the local neighborhood mask
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location. 
    """
    
    peaks = []  # stores the (row, col) locations of all the local peaks

    # Iterate over the 2-D data in col-major order
    # we want to see if there is a local peak located at
    # row=r, col=c
    for c, r in np.ndindex(*data_2d.shape[::-1]):
        if data_2d[r, c] <= amp_min:
            # The amplitude falls beneath the minimum threshold
            # thus this can't be a peak.
            continue
        
        # Iterating over the neighborhood centered on (r, c)
        # dr: displacement from r
        # dc: discplacement from c
        for dr, dc in zip(rows, cols):
            if dr == 0 and dc == 0:
                # This would compare (r, c) with itself.. skip!
                continue

            if not (0 <= r + dr < data_2d.shape[0]):
                # neighbor falls outside of boundary
                continue

            # mirror over array boundary
            if not (0 <= c + dc < data_2d.shape[1]):
                # neighbor falls outside of boundary
                continue

            if data_2d[r, c] < data_2d[r + dr, c + dc]:
                # One of the amplitudes within the neighborhood
                # is larger, thus data_2d[r, c] cannot be a peak
                break
        else:
            # if we did not break from the for-loop then (r, c) is a peak
            peaks.append((r, c))
    return peaks

# `local_peak_locations` is responsible for taking in the boolean mask `neighborhood`
# and converting it to a form that can be used by `_peaks`. This "outer" code is 
# not compatible with Numba which is why we end up using two functions:
# `local_peak_locations` does some initial pre-processing that is not compatible with
# Numba, and then it calls `_peaks` which contains all of the jit-compatible code
def local_peak_locations(data_2d: np.ndarray, neighborhood: np.ndarray, amp_min: float):
    """
    Defines a local neighborhood and finds the local peaks
    in the spectrogram, which must be larger than the specified `amp_min`.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected
    
    neighborhood : numpy.ndarray, shape-(h, w)
        A boolean mask indicating the "neighborhood" in which each
        datum will be assessed to determine whether or not it is
        a local peak. h and w must be odd-valued numbers
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location.
    
    Notes
    -----
    Neighborhoods that overlap with the boundary are mirrored across the boundary.
    
    The local peaks are returned in column-major order.
    """
    
    rows, cols = np.where(neighborhood)
    assert neighborhood.shape[0] % 2 == 1
    assert neighborhood.shape[1] % 2 == 1

    # center neighborhood indices around center of neighborhood
    rows -= neighborhood.shape[0] // 2
    cols -= neighborhood.shape[1] // 2

    return _peaks(data_2d, rows, cols, amp_min=amp_min)

In [27]:
def local_peaks_mask(data: np.ndarray, cutoff: float) -> np.ndarray:
    """Find local peaks in a 2D array of data.

    Parameters:
    data : numpy.ndarray, shape-(H, W)
    cutoff : float
         A threshold value that distinguishes background from foreground

    Returns
    -------
    Binary indicator, of the same shape as `data`. The value of
    1 indicates a local peak."""
    
    # Generate a rank-2, connectivity-2 binary mask
    r2c2 = generate_binary_structure(2, 2)


    # Use that neighborhood to find the local peaks in `data`.
    # Pass `cutoff` as `amp_min` to `local_peak_locations`.
    peak_locations = local_peak_locations(data, r2c2, cutoff)
    

    # Turns the list of (row, col) peak locations into a shape-(N_peak, 2) array
    # Save the result to the variable `peak_locations`
    peak_locations = np.array(peak_locations)

    # create a mask of zeros with the same shape as `data`
    mask = np.zeros(data.shape, dtype=bool)

    # populate the local peaks with `1`
    mask[peak_locations[:, 0], peak_locations[:, 1]] = 1
    return mask

In [28]:
def peaks_to_fingerprints (local_peaks: np.ndarray, n=15):
    """Takes in the array local_peaks of freq, time for each peak
    for all of the local peaks in a section of a song and takes in
    fanout value n.
    
    It will return fingerprints for each peak entered in an ndarray 
    """
    
    pairs = form_peak_pairs(local_peaks)
    peaks = points_to_data(pairs)

    fingerprint = np.array([peaks_to_fanout(
        peaks[
            np.where(np.multiply(pairs[:,0,0] == i[0], pairs[:,0,1] == i[1]))],
        n) for i in local_peaks if np.size(
        np.where(np.multiply(pairs[:,0,0] == i[0], pairs[:,0,1] == i[1])))>= n])
    
    ### np.where(np.multiply(pairs[:,0,0] == i[0], pairs[:,0,1] == i[1])) is the index of the peak pair data (row containing it) for each peak i
    ### comparing the first and second items of each peak with those of peak i
    
    """
    # GET RID OF FOOR LOOP
    for i in local_peaks: # each peak...
        indices = np.array(np.where(np.multiply(pairs[:,0,0] == i[0], pairs[:,0,1] == i[1]))) # index of peak pair data (row containing it) where first peak is peak i (can ignore the second peak since we are only looking forward)
        
        ### indices[i] --> (row) index of peak-pair P the array peaks (peaks[indices[i], : ] --> original data for peak-pair P)
        
        selected = peaks[indices, : ]
        
        if len(selected) >= n: # only going to add it if it's long enough
            fanout = peaks_to_fanout(selected, n)
    """
        
    ### return array of the fingerprints for each peak
    return fingerprint

def form_peak_pairs(local_peaks: np.ndarray):
    """Takes in the array local_peaks of any number of rows and two columns freq and time
    for each of the local peaks and returns an array of each peak-pair from these peaks
    """
    
    pairs = np.array([(peak1,peak2) for peak2 in local_peaks for peak1 in local_peaks if peak1[1] < peak2[1]])    
    
    return pairs

def points_to_data(points: np.ndarray):
    """Takes in an array of freq and time data for each peak in each peak-pair
    (where the first listed is one that occurs first) and returns a 2d array
    which contains three columns and number of rows equivalent to the number of 
    peak-pairs entered
        Col 1 --> frequency at point 1 (point 1 must occur first)
        Col 2 --> frequency at point 2
        Col 3 --> time elapsed between points
    """
    data = np.array([[pair[0,0], pair[1,0], pair[1,1]-pair[0,1]] for pair in points])
    return data

def peaks_to_fanout(selected_peaks: np.ndarray, n):
    """Takes in a 2d array selected_peaks of selected peak-pair
    data in the same formatting as the full array, and containing
    all of the peak-pairs with a particular peak as peak 1. Also
    takes in the fanout value n.
    
    Returns the fanout for this peak

                                  [[fi, fi+1, delta_t(i,i+1)]
                                   [fi, fi+2, delta_t(i,i+2)],
        fanout (for peak i)  =               ...,
                                   [fi, fi+n, delta_t(i,i+n)]]    
    """

    delta_fs = selected_peaks[:,1]-selected_peaks[:,0] # array of change in frequencies for our peak pairs
    delta_ts = selected_peaks[:,2] # array of change in times for our peak pairs

    ### for each index i,
    ###     selected_peaks[i] --> peak-pair P data (this is just a segment of the original data; just the peak-pairs contianing our particular peak)
    ###     delta_fs[i]       --> change in frequency for peak-pair P
    ###     delta_ts[i]       --> change in time for peak-pair P


    # an array of indices pointing towards our selected peak-pair data
    # (selected_peaks_sorted[i] --> index of i'th closest peak (index pointing towards our selected peak-pair data arrays))
    selected_peaks_sorted = sort_peaks(delta_fs, delta_ts) 

    #selected_peaks_sorted[0:n] are the indices of the peaks within the fanout
    fanout_inds = selected_peaks_sorted[0:n]
    fanout_peaks = selected_peaks[fanout_inds]

    ###    fanout for this peak = an array of [[fi, fi+1, delta_t(i,i+1)]
    ###                                        [fi, fi+2, delta_t(i,i+2)],
    ###                                                   ...,
    ###                                        [fi, fi+n, delta_t(i,i+n)]]
    
    fanout = fanout_peaks[:,0:3]
    return fanout

def sort_peaks (delta_fs: np.ndarray, delta_ts: np.ndarray):
    """Takes in two 1 dimensional nd arrays delta_fs and delta_ts
    of the same length which both point towards data from the same
    selected peak-pairs and for which the value of each at each same
    index point to the same individual peak-pair
        - this means that for each index i, delta_fs[i] and delta_ts[i]
          correspond to the same peak-pair
        - the delta_f and delta_t values should be from all of the
          peak-pairs for which a particular peak is the first peak in
          the pair
    
    It returns an nd array where the values of which represent the
    indices of delta_ts, delta_fs, and other arrays which point to the
    same data in the same order sorted by time then frequency (eg. the
    first value of the output array is the index of the smallest
    delta_t, second is the next smallest, etc. (where identical delta_ts
    will be ordered by smallest delta_f to greatest))
    """
    
    
    ### get indices sorted based on delta_t values of our selected data (sorted_t)
    sorted_t = np.argsort(delta_ts)

    # we need to make sure that any duplicate times are sorted by frequency:

    #finding the time values
    time_counts = Counter(delta_ts)
    arr_counts = np.array(list(time_counts.items())) # an array of the items in the Counter time_counts
    duplicates_ind = np.where(arr_counts[:,1] >1) # finding the indexes of where there are more than one of each time
    if len(duplicates_ind) > 0: # only continue to sort if there are duplicates
        duplicates_times = arr_counts[:, 0][duplicates_ind] #finding the time values based on the indices in the counter

        # list of an array of indices of each duplicate time
        # (indices are the indices pointing towards data in delta_fs, delta_ts, and t_sorted, NOT the values in t_sorted which will be used as indices (to get those do t_sorted[unsorted_by_freq]))
        unsorted_by_freq = [list(np.where(delta_ts == time)[0]) for time in duplicates_times]

        # list of an array containing the new index values pointed to by each array of indices in unsorted_by_freq, sorted by frequency
        # (these values are the index values to put into the array of indices)
        # each item = reorder of the values of sorted_t by sorting by frequency for each index_arr in the unsorted_by_freq lsit
        sorted_by_freq = [list(sorted_t[index_arr][np.argsort(delta_fs[sorted_t[index_arr]])]) for index_arr in unsorted_by_freq]                
        # put back in:
        ## 'flattening' the lists so as to be able to index with them
        flat_unsorted = []
        flat_sorted = []
        [[flat_unsorted.append(inner_vals) for inner_vals in value] for value in unsorted_by_freq]
        [[flat_sorted.append(inner_vals) for inner_vals in value] for value in sorted_by_freq]
        ## putting these into sorted
        sorted_fully = sorted_t.copy()
        sorted_fully[flat_unsorted] = flat_sorted
        
        return sorted_fully

In [29]:
#Databases
database_names = {}
database_fingerprints = {}
#add functions
def add_to_DBN(song_name: str, artist: str):
    database_names[len(database)+1] = (song_name,artist)
    
    
def add_to_DBF(fingerprint_arr: tuple, time_occured: float,  song_ID: str):
    if (fingerprint_arr in database_fingerprints.keys()):
        temp_tuple = database_fingerprints[fingerprint_arr] + (song_ID,time_occured)
        database_fingerprints[fingerprint_arr] = temp_tuple
    else:
        database_fingerprints[fingerprint_arr] = (song_ID, time_occured)
        
def del_id(ex_dict, id_n):
    del ex_dict[id_n]

def get_all(ex_dict: dict):
    # ex_dict is the dictionary which stores a tuple of songs and artists as the value of keys which are ids.
    return [ex_dict[i] for i in range(1,len(ex_dict)+1)]
    # i refers to the id number, range begins at 1 and len is added by 1 because id begins (here at least) with 1, not 0. 

def get_song_names(ex_dict: dict):
    return [ex_dict[i][0] for i in range(1, len(ex_dict)+1)]

def get_artists(ex_dict: dict):
    return [ex_dict[i][1] for i in range(1, len(ex_dict)+1)]

#add_to_DBF(tuple(np.array([0,0,0,1])) ,0, 1)
#add_to_DBF(tuple(np.array([0,0,0,0])) ,1, 1)
#add_to_DBF(tuple(np.array([0,0,0,0])) ,2, 1)
#del_id(database_fingerprints, tuple(np.array([0,0,0,0])))
#database_fingerprints

In [30]:
#Clips
def short_clip(percent, data): # percent refers to the percentage of the original file the new file should be
    # example: a 15 second clip with a percent of 10% would return files that are 1.5 seconds long.
    # Where in the original clip the function pulls from is randomly chosen.
    percent /= 100
    start = randint(0, len(data)-int(len(data)*percent)) # Chooses a random starting point for the new file, avoids out of bounds errors
    print("start ind is", start, "of", len(data)-int(len(data)*percent), "possible inds")
    return data[start:start+int(len(data)*percent)]

def multiple_clips(percent, data, num_clips):
    clips = [] # nd array would also work
    for i in range(num_clips):
        clips.append(short_clip(15, data))
    return clips

In [32]:
os.listdir('./MusicMP3s')[1:]

['Andrew Lloyd Webber - The Music of the Night.mp3',
 'Daniela Mercury - O Canto Da Cidade.mp3',
 'David Bowie – Space Oddity.mp3',
 'Imagine Dragons - I Bet My Life.mp3',
 'Jimi Hendrix - Little Wing.mp3',
 'Lewis Capaldi - Someone You Loved.mp3',
 'Mat Kearney - I Dont Really Care.mp3',
 'Rick Astley - Never Gonna Give You Up.mp3']