In [1]:
import numpy as np
import librosa
from matplotlib import pyplot as plt
from skimage.feature import peak_local_max
import pickle
import os
import time

In [2]:

# helper function to dump all database items to a file
def dump(data, dir, name):
    # Ensure the directory exists
    if not os.path.exists(dir):
        os.makedirs(dir)
    
    #create the file path
    file_path = os.path.join(dir, f"{name}.pkl")
    
    # write the data to the file
    with open(file_path, 'wb') as file:
        pickle.dump(data, file, pickle.HIGHEST_PROTOCOL)

def spectral_peaks(y, window_hop=5, threshhold=0.05):    
    # take STFT and get magnitude values
    stft = np.abs(librosa.stft(y))
    # get the peaks           
    peaks = peak_local_max(stft, min_distance=window_hop, threshold_abs=threshhold)
    return peaks # index of peaks

# use every point in constelation map as anchor then find pair in target zone        
def peak_combinations(peaks, id, target_size=32, fan_max=10):
    peaks = np.array(peaks) # use numpy for brevity
    
    combinations = {}
    # set each peak as an anchor
    for anchor in peaks: 
        # define the current target bounds
        upper_bound = anchor + target_size
        # find peaks in target range
        neighbours = peaks[(peaks > anchor)     .all(axis=1) & # this is slow - instead directly index the constelation map with window
                           (peaks < upper_bound).all(axis=1)] 
        
        # combine anchor with each neighbour and time diff
        for i in range(len(neighbours)): # create get neighbours function
            # value 
            time_stamp = anchor[1]
            data       = {"id": id, "time_stamp": time_stamp} # object containing id and offset timestamp
            # key
            time_diff  = neighbours[i][1] - time_stamp
            hash       = (anchor[0], neighbours[i][0], time_diff) # create hash tuple
            
            # create key value pair
            combinations[hash] = [data] # add new key
            
            if i == fan_max: 
                break
    
    return combinations

def fingerprint_file(file, id=None, resample=16384):
    y, sr    = librosa.load(file, sr=resample)
    peaks    = spectral_peaks(y)
    features = peak_combinations(peaks, id)    
    return features

# helper function to get id based on GTZAN dataset file names
def get_id(file_name):
    split = file_name.split('.')
    id = split[0] + '.' + split[1][:5]
    return id

def fingerprintBuilder(db_path, fingerprints_path, dataset_size=200, show=False):
    doc_count = 0
    num_documents = len(os.listdir(db_path))
    fingerprints = {}
    
    start_time = time.perf_counter()
    for entry in os.scandir(db_path):
        id = get_id(entry.name)
        combinations = fingerprint_file(entry, id=id) # fingerprint
        
        # add combinations to fingerprints data structure
        for hash in combinations.keys():
            value = combinations[hash][0] # this is not ideal but it works
            if hash in fingerprints: fingerprints[hash].append(value) # if the a hash exists append to current array
            else: fingerprints[hash] = [value] # add new key
                
        doc_count+=1
        if show:
            print("{} - {} of {} documents processed".format(id, doc_count, num_documents))
        if doc_count == dataset_size: break
        
    # show run time and hash count
    end_time = time.perf_counter()
    total_time = end_time - start_time
    print("---number of hashes {}---".format(len(fingerprints)))
    print("---time to run fingerprinting {} seconds---".format(total_time))
    
    # dump values to file
    dump(fingerprints, fingerprints_path, "documents")

db_path = "_database_recordings"
fingerprint_path = "_fingerprints"
fingerprintBuilder(db_path, fingerprint_path, show=True)

pop.00027 - 1 of 200 documents processed
pop.00033 - 2 of 200 documents processed
classical.00079 - 3 of 200 documents processed
classical.00045 - 4 of 200 documents processed
classical.00051 - 5 of 200 documents processed
classical.00086 - 6 of 200 documents processed
classical.00092 - 7 of 200 documents processed
classical.00093 - 8 of 200 documents processed
classical.00087 - 9 of 200 documents processed
classical.00050 - 10 of 200 documents processed
classical.00044 - 11 of 200 documents processed
classical.00078 - 12 of 200 documents processed
pop.00032 - 13 of 200 documents processed
pop.00026 - 14 of 200 documents processed
pop.00030 - 15 of 200 documents processed
pop.00024 - 16 of 200 documents processed
pop.00018 - 17 of 200 documents processed
classical.00052 - 18 of 200 documents processed
classical.00046 - 19 of 200 documents processed
classical.00091 - 20 of 200 documents processed
classical.00085 - 21 of 200 documents processed
classical.00084 - 22 of 200 documents proce

In [5]:
from collections import defaultdict, Counter

# helpering function for loading pickled data
def load(name):
    with open(name + '.pkl', 'rb') as file:
        loaded_data = pickle.load(file)
    return loaded_data

def identify(Q, D):    
    # calulcate the frequnecy of matching offsets
    matches = get_matches(Q, D)
    # find documents with most frequnent shifts
    candidates = get_largest_matches(matches)
    # sort most frequnent ids by count
    sorted_candidates = sort_dic(candidates)

    return sorted_candidates # [0] = most common

def get_matches(Q, D):
    matches = defaultdict(Counter) # counter for shift frequency - defaultdict allows arbitrary keys
     # find valid shifts   
    for h_q in Q: # for each hash index in query
        if h_q not in D: continue # only search hash indexes in query
        for n in Q[h_q]:
            for l in D[h_q]:
                m    = l["time_stamp"] - n["time_stamp"]
                D_id = l["id"]
                matches[D_id][m] += 1
                
    return matches

# create dict of most most common matches for each id - returns dict {"doc_id" : count}
def get_largest_matches(matches):
    candidates = {}
    
    for D_id, counter in matches.items():
        most_common_offset, count = counter.most_common(1)[0] # get most common time shift
        candidates[D_id] = count # add to list of possible matches
    
    return candidates

# sort a dict by value - return keys
def sort_dic(dict):
    values = [key for key in dict]
    keys_by_value = sorted(values, key=lambda x: -dict[x])
    return keys_by_value

def write_output_line(output_file, docs, query_name):
    if len(docs) > 0:
        output_line = "%s\t%s\n" % (
            query_name,
            "\t".join([doc + '.wav' for doc in docs[:min(3, len(docs))]]))
    else:
        output_line = query_name
    output_file.write(output_line)

def audioIdentification(query_path, fingerprints_path, output_path):
    D = load(fingerprints_path) # load fingerprints
    output_file = open(output_path, "w")
    matches = 0 # keep track of correct matches
    count   = 0
    start_time = time.perf_counter()
    # start profile timer
    for entry in os.scandir(query_path):
        count+=1
        query_name = entry.name
        id  = get_id(query_name)
        # finger print the query
        Q = fingerprint_file(entry, id=id)
        # find matching documents
        doc_ids = identify(Q, D) 
        if(len(doc_ids) > 0 and doc_ids[0] == id): matches+=1
        # write top documents 3 to file
        write_output_line(output_file, doc_ids, query_name)
    
    # stop profile timer 
    end_time = time.perf_counter()
    # show runtime
    total_time = end_time - start_time
    print("---time to run identification {} seconds---".format(total_time))
    print("---percentage correct {}---".format(matches / count))
    
    output_file.close()

fingerprints_path = '_fingerprints/documents'
query_path = "_query_recordings"
output_path = "_output.txt"
audioIdentification(query_path, fingerprints_path, output_path)

---time to run identification 157.12603164699976 seconds---
---percentage correct 0.7793427230046949---


In [None]:
# evaluate
