## MFCC feature extraction and Network training

In this notebook you will go through an example flow of processing audio data, complete with feature extraction and training. Then we will use TFLite to quantize the model to int8 and process to be able to port it over to the MCU.

Make sure you read the instructions on the exercise sheet and follow the task order.

#### Task 1 

In [2]:
import json
import numpy as np
from scipy.io import wavfile
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tqdm import tqdm

DataSetPath = "/Users/yannickstrumpler/Documents/Studium/MLoMCU/Lab2/hey_snips_research_6k_en_train_eval_clean_ter/"

with open(DataSetPath+"train.json") as jsonfile:
    traindata = json.load(jsonfile)

with open(DataSetPath+"test.json") as jsonfile:
    testdata = json.load(jsonfile)

#### Task 2

In [3]:
def load_data():
    x_train_list = []
    y_train_list = []

    x_test_list = []
    y_test_list = []

    totalSliceLength = 10 # Length to stuff the signals to, given in seconds

    # trainsize = len(traindata) # Number of loaded training samples
    # testsize = len(testdata) # Number of loaded testing samples

    trainsize = 1000 # Number of loaded training samples
    testsize = 100 # Number of loaded testing samples


    fs = 16000 # Sampling rate of the samples
    segmentLength = 1024 # Number of samples to use per segment

    sliceLength = int(totalSliceLength * fs / segmentLength)*segmentLength

    for i in tqdm(range(trainsize)): 
        fs, train_sound_data = wavfile.read(DataSetPath+traindata[i]['audio_file_path']) # Read wavfile to extract amplitudes

        _x_train = train_sound_data.copy() # Get a mutable copy of the wavfile
        _x_train.resize(sliceLength) # Zero stuff the single to a length of sliceLength
        _x_train = _x_train.reshape(-1,int(segmentLength)) # Split slice into Segments with 0 overlap
        x_train_list.append(_x_train.astype(np.float32)) # Add segmented slice to training sample list, cast to float so librosa doesn't complain
        y_train_list.append(traindata[i]['is_hotword']) # Read label 

    for i in tqdm(range(testsize)):
        fs, test_sound_data = wavfile.read(DataSetPath+testdata[i]['audio_file_path'])
        _x_test = test_sound_data.copy()
        _x_test.resize(sliceLength)
        _x_test = _x_test.reshape((-1,int(segmentLength)))
        x_test_list.append(_x_test.astype(np.float32))
        y_test_list.append(testdata[i]['is_hotword'])

    x_train = tf.convert_to_tensor(np.asarray(x_train_list))
    y_train = tf.convert_to_tensor(np.asarray(y_train_list))

    x_test = tf.convert_to_tensor(np.asarray(x_test_list))
    y_test = tf.convert_to_tensor(np.asarray(y_test_list))

    return x_train, y_train, x_test, y_test

In [4]:
res = dict(map(lambda x: (x['worker_id'],x), traindata))
print(len(res))

3311


In [5]:
def compute_mfccs(tensor):
    sample_rate = 16000.0
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80.0, 7600.0, 80
    frame_length = 1024
    num_mfcc = 13

    stfts = tf.signal.stft(tensor, frame_length=frame_length, frame_step=frame_length, fft_length=frame_length)
    spectrograms = tf.abs(stfts)
    spectrograms = tf.reshape(spectrograms, (spectrograms.shape[0],spectrograms.shape[1],-1))
    num_spectrogram_bins = stfts.shape[-1]
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
      num_mel_bins, num_spectrogram_bins, sample_rate, lower_edge_hertz,
      upper_edge_hertz)
    mel_spectrograms = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)
    log_mel_spectrograms = tf.math.log(mel_spectrograms + 1e-6)
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrograms)[..., :num_mfcc]
    return tf.reshape(mfccs, (mfccs.shape[0],mfccs.shape[1],mfccs.shape[2],-1))

In [6]:
x_train, y_train, x_test, y_test = load_data()

100%|██████████| 1000/1000 [00:00<00:00, 2321.47it/s]
100%|██████████| 100/100 [00:00<00:00, 1868.99it/s]


#### Task 3

In [None]:
x_train_mfcc = compute_mfccs(x_train)
x_test_mfcc = compute_mfccs(x_test)

print(x_train_mfcc.shape)
print(x_test_mfcc.shape)


In [None]:
    totalSliceLength = 10 # Length to stuff the signals to, given in seconds

    # trainsize = len(traindata) # Number of loaded training samples
    # testsize = len(testdata) # Number of loaded testing samples

    trainsize = 1000 # Number of loaded training samples
    testsize = 100 # Number of loaded testing samples


    fs = 16000 # Sampling rate of the samples
    segmentLength = 1024 # Number of samples to use per segment

    sliceLength = int(totalSliceLength * fs / segmentLength)*segmentLength
    sliceLength/fs
    
    #Memory of a single frame: sliceLength * 32 bit (every sample is a double)
    #Task 3: Input encoding, may be wav, mp3 etc
    #feature transformation that amkes it easier for a network to learn good representations

In [None]:
print(x_test_mfcc.shape)

#### Task 4

In [None]:
batchSize = 10
epochs = 30

train_set = (x_train_mfcc/512 + 0.5)
train_labels = y_train

test_set = (x_test_mfcc/512 + 0.5)
test_labels = y_test


In [None]:

model = tf.keras.models.Sequential()

#model.add(layers.InputLayer(input_shape=(train_set.shape[1],train_set.shape[2],train_set.shape[3]), batch_size=(batchSize)))
model.add(layers.Conv2D(filters=3,kernel_size=(3,3),padding="same",input_shape=(train_set[0].shape)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.Conv2D(filters=16,kernel_size=(3,3),strides=(2,2),padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(filters=32,kernel_size=(3,3),strides=(2,2),padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(filters=48,kernel_size=(3,3),padding='same',strides=(2,2)))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.GlobalAveragePooling2D())

model.add(layers.Flatten())

model.add(layers.Dense(8, kernel_regularizer=(regularizers.l1(0))))
model.add(layers.Activation('relu'))

model.add(layers.Dense(2))
model.add(layers.Activation('softmax'))


model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy'])
model.fit(train_set, y_train, batchSize, epochs, steps_per_epoch=trainsize/batchSize)


In [None]:
model.summary()
score = model.evaluate(test_set, y_test)

In [None]:
model.save("MFCCmodel.h5")

#### TFLite conversion
#### Task 8

In [None]:
train_set = train_set.numpy()
test_set = test_set.numpy()
train_labels = train_labels.numpy()
test_labels = test_labels.numpy()
tflite_model_name = 'MFCC'
# Convert Keras model to a tflite model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Convert the model to the TensorFlow Lite format with quantization
quantize = True
if (quantize):
    def representative_dataset():
        for i in range(500):
            yield([train_set[i].reshape(1,156,13,1)])
    # Set the optimization flag.
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Enforce full-int8 quantization
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8  # or tf.uint8
    converter.inference_output_type = tf.int8  # or tf.uint8
    # Provide a representative dataset to ensure we quantize correctly.
converter.representative_dataset = representative_dataset
tflite_model = converter.convert()

open(tflite_model_name + '.tflite', 'wb').write(tflite_model)

#### This function here takes in the model and outputs an header file we will import into the TFLite example project folder. (/Core/Inc/)

In [None]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

    # Declare C variable
    c_str += 'unsigned char ' + var_name + '[] = {'
    hex_array = []
    for i, val in enumerate(hex_data) :

        # Construct string from hex
        hex_str = format(val, '#04x')

        # Add formatting so each line stays within 80 characters
        if (i + 1) < len(hex_data):
            hex_str += ','
        if (i + 1) % 12 == 0:
            hex_str += '\n '
        hex_array.append(hex_str)

    # Add closing brace
    c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [None]:
c_model_name = 'MFCC'
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    file.write(hex_to_c_array(tflite_model, c_model_name))

#### Let's have a look at the network we just generated

In [None]:
tflite_interpreter = tf.lite.Interpreter(model_path=tflite_model_name + '.tflite')
tflite_interpreter.allocate_tensors()
input_details = tflite_interpreter.get_input_details()
output_details = tflite_interpreter.get_output_details()

print("== Input details ==")
print("name:", input_details[0]['name'])
print("shape:", input_details[0]['shape'])
print("type:", input_details[0]['dtype'])

print("\n== Output details ==")
print("name:", output_details[0]['name'])
print("shape:", output_details[0]['shape'])
print("type:", output_details[0]['dtype'])

#### Next let's see how the performance of the network is

In [None]:
predictions = np.zeros((len(test_set),), dtype=int)
input_scale, input_zero_point = input_details[0]["quantization"]
for i in range(len(test_set)):
    val_batch = test_set[i]
    #We must convert the data into int8 format before invoking inference.
    val_batch = val_batch / input_scale + input_zero_point
    val_batch = np.expand_dims(val_batch, axis=0).astype(input_details[0]["dtype"])
    tflite_interpreter.set_tensor(input_details[0]['index'], val_batch)
    tflite_interpreter.allocate_tensors()
    tflite_interpreter.invoke()

    tflite_model_predictions = tflite_interpreter.get_tensor(output_details[0]['index'])
    #print("Prediction results shape:", tflite_model_predictions.shape)
    output = tflite_interpreter.get_tensor(output_details[0]['index'])
    predictions[i] = output.argmax()

In [None]:
sum = 0
for i in range(len(predictions)):
    if (predictions[i] == test_labels[i]):
        sum = sum + 1
accuracy_score = sum / 100
print("Accuracy of quantized to int8 model is {}%".format(accuracy_score*100))
print("Compared to float32 accuracy of {}%".format(score[1]*100))
print("We have a change of {}%".format((accuracy_score-score[1])*100))