# The Goal

Our goal here is to build a personal audio search engine like «Shazam». 

Based on http://willdrevo.com/fingerprinting-and-audio-recognition-with-python/

# Step 1: Defining our Sound Descriptor



In [1]:
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
from scipy.ndimage.filters import maximum_filter
from scipy.ndimage.morphology import (generate_binary_structure,
                                      iterate_structure, binary_erosion)
import hashlib
from operator import itemgetter

IDX_FREQ_I = 0
IDX_TIME_J = 1

######################################################################
# Sampling rate, related to the Nyquist conditions, which affects
# the range frequencies we can detect.
DEFAULT_FS = 44100

######################################################################
# Size of the FFT window, affects frequency granularity
DEFAULT_WINDOW_SIZE = 4096

######################################################################
# Ratio by which each sequential window overlaps the last and the
# next window. Higher overlap will allow a higher granularity of offset
# matching, but potentially more fingerprints.
DEFAULT_OVERLAP_RATIO = 0.5

######################################################################
# Degree to which a fingerprint can be paired with its neighbors --
# higher will cause more fingerprints, but potentially better accuracy.
DEFAULT_FAN_VALUE = 15

######################################################################
# Minimum amplitude in spectrogram in order to be considered a peak.
# This can be raised to reduce number of fingerprints, but can negatively
# affect accuracy.
DEFAULT_AMP_MIN = 10

######################################################################
# Number of cells around an amplitude peak in the spectrogram in order
# for Dejavu to consider it a spectral peak. Higher values mean less
# fingerprints and faster matching, but can potentially affect accuracy.
PEAK_NEIGHBORHOOD_SIZE = 20

######################################################################
# Thresholds on how close or far fingerprints can be in time in order
# to be paired as a fingerprint. If your max is too low, higher values of
# DEFAULT_FAN_VALUE may not perform as expected.
MIN_HASH_TIME_DELTA = 0
MAX_HASH_TIME_DELTA = 200

######################################################################
# If True, will sort peaks temporally for fingerprinting;
# not sorting will cut down number of fingerprints, but potentially
# affect performance.
PEAK_SORT = True

######################################################################
# Number of bits to throw away from the front of the SHA1 hash in the
# fingerprint calculation. The more you throw away, the less storage, but
# potentially higher collisions and misclassifications when identifying songs.
FINGERPRINT_REDUCTION = 20

In [2]:
import lib.wavio

def fingerprint(channel_samples, Fs=DEFAULT_FS,
                wsize=DEFAULT_WINDOW_SIZE,
                wratio=DEFAULT_OVERLAP_RATIO,
                fan_value=DEFAULT_FAN_VALUE,
                amp_min=DEFAULT_AMP_MIN):
    """
    FFT the channel, log transform output, find local maxima, then return
    locally sensitive hashes.
    """
    # FFT the signal and extract frequency components
    arr2D = mlab.specgram(
        channel_samples,
        NFFT=wsize,
        Fs=Fs,
        window=mlab.window_hanning,
        noverlap=int(wsize * wratio))[0]

    # apply log transform since specgram() returns linear array
    arr2D = 10 * np.log10(arr2D)
    arr2D[arr2D == -np.inf] = 0  # replace infs with zeros

    # find local maxima
    local_maxima = get_2D_peaks(arr2D, plot=False, amp_min=amp_min)

    # return hashes
    return generate_hashes(list(local_maxima), fan_value=fan_value)


def get_2D_peaks(arr2D, plot=False, amp_min=DEFAULT_AMP_MIN):
    # http://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.morphology.iterate_structure.html#scipy.ndimage.morphology.iterate_structure
    struct = generate_binary_structure(2, 1)
    neighborhood = iterate_structure(struct, PEAK_NEIGHBORHOOD_SIZE)

    # find local maxima using our fliter shape
    local_max = maximum_filter(arr2D, footprint=neighborhood) == arr2D
    background = (arr2D == 0)
    eroded_background = binary_erosion(background, structure=neighborhood,
                                       border_value=1)

    # Boolean mask of arr2D with True at peaks
    detected_peaks = local_max - eroded_background

    # extract peaks
    amps = arr2D[detected_peaks]
    j, i = np.where(detected_peaks)

    # filter peaks
    amps = amps.flatten()
    peaks = zip(i, j, amps)
    peaks_filtered = [x for x in peaks if x[2] > amp_min]  # freq, time, amp

    # get indices for frequency and time
    frequency_idx = [x[1] for x in peaks_filtered]
    time_idx = [x[0] for x in peaks_filtered]

    if plot:
        # scatter of the peaks
        fig, ax = plt.subplots()
        ax.imshow(arr2D)
        ax.scatter(time_idx, frequency_idx)
        ax.set_xlabel('Time')
        ax.set_ylabel('Frequency')
        ax.set_title("Spectrogram")
        plt.gca().invert_yaxis()
        plt.show()

    return zip(frequency_idx, time_idx)


def generate_hashes(peaks, fan_value=DEFAULT_FAN_VALUE):
    """
    Hash list structure:
       sha1_hash[0:20]    time_offset
    [(e05b341a9b77a51fd26, 32), ... ]
    """
    if PEAK_SORT:
        peaks.sort(key=lambda x: x[1])

    for i in range(len(peaks)):
        for j in range(1, fan_value):
            if (i + j) < len(peaks):
                
                freq1 = peaks[i][IDX_FREQ_I]
                freq2 = peaks[i + j][IDX_FREQ_I]
                t1 = peaks[i][IDX_TIME_J]
                t2 = peaks[i + j][IDX_TIME_J]
                t_delta = t2 - t1

                if t_delta >= MIN_HASH_TIME_DELTA and t_delta <= MAX_HASH_TIME_DELTA:
                    h = hashlib.sha1("{}|{}|{}".format(freq1, freq2, t_delta).encode('utf-8'))
                    yield (h.hexdigest()[0:FINGERPRINT_REDUCTION], t1)


def loadfile(filename):
    # pydub does not support 24-bit wav files, use wavio when this occurs
    try:
        audiofile = AudioSegment.from_file(filename)

        data = np.fromstring(audiofile._data, np.int16)

        channels = []
        for chn in range(audiofile.channels):
            channels.append(data[chn::audiofile.channels])

        fs = audiofile.frame_rate
    except audioop.error:
        fs, _, audiofile = wavio.readwav(filename)

        audiofile = audiofile.T
        audiofile = audiofile.astype(np.int16)

        channels = []
        for chn in audiofile:
            channels.append(chn)
    
    return channels, fs

# Step 2: Extracting Features from Our Dataset

In [None]:
import glob
import os
import pandas as pd
from tqdm import tqdm
from pydub import AudioSegment
from pydub.utils import audioop
from hashlib import sha1

# open the output index file for writing
output = open("indexes/music.idx", "w")

music_index = pd.DataFrame(columns=('sound_id', 'hash', 'time_offset'))

# use glob to grab the image paths and loop over them
for musicPath in tqdm(glob.glob("dataset/*.mp3")):
    # extract the sound ID (i.e. the unique filename) from the sopund
    # path and load the sound itself
    soundID = musicPath[musicPath.rfind("/") + 1:]
    
    channels, fs = loadfile(musicPath)
 
    result = set()
    channel_amount = len(channels)

    for channeln, channel in enumerate(channels):
        # TODO: Remove prints or change them into optional logging.
        #print("Fingerprinting channel %d/%d for %s" % (channeln + 1, channel_amount, musicPath))
        hashes = set(fingerprint(channel, Fs=fs))
        #print("Finished channel %d/%d for %s" % (channeln + 1, channel_amount, musicPath))
        
        result |= hashes
        for hash, offset in hashes:
             music_index = music_index.append(
                 {
                     'sound_id': soundID,
                     'hash': hash, 
                     'time_offset': offset
                 }, ignore_index=True)
    
    features = [str(f) for f in result]
    print(len(music_index))
    output.write("%s,%s\n" % (soundID, ",".join(features)))

# close the index file
output.close()

print("Indexes saved with size(Bytes): {}".format(os.stat('indexes/music.idx').st_size))





In [7]:
len(music_index)

0

# Step 3: The Searcher

In [3]:
def find_matches():
    

def search(queryFile):
    channels, fs = loadfile(musicPath)
    
    result = set()
    channel_amount = len(channels)

    for channeln, channel in enumerate(channels):
        hashes = fingerprint(channel, Fs=fs)
        result |= set(hashes)
    
    matches = []
    for channel in channels:
        matches.extend(find_matches(channel, fs))
    return self.dejavu.align_matches(matches)
    
    return result
    
def chi2_distance(histA, histB, eps = 1e-10):
    # compute the chi-squared distance
    d = 0.5 * np.sum([((a - b) ** 2) / (a + b + eps) for (a, b) in zip(histA, histB)])
    # return the chi-squared distance
    return d

# Step 4: Performing a Search

In [None]:
%%time

# load the query image and describe it
query = cv2.imread("holidays/100000.jpg")
features = cd.describe(query)
 
# perform the search
searcher = Searcher('indexes/holidays.idx')
results = searcher.search(features)
 
# display the query
print("Query")
plt.imshow(cv2.cvtColor(query, cv2.COLOR_BGR2RGB))
plt.show()
 
# loop over the results
for (score, resultID) in results:
    # load the result image and display it
    result = cv2.imread("holidays/" + resultID)
    print("Result: {}".format("holidays/" + resultID))
    plt.imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
    plt.show()

Time 13 second. Not very quick, but simple

In [None]:
%%time

# load the query image and describe it
query = cv2.imread("holidays/110000.jpg")
features = cd.describe(query)
 
# perform the search
searcher = Searcher('indexes/holidays.idx')
results = searcher.search(features)
 
# display the query
print("Query")
plt.imshow(cv2.cvtColor(query, cv2.COLOR_BGR2RGB))
plt.show()
 
# loop over the results
for (score, resultID) in results:
    # load the result image and display it
    result = cv2.imread("holidays/" + resultID)
    print("Result")
    plt.imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
    plt.show()