In [1]:
import numpy as np
import pandas as pd
import os
import scipy
from scipy.io import wavfile
import librosa
import webrtcvad
from pydub import AudioSegment
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, Conv1D, MaxPooling1D, Flatten
from keras.layers import LSTM
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import load_model



In [2]:
# Disabled cell: the optional Pushover notification hook below is wrapped in a string
# literal, so none of it executes (the cell only echoes the string as its output).
"""
import sys
sys.path.append('C:/Users/vodou/Documents/Python Scripts')
import pushover_notifier as pn

notifier = pn.PushoverNotifier('Microcontroller_Project')
notifier.redirect_print_to_pushover()
"""

"\nimport sys\nsys.path.append('C:/Users/vodou/Documents/Python Scripts')\nimport pushover_notifier as pn\n\nnotifier = pn.PushoverNotifier('Microcontroller_Project')\nnotifier.redirect_print_to_pushover()\n"

# Import Audio Files

### Classes: Real = 0; Fake = 1

In [3]:
# Dataset layout: <root>/<split>/<label>, one folder of .wav clips per split and label.
dataset_root = "for-2seconds"

test_path_fake = f"{dataset_root}/testing/fake"
test_path_real = f"{dataset_root}/testing/real"

train_path_fake = f"{dataset_root}/training/fake"
train_path_real = f"{dataset_root}/training/real"

validation_path_fake = f"{dataset_root}/validation/fake"
validation_path_real = f"{dataset_root}/validation/real"

In [4]:
# Empty accumulators filled by the loading cells: X_* collect raw waveforms, y_* collect labels.
X_test, y_test = [], []
X_train, y_train = [], []
X_val, y_val = [], []

In [5]:
# Load every fake test clip as a mono waveform into X_test and label it 1 (fake) in y_test.

wav_names = [name for name in os.listdir(test_path_fake) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(test_path_fake, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_test.append(audio_data)
    y_test.append(1)

In [6]:
# Load every real test clip as a mono waveform into X_test and label it 0 (real) in y_test.

wav_names = [name for name in os.listdir(test_path_real) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(test_path_real, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_test.append(audio_data)
    y_test.append(0)

In [7]:
# Load every fake training clip as a mono waveform into X_train and label it 1 (fake) in y_train.

wav_names = [name for name in os.listdir(train_path_fake) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(train_path_fake, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_train.append(audio_data)
    y_train.append(1)

In [8]:
# Load every real training clip as a mono waveform into X_train and label it 0 (real) in y_train.

wav_names = [name for name in os.listdir(train_path_real) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(train_path_real, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_train.append(audio_data)
    y_train.append(0)

In [9]:
# Load every fake validation clip as a mono waveform into X_val and label it 1 (fake) in y_val.

wav_names = [name for name in os.listdir(validation_path_fake) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(validation_path_fake, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_val.append(audio_data)
    y_val.append(1)

In [10]:
# Load every real validation clip as a mono waveform into X_val and label it 0 (real) in y_val.

wav_names = [name for name in os.listdir(validation_path_real) if name.endswith(".wav")]

for audio_file in wav_names:
    file_path = os.path.join(validation_path_real, audio_file)
    sample_rate, audio_data = wavfile.read(file_path)

    # Multi-channel recordings are reduced to their first channel.
    if audio_data.ndim > 1:
        audio_data = audio_data[:, 0]

    X_val.append(audio_data)
    y_val.append(0)

# Convert matrices to 2D numpy arrays, and vectors (lists) into 1D numpy arrays

In [11]:
# Turn each split's list of waveforms into a single NumPy array.
X_train, X_test, X_val = (np.array(clips) for clips in (X_train, X_test, X_val))

In [12]:
# Turn each split's list of integer labels into a 1-D NumPy array.
y_test, y_train, y_val = (np.array(labels) for labels in (y_test, y_train, y_val))

# Preprocessing

In [13]:
def extract_features(X, sample_rate=44100, frame_length=2048, hop_length=512):
    """Compute frame-level librosa descriptors for every waveform in ``X``.

    Parameters
    ----------
    X : iterable of numpy.ndarray
        Mono waveforms. Each one is scaled by 1/32768, i.e. it is assumed to
        hold int16 PCM samples (TODO confirm for every input file).
    sample_rate : int, optional
        Sampling rate handed to the librosa extractors that need one.
        NOTE(review): 44100 is kept as the default only for backward
        compatibility. The recorded feature shape (63 frames at hop 512,
        roughly 32000 samples per clip) suggests the clips are probably not
        44.1 kHz - confirm the real rate (``wavfile.read`` returns it) and
        pass it explicitly.
    frame_length : int, optional
        Analysis window / FFT size, applied to every extractor.
    hop_length : int, optional
        Hop between frames, applied to every extractor so that all feature
        groups produce the same number of frames.

    Returns
    -------
    tuple of 7 lists
        ``(zcr, rms, mfccs, chroma, spectral_centroid, spectral_bandwidth,
        spectral_rolloff)``; each list holds one array per input waveform.
    """
    zcr_features = []
    rms_features = []
    mfccs_features = []
    chroma_features = []
    spectral_centroid_features = []
    spectral_bandwidth_features = []
    spectral_rolloff_features = []

    # Framing arguments shared by all STFT-based extractors. With the default
    # values these equal librosa's own defaults, so existing results are unchanged.
    stft_kwargs = dict(n_fft=frame_length, hop_length=hop_length)

    for audio in X:
        # Convert from int16 range to float32 in [-1, 1)
        audio = audio.astype(np.float32) / 32768.0

        # Time-domain descriptors
        zcr = librosa.feature.zero_crossing_rate(y=audio, frame_length=frame_length, hop_length=hop_length)
        rms = librosa.feature.rms(y=audio, frame_length=frame_length, hop_length=hop_length)

        # Spectral descriptors
        mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=13, **stft_kwargs)
        chroma = librosa.feature.chroma_stft(y=audio, sr=sample_rate, **stft_kwargs)
        spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sample_rate, **stft_kwargs)
        spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio, sr=sample_rate, **stft_kwargs)
        spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, **stft_kwargs)

        zcr_features.append(zcr)
        rms_features.append(rms)
        mfccs_features.append(mfccs)
        chroma_features.append(chroma)
        spectral_centroid_features.append(spectral_centroid)
        spectral_bandwidth_features.append(spectral_bandwidth)
        spectral_rolloff_features.append(spectral_rolloff)

    return zcr_features, rms_features, mfccs_features, chroma_features, spectral_centroid_features, spectral_bandwidth_features, spectral_rolloff_features


In [14]:
def combine_features(zcr_list, rms_list, mfccs_list, chroma_list, spectral_centroid_list, spectral_bandwidth_list, spectral_rolloff_list):
    """Merge the seven per-clip feature groups into one array.

    Each argument is a sequence with one 2-D array per clip. Every group is
    converted to an array, its axes 1 and 2 are swapped, and the groups are
    then concatenated along the last axis in argument order.

    Returns
    -------
    numpy.ndarray
        Array whose last axis stacks the rows of all seven groups.
    """
    feature_groups = (
        zcr_list,
        rms_list,
        mfccs_list,
        chroma_list,
        spectral_centroid_list,
        spectral_bandwidth_list,
        spectral_rolloff_list,
    )

    # Move the per-group feature rows to the last axis so the groups can be joined there.
    swapped_groups = [np.swapaxes(group, 1, 2) for group in feature_groups]

    return np.concatenate(swapped_groups, axis=2)

In [15]:
# Extract the seven per-frame feature groups for the validation, test and training splits.
(
    zcr_features_val,
    rms_features_val,
    mfccs_features_val,
    chroma_features_val,
    spectral_centroid_features_val,
    spectral_bandwidth_features_val,
    spectral_rolloff_features_val,
) = extract_features(X_val)

(
    zcr_features_test,
    rms_features_test,
    mfccs_features_test,
    chroma_features_test,
    spectral_centroid_features_test,
    spectral_bandwidth_features_test,
    spectral_rolloff_features_test,
) = extract_features(X_test)

(
    zcr_features_train,
    rms_features_train,
    mfccs_features_train,
    chroma_features_train,
    spectral_centroid_features_train,
    spectral_bandwidth_features_train,
    spectral_rolloff_features_train,
) = extract_features(X_train)

  return pitch_tuning(


In [16]:
# Join the seven feature groups of each split into one array per split.
X_features_val = combine_features(
    zcr_features_val,
    rms_features_val,
    mfccs_features_val,
    chroma_features_val,
    spectral_centroid_features_val,
    spectral_bandwidth_features_val,
    spectral_rolloff_features_val,
)
X_features_test = combine_features(
    zcr_features_test,
    rms_features_test,
    mfccs_features_test,
    chroma_features_test,
    spectral_centroid_features_test,
    spectral_bandwidth_features_test,
    spectral_rolloff_features_test,
)
X_features_train = combine_features(
    zcr_features_train,
    rms_features_train,
    mfccs_features_train,
    chroma_features_train,
    spectral_centroid_features_train,
    spectral_bandwidth_features_train,
    spectral_rolloff_features_train,
)

In [17]:
# Shape check of the combined training features: (clips, axis-1, stacked feature rows);
# axis 1 is presumably the frame axis after the swap in combine_features - TODO confirm.
X_features_train.shape

(13956, 63, 30)

In [18]:
# Collapse each clip's 2-D feature matrix into a single 1-D feature vector.
x_train = np.array([clip_features.ravel() for clip_features in X_features_train])
x_val = np.array([clip_features.ravel() for clip_features in X_features_val])
x_test = np.array([clip_features.ravel() for clip_features in X_features_test])

In [19]:
# Shape check after flattening: one row per clip, one column per flattened feature value.
x_train.shape

(13956, 1890)

In [20]:
# Shuffle every split with a fixed seed so that "Restart Kernel -> Run All" reproduces the
# same sample order (the original unseeded permutation changed on every run).
# Features and labels of a split are indexed with the same permutation to stay aligned.
shuffle_rng = np.random.default_rng(42)

indices = shuffle_rng.permutation(len(x_train))

x_train = x_train[indices]
y_train = y_train[indices]

indices = shuffle_rng.permutation(len(x_val))

x_val = x_val[indices]
y_val = y_val[indices]

indices = shuffle_rng.permutation(len(x_test))

x_test = x_test[indices]
y_test = y_test[indices]

# Model

### Important: the model is not pruned. Pruning only works on Dense layers, Conv2D layers, and their 'derivatives', so it is skipped for this architecture.

In [21]:
def create_model(input_shape):
    """Build and compile the 1-D convolutional real/fake classifier.

    Parameters
    ----------
    input_shape : tuple
        Shape of the 2-D training matrix; only ``input_shape[1]`` (values per
        sample) is used. The network expects inputs of shape
        ``(input_shape[1], 1)``.

    Returns
    -------
    Sequential
        Compiled model: two Conv1D/MaxPooling1D/BatchNormalization stages,
        two small Dense stages with BatchNormalization and Dropout, and a
        2-unit softmax output. Compiled with the Adam optimizer, the
        ``binary_crossentropy`` loss and the accuracy metric.
    """
    feature_count = input_shape[1]

    layers = [
        # Convolutional stage 1: each sample is treated as a 1-channel sequence
        Conv1D(filters=12, kernel_size=3, activation='relu', input_shape=(feature_count, 1)),
        MaxPooling1D(pool_size=2),
        BatchNormalization(),

        # Convolutional stage 2
        Conv1D(filters=6, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        BatchNormalization(),

        # Flatten the feature maps for the dense head
        Flatten(),

        # Dense stage 1
        Dense(16, activation='relu'),
        BatchNormalization(),
        Dropout(0.2),

        # Dense stage 2
        Dense(6, activation='relu'),
        BatchNormalization(),
        Dropout(0.2),

        # Output: one softmax unit per class
        Dense(2, activation='softmax'),
    ]

    model = Sequential(layers)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

In [22]:
def train_model(X_features_train, X_features_val, y_train, y_val):
    """Create a fresh model and fit it on the training split.

    Parameters
    ----------
    X_features_train, X_features_val
        Feature matrices for training and validation.
    y_train, y_val
        Integer class labels; they are one-hot encoded to two classes here.

    Returns
    -------
    tuple
        ``(model, history)`` - the fitted model and the object returned by
        ``model.fit``.

    Notes
    -----
    Training runs for at most 30 epochs with batch size 64. The callbacks
    reduce the learning rate on a ``val_accuracy`` plateau, stop early on
    ``val_loss`` (restoring the best weights) and checkpoint the best model
    to ``CustomModel.keras``.
    """
    # create_model only reads shape[1], the number of values per sample.
    model = create_model(X_features_train.shape)

    targets_train = to_categorical(y_train, num_classes=2)
    targets_val = to_categorical(y_val, num_classes=2)

    callbacks = [
        ReduceLROnPlateau(monitor='val_accuracy', factor=0.02, patience=5),
        EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', restore_best_weights=True),
        ModelCheckpoint('CustomModel.keras', monitor='val_loss', verbose=1, save_best_only=True, mode='min'),
    ]

    history = model.fit(
        X_features_train,
        targets_train,
        epochs=30,
        batch_size=64,
        validation_data=(X_features_val, targets_val),
        callbacks=callbacks,
    )

    return model, history

In [23]:
# Build an untrained model just to inspect the architecture; train_model() creates its own
# instance, and the training cell rebinds `model` to the fitted one.
model = create_model(x_train.shape)
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 1888, 12)          48        
                                                                 
 max_pooling1d (MaxPooling1D  (None, 944, 12)          0         
 )                                                               
                                                                 
 batch_normalization (BatchN  (None, 944, 12)          48        
 ormalization)                                                   
                                                                 
 conv1d_1 (Conv1D)           (None, 942, 6)            222       
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 471, 6)           0         
 1D)                                                             
                                                        

In [24]:
# Train on the flattened features; `model` is the fitted network, `history` the fit() result.
model, history = train_model(x_train, x_val, y_train, y_val)

Epoch 1/30
Epoch 1: val_loss improved from inf to 0.57830, saving model to CustomModel.keras
Epoch 2/30
Epoch 2: val_loss improved from 0.57830 to 0.40790, saving model to CustomModel.keras
Epoch 3/30
Epoch 3: val_loss improved from 0.40790 to 0.35835, saving model to CustomModel.keras
Epoch 4/30
Epoch 4: val_loss did not improve from 0.35835
Epoch 5/30
Epoch 5: val_loss improved from 0.35835 to 0.32534, saving model to CustomModel.keras
Epoch 6/30
Epoch 6: val_loss did not improve from 0.32534
Epoch 7/30
Epoch 7: val_loss did not improve from 0.32534
Epoch 8/30
Epoch 8: val_loss did not improve from 0.32534
Epoch 9/30
Epoch 9: val_loss did not improve from 0.32534
Epoch 10/30
Epoch 10: val_loss did not improve from 0.32534
Epoch 11/30
Epoch 11: val_loss improved from 0.32534 to 0.19198, saving model to CustomModel.keras
Epoch 12/30
Epoch 12: val_loss improved from 0.19198 to 0.18700, saving model to CustomModel.keras
Epoch 13/30
Epoch 13: val_loss improved from 0.18700 to 0.18383, sav

In [25]:
# Persist the trained Keras model as HDF5; uncomment the second line to reload it
# instead of retraining.
model.save('trained_model.h5')
#model = load_model("trained_model.h5")

In [26]:
# One-hot encode the test labels exactly once. The guard makes the cell safe to re-run:
# calling to_categorical on labels that are already one-hot would add another axis.
if np.ndim(y_test) == 1:
    y_test = to_categorical(y_test, num_classes=2)

# Class probabilities for every test sample, one column per class.
y_pred = model.predict(x_test)



In [27]:
# Softmax outputs per test sample; column 0 corresponds to class 0 (real) and
# column 1 to class 1 (fake), following the label convention used when loading.
y_pred

array([[1.3405330e-02, 9.8659462e-01],
       [9.9566388e-01, 4.3361960e-03],
       [9.2779654e-01, 7.2203338e-02],
       ...,
       [1.2046626e-03, 9.9879527e-01],
       [3.8956394e-04, 9.9961036e-01],
       [7.9260415e-01, 2.0739581e-01]], dtype=float32)

# Quantisation

### 1. Convert Keras model to TFLite model

In [28]:
# Convert the trained Keras model to a TFLite flatbuffer without enabling any optimizations.
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Restrict conversion to the standard TFLite builtin operator set.
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS
]
converter._experimental_lower_tensor_list_ops = False  # Disable lowering tensor list ops

# `tflite_model` holds the serialized model as bytes.
tflite_model = converter.convert()



INFO:tensorflow:Assets written to: C:\Users\vodou\AppData\Local\Temp\tmp6k18vwe2\assets


INFO:tensorflow:Assets written to: C:\Users\vodou\AppData\Local\Temp\tmp6k18vwe2\assets


In [29]:
# Export a TensorFlow SavedModel into the directory "tflite_model".
# NOTE: despite its name this directory is not the .tflite flatbuffer written below.
tf.saved_model.save(model, "tflite_model")



INFO:tensorflow:Assets written to: tflite_model\assets


INFO:tensorflow:Assets written to: tflite_model\assets


In [30]:
# Write the float TFLite flatbuffer next to the notebook. A relative path keeps the notebook
# portable and matches the 'tflite_model.tflite' path read by the size comparison; the
# context manager guarantees the file is flushed and closed.
with open("tflite_model.tflite", "wb") as tflite_file:
    bytes_written = tflite_file.write(tflite_model)

# Show the number of bytes written as the cell output.
bytes_written

191092

In [31]:
# Compare on-disk sizes (in KB) of the Keras HDF5 export and the float TFLite flatbuffer.
BYTES_PER_KB = 1024

# Non-quantized HDF5 model
h5_mod = os.path.getsize('trained_model.h5') / BYTES_PER_KB
print("HDF5 Model size without quantization: %d KB" % h5_mod)

# Non-quantized TFLite model (tflite_mod is reused by the quantized-size comparison)
tflite_mod = os.path.getsize('tflite_model.tflite') / BYTES_PER_KB
print("TFLite Model size without quantization: %d KB" % tflite_mod)

# Ratio between the two file sizes
size_ratio = h5_mod / tflite_mod
print("\nReduction in file size by a factor of %f" % size_ratio)

HDF5 Model size without quantization: 623 KB
TFLite Model size without quantization: 186 KB

Reduction in file size by a factor of 3.339334


In [32]:
# Load the float TFLite model from memory and report the dtypes of its first input
# and first output tensor.
interpreter = tf.lite.Interpreter(model_content=tflite_model)

input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)

output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

input:  <class 'numpy.float32'>
output:  <class 'numpy.float32'>


In [33]:
def representative_data_gen():
    """Yield calibration inputs for TFLite post-training quantization.

    Takes the first 100 rows of the global ``x_train`` and yields each one as
    a single-element list holding a float32 tensor of shape
    ``(1, x_train.shape[1], 1)``.
    """
    feature_count = x_train.shape[1]
    calibration_batches = tf.data.Dataset.from_tensor_slices(x_train).batch(1).take(100)

    for batch in calibration_batches:
        # Add the trailing channel axis: (1, features) -> (1, features, 1)
        model_input = tf.reshape(batch, (1, feature_count, 1))
        yield [tf.cast(model_input, dtype=tf.float32)]

In [34]:
# Raise TensorFlow's logger to INFO so that messages emitted during conversion are shown.
import logging
tf.get_logger().setLevel(logging.INFO)

In [35]:
# Post-training quantization calibrated on the samples from representative_data_gen.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen

# Restrict the converter to the int8 builtin kernels. This is set exactly once: the
# original cell overwrote it a few lines later with TFLITE_BUILTINS, which discarded
# the int8-only restriction requested here.
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# The model's external input and output tensors use uint8.
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
converter._experimental_lower_tensor_list_ops = False

tflite_model_quant_int8 = converter.convert()



INFO:tensorflow:Assets written to: C:\Users\vodou\AppData\Local\Temp\tmpuvutxn_y\assets


INFO:tensorflow:Assets written to: C:\Users\vodou\AppData\Local\Temp\tmpuvutxn_y\assets


In [36]:
# Load the quantized TFLite model from memory and report the dtypes of its first input
# and first output tensor.
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant_int8)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

input:  <class 'numpy.uint8'>
output:  <class 'numpy.uint8'>


In [37]:
# Save the quantized model to disk; the context manager guarantees the file is
# flushed and closed before its size is read or it is converted to a C header.
with open("quantized_model.tflite", "wb") as quantized_file:
    quantized_file.write(tflite_model_quant_int8)

print("Model was saved at location: %s" % os.path.abspath('quantized_model.tflite'))

Model was saved at location: C:\Users\vodou\Documents\Academic\MA1\Machine_Learning_on_Microcontrollers\ML_MCU\quantized_model.tflite


In [38]:
# Show the model size for the 8-bit quantized TFLite model
tflite_quant_in_kb = os.path.getsize('quantized_model.tflite') / 1024
print("TFLite Model size with 8-bit quantization: %d KB" % tflite_quant_in_kb)

# tflite_mod (float TFLite size in KB) comes from the earlier size-comparison cell.
print("TFLite Model size without quantization: %d KB" % tflite_mod)

# Determine the reduction in model size
print("\nReduction in model size by a factor of %f" % (tflite_mod / tflite_quant_in_kb))

TFLite Model size with 8-bit quantization: 54 KB
TFLite Model size without quantization: 186 KB

Reduction in model size by a factor of 3.397795


In [39]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, test_image_indices):
    """Run a TFLite model on selected rows of the global ``x_test``.

    Parameters
    ----------
    tflite_file : str or path-like
        Path of the ``.tflite`` file to load.
    test_image_indices : sequence of int
        Row indices into ``x_test`` to evaluate (must support ``len``).

    Returns
    -------
    numpy.ndarray
        Integer array with the arg-max class index for every requested row,
        in the order of ``test_image_indices``.
    """
    # Initialize the interpreter
    interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    predictions = np.zeros((len(test_image_indices),), dtype=int)
    for i, test_image_index in enumerate(test_image_indices):
        test_image = x_test[test_image_index]

        if (test_image_index % 1000 == 0):
            print("Evaluated on %d images." % test_image_index)

        # Quantized input: map float features onto the uint8 grid. Round to the nearest
        # step and saturate to [0, 255]; a bare cast would truncate and wrap out-of-range
        # values around.
        if input_details['dtype'] == np.uint8:
            input_scale, input_zero_point = input_details["quantization"]
            test_image = np.clip(np.round(test_image / input_scale + input_zero_point), 0, 255)

        # Reshape to the exact tensor shape the model declares (batch and channel axes
        # included); adding only a batch axis gives the wrong rank for a Conv1D input.
        test_image = np.reshape(test_image, input_details["shape"]).astype(input_details["dtype"])
        interpreter.set_tensor(input_details["index"], test_image)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details["index"])[0]

        predictions[i] = output.argmax()

    return predictions

In [40]:
# Helper function to evaluate a TFLite model on all test samples
def evaluate_model(tflite_file, model_type):
    """Print the accuracy of a TFLite model on the global test split.

    Parameters
    ----------
    tflite_file : str or path-like
        Path of the ``.tflite`` file, forwarded to ``run_tflite_model``.
    model_type : str
        Label used in the printed message.

    Notes
    -----
    Reads the globals ``x_test`` and ``y_test``. ``y_test`` may hold integer
    class indices (1-D) or one-hot rows (2-D); one-hot rows are reduced to
    class indices before comparison, because the prediction cell replaces
    ``y_test`` with its one-hot encoding.
    """
    test_image_indices = range(x_test.shape[0])
    predictions = run_tflite_model(tflite_file, test_image_indices)

    # Bring the labels to the same form as the predictions (1-D class indices).
    labels = np.asarray(y_test)
    if labels.ndim > 1:
        labels = labels.argmax(axis=-1)

    accuracy = (np.sum(labels == predictions) * 100) / len(x_test)

    print('%s model accuracy is %.4f%% (Number of test samples=%d)' % (
        model_type, accuracy, len(x_test)))

# Convert to C .h file

In [41]:
# Function: render a byte sequence as the text of a C header defining a byte array
def hex_to_c_array(hex_data, var_name):
    """Return C header text that embeds ``hex_data`` as an array.

    Parameters
    ----------
    hex_data : bytes-like
        Sequence of integers, each formatted as a two-digit ``0x..`` literal.
    var_name : str
        Name of the C array; ``<var_name>_len`` holds its length and the
        upper-cased name plus ``_H`` is used as the include guard.

    Returns
    -------
    str
        The complete header text.
    """
    guard = var_name.upper() + '_H'
    total = len(hex_data)

    tokens = []
    for position, byte in enumerate(hex_data, start=1):
        token = f'{byte:#04x}'
        # Every value except the last is followed by a comma
        if position < total:
            token += ','
        # Break the line after every 12th value
        if position % 12 == 0:
            token += '\n '
        tokens.append(token)

    pieces = [
        # Include guard
        f'#ifndef {guard}\n',
        f'#define {guard}\n\n',
        # Array length
        f'\nstatic const unsigned int {var_name}_len = {total};\n',
        # Array definition
        f'static const unsigned char {var_name}[] = {{',
        '\n ' + ' '.join(tokens) + '\n};\n\n',
        # End of include guard
        f'#endif //{guard}',
    ]

    return ''.join(pieces)

In [42]:
from pathlib import Path

# Path to the TFLite model file
tflite_model_quant_int8_file = Path("quantized_model.tflite")

# Read the binary content of the TFLite model
with open(tflite_model_quant_int8_file, "rb") as f:
    tflite_model_content = f.read()

# Convert the binary data into a C array
c_model_name = "my_tflite_model"

# Make sure the output directory exists; opening a file inside a missing directory
# would raise FileNotFoundError on a fresh checkout.
c_header_dir = Path("cfiles")
c_header_dir.mkdir(parents=True, exist_ok=True)

with open(c_header_dir / (c_model_name + '.h'), 'w') as file:
    file.write(hex_to_c_array(tflite_model_content, c_model_name))

# Save Test Data for Inference on MCU

In [44]:
# save the test data as numpy arrays
# NOTE: y_test is stored in whatever form it has at this point - one-hot rows if the
# prediction cell (which applies to_categorical) has already run.
np.save('x_test.npy', x_test.astype(np.float32))
np.save('y_test.npy', y_test.astype(np.float32))

In [45]:
def convert_npy_to_header(file_name, header_name, array_name="data"):
    """Write the contents of a ``.npy`` file as a ``uint8_t`` array in a C header.

    Parameters
    ----------
    file_name : str
        Path of the ``.npy`` file to load.
    header_name : str
        Path of the header file to write. The part of its file name before the
        first ``.`` prefixes the ``<prefix>_LEN`` macro.
    array_name : str, optional
        Name of the generated C array. Defaults to ``"data"`` (the original
        fixed name); pass distinct names when several generated headers are
        included in the same translation unit, otherwise the symbols collide.

    Notes
    -----
    Values are multiplied by 255 and saturated to ``[0, 255]`` before the cast
    to ``uint8``. Inputs outside ``[0, 1]`` therefore clip instead of wrapping
    around. NOTE(review): this fixed x255 scaling is not derived from the
    quantized model's input scale / zero point - confirm it matches what the
    firmware expects for the feature data.
    """
    # Load the .npy file
    data = np.load(file_name)

    # Scale, saturate, then cast; a bare cast of out-of-range floats wraps or is undefined.
    data_int = np.clip(data * 255, 0, 255).astype(np.uint8)

    # Use only the file name (no directories) so the macro is a valid C identifier
    # even when header_name contains a directory component.
    macro_prefix = os.path.basename(header_name).split(".")[0]

    # Open the header file for writing
    with open(header_name, 'w') as f:
        f.write('// Generated header file\n')
        f.write(f'#define {macro_prefix}_LEN {data_int.size}\n')
        f.write(f'const uint8_t {array_name}[] = {{\n')

        # Write the data in hexadecimal format
        data_flat = data_int.flatten()  # Flatten to 1D
        for i, val in enumerate(data_flat):
            f.write(f'0x{val:02x}, ')
            if (i + 1) % 12 == 0:  # Newline every 12 values for readability
                f.write('\n')
        f.write('\n};\n')

In [46]:
# Example usage for x_test.npy and y_test.npy
# Each call loads the saved array and writes it as a uint8 C array into the given header file.
convert_npy_to_header('x_test.npy', 'x_test_data.h')
convert_npy_to_header('y_test.npy', 'y_test_data.h')