In [1]:
import numpy as np
import soundfile as sf
import librosa
import glob
import os
import argparse

# Constants
TARGET_SR = 16000  # Sampling rate (Hz) that process_audio resamples every file to
TARGET_RMS = -25   # RMS level in dB (20*log10 of the linear RMS) that process_audio normalizes to
LOW_ENERGY_THRESH = -60  # dB level below which remove_zero_energy treats a start/end segment as low energy

def resample_audio(audio, sr, target_sr):
    """Return ``(audio, target_sr)`` with the signal brought to ``target_sr``.

    The samples go through ``librosa.resample`` only when the incoming rate
    ``sr`` differs from ``target_sr``; otherwise the input array is handed
    back untouched.
    """
    if sr == target_sr:
        # Already at the requested rate: nothing to convert.
        return audio, target_sr
    converted = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
    return converted, target_sr

def normalize_audio(audio, target_rms=-25):
    """Scale ``audio`` so that its RMS level lands on ``target_rms`` (in dB).

    The current level is measured as ``20 * log10`` of the linear RMS, with a
    tiny offset added so an all-zero signal never reaches ``log10(0)``.
    Returns a new, rescaled array.
    """
    mean_square = np.mean(audio**2)
    current_db = 20 * np.log10(np.sqrt(mean_square) + 1e-10)
    # Turn the dB gap between target and current level into a linear factor.
    scale = 10 ** ((target_rms - current_db) / 20)
    return audio * scale

def detect_clipping(audio, threshold=0.99):
    """Report whether any sample's magnitude reaches ``threshold`` or beyond."""
    magnitudes = np.abs(audio)
    at_or_above = magnitudes >= threshold
    return np.any(at_or_above)

def fix_clipping(audio, threshold=0.99):
    """Scale the signal down so its peak magnitude sits at ``threshold``.

    Signals that ``detect_clipping`` does not flag are returned unchanged.
    """
    if not detect_clipping(audio, threshold):
        return audio
    peak = np.max(np.abs(audio))
    return audio * (threshold / peak)

def remove_zero_energy(audio, num_samples=16000, low_energy_thresh=LOW_ENERGY_THRESH):
    """Add faint noise to the start/end of ``audio`` when those segments are near-silent.

    The first and last ``num_samples`` samples are measured independently; a
    segment whose mean power, expressed in dB, falls below
    ``low_energy_thresh`` receives uniform noise in ``[-0.0001, 0.0001]``.

    Args:
        audio: NumPy array of samples, modified in place. Either 1-D or 2-D
            with the sample (time) axis first.
        num_samples: Length of the leading and trailing segments to inspect.
            Clips shorter than this are inspected in full.
        low_energy_thresh: Level in dB below which a segment is considered
            low energy.

    Returns:
        The same ``audio`` array, after any in-place noise injection.
    """
    if num_samples <= 0 or len(audio) == 0:
        # Nothing to inspect; also avoids taking the mean of an empty slice.
        return audio

    def _level_db(segment):
        # Mean of squared samples is a power quantity, so the dB conversion
        # uses 10*log10 (equivalent to 20*log10 of the RMS amplitude).
        return 10 * np.log10(np.mean(segment**2) + 1e-10)

    # Basic slices are views, so adding to them below updates ``audio`` itself.
    head = audio[:num_samples]
    tail = audio[-num_samples:]

    # Measure both ends before touching either, so noise added to the head
    # cannot influence the tail measurement when the segments overlap.
    head_is_quiet = _level_db(head) < low_energy_thresh
    tail_is_quiet = _level_db(tail) < low_energy_thresh

    # Noise is shaped like the actual segment, so clips shorter than
    # ``num_samples`` and multi-channel arrays no longer fail to broadcast.
    if head_is_quiet:
        head += np.random.uniform(-0.0001, 0.0001, head.shape)

    if tail_is_quiet:
        tail += np.random.uniform(-0.0001, 0.0001, tail.shape)

    return audio

def process_audio(file_path, output_dir):
    """Clean up one audio file and write the result into ``output_dir``.

    The file is read, resampled to ``TARGET_SR``, normalized to
    ``TARGET_RMS``, scaled down if it clips, and given faint noise at
    near-silent edges. The output keeps the input's base file name.

    Args:
        file_path: Path of the audio file to read.
        output_dir: Directory the processed file is written to; it must
            already exist.

    Returns:
        The path of the written file.
    """
    audio, sr = sf.read(file_path)

    # Fix sampling rate.
    # soundfile returns multi-channel data as (frames, channels), whereas
    # librosa.resample operates on the last axis by default. Move time to the
    # last axis for resampling, then restore the (frames, channels) layout
    # that sf.write expects.
    if audio.ndim > 1:
        audio, sr = resample_audio(audio.T, sr, TARGET_SR)
        audio = np.ascontiguousarray(audio.T)
    else:
        audio, sr = resample_audio(audio, sr, TARGET_SR)

    # Normalize audio
    audio = normalize_audio(audio, TARGET_RMS)

    # Fix clipping (runs after normalization, since the gain may push peaks up)
    audio = fix_clipping(audio)

    # Remove zero-energy segments
    audio = remove_zero_energy(audio)

    # Save processed file under the same base name as the input
    output_path = os.path.join(output_dir, os.path.basename(file_path))
    sf.write(output_path, audio, sr)
    return output_path

def batch_process(input_dir, output_dir):
    """Process every ``*.wav`` file directly inside ``input_dir``.

    Each matching file is passed to ``process_audio`` and the result is
    written to ``output_dir``, which is created (including parents) if it
    does not exist yet. Progress is reported on standard output.

    Args:
        input_dir: Directory searched (non-recursively) for ``*.wav`` files.
        output_dir: Directory that receives the processed files.
    """
    # exist_ok=True replaces the check-then-create sequence, which could fail
    # if the directory appeared between the check and the makedirs call.
    os.makedirs(output_dir, exist_ok=True)

    files = glob.glob(os.path.join(input_dir, "*.wav"))
    print(f"🔹 Found {len(files)} test files to process.")

    for file_path in files:
        output_path = process_audio(file_path, output_dir)
        print(f"✅ Processed: {output_path}")

    print("\n🔹 Batch Processing Complete! 🔹")

if __name__ == "__main__":
    # Command-line entry point: both directories are mandatory.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--test_dir',
        type=str,
        required=True,
        help='Path to test set directory',
    )
    parser.add_argument(
        '--output_dir',
        type=str,
        required=True,
        help='Path to output directory for fixed files',
    )
    args = parser.parse_args()

    # Run the whole pipeline over the requested directory.
    batch_process(input_dir=args.test_dir, output_dir=args.output_dir)


usage: ipykernel_launcher.py [-h] --test_dir TEST_DIR --output_dir OUTPUT_DIR
ipykernel_launcher.py: error: the following arguments are required: --test_dir, --output_dir


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
