In [1]:
import numpy as np
import librosa
import glob
import os
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Dropout, Flatten, Dense, Reshape
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder






In [None]:
def extract_mfcc_features(audio_path, n_mfcc=13, frame_length=0.02, frame_stride=0.02,
                          n_fft=2048, n_mels=32, fmin=300, fmax=8000):
    try:

        audio_data, sr = librosa.load(audio_path, sr=16000)
    except Exception as e:
        print(f"Error loading audio file {audio_path}: {e}")
        return None
    
    win_length = int(frame_length * 16000)  
    hop_length = int(frame_stride * 16000)  

    mfccs = librosa.feature.mfcc(
        y=audio_data,
        sr=16000,          
        n_mfcc=n_mfcc,      
        n_fft=n_fft,        
        hop_length=hop_length,  
        win_length=win_length,  
        n_mels=n_mels,      
        fmin=fmin,          
        fmax=fmax,          
        window='hamming',   
        center=True,        
        power=2.0           
    )

    return mfccs.T

In [None]:

test_audio = "e:\\Project\\esc-50\\audio\\fan\\1-100032-A-0.wav"  # Change path 

Test audio file not found: e:\Project\esc-50\audio\fan\1-100032-A-0.wav
Please update the path to an existing audio file


In [4]:
def created_dataset(path,label):
    X, y = [], []
    
    audio_files = glob.glob(os.path.join(path, "*.wav"))
    for audio_path in audio_files:
        features = extract_mfcc_features(audio_path)
        if features is not None:
            X.append(features)
            y.append(label)
            print(f"pass {label}")
            
        else:
            print("Skip this file")
    
    return X, y


In [5]:
print(np.__version__)

1.25.2


In [6]:
fan_path = "e:\\Project\\esc-50\\audio\\fan"
vacuum_path = "e:\\Project\\esc-50\\audio\\vacuum_cleaner"
washing_path = "e:\\Project\\esc-50\\audio\\washing_machine"
noise_path = "e:\\Project\\esc-50\\audio\\noise"

In [7]:
X_fan, y_fan = created_dataset(fan_path,"fan")
X_vacuum, y_vacuum = created_dataset(vacuum_path,"vacuum")
X_washing, y_washing = created_dataset(washing_path,"washing")
X_noise, y_noise = created_dataset(noise_path,"noise")

pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
pass fan
p

In [8]:
def build_model(input_shape,num_classes):
    model = Sequential()
    
    model.add(Reshape((input_shape[0], input_shape[1]), input_shape=input_shape))
    
    model.add(Conv1D(32, kernel_size=3, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.25))

    model.add(Conv1D(16, kernel_size=3, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.25))

    model.add(Conv1D(8, kernel_size=3, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.25))

    model.add(Flatten())

    model.add(Dense(num_classes, activation='softmax'))

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [9]:
def train_model(X, y, epochs=50, batch_size=32, test_size=0.2):

    encoder = LabelEncoder()
    y_encoded = encoder.fit_transform(y)

    print("X shape:", X.shape)
    print("y_encoded length:", len(y_encoded))
    breakpoint()
    X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=test_size, random_state=42)

    input_shape = X_train.shape[1:]

    num_classes = len(np.unique(y_encoded))

    model = build_model(input_shape, num_classes)

    history = model.fit(X_train, y_train,
                        validation_data=(X_test, y_test),
                        epochs=epochs,
                        batch_size=batch_size)

    return model, history, encoder

In [10]:
if len(X_noise) < 2 or len(X_fan) < 2 or len(X_washing) < 2 or len(X_vacuum) < 2:
        print("Each class should have at least two samples for stratified splitting.")
        print("Combining both classes into one for training.")
        X = np.vstack((X_fan, X_vacuum, X_washing, X_noise))
        y = np.hstack((y_fan, y_vacuum, y_washing, y_noise))
else:
        X = np.vstack((X_fan, X_vacuum, X_washing, X_noise))
        y = np.hstack((y_fan, y_vacuum, y_washing, y_noise))

In [11]:
model, history, encode = train_model(X,y)

X shape: (14278, 51, 13)
y_encoded length: 14278






Epoch 1/50
Epoch 1/50




Epoch 2/50
Epoch 2/50
Epoch 3/50
Epoch 3/50
Epoch 4/50
Epoch 4/50
Epoch 5/50
Epoch 5/50
Epoch 6/50
Epoch 6/50
Epoch 7/50
Epoch 7/50
Epoch 8/50
Epoch 8/50
Epoch 9/50
Epoch 9/50
Epoch 10/50
Epoch 10/50
Epoch 11/50
Epoch 11/50
Epoch 12/50
Epoch 12/50
Epoch 13/50
Epoch 13/50
Epoch 14/50
Epoch 14/50
Epoch 15/50
Epoch 15/50
Epoch 16/50
Epoch 16/50
Epoch 17/50
Epoch 17/50
Epoch 18/50
Epoch 18/50
Epoch 19/50
Epoch 19/50
Epoch 20/50
Epoch 20/50
Epoch 21/50
Epoch 21/50
Epoch 22/50
Epoch 22/50
Epoch 23/50
Epoch 23/50
Epoch 24/50
Epoch 24/50
Epoch 25/50
Epoch 25/50
Epoch 26/50
Epoch 26/50
Epoch 27/50
Epoch 27/50
Epoch 28/50
Epoch 28/50
Epoch 29/50
Epoch 29/50
Epoch 30/50
Epoch 30/50
Epoch 31/50
Epoch 31/50
Epoch 32/50
Epoch 32/50
Epoch 33/50
Epoch 33/50
Epoch 34/50
Epoch 34/50
Epoch 35/50
Epoch 35/50
Epoch 36/50
Epoch 36/50
Epoch 37/50
Epoch 37/50
Epoch 38/50
Epoch 38/50
Epoch 39/50
Epoch 39/50
Epoch 40/50
Epoch 40/50

In [12]:
model.save("model.h5")

  saving_api.save_model(


In [13]:
model.input_shape

(None, 51, 13)

In [None]:
import tensorflow as tf

model = tf.keras.models.load_model("model.h5")


converter = tf.lite.TFLiteConverter.from_keras_model(model)


converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]


def representative_data_gen():
    for i in range(min(100, X.shape[0])): 
        sample = X[i:i+1].astype(np.float32)
        yield [sample]

converter.representative_dataset = representative_data_gen


converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8


tflite_quantized_model = converter.convert()

with open("model.tflite", "wb") as f:
    f.write(tflite_quantized_model)


INFO:tensorflow:Assets written to: C:\Users\doanm\AppData\Local\Temp\tmpqu7i9whf\assets


INFO:tensorflow:Assets written to: C:\Users\doanm\AppData\Local\Temp\tmpqu7i9whf\assets


Original model size: 12632 bytes
Quantized model size: 12528 bytes


In [None]:

with open("model.tflite", "rb") as f:
    data = f.read()

with open("model.h", "w") as f:
    f.write("// Int8 Quantized Model for ESP32\n")
    f.write("// Generated automatically\n\n")
    f.write("#ifndef MODEL_INT8_H\n")
    f.write("#define MODEL_INT8_H\n\n")
    f.write("const unsigned char model_int8[] = {\n")
    
    # Write data in chunks of 12 bytes per line for better readability
    for i in range(0, len(data), 12):
        chunk = data[i:i+12]
        line = "  " + ", ".join(f"0x{b:02x}" for b in chunk)
        if i + 12 < len(data):
            line += ","
        f.write(line + "\n")
    
    f.write("};\n\n")
    f.write(f"const int model_int8_len = {len(data)};\n\n")
    f.write("#endif // MODEL_INT8_H\n")

print(f"Int8 quantized model saved as model_int8.h")
print(f"Model size: {len(data)} bytes")

# Compare with original float model if exists
if os.path.exists("model.tflite"):
    with open("model.tflite", "rb") as f:
        float_data = f.read()
    compression_ratio = len(float_data) / len(data)
    print(f"Compression ratio: {compression_ratio:.2f}x smaller")
else:
    print("Original float model not found for comparison")

Int8 quantized model saved as model_int8.h
Model size: 12528 bytes
Compression ratio: 1.01x smaller


In [17]:
# Test int8 quantized model
interpreter = tf.lite.Interpreter(model_content=tflite_quantized_model)
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("Input details:")
print(f"  Shape: {input_details[0]['shape']}")
print(f"  Type: {input_details[0]['dtype']}")
print(f"  Quantization: {input_details[0]['quantization']}")

print("\nOutput details:")
print(f"  Shape: {output_details[0]['shape']}")
print(f"  Type: {output_details[0]['dtype']}")
print(f"  Quantization: {output_details[0]['quantization']}")

# Test with a sample
if X.shape[0] > 0:
    # Get quantization parameters
    input_scale, input_zero_point = input_details[0]['quantization']
    output_scale, output_zero_point = output_details[0]['quantization']
    
    # Prepare test sample
    test_sample = X[0:1].astype(np.float32)
    
    # Quantize input
    if input_scale > 0:  # Check if input is quantized
        quantized_input = (test_sample / input_scale + input_zero_point).astype(np.int8)
    else:
        quantized_input = test_sample.astype(np.int8)
    
    # Run inference
    interpreter.set_tensor(input_details[0]['index'], quantized_input)
    interpreter.invoke()
    
    # Get output
    quantized_output = interpreter.get_tensor(output_details[0]['index'])
    
    # Dequantize output if needed
    if output_scale > 0:
        output = (quantized_output.astype(np.float32) - output_zero_point) * output_scale
    else:
        output = quantized_output.astype(np.float32)
    
    print(f"\nTest inference successful!")
    print(f"Input shape: {quantized_input.shape}")
    print(f"Output shape: {quantized_output.shape}")
    print(f"Predicted class: {np.argmax(output)}")
else:
    print("No test data available")

Input details:
  Shape: [ 1 51 13]
  Type: <class 'numpy.int8'>
  Quantization: (1.461472988128662, 58)

Output details:
  Shape: [1 4]
  Type: <class 'numpy.int8'>
  Quantization: (0.00390625, -128)

Test inference successful!
Input shape: (1, 51, 13)
Output shape: (1, 4)
Predicted class: 0
