This model uses CNNs to classify audio input into instrument classes.

General Procedure:
	- Use STFTs to transform audio input into spectral graphs
	- Use spectral graphs as "images" to classify into instrument classes

Data Format:
 	- Data is stored in tfrecord form obtained from the Nsynth dataset: https://magenta.tensorflow.org/datasets/nsynth

Based off of aymericdamien's TensorFlow examples: https://github.com/aymericdamien/TensorFlow-Examples

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
from tensorflow.contrib.framework.python.ops import audio_ops
import matplotlib.pyplot as plt
import numpy as np

In [8]:
# Training Parameters
LEARNING_RATE = 0.0001
NUM_STEPS = 2000
BATCH_SIZE = 32

# Network Parameters
NUM_INPUT = 16384   # spectrogram data input (img shape: 128*128)
NUM_CLASSES = 11    # total instrument classes
DROPOUT = 0.25      # Dropout, probability to drop a unit

# Data paths
TRAINING_DATA = "E:/NSynth/nsynth-train.tfrecord"
TEST_DATA = "E:/NSynth/nsynth-valid.tfrecord"
EVAL_DATA = "E:/NSynth/nsynth-test.tfrecord"
MODEL_PATH = "E:/NSynth/trained_models/"

# Data parameters
SAMPLE_RATE = 16000

In [3]:
# Test GPU presence
# If no errors thrown, GPU is being used
with tf.device('/gpu:0'):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
    c = tf.matmul(a, b)

with tf.Session() as sess:
    print (sess.run(c))

[[22. 28.]
 [49. 64.]]


In [9]:
# Create the neural network
def conv_net(x_dict, n_classes, dropout, reuse, is_training):
    
    # Define a scope for reusing the variables
    with tf.variable_scope('ConvNet', reuse=reuse):
        # TF Estimator input is a dict, in case of multiple inputs
        x = x_dict['image']

        # Spectrogram data input is a 1-D vector of 16384 features (128*128 pixels)
        # Reshape to match picture format [Height x Width x Channel]
        # Tensor input become 4-D: [Batch Size, Height, Width, Channel]
        x = tf.reshape(x, shape=[-1, 128, 128, 1])

        # Convolution Layer with 32 filters and a kernel size of 5
        conv1 = tf.layers.conv2d(x, 32, 5, activation=tf.nn.relu)
        # Max Pooling (down-sampling) with strides of 2 and kernel size of 2
        conv1 = tf.layers.max_pooling2d(conv1, 2, 2)

        # Convolution Layer with 64 filters and a kernel size of 3
        conv2 = tf.layers.conv2d(conv1, 64, 3, activation=tf.nn.relu)
        # Max Pooling (down-sampling) with strides of 2 and kernel size of 2
        conv2 = tf.layers.max_pooling2d(conv2, 2, 2)

        # Flatten the data to a 1-D vector for the fully connected layer
        fc1 = tf.contrib.layers.flatten(conv2)

        # Fully connected layer (in tf contrib folder for now)
        fc1 = tf.layers.dense(fc1, 1024)
        # Apply Dropout (if is_training is False, dropout is not applied)
        fc1 = tf.layers.dropout(fc1, rate=dropout, training=is_training)

        # Output layer, class prediction
        out = tf.layers.dense(fc1, n_classes)

    return out

In [10]:
# Define the model function (following TF Estimator Template)
def model_fn(features, labels, mode):
    
    # Build the neural network
    # Because Dropout have different behavior at training and prediction time, we
    # need to create 2 distinct computation graphs that still share the same weights.
    logits_train = conv_net(features, NUM_CLASSES, DROPOUT, reuse=False, is_training=True)
    logits_test = conv_net(features, NUM_CLASSES, DROPOUT, reuse=True, is_training=False)
    
    # Predictions
    pred_classes = tf.argmax(logits_test, axis=1)
    pred_probas = tf.nn.softmax(logits_test)
    
    # If prediction mode, early return
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions=pred_classes) 
        
    # Define loss and optimizer
#     loss_op = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
#         logits=logits_train, labels=tf.cast(labels, dtype=tf.int32)))
    print(labels)
    loss_op = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits=logits_train, labels=tf.cast(labels, dtype=tf.int32)))
    optimizer = tf.train.AdamOptimizer(learning_rate=LEARNING_RATE)
    train_op = optimizer.minimize(loss_op, global_step=tf.train.get_global_step())
    
    # Evaluate the accuracy of the model
    acc_op = tf.metrics.accuracy(labels=labels, predictions=pred_classes)
    
    # TF Estimators requires to return a EstimatorSpec, that specify
    # the different ops for training, evaluating, ...
    estim_specs = tf.estimator.EstimatorSpec(
      mode=mode,
      predictions=pred_classes,
      loss=loss_op,
      train_op=train_op,
      eval_metric_ops={'accuracy': acc_op})

    return estim_specs

In [11]:
# Build the Estimator
model = tf.estimator.Estimator(model_fn, model_dir=MODEL_PATH)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'E:/NSynth/trained_models/', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x000002176A90A8D0>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [13]:
# Define the input function for training
# input_fn = tf.estimator.inputs.numpy_input_fn(
#     x={'images': mnist.train.images}, y=mnist.train.labels,
#     batch_size=BATCH_SIZE, num_epochs=None, shuffle=True)

# Function to parse the TFRecord
def _parse_(serialized_example):
    feature_list = {'note': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'note_str': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string, allow_missing=True),
                'instrument': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'instrument_str': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string, allow_missing=True),
                'pitch': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'velocity': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'sample_rate': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'audio': tf.FixedLenSequenceFeature(shape=[1], dtype=tf.float32, allow_missing=True),
                'qualities': tf.FixedLenSequenceFeature(shape=[1], dtype=tf.int64, allow_missing=True),
                'qualities_str': tf.FixedLenSequenceFeature(shape=[1], dtype=tf.string, allow_missing=True),
                'instrument_family': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'instrument_family_str': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string, allow_missing=True),
                'instrument_source': tf.FixedLenSequenceFeature(shape=[], dtype=tf.int64, allow_missing=True),
                'instrument_source_str': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string, allow_missing=True)}
    
    # Extract example by features
    example = tf.parse_single_example(serialized_example, feature_list)
    
    # Convert audio data to normalized spectrogram
#     waveform = audio_ops.decode_wav(example["audio"], desired_channels=1)
#     expanded_audio = tf.expand_dims(example["audio"], -1)
    spectrogram = audio_ops.audio_spectrogram(example["audio"], window_size=1024, stride=64)
    min_const = tf.constant(255.)
    minned_spectrogram = tf.minimum(spectrogram, min_const)
    expanded = tf.expand_dims(minned_spectrogram, -1)
    resized = tf.image.resize_bilinear(expanded, [128, 128])
    squeezed = tf.squeeze(resized, 0)
    flipped = tf.image.flip_left_right(squeezed)
    normalized_spectrogram = tf.image.transpose_image(flipped)
    
    # Cast data to input format required by model
#     image = tf.decode_raw(normalized_spectrogram, tf.int64) #remember to parse in int64. float will raise error
    image = normalized_spectrogram
    label = tf.squeeze(tf.cast(example['instrument_family'],tf.int64))
    return (dict({'image':image}),label)

# Define the data input function for training
def tfrecord_train_input_fn(batch_size=32):
    tfrecord_dataset = tf.data.TFRecordDataset(TRAINING_DATA)
    tfrecord_dataset = tfrecord_dataset.map(lambda   x:_parse_(x)).shuffle(True).batch(batch_size)
    tfrecord_iterator = tfrecord_dataset.make_one_shot_iterator()
    
    return tfrecord_iterator.get_next()

# Train the Model
model.train(tfrecord_train_input_fn, steps=NUM_STEPS)

Tensor("Squeeze_1:0", dtype=int64, device=/device:CPU:0)
INFO:tensorflow:Calling model_fn.
Tensor("IteratorGetNext:1", dtype=int64, device=/device:CPU:0)
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from E:/NSynth/trained_models/model.ckpt-0
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into E:/NSynth/trained_models/model.ckpt.
INFO:tensorflow:loss = 2.9561145, step = 0
INFO:tensorflow:global_step/sec: 2.03981
INFO:tensorflow:loss = 0.0, step = 100 (49.024 sec)


KeyboardInterrupt: 

In [None]:
# Evaluate the Model
# Define the input function for evaluating
input_fn = tf.estimator.inputs.numpy_input_fn(
    x={'images': mnist.test.images}, y=mnist.test.labels,
    batch_size=BATCH_SIZE, shuffle=False)
# Use the Estimator 'evaluate' method
model.evaluate(input_fn)

In [None]:
# Predict single images
n_images = 4
# Get images from test set
test_images = mnist.test.images[:n_images]
# Prepare the input data
input_fn = tf.estimator.inputs.numpy_input_fn(
    x={'images': test_images}, shuffle=False)
# Use the model to predict the images class
preds = list(model.predict(input_fn))

# Display
for i in range(n_images):
    plt.imshow(np.reshape(test_images[i], [28, 28]), cmap='gray')
    plt.show()
    print("Model prediction:", preds[i])