In [None]:
%load_ext tensorboard
import numpy as np
import pandas as pd 
import os
import sklearn 
import tensorflow as tf
import librosa
import librosa.display
import soundfile
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
import math
import random
import datetime
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
def get_name_list(root='Your Musedb Path'):
    """Collect the distinct WAV file names found anywhere under ``root``.

    The directory tree is walked recursively. Only the bare file name (not the
    directory) is kept, so a track stored under several stem folders with the
    same file name is reported once.

    Args:
        root: Directory to scan. Defaults to the notebook's placeholder path.

    Returns:
        A sorted list of unique file names ending in ``.wav``. The list is
        empty when ``root`` does not exist or holds no WAV files.
    """
    names = set()
    for _dirname, _subdirs, filenames in os.walk(root):
        for filename in filenames:
            # Match on the real extension; a substring test would also accept
            # names such as "track.wav.bak".
            if filename.endswith('.wav'):
                names.add(filename)
    # os.walk order depends on the filesystem; sorting keeps the seeded
    # train/test split reproducible from run to run.
    return sorted(names)

[wav 파일 디렉토리 구조]

wav/
 - mix/
 - bass/
 - vocal/
 - drum/
 - other/

In [None]:
# Show the file names returned by get_name_list() as this cell's output.
get_name_list()

In [None]:
# Alternative configuration, currently disabled:
# N_FFT = 2046
# SAPLE_RATE = 44100
# TIME_LENGTH = 16000

# FFT size passed as n_fft to librosa.stft and librosa.griffinlim.
N_FFT = 1022
# Sample rate used when loading, pitch-shifting and writing audio
# (the identifier keeps the notebook's existing spelling).
SAPLE_RATE = 22050
# Number of STFT frames getSpectogram pads/truncates each spectrogram to.
TIME_LENGTH = 8000
# Name of the stem sub-directory loaded as the separation target.
TARGET = "other"

In [None]:
# WAV augmentation helpers
# Source: https://www.kaggle.com/code/huseinzol05/sound-augmentation-librosa/notebook

def change_pitch_and_speed(samples, random_uniform):
    """Resample ``samples`` by linear interpolation while keeping its length.

    The signal is read at positions spaced ``1 / random_uniform`` apart, and
    the result is written into the front of a zeroed copy of the input.

    Args:
        samples: 1-D numpy array of audio samples (not modified).
        random_uniform: Length-change factor; the read step is its reciprocal.

    Returns:
        An array with the same shape and dtype as ``samples``.
    """
    out = samples.copy()
    n_samples = len(out)
    step = 1.0 / random_uniform
    resampled = np.interp(np.arange(0, n_samples, step), np.arange(0, n_samples), out)
    keep = min(out.shape[0], resampled.shape[0])
    # Blank the buffer, then copy in as much of the resampled signal as fits.
    out *= 0
    out[:keep] = resampled[:keep]
    return out
    
def value_augmentation(samples, random_uniform):
    """Scale the amplitude of ``samples`` by ``random_uniform``.

    Args:
        samples: numpy array of audio samples (not modified).
        random_uniform: Multiplicative gain applied to every sample.

    Returns:
        A new array holding the scaled samples.
    """
    gain = random_uniform
    return samples.copy() * gain

def add_distribution_noise(samples, random_uniform):
    """Add Gaussian noise scaled relative to the signal's maximum value.

    The noise amplitude is ``0.005 * random_uniform * max(samples)``. The
    noise itself is drawn from ``np.random.normal`` on every call, so two
    calls with the same arguments add different noise.

    Args:
        samples: 1-D numpy array of audio samples (not modified).
        random_uniform: Factor scaling the noise amplitude.

    Returns:
        A float64 array of the same length as ``samples``.
    """
    signal = samples.copy()
    amplitude = 0.005 * random_uniform * np.amax(signal)
    noise = np.random.normal(size=signal.shape[0])
    return signal.astype('float64') + amplitude * noise

def streching(samples):
    """Time-stretch ``samples`` with a fixed rate of 1.1, keeping its length.

    The stretched signal is truncated when it is longer than the input and
    zero-padded at the end otherwise.

    Args:
        samples: 1-D numpy array of audio samples (not modified).

    Returns:
        A float array with ``len(samples)`` elements.
    """
    target_len = len(samples)
    stretched = librosa.effects.time_stretch(samples.copy().astype('float'), rate=1.1)
    missing = target_len - len(stretched)
    if missing < 0:
        return stretched[:target_len]
    return np.pad(stretched, (0, max(0, missing)), "constant")

def change_pitch_only(samples, random_uniform):
    """Pitch-shift ``samples`` by ``4 * random_uniform`` steps (12 bins per octave).

    Args:
        samples: numpy array of audio samples (not modified).
        random_uniform: Factor scaling the shift; the number of steps passed
            to librosa is ``2 * 2 * random_uniform``.

    Returns:
        The array returned by ``librosa.effects.pitch_shift``.
    """
    pitch_pm = 2
    n_steps = pitch_pm * 2 * random_uniform
    return librosa.effects.pitch_shift(
        samples.copy().astype('float64'),
        sr=SAPLE_RATE,
        n_steps=n_steps,
        bins_per_octave=12,
    )
def change_speed_only(samples, random_uniform):
    """Time-stretch ``samples`` by ``random_uniform`` while keeping its length.

    The stretched signal is written into the front of a zeroed copy of the
    input; anything that does not fit is dropped.

    Args:
        samples: 1-D numpy array of audio samples (not modified).
        random_uniform: Value passed as ``rate`` to librosa's time_stretch.

    Returns:
        An array with the same shape and dtype as ``samples``.
    """
    out = samples.copy()
    stretched = librosa.effects.time_stretch(out.astype('float64'), rate=random_uniform)
    keep = min(out.shape[0], stretched.shape[0])
    # Blank the buffer, then copy in as much of the stretched signal as fits.
    out *= 0
    out[:keep] = stretched[:keep]
    return out

def random_shifting(samples, random_uniform):
    """Shift ``samples`` in time by up to 20% of its length, zero-filling the gap.

    ``random_uniform`` in [0, 1] maps linearly to a shift factor in
    [-0.2, 0.2]. A positive shift delays the signal (zeros at the front), a
    negative shift advances it (zeros at the end).

    Args:
        samples: 1-D numpy array of audio samples (not modified).
        random_uniform: Value selecting the shift; 0.5 means no shift.

    Returns:
        An array with the same length as ``samples``.
    """
    y_shift = samples.copy()
    length = y_shift.shape[0]
    timeshift_fac = 0.2 * 2 * (random_uniform - 0.5)  # up to 20% of length
    start = int(length * timeshift_fac)
    if start > 0:
        # Delay: prepend zeros and drop the overflow at the end.
        return np.pad(y_shift, (start, 0), mode='constant')[:length]
    if start < 0:
        # Advance: append zeros and drop the first |start| samples. Slicing
        # from index 0 here would discard the padding and return the input
        # unchanged.
        return np.pad(y_shift, (0, -start), mode='constant')[-start:]
    return y_shift

def hpss_harmonics(samples):
    """Return the first (harmonic) output of ``librosa.effects.hpss``.

    Args:
        samples: numpy array of audio samples; converted to float64 first.
    """
    harmonic, _percussive = librosa.effects.hpss(samples.astype('float64'))
    return harmonic

def hpss_percussive(samples):
    """Return the second (percussive) output of ``librosa.effects.hpss``.

    Args:
        samples: numpy array of audio samples; converted to float64 first.
    """
    _harmonic, percussive = librosa.effects.hpss(samples.astype('float64'))
    return percussive

In [None]:
def audio_aug(mix_audio, target_audio, aug_num):
    """Apply one augmentation, chosen by ``aug_num % 10``, to mix and target.

    Any random parameter is drawn once and shared by both signals, and the
    mix is always transformed before the target. Index 0 leaves both signals
    untouched.

    Index -> augmentation:
        1 change_pitch_and_speed, 2 value_augmentation,
        3 add_distribution_noise, 4 streching, 5 change_pitch_only,
        6 change_speed_only, 7 random_shifting, 8 hpss_harmonics,
        9 hpss_percussive.

    Args:
        mix_audio: Samples of the mixture.
        target_audio: Samples of the target stem.
        aug_num: Integer selecting the augmentation (taken modulo 10).

    Returns:
        ``(mix, target)`` as two numpy arrays.
    """

    def both(transform, *shared_args):
        # Same transform and same shared arguments for the mix, then the target.
        return transform(mix_audio, *shared_args), transform(target_audio, *shared_args)

    # Lambdas keep everything lazy: a random value is drawn, and a helper is
    # looked up, only for the branch that is actually selected.
    recipes = {
        1: lambda: both(change_pitch_and_speed, np.random.uniform(low=0.8, high=1)),
        2: lambda: both(value_augmentation, np.random.uniform(low=1.5, high=3)),
        3: lambda: both(add_distribution_noise, np.random.uniform()),
        4: lambda: both(streching),
        5: lambda: both(change_pitch_only, np.random.uniform()),
        6: lambda: both(change_speed_only, np.random.uniform(low=0.9, high=1.1)),
        7: lambda: both(random_shifting, np.random.uniform()),
        8: lambda: both(hpss_harmonics),
        9: lambda: both(hpss_percussive),
    }

    recipe = recipes.get(aug_num % 10)
    if recipe is None:
        mix_out, target_out = mix_audio, target_audio
    else:
        mix_out, target_out = recipe()
    return np.array(mix_out), np.array(target_out)

In [None]:
def getAudio(file_name, target, base_dir="/Users/hyobeom/git/ML/musdb18/wav/"):
    """Load one stem of a track as a waveform resampled to ``SAPLE_RATE``.

    Args:
        file_name: File name of the track inside the stem folder.
        target: Stem sub-directory name under ``base_dir`` (e.g. "mix").
        base_dir: Root directory holding one sub-directory per stem. The
            default keeps the notebook's original location; pass your own
            dataset root instead of editing this function.

    Returns:
        The audio samples returned by ``librosa.load``.
    """
    path = os.path.join(base_dir, target, file_name)
    # The sample rate returned by librosa is the one requested, so it is dropped.
    audio, _sr = librosa.load(path, sr=SAPLE_RATE)
    return audio

In [None]:
def getSpectogram(audio):
  """Turn a waveform into 10 equal chunks of square-root STFT magnitudes.

  Steps: STFT with ``n_fft=N_FFT``; pad/truncate the frame axis to
  ``TIME_LENGTH``; take ``|STFT| ** 0.5``; transpose to put frames first;
  split the frame axis into 10 equal parts.

  Returns:
    Array of shape (10, TIME_LENGTH / 10, n_frequency_bins).

  Notes carried over from the original comments (they assume a sample rate of
  22050 and 512 samples per frame step):
    12000 frames = 6143488 / 512 + 1  (6143488 / 22050, noted as 4.6 minutes)
     6000 frames = 3071488 / 512 + 1  (3071488 / 22050, noted as 2.3 minutes)
     8000 frames = 4095488 / 512 + 1
  NOTE(review): the step of 512 does not obviously follow from N_FFT as
  configured in this notebook - confirm before relying on these figures.
  """
  stft = librosa.stft(audio, n_fft=N_FFT)
  stft = librosa.util.fix_length(stft, size=TIME_LENGTH, axis=1)
  frames = (np.abs(stft) ** 0.5).T
  return np.stack(np.split(frames, 10, axis=0))

In [None]:
def getSpectogram_full(audio):
  """Like the training spectrogram, but with a fixed length of 12000 frames.

  Steps: STFT with ``n_fft=N_FFT``; pad/truncate the frame axis to 12000;
  take ``|STFT| ** 0.5``; transpose to put frames first; split the frame
  axis into 10 equal parts.

  Returns:
    Array of shape (10, 1200, n_frequency_bins).

  Notes carried over from the original comments (they assume a sample rate of
  22050 and 512 samples per frame step):
    12000 frames = 6143488 / 512 + 1  (6143488 / 22050, noted as 4.6 minutes)
     6000 frames = 3071488 / 512 + 1  (3071488 / 22050, noted as 2.3 minutes)
     8000 frames = 4095488 / 512 + 1
  NOTE(review): the step of 512 does not obviously follow from N_FFT as
  configured in this notebook - confirm before relying on these figures.
  """
  stft = librosa.stft(audio, n_fft=N_FFT)
  stft = librosa.util.fix_length(stft, size=12000, axis=1)
  frames = (np.abs(stft) ** 0.5).T
  return np.stack(np.split(frames, 10, axis=0))

In [None]:
def augmentation_audio_set(file_name, aug_num):
    """Load a track's mix and target stem, augment both, return spectrograms.

    Args:
        file_name: Track file name, looked up in the "mix" folder and in the
            folder named by ``TARGET``.
        aug_num: Augmentation selector forwarded to ``audio_aug``.

    Returns:
        ``(mix_spectrogram, target_spectrogram)`` as produced by
        ``getSpectogram``.
    """
    mix_aug, target_aug = audio_aug(
        getAudio(file_name, "mix"),
        getAudio(file_name, TARGET),
        aug_num,
    )
    return getSpectogram(mix_aug), getSpectogram(target_aug)

In [None]:
# Smoke test on a single track. aug_num=10 selects index 10 % 10 == 0 in
# audio_aug, i.e. no augmentation. The string argument is a placeholder for a
# track file name.
mix_spectrogram, target_sepctrogram = augmentation_audio_set("Your Musedb Path", 10)
# Inspect the shape and value range of the mix spectrogram.
print(tf.shape(mix_spectrogram))
print(np.min(mix_spectrogram), np.max(mix_spectrogram))

In [None]:
def spectrogram2wav(split_spectrogram, path):
  """Reconstruct audio from chunked square-root magnitudes and write a WAV file.

  The chunks are flattened back to (frames, bins), squared to undo the
  ``** 0.5`` applied when the spectrogram was built, transposed to
  (bins, frames) and inverted with Griffin-Lim.

  Args:
    split_spectrogram: Array whose last axis holds ``1 + N_FFT // 2``
      frequency bins (for example the output of getSpectogram or a model
      prediction).
    path: Destination file path for the WAV output.
  """
  # Derive the bin count from N_FFT rather than hard-coding 512, so the
  # reshape stays correct if N_FFT is changed.
  n_bins = 1 + N_FFT // 2
  magnitude = np.reshape(split_spectrogram, (-1, n_bins)) ** 2
  audio = librosa.griffinlim(magnitude.T, n_fft=N_FFT)
  soundfile.write(path, audio, SAPLE_RATE, format='WAV')

In [None]:
# Write the reconstructed mix to disk; the string is a placeholder for the output WAV path.
spectrogram2wav(mix_spectrogram, "Your Musedb Path")

In [None]:
# Write the reconstructed target stem to disk; the string is a placeholder for the output WAV path.
spectrogram2wav(target_sepctrogram, "Your Musedb Path")

In [None]:
def visualize_spectogram(split_spectrogram):
  """Show a chunked spectrogram as a single image.

  The chunks are flattened to (frames, 512), transposed so that the
  512-element axis runs vertically, and flipped along that axis before
  plotting.

  Args:
    split_spectrogram: Array whose total size is a multiple of 512.
  """
  image = np.flip(np.reshape(split_spectrogram, (-1, 512)).T, axis=0)
  plt.imshow(image, aspect='auto', interpolation='nearest')
  plt.show()

In [None]:
# Plot the mix spectrogram produced by the smoke test above.
visualize_spectogram(mix_spectrogram)

In [None]:
class Spectogram_DataSet(tf.keras.utils.Sequence):
    """Keras data sequence that yields one track per batch.

    Each item is the ``(mix, target)`` spectrogram pair returned by
    ``augmentation_audio_set`` for one file name. The audio is loaded and
    transformed when the item is requested.
    """

    def __init__(self, name_list, augmentation=True, **kwargs):
        """
        Args:
            name_list: Track file names served by this dataset.
            augmentation: When True, each item uses a random augmentation
                index in [0, 4]; when False, index 0 (no augmentation).
            **kwargs: Forwarded to the Keras base class.
        """
        # The base initializer must run so Keras can set up its own state;
        # newer Keras versions warn when it is skipped.
        super().__init__(**kwargs)
        self.name_list = name_list
        self.augmentation = augmentation

    def __len__(self):
        """Number of batches per epoch: one per track."""
        return len(self.name_list)

    def __getitem__(self, index):
        """Return the ``(mix, target)`` spectrogram arrays for track ``index``."""
        aug_num = random.randint(0, 4) if self.augmentation else 0
        mix_spectrogram, target_sepctrogram = augmentation_audio_set(self.name_list[index], aug_num)
        return np.array(mix_spectrogram), np.array(target_sepctrogram)

    def on_epoch_end(self):
        """Reshuffle the track order after every epoch."""
        self.name_list = sklearn.utils.shuffle(self.name_list)

In [None]:
def get_train_valid_test():
  """Split the track names into train / validation / test subsets.

  10% of the names are held out as ``test``. The remaining 90% (``train``)
  is split again 80/20 into ``tr`` and ``var``. Both splits use
  random_state=2021.

  Returns:
    ``(tr, var, test, train)``, where ``train`` is the union of ``tr`` and
    ``var``.
  """
  names = get_name_list()
  train, test = train_test_split(names, test_size=0.1, random_state=2021)
  tr, var = train_test_split(train, test_size=0.2, random_state=2021)
  return tr, var, test, train

In [None]:
# tr/var: training and validation names; test: held-out names; train: tr and var combined.
tr, var, test, train = get_train_valid_test()

In [None]:
# Build one dataset per split; augmentation is disabled (False) for all of them.
train_ds = Spectogram_DataSet(train, False)
tr_ds = Spectogram_DataSet(tr, False)
val_ds = Spectogram_DataSet(var, False)
test_ds = Spectogram_DataSet(test, False)

In [None]:
def unmix_keras():
  """Build and compile the separation model.

  Architecture, in layer order:
    input (None, 512) -> LayerNormalization -> Dense(512) -> BatchNorm -> tanh
    -> 3 x Bidirectional LSTM(256, return_sequences=True)
    -> Add with the tanh output (skip connection)
    -> 2 x [Dense(512) -> BatchNorm -> relu]
    -> element-wise product with the model input.

  Returns:
    A Keras Model compiled with the "adam" optimizer and "mse" loss.
  """
  def dense_block(tensor, activation):
    # Dense(512) -> BatchNormalization -> Activation, created in that order.
    tensor = layers.Dense(512)(tensor)
    tensor = layers.BatchNormalization()(tensor)
    return layers.Activation(activation)(tensor)

  inputs = layers.Input(shape=(None, 512))
  skip = dense_block(layers.LayerNormalization()(inputs), "tanh")

  recurrent = skip
  for _ in range(3):
    recurrent = layers.Bidirectional(layers.LSTM(256, return_sequences=True))(recurrent)

  scale = layers.Add()([recurrent, skip])
  for _ in range(2):
    scale = dense_block(scale, "relu")

  # The network output multiplies the input spectrogram element-wise.
  model = Model(inputs, scale * inputs)
  model.compile(optimizer="adam", loss="mse")
  return model
# Build the model and print its layer summary. The builder defined in this
# cell is `unmix_keras`; no `open_unmix_keras` exists in the notebook, so the
# previous call raised NameError on a fresh kernel.
model = unmix_keras()
model.summary()

In [None]:
# Stops training once val_loss has not improved for 10 epochs.
# NOTE(review): this callback is not in the `callbacks` list of either
# model.fit call in this notebook, so it currently has no effect.
earlyStopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

In [None]:
# Saves the model to "checkpoint-<TARGET>" only when val_loss improves (save_best_only=True).
checkpoint = ModelCheckpoint("checkpoint-"+ TARGET, monitor='val_loss', verbose=1, save_best_only=True, mode='auto')

In [None]:
# One TensorBoard log directory per run: "logs/fit/<TARGET><timestamp>".
log_dir = "logs/fit/" + TARGET + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# histogram_freq=1 asks for histograms to be logged every epoch.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
# Disabled alternative: rebuild the training datasets with augmentation enabled.
# train_ds = Spectogram_DataSet(train, True)
# tr_ds = Spectogram_DataSet(tr, True)
# val_ds = Spectogram_DataSet(var, False)
# test_ds = Spectogram_DataSet(test, False)
# Train `model` for 60 epochs on tr_ds, validating on val_ds, with
# checkpointing and TensorBoard logging.
history = model.fit(tr_ds, validation_data=val_ds, epochs=60,  callbacks=[checkpoint, tensorboard_callback])

In [None]:
# Build a second, freshly initialised model with the same architecture.
checkModel = unmix_keras()

In [None]:
# Train `checkModel` for 300 epochs; this overwrites the `history` of the previous fit.
# NOTE(review): this reuses the same `checkpoint` and `tensorboard_callback`
# objects (same file path and log directory) as the earlier fit - confirm the
# checkpoint's best value carried over from that run does not block saving here.
history = checkModel.fit(tr_ds, validation_data=val_ds, epochs=300,  callbacks=[checkpoint, tensorboard_callback])

In [None]:
# Restore into `checkModel` the weights saved by the ModelCheckpoint callback.
checkModel.load_weights("checkpoint-" + TARGET)

In [None]:
# Save the model to "hb-unmix-<TARGET>".
# NOTE(review): this saves `model` (the first, 60-epoch model), not
# `checkModel`, which just had the checkpoint weights loaded - confirm intent.
model.save('hb-unmix-' + TARGET)

In [None]:
# Available splits: tr, var, test, train
# Print the first held-out track name, e.g. to pick a file for the inference cells.
print(test[0])

In [None]:
# Load the mixture of the track to separate; "Your Path" is a placeholder for its file name.
audio = getAudio("Your Path" , "mix")

In [None]:
# Spectrogram of the mixture, fixed to 12000 frames and split into 10 chunks.
Leave_spectrogram = getSpectogram_full(audio)

In [None]:
# Run the model on the 10 chunks to get the predicted target spectrogram.
predicted = checkModel.predict(Leave_spectrogram)

In [None]:
# Plot the predicted spectrogram.
visualize_spectogram(predicted)

In [None]:
# Reconstruct audio from the prediction and write it; "Your Path" is a placeholder for the output WAV path.
spectrogram2wav(predicted, "Your Path")