<a href="https://colab.research.google.com/github/noahdanieldsouza/PAM-classification/blob/main/audio_cut.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
#@title Install Libraries { vertical-output: true }
!pip install pydub librosa numpy scipy




In [21]:
#@title Imports { vertical-output: true }
import os
import xml.etree.ElementTree as ET
from pydub import AudioSegment
import numpy as np
import librosa
import soundfile as sf
from datetime import datetime, timedelta

In [22]:
#@title Configuration { vertical-output: true }
# Mount Google Drive so the paths below (under /content/drive) are reachable.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
# Folder scanned for .wav recordings and their matching .log.xml files.
WAV_DIR = '/content/drive/MyDrive/lake-sounds' #@param {type:'string'}
# Folder that receives the exported clips and the XML file written for each clip.
OUTPUT_DIR = '/content/drive/MyDrive/lake-sounds/output' #@param {type:'string'}
# A chunk is treated as a sound event when its RMS level lies strictly between
# these two bounds (both are converted from dB to a linear amplitude ratio).
THRESHOLD_DB_MIN = -90  #@param {type:'int'} # quiet limit
THRESHOLD_DB_MAX = -60  #@param {type:'int'} # in dB, adjust based on your noise level
# Used by detect_events as the length (seconds) a loud run must exceed to be kept.
MIN_SILENCE_DURATION = 0.3 #@param {type:'float'} # seconds
# Length in seconds of each exported clip.
CLIP_LENGTH = 5  #@param {type:'int'} second

# Create the output folder up front; exist_ok avoids an error on re-runs.
os.makedirs(OUTPUT_DIR, exist_ok=True)



Mounted at /content/drive


In [23]:
#@title Functions { vertical-output: true }
#parse xml timestamps
def parse_xml_time(xml_path):
    """Return the recording start time stored in a log XML file.

    Every ``<PROC_EVENT>`` element below the document root is inspected in
    document order. The first one whose ``<WavFileHandler>`` child carries
    a non-empty ``SamplingStartTimeLocal`` attribute supplies the result.

    Args:
        xml_path: Path of the XML file to read.

    Returns:
        ``datetime`` parsed from the attribute with ``datetime.fromisoformat``.

    Raises:
        ValueError: If no such attribute is present in the file.
    """
    document_root = ET.parse(xml_path).getroot()

    for event_node in document_root.findall(".//PROC_EVENT"):
        handler_node = event_node.find("WavFileHandler")
        if handler_node is None:
            continue

        timestamp_text = handler_node.get("SamplingStartTimeLocal")
        if not timestamp_text:
            # Attribute missing or empty: keep looking in later events.
            continue

        return datetime.fromisoformat(timestamp_text)

    raise ValueError(f"SamplingStartTimeLocal not found in {xml_path}")

#convert decibels to amplitude
def db_to_amplitude_ratio(db):
    """Convert a level in decibels to a linear amplitude ratio.

    Uses the amplitude convention ``ratio = 10 ** (db / 20)``, so 0 dB maps
    to 1.0 and every -20 dB divides the ratio by ten.
    """
    exponent = db / 20
    return 10 ** exponent

#detect audio events
def detect_events(audio, sr, threshold_db=-30):
    """Find runs of loud audio using a short-time RMS envelope.

    The signal is analysed in 100 ms frames advanced in 50 ms hops. A frame
    counts as loud when its RMS exceeds ``threshold_db`` converted to a
    linear amplitude ratio. Consecutive loud frames form one event, and an
    event is kept only when it is longer than the module-level
    ``MIN_SILENCE_DURATION`` (seconds).

    Args:
        audio: Array of samples. A multi-dimensional array is averaged
            along axis 1 to obtain mono (assumes a (frames, channels)
            layout such as soundfile returns -- TODO confirm with callers).
        sr: Sample rate in Hz.
        threshold_db: Loudness threshold in dB relative to amplitude 1.0.

    Returns:
        List of ``(start, end)`` tuples, both in seconds.
    """
    # Convert to mono
    if audio.ndim > 1:
        audio = np.mean(audio, axis=1)

    frame_length = int(sr * 0.1)  # 100ms
    hop_length = int(sr * 0.05)  # 50ms
    rms = librosa.feature.rms(y=audio, frame_length=frame_length, hop_length=hop_length)[0]

    times = librosa.frames_to_time(np.arange(len(rms)), sr=sr, hop_length=hop_length)

    threshold = db_to_amplitude_ratio(threshold_db)
    is_loud = rms > threshold

    events = []
    start = None
    for i, loud in enumerate(is_loud):
        if loud and start is None:
            start = times[i]
        elif not loud and start is not None:
            end = times[i]
            if end - start > MIN_SILENCE_DURATION:
                events.append((start, end))
            start = None

    # A run that is still loud at the last frame is closed at the last frame
    # time. It must pass the same minimum-duration filter as every other run;
    # otherwise a single loud final frame yields a zero-length event.
    if start is not None and times[-1] - start > MIN_SILENCE_DURATION:
        events.append((start, times[-1]))
    return events

#create new xml for the five second clip
def write_xml_for_clip(base_datetime, clip_start, output_path):
    """Write the XML file that records an exported clip's start time.

    The document has the layout
    ``<ST><PROC_EVENT ID="4"><WavFileHandler SamplingStartTimeLocal=.../></PROC_EVENT></ST>``,
    which is the structure ``parse_xml_time`` searches for.

    Args:
        base_datetime: ``datetime`` at which the source recording started.
        clip_start: Offset of the clip from ``base_datetime``, in seconds.
        output_path: Destination path of the XML file.
    """
    # ISO 8601 timestamp of the clip, e.g. "2024-08-31T14:02:15"
    stamp = (base_datetime + timedelta(seconds=clip_start)).isoformat()

    st_node = ET.Element("ST")
    event_node = ET.SubElement(st_node, "PROC_EVENT", ID="4")
    ET.SubElement(event_node, "WavFileHandler", SamplingStartTimeLocal=stamp)

    ET.ElementTree(st_node).write(output_path)


In [24]:
#@title Cut audio and save { vertical-output: true }
def process_file(wav_path, xml_path, chunk_duration=10):
    """Detect sound events in one recording and export them as clips.

    The recording is read in consecutive chunks of ``chunk_duration``
    seconds. A chunk is marked as an event when its RMS lies strictly
    between the amplitudes for ``THRESHOLD_DB_MIN`` and ``THRESHOLD_DB_MAX``.
    Back-to-back event chunks are merged into one window. Each window is
    then cut into pieces of ``CLIP_LENGTH`` seconds; every piece is written
    to ``OUTPUT_DIR`` as a .wav plus an .xml holding its absolute start time.

    Args:
        wav_path: Path of the recording to analyse.
        xml_path: Path of the XML log giving the recording's start time.
        chunk_duration: Analysis chunk length in seconds.

    Raises:
        ValueError: Propagated from ``parse_xml_time`` when the XML log has
            no start time.
    """
    print(f"Processing: {os.path.basename(wav_path)}")

    base_time = parse_xml_time(xml_path)

    amp_min = db_to_amplitude_ratio(THRESHOLD_DB_MIN)
    amp_max = db_to_amplitude_ratio(THRESHOLD_DB_MAX)

    # Each entry is [start_seconds, end_seconds]; lists so the end can be extended.
    event_windows = []

    # One pass over the file: read the metadata and scan the chunks through
    # the same handle, so only one chunk is held in memory at a time.
    with sf.SoundFile(wav_path) as f:
        sr = f.samplerate
        total_samples = len(f)
        chunk_samples = int(chunk_duration * sr)

        for chunk_start in range(0, total_samples, chunk_samples):
            f.seek(chunk_start)
            frames_to_read = min(chunk_samples, total_samples - chunk_start)
            chunk = f.read(frames_to_read)

            # Convert to mono if stereo (soundfile returns frames x channels)
            if chunk.ndim > 1:
                chunk = np.mean(chunk, axis=1)

            rms = np.sqrt(np.mean(chunk ** 2))
            if not (amp_min < rms < amp_max):
                continue

            chunk_start_time = chunk_start / sr
            chunk_end_time = (chunk_start + frames_to_read) / sr

            if not event_windows or chunk_start_time > event_windows[-1][1]:
                # Gap since the previous event chunk: open a new window.
                event_windows.append([chunk_start_time, chunk_end_time])
            else:
                # Directly follows the previous event chunk: extend that window.
                event_windows[-1][1] = chunk_end_time

    print(f"  → Detected {len(event_windows)} sound events")

    if not event_windows:
        # Nothing to export, so skip decoding the whole recording with pydub.
        return

    # Export clips
    base_name = os.path.splitext(os.path.basename(wav_path))[0]
    audio = AudioSegment.from_file(wav_path)
    clip_length_ms = CLIP_LENGTH * 1000

    for i, (start, end) in enumerate(event_windows):
        duration = end - start
        clip_count = int(np.ceil(duration / CLIP_LENGTH))

        for j in range(clip_count):
            clip_start = start + j * CLIP_LENGTH
            actual_start_ms = int(clip_start * 1000)
            # The slice is not clamped to the window end, so the last clip of
            # a window may run past it (pydub stops at the end of the file).
            clip = audio[actual_start_ms:actual_start_ms + clip_length_ms]

            clip_suffix = f"{base_name}_event{i+1}_{j+1}of{clip_count}"
            wav_out_path = os.path.join(OUTPUT_DIR, f"{clip_suffix}.wav")
            xml_out_path = os.path.join(OUTPUT_DIR, f"{clip_suffix}.xml")

            clip.export(wav_out_path, format="wav")
            write_xml_for_clip(base_time, clip_start, xml_out_path)

# Process every .wav recording in WAV_DIR that has a matching .log.xml file.
# sorted() gives a deterministic order; os.listdir returns entries in arbitrary order.
for filename in sorted(os.listdir(WAV_DIR)):
    if not filename.endswith(".wav"):
        continue

    wav_path = os.path.join(WAV_DIR, filename)
    # Swap only the trailing extension; str.replace would also rewrite any
    # earlier ".wav" that happens to appear inside the file name.
    xml_path = os.path.join(WAV_DIR, os.path.splitext(filename)[0] + ".log.xml")

    if os.path.exists(xml_path):
        process_file(wav_path, xml_path)
    else:
        # Report the skip so a missing log file does not go unnoticed.
        print(f"Skipping {filename}: {os.path.basename(xml_path)} not found")



Processing: 8567.240831110000.wav
  → Detected 110 sound events
