In [2]:
import sys
sys.path.append('/path/to/ffmpeg') #ensures dependencies 

In [24]:
from pydub import AudioSegment
import numpy as np
import os
import random
import shutil

# Paths to clean speech and noise directories
clean_speech_dir = '/Users/anude/Downloads/ARU_Speech_Corpus_v1_0'
noise_dir = "/Users/anude/Downloads/Raw_noisedata"
# pointsource noise data from: https://www.openslr.org/28/
output_dir = "/Users/anude/Desktop/output"
# clean speech data from: https://datacat.liverpool.ac.uk/681/
# SNR levels to generate (-5 dB and 0 dB)
snr_levels = [-5, 0]

# Function to mix clean speech with noise at a specific SNR level
def mix_audio(clean_audio, noise_audio, snr):
    # Convert AudioSegment to numpy arrays
    clean_array = np.array(clean_audio.get_array_of_samples())
    noise_array = np.array(noise_audio.get_array_of_samples())
    
    # Calculate the scaling factor for noise to achieve the desired SNR
    snr_ratio = 10 ** (snr / 10)
    
    # Ensure both arrays are the same length
    min_length = min(len(clean_array), len(noise_array))
    clean_array = clean_array[:min_length]
    noise_array = noise_array[:min_length]
    
    # Scale the noise array to match the desired SNR
    scaled_noise_array = clean_array + (noise_array * (1 / snr_ratio))
    
    # Convert the scaled array back to an AudioSegment
    mixed_audio = AudioSegment(scaled_noise_array.tobytes(), frame_rate=clean_audio.frame_rate, sample_width=clean_audio.sample_width, channels=clean_audio.channels)
    
    return mixed_audio

# Create a directory to store the randomly selected clean files
selected_clean_dir = "/Users/anude/Desktop/selected_clean"
os.makedirs(selected_clean_dir, exist_ok=True)

# Get a list of clean speech files
clean_files = [file for file in os.listdir(clean_speech_dir) if file.endswith(".wav")]

# Randomly select 4620 clean speech files
random_clean_files = random.sample(clean_files, 4620)

# Copy the selected clean files to the "selected_clean" directory
for file in random_clean_files:
    source_path = os.path.join(clean_speech_dir, file)
    destination_path = os.path.join(selected_clean_dir, file)
    shutil.copy(source_path, destination_path)

# Iterate through the selected clean speech files
for file in random_clean_files:
    file_path = os.path.join(selected_clean_dir, file)
    clean_audio = AudioSegment.from_file(file_path)
    
    # Randomly select noise file from the noise directory
    noise_file = random.choice(os.listdir(noise_dir))
    noise_file_path = os.path.join(noise_dir, noise_file)
    
    # Check if the selected noise file is a valid WAV file
    if noise_file.endswith(".wav"):
        noise_audio = AudioSegment.from_file(noise_file_path, 'wav')
        
        # Generate noisy speech at multiple SNR levels
        for snr in snr_levels:
            mixed_audio = mix_audio(clean_audio, noise_audio, snr)
            
            # Define the output path for the noisy audio
            output_path = os.path.join(output_dir, f"SNR_{snr}dB", file)
            
            # Create the output directory if it doesn't exist
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            
            # Export the mixed audio as a WAV file
            mixed_audio.export(output_path, format="wav")
        
    else:
        print(f"Skipping invalid noise file: {noise_file_path}")

print("Noise generation and mixing complete.")


Skipping invalid noise file: /Users/anude/Downloads/RIRS_NOISES/pointsource_noises/.DS_Store
Skipping invalid noise file: /Users/anude/Downloads/RIRS_NOISES/pointsource_noises/.DS_Store
Noise generation and mixing complete.


In [25]:

import os
import random
import shutil

# Define paths to clean and noisy data directories
clean_speech_dir =  '/Users/anude/Desktop/selected_clean'
noisy_speech_dir1 = "/Users/anude/Desktop/output/SNR_0dB"
noisy_speech_dir2 = "/Users/anude/Desktop/output/SNR_-5dB"
output_dir = "/Users/anude/Desktop/Processed_Data" #export to folder

# Define the split ratios
train_ratio = 0.6
validation_ratio = 0.2
test_ratio = 0.2

# Define SNR levels
snr_levels = ["SNR_-5dB", "SNR_0dB"]  # Add more levels as needed

# Create directories for train, validation, and test sets
splits = ['train', 'validation', 'test']
for split in splits:
    os.makedirs(os.path.join(output_dir, split, 'clean'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, split, 'noisy'), exist_ok=True)
    for snr_level in snr_levels:
        os.makedirs(os.path.join(output_dir, split, 'clean', snr_level), exist_ok=True)
        os.makedirs(os.path.join(output_dir, split, 'noisy', snr_level), exist_ok=True)

# Create a list of clean and noisy file pairs
file_pairs = []

# Iterate through the clean speech directory to find matching noisy files
for root, dirs, files in os.walk(clean_speech_dir):
    for file in files:
        clean_file_path = os.path.join(root, file)
        noisy_file_path1 = os.path.join(noisy_speech_dir1, file)
        noisy_file_path2 = os.path.join(noisy_speech_dir2, file)
        if os.path.exists(noisy_file_path1):
            file_pairs.append((clean_file_path, noisy_file_path1))
        if os.path.exists(noisy_file_path1):
            file_pairs.append((clean_file_path, noisy_file_path2))

# Randomly shuffle the list of file pairs
random.shuffle(file_pairs)

# Split the data according to the specified ratios
num_samples = len(file_pairs)
train_split = int(train_ratio * num_samples)
validation_split = train_split + int(validation_ratio * num_samples)

# Distribute the data into the train, validation, and test sets with SNR levels
for idx, (clean_path, noisy_path) in enumerate(file_pairs):
    if idx < train_split:
        split = 'train'
    elif idx < validation_split:
        split = 'validation'
    else:
        split = 'test'

    # Determine the SNR level based on the file path 
    if "SNR_-5dB" in noisy_path:
        snr_level = "SNR_-5dB"
    else:
        snr_level = "SNR_0dB"

    # Copy the clean and noisy files to the corresponding directories
    clean_output_path = os.path.join(output_dir, split, 'clean', snr_level, os.path.basename(clean_path))
    noisy_output_path = os.path.join(output_dir, split, 'noisy', snr_level, os.path.basename(noisy_path))

    shutil.copy(clean_path, clean_output_path)
    shutil.copy(noisy_path, noisy_output_path)

print("Data splitting and organization complete.")


Data splitting and organization complete.
