In [1]:
import os
import librosa
import numpy as np

In [2]:
def beatTracker(inputFile):
    
    """
    Performs beat and downbeat tracking on a given audio sample.
    This is done by following:
        1 - Calculate an onset detection function (here using spectral flux, but others also apply)
        2 - Automatically extract a tempo [in BPM] from the onset detection function
        3 - Performs the beat tracking algorithm with dynamic programing (Ellis, 2007)
        4 - Given a beat sequence, estimates downbeat sequence 
        5 - Converts beat sequence and downbeat sequence from frames to time [in seconds]
        
    Arguments:
        inputFile: path name of input audio file;
    Returns:
        beat_sequence: Sequence of beats [in seconds]
        downbeat_sequence: Sequence of downbeats [in seconds]
    """
    
    # IMPORT AUDIO FILE AND RETRIEVE SAMPLING RATE
    y, sr = librosa.load(inputFile)
    
    
    # 1 - ONSET DETECTION
    #     Inspired by Müller's notes, available at:
    #     https://www.audiolabs-erlangen.de/resources/MIR/FMP/C6/C6S1_NoveltySpectral.html
    
    # Calculates onset detection function using spectral flux
    N = 1024
    hop_size = 512
    gamma_factor = 100
    
    # Calculates stft
    X = librosa.stft(y, n_fft=N, hop_length=hop_size, win_length=N, window='hanning')
    Fs_onset_function = sr / hop_size
    # Applies logarithmic compression
    Y = np.log(1 + gamma_factor * np.abs(X))
    # Applies discrete derivative
    Y_diff = np.diff(Y)
    # Applies half-wave rectification
    Y_diff[Y_diff < 0] = 0
    # Applies accumulation
    onset_function = np.sum(Y_diff, axis=0)
    onset_function = np.concatenate((onset_function, np.array([0.0])))
    
    # Applies local averaging, given M
    M = 10
    L = len(onset_function)
    local_average = np.zeros(L)
    for m in range(L):
        a = max(m - M, 0)
        b = min(m + M + 1, L)
        local_average[m] = (1 / (2 * M + 1)) * np.sum(onset_function[a:b])
        
    onset_function = onset_function - local_average
    onset_function[onset_function < 0] = 0.0
    
    # Applies normalization
    max_value = max(onset_function)
    if max_value > 0:
        onset_function = onset_function / max_value
    
    # Save onset_function for downbeat tracking
    onset_function_db = onset_function

    # 2 - TEMPO ESTIMATION
    # Estimates tempo given an onset detection function
    tempo_BPM = librosa.beat.tempo(onset_envelope=onset_function, sr=sr)
    # Converts tempo [in BPM] to number of samples per beat, given a sampling rate
    frames_beat = (60 * Fs_onset_function) / tempo_BPM
    
    
    # 3 - BEAT TRACKING by DYNAMIC PROGRAMMING (Ellis, 2007)
    #     Inspired by Müller's notes, available at:
    #     https://www.audiolabs-erlangen.de/resources/MIR/FMP/C6/C6S3_BeatTracking.html
    
    # 3.1 - Estimates the consistency function
    consistency_frames = len(onset_function)
    t = np.arange(1, consistency_frames) / frames_beat
    # Penalty for not being beat consistent
    consistency = -np.square(np.log2(t))
    t = np.concatenate((np.array([0]), t))
    consistency =  np.concatenate((np.array([0]), consistency))
    consistency_factor = 1
    consistency = consistency * consistency_factor
    
    # 3.2 - Estimates beat sequence, given the consistency function and the onset function,
    #       using dynamic programming
    
    N = len(onset_function)
    onset_o = onset_function
    onset_function = np.concatenate((np.array([0]), onset_function))
    # To cope with Python indexing
    acc_score = np.zeros(N+1)
    P = np.zeros(N+1, dtype=int) 
    # Accumulated score
    acc_score[1] = onset_function[1]
    # Store to backtrace
    P[1] = 0  
    
    # Forward calculation is performed
    for n in range(2, N+1):
        m_indices = np.arange(1,n)
        # Subtract penalty function from onset_function
        scores = acc_score[m_indices] + consistency[n-m_indices]
        # extract the maximum 
        maximum = np.max(scores)
        if maximum <= 0:
            acc_score[n] = onset_function[n]
            P[n] = 0
        else:
            # Adds maximum to the beat
            acc_score[n] = onset_function[n] + maximum
            # Saves the beat
            P[n] = np.argmax(scores) + 1
            
    # Backtracking 
    beat_sequence_frames = np.zeros(N, dtype=int)
    k = 0
    # Starts from the most confident beat
    beat_sequence_frames[k] = np.argmax(acc_score)
    # Iterate through P[] to keep positions of successive confident beats
    while( P[beat_sequence_frames[k]]!=0 ):
        k = k+1
        # Recursive step: P[P[P[P[]]]]
        # Assign the last values of P[] to beat_sequence_frames
        beat_sequence_frames[k] = P[beat_sequence_frames[k-1]]
        
    beat_sequence_frames = beat_sequence_frames[0:k+1]
    # Flips it
    beat_sequence_frames = beat_sequence_frames[::-1]
    # To compensate for Python indexing, remove the previous first one, now the last
    beat_sequence_frames = beat_sequence_frames - 1
    
    # 4 - DOWNBEAT ESTIMATION
    # Create an array for 4/4 and 3/4 (1st beat in a 4/4, 2nd beat in a 4/4, ... 1st beat in a 3/4)
    hypothesis = np.zeros(7)
    for i in range(0,7):
        # Which beat to start from (0,1,2,3)  
        num = (i % 4)
        den = 3 if ((i)//4) else 4
        # Store the hypothetical beat sequence
        hyp_downbeat_sequence = beat_sequence_frames[num::den]
        # Calculates the mean of each hypothesis
        hypothesis[i] = np.mean(onset_function_db[hyp_downbeat_sequence])
    
    # Retrieve the best candidate
    best = np.argmax(hypothesis)
    
    # Beat from which to start (considering first measure)
    a1 = (best % 4)
    # Step until next beat (3 if 3/4 or 4 if 4/4)
    a2 = 3 if ((best)//4) else 4
    
    # Best downbeat sequence retrieved straing from a1 and hopping with a2 size steps
    downbeat_sequence_frames = beat_sequence_frames[a1::a2]
    
    # 5 - FRAMES TO TIME 
    # Converts from frames to time [in seconds], given a sampling rate
    beat_sequence_time = librosa.frames_to_time(beat_sequence_frames, sr=sr)
    downbeat_sequence_time = librosa.frames_to_time(downbeat_sequence_frames, sr=sr)
    
    
    return beat_sequence_time, downbeat_sequence_time

In [3]:
'''
#Example of application in Python Notebook:

# Uncomment until the end
import IPython.display as ipd

# DEFINE DATA DIRECTORY
PATH = '/Users/pedro/Documents/GitHub/MINF_ECS7006/Assignment1/data'

DATAPATH = os.getcwd()
AUDIOPATH = PATH + '/ballroomwav/audio'
ANNOTPATH = PATH + '/ballroomwav/annotations'

# Five file examples:
# filename = 'Albums-Latin_Jam2-11'
# filename = 'Albums-Cafe_Paradiso-12'
# filename = 'Albums-Ballroom_Classics4-04'
filename = 'Media-100609'
# filename = 'Albums-I_Like_It2-09'
# filename = 'Albums-Chrisanne1-07'

# Import file
y, sr = librosa.load(AUDIOPATH + '/' + filename + '.wav')

# GET ANNOTATIONS
beat_annotations = np.genfromtxt(ANNOTPATH + '/' + filename + '.beats', delimiter=' ')
beat_time_annotations = beat_annotations[:,0]
downbeat_annotations = beat_annotations[:,1]

# GET DOWNBEAT TIME ANNOTATIONS
downbeat_time_annotations = []
for i, time in enumerate(beat_annotations[:,0]):
    if(beat_annotations[i,1] == 1):
        downbeat_time_annotations.append(time)


inputFile = AUDIOPATH + '/' + filename + '.wav'

beat_sequence, downbeat_sequence = beatTracker(inputFile)

print("Beat: estimated vs. annotated:", len(beat_sequence), (len(beat_time_annotations)))
print("Downbeat: estimated vs. annotated:", len(downbeat_sequence), (len(downbeat_time_annotations)))

# CREATE TICKS FROM ESTIMATED BEATS OR DOWNBEATS (change 'times=')
y_beats = librosa.clicks(times=downbeat_sequence, sr=sr, length=len(y))

# DISPLAY AND PLAY AUDIO AND CLICKS TOGETHER
ipd.Audio(y + y_beats, rate=sr)
'''

Beat: estimated vs. annotated: 85 87
Downbeat: estimated vs. annotated: 29 29
