In [3]:
%matplotlib inline

from __future__ import division, print_function

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

import SHS_data
import util
import paired_data

reload(paired_data);

## Learning cover song fingerprints

### Data

#### load training data

In [None]:
# train, test, validation split
ratio = (50,20,30)
clique_dict, _ = SHS_data.read_cliques()
train_cliques, test_cliques_big, _ = util.split_train_test_validation(clique_dict, ratio=ratio)

# preload training data to memory (just about doable)
print('Preloading training data...')
train_uris = util.uris_from_clique_dict(train_cliques)
chroma_dict = SHS_data.preload_chroma(train_uris)

# make a training dataset of cover and non-cover pairs of songs
print('Preparing training dataset...')
n_patches, patch_len = 8, 64
X_A, X_B, Y, pair_uris = paired_data.dataset_of_pairs(train_cliques, chroma_dict,
                                                             n_patches=n_patches, patch_len=patch_len)
print('    Training set:', X_A.shape, X_B.shape, Y.shape)

#### load test data

for now, load just a small part of the test set that we'll evaluate at every iteration, e.g., a few times batch size

In [None]:
# pick a test subset
n_test_cliques = 50  # e.g., 50 ~ small actual datasets
test_cliques = {uri: test_cliques_big[uri] for uri in test_cliques_big.keys()[:n_test_cliques]}

# preload test data to memory (just about doable)
print('Preloading test data...')
test_uris = util.uris_from_clique_dict(test_cliques)
chroma_dict_T = SHS_data.preload_chroma(test_uris)

# make a test dataset of cover and non-cover pairs of songs
print('Preparing test dataset...')
X_A_T, X_B_T, Y_T, test_pair_uris_T = paired_data.dataset_of_pairs(test_cliques, chroma_dict_T,
                                                             n_patches=n_patches, patch_len=patch_len)
print('    Test set:', X_A_T.shape, X_B_T.shape, Y_T.shape)

### Network

#### input

In [None]:
input_len = n_patches * patch_len

x_A = tf.placeholder("float", shape=[None, input_len, 12])
x_B = tf.placeholder("float", shape=[None, input_len, 12])
y_ = tf.placeholder("float", shape=[None,])

x_image_A = tf.reshape(x_A, [-1, input_len, 12, 1])
x_image_B = tf.reshape(x_B, [-1, input_len, 12, 1])
y_ = tf.reshape(y_, [-1, 1])

#### the rest

In [None]:
list_of_layers = []

# (512,12,1) > (128,1,32)
list_of_layers.append((conv_layer, {'shape' : (1,12), 'n_filters' : 32, 'padding' : 'VALID'}))
list_of_layers.append((max_pool_layer, {'shape' : (4,1)}))

# (128,1,32) > (8,1,64) 
list_of_layers.append((conv_layer, {'shape' : (2,1)}))
list_of_layers.append((max_pool_layer, {'shape' : (16,1)}))

# (8,1,64) > (128)
list_of_layers.append((fully_connected_layer, {'n_nodes' : 128}))

net_A, net_B = build_butterfly_network(x_A, x_B, list_of_layers)

### Training

#### objective function

Minize pair distances while maximizing non-pair distances smaller than `m`

Following [1].

1. Raffel, C., & Ellis, D. P. W. (2015). Large-Scale Content-Based Matching of Midi and Audio Files. Proceedings of the 16th International Society for Music Information Retrieval Conference (ISMIR), 234–240.

In [105]:
alpha = 1
m = 10 # e.g., sqrt(256)

squared_errors = tf.reduce_sum(tf.square(out_I_A - out_I_B), reduction_indices=1, keep_dims=True)
pair_loss = tf.reduce_mean(y_ * squared_errors, name='pair_loss')
non_pair_loss = tf.reduce_mean((1 - y_) * tf.square(tf.maximum(0.0, m - tf.sqrt(squared_errors))),
                               name='non_pair_loss')

loss_function = pair_loss + (alpha * non_pair_loss)
loss_float = tf.cast(loss_function, "float", name='loss')

# optimizer
learning_rate = tf.placeholder(tf.float32, shape=[])
train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss_function)
# train_step = tf.train.RMSPropOptimizer(learning_rate, decay=0.9).minimize(loss_function)

NameError: name 'out_I_A' is not defined

In [103]:
n_epoques = 10 # 1250 ~ 5 x training set of 50% of all data



In [108]:
learning_rate = tf.placeholder(tf.float32, shape=[])

AttributeError: can't set attribute

#### reporting

bundle all logging and printing into a function

In [None]:
def report(step, batch, train_log):
    
    if train_log is None:
        n_epoques = 2500
        metrics = ['train_error', 'test_error', 'train_bhatt', 'test_bhatt',
                   'pair_loss', 'non_pair_loss', 'd_pairs', 'd_non_pairs']
        train_log = pd.DataFrame(data = np.zeros((n_epoques, len(metrics))),
                                 columns=metrics)
    
    # train and test feeds
    train_feed = {x_A:batch[0], x_B:batch[1], y_: batch[2]}
    test_feed = {x_A:X_A_T, x_B:X_B_T, y_: Y_T}
    
    # compute and log train metrics
    train_metrics = sess.run([loss_float, pair_loss, non_pair_loss, squared_errors],
                             feed_dict=train_feed)
    train_log.loc[step, ['train_error', 'pair_loss', 'non_pair_loss']] = train_metrics[:-1]
    more_train_metrics = approx_bhattacharyya(train_metrics[-1], train_feed[y_])
    train_log.loc[step, ['train_bhatt', 'd_pairs', 'd_non_pairs']] = more_train_metrics
    
    # compute and log test metrics
    train_log.loc[step, ['test_error']], test_squared_errors = sess.run([loss_float, squared_errors],
                                                                        feed_dict=test_feed)
    train_log.loc[step, ['test_bhatt']], _, _ = approx_bhattacharyya(test_squared_errors,
                                                                     test_feed[y_])

    # print some of the metrics
    print(train_log[['train_error', 'test_error', 'train_bhatt',
                     'test_bhatt', 'd_pairs', 'd_non_pairs']][step:])
    
    return train_log

#### init session

In [None]:
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())

#### train

In [None]:
lr = 1e-4
n_epoques

metrics = ['loss', 'paid_loss', 'non_pair_loss']
train_log = report(n_epoques, )
train_batches = get_batches([X_A, X_B, Y], batch_size=128)
test_batch = [X_A_T, X_B_T, Y_T]
for step in range(n_epoques):  
    train_batch = next(train_batches)
    if step%10 == 0:
        train_log = report(train_log, step, train_batch=train_batch,
                           test_batch=test_batch, metrics=metrics)
    train_feed = {x_A:train_batch[0], x_B:train_batch[1],
                  y_: train_batch[2], learning_rate: lr}
    train_step.run(feed_dict=train_feed)
train_log = report(train_log, '[end]', train_batch=batch,
                   test_batch=test_batch, metrics=metrics)

#### plot loss function 

plot loss functions for train and test data.

Note that training loss fluctuates much more as it's computed for a different batch at every step, while test error is computed for the same (slightly larger) subset at every step.

In [None]:
plt.figure(figsize=(10,10))

plt.subplot(311)
plt.plot(train_error_log);
plt.plot(error_pairs_log, color='g');
plt.plot(error_non_pairs_log, color='r');
plt.plot(test_error_log, color='k');
plt.title('train (b) and test(k) loss function with train pairs (g) vs non-pairs (r) components');

plt.subplot(312)
plt.plot(d_pairs_log, color='g');
plt.plot(d_non_pairs_log, color='r');
plt.title('average distance, train pairs (g) vs non-pairs (r)');

plt.subplot(313)
plt.plot(np.log(train_bhatt_log));
plt.plot(np.log(test_bhatt_log), 'k');
plt.title('bhattacharyya distance train (b) and test (k)');

#### train distances

In [None]:
pair_dists = np.sqrt(squared_dists_log[-1][np.where(labels_log[-1]==1)])
non_pair_dists = np.sqrt(squared_dists_log[-1][np.where(labels_log[-1]==0)])
L1 = pair_loss.eval(feed_dict={x_A:batch[0], x_B:batch[1], y_:batch[2]})
L2 = non_pair_loss.eval(feed_dict={x_A:batch[0], x_B:batch[1], y_:batch[2]})

bins = np.arange(0,10,0.4)
plt.figure(figsize=(15,5))
plt.subplot(121)
plt.hist(non_pair_dists, bins=bins, alpha=0.5);
plt.hist(pair_dists, bins=bins, color='r', alpha=0.5);
plt.subplot(143)
plt.boxplot([non_pair_dists, pair_dists]);

print('bhatt =', approx_bhattacharyya(squared_dists_log[-1], labels_log[-1]))

#### test distances

In [None]:
test_squared_dists = squared_errors.eval(feed_dict={x_A:X_A_T, x_B:X_B_T})
test_squared_dists = np.sum(test_squared_dists, axis=1)

test_pair_dists = np.sqrt(test_squared_dists[np.where(Y_T==1)[0]])
test_non_pair_dists = np.sqrt(test_squared_dists[np.where(Y_T==0)[0]])

bins = np.arange(0,10,0.4)
plt.figure()
plt.hist(test_non_pair_dists, bins=bins, alpha=0.5);
plt.hist(test_pair_dists, bins=bins, color='r', alpha=0.5);

print('bhatt =', approx_bhattacharyya(test_squared_dists, Y_T.flatten()))

### Test

In [None]:
# # subset
# test_subset = {uri: test_cliques[uri] for uri in test_cliques.keys()[:200]}

# print('Preloading chroma...')
# test_uris = util.uris_from_clique_dict(test_subset)
# chroma_dict_T = SHS_data.preload_chroma(test_uris)

# print('Collecting test dataset...')
# X_A_T, X_B_T, is_cover_T, test_pair_uris_T = paired_data.dataset_of_pairs(test_subset, chroma_dict_T,
#                                                              n_patches=n_patches, patch_len=patch_len)
# print(X_A_T.shape, X_B_T.shape, is_cover_T.shape)

# Y_T = is_cover_T.astype(float)

In [None]:
def build_butterfly_network(x_A, x_B, [list_of_layers]):
    network_A = [x_A]
    network_B = [x_B]
    for layer in list_of_layers:
        network_A.append(layer['type'](network_A[-1], *layer['params']))
        network_B.append(layer['type'](network_B[-1], *layer['params']))
    return network_A, network_B

list_of_layers = []
list_of_layers.append(conv_bins, [])