## Audio Splitting

This procedure performs silence-based segmentation of audio files to produce training-ready chunks. Each audio file is iteratively split at detected silences while enforcing minimum and maximum duration constraints to avoid loss or overlap. Segments are exported as individual .wav files, and their metadata (path, duration) is stored. The process runs in parallel threads for efficiency, generating a structured JSON output summarizing all segments for downstream processing.

In [None]:
import numpy as np
import os 
import json 
from pydub import AudioSegment
from pydub.silence import detect_nonsilent, detect_silence
import math
from tqdm import tqdm

import concurrent.futures
from tqdm import tqdm
import threading

In [None]:
# Input folders are the sub-directories of the working directory. Regular
# files (for example the .wav segments and data_structured.json written by a
# previous run) must be skipped, otherwise os.listdir() on them would fail.
# Entries ending in "py", "md" or "ipynb" stay excluded as before.
folders = [
    entry
    for entry in os.listdir(".")
    if os.path.isdir(entry) and not entry.endswith(("py", "md", "ipynb"))
]

In [None]:

# One {"path": ...} record per entry found directly inside each input folder.
data: list[dict] = []

for base_folder in folders:
    data.extend(
        {"path": os.path.join(base_folder, name)}
        for name in os.listdir(base_folder)
    )

In [None]:
def split_audio_on_silence(audio_path, min_duration=15, max_duration=40, 
                          min_silence_len=500, silence_thresh=-40,
                          trim_ms=300):
    """
    Split an audio file into consecutive segments at detected silences.

    Starting from the beginning of the file, a window of
    ``max_duration`` seconds (plus ``min_silence_len`` ms of look-ahead) is
    scanned for silences. The segment ends at the end of the first silence
    that starts inside the window, or at ``max_duration`` when no silence is
    found, and the scan resumes from that point.

    Note that some audio is intentionally discarded: the last ``trim_ms``
    milliseconds of every non-final segment are cut off, and any segment
    shorter than ``min_duration`` is dropped.

    Args:
        audio_path: Path of the audio file, loaded with ``AudioSegment.from_file``.
        min_duration: Minimum length, in seconds, for a segment to be kept.
        max_duration: Window length, in seconds, searched for a split point.
            Must be positive.
        min_silence_len: Minimum silence length in ms passed to ``detect_silence``.
        silence_thresh: Silence threshold in dBFS passed to ``detect_silence``.
        trim_ms: Milliseconds removed from the end of every non-final segment.
            Must not be negative.

    Returns:
        List of ``AudioSegment`` objects in file order.

    Raises:
        ValueError: If ``max_duration`` is not positive or ``trim_ms`` is negative.
    """
    # A non-positive window can never advance through the file.
    if max_duration <= 0:
        raise ValueError("max_duration must be positive")
    if trim_ms < 0:
        raise ValueError("trim_ms must not be negative")

    audio = AudioSegment.from_file(audio_path)
    print(f"Total audio duration: {len(audio)/1000:.2f} seconds")

    min_duration_ms = min_duration * 1000
    max_duration_ms = max_duration * 1000

    def find_split_point(audio_chunk, max_duration_ms):
        """Return the offset (ms, relative to the chunk) at which to split."""
        silences = detect_silence(
            audio_chunk,
            min_silence_len=min_silence_len,
            silence_thresh=silence_thresh
        )

        # NOTE(review): this picks the FIRST silence that starts inside the
        # window, so segments end at the earliest pause rather than the
        # latest one before max_duration -- confirm this is the intent.
        for silence_start, silence_end in silences:
            if silence_start <= max_duration_ms:
                return silence_end

        # No usable silence: hard cut at the maximum duration.
        return max_duration_ms

    segments = []
    current_position = 0
    audio_length = len(audio)

    while current_position < audio_length:
        print(f"Processing position {current_position/1000:.2f}s / {audio_length/1000:.2f}s")

        # What is left fits in one window: keep it whole (if long enough) and stop.
        if (audio_length - current_position) <= max_duration_ms:
            remaining_segment = audio[current_position:]
            if len(remaining_segment) >= min_duration_ms:
                segments.append(remaining_segment)
            break

        # Look slightly past the window so a silence starting right at its
        # edge can still be detected.
        end_position = min(current_position + max_duration_ms + min_silence_len, audio_length)
        chunk = audio[current_position:end_position]

        split_point = find_split_point(chunk, max_duration_ms)

        # Guard against a split point that would not advance the position.
        if split_point <= 0:
            print("Warning: No progress made in splitting. Forcing a split.")
            split_point = max_duration_ms

        # Clamp the trimmed end so it never precedes the segment start; a
        # negative slice bound would be read as an offset from the end of
        # the audio and yield a wrong segment.
        segment_end = max(current_position, current_position + split_point - trim_ms)
        segment = audio[current_position:segment_end]

        if len(segment) >= min_duration_ms:
            segments.append(segment)

        current_position += split_point

    return segments


In [None]:
def export_segments(segments, output_prefix="segment"):
    """
    Write each segment to ``<output_prefix>_<n>.wav`` (n starting at 1).

    Returns a tuple ``(records, total_seconds)`` where ``records`` is a list
    of ``{"audio": path, "duration": seconds}`` dicts in export order and
    ``total_seconds`` is the accumulated duration of all segments.
    """
    exported = []
    running_total = 0

    for index, chunk in enumerate(segments, start=1):
        # len() of a segment is its length in milliseconds.
        seconds = len(chunk) / 1000.0
        wav_path = f"{output_prefix}_{index}.wav"

        chunk.export(wav_path, format="wav")

        exported.append({"audio": wav_path, "duration": seconds})
        running_total += seconds

    return exported, running_total

In [None]:
# Shared accumulator: one summary dict per successfully processed audio file.
segmentation: list[dict] = []
# Held while appending to `segmentation` from the worker threads.
segmentation_lock = threading.Lock()

In [None]:
def process_single_element(element):
    """
    Segment one audio file, export its chunks and record a summary.

    The summary dict (source path, exported segment records and their total
    duration) is appended to the shared ``segmentation`` list while holding
    ``segmentation_lock``.

    Args:
        element: Mapping with a "path" key pointing at the audio file.

    Returns:
        True when the file was processed and recorded, False when any step
        raised an exception (the error is printed, not re-raised).
    """
    info = {}
    # Bound before the try block so the error message below can always be
    # formatted, even when reading element["path"] itself fails.
    _path = None

    try:
        _path = element["path"]
        # basename/splitext cope with the platform's path separator and with
        # extensions of any length (".flac", ".opus", no extension, ...).
        audio_name = os.path.splitext(os.path.basename(_path))[0]

        info["audio"] = _path

        segments = split_audio_on_silence(
            _path,
            min_duration=1,
            max_duration=30,
            min_silence_len=700,
            silence_thresh=-30
        )

        output_files, total_duration = export_segments(segments, f"{audio_name}_segment")

        info["segments"] = output_files
        info["segments_duration"] = total_duration

        with segmentation_lock:
            segmentation.append(info)

        return True
    except Exception as e:
        # Best-effort batch processing: report the failure and move on.
        print(f"Error processing {_path}: {str(e)}")
        return False

In [None]:
def generate_parallel(max_workers=4):
    """
    Run ``process_single_element`` over every entry of ``data`` in a thread pool.

    A progress bar advances as tasks complete. Once all tasks are done, the
    number of successfully processed files is printed and the shared
    ``segmentation`` list is returned.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    with pool as executor:
        futures = []
        for element in data:
            futures.append(executor.submit(process_single_element, element))

        # Drain the completion iterator purely to drive the progress bar.
        progress = tqdm(concurrent.futures.as_completed(futures), total=len(data))
        for _finished in progress:
            continue

    successful = len([task for task in futures if task.result()])
    print(f"Processing completed: {successful}/{len(data)} files processed successfully")

    return segmentation

In [None]:

# Process every collected file, then persist the per-file summaries as JSON.
results = generate_parallel(max_workers=32)

with open("data_structured.json", mode="w") as out_file:
    json.dump(results, out_file)