In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, initializers
import tensorflow_probability as tfp

# Length of the last axis of both model inputs (passed to the layers.Input shapes in the model cell).
FFT_WIDTH = 80
# (FFT_WIDTH, 3) shape tuple.
# NOTE(review): no live code visible in this notebook reads this name -- confirm whether it is still needed.
input_shape = (FFT_WIDTH,3)

In [None]:
#@keras_export('keras.layers.MultichannelInterpolate')
class MultichannelInterpolate(keras.layers.Layer):
    """Weight-free layer producing one resampled channel mix per harmonic.

    Input shape:  (batch, clefs, buckets). The window table built in `call`
                  has exactly three columns, so `clefs` must be 3.
    Output shape: (batch, harmonics, buckets).

    For every harmonic index ``h`` in ``range(harmonics)``:
      1. the input (zero-padded by one sample on the last axis) is bilinearly
         resized to ``(h + 1) * 8`` samples along the last axis,
      2. the three channels are mixed with the weights found in row ``h`` of a
         (buckets, 3) table of Gaussian windows,
      3. the result is zero-padded or truncated to ``buckets`` samples.

    Because row ``h`` of the window table is indexed directly, ``harmonics``
    must not exceed ``buckets``.

    Args:
        harmonics: number of output rows (harmonic indices) to generate.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
            ``trainable`` is always forced to False.
    """

    # TODO: Allow user to specify gaussian windows
    def __init__(self, harmonics, **kwargs):
        # The layer owns no weights; accept **kwargs so that it can be rebuilt
        # from get_config(), but never let it be flagged trainable.
        kwargs["trainable"] = False
        super(MultichannelInterpolate, self).__init__(**kwargs)
        self.harmonics = harmonics

    def build(self, input_shape):
        """Record the channel count and bucket count from (batch, clefs, buckets)."""
        _, self.clefs, self.buckets = input_shape
        super(MultichannelInterpolate, self).build(input_shape)

    def call(self, inputs, **kwargs):
        """Return a (batch, harmonics, buckets) tensor built from `inputs`."""
        # scipy is not imported at the top of the notebook, so import it here.
        from scipy.signal.windows import gaussian

        # Best-effort diagnostic only (the batch dimension is ignored): a
        # mismatch is reported but execution continues, as before.
        expected = tf.TensorShape([self.clefs, self.buckets])
        if not expected.is_compatible_with(inputs.shape[1:]):
            print("shape=(%s, %s, %s)" % (self.clefs, self.buckets, self.harmonics)
                  + " inputs.shape=%s" % str(inputs.shape))

        # Add padding to prevent interpolation errors ...
        inputs = tf.pad(inputs, tf.constant([[0, 0], [0, 0], [0, 1]]), "CONSTANT")
        # ... and a dummy trailing dimension, so we can resize it like an image.
        inputs = inputs[..., tf.newaxis]

        # (buckets, 3) table: one Gaussian window per input channel.
        windows = np.transpose(np.asarray([
            gaussian(2 * self.buckets, 30)[self.buckets:],
            gaussian(self.buckets, 20),
            gaussian(2 * self.buckets, 30)[:self.buckets],
        ]))

        per_harmonic = []
        for h in range(self.harmonics):
            # Channel weights for this harmonic: row h of the window table.
            clef_multiplier = tf.constant(windows[h, :], dtype=tf.float32, shape=[1, 3])

            # NOTE(review): the factor 8 equals buckets / 10 for buckets == 80;
            # confirm whether it should scale with `buckets`.
            width = (h + 1) * 8
            load = tf.image.resize(inputs, [self.clefs, width],
                                   method='bilinear', antialias=True)[..., 0]
            # (1, 3) @ (batch, 3, width) -> (batch, 1, width); drop the unit axis.
            load = tf.linalg.matmul(clef_multiplier, load)[:, 0, :]

            if width < self.buckets:
                load = tf.pad(load, tf.constant([[0, 0], [0, self.buckets - width]]))
            else:
                load = load[:, :self.buckets]

            per_harmonic.append(load)

        # Stack along axis 1 so the batch axis stays first. Stacking on axis 0
        # and reshaping would interleave rows of different samples whenever
        # the batch holds more than one sample.
        return tf.stack(per_harmonic, axis=1)

    def compute_output_shape(self, input_shape):
        """Return (batch, harmonics, buckets) for an input of (batch, clefs, buckets)."""
        return (input_shape[0], self.harmonics, input_shape[2])

    def get_config(self):
        """Return the layer config including `harmonics`, for serialization."""
        config = super(MultichannelInterpolate, self).get_config()
        config["harmonics"] = self.harmonics
        return config

In [None]:
#calibrations = layers.Input(shape=input_shape)                      # Input is (freq, clef)

# --- Calibration branch: (batch, 3, FFT_WIDTH) -> (batch, 48, FFT_WIDTH) ---
calibrations = layers.Input(shape=(3, FFT_WIDTH), name="calibrations")     # Have these fixed for now
norm_calibrations = layers.LayerNormalization(axis=[1,2])(calibrations)
x = MultichannelInterpolate(48)
x1 = x(norm_calibrations)

# --- Main input: (batch, FFT_WIDTH) ---
# The shape must be a tuple: `(FFT_WIDTH)` without the trailing comma is just an int.
inputs = layers.Input(shape=(FFT_WIDTH,), name="main_input")                # Input is (freq)
norm_inputs = layers.LayerNormalization(axis=[1])(inputs)

# Contract axis 2 of x1 with axis 1 of norm_inputs -> (batch, 48).
harmonic_extraction = layers.Dot(axes=(2,1))([x1, norm_inputs])             # Output is (bucket)

# Dense branch on the raw (un-normalized) main input.
input_direct = layers.Dense(48, activation='relu')(inputs)
# pool = layers.AveragePooling1D(pool_size = FFT_WIDTH,         # Output is (scaled freq)
#         data_format="channels_first")(x2)

# Convolutional branch on the raw main input. With FFT_WIDTH == 80 the four
# stride-2 'same' convolutions give lengths 40, 20, 10, 5; MaxPooling1D(5)
# then leaves a single step of 48 channels.
expanded = layers.Reshape((FFT_WIDTH, -1))(inputs)
c1 = layers.Conv1D(6, 5, strides=2, padding='same')(expanded)
c2 = layers.Conv1D(12, 5, strides=2, padding='same')(c1)
c3 = layers.Conv1D(24, 5, strides=2, padding='same')(c2)
c4 = layers.Conv1D(48, 5, strides=2, padding='same')(c3)
pool = layers.MaxPooling1D(5)(c4)
flatten_conv = layers.Flatten()(pool)

# Merge the three branches and map them to 48 independent sigmoid outputs.
merge = layers.Concatenate()([harmonic_extraction, flatten_conv, input_direct])
flat = layers.Flatten()(merge)

out = layers.Dense(48, activation='sigmoid')(flat)

model = keras.Model(inputs=[calibrations, inputs], outputs=out)
model.compile(loss=tf.keras.losses.BinaryCrossentropy(), optimizer="adam")
model.summary()

In [None]:
import tensorflow as tf
import numpy as np
from math import floor
import random
import os
from sklearn.model_selection import train_test_split

frames_filename = "automated_guitar_nonorm_inputs.npy"
labels_filename = "automated_guitar_nonorm_labels.npy"
frames = np.load(frames_filename)
labels = np.load(labels_filename)

assert labels.shape[0] == frames.shape[0], "frames and labels must contain the same number of samples"

# # Add other files from other instruments or playstyles
# frames = np.concatenate((frames, np.load("dane_take1_inputs.npy")), axis=0)
# labels = np.concatenate((labels, np.load("dane_take1_labels.npy")), axis=0)
# frames = np.concatenate((frames, np.load("acoustic_oren_pick_take1_inputs.npy")), axis=0)
# labels = np.concatenate((labels, np.load("acoustic_oren_pick_take1_labels.npy")), axis=0)

# Shuffle it. A dedicated, seeded generator is used so that the shuffle (and
# therefore the seeded splits below) is identical after every kernel restart;
# an unseeded shuffle would make `random_state=1` meaningless.
SHUFFLE_SEED = 1
data_size = frames.shape[0]
rind = list(range(data_size))
random.Random(SHUFFLE_SEED).shuffle(rind)
# One fancy-index per array instead of rebuilding them row by row in Python.
# Keep the first FFT_WIDTH bins of every frame row and the first 48 label columns.
frames = frames[rind, :, :FFT_WIDTH]
labels = labels[rind, :48]

# 60 % train / 20 % validation / 20 % test (0.25 of the remaining 80 % is 20 %).
frames_train, frames_test, labels_train, labels_test = train_test_split(frames, labels, test_size=0.2, random_state=1)
frames_train, frames_val, labels_train, labels_val = train_test_split(frames_train, labels_train, test_size=0.25, random_state=1)

checkpoint_path = "training_1/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1,
    save_weights_only=True)

# Store the untrained weights as "epoch 0" before fitting.
model.save_weights(checkpoint_path.format(epoch=0))

# Row 0 of every frame feeds "main_input"; the remaining rows feed "calibrations".
model.fit(
    [frames_train[:, 1:, :], frames_train[:, 0, :]],
    labels_train,
    epochs=300,
    validation_data=([frames_val[:, 1:, :], frames_val[:, 0, :]], labels_val),
    callbacks=[cp_callback])


In [None]:
from matplotlib import pyplot as plt
from tensorflow.keras import backend as K
import time

model.load_weights("training_1/cp-0087.ckpt")

guesses = 0            # predictions above the threshold
false_positives = 0    # predicted but not labelled
total_labels = 0       # labels equal to 1
false_negatives = 0    # labelled but not predicted
for i, (l, f) in enumerate(zip(labels_test, frames_test)):
    # Row 0 is the main input, the remaining rows are the calibrations; add a
    # batch axis of size 1. These names deliberately do not reuse `inputs`,
    # which holds the Keras Input tensor of the model cell.
    main_input = np.expand_dims(f[0,:], axis=0)
    calibs = np.expand_dims(f[1:,:], axis=0)
    p = model.predict([calibs, main_input])[0]

    pred = np.where(p > 0.8)[0]
    correct = np.where(l == 1)[0]

    # Sets give constant-time membership tests.
    pred_set = set(pred.tolist())
    correct_set = set(correct.tolist())

    false_positives += len(pred_set - correct_set)
    false_negatives += len(correct_set - pred_set)
    guesses += len(pred)
    total_labels += len(correct)

    # Plot the first ten samples: main input, calibrations and predictions.
    if i < 10:
        plt.plot(main_input[0,:])
        plt.title(str(correct))
        plt.show()
        plt.plot(np.transpose(calibs[0]))
        plt.show()
        plt.bar(np.arange(len(p)), p)
        plt.title(str(pred))
        plt.show()

# Guard against empty denominators (no labels, or no prediction above the
# threshold) instead of raising ZeroDivisionError.
detection_rate = 1 - false_negatives / total_labels if total_labels else float("nan")
# Note: this is false positives divided by the number of predictions made.
false_positive_rate = false_positives / guesses if guesses else float("nan")

print("Detection rate: " + str(detection_rate))
print("False positive rate: " + str(false_positive_rate))

In [None]:
# Save the model under the path "tf_model", then print its layer summary.
model.save("tf_model")
model.summary()