The code uses Python with Keras (TensorFlow backend) for a 1D CNN model, along with scikit-learn for label encoding and data splitting, plus librosa for audio feature extraction.

In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten, Conv1D, MaxPooling1D

def load_audio_data(data_path):
   
    features = []
    labels = []
    
    # Target sampling rate
    target_sr = 22050
    
    # For all subdirectories
    for root, dirs, files in os.walk(data_path):
        for file in files:
            if file.endswith(('.wav', '.mp3')):
                file_path = os.path.join(root, file)
                # Get the class label from the parent directory name
                label = os.path.basename(root)
                
                try:
                    # Load and resample the audio file
                    audio, sample_rate = librosa.load(file_path, sr=target_sr)
                    
                    # Extract MFCC features
                    mfccs = librosa.feature.mfcc(y=audio, sr=target_sr, n_mfcc=40)
                    mfccs_scaled = np.mean(mfccs.T, axis=0)
                    
                    # Append features and labels
                    features.append(mfccs_scaled)
                    labels.append(label)
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")
    
    return np.array(features), np.array(labels)

def build_model(input_shape, num_classes):
    
    # Building the 1D CNN model
    
    model = Sequential()
    
    # First convolutional layer
    model.add(Conv1D(64, 3, padding='same', activation='relu', input_shape=input_shape))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.25))
    
    # Second convolutional layer
    model.add(Conv1D(128, 3, padding='same', activation='relu'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.25))
    
    # Dense layers
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    
    return model

def make_prediction(model, le, file_path):
    
    # Make prediction on a single audio file
    
    audio, sample_rate = librosa.load(file_path, sr=22050)
    mfccs = librosa.feature.mfcc(y=audio, sr=22050, n_mfcc=40)
    mfccs_scaled = np.mean(mfccs.T, axis=0)
    features = mfccs_scaled.reshape(1, mfccs_scaled.shape[0], 1)
    predicted_vector = model.predict(features)
    predicted_class_index = np.argmax(predicted_vector, axis=-1)
    return le.inverse_transform(predicted_class_index)[0]

def main():
    
    data_path = "sounds"
    
    print("Loading and preprocessing audio files...")
    features, labels = load_audio_data(data_path)
    
    # Encode labels
    le = LabelEncoder()
    labels_encoded = le.fit_transform(labels)
    labels_onehot = to_categorical(labels_encoded)
    
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels_onehot, test_size=0.2, random_state=42, stratify=labels_onehot
    )
    
    # Reshape the data for CNN
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
    
    # Build and compile the model
    input_shape = (X_train.shape[1], 1)
    model = build_model(input_shape, len(le.classes_))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    
    # Train the model
    print("Training the model...")
    batch_size = 32
    epochs = 50
    model.fit(X_train, y_train, 
              batch_size=batch_size, 
              epochs=epochs,
              validation_data=(X_test, y_test),
              verbose=1)
    
    # Evaluate the model
    print("Evaluating the model...")
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {test_accuracy*100:.2f}%")
    
    # Save the model
    model.save('model/doorbell_classifier.h5')
    print("Model saved as 'doorbell_classifier.h5'")
    
    # Convert the model to TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()
    
    # Save the TensorFlow Lite model
    with open('model/doorbell_classifier.tflite', 'wb') as f:
        f.write(tflite_model)
    
    print("Model converted and saved as 'doorbell_classifier.tflite'")
    
    return model, le

if __name__ == "__main__":
    model, le = main()

Loading and preprocessing audio files...
Training the model...
Epoch 1/50


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 785ms/step - accuracy: 0.5294 - loss: 5.6831 - val_accuracy: 0.6000 - val_loss: 11.1201
Epoch 2/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step - accuracy: 0.6471 - loss: 10.6192 - val_accuracy: 0.6000 - val_loss: 8.4045
Epoch 3/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step - accuracy: 0.6471 - loss: 9.5476 - val_accuracy: 0.6000 - val_loss: 1.3863
Epoch 4/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step - accuracy: 0.5882 - loss: 5.9159 - val_accuracy: 0.4000 - val_loss: 2.3838
Epoch 5/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step - accuracy: 0.4706 - loss: 7.1869 - val_accuracy: 0.4000 - val_loss: 1.4891
Epoch 6/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step - accuracy: 0.5882 - loss: 6.0578 - val_accuracy: 1.0000 - val_loss: 0.0243
Epoch 7/50
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[

INFO:tensorflow:Assets written to: /var/folders/dy/k058gq41483gf6fpw0l7n7gc0000gn/T/tmpndfkuo86/assets


Saved artifact at '/var/folders/dy/k058gq41483gf6fpw0l7n7gc0000gn/T/tmpndfkuo86'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 40, 1), dtype=tf.float32, name='keras_tensor_55')
Output Type:
  TensorSpec(shape=(None, 2), dtype=tf.float32, name=None)
Captures:
  13109578640: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109579984: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109583632: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109578448: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109580368: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109578256: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109576528: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13109577680: TensorSpec(shape=(), dtype=tf.resource, name=None)


W0000 00:00:1739309527.405110 2478864 tf_tfl_flatbuffer_helpers.cc:365] Ignored output_format.


Model converted and saved as 'doorbell_classifier.tflite'


W0000 00:00:1739309527.405286 2478864 tf_tfl_flatbuffer_helpers.cc:368] Ignored drop_control_dependency.
I0000 00:00:1739309527.410309 2478864 mlir_graph_optimization_pass.cc:401] MLIR V1 optimization pass is not enabled


In [4]:
import numpy as np
import sounddevice as sd
import librosa
import queue
import time
from keras.models import load_model
from sklearn.preprocessing import LabelEncoder
import pygame.mixer
import firebase_admin
from firebase_admin import credentials, messaging

def initialize_firebase():
    
    if not firebase_admin._apps:
        cred = credentials.Certificate('config/doorbell-notification-14f52-ddf5adc0791f.json')
        return firebase_admin.initialize_app(cred)
    return firebase_admin.get_app()


class DoorbellDetector:
    def __init__(self, model_path='model/doorbell_classifier.h5', threshold=0.7):
        
        # Load the trained model
        self.model = load_model(model_path)
        self.threshold = threshold
        
        # Initialize label encoder
        self.le = LabelEncoder()
        self.le.fit(['background', 'doorbell'])
        
        # Initialize Firebase
        self.firebase_app = initialize_firebase()
        
        # Audio parameters
        self.sample_rate = 22050
        self.chunk_duration = 0.5  # seconds
        self.chunk_samples = int(self.sample_rate * self.chunk_duration)
        self.channels = 1
        
        # Buffer for collecting audio chunks
        self.audio_buffer = np.array([])
        self.buffer_duration = 2.0
        self.buffer_samples = int(self.sample_rate * self.buffer_duration)
        
        # Initialize audio queue
        self.audio_queue = queue.Queue()
        
        # Initialize pygame for alert sound
        pygame.mixer.init()
        
        # Instance variable to track the last notification time
        self.last_notification_time = 0
        self.notification_cooldown = 20  # Cooldown period to not send multiple notifications in (s)
        
    def process_audio(self, audio_data):
        
        try:
            
            if len(audio_data.shape) > 1:
                audio_data = np.mean(audio_data, axis=1)
            
            # Ensure minimum length for FFT
            if len(audio_data) < 2048:
                audio_data = np.pad(audio_data, (0, 2048 - len(audio_data)))
            
            # Extract MFCC features
            mfccs = librosa.feature.mfcc(
                y=audio_data, 
                sr=self.sample_rate, 
                n_mfcc=40,
                n_fft=1024,  
                hop_length=512
            )
            mfccs_scaled = np.mean(mfccs.T, axis=0)
            return mfccs_scaled.reshape(1, mfccs_scaled.shape[0], 1)
        except Exception as e:
            print(f"Error processing audio: {str(e)}")
            return None

    def audio_callback(self, indata, frames, time, status):
        
        if status:
            print(f"Status: {status}")
        self.audio_queue.put(indata.copy())

    def predict_audio(self, audio_data):
        
        features = self.process_audio(audio_data)
        if features is not None:
            pred_probs = self.model.predict(features, verbose=0)[0]
            pred_class = self.le.inverse_transform([np.argmax(pred_probs)])[0]
            confidence = float(pred_probs[self.le.transform(['doorbell'])[0]])
            return pred_class, confidence
        return None, 0.0

    def alert(self):
        
        current_time = time.time()
        
        # Check if enough time has passed since the last notification
        if current_time - self.last_notification_time < self.notification_cooldown:
            print("Notification cooldown active, skipping alert...")
            return
            
        print("\nDOORBELL DETECTED!")
        
        # Create message
        message = messaging.Message(
            notification=messaging.Notification(
                title='Doorbell Alert',
                body='Someone is at your door!'
            ),
            topic='doorbell_alerts'
        )
        
        # Send message
        try:
            response = messaging.send(message)
            print(f"Successfully sent notification: {response}")
            # Update the last notification time
            self.last_notification_time = current_time
        except Exception as e:
            print(f"Error sending notification: {e}")
        
        # Beep sound
        # sd.play(np.sin(2 * np.pi * 440 * np.linspace(0, 0.1, 4410)), 44100)
        # time.sleep(0.1)

    def start_listening(self):
        
        try:
            print("Starting doorbell detection...")
            
            # Start audio stream
            with sd.InputStream(
                callback=self.audio_callback,
                channels=self.channels,
                samplerate=self.sample_rate,
                blocksize=self.chunk_samples
            ):
                while True:
                    try:
                        # Get audio data from queue
                        audio_chunk = self.audio_queue.get(timeout=1.0)
                        
                        # Add to buffer
                        self.audio_buffer = np.append(self.audio_buffer, audio_chunk.flatten())
                        
                        # Keep buffer at desired length
                        if len(self.audio_buffer) > self.buffer_samples:
                            # Process when buffer is full
                            pred_class, confidence = self.predict_audio(self.audio_buffer)
                            
                            if pred_class == 'doorbell' and confidence > self.threshold:
                                self.alert()
                            
                            # Reset buffer with overlap
                            overlap_samples = int(self.sample_rate * 0.1)  # 0.1 second overlap
                            self.audio_buffer = self.audio_buffer[-overlap_samples:]
                            
                    except queue.Empty:
                        continue
                    except Exception as e:
                        print(f"Error in processing: {str(e)}")
                        continue
                    
        except Exception as e:
            print(f"Error in audio stream: {str(e)}")

def main():
    # Create detector instance
    detector = DoorbellDetector(threshold=0.7)
    
    # Start detection
    detector.start_listening()

if __name__ == "__main__":
    main()



Starting doorbell detection...


KeyboardInterrupt: 