# 1. Import and Install Dependencies

## 1.1 Install Dependencies

In [5]:
%pip install --upgrade pip
%pip install torch torchvision torchaudio matplotlib numpy==2.2.1 sounddevice scikit-learn

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## 1.2 Load Dependencies

In [1]:
import os, random
from matplotlib import pyplot as plt
import torch 
import torchaudio
from torchaudio import transforms
import numpy as np
from itertools import groupby
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import classification_report

# 2. Build Data Loading Function

In [2]:
class AudioUtil():
    @staticmethod
    def open(filename):
        signal, sr = torchaudio.load(filename)
        return(signal, sr)
    
    @staticmethod
    def rechannel(audio):
        signal, sr = audio
        if (signal.shape[0] == 2):
            audio = (signal[:1], sr)
        return audio
        
    @staticmethod
    def resample(audio, new_sr):
        sample, sr = audio
        if (sr == new_sr):
            return audio
        resample = transforms.Resample(sr, new_sr)(sample[:1])
        return (resample, new_sr)
    
    @staticmethod
    def pad_trunc(aud, max_ms):
        sig, sr = aud
        num_rows, sig_len = sig.shape
        max_len = sr//1000 * max_ms

        if (sig_len > max_len):
            # Truncate the signal to the given length
            sig = sig[:,:max_len]

        elif (sig_len < max_len):
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)
        
        return (sig, sr)
    
    @staticmethod
    def MFCC(audio, n_mfcc=32, n_mels=64):
        signal, sr = audio

        return transforms.MFCC(sr, n_mfcc=n_mfcc, melkwargs={"n_mels": n_mels})(signal)
        
    @staticmethod
    def preprocess(audio):
        rechannel = AudioUtil.rechannel(audio)
        resample = AudioUtil.resample(rechannel, 16000)
        pad = AudioUtil.pad_trunc(resample, 3500)
        mel = AudioUtil.MFCC(pad)
        return torch.flatten(mel)
    

In [3]:
def load_data(root): 
    # Find all class folders and map them to integer labels
    classes = sorted(entry.name for entry in os.scandir(root) if entry.is_dir())
    class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
    
    samples = []
    # Find all audio files and their corresponding class index
    for class_name in classes:
        class_idx = class_to_idx[class_name]
        class_dir = os.path.join(root, class_name)
        for filename in os.listdir(class_dir):
            if filename.lower().endswith(('.wav', '.mp3', '.flac')):
                path = os.path.join(class_dir, filename)
                audio = AudioUtil.open(path)
                data = AudioUtil.preprocess(audio)
                samples.append((data, class_idx))

    data, class_idx = zip(*samples)
    return np.array(data), np.array(class_idx)

# 6. Create Training and Testing Partitions

In [4]:
feat, label = load_data("data/")
feat_train, feat_test, label_train, label_test = train_test_split(
    feat, label, test_size=0.2, random_state=42, stratify=label
)

scaler = StandardScaler()
feat_train_scaled = scaler.fit_transform(feat_train)
feat_test_scaled = scaler.transform(feat_test)

# train_dl = torch.utils.data.DataLoader(train_ds, batch_size=CONFIG['train_batch_size'], shuffle=True)
# val_dl = torch.utils.data.DataLoader(val_ds, batch_size=CONFIG['valid_batch_size'], shuffle=False)

# 7. Build Deep Learning Model

## 7.1 Load Tensorflow Dependencies

## 7.2 Build Sequential Model, Compile and View Summary

In [5]:
# Create the model and train
myModel = SVC(kernel='rbf', C=10, gamma='scale', probability=True)
myModel.fit(feat_train_scaled, label_train)


0,1,2
,C,10
,kernel,'rbf'
,degree,3
,gamma,'scale'
,coef0,0.0
,shrinking,True
,probability,True
,tol,0.001
,cache_size,200
,class_weight,


In [6]:
predictions = myModel.predict(feat_test_scaled)
print(classification_report(label_test, predictions))

              precision    recall  f1-score   support

           0       0.99      1.00      1.00       119
           1       1.00      0.98      0.99        43

    accuracy                           0.99       162
   macro avg       1.00      0.99      0.99       162
weighted avg       0.99      0.99      0.99       162



In [7]:
def load_mp3_16k_mono(filename):
    """ Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio. """
    audio = AudioUtil.open(filename)
    rechannel = AudioUtil.rechannel(audio)
    resample = AudioUtil.resample(rechannel, 16000)
    return resample

In [8]:
def window(filename, clip_length, stride):
    """
    A dataset for creating shorter clips from a long audio file.

    Args:
        filename: name of audio file
        clip_length (int): The length of each audio clip in samples.
        stride (int): The number of samples to move between clips.
    """
    waveform, sr = load_mp3_16k_mono(filename)

    # Calculate the total number of clips that can be created
    num_clips = (waveform.shape[1] - clip_length) // stride + 1

    """
    Returns the clip at the given index.
    """
    inference_data = []
    for idx in range(num_clips):
        start_idx = idx * stride
        end_idx = start_idx + clip_length
        
        # Slice the waveform to get the clip
        clip = waveform[:, start_idx:end_idx]

        # preprocess
        data = AudioUtil.MFCC((clip, sr))
        flattened_data = torch.flatten(data)
        inference_data.append(flattened_data)

    return scaler.transform(inference_data)
        
        


# 10. Make Predictions

## 10.1 Loop over all recordings and make predictions

In [9]:
results = {}
for file in os.listdir('Forest Recordings'):
    FILEPATH = os.path.join('Forest Recordings', file)
    
    print("searching " + file)
    data = window(FILEPATH, 56000, 56000)
    predictions = myModel.predict(data)
            
    results[file] = predictions
    
    


searching recording_95.mp3
searching recording_81.mp3
searching recording_56.mp3
searching recording_42.mp3
searching recording_43.mp3
searching recording_57.mp3
searching recording_80.mp3
searching recording_94.mp3
searching recording_82.mp3
searching recording_96.mp3
searching recording_69.mp3
searching recording_41.mp3
searching recording_55.mp3
searching recording_54.mp3
searching recording_40.mp3
searching recording_68.mp3
searching recording_97.mp3
searching recording_83.mp3
searching recording_87.mp3
searching recording_93.mp3
searching recording_44.mp3
searching recording_50.mp3
searching recording_78.mp3
searching recording_79.mp3
searching recording_51.mp3
searching recording_45.mp3
searching recording_92.mp3
searching recording_86.mp3
searching recording_90.mp3
searching recording_84.mp3
searching recording_53.mp3
searching recording_47.mp3
searching recording_46.mp3
searching recording_52.mp3
searching recording_85.mp3
searching recording_91.mp3
searching recording_09.mp3
s

In [10]:
print(results)

{'recording_95.mp3': array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 1, 0]), 'recording_81.mp3': array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
       1, 0, 0, 0, 0, 0, 0]), 'recording_56.mp3': array([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
       0, 0, 0, 0, 0, 0, 0]), 'recording_42.mp3': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0]), 'recording_43.mp3': array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0,
       0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0,
       0, 0, 0, 0, 0, 0, 0]), 'recording_57.mp3':

## 10.2 Convert Predictions into Classes

In [11]:
class_preds = {}
for file, logits in results.items():
    class_preds[file] = [1 if pred > 0.90 else 0 for pred in logits]
class_preds

{'recording_95.mp3': [0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0],
 'recording_81.mp3': [0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0],
 'recording_56.mp3': [0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  1,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0],
 'recording_42.mp3': [0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  

## 10.3 Group Consecutive Detections

In [12]:
postprocessed = {}
for file, scores in class_preds.items():
    postprocessed[file] = sum([key for key, group in groupby(scores)])
postprocessed

{'recording_95.mp3': 5,
 'recording_81.mp3': 5,
 'recording_56.mp3': 4,
 'recording_42.mp3': 0,
 'recording_43.mp3': 5,
 'recording_57.mp3': 3,
 'recording_80.mp3': 1,
 'recording_94.mp3': 3,
 'recording_82.mp3': 0,
 'recording_96.mp3': 1,
 'recording_69.mp3': 1,
 'recording_41.mp3': 0,
 'recording_55.mp3': 0,
 'recording_54.mp3': 2,
 'recording_40.mp3': 1,
 'recording_68.mp3': 1,
 'recording_97.mp3': 4,
 'recording_83.mp3': 0,
 'recording_87.mp3': 24,
 'recording_93.mp3': 5,
 'recording_44.mp3': 1,
 'recording_50.mp3': 0,
 'recording_78.mp3': 2,
 'recording_79.mp3': 0,
 'recording_51.mp3': 3,
 'recording_45.mp3': 3,
 'recording_92.mp3': 0,
 'recording_86.mp3': 4,
 'recording_90.mp3': 0,
 'recording_84.mp3': 2,
 'recording_53.mp3': 0,
 'recording_47.mp3': 4,
 'recording_46.mp3': 4,
 'recording_52.mp3': 0,
 'recording_85.mp3': 0,
 'recording_91.mp3': 0,
 'recording_09.mp3': 0,
 'recording_35.mp3': 0,
 'recording_21.mp3': 1,
 'recording_20.mp3': 0,
 'recording_34.mp3': 4,
 'recording_08.

In [None]:
import sounddevice as sd
from collections import deque
import time
import threading

# --- Configuration ---
SAMPLE_RATE = 16000
CLIP_SECONDS = 3.5  # Duration of clips to analyze
STRIDE_SECONDS = 1.0 # How often to run inference
DEVICE_ID = None     # Use None for default device, or specify the device ID

# Convert durations to sample counts
CLIP_LENGTH_SAMPLES = int(CLIP_SECONDS * SAMPLE_RATE)

# --- 1. Find your Microphone's Device ID ---
# Run this part once to see your available devices
print("Available audio devices:")
print(sd.query_devices())
# Look for your microphone in the list and note its ID number.
# You will set DEVICE_ID to that number.
# For now, we can proceed with the default (None).


# --- 3. Setup the Audio Buffer and Stream ---
# A deque is a thread-safe, fixed-size queue.
# It will automatically discard old audio as new audio comes in.
audio_buffer = deque(maxlen=CLIP_LENGTH_SAMPLES)
buffer_lock = threading.Lock()
shutdown_event = threading.Event()

def audio_callback(indata, frames, time, status):
    """This function is called by sounddevice for each new audio chunk."""
    if status:
        print(status)
    # The input data is a NumPy array. We'll take the first channel.
    with buffer_lock:
        audio_buffer.extend(indata[:, 0])

# --- 4. Main Inference Loop ---
try:
    print("\nStarting live inference... Press Ctrl+C to stop.")
    # Create and start the microphone stream
    stream = sd.InputStream(
        device=DEVICE_ID,
        channels=1,
        samplerate=SAMPLE_RATE,
        callback=audio_callback
    )
    with stream:
        while not shutdown_event.is_set():
            # Wait until the buffer has enough data for a full clip
            clip = None

            # fetch the data using the buffer lock
            with buffer_lock:
                if len(audio_buffer) == CLIP_LENGTH_SAMPLES:
                    clip = torch.tensor(list(audio_buffer))

            # process the data separately so the stream doesn't wait
            if clip is not None:
                # Process the clip
                data = AudioUtil.MFCC((clip, SAMPLE_RATE))
                flat_data = torch.flatten(data)
                scaled_data = scaler.transform(flat_data.unsqueeze(0))
                
                output = myModel.predict(scaled_data)
                
                # Print the prediction
                print(output)
                if output == 1:
                    print(f"Prediction: CLASS 1 DETECTED")
                else:
                    print(f"Prediction: Class 0")
                
                # Wait for the stride duration before processing the next clip
                time.sleep(STRIDE_SECONDS)
            else:
                # Wait for more audio data to fill the buffer
                print(f"Buffering... {len(audio_buffer)}/{CLIP_LENGTH_SAMPLES}", end='\r')
                time.sleep(0.1)

except KeyboardInterrupt:
    shutdown_event.set()
    print("\nStopping inference.")    

Available audio devices:
> 0 MacBook Air Microphone, Core Audio (1 in, 0 out)
< 1 MacBook Air Speakers, Core Audio (0 in, 2 out)

Starting live inference... Press Ctrl+C to stop.
[0]fering... 55552/56000
Prediction: Class 0
[0]
Prediction: Class 0
[0]
Prediction: Class 0
[0]
Prediction: Class 0
[0]
Prediction: Class 0
