# Notebook A: Audio Preprocessing Pipeline
## HeAR - Standardizing Audio for Respiratory Disease Detection

Converts audio to **16 kHz mono WAV (2-second clips)** for the HeAR model.

**Datasets:** Coughvid, Parkinson's Voice, Respiratory Sounds, Coswara

In [1]:
import os
from pathlib import Path
import numpy as np
import librosa
import soundfile as sf
from tqdm.notebook import tqdm
import json
import warnings
# NOTE(review): with no category or module argument this filter ignores every
# warning raised for the rest of the kernel session, not just noisy ones.
# Presumably added to quiet audio-decoding warnings — confirm, and consider
# narrowing it so genuine problems stay visible.
warnings.filterwarnings('ignore')

# Configuration
# Root folder that holds the raw datasets. Set the HEAR_DATASETS_ROOT
# environment variable to run the notebook on another machine; the original
# location remains the default when the variable is unset.
DATASETS_ROOT = Path(os.environ.get("HEAR_DATASETS_ROOT", r"D:\datasets"))
TARGET_SR = 16000      # output sample rate, in Hz
CLIP_DURATION = 2.0    # length of each output clip, in seconds
# Samples per clip. round() before int() so a fractional duration cannot lose
# a sample to floating-point truncation.
N_SAMPLES = int(round(TARGET_SR * CLIP_DURATION))

# Raw input folder of each dataset, keyed by the dataset's short name.
DATASET_PATHS = {
    'coughvid': DATASETS_ROOT / 'coughvid',
    'parkinsons': DATASETS_ROOT / 'parkinsons',
    'respiratory_sounds': DATASETS_ROOT / 'respiratory_sounds',
    'coswara': DATASETS_ROOT / 'coswara'
}

# One output sub-folder per dataset, created up front (no-op if it exists).
PROCESSED_ROOT = DATASETS_ROOT / 'processed'
for name in DATASET_PATHS:
    (PROCESSED_ROOT / name).mkdir(parents=True, exist_ok=True)

print(f"Target: {TARGET_SR}Hz, {CLIP_DURATION}s clips, {N_SAMPLES} samples")

Target: 16000Hz, 2.0s clips, 32000 samples


In [2]:
def load_audio(file_path, target_sr=TARGET_SR):
    """Load an audio file as a mono waveform at ``target_sr``.

    Args:
        file_path: Location of the audio file; converted with ``str()`` before
            being passed to ``librosa.load``.
        target_sr: Sample rate (Hz) requested from ``librosa.load``.

    Returns:
        The waveform returned by ``librosa.load``, or ``None`` when loading
        raised an exception, so the caller can skip the file.
    """
    try:
        audio, _ = librosa.load(str(file_path), sr=target_sr, mono=True)
    except Exception:
        # Best-effort loading: a file that cannot be read or decoded is
        # reported as None. Catch Exception rather than using a bare except so
        # KeyboardInterrupt / SystemExit still stop a long batch run.
        return None
    return audio

def segment_audio(audio, segment_length=N_SAMPLES):
    """Cut a waveform into fixed-length, non-overlapping segments.

    Args:
        audio: 1-D sequence of samples (a NumPy array in this notebook's
            pipeline).
        segment_length: Number of samples in every returned segment.

    Returns:
        A list of segments, each ``segment_length`` samples long:

        * ``[]`` when the input is shorter than half a segment;
        * a single zero-padded segment when the input is at least half a
          segment but shorter than a full one;
        * otherwise the consecutive full segments; trailing samples that do
          not fill a whole segment are dropped.
    """
    n_samples = len(audio)

    # Too short to be worth padding.
    if n_samples < segment_length // 2:
        return []

    if n_samples < segment_length:
        samples = np.asarray(audio)
        # Keep the input's float dtype so padded clips match the dtype of
        # sliced clips; non-float input still pads into float64 as before.
        if np.issubdtype(samples.dtype, np.floating):
            pad_dtype = samples.dtype
        else:
            pad_dtype = np.float64
        padded = np.zeros(segment_length, dtype=pad_dtype)
        padded[:n_samples] = samples
        return [padded]

    last_start = n_samples - segment_length
    return [
        audio[start:start + segment_length]
        for start in range(0, last_start + 1, segment_length)
    ]

def normalize_audio(audio, target_db=-20.0):
    """Scale a waveform to a target RMS level and limit it to [-1, 1].

    Args:
        audio: NumPy array of samples.
        target_db: Desired RMS level in dB, converted to a linear amplitude
            as ``10 ** (target_db / 20)``.

    Returns:
        The scaled samples clipped to the range [-1.0, 1.0]. If the RMS of
        the input is exactly zero, the input object is returned unchanged.
    """
    level = np.sqrt(np.mean(audio ** 2))
    if level != 0:
        gain = 10 ** (target_db / 20) / level
        return np.clip(audio * gain, -1.0, 1.0)
    # Silent input: there is nothing to scale.
    return audio

def get_audio_files(directory):
    """Recursively list the audio files below ``directory``.

    A path qualifies when it is a regular file and its suffix, compared
    case-insensitively, is one of: .wav, .mp3, .ogg, .flac, .webm, .m4a.

    Args:
        directory: Folder to search; a ``Path`` or a path string.

    Returns:
        A sorted list of ``Path`` objects, so the processing order does not
        depend on the order in which the filesystem enumerates entries.
    """
    exts = {'.wav', '.mp3', '.ogg', '.flac', '.webm', '.m4a'}
    return sorted(
        path
        for path in Path(directory).rglob('*')
        # is_file() rejects directories that merely end in an audio extension
        if path.suffix.lower() in exts and path.is_file()
    )

# Status message printed once the helper definitions in this cell have run.
print("✓ Audio utilities loaded")

✓ Audio utilities loaded


In [3]:
def _clip_tags(audio_files, input_dir):
    """Map each source file to the tag that identifies it in clip filenames."""
    stem_counts = {}
    for path in audio_files:
        stem_counts[path.stem] = stem_counts.get(path.stem, 0) + 1

    tags = {}
    for path in audio_files:
        if stem_counts[path.stem] == 1:
            # Unique stem: keep the plain stem in the output name.
            tags[path] = path.stem
        else:
            # Same stem occurs in several sub-folders or with several
            # extensions: add the relative folder parts and the extension so
            # the clips do not overwrite each other.
            rel = path.relative_to(input_dir)
            tags[path] = "__".join(
                rel.parent.parts + (rel.stem, rel.suffix.lstrip('.'))
            )
    return tags


def process_dataset(name):
    """Convert every audio file of one dataset into normalized fixed-length clips.

    Each file found under ``DATASET_PATHS[name]`` is loaded, normalized,
    segmented, and every segment is written to ``PROCESSED_ROOT / name`` as a
    16-bit PCM WAV file named ``{name}_{tag}_{index:03d}.wav``. ``tag`` is the
    source file's stem; when several source files share a stem it is extended
    with the relative folder and extension so their clips stay distinct.

    Args:
        name: Key into ``DATASET_PATHS`` selecting the dataset to process.

    Returns:
        Number of clips written; 0 when the input folder does not exist or
        contains no audio files.
    """
    input_dir = DATASET_PATHS[name]
    output_dir = PROCESSED_ROOT / name

    if not input_dir.exists():
        print(f"⚠ {name}: Not found")
        return 0

    audio_files = get_audio_files(input_dir)
    if not audio_files:
        print(f"⚠ {name}: No audio files")
        return 0

    # Do not rely on an earlier cell having created the output folder.
    output_dir.mkdir(parents=True, exist_ok=True)
    tags = _clip_tags(audio_files, input_dir)

    total_clips = 0
    unreadable = 0
    for f in tqdm(audio_files, desc=f"Processing {name}"):
        audio = load_audio(f)
        if audio is None:
            # Loading failed; count it so the loss shows up in the report.
            unreadable += 1
            continue
        segments = segment_audio(normalize_audio(audio))
        for i, seg in enumerate(segments):
            out_path = output_dir / f"{name}_{tags[f]}_{i:03d}.wav"
            sf.write(str(out_path), seg, TARGET_SR, subtype='PCM_16')
            total_clips += 1

    print(f"✓ {name}: {total_clips} clips")
    if unreadable:
        print(f"⚠ {name}: {unreadable} of {len(audio_files)} files could not be loaded")
    return total_clips

In [4]:
# Run the pipeline for every configured dataset, recording clips written per name
results = {}
for dataset_name in DATASET_PATHS:
    results[dataset_name] = process_dataset(dataset_name)

# Persist the run parameters and per-dataset clip counts next to the clips
summary = {
    'sample_rate': TARGET_SR,
    'clip_duration': CLIP_DURATION,
    'datasets': results,
}
summary_path = PROCESSED_ROOT / 'summary.json'
with summary_path.open('w') as f:
    f.write(json.dumps(summary, indent=2))

# Final report and pointer to the next notebook
total_written = sum(results.values())
print(f"\nTotal clips: {total_written}")
print(f"Output: {PROCESSED_ROOT}")
print("\nProceed to: B_feature_extraction.ipynb")

Processing coughvid:   0%|          | 0/34434 [00:00<?, ?it/s]

✓ coughvid: 11314 clips
⚠ parkinsons: No audio files


Processing respiratory_sounds:   0%|          | 0/920 [00:00<?, ?it/s]

✓ respiratory_sounds: 9836 clips
⚠ coswara: No audio files

Total clips: 21150
Output: D:\datasets\processed

Proceed to: B_feature_extraction.ipynb
