In [0]:
#libs for setup training set
import numpy as np

import matplotlib.pyplot as plt
import os
from IPython.display import Audio
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras

In [0]:

def DFTSignal(sig,harmonic_num,ratio=30):
    '''
    Function: magnitude spectrum of a signal plus one spectral-peak feature
    @param sig: time-domain signal (1-D array)
    @param harmonic_num: FFT length handed to np.fft.rfft (sig is cropped or
                         zero-padded to this length by rfft)
    @param ratio: scale applied to the appended peak feature (default 30)
    @return AUD: 1-D array holding the rfft magnitudes followed by exactly ONE
                 extra value: peak_magnitude / peak_index * ratio

    The output length is always len(rfft output) + 1. Looking the peak up with
    np.where(AUD == max) returns every tied index, which makes the appended
    part (and therefore the feature length) vary for tied or silent segments;
    np.argmax always yields a single index (the first maximum).

    When the peak sits at index 0 the quotient is undefined: the feature is
    inf for a non-zero peak and nan for an all-zero (or nan) spectrum, so
    callers that filter on nan keep skipping silent segments.
    '''
    AUD = np.absolute(np.fft.rfft(sig,harmonic_num))

    # single peak location, first occurrence on ties
    peak_idx = int(np.argmax(AUD))
    peak_val = AUD[peak_idx]

    if peak_idx == 0:
        # division by index 0: spell the result out instead of emitting a
        # divide-by-zero RuntimeWarning
        peak_feature = np.inf if peak_val > 0 else np.nan
    else:
        peak_feature = peak_val / peak_idx * ratio

    return np.append(AUD, peak_feature)




#set up input
def setupInput(input,fs,harmonic_num,tru):
    '''
    Function: cut the input signal into windows of `tru` seconds and return
              the DFTSignal feature vector of every window
    @param input: input audio array (must expose .shape and support slicing)
    @param fs: the sampling rate of the input signal
    @param harmonic_num: FFT length forwarded to DFTSignal
    @param tru: window duration in seconds (must be > 0)
    @return input_FFTs: list of feature arrays, one per window

    A signal shorter than one window is transformed as a whole. Otherwise
    consecutive, non-overlapping windows are taken; a trailing remainder
    shorter than one window is dropped. A window that ends exactly at the end
    of the signal IS included (so a signal exactly `tru` seconds long yields
    one feature vector instead of none).
    '''
    if tru <= 0:
        # a non-positive window never advances and would loop forever
        raise ValueError("tru must be positive")

    input_FFTs = []
    print("input size", input.shape)

    total = len(input)
    if total/fs < tru:
        input_FFTs.append(DFTSignal(input,harmonic_num))
        return input_FFTs

    # count windows with an integer so the boundaries do not accumulate
    # floating-point error
    piece = 0
    while (piece+1)*tru*fs <= total:
        i_b = int(piece*tru*fs)
        i_n = int((piece+1)*tru*fs)
        input_FFTs.append(DFTSignal(input[i_b:i_n],harmonic_num))
        piece += 1
    return input_FFTs




def setupExample(path, dir, instru_types,harmonic_num):
    '''
    Function: build a labelled example set from the .wav files of a directory
    @param path: iterable of file names found in `dir`
    @param dir: directory prefix that is concatenated with each file name
                (so it has to end with a path separator)
    @param instru_types: numpy array of instrument names; a file belongs to a
                         type when its name starts with that type
    @param harmonic_num: FFT length forwarded to setupInput
    @return training_set: tuple (DFTs, labels); DFTs has one feature row per
                          0.5 s window, labels the matching one-hot rows.
                          Both are 2-D as soon as there is at least one
                          example and empty 1-D arrays otherwise
    @return labels_dim: number of instrument types
    @return instru_types: the instru_types argument, passed through
    '''
    feature_rows = []
    label_rows = []
    for fname in path:
        if not fname.endswith(".wav"):
            continue
        for instru in instru_types:
            if not fname.startswith(instru):
                continue
            print("loading in " + fname)
            fs,audio = wavfile.read(dir+fname)
            # keep only the first channel of multi-channel audio
            if audio.ndim > 1:
                audio = audio[:,0]

            for AUD in setupInput(audio,fs,harmonic_num,0.5):
                # windows whose features contain nan are skipped
                if np.isnan(AUD).any():
                    print("a nan array")
                    continue
                feature_rows.append(AUD)
                #set label
                label = np.zeros((instru_types.size))
                label[np.where(instru_types == instru)] = 1
                label_rows.append(label)
                print("labeled ",label)

    # stack once at the end: growing the matrices with vstack inside the loop
    # is quadratic and leaves a single example 1-D instead of (1, n)
    DFTs = np.vstack(feature_rows) if feature_rows else np.array([])
    labels = np.vstack(label_rows) if label_rows else np.array([])

    training_set = (DFTs,labels)
    labels_dim = instru_types.size

    return training_set, labels_dim, instru_types





def preTrainSetup(harmonic_num):
    '''
    Function: setup training set and validation set
    File dependency: instru_snds/instru_types*: one instrument type per line;
                     .wav files in instru_snds/ and valid_snds/ whose names
                     start with one of those types
    @param harmonic_num: FFT length forwarded to setupExample
    @return train_set: (features, labels) built from instru_snds/
    @return valid_set: (features, labels) built from valid_snds/
    @return labels_dim: number of instrument types
    @return instru_types: numpy array of the instrument type names
    @raise FileNotFoundError: when instru_snds/ holds no file whose name
                              starts with "instru_types"
    '''
    #get the current workspace's abs path
    work_dir = os.getcwd()
    print("workspace: " + work_dir)
    #get the path for training set & validation set
    snds_path = os.listdir(work_dir + '/instru_snds')
    valid_path = os.listdir(work_dir + '/valid_snds')

    #get the types of instruments
    instru_types = None
    for file in snds_path:
        if file.startswith("instru_types"):
            print("loading in: " + file)
            # `with` closes the handle for every matching file, also on error
            with open('instru_snds/' + file, "r") as f:
                names = [x.rstrip('\n') for x in f]
            # a blank line would give the type '' and str.startswith('') is
            # True for every file name, so blank lines are dropped
            instru_types = np.array([name for name in names if name.strip()])
    if instru_types is None:
        raise FileNotFoundError("no instru_types file found in instru_snds/")
    print("types include", instru_types)

    train_set, labels_dim, instru_types = setupExample(snds_path,"instru_snds/",instru_types,harmonic_num)
    valid_set, labels_dim, instru_types = setupExample(valid_path,"valid_snds/",instru_types,harmonic_num)


    return train_set, valid_set, labels_dim, instru_types

# preTrainSetup(500)


In [0]:
def trainNeualNet(feature_dim,label_dim,train_set,valid_set):
    '''
    Function: build, train and plot a dense classifier
    @param feature_dim: length of one feature vector (also used as the width
                        of the three hidden layers)
    @param label_dim: number of output units (one per label)
    @param train_set: tuple (x_train, y_train)
    @param valid_set: tuple (x_valid, y_valid)
    @return model: the fitted keras model
    '''
    #build the neural network
    from tensorflow.keras import layers
    from tensorflow.keras import optimizers
    #smallest power of two above feature_dim (only reported, not used by the model)
    i = 0
    while pow(2,i) <= feature_dim:
        i+=1
    unit_num = pow(2,i)
    print("unit_num: ",unit_num)

    model = keras.Sequential([
        layers.Dense(feature_dim, input_shape=(feature_dim,), activation="tanh"),
        layers.Dense(feature_dim, activation="relu"),
        layers.Dense(feature_dim, activation="relu"),
        layers.Dense(label_dim, activation="linear")
    ])

    model.compile(
        optimizer=optimizers.Adadelta(),
        loss="categorical_hinge",
        metrics=["accuracy","mae"]
    )

    (x_train,y_train) = train_set
    (x_valid,y_valid) = valid_set
    # x_train.size/feature_dim is the number of examples; never let the batch
    # size collapse to 0 for very small training sets
    BATCHSIZE = max(1, int(x_train.size/feature_dim/1.5))
    print("batch_size: ",BATCHSIZE)
    hist = model.fit(
        x_train,
        y_train,
        epochs=2500, batch_size=BATCHSIZE,
        validation_data=(x_valid,y_valid)
    )

    model.summary()

    # the accuracy metric is stored as 'acc' by older Keras releases and as
    # 'accuracy' by newer ones; accept either so the plots cannot fail with
    # a KeyError after the whole training run
    history = hist.history
    acc_key = 'acc' if 'acc' in history else 'accuracy'

    # Plot training & validation accuracy values
    plt.figure()
    plt.plot(history[acc_key])
    plt.plot(history['val_' + acc_key])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()
    # Plot training & validation loss values
    plt.figure()
    plt.plot(history['loss'])
    plt.plot(history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

    return model


def testNeuralNet(model,TESTs,labels_dim,feature_dim):
    voted_result = np.zeros(labels_dim)
    for TEST in TESTs:
        TEST = TEST.tolist()
        TEST = np.array([TEST])
        #don't know why quick fix
        if TEST.shape[1] != feature_dim:
            print("wrong shape: "+str(TEST.shape[1]))
            continue
        result = model.predict(TEST)

        nan_flag = False
        for val in result[0]:
            if np.isnan(val) == True:
                nan_flag = True
                break
        if nan_flag == True:
            print("nan result")
            continue

        idx_tuple = np.where(result[0] == np.max(result))
        idx = idx_tuple[0][0]
        voted_result[idx]+=1
        if idx == 2:
            print("predict Piano")
        elif idx == 1:
            print("predict Guitar")
        elif idx == 0:
            print("predict flute")
    
    return voted_result

In [21]:
def main():
    '''
    Function: prepare the data, obtain a model and classify every .wav file
              found in tests/

    The model is loaded from MODEL_PATH when that file exists; otherwise it is
    trained and then saved there. Training a model and immediately replacing
    it with an unconditional load throws the training away and fails when the
    file is missing.
    For each test file the label with most window votes is compared with the
    file-name prefix, using instru_types for the index -> name mapping.
    '''
    HARMONIC_NUM = 2000
    MODEL_PATH = 'model_featuredim_relux2_tanhx2_mse2.h5'

    train_set, valid_set, labels_dim, instru_types = preTrainSetup(HARMONIC_NUM)
    print("pretrain finished")
    print("------------------------------")

    # length of one feature vector, measured once on a dummy signal
    feature_dim = len(DFTSignal(np.ones(HARMONIC_NUM),HARMONIC_NUM))

    if os.path.exists(MODEL_PATH):
        model = keras.models.load_model(MODEL_PATH)
    else:
        model = trainNeualNet(feature_dim, labels_dim ,train_set, valid_set)
        model.save(MODEL_PATH)

    #get the current workspace's abs path
    work_dir = os.getcwd()
    print("workspace: " + work_dir)
    #get the path for the test set
    tests_path = os.listdir(work_dir + '/tests')
    for file in tests_path:
        if not file.endswith(".wav"):
            continue
        print("testing",file)
        fs,audio = wavfile.read("tests/"+file)
        # keep only the first channel of multi-channel audio
        if audio.ndim > 1:
            audio = audio[:,0]
        TESTs = setupInput(audio,fs,HARMONIC_NUM,0.5)
        result = testNeuralNet(model,TESTs,labels_dim,feature_dim)
        print("result: ", result)

        # result holds vote counts, so it can never contain nan; the case to
        # guard against is "no window produced a vote", where the arg-max
        # would silently report label 0
        if not np.any(result):
            print("no valid prediction")
            print("================================================")
            continue

        idx = int(np.argmax(result))
        predicted = str(instru_types[idx])
        if file.startswith(predicted):
            print(predicted + " right")
        else:
            print("wrong")
        print("================================================")



# Entry point: run main() only when this code is executed as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    main()

.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
a nan array
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
a nan array
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 1. 0.]
labeled  [0. 

KeyboardInterrupt: 

In [0]:
#########################################
# Scratch cell: plot the DFTSignal features of one recording
#########################################

# x = np.array([-pow(i,2)+10000 for i in range(-10,10)])
fs, x = wavfile.read("test_dft.wav")
# keep only the first channel of multi-channel audio
x = x[:, 0] if x.ndim > 1 else x

# feature vector (spectrum + peak feature) and the plain magnitude spectrum
X = DFTSignal(x, 2000)
X_ori = np.absolute(np.fft.rfft(x, 2000))

# plt.figure()
# plt.plot(x)
plt.figure()
plt.plot(X)
# plt.figure()
# plt.plot(X_ori,"g")




In [0]:
# `from __future__` imports have to be the first statement of the cell;
# anywhere else Python rejects the cell with a SyntaxError.
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

#toy array
# x_train = np.random.random((20,5))*10
# Rows with an odd index are all ones, rows whose index is a multiple of 6 are
# all twos, the rest stay zero; y_train follows the same pattern with 3 columns.
# NOTE(review): range(99) leaves the last (odd) row 99 at zero -- confirm intent.
x_train = np.zeros((100,5))
y_train = np.zeros((100,3))
for i in range(99):
    if i%2 != 0:
        fill = 1.
    elif i%3 == 0:
        fill = 2.
    else:
        continue
    x_train[i] = fill
    y_train[i] = fill

print("x_train(5x20): \n",x_train.shape,"\n",x_train) 
print("x: ",x_train[1].shape)
print("y_train(1x20): \n",y_train.shape, "\n",y_train) 
print("y: ",y_train[1].shape)

#set up the model
model = keras.Sequential([
    layers.Dense(5,input_shape=(5,)),
    layers.Dense(10,activation="sigmoid"),
    layers.Dense(3)
])

model.compile(optimizer=tf.keras.optimizers.Adam(0.01),
              loss='mse',
              metrics=['mae'])

model.fit(x_train, y_train, epochs=100, batch_size=10)

model.summary()

# predict on a single all-ones sample
x_test = np.ones((1,5))
print("x_test: \n",x_test)
print(x_test.shape)
result = model.predict(x_test)
print("result: ",result)

In [0]:
  # #get all time-domain signals
    # DFTs = np.array([])
    # labels = np.array([])
    # for file in snds_path:
    #     if file.endswith(".wav"):
    #         for type in instru_types:
    #             if file.startswith(type):
    #                 if file.endswith("train_long.wav"):
    #                     print("loading in " + file)
    #                     #set example
    #                     audio = np.array([])
    #                     fs,audio = wavfile.read("instru_snds/" + file)
    #                     if audio.ndim > 1:
    #                         audio = audio[:,0]
    #                     # audio = getSteadyState(audio)

    #                     print("loading in long file" + file)
    #                     AUDs = setupInput(audio,fs,harmonic_num,0.5)
    #                     for AUD in AUDs:
    #                         nan_flag = False
    #                         for val in AUD:
    #                             if np.isnan(val) == True:
    #                                 nan_flag = True
    #                                 print("a nan array")
    #                                 break
    #                         if nan_flag == False:
    #                             if DFTs.size == 0:
    #                                 DFTs = np.append(DFTs,AUD)
    #                             else:
    #                                 DFTs = np.vstack((DFTs,AUD))
    #                             #set label
    #                             label = np.zeros((instru_types.size))
    #                             # print("instru_types.size: ", instru_types.size, "label.size: ", label.size)
    #                             label[np.where(instru_types == type)] = 1
    #                             if labels.size == 0:
    #                                 labels = np.append(labels,label)
    #                             else:
    #                                 labels = np.vstack((labels,label))
    #                             # print("array is ",AUD)
    #                             print("labeled ",label)                            

                    # elif file.startswith("rfl"):
                    #     print("loading in " + file)
                    #     #set example
                    #     audio = np.array([])
                    #     fs,audio = wavfile.read("instru_snds/" + file)
                    #     if audio.ndim > 1:
                    #         audio = audio[:,0]
                    #     audio = getSteadyState(audio)

                    #     AUD = DFTSignal(audio,harmonic_num)
                    #     nan_flag = False
                    #     for val in AUD:
                    #         if np.isnan(val) == True:
                    #             nan_flag = True
                    #             print("a nan array")
                    #             break
                    #     if nan_flag == False:
                    #         if DFTs.size == 0:
                    #             DFTs = np.append(DFTs,AUD)
                    #         else:
                    #             DFTs = np.vstack((DFTs,AUD))
                    #         #set label
                    #         label = np.zeros((instru_types.size))
                    #         # print("instru_types.size: ", instru_types.size, "label.size: ", label.size)
                    #         label[np.where(instru_types == type)] = 1
                    #         if labels.size == 0:
                    #             labels = np.append(labels,label)
                    #         else:
                    #             labels = np.vstack((labels,label))
                    #         print("labeled ",label)
    # print("\n", DFTs.shape,labels.shape,DFTs.size)
    # print("labels: \n", labels)

    # training_set = (DFTs,labels)
    # labels_dim = instru_types.size




In [0]:
def normalize(arr):
    '''
    Function: normalize the input list in place so that it sums to 100
    @param arr: input list (or 1-D array); it is modified in place
    @return arr: the same object, rescaled

    An empty input or an input whose elements sum to 0 is returned unchanged,
    because no scale factor can make it sum to 100.
    Note: writing into an integer numpy array truncates the rescaled values.
    '''
    #get the summation of all elements (without shadowing the builtin sum)
    total = sum(arr)
    if total == 0:
        return arr

    #iteratively normalize
    #make whole arr sum to 100
    for i in range(len(arr)):
        arr[i] = arr[i]/total * 100
    return arr


def getSteadyState(sig):
    '''
    Function: get the steady-state part of a time-domain signal
    @param sig: input time-domain signal (1-D numpy array)
    @return sts_sig: slice of sig from the first to the last sample (both
                     included) whose squared value exceeds the mean squared
                     value of the signal

    When no sample exceeds the mean (for example a constant signal) the whole
    signal is returned; an empty signal is returned as is.
    '''
    if len(sig) == 0:
        return sig

    # square in float64: squaring integer audio (e.g. int16 samples) in its
    # own dtype overflows and corrupts both the energies and their mean
    energy = np.asarray(sig, dtype=np.float64) ** 2
    avg = energy.mean()

    # indices of all samples above the average energy
    loud = np.flatnonzero(energy > avg)
    if loud.size == 0:
        return sig[0:len(sig)]

    front_idx = loud[0]
    back_idx = loud[-1] + 1

    #get the steady-state signal
    sts_sig = sig[front_idx : back_idx]
    return sts_sig


def getEnergyTrait(arr):
    '''
    Function: variance of the input, used as an energy trait
    @param arr: input array
    @return: np.var(arr)
    '''
    return np.var(arr)