In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, LeakyReLU, UpSampling1D, Concatenate, Subtract
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Cropping1D
from tensorflow.keras.layers import Reshape
from tensorflow.nn import sigmoid
import os
import librosa
import numpy as np
import random
from concurrent.futures import ProcessPoolExecutor


In [2]:
# Dataset and output locations.
# Each path can be overridden with an environment variable so the notebook can
# run on another machine without editing this cell; when the variable is unset
# the original local path is used.
trainDir = os.environ.get(
    'UNET_TRAIN_DIR',
    '/Users/rei/Documents/Machine_Learning/Data/Audio/Shaking_Through/Dataset/Train',
)
testDir = os.environ.get(
    'UNET_TEST_DIR',
    '/Users/rei/Documents/Machine_Learning/Data/Audio/Shaking_Through/Dataset/Test',
)
# Directory that receives the generated .tfrecord files.
tfRecord_Datasets = os.environ.get(
    'UNET_TFRECORD_DIR',
    '/Users/rei/Documents/Machine_Learning/MODELS/Unet/Unet_Sound_Seperation/Unet-Sound-Seperation/tf_Record',
)


In [3]:

# Audio configuration shared by the loading and augmentation helpers below.
SAMPLE_RATE = 22050          # sampling rate handed to librosa.load
SNIPPET_LENGTH = 2 * 16384   # samples per snippet / segment (32768)
AUGMENTATION = True          # data-augmentation toggle (not read by the code in this section)
# Time Jittering
def time_jitter(audio, max_offset=500):
    """Delay ``audio`` by a random number of samples while keeping its length.

    An offset is drawn uniformly from ``[0, max_offset)``; that many zeros are
    prepended and the tail is cut so the result has ``len(audio)`` samples.

    Args:
        audio: 1-D sequence/array of samples.
        max_offset: exclusive upper bound of the shift, in samples. ``0``
            disables the jitter (the signal comes back unshifted).

    Returns:
        A new array with the same length as ``audio``.

    Raises:
        ValueError: if ``max_offset`` is negative (raised by NumPy).
    """
    # np.random.randint(0) raises ValueError, so 0 is handled as "no shift".
    offset = np.random.randint(max_offset) if max_offset != 0 else 0
    delayed = np.pad(audio, (offset, 0), "constant")
    # Trim the tail so the output length matches the input length.
    return delayed[:len(audio)]

# Noise Injection
def add_noise(audio, noise_level=0.005):
    """Add scaled standard-normal noise to ``audio`` and clip to [-1, 1].

    Args:
        audio: 1-D array of samples.
        noise_level: factor applied to the ``np.random.randn`` noise before it
            is added to the signal.

    Returns:
        The noisy signal, limited to the range [-1, 1].
    """
    n_samples = len(audio)
    perturbation = noise_level * np.random.randn(n_samples)
    return np.clip(audio + perturbation, -1, 1)

# Reverb (simple decay)
def add_reverb(audio, decay=0.5, delay=4000):
    """Add a train of decaying echoes to ``audio`` and clip to [-1, 1].

    The output is the dry signal plus copies of it delayed by ``delay``,
    ``2 * delay``, ... samples, where the k-th echo is scaled by ``decay ** k``.
    Echoes only ever appear *after* the sound that caused them, and the result
    has the same length as the input, so it stays sample-aligned with it.

    Args:
        audio: 1-D array of samples.
        decay: gain ratio between one echo and the next.
        delay: spacing between echoes, in samples (default 4000).

    Returns:
        A float array of ``len(audio)`` samples limited to [-1, 1].

    Raises:
        ValueError: if ``delay`` is smaller than 1.
    """
    if delay < 1:
        raise ValueError("delay must be at least 1 sample")

    dry = np.asarray(audio, dtype=float)
    n_samples = len(dry)
    wet = dry.copy()

    # Echoes quieter than this cannot change the clipped result in any
    # meaningful way; stopping there keeps long recordings cheap.
    min_gain = 1e-8
    gain = decay
    # Summing shifted copies is equivalent to convolving with a sparse impulse
    # train, without the quadratic cost of a dense np.convolve and without the
    # centring shift that mode='same' introduces.
    for shift in range(delay, n_samples, delay):
        wet[shift:] += gain * dry[:n_samples - shift]
        gain *= decay
        if abs(gain) < min_gain:
            break

    return np.clip(wet, -1, 1)

# Random Cropping
def random_cropping(audio, segment_length=SNIPPET_LENGTH):
    """Return a random contiguous window of ``segment_length`` samples.

    Every valid start position, including the last one, can be chosen. When
    ``audio`` is exactly ``segment_length`` long the whole signal is returned.

    Args:
        audio: 1-D sequence/array of samples.
        segment_length: window size in samples (defaults to SNIPPET_LENGTH).

    Returns:
        ``audio[start:start + segment_length]`` for a random ``start``.

    Raises:
        ValueError: if ``audio`` is shorter than ``segment_length``.
    """
    last_start = len(audio) - segment_length
    if last_start < 0:
        raise ValueError(
            "audio has %d samples, fewer than segment_length=%d"
            % (len(audio), segment_length)
        )
    # randint's upper bound is exclusive: the +1 makes the final window
    # reachable and keeps the call valid when last_start == 0.
    start = np.random.randint(0, last_start + 1)
    return audio[start: start + segment_length]

# Frequency Masking (in the spectrogram domain)
def freq_masking(spec, F=30, num_masks=1):
    """Zero out random bands of rows of a 2-D spectrogram, in place.

    For each mask a band height is drawn from ``[0, F)`` and a start row from
    ``[0, rows - height)``; both are truncated to integers and the selected
    rows are set to 0 across all columns.

    Args:
        spec: 2-D array; axis 0 is the axis being masked. Modified in place.
        F: upper bound on the height of a single band.
        num_masks: how many bands to apply.

    Returns:
        The same ``spec`` object that was passed in.
    """
    n_rows, _ = spec.shape
    for _mask in range(num_masks):
        band_height = int(np.random.uniform(low=0.0, high=F))
        band_start = int(np.random.uniform(low=0.0, high=n_rows - band_height))
        spec[band_start:band_start + band_height, :] = 0
    return spec

# Time Masking (in the spectrogram domain)
def time_masking(spec, T=40, num_masks=1):
    """Zero out random bands of columns of a 2-D spectrogram, in place.

    For each mask a band width is drawn from ``[0, T)`` and a start column from
    ``[0, columns - width)``; both are truncated to integers and the selected
    columns are set to 0 across all rows.

    Args:
        spec: 2-D array; axis 1 is the axis being masked. Modified in place.
        T: upper bound on the width of a single band.
        num_masks: how many bands to apply.

    Returns:
        The same ``spec`` object that was passed in.
    """
    _, n_cols = spec.shape
    for _mask in range(num_masks):
        band_width = int(np.random.uniform(low=0.0, high=T))
        band_start = int(np.random.uniform(low=0.0, high=n_cols - band_width))
        spec[:, band_start:band_start + band_width] = 0
    return spec

def random_amplify(audio):
    """Scale ``audio`` by a random gain.

    The gain is drawn with ``random.uniform(0.7, 1.3)`` (Python's ``random``
    module, not NumPy's RNG) and multiplied into the signal.

    Args:
        audio: samples to scale.

    Returns:
        ``audio`` multiplied by the drawn gain.
    """
    gain = random.uniform(0.7, 1.3)
    return audio * gain

def load_and_process_data(directory, min_mix=2, max_mix=5, augmentations=None):
    """Build (mixture, vocal) training pairs from a stem directory.

    ``directory`` must contain a ``08Vox`` folder with vocal recordings and any
    number of sibling folders with other stems. For every vocal file the
    function loads it at SAMPLE_RATE, applies the enabled waveform
    augmentations, normalizes it, adds between ``min_mix`` and ``max_mix``
    randomly chosen stems from the sibling folders to form the mixture,
    optionally masks the vocal in the STFT domain, and finally cuts both
    signals into SNIPPET_LENGTH-sample segments.

    Args:
        directory: root folder holding ``08Vox`` and the other stem folders.
        min_mix: smallest number of stems drawn per vocal file.
        max_mix: largest number of stems drawn per vocal file (inclusive).
        augmentations: optional dict of flags. Recognised keys are
            ``time_jitter``, ``noise_injection``, ``reverb``,
            ``random_cropping``, ``freq_masking`` and ``time_masking``; a key
            that is missing or falsy disables that step. ``None`` disables all.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: ``X`` holds the mixture segments and
        ``y`` the matching vocal segments.

    Raises:
        IndexError: if a stem has to be drawn but no sibling folder contains
            an audio file.
    """
    audio_exts = ('.wav', '.mp3', '.flac')
    enabled = augmentations or {}

    X = []
    y = []
    vocal_dir = os.path.join(directory, '08Vox')

    # List every accompaniment folder once instead of on every draw. Entries
    # that are not directories, or that hold no audio, are left out so a draw
    # can never land on something unusable.
    stem_files = {}
    for folder in os.listdir(directory):
        if folder == '08Vox' or folder.startswith('.'):
            continue
        folder_path = os.path.join(directory, folder)
        if not os.path.isdir(folder_path):
            continue
        names = [f for f in os.listdir(folder_path) if f.lower().endswith(audio_exts)]
        if names:
            stem_files[folder_path] = names
    other_dirs = list(stem_files)

    for vocal_file in os.listdir(vocal_dir):
        if not vocal_file.lower().endswith(audio_exts):
            continue

        vocal_path = os.path.join(vocal_dir, vocal_file)
        vocal_signal, _ = librosa.load(vocal_path, sr=SAMPLE_RATE)

        # Skip if the length is shorter than the snippet length
        if len(vocal_signal) < SNIPPET_LENGTH:
            continue

        # Apply waveform augmentations
        if enabled.get("time_jitter"):
            vocal_signal = time_jitter(vocal_signal)
        if enabled.get("noise_injection"):
            vocal_signal = add_noise(vocal_signal)
        if enabled.get("reverb"):
            vocal_signal = add_reverb(vocal_signal)
        if enabled.get("random_cropping"):
            vocal_signal = random_cropping(vocal_signal)

        # Normalize the vocal signal
        vocal_signal = normalize_audio(vocal_signal)

        # Randomly select a number of mixes
        num_mixes = random.randint(min_mix, max_mix)

        # The mixture starts as a copy of the vocal and accumulates the stems.
        mixed_signal = vocal_signal.copy()

        # Randomly select other samples to mix with the vocal
        for _ in range(num_mixes):
            other_dir = random.choice(other_dirs)
            other_file = random.choice(stem_files[other_dir])
            other_path = os.path.join(other_dir, other_file)
            other_signal, _ = librosa.load(other_path, sr=SAMPLE_RATE)

            # A stem shorter than one snippet is dropped without a redraw, so
            # fewer than num_mixes stems may end up in the mixture.
            if len(other_signal) < SNIPPET_LENGTH:
                continue

            other_signal = normalize_audio(other_signal)
            other_signal = pad_or_crop(other_signal, target_length=len(mixed_signal))
            mixed_signal += other_signal

        # Apply frequency and time masking on the spectrogram. The STFT round
        # trip is only performed when a mask is actually requested.
        # NOTE(review): the mixture was built from the unmasked vocal, so the
        # masks affect only the target - confirm that this is intended.
        use_freq_mask = enabled.get("freq_masking")
        use_time_mask = enabled.get("time_masking")
        if use_freq_mask or use_time_mask:
            S = librosa.stft(vocal_signal)
            if use_freq_mask:
                S = freq_masking(S)
            if use_time_mask:
                S = time_masking(S)
            # length= keeps the reconstructed vocal exactly as long as the
            # mixture, so the segments below stay aligned.
            vocal_signal = librosa.istft(S, length=len(vocal_signal))

        # Divide into segments of SNIPPET_LENGTH samples (last one zero-padded)
        for i in range(0, len(vocal_signal), SNIPPET_LENGTH):
            vocal_segment = pad_or_crop(vocal_signal[i:i + SNIPPET_LENGTH], SNIPPET_LENGTH)
            mixed_segment = pad_or_crop(mixed_signal[i:i + SNIPPET_LENGTH], SNIPPET_LENGTH)

            X.append(mixed_segment)
            y.append(vocal_segment)

    return np.array(X), np.array(y)


def pad_or_crop(audio, target_length):
    """Force ``audio`` to ``target_length`` samples.

    Shorter input is zero-padded at the end, longer input is truncated, and
    input that already has the right length is returned unchanged.

    Args:
        audio: 1-D sequence/array of samples.
        target_length: desired number of samples.

    Returns:
        The padded, truncated or original signal.
    """
    shortfall = target_length - len(audio)
    if shortfall > 0:
        return np.pad(audio, (0, shortfall), 'constant')
    if shortfall < 0:
        return audio[:target_length]
    return audio

def normalize_audio(audio):
    """Min-max scale ``audio`` so its smallest value is -1 and its largest is 1.

    A signal whose samples are all equal (for example digital silence) has no
    range to scale; it is returned as all zeros instead of dividing by zero
    and producing NaNs. Empty input is returned unchanged.

    Args:
        audio: array-like of samples.

    Returns:
        A floating-point array of the same shape as ``audio``.
    """
    audio = np.asarray(audio)
    # Integer input is promoted so the zero-range branch returns floats too.
    if not np.issubdtype(audio.dtype, np.inexact):
        audio = audio.astype(np.float64)
    if audio.size == 0:
        return audio

    lowest = np.min(audio)
    span = np.max(audio) - lowest
    if span == 0:
        # Constant signal: the original formula would evaluate 0 / 0.
        return np.zeros_like(audio)
    return 2 * (audio - lowest) / span - 1


In [4]:
# Load and process the data for training and testing
from sklearn.model_selection import train_test_split  # imported first so a missing package fails before the slow loading

SEED = 42

augmentation_config = {
    "time_jitter": True,
    "noise_injection": True,
    "reverb": False,
    "random_cropping": True,
    "freq_masking": True,
    "time_masking": True
}

# The augmentation and mixing helpers draw from both Python's `random` module
# and NumPy's global RNG; seed both so re-running this cell draws the same
# random sequence (the train/validation split below was already seeded).
random.seed(SEED)
np.random.seed(SEED)

X_train, y_train = load_and_process_data(trainDir, augmentations=augmentation_config)
# NOTE(review): the test split receives the same random augmentations as the
# training split - confirm this is intended for evaluation data.
X_test, y_test = load_and_process_data(testDir, augmentations=augmentation_config)

# Hold out 20% of the training pairs for validation.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=SEED)


In [5]:
def _float_feature(value):
    """Wrap ``value`` in a ``tf.train.Feature`` holding a float list.

    ``value`` is handed to ``tf.train.FloatList(value=...)`` as-is, so it is
    expected to be a sequence of floats (callers in this notebook pass
    flattened arrays) - presumably a bare scalar is not accepted; confirm
    against the TensorFlow documentation.
    """
    float_list = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=float_list)

def serialize_example(mixed_signal, vocal_signal):
    """Serialize one (mixture, vocal) pair as a ``tf.train.Example``.

    Args:
        mixed_signal: samples stored under the ``'mixed_signal'`` key.
        vocal_signal: samples stored under the ``'vocal_signal'`` key.

    Returns:
        The result of ``SerializeToString()`` on the example.
    """
    features = tf.train.Features(feature={
        'mixed_signal': _float_feature(mixed_signal),
        'vocal_signal': _float_feature(vocal_signal),
    })
    return tf.train.Example(features=features).SerializeToString()

def write_tfrecord(filename, X, y):
    """Write paired rows of ``X`` and ``y`` to a TFRecord file.

    Row ``i`` of ``X`` and row ``i`` of ``y`` are flattened, serialized with
    ``serialize_example`` and appended to ``filename``; ``len(X)`` records are
    written.

    Args:
        filename: path of the TFRecord file to create.
        X: indexable collection of mixture arrays.
        y: indexable collection of vocal arrays, indexed in step with ``X``.
    """
    with tf.io.TFRecordWriter(filename) as writer:
        for idx in range(len(X)):
            mixed_flat = X[idx].flatten()
            vocal_flat = y[idx].flatten()
            writer.write(serialize_example(mixed_flat, vocal_flat))

# Save the training, validation and test data, one TFRecord file per split
# (written in that order).
for record_name, mixtures, vocals in (
    ('train_2.tfrecord', X_train, y_train),
    ('val_2.tfrecord', X_val, y_val),
    ('test_2.tfrecord', X_test, y_test),
):
    write_tfrecord(os.path.join(tfRecord_Datasets, record_name), mixtures, vocals)
