In [1]:
%pylab inline
from __future__ import print_function
import pickle
import os
import IPython

import numpy as np
import scipy
import sklearn.mixture

# Path for all pre-computed chroma
DATA_DIR = '/home/py/projects/dataset/beatles/beatchromlabs/'

Populating the interactive namespace from numpy and matplotlib


In [2]:
# Read in the list of training file IDs.
def read_file_list(filename):
    """Read a text file with one item per line."""
    items = []
    with open(filename, 'r') as f:
        for line in f:
            items.append(line.strip())
    return items

def read_beat_chroma_labels(file_id):
    """Read back a precomputed beat-synchronous chroma record."""
    filename = os.path.join(os.path.join(DATA_DIR, 'beatchromlabs', file_id + '.pkl'))
    with open(filename, "rb") as f:
        u = pickle._Unpickler(f)
        u.encoding = 'latin1'
        beat_times, chroma_features, label_indices = u.load()
        # beat_times, chroma_features, label_indices = pickle.load(f)
    #chroma_features = chroma_features**0.25
    chroma_features /= np.maximum(0.01, np.max(chroma_features, axis=1))[:, np.newaxis]
    return beat_times, chroma_features, label_indices

def load_all_features_labels(train_ids):
    """Load all the features and labels from a lit into big arrays."""
    features = []
    labels = []
    for train_id in train_ids:
        beat_times, chroma, label = read_beat_chroma_labels(train_id)
        assert not np.any(np.isnan(chroma))
        features.append(chroma)
        labels.append(label)
    features = np.concatenate(features)
    labels = np.concatenate(labels)
    print('Training features shape:', features.shape)
    return features, labels

def estimate_transitions(labels, num_models):
    # Count the number of transitions in the label set.
    # Each element of gtt is a 4 digit number indicating one transition 
    # e.g. 2400 for 24 -> 0.
    hashed_transitions = 100*labels[:-1] + labels[1:]
    # Arrange these into the transition matrix by counting each type.
    transitions = np.zeros((num_models, num_models))
    for i in range(num_models):
        for j in range(num_models):
            transition_hash = 100 * i + j 
            # Add one to all counts, so no transitions have zero 
            # probability.
            transitions[i, j] = 1 + np.count_nonzero(hashed_transitions == 
                                                     transition_hash)
    # Priors of each chord = total count of pairs starting in that chord.
    priors = np.sum(transitions, axis=1)
    # Normalize each row of transitions.
    transitions /= priors[:, np.newaxis]
    # Normalize priors too.
    priors /= np.sum(priors)
    return transitions, priors

def train_chord_models(train_ids):
    """Train Gaussian models for all chord data from a list of IDs.
    
    Args:
      train_ids:  List of IDs to pass to read_beat_chroma_labels().

    Returns:
      models: a list of sklearn.mixture.GMM objects, one for each class.
      transitions: np.array of size (num_classes, num_classes). 
        transitions[i, j] is the probability of moving to state j when 
        starting in state i.
      priors: 1D np.array giving the prior probability for each class.

    2016-04-03, 2010-04-07 Dan Ellis dpwe@ee.columbia.edu
    """
    features, labels = load_all_features_labels(train_ids)
    num_chroma = 12
    # We have a major and a minor chord model for each chroma, plus NOCHORD.
    num_models = 2 * num_chroma + 1
    # Global mean/covariance used for empty models.
    global_model = sklearn.mixture.GMM(n_components=1, 
                                       covariance_type='full')
    # Train a background model on all the data, regardless of label.
    global_model.fit(features)
    # Set up individual models for all chords.
    models = []
    for model_index in range(num_models):
        # labels contains one value in the range 0..24 for each row of 
        # features.
        true_example_rows = np.nonzero(labels == model_index)
        if true_example_rows:
            model = sklearn.mixture.GMM(n_components=1, 
                                        covariance_type='full')
            model.fit(features[true_example_rows])
            models.append(model)
        else:
            # No training data for this label, so substitute the 
            # background model.
            models.append(global_model)
    
    transitions, priors = estimate_transitions(labels, num_models)
    
    return models, transitions, priors

def viterbi_path(posteriors, transitions, priors):
    """Calculate Viterbi (best-cost) path through Markov model.
    
    Args:
      posteriors: np.array sized (num_frames, num_states) giving the 
        local-match posterior probability of being in state j at time i.
      transitions: np.array of (num_states, num_states).  For each row, 
        transitions(row, col) gives the probability of transitioning from
        state row to state col.
      priors: np.array of (num_states,) giving prior probability of 
        each state.    
    """
    num_frames, num_states = posteriors.shape
    traceback = np.zeros((num_frames, num_states), dtype=int)
    # Normalized best probability-to-date for each state.
    best_prob = priors * posteriors[0]
    best_prob /= np.sum(best_prob)
    for frame in range(1, num_frames):
        # Find most likely combination of previous prob-to-path, and 
        # transition.
        possible_transition_scores = (transitions * 
                                      np.outer(best_prob, posteriors[frame]))
        # The max is found for each destination state (column), so the max
        # is over all the possible preceding states (rows).
        traceback[frame] = np.argmax(possible_transition_scores, axis=0)
        best_prob = np.max(possible_transition_scores, axis=0)
        best_prob /= np.sum(best_prob)
    # Traceback from best final state to get best path.
    path = np.zeros(num_frames, dtype=int)
    path[-1] = np.argmax(best_prob)
    for frame in range(num_frames - 1, 0, -1):
        path[frame - 1] = traceback[frame, path[frame]]
    return path

def recognize_chords(chroma, models, transitions, priors):
    """Perform chord recognition on chroma feature matrix."""
    scores = np.array([model.score(chroma) for model in models])
    chords = viterbi_path(np.exp(scores.transpose()), transitions, priors)
    return chords, scores

In [3]:
train_list_filename = os.path.join(DATA_DIR, 'trainfilelist.txt')
train_ids = read_file_list(train_list_filename)
test_list_filename = os.path.join(DATA_DIR, 'testfilelist.txt')
test_ids = read_file_list(test_list_filename)

# Run the full set of training examples through the model training.
models, transitions, priors = train_chord_models(train_ids)
# Extract the means from each class's model to illustrate.
model_means = np.concatenate([model.means_ for model in models])
# Construct a list of names for each of the 25 classes.
all_chords = ['-', 'C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 
              'B', 'c', 'c#', 'd', 'd#', 'e', 'f', 'f#', 'g', 'g#', 'a', 'a#', 'b']


Training features shape: (82359, 12)


In [4]:
file_id = test_ids[2]
beat_times, chroma_features, label_indices = read_beat_chroma_labels(file_id)
hyp_chords, scores = recognize_chords(chroma_features, models, transitions, priors)


In [5]:
print(hyp_chords[:5])
print(hyp_chords.shape)
print(beat_times[:5])
print(scores[:5])

[0 0 3 3 3]
(637,)
[ 0.164  0.444  0.748  1.036  1.324]
[[  6.798089     7.47085408  -0.32822758 ...,  -6.63868351  -6.43495699
   -1.53859825]
 [  5.01088989   6.50111516  -4.75959906 ..., -14.08632203 -14.20738854
  -13.06187947]
 [  4.09268478   2.57464549  -5.00723223 ...,  -9.89702973  -8.82294678
   -4.36274636]
 [  2.9408196    2.18335322  11.40085556 ...,   7.9813119    7.30701032
   -3.51932142]
 [  2.90986612   5.17188858  -3.98970498 ..., -14.90898675 -15.54676788
  -20.58222363]]


In [6]:
%matplotlib inline

# Beat tracking example
from __future__ import print_function
import numpy as np, scipy, matplotlib.pyplot as plt, librosa
from IPython.display import Audio

# Load the example clip
y, sr = librosa.load('/home/py/projects/tmp/tmp.m4a')


In [7]:
# Separate harmonics and percussives into two waveforms
y_harmonic, y_percussive = librosa.effects.hpss(y)
# Beat track on the percussive signal
tempo, beat_frames = librosa.beat.beat_track(y=y_percussive, sr=sr)
# 4. Convert the frame indices of beat events into timestamps
beat_times = librosa.frames_to_time(beat_frames, sr=sr)
# Compute chroma features from the harmonic signal
chromagram = librosa.feature.chroma_cqt(y=y_harmonic, sr=sr)
# Aggregate chroma features between beat events
# We'll use the median value of each feature between beat frames
beat_chroma = librosa.feature.sync(chromagram, beat_frames, aggregate=np.median)


  return array(a, dtype, copy=False, order=order)


In [9]:
hyp_chords, scores = recognize_chords(beat_chroma.transpose(), models, transitions, priors)
print(beat_times.shape)
print(hyp_chords.shape)
print(beat_times[:10])
print(hyp_chords[:10])

(108,)
(109,)
[ 0.09287982  0.65015873  1.2306576   1.81115646  2.41487528  3.0185941
  3.64553288  4.2492517   4.85297052  5.45668934]
[ 0 22 22 22 22 22 22  8  8  8]
