In [2]:
import os

from python_speech_features import mfcc
from python_speech_features import delta
from python_speech_features import logfbank
import scipy.io.wavfile as wav
from random import randint
import librosa

In [3]:
# Layout of one training sample.
no_of_features = 13   # leading coefficients kept from each of MFCC / delta / delta-delta
no_of_frames = 10     # consecutive frames stacked into a single sample

# Width of a single frame: three blocks of no_of_features columns plus 40
# further columns budgeted for the log filterbank block.
stats_columns = 40 + (no_of_features * 3)

# Width of a whole sample: no_of_frames frames laid end to end.
no_of_columns = no_of_frames * stats_columns

def get_feature_vectors(dataset_type):
    """Build the stacked-frame feature matrix for the selected speakers.

    Every wav file in the split directory whose name contains one of the
    selected speaker ids contributes ``no_of_cycles`` randomly positioned
    windows of ``no_of_frames`` consecutive frames. A frame is the first
    ``no_of_features`` MFCC, delta and delta-delta coefficients followed by
    the log filterbank energies. Each window is flattened into one row and
    the numeric speaker label is appended as the last column.

    Parameters
    ----------
    dataset_type : str
        ``"train"`` reads ``data_thuyg20_sre/enroll`` and draws 400 windows
        per file; ``"test"`` reads ``data_thuyg20_sre/test`` and draws 20.
        Both paths are resolved against the current working directory.

    Returns
    -------
    For ``"train"``: ``(dataset, mean, std_deviation)`` where ``dataset`` has
    ``no_of_columns + 1`` columns and ``mean`` / ``std_deviation`` are the
    per-column statistics (shape ``(1, stats_columns)``) over all sampled
    frames. For ``"test"``: ``dataset`` only.

    Raises
    ------
    ValueError
        If ``dataset_type`` is neither ``"train"`` nor ``"test"``.
    """
    import warnings

    # set parameters for training and testing
    if dataset_type == "train":
        directory = os.path.join(os.getcwd(), 'data_thuyg20_sre/enroll')
        no_of_cycles = 400
    elif dataset_type == "test":
        directory = os.path.join(os.getcwd(), 'data_thuyg20_sre/test')
        no_of_cycles = 20
    else:
        # Previously this fell through and crashed on an unbound `directory`.
        raise ValueError(
            "dataset_type must be 'train' or 'test', got %r" % (dataset_type,))

    # filter speakers
    names = ['F101', 'F102', 'F103', 'F104', 'F105', 'M101', 'M102', 'M103']

    # Filterbank width implied by the module-level column counts. logfbank
    # defaults to 26 filters, which would not fit the widths the result
    # matrices are allocated with, so the count is passed explicitly.
    no_of_fbank_filters = stats_columns - 3 * no_of_features

    # Pieces are collected in lists and joined once at the end; concatenating
    # onto the growing matrices inside the loop copied them on every sample.
    # The empty seeds keep the original dtype and column-count check.
    sample_rows = [numpy.empty([0, no_of_columns + 1])]
    frame_blocks = [numpy.empty([0, stats_columns])]

    for file in os.listdir(directory):

        if not any(name in file for name in names):
            continue

        # get speaker index from filename: F10x -> x, M10x -> 5 + x
        speaker_index = file.split("_")[0]
        if speaker_index[0] == 'M':
            speaker_index = 5 + int(speaker_index[3:])
        else:
            speaker_index = int(speaker_index[3:])

        # extract the per-frame features once per file (they do not depend
        # on which window is sampled)
        (rate, sig) = wav.read(os.path.join(directory, file))
        mfcc_feat = mfcc(sig, rate)
        d_mfcc_feat = delta(mfcc_feat, 2)
        dd_mfcc_feat = delta(d_mfcc_feat, 2)
        fbank_feat = logfbank(sig, rate, nfilt=no_of_fbank_filters)

        frame_features = numpy.hstack((
            mfcc_feat[:, :no_of_features],
            d_mfcc_feat[:, :no_of_features],
            dd_mfcc_feat[:, :no_of_features],
            fbank_feat,
        ))

        # Window starts are drawn from 1..600 as before, but never so late
        # that the window would run past the end of a short recording.
        last_start = min(600, frame_features.shape[0] - no_of_frames)
        if last_start < 1:
            warnings.warn(
                "skipping %s: too few frames for a %d-frame window"
                % (file, no_of_frames))
            continue

        for _ in range(no_of_cycles):
            start = randint(1, last_start)
            window = frame_features[start:start + no_of_frames]

            # un-flattened frames feed the mean / std computation
            frame_blocks.append(window)

            # flattened window with the speaker index appended as last column
            sample_rows.append(
                numpy.hstack((window.reshape((1, -1)), [[speaker_index]])))

    dataset = numpy.concatenate(sample_rows, axis=0)
    stats_dataset = numpy.concatenate(frame_blocks, axis=0)

    mean = stats_dataset.mean(0, keepdims=True)
    std_deviation = numpy.std(stats_dataset, axis=0, keepdims=True)

    if dataset_type == "train":
        return dataset, mean, std_deviation
    return dataset
    

In [4]:
from keras.models import Sequential
import numpy as numpy 

Using Theano backend.


In [None]:
# Build the training matrix; the per-column mean and standard deviation
# returned with it are reused later to normalise the inputs.
(my_data, mean, std_deviation) = get_feature_vectors("train")


In [None]:
# Sanity check: shapes of the training matrix and of its statistics.
for _checked in (my_data, mean, std_deviation):
    print(_checked.shape)

In [None]:
# Everything from column no_of_columns onwards is the speaker label.
Y = numpy.array(my_data[:, no_of_columns:], copy=True)
print(Y.shape)

In [None]:
# Feature columns: everything before the label column.
X = numpy.copy(my_data[:, :no_of_columns])
print(X.shape)

# mean / std_deviation hold one value per single-frame column, while each row
# of X is no_of_frames frames laid end to end, so the statistics are repeated
# once per frame to line up with the columns of X.
mean_tiled = numpy.tile(mean, no_of_frames)
print(mean_tiled.shape)

std_deviation_tiled = numpy.tile(std_deviation, no_of_frames)
# A constant column has zero spread; dividing by it would produce NaN/inf,
# so such columns are scaled by 1 instead (they become all zeros).
std_deviation_tiled = numpy.where(std_deviation_tiled == 0, 1.0, std_deviation_tiled)
print(std_deviation_tiled.shape)

normalized_X = (X - mean_tiled) / std_deviation_tiled
print(normalized_X)


In [None]:
from keras import utils as np_utils

# One-hot encode the speaker labels into 10 class columns.
one_hot_labels = np_utils.to_categorical(
    Y,
    num_classes=10,
)
print(one_hot_labels.shape)

In [None]:
from keras.layers import Dense, Activation, Dropout
from keras.optimizers import SGD

# MultiLayer Perceptron: two tanh hidden layers of 2560 units feeding a
# 10-way softmax output.
model = Sequential([
    Dense(2560, activation='tanh', input_dim=no_of_columns),
    Dense(2560, activation='tanh'),
    Dense(10, activation='softmax'),
])

# Nesterov-momentum SGD with a small learning-rate decay.
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(
    loss='categorical_crossentropy',
    optimizer=sgd,
    metrics=['accuracy'],
)

# Train on the normalised features against the one-hot speaker labels.
model.fit(
    normalized_X,
    one_hot_labels,
    epochs=20,
    batch_size=32,
)

In [None]:
# Despite its name, test_model holds the test-split feature matrix.
test_model = get_feature_vectors("test")
print(test_model.shape)

# Feature columns of the test split.
test_X = numpy.array(test_model[:, :no_of_columns], copy=True)
print(test_X.shape)

# Normalise with the statistics of the training split.
normalized_test_X = (test_X - mean_tiled) / std_deviation_tiled
print(normalized_test_X.shape)

# Label column of the test split, then its one-hot form.
test_Y = numpy.array(test_model[:, no_of_columns:], copy=True)
print(test_Y.shape)
test_labels = np_utils.to_categorical(
    test_Y,
    num_classes=10,
)


In [None]:
# The network was fitted on normalised inputs, so it must be scored and
# queried on the normalised test features, not on the raw test_X.
print(model.test_on_batch(normalized_test_X, test_labels, sample_weight=None))
print(model.metrics_names)

# Predicted class per test sample = index of the largest softmax output.
predictions = model.predict(normalized_test_X).argmax(axis=1)
print(predictions)

In [None]:
# b = [sum(predictions[current: current+40]) for current in range(0, len(predictions), 40)]
# predicted_Y = []
# for row in b:
#     predicted_Y.append(row.argmax(axis=0))
    
# # print(predicted_Y)
# # print(test_Y[::40].T)

# # for t, p in zip(test_Y[::40].T[0], predicted_Y):
# #     print (int(t), p)

# diff = predicted_Y - test_Y[::40].T[0]

# print(len(predicted_Y))
# print(sum(x == 0 for x in diff))

In [None]:
# True label next to predicted label, one test sample per line.
true_labels = test_Y[:, 0]
for t, p in zip(true_labels, predictions):
    print(int(t), p)

# A zero difference means the prediction matched the true label.
diff = predictions - true_labels

# Number of test samples, then number of exact matches.
print(len(predictions))
print(numpy.count_nonzero(diff == 0))