# Dataset & Preprocessing

## Init & Create Dataset

In [None]:
# Init

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
from tensorflow.keras import models
import os
import logging
from sklearn.metrics import confusion_matrix
import pandas as pd
import seaborn as sns
import pathlib
from tqdm import tqdm
import librosa

os.environ["KERAS_BACKEND"] = "jax"

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.io.wavfile
from keras import layers
from scipy.signal import resample

# Seed TensorFlow and NumPy with the same value so repeated runs match.
for seed_fn in (tf.random.set_seed, np.random.seed):
    seed_fn(1234)

# Matplotlib defaults used by every figure in this notebook.
plt.rcParams.update({
    'figure.figsize': (15.0, 8.0),
    'image.interpolation': 'nearest',
    'image.cmap': 'gray',
})

# Let the TensorFlow logger emit everything down to DEBUG level.
tf_logger = logging.getLogger("tensorflow")
tf_logger.setLevel(logging.DEBUG)

print("TensorFlow version: ", tf.__version__)

# Report whether TensorFlow can see at least one GPU.
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    gpu_state = "AVAILABLE"
else:
    gpu_state = "NOT AVAILABLE"
print("GPU is", gpu_state)


# Directory the UrbanSound8K archive is extracted into.
# exist_ok=True replaces the separate exists() check, which is race-prone.
os.makedirs('urbansounds', exist_ok=True)

# Load dataset
#!wget https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz -O urban8k.tgz
#!tar -xzf urban8k.tgz -C urbansounds
#!rm urban8k.tgz

BASE_DATA_DIR = "urbansounds/UrbanSound8K"


#for microphone rerecorded audio
#BASE_DATA_DIR = "urbansounds/recorded_urbansounds"
BATCH_SIZE = 64
NUM_CLASSES = 10
EPOCHS = 200
SAMPLE_RATE = 16000

# Metadata table; the columns used in this notebook are slice_file_name,
# fold, classID and class.
pd_data = pd.read_csv(os.path.join(BASE_DATA_DIR, "metadata", "UrbanSound8K.csv"))

# Map classID to target
pd_data["target"] = pd_data["classID"]

# Re-index the class IDs to a dense 0..NUM_CLASSES-1 range.
targets = sorted(pd_data["target"].unique().tolist())
if len(targets) != NUM_CLASSES:
    # Raise explicitly: an assert would be stripped when Python runs with -O.
    raise ValueError(
        f"Expected {NUM_CLASSES} distinct classes in the metadata, found {len(targets)}"
    )
old_target_to_new_target = {old: new for new, old in enumerate(targets)}
pd_data["target"] = pd_data["target"].map(old_target_to_new_target)
pd_data

# Create mapping from target to category: class_names[target] is the
# human-readable label taken from the first row carrying that target.
class_names = [None] * NUM_CLASSES
for new_target in old_target_to_new_target.values():
    class_names[new_target] = pd_data.loc[pd_data["target"] == new_target, "class"].iloc[0]


print(class_names)



def read_wav_file(path, fold, target_sr=SAMPLE_RATE):
    """Load one dataset clip and force it to exactly one second of audio.

    Args:
        path: File name of the clip inside its fold directory.
        fold: Fold number; the clip is read from ``audio/fold<fold>/``
            below ``BASE_DATA_DIR``.
        target_sr: Sample rate the clip is resampled to while loading.

    Returns:
        1-D array of ``target_sr`` samples: longer clips are cut at the end,
        shorter clips are zero-padded at the end.
    """
    clip_file = os.path.join(BASE_DATA_DIR, "audio", f"fold{fold}", path)
    samples, _ = librosa.load(clip_file, sr=target_sr, res_type='kaiser_fast')

    # One second worth of samples at the target rate.
    wanted = int(target_sr * 1)
    shortfall = wanted - len(samples)

    if shortfall > 0:
        # Too short: append zeros up to the wanted length.
        samples = np.pad(samples, (0, shortfall))
    elif shortfall < 0:
        # Too long: keep only the leading part.
        samples = samples[:wanted]

    return samples




def read_dataset(df, folds):
    """Load the waveforms and labels of every clip belonging to the given folds.

    Args:
        df: Metadata frame with ``fold``, ``slice_file_name`` and ``target``
            columns.
        folds: Collection of fold numbers to keep.

    Returns:
        Tuple ``(waves, labels)``: ``waves`` is a float32 array holding one
        row per selected clip (as returned by ``read_wav_file``), ``labels``
        the matching values of the ``target`` column.
    """
    selected = df[df["fold"].isin(folds)]
    labels = selected["target"].values
    clips = [
        read_wav_file(name, fold)
        for name, fold in zip(selected["slice_file_name"].values, selected["fold"].values)
    ]
    # Every clip has the same length, so store them as a dense float32 matrix.
    # An object array would box each sample individually, which wastes memory
    # and makes ``.nbytes`` report pointer sizes instead of audio bytes.
    waves = np.array(clips, dtype=np.float32)
    return waves, labels





from sklearn.model_selection import train_test_split

# Extract data
filenames = pd_data["slice_file_name"].values
foldnums  = pd_data["fold"].values
targets   = pd_data["target"].values

print("Loading all audio data")
# read_wav_file returns equally long clips, so a dense float32 matrix is the
# right container. dtype=object would box every sample (several times the
# memory) and make .nbytes report pointer sizes rather than audio bytes.
waves = np.array(
    [read_wav_file(fname, fold) for fname, fold in zip(filenames, foldnums)],
    dtype=np.float32,
)
targets = np.array(targets)



# 70% train, 30% temp (to be split into val/test)
# Stratified on the labels, with a fixed seed so the split is repeatable.
train_x, temp_x, train_y, temp_y = train_test_split(
    waves, targets,
    test_size=0.30,
    random_state=42,
    stratify=targets
)


In [None]:
from rerecord_data import *
#rerecord data with microphone

#record_entire_dataset(pd_data)

In [None]:

# Halve the 30% hold-out into validation and test parts (15% each of the
# full dataset), stratified on the labels with a fixed seed.
holdout_parts = train_test_split(
    temp_x,
    temp_y,
    test_size=0.5,
    random_state=42,
    stratify=temp_y,
)
valid_x, test_x, valid_y, test_y = holdout_parts

print(f"Number of segments for: train_x: {len(train_x)}, valid_x: {len(valid_x)}, test_x: {len(test_x)}")


## Augment Data

In [None]:

from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift, Gain, HighPassFilter, LowPassFilter
import numpy as np



from audiomentations import (
    Compose, AddColorNoise, Shift, Gain,
    HighPassFilter, LowPassFilter, PolarityInversion, 
    ClippingDistortion,  PitchShift, AddBackgroundNoise
)

SAMPLE_RATE = 16000

# Augmentation chain; each transform is applied independently with p=0.5.
_augmentation_steps = [
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
    # "pink" by default (per the original note on this transform)
    AddColorNoise(min_snr_db=15, max_snr_db=18, p=0.5),
    Gain(min_gain_db=-6.0, max_gain_db=6.0, p=0.5),
]
augment = Compose(_augmentation_steps)





"""
esc50_path = ""
MAX_SHIFT_S = 0.15   
augment = Compose([

    AddBackgroundNoise(sounds_path=esc50_path, min_snr_in_db=5.0, max_snr_in_db=20.0, p=1.0),

    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),

    AddColorNoise(min_snr_db=15, max_snr_db=18, p=0.5,),  # pink by default

    Gain(min_gain_db=-6.0, max_gain_db=6.0, p=0.5),

    LowPassFilter( min_cutoff_freq=3000., max_cutoff_freq=6000., p=0.25),

])
"""





def apply_augmentations(x, y, sample_rate=16000, augmenter=None):
    """Return the dataset extended with one augmented copy of every clip.

    Args:
        x: Array of shape ``(num_clips, num_samples)`` holding the waveforms.
        y: Labels, one per clip.
        sample_rate: Sample rate handed to the augmenter.
        augmenter: Callable invoked as
            ``augmenter(samples=<(1, num_samples) float32>, sample_rate=...)``
            whose result is indexed with ``[0]``. Defaults to the
            module-level ``augment`` pipeline.

    Returns:
        Tuple ``(x_ext, y_ext)``: the original clips followed by their
        augmented copies (float32), and the labels repeated in that order.
    """
    if augmenter is None:
        augmenter = augment

    # Normalise to float32 up front so the result is a dense float array even
    # when the caller passes an object-dtype array.
    x = np.asarray(x, dtype=np.float32)
    y = np.asarray(y)

    if x.shape[0] == 0:
        # Nothing to augment; np.stack would fail on an empty list.
        return x.copy(), y.copy()

    # Each clip is passed as a fresh (1, num_samples) copy so the originals
    # stay untouched whatever the augmenter does with its input.
    x_aug = np.stack(
        [augmenter(samples=wav[None, :].copy(), sample_rate=sample_rate)[0] for wav in x],
        axis=0,
    )

    x_ext = np.concatenate([x, x_aug], axis=0)
    y_ext = np.concatenate([y, y], axis=0)  # augmented clips keep their labels

    assert x_ext.shape[0] == y_ext.shape[0]
    return x_ext, y_ext



# Extend the training split with one augmented copy of every clip.
train_x_augmented, train_y_augmented = apply_augmentations(train_x, train_y)



print(train_x_augmented.shape)
print(train_y_augmented.shape)



# NOTE(review): only train_x is rebound to the augmented data; train_y keeps
# its original length, so train_y_augmented is the label array that matches
# train_x from here on. Re-running this cell would feed the already-augmented
# train_x together with the un-augmented train_y.
train_x = train_x_augmented


## Create MFCCs

In [None]:

import numpy as np
import cmsisdsp as dsp
import cmsisdsp.mfcc as mfcc
import scipy.signal.windows as sig
from cmsisdsp.datatype import F32


# Parameters of the CMSIS-DSP MFCC front end.
FFTSize         = 1024      # FFT length passed to the mel-filter and MFCC init; also the window length
numOfMelFilters = 80        # number of mel filters
numOfDctOutputs = 16        # DCT outputs, i.e. coefficients kept per frame
freq_min        = 80.0      # lower edge of the mel filter bank (presumably Hz)
freq_high       = 7600.0    # upper edge of the mel filter bank (presumably Hz)
sample_rate     = 16000.0
frame_length    = 1024 
frame_step      = 512


# Create the Hamming window
# sym=False is passed explicitly (SciPy's periodic window variant).
window = sig.hamming(FFTSize, sym=False)
# Create mel filters
filtLen, filtPos, packedFilters = mfcc.melFilterMatrix(
    F32, freq_min, freq_high,
    numOfMelFilters, sample_rate, FFTSize
)

# Create DCT matrix
dctMatrixFilters = mfcc.dctMatrix(F32, numOfDctOutputs, numOfMelFilters)

# initialize arm_mfcc_instance_f32
# The instance bundles the window, mel filters and DCT matrix created above
# and is reused for every frame computed later in the notebook.
mfccf32 = dsp.arm_mfcc_instance_f32()
status = dsp.arm_mfcc_init_f32(
    mfccf32,
    FFTSize,
    numOfMelFilters,
    numOfDctOutputs,
    dctMatrixFilters,
    filtPos,
    filtLen,
    packedFilters,
    window
)
# Any non-zero status is treated as an initialisation failure.
if status != 0:
    raise RuntimeError("MFCC init failed (status code = {})".format(status))



def compute_mfccs_cmsis_batch(x_data, mfcc_instance,
                              numDctOutputs,
                              frame_length=1024, frame_step=512):
    """Compute framed MFCCs for a batch of signals with CMSIS-DSP.

    x_data:         NumPy array of shape [N, num_samples]
    mfcc_instance:  The initialized arm_mfcc_instance_f32
    numDctOutputs:  The number of MFCC coefficients
    frame_length:   Number of samples per frame
    frame_step:     Hop size
    Returns:        float32 MFCC array of shape [N, num_frames, numDctOutputs].
                    num_frames is 0 when a signal is shorter than one frame.
    """
    N, num_samples = x_data.shape

    # Number of complete frames per signal. Clamp at zero: for signals much
    # shorter than one frame the floor division goes negative, which used to
    # crash np.zeros with a negative dimension.
    num_frames = max(0, (num_samples - frame_length) // frame_step + 1)

    # storing mfccs in array: (N, num_frames, numDctOutputs)
    all_mfccs = np.zeros((N, num_frames, numDctOutputs), dtype=np.float32)

    # Scratch buffer handed to arm_mfcc_f32 on every call.
    tmp = np.zeros(frame_length + 2, dtype=np.float32)

    for i in range(N):
        # Convert the whole signal once instead of once per overlapping frame.
        signal = np.asarray(x_data[i], dtype=np.float32)

        for f in range(num_frames):
            start = f * frame_step

            # Hand CMSIS its own copy of the frame: the C routine is
            # documented to modify its source buffer, and frames overlap.
            # (Whether the Python wrapper already copies is not verified here.)
            frame_data = signal[start:start + frame_length].copy()

            # 1-frame MFCC via CMSIS
            all_mfccs[i, f, :] = dsp.arm_mfcc_f32(mfcc_instance, frame_data, tmp)

    return all_mfccs




# Framing options shared by the three splits.
_mfcc_options = dict(numDctOutputs=numOfDctOutputs, frame_length=1024, frame_step=512)

train_cmsis_mfccs = compute_mfccs_cmsis_batch(train_x, mfccf32, **_mfcc_options)
valid_cmsis_mfccs = compute_mfccs_cmsis_batch(valid_x, mfccf32, **_mfcc_options)
test_cmsis_mfccs = compute_mfccs_cmsis_batch(test_x, mfccf32, **_mfcc_options)

for _mfcc_batch in (train_cmsis_mfccs, valid_cmsis_mfccs, test_cmsis_mfccs):
    print(_mfcc_batch.shape)


def _with_trailing_axis(mfcc_batch):
    """Reshape a 3-D MFCC batch to 4-D by appending one trailing axis."""
    dim0, dim1, dim2 = mfcc_batch.shape
    return tf.reshape(mfcc_batch, (dim0, dim1, dim2, -1))


train_x_mfcc = _with_trailing_axis(train_cmsis_mfccs)
print(train_x_mfcc.shape)
print(train_y_augmented.shape)

valid_x_mfcc = _with_trailing_axis(valid_cmsis_mfccs)
print(valid_x_mfcc.shape)
print(valid_y.shape)

test_x_mfcc = _with_trailing_axis(test_cmsis_mfccs)
print(test_x_mfcc.shape)
print(f"test_y Shape: {test_y.shape}")

In [None]:
#Compute Compression Ratio

# Size the raw audio as float32 samples (the precision it is fed to the MFCC
# stage in). train_x.nbytes is not reliable here: for an object-dtype array it
# counts 8-byte pointers per sample and would inflate the ratio.
total_train_size_no_compression = train_x.size * np.dtype(np.float32).itemsize
total_train_size_with_compression = train_x_mfcc.numpy().nbytes

print("Total training data size without compression: ", total_train_size_no_compression)
print("Total training data size with compression: ", total_train_size_with_compression)
print("Compression ratio: ", total_train_size_no_compression/total_train_size_with_compression)
# Fraction of the raw size that the MFCC representation saves.
print(1- total_train_size_with_compression/total_train_size_no_compression)

In [None]:
# Dump every MFCC frame of the first test clip, one frame per line (debugging aid).
x_mfcc_np = test_x_mfcc.numpy()

for frame, frame_coeffs in enumerate(x_mfcc_np[0, :, :, 0]):
    # Each coefficient is followed by a single space, including the last one.
    formatted = "".join(f"{coeff:.6f} " for coeff in frame_coeffs)
    print(f"Frame {frame:02d}:", formatted)


# Models

## Create Baseline Model

In [None]:
#training with keras tuner for hyperparameter tuning
from tensorflow import keras
from tensorflow.keras import layers, models, regularizers
import keras_tuner as kt

def build_model(hp):
    """Build and compile the CNN for one Keras Tuner trial.

    Args:
        hp: Keras Tuner hyperparameter object. Registers ``l2_reg``,
            ``dropout_1``, ``dropout_2`` and ``activation``, in that order.

    Returns:
        A compiled ``Sequential`` model taking ``(30, 16, 1)`` inputs and
        producing a 10-way softmax.
    """
    weight_decay = hp.Float("l2_reg", 1e-5, 1e-2, sampling="log")
    drop_after_convs = hp.Float("dropout_1", 0.1, 0.5, step=0.05)
    drop_before_head = hp.Float("dropout_2", 0.1, 0.5, step=0.05)
    tuned_activation = hp.Choice("activation", ["relu", "linear"])

    def conv(filters, activation):
        # All convolutions share kernel size, padding and L2 strength.
        return layers.Conv2D(filters, kernel_size=(2, 4), activation=activation,
                             padding="same",
                             kernel_regularizer=regularizers.l2(weight_decay))

    stack = [
        layers.Input(shape=(30, 16, 1)),

        conv(16, tuned_activation),
        layers.MaxPooling2D(pool_size=(2, 3)),

        conv(32, tuned_activation),
        layers.MaxPooling2D(pool_size=(2, 2)),

        conv(64, tuned_activation),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(drop_after_convs),

        # The last convolution always uses ReLU, regardless of the tuned choice.
        conv(64, "relu"),

        layers.GlobalAveragePooling2D(),
        layers.Dropout(drop_before_head),

        layers.Dense(10, activation="softmax"),
    ]
    model = models.Sequential(stack)

    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model



# Bayesian search over the hyperparameters declared in build_model,
# maximising validation accuracy over three trials.
tuner = kt.BayesianOptimization(
    build_model,
    objective='val_accuracy',
    max_trials=3,
    directory='keras_tuner_dir',
    project_name='urban8k_tuning'
)

# Select hyperparameters on the validation split. Using the test split here
# would leak it into model selection and make the later test accuracy
# optimistic.
tuner.search(train_x_mfcc, train_y_augmented, epochs=10, validation_data=(valid_x_mfcc, valid_y), batch_size=256)


In [None]:
import visualkeras
import IPython.display as display
from PIL import Image


# Take the top-ranked model from the tuner and print its layer summary.
best_models = tuner.get_best_models(num_models=1)
baseline_model = best_models[0]
baseline_model.summary()


# visualize model
def show_model_visualization(model, title):
    """Render a layered view of ``model``, save it as ``<title>.png`` and show it.

    Args:
        model: Keras model to draw.
        title: Base name (without extension) of the PNG written to the
            working directory.
    """
    img_path = f"{title}.png"
    visualkeras.layered_view(model, legend=True).save(img_path)
    # Display the file that was just written.
    saved_image = Image.open(img_path)
    display.display(saved_image)


print("Model Architecture:\r\n")
show_model_visualization(baseline_model, "model")

In [None]:
#Convert to tflite

model_name = "baseline"

# Float-model reference metrics on the test split.
baseline_model_loss, baseline_model_acc = baseline_model.evaluate(test_x_mfcc, test_y)

converter = tf.lite.TFLiteConverter.from_keras_model(baseline_model)
baseline_tflite_model = converter.convert()

# Save tflite model
# Create the output directory first, and use a context manager so the file
# is flushed and closed before its size is read below.
os.makedirs("models", exist_ok=True)
with open(f"models/{model_name}.tflite", "wb") as tflite_file:
    tflite_file.write(baseline_tflite_model)



# Show model size
baseline_size = os.path.getsize(f'models/{model_name}.tflite') / 1024
print("Baseline TFLite Model size : %d KB" % baseline_size)



## Create Quantization Aware Training (QAT) Model

In [None]:



import tensorflow_model_optimization as tfmot
from tensorflow_model_optimization.quantization.keras import QuantizeConfig
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.models import clone_model

# Custom NoOpQuantizeConfig (Skips BatchNormalization Quantization)
class NoOpQuantizeConfig(QuantizeConfig):
    """QuantizeConfig that quantizes nothing.

    Attached to BatchNormalization layers so that they are left untouched
    when the model is annotated for quantization-aware training.
    """

    def get_weights_and_quantizers(self, layer):
        """Return no (weight, quantizer) pairs."""
        return []

    def get_activations_and_quantizers(self, layer):
        """Return no (activation, quantizer) pairs."""
        return []

    def set_quantize_weights(self, layer, quantize_weights):
        """Do nothing; there are no quantized weights to set."""
        return None

    def set_quantize_activations(self, layer, quantize_activations):
        """Do nothing; there are no quantized activations to set."""
        return None

    def get_output_quantizers(self, layer):
        """Return no output quantizers."""
        return []

    def get_config(self):
        """Return an empty config; the class has no state to serialize."""
        return {}

# Annotate Model with Custom Config for BatchNormalization
def apply_quantization_with_custom_config(model):
    """Clone ``model`` with every layer annotated for quantization.

    BatchNormalization layers are annotated with ``NoOpQuantizeConfig``;
    all other layers get the default annotation.

    Args:
        model: Keras model to annotate.

    Returns:
        The annotated clone produced by ``clone_model``.
    """
    annotate = tfmot.quantization.keras.quantize_annotate_layer

    def annotate_layer(layer):
        if not isinstance(layer, BatchNormalization):
            return annotate(layer)
        return annotate(layer, quantize_config=NoOpQuantizeConfig())

    return clone_model(model, clone_function=annotate_layer)




# Annotate the tuned baseline for quantization-aware training.
quant_aware_annotate_model = apply_quantization_with_custom_config(baseline_model)

# quantize_scope to register custom objects
with tfmot.quantization.keras.quantize_scope({'NoOpQuantizeConfig': NoOpQuantizeConfig}):
    qat_model = tfmot.quantization.keras.quantize_apply(
        quant_aware_annotate_model,
        tfmot.experimental.combine.Default8BitPrunePreserveQuantizeScheme()
    )


optimizer = tf.keras.optimizers.Adam(learning_rate=0.0003, clipnorm=1.0)

# Pass the configured optimizer object. The string 'adam' would build a fresh
# default Adam and silently drop the learning rate and gradient clipping
# chosen above.
qat_model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

qat_model.summary()


def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 50 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Learning rate currently in use.

    Returns:
        ``lr`` unchanged while ``epoch < 50``; afterwards ``lr * exp(-0.03)``,
        i.e. roughly a 3% reduction per epoch. Always a plain Python float
        when ``lr`` is one.
    """
    import math

    if epoch < 50:
        return lr
    # math.exp keeps the result a plain float; the TensorFlow op returned a
    # tensor for what is a scalar computation.
    return lr * math.exp(-0.03)

# Callbacks used while fine-tuning the QAT model.
callbacks = [
    # Stop after 100 epochs without improvement and restore the best weights.
    # No monitor argument is passed, so the Keras default metric is used.
    tf.keras.callbacks.EarlyStopping(verbose=1, patience=100, restore_best_weights=True),
    # Applies scheduler() to the learning rate every epoch.
    tf.keras.callbacks.LearningRateScheduler(scheduler),
    # NOTE(review): pruning-step callback; no pruning wrapper is applied to the
    # model in this notebook — confirm it is needed.
    tfmot.sparsity.keras.UpdatePruningStep()
]



# Overrides the EPOCHS constant defined at the top of the notebook.
EPOCHS = 250
# Train on the augmented training MFCCs and validate on the validation split.
history = qat_model.fit(
    train_x_mfcc,
    train_y_augmented,
    validation_data=(valid_x_mfcc, valid_y),
    epochs=EPOCHS,
    batch_size=256,
    callbacks=callbacks,
)


In [None]:
# Evaluate the QAT Keras model (before TFLite conversion) on the test set;
# evaluate() returns the loss followed by the compiled accuracy metric.
qat_loss, qat_acc = qat_model.evaluate(test_x_mfcc, test_y)


In [None]:
# Plot training and validation accuracy per epoch of the QAT run.
for metric_name in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[metric_name], label=metric_name)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0, 1])
plt.legend(loc='lower right')

### QAT to TFLite and Evaluation

In [None]:
#helper functions

def evaluate_tflite_model(tflite_model_path, x_test, y_test):
    """Run an int8-input TFLite model over a test set, one sample at a time.

    Args:
        tflite_model_path: Path of the ``.tflite`` file to load.
        x_test: Float inputs; sliced as ``x_test[i:i+1]`` per sample.
        y_test: Integer labels, indexable by sample position.

    Returns:
        Tuple ``(accuracy, y_pred)`` with the fraction of correct predictions
        (0.0 for an empty test set) and an array of predicted class indices.
    """
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()

    input_info = interpreter.get_input_details()[0]
    output_info = interpreter.get_output_details()[0]
    input_scale, input_zero_point = input_info['quantization']

    int8_limits = np.iinfo(np.int8)
    total = x_test.shape[0]
    y_pred = []
    correct = 0

    for i in range(total):
        float_sample = np.asarray(x_test[i:i + 1], dtype=np.float32)

        # Quantize with the model's input scale / zero point. Clip to the int8
        # range before casting, otherwise out-of-range values wrap around
        # instead of saturating.
        quantized = np.round(float_sample / input_scale + input_zero_point)
        int8_sample = np.clip(quantized, int8_limits.min, int8_limits.max).astype(np.int8)

        interpreter.set_tensor(input_info['index'], int8_sample)
        interpreter.invoke()

        # argmax is taken on the raw output tensor, without dequantizing.
        output_data = interpreter.get_tensor(output_info['index'])
        pred_label = np.argmax(output_data[0])
        y_pred.append(pred_label)

        if pred_label == y_test[i]:
            correct += 1

    # Avoid ZeroDivisionError on an empty test set.
    accuracy = correct / total if total else 0.0
    return accuracy, np.array(y_pred)





import tempfile

def get_gzipped_model_size(file):
    """Return the size in bytes of ``file`` after DEFLATE (zip) compression.

    The archive is written to a temporary ``.zip`` file that is removed again
    before returning.

    Args:
        file: Path of the file to compress.

    Returns:
        Size of the zip archive in bytes.
    """
    import os
    import zipfile

    fd, zipped_file = tempfile.mkstemp('.zip')
    # mkstemp returns an open OS-level descriptor; close it right away so it
    # is not leaked (ZipFile reopens the path itself).
    os.close(fd)
    try:
        with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
            f.write(file)
        return os.path.getsize(zipped_file)
    finally:
        # Do not leave one stray archive behind per call.
        os.remove(zipped_file)




# plot the confusion matrix 
from sklearn.metrics import confusion_matrix
import itertools

"""
function: plot_confusion_matrix
    - input: cm, classes, normalize, title, cmap
    - output: none
    - description: plots the confusion matrix
"""

def plot_confusion_matrix(cm, classes,
                            normalize=True,
                            title='Confusion matrix',
                            cmap=plt.cm.Blues):
    """Print a short header and draw the confusion matrix as an annotated image.

    Args:
        cm: Square confusion-matrix array; rows are summed for normalization
            and the y axis is labelled 'True Label'.
        classes: Class labels used for the tick marks on both axes.
        normalize: If True, divide every row by its sum so the cells show
            per-class fractions. Rows that sum to zero stay all-zero.
        title: Figure title.
        cmap: Matplotlib colormap for the image.
    """
    if normalize:
        cm = cm.astype('float')
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A class with no true samples has a zero row sum; dividing by it
        # would fill the row (and the colour threshold below) with NaN.
        cm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cell labels: two decimals for fractions, integers for raw counts.
    fmt = '.2f' if normalize else 'd'
    # Cells above half the maximum get white text so it stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')

In [None]:
#convert to tflite

def representative_data_gen():
    """Yield single-sample batches of training MFCCs for converter calibration.

    Yields at most 100 one-element lists, each holding one
    ``train_x_mfcc[i:i+1]`` slice.
    """
    # Provide a small subset of training data for calibration.
    # Bound by the data actually available so no empty slices are yielded
    # when the training set has fewer than 100 samples.
    sample_count = min(100, int(train_x_mfcc.shape[0]))
    for i in range(sample_count):
        yield [train_x_mfcc[i:i+1]]



# Convert the QAT model to a TFLite model restricted to int8 builtin ops,
# with int8 input and output tensors.
converter = tf.lite.TFLiteConverter.from_keras_model(qat_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

qat_tflite_model = converter.convert()


# Save the model
qat_tflite_file = 'models/final_qat.tflite'

# Make sure the output directory exists; open() does not create it.
os.makedirs(os.path.dirname(qat_tflite_file), exist_ok=True)
with open(qat_tflite_file, 'wb') as f:
    f.write(qat_tflite_model)



In [None]:
# Evaluate the QAT TFLite model on the test set.
# evaluate_tflite_model returns the accuracy and the predicted labels;
# y_pred is reused for the confusion matrix.

quantized_acc, y_pred = evaluate_tflite_model(qat_tflite_file, test_x_mfcc, test_y)
print(f"Quantized TFLite model test accuracy: {quantized_acc:.4f}")

In [None]:
# Re-evaluate the QAT TFLite model on the test set so this cell can run on
# its own, then plot the confusion matrix and report the file size.

quantized_acc, y_pred = evaluate_tflite_model(qat_tflite_file, test_x_mfcc, test_y)
print(f"Quantized TFLite model test accuracy: {quantized_acc:.4f}")

cm = confusion_matrix(test_y, y_pred)
plot_confusion_matrix(
    cm,
    class_names,
    title='Normalized Confusion matrix of the 8bit QAT model',
    cmap=plt.cm.Reds,
)

# File size on disk, converted from bytes to KB.
qat_size_bytes = os.path.getsize(qat_tflite_file)
qat_size = qat_size_bytes / 1024
print("QAT Model size: %d KB" % qat_size)

### Convert to C header file (for TensorFlow Lite Micro)

In [None]:
# Function: Convert some hex value into an array for C programming
def hex_to_c_array(hex_data, var_name):
    """Render a byte sequence as the text of a C header defining a byte array.

    The header has an include guard named ``<VAR_NAME>_H``, a
    ``static const unsigned int <var_name>_len`` holding the byte count, and
    a ``static const unsigned char <var_name>[]`` initialised with the bytes
    as two-digit hex literals, twelve per line.

    Args:
        hex_data: Iterable of integers in 0..255 with a length (e.g. bytes).
        var_name: C identifier for the array; also the base of the guard name.

    Returns:
        The header contents as a single string.
    """
    guard = var_name.upper() + '_H'
    total = len(hex_data)

    literals = []
    for position, byte in enumerate(hex_data, start=1):
        literal = format(byte, '#04x')
        # Every literal but the last is followed by a comma.
        if position < total:
            literal += ','
        # Break the line after every twelfth literal.
        if position % 12 == 0:
            literal += '\n '
        literals.append(literal)

    pieces = [
        # Header guard
        '#ifndef ' + guard + '\n',
        '#define ' + guard + '\n\n',
        # Array length
        '\nstatic const unsigned int ' + var_name + '_len = ' + str(total) + ';\n',
        # Array declaration and contents
        'static const unsigned char ' + var_name + '[] = {',
        '\n ' + ' '.join(literals) + '\n};\n\n',
        # Close the header guard
        '#endif //' + guard,
    ]
    return ''.join(pieces)




import os
# Make directory for output
# exist_ok=True replaces the separate exists() check, which is race-prone.
os.makedirs('cfiles', exist_ok=True)

# Path to model
model_path = qat_tflite_file
c_model_name = "final_qat"

# Load TFLite model as bytes
with open(model_path, 'rb') as f:
    tflite_model_content = f.read()

# Convert and save as C header
with open(f'cfiles/{c_model_name}.h', 'w') as f:
    f.write(hex_to_c_array(tflite_model_content, c_model_name))

# Test Inference

In [None]:
import librosa

#helper functions

def plot_wav(wav, sr=16000):
    """Plot a waveform against time.

    Args:
        wav: 1-D array of audio samples.
        sr: Sample rate of ``wav``, used to scale the time axis.
    """
    # librosa.display is a submodule; import it explicitly rather than
    # relying on ``import librosa`` to expose it.
    import librosa.display

    plt.figure(figsize=(10, 3))
    librosa.display.waveshow(wav, sr=sr)
    plt.title("Waveform")
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")
    plt.tight_layout()
    plt.show()



def test_read_wav_file(path, target_sr=SAMPLE_RATE):
    """Load an arbitrary audio file and force it to exactly one second.

    Unlike ``read_wav_file`` the path is used as given instead of being
    resolved inside the dataset folder.

    Args:
        path: Path of the audio file.
        target_sr: Sample rate the audio is resampled to while loading.

    Returns:
        1-D array of ``target_sr`` samples: longer recordings are cut at the
        end, shorter ones are zero-padded at the end.
    """
    samples, _ = librosa.load(path, sr=target_sr, res_type='kaiser_fast')

    # One second worth of samples at the target rate.
    wanted = int(target_sr * 1)
    shortfall = wanted - len(samples)

    if shortfall > 0:
        # Too short: append zeros up to the wanted length.
        samples = np.pad(samples, (0, shortfall))
    elif shortfall < 0:
        # Too long: keep only the leading part.
        samples = samples[:wanted]

    return samples



In [None]:
import os
import sounddevice as sd
import soundfile as sf
import time


def play_and_record(
    wav_path: str,
    output_path: str | None = None,
    playback_device=None,
    record_device=None,
    keep_copy: bool = False,
):
    """Play a WAV file and simultaneously record int16 16 kHz audio to a WAV.

    Args:
        wav_path: File to play. Resampled to 16 kHz if stored at another rate.
        output_path: Destination of the recording. Defaults to
            ``<wav_path stem>_recorded<ext>``.
        playback_device: Output device (index or name), or None for the default.
        record_device: Input device (index or name), or None for the default.
        keep_copy: If True, also return the recorded samples.

    Returns:
        ``output_path``, or ``(output_path, recording)`` when ``keep_copy`` is
        True.
    """
    src, file_sr = sf.read(wav_path, dtype="float32")
    if file_sr != 16000:
        # Transpose around the call so resampling runs along the time axis
        # for multi-channel files.
        src = librosa.resample(src.T, orig_sr=file_sr, target_sr=16000).T

    if output_path is None:
        stem, ext = os.path.splitext(wav_path)
        output_path = f"{stem}_recorded{ext}"

    print(f"Playing {os.path.basename(output_path)}")

    # sounddevice expects a device pair ordered (input, output), i.e.
    # (record, playback). Compare against None explicitly: device index 0 is
    # a valid choice but falsy.
    if playback_device is None and record_device is None:
        device = None
    else:
        device = (record_device, playback_device)

    # Record
    recording = sd.playrec(
        src,
        samplerate=16000,
        channels=1,
        dtype="int16",
        blocksize=1024,
        device=device,
    )
    sd.wait()

    # Save as int16 PCM
    sf.write(output_path, recording, 16000, subtype="PCM_16")

    return (output_path, recording) if keep_copy else output_path


def safe_wav(wav):
    """Build the ``test_wavs`` path under which a recording of ``wav`` is stored.

    The file name is ``recorded_<parent directory name>_<original file name>``.

    Args:
        wav: Path of the source WAV file.

    Returns:
        Path inside the ``test_wavs`` directory.
    """
    parent_dir, file_name = os.path.split(wav)
    fold = os.path.basename(parent_dir)
    return os.path.join('test_wavs', f'recorded_{fold}_{file_name}')








def record_microphone(
    duration: float,
    output_path: str,
    samplerate: int = 16000,
    record_device=None,
    channels: int = 1,
    dtype: str = "int16",
    blocksize: int = 1024,
    save_npy: bool = False, 
    sound_name: str = None   
):
    """Record from the microphone and write the capture to a 16-bit PCM WAV.

    Args:
        duration: Recording length in seconds.
        output_path: Destination WAV file.
        samplerate: Sample rate of the capture.
        record_device: Input device handed to sounddevice (None = default).
        channels: Number of input channels.
        dtype: Sample type requested from sounddevice.
        blocksize: Block size handed to sounddevice.
        save_npy: If True and ``sound_name`` is given, also store the raw
            samples as ``test_wavs/rec_<sound_name>.npy``.
        sound_name: Name used in the ``.npy`` file name.

    Returns:
        ``output_path``.
    """
    print(f"Recording for {duration} seconds")

    frame_count = int(duration * samplerate)
    captured = sd.rec(
        frame_count,
        samplerate=samplerate,
        channels=channels,
        dtype=dtype,
        blocksize=blocksize,
        device=record_device,
    )
    # Block until the recording has finished.
    sd.wait()

    print(f"Saving WAV to {output_path}")
    sf.write(output_path, captured, samplerate, subtype="PCM_16")

    # The NumPy dump is optional and needs a name to build its path from.
    if not (save_npy and sound_name):
        return output_path

    npy_path = f"test_wavs/rec_{sound_name}.npy"
    print(f"Saving NPY to {npy_path}")
    np.save(npy_path, captured)
    return output_path



#record sound with microphone

#sound_name = 'dog'
#time.sleep(1)
#safe = os.path.join('test_wavs', f'rec_{sound_name}.wav')
#captured = record_microphone(2, safe, save_npy=True, sound_name=sound_name)


In [None]:




# Test audio from recordings: load one recorded clip, compute its MFCCs and
# classify it with the QAT Keras model.
# The `if(1)` switch is always true; it only groups the loading step.
if(1):
    sound = 'siren'

    # WAV audio: load and fix the length to one second

    path = f"test_wavs/rec_{sound}.wav"
    processed_record = test_read_wav_file(path, SAMPLE_RATE)


    # NPY audio (alternative input, disabled)
    #processed_record = np.load("test_wavs/rec_{sound}.npy")
    #processed_record = (processed_record.astype(np.float32).reshape(1, -1))/ 32768.0


    #plot_wav(processed_record)


# Shape the clip as a batch of one: (1, num_samples), float32.
processed_record = (processed_record.astype(np.float32).reshape(1, -1))



print(processed_record.shape)
# Same MFCC settings as used for the training data.
mfccs = compute_mfccs_cmsis_batch(
    processed_record,
    mfccf32,
    numDctOutputs=numOfDctOutputs,  
    frame_length=1024,
    frame_step=512
)
# Append a trailing axis, matching how the training MFCCs were reshaped.
x_mfcc_tf = tf.reshape(mfccs, (mfccs.shape[0], mfccs.shape[1], mfccs.shape[2], -1))
print(x_mfcc_tf.shape)


#predictions = baseline_model.predict(x_mfcc_tf)
predictions = qat_model.predict(x_mfcc_tf)

# Highest-scoring class index per clip, mapped to its readable class name.
predicted_class = np.argmax(predictions, axis=1)

predicted_labels = [class_names[i] for i in predicted_class]

print(f"Predicted Class: {predicted_labels}")



# Benchmarking with other Models

In [None]:
from prettytable import PrettyTable
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os


# Models trained in this notebook: accuracy in percent (4 decimals), size in whole KB.
model_types = ['Full Precision', 'QAT']
model_accuracies = [round(100 * acc, 4) for acc in (baseline_model_acc, quantized_acc)]
model_sizes = [round(size_kb) for size_kb in (baseline_size, qat_size)]  # KB

# Reference models taken from the literature (see the list under the plot).
external_models = [
    "DenseNet-201 [1]",
    "YAMNet [2]",
    "AST [3]",
    "NeuProNet [4]",
    "Distilled Transformer [5]",
    "ESC-NAS [6]",
    "Micro-ACDNet [7]"
]
external_accuracies = [97.25, 95.3, 89.8, 83.75, 83.3, 81.25, 79.0]
external_sizes = [80 * 1024, 15 * 1024, 344 * 1024, 21 * 1024, 3 * 1024, 359, 300]  # KB

# Own models first, then the external ones.
all_models = [*model_types, *external_models]
all_accuracies = [*model_accuracies, *external_accuracies]
all_sizes = [*model_sizes, *(round(s) for s in external_sizes)]

# Summary table; sizes of 1024 KB and more are shown in MB.
table = PrettyTable()
table.field_names = ["Model type", "Accuracy (%)", "Size"]
for name, accuracy, size_kb in zip(all_models, all_accuracies, all_sizes):
    if size_kb >= 1024:
        size_str = f"{round(size_kb/1024, 2)} MB"
    else:
        size_str = f"{size_kb} KB"
    table.add_row([name, accuracy, size_str])
print(table)

# Marker sizes grow with the logarithm of the model size.
log_sizes = [np.log10(s) for s in all_sizes]
dot_sizes = [100 + 200 * ls for ls in log_sizes]

# Scatter plot of accuracy against size on a logarithmic x axis.
plt.figure(figsize=(16, 8))
plot = sns.scatterplot(x=all_sizes, y=all_accuracies, size=dot_sizes, sizes=(100, 1000), hue=all_models, legend=False)
plt.xscale('log')  # Log axis
plt.title('Model Accuracy vs Size: Benchmark Against Related Models')
plt.ylabel('Accuracy (%)')
plt.xlabel('Model Size (KB)')
plt.grid(True)
plt.tight_layout()

# Label every point with the model name above it and its size below it.
for name, accuracy, size_kb in zip(all_models, all_accuracies, all_sizes):
    if size_kb >= 1024:
        size_label = f"{round(size_kb/1024, 1)} MB"
    else:
        size_label = f"{size_kb} KB"
    plt.annotate(name, (size_kb, accuracy + 0.5), fontsize=15, ha='center')
    plt.annotate(size_label, (size_kb, accuracy - 0.7), fontsize=14, ha='center', color='gray')

# Highlight the labels of the two models from this notebook.
for text_artist in plot.texts:
    if text_artist.get_text() in ["Full Precision", "QAT"]:
        text_artist.set_color("red")

# references
source_text = (
    "[1] DenseNet-201: Huang et al., CVPR 2017\n"
    "[2] YAMNet: Google AI Blog, 2020\n"
    "[3] AST: Gong et al., ICASSP 2021\n"
    "[4] NeuProNet: Wang et al., Neurocomputing 2022\n"
    "[5] Distilled Transformer: Gong et al., ICASSP 2022\n"
    "[6] ESC-NAS: Nagrani et al., 2022\n"
    "[7] Micro-ACDNet: Yu et al., 2022"
)
plt.figtext(0.99, -0.15, source_text, wrap=True, horizontalalignment='right', fontsize=12)

plt.show()
