!pip install gdown
!pip install librosa
!pip install ffmpeg
!pip install pydub
!pip install soundfile

from google.colab import drive
drive.mount('/content/drive')

In [2]:
import librosa
from scipy.signal import butter, filtfilt
import numpy as np
import matplotlib.pyplot as plt
import os
import soundfile as sf

In [None]:
#Download the audio dataset

file_path = '/content/drive/MyDrive/test1'

!gdown --id

usage: gdown [-h] [-V] [-O OUTPUT] [-q] [--fuzzy] [--id] [--proxy PROXY] [--speed SPEED]
             [--no-cookies] [--no-check-certificate] [--continue] [--folder] [--remaining-ok]
             [--format FORMAT] [--user-agent USER_AGENT]
             url_or_id
gdown: error: the following arguments are required: url_or_id


In [3]:
from pydub import AudioSegment
import os

def convert_audio(file_path):
    # Example: If file is not in WAV format, convert it (e.g., MP3 to WAV)
    if file_path.lower().endswith(".mp3"):
        new_file_path = file_path.replace(".mp3", ".wav")
        # Perform conversion (using any library like pydub or ffmpeg for conversion)
        # Placeholder for actual conversion logic
        print(f"Converted and overwrote {file_path} to WAV format")
        return new_file_path
    return file_path

In [4]:
#Changing the file type to .wav
# !pip install pydub

from os import path
from pydub import AudioSegment


# def convert_to_wav(file_path):
#     file_name, file_extension = os.path.splitext(file_path)

#     #If is wav already then skip
#     if file_extension.lower() == ".wav":
#         return file_path

#     sound = AudioSegment.from_file(file_path)
#     sound.export(file_path, format="wav")
#     return file_path

In [5]:
#Changing the audio sampling rate
def resample_audio(audio_path, target_sr):
    y, sr = librosa.load(audio_path, sr=None)
    y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    sf.write(audio_path, y_resampled, target_sr)
    return audio_path


In [6]:

#Convert channels to mono if needed
def convert_to_mono(audio_path):
    y, sr = librosa.load(audio_path, sr=None)
    y_mono = librosa.to_mono(y)
    sf.write(audio_path, y_mono, sr)
    return audio_path

In [7]:

#Normalizing the volume

def normalize_audio(audio_file):
    audio = AudioSegment.from_file(audio_file)

    normalized_audio = audio.apply_gain(-audio.max_dBFS)  # Normalize to 0 dBFS

    #Export the normalized audio to a temporary file
    temp_file = audio_file.rsplit('.', 1)[0] + '_normalized.wav'
    normalized_audio.export(temp_file, format="wav")
    return temp_file # Return the path to the temporary file




In [8]:
def trim_silence(audio_file, silence_thresh=-40, silence_len=1000):

    audio = AudioSegment.from_file(audio_file)
    trimmed_audio = audio.strip_silence(silence_thresh=silence_thresh, silence_len=silence_len)
    #Export the trimmed audio back to the original file
    trimmed_audio.export(audio_file, format="wav")
    return audio_file




In [9]:
#Standardize the duration

def trim_duration(audio, max_duration_ms=5000): #5 sec
    # Check if the audio duration exceeds the max duration
    # Load audio using AudioSegment
    audio_segment = AudioSegment.from_file(audio)
    if len(audio_segment) > max_duration_ms:
        audio_segment = audio_segment[:max_duration_ms]  # Trim the audio to the desired duration
    # Export the trimmed audio back to the original file
    audio_segment.export(audio, format="wav")
    return audio


In [10]:
#Filter noise if needed
#Filter noise if needed

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    y = filtfilt(b, a, data)
    return y

In [14]:
def standardize_audio(audio_path, target_sr=None, lowcut=None, highcut=None, order=5, output_dir=output_dir):
    """Standardizes the audio by applying various transformations and saving the processed files to an output directory."""

    if not os.path.isdir(audio_path):
        print("Invalid input path")
        return

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Output will be saved to: {output_dir}")

    for filename in os.listdir(audio_path):
        file_path = os.path.join(audio_path, filename)

        if os.path.isfile(file_path):
            print(f"Processing file: {filename}")

            original_file_path = file_path  # Keep track of the original file path
            #Removing the change of the file_path by convert_audio
            convert_audio(file_path)
            # Resample audio if target_sr is provided
            if target_sr:
                resample_audio(file_path, target_sr)
            convert_to_mono(file_path)
            normalize_audio(file_path)
            trim_silence(file_path)
            trim_duration(file_path)


            # Apply bandpass filter if lowcut and highcut are provided
            if lowcut and highcut:
                try:
                    y, sr = librosa.load(file_path, sr=None)
                    filtered_y = butter_bandpass_filter(y, lowcut, highcut, sr, order)

                    output_file = os.path.join(output_dir, filename)
                    sf.write(output_file, filtered_y, sr)
                    print(f"Bandpass filter applied and saved to {output_file}.")
                except Exception as e:
                    print(f"Error applying bandpass filter to {filename}: {e}")
                    continue
            else:
                # If no bandpass filter was applied, save the final processed version (if modified)
                # if new_file_path != original_file_path:  # Only save if modified #This check is no longer needed
                output_file = os.path.join(output_dir, filename)
                shutil.copy(file_path, output_file) #Copying the file to the output_dir as it is if no filters were applied
                print(f"Processed and saved to: {output_file}")

In [15]:
# Example call to the function
audio_path = 'Unstandardized_full_data/Training'
output_dir = './Standardized_full_data/Training'
target_sr = 16000  # Sample rate
lowcut = 300.0
highcut = 3400.0

standardize_audio(audio_path, target_sr, lowcut, highcut)

Processing file: north_america_v_output_276.mp3
Converted and overwrote Unstandardized_full_data/Training/north_america_v_output_276.mp3 to WAV format
Bandpass filter applied and saved to Standardized_full_data/Training/north_america_v_output_276.mp3.
Processing file: africa_v_output_624.mp3
Converted and overwrote Unstandardized_full_data/Training/africa_v_output_624.mp3 to WAV format
Bandpass filter applied and saved to Standardized_full_data/Training/africa_v_output_624.mp3.
Processing file: v_output_4677.mp3
Converted and overwrote Unstandardized_full_data/Training/v_output_4677.mp3 to WAV format
Bandpass filter applied and saved to Standardized_full_data/Training/v_output_4677.mp3.
Processing file: v_output_2206.mp3
Converted and overwrote Unstandardized_full_data/Training/v_output_2206.mp3 to WAV format
Bandpass filter applied and saved to Standardized_full_data/Training/v_output_2206.mp3.
Processing file: 8c1660ce1c79376ec9e426ff7f9e4b91.wav
Bandpass filter applied and saved to 

KeyboardInterrupt: 

In [18]:
import os
import shutil
import librosa
import soundfile as sf
from multiprocessing import Pool, cpu_count
from pathlib import Path
from functools import partial

# Define your utility functions here (convert_audio, resample_audio, normalize_audio, etc.)

def process_single_file(filename, input_dir, output_dir, target_sr, lowcut, highcut, order):
    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)

    if not os.path.isfile(input_path):
        return

    # ✅ Skip if file already processed
    if os.path.exists(output_path):
        print(f"Skipping (already processed): {filename}")
        return

    try:
        print(f"Processing: {filename}")
        convert_audio(input_path)

        if target_sr:
            resample_audio(input_path, target_sr)

        convert_to_mono(input_path)
        normalize_audio(input_path)
        trim_silence(input_path)
        trim_duration(input_path)

        if lowcut and highcut:
            y, sr = librosa.load(input_path, sr=None)
            filtered_y = butter_bandpass_filter(y, lowcut, highcut, sr, order)
            sf.write(output_path, filtered_y, sr)
        else:
            shutil.copy(input_path, output_path)

        print(f"✅ Saved: {output_path}")
    except Exception as e:
        print(f"❌ Error processing {filename}: {e}")

def standardize_audio_parallel(audio_path, target_sr=None, lowcut=None, highcut=None, order=5, output_dir="standardized_audio"):
    if not os.path.isdir(audio_path):
        print("Invalid input path")
        return

    os.makedirs(output_dir, exist_ok=True)

    files = os.listdir(audio_path)
    num_workers = min(cpu_count() - 2, 6)
    print(num_workers)

    with Pool(num_workers) as pool:
        pool.map(partial(
            process_single_file,
            input_dir=audio_path,
            output_dir=output_dir,
            target_sr=target_sr,
            lowcut=lowcut,
            highcut=highcut,
            order=order
        ), files)

    print("🎉 All done!")

In [19]:
standardize_audio_parallel(
    audio_path='Unstandardized_full_data/Training',
    target_sr=16000,
    lowcut=300.0,
    highcut=3400.0,
    output_dir='./Standardized_full_data/Training'
)

6


Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-3:
Process SpawnPoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/alecnaidoo/miniforge3/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_singl

KeyboardInterrupt: 