In [4]:
import numpy as np
import os
import time


def initialize_B_W(M, rank=20):
    """
    Initialize B and W matrices for NMF with small random values.
    
    Parameters:
        M (numpy.ndarray): Input magnitude matrix.
        rank (int): Rank of the factorization.
        
    Returns:
        B_init (numpy.ndarray): Initialized basis matrix.
        W_init (numpy.ndarray): Initialized weights matrix.
    """
    eps = 1e-3
    rows, cols = M.shape
    B_init = np.random.rand(rows, rank) * 100  + eps # Small random values
    W_init = np.random.rand(rank, cols) * 100 + eps # Small random values
    return B_init, W_init

# Load M matrices from saved files
def load_M_matrices(output_path, track_names):
    """
    Load M matrices from the specified output path for given track names.
    
    Parameters:
        output_path (str): Path to the directory containing the saved .npz files.
        track_names (list): List of track names to load.
        
    Returns:
        M_matrices (dict): Dictionary of track names and their corresponding M matrices.
    """
    M_matrices = {}
    for track in track_names:
        file_path = os.path.join(output_path, f"{track}_200sec.npz")
        if os.path.exists(file_path):
            data = np.load(file_path)
            M_matrices[track] = data['magnitude']
            print(f"Loaded M matrix for {track}, shape: {M_matrices[track].shape}.")
        else:
            print(f"File not found: {file_path}. Skipping.")
    return M_matrices

# Path to saved spectrograms and track names
output_path = r"C:\Users\sunre\Desktop\outputtracks"
track_names = ["vocals", "bass", "drums", "other", "mixture"]

# Load M matrices
M_matrices = load_M_matrices(output_path, track_names)

# Initialize B_init and W_init for each track
B_W_initializations = {}
rank = 50  # Adjust rank as needed
for track, M in M_matrices.items():
    B_init, W_init = initialize_B_W(M, rank=rank)
    B_W_initializations[track] = (B_init, W_init)
    print(f"Initialized B and W for {track}, B shape: {B_init.shape}, W shape: {W_init.shape}.")

# Results are now stored in B_W_initializations
print("Initialization complete!")

Loaded M matrix for vocals, shape: (1025, 34380).
Loaded M matrix for bass, shape: (1025, 34380).
Loaded M matrix for drums, shape: (1025, 34380).
Loaded M matrix for other, shape: (1025, 34380).
Loaded M matrix for mixture, shape: (1025, 34380).
Initialized B and W for vocals, B shape: (1025, 50), W shape: (50, 34380).
Initialized B and W for bass, B shape: (1025, 50), W shape: (50, 34380).
Initialized B and W for drums, B shape: (1025, 50), W shape: (50, 34380).
Initialized B and W for other, B shape: (1025, 50), W shape: (50, 34380).
Initialized B and W for mixture, B shape: (1025, 50), W shape: (50, 34380).
Initialization complete!


In [7]:

def NMF_train_is(M, B_init, W_init, max_iter=500, tol=1e-6):
    """
    Train NMF using Itakura-Saito divergence.

    Parameters:
    M : numpy.ndarray
        Input matrix to be factorized (m x n).
    B_init : numpy.ndarray
        Initial basis matrix (m x rank).
    W_init : numpy.ndarray
        Initial weights matrix (rank x n).
    max_iter : int
        Maximum number of iterations.
    tol : float
        Convergence tolerance.

    Returns:
    B : numpy.ndarray
        Trained basis matrix.
    W : numpy.ndarray
        Trained weights matrix.
    """
    B = B_init
    W = W_init
    epsilon = 1e-6  # Small constant to avoid division by zero

    for iteration in range(max_iter):
        M_hat = np.dot(B, W) + epsilon

        # Update W using IS divergence
        numerator_W = np.dot(B.T, M / M_hat**2)
        denominator_W = np.dot(B.T, 1 / M_hat)
        W *= numerator_W / (denominator_W + epsilon)

        # Update B using IS divergence
        M_hat = np.dot(B, W) + epsilon
        numerator_B = np.dot(M / M_hat**2, W.T)
        denominator_B = np.dot(1 / M_hat, W.T)
        B *= numerator_B / (denominator_B + epsilon)

        # Check for convergence
        M_hat = np.dot(B, W) + epsilon
        divergence = np.sum(((M + epsilon) / M_hat) - np.log((M + epsilon) / M_hat) - 1)
        # if iteration > 0 and abs(prev_divergence - divergence) < tol:
        #     print(f"Converged at iteration {iteration}")
        #     break
        # prev_divergence = divergence
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: IS divergence = {divergence}")

    return B, W


# Apply verbose NMF_train to each track
trained_B_W_verbose = {}

for track, (B_init, W_init) in B_W_initializations.items():
    # print the track name
    print(f"Training NMF for {track} with verbose output...")
    
    timerun = time.time()
    
    M = M_matrices[track]
    print(f"Training NMF for {track} with verbose output...")
    B, W = NMF_train_is(M, B_init, W_init, max_iter=100)
    trained_B_W_verbose[track] = (B, W)
    
    timerun = time.time() - timerun
    print(f"Training complete for {track}. Final B shape: {B.shape}, W shape: {W.shape}. Time taken: {timerun}.")
    
    # Save the trained B and W matrices
    np.savetxt(f'{track}_B_is.csv', B, delimiter=',')
    np.savetxt(f'{track}_W_is.csv', W, delimiter=',')
    
    
    
    print(f"Training complete for {track}. Final B shape: {B.shape}, W shape: {W.shape}.")

print("NMF training with IS div verbose output complete for all tracks!")


Training NMF for vocals with verbose output...
Training NMF for vocals with verbose output...
Iteration 0: IS divergence = 19418693.10827248
Iteration 10: IS divergence = 10693259.553833954
Iteration 20: IS divergence = 8417046.875603328
Iteration 30: IS divergence = 7562069.1947759595
Iteration 40: IS divergence = 7149565.464706596
Iteration 50: IS divergence = 6910262.988433997
Iteration 60: IS divergence = 6757424.0041053975
Iteration 70: IS divergence = 6653418.156029433
Iteration 80: IS divergence = 6577915.276454585
Iteration 90: IS divergence = 6520036.435867991
Training complete for vocals. Final B shape: (1025, 50), W shape: (50, 34380). Time taken: 402.90939474105835.
Training complete for vocals. Final B shape: (1025, 50), W shape: (50, 34380).
Training NMF for bass with verbose output...
Training NMF for bass with verbose output...
Iteration 0: IS divergence = 68426903.69015874
Iteration 10: IS divergence = 10991315.911732782
Iteration 20: IS divergence = 7687856.154598329


In [8]:

def separate_signals_is(M_mixed, B_vocals, B_bass, B_drums, B_other, max_iter=500, tol=1e-4):
    """
    Separate mixed signal into components using fixed NMF bases and IS divergence.

    Parameters:
    M_mixed : numpy.ndarray
        Mixed signal matrix (e.g., magnitude spectrogram) (m x n).
    B_vocals : numpy.ndarray
        Learned basis matrix for vocals (m x r_vocals).
    B_bass : numpy.ndarray
        Learned basis matrix for bass (m x r_bass).
    B_drums : numpy.ndarray
        Learned basis matrix for drums (m x r_drums).
    B_other : numpy.ndarray
        Learned basis matrix for other components (m x r_other).
    max_iter : int
        Maximum number of iterations.
    tol : float
        Convergence tolerance.

    Returns:
    M_vocals, M_bass, M_drums, M_other: Separated magnitude spectrograms for each component.
    """
    # Combine learned basis matrices
    B_combined = np.hstack([B_vocals, B_bass, B_drums, B_other])

    # Initialize W randomly with non-negative values
    _, n = M_mixed.shape
    rank_combined = B_combined.shape[1]
    W = np.random.rand(rank_combined, n)
    print("Initial shapes:")
    print("W_mixed:", W.shape)
    print("B:", B_combined.shape)
    
    epsilon = 1e-10

    for iteration in range(max_iter):
        # Update W using IS divergence
        M_hat = np.dot(B_combined, W) + epsilon
        numerator_W = np.dot(B_combined.T, M_mixed / M_hat**2)
        denominator_W = np.dot(B_combined.T, 1 / M_hat)
        W *= numerator_W / (denominator_W + epsilon)

        # Check for convergence
        M_hat = np.dot(B_combined, W) + epsilon
        divergence = np.sum(((epsilon + M_mixed) / M_hat) - np.log((epsilon + M_mixed) / M_hat) - 1)
        if iteration > 0 and abs(prev_divergence - divergence) < tol:
            print(f"Converged at iteration {iteration}")
            break
        prev_divergence = divergence
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: IS divergence = {divergence}")

    # Separate W into respective components
    idx_vocals = B_vocals.shape[1]
    idx_bass = idx_vocals + B_bass.shape[1]
    idx_drums = idx_bass + B_drums.shape[1]

    W_vocals = W[:idx_vocals, :]
    W_bass = W[idx_vocals:idx_bass, :]
    W_drums = W[idx_bass:idx_drums, :]
    W_other = W[idx_drums:, :]

    # Reconstruct the components
    M_vocals = np.dot(B_vocals, W_vocals)
    M_bass = np.dot(B_bass, W_bass)
    M_drums = np.dot(B_drums, W_drums)
    M_other = np.dot(B_other, W_other)

    return M_vocals, M_bass, M_drums, M_other



In [10]:
import librosa as lb
import  soundfile as sf

testpath = r"H:\My Drive\Courses\Fall2024\MLSP\Project\MLSP_Project\full_track_short_21s.mp3"

# Load the mixed signal's magnitude spectrogram 
y_mixed, sr = lb.load(testpath, sr = None)
spectrogram_mixed = lb.stft(y_mixed, n_fft = 2048, hop_length = 256, center = False, win_length = 2048)
M_mixed = np.abs(spectrogram_mixed)
phase_mixed = spectrogram_mixed / (M_mixed + 1e-8)

# Load the trained B matrices
B_vocals = np.loadtxt('vocals_B_is.csv', delimiter=',')
B_bass = np.loadtxt('bass_B_is.csv', delimiter=',')
B_drums = np.loadtxt('drums_B_is.csv', delimiter=',')
B_other = np.loadtxt('other_B_is.csv', delimiter=',')

# Separate the mixed signal into components
M_vocals, M_bass, M_drums, M_other = separate_signals_is(M_mixed, B_vocals, B_bass, B_drums, B_other, max_iter=100)

# Save the separated components as audio files 
y_vocals = lb.istft(M_vocals * phase_mixed, hop_length=256, win_length=2048)
sf.write('vocals_is.wav', y_vocals, sr)

y_bass = lb.istft(M_bass * phase_mixed, hop_length=256, win_length=2048)
sf.write('bass_is.wav', y_bass, sr)

y_drums = lb.istft(M_drums * phase_mixed, hop_length=256, win_length=2048)
sf.write('drums_is.wav', y_drums, sr)

y_other = lb.istft(M_other * phase_mixed, hop_length=256, win_length=2048)
sf.write('other_is.wav', y_other, sr)
print("Separation complete! Audio files saved.")


Iteration 0: IS divergence = 3491927.6528527397
Iteration 10: IS divergence = 2543082.3943731957
Iteration 20: IS divergence = 2436002.3164068293
Iteration 30: IS divergence = 2397923.8714014976
Iteration 40: IS divergence = 2380441.495011827
Iteration 50: IS divergence = 2371204.9325647783
Iteration 60: IS divergence = 2365803.740977314
Iteration 70: IS divergence = 2362424.8860616433
Iteration 80: IS divergence = 2360211.358891441
Iteration 90: IS divergence = 2358693.295405265
Separation complete! Audio files saved.


In [16]:
trainset = r"C:\Users\sunre\Desktop\songs\train"
trainfiles = os.listdir(trainset)

for trainfile in trainfiles:
    print(os.path.join(trainset, trainfile))
    trainpath = os.path.join(trainset, trainfile, 'mixture.wav')
    # Load the mixed signal's magnitude spectrogram 
    y_mixed, sr = lb.load(trainpath, sr = None)
    spectrogram_mixed = lb.stft(y_mixed, n_fft = 2048, hop_length = 256, center = False, win_length = 2048)
    M_mixed = np.abs(spectrogram_mixed)
    phase_mixed = spectrogram_mixed / (M_mixed + 1e-8)

    # Separate the mixed signal into components
    M_vocals, M_bass, M_drums, M_other = separate_signals_is(M_mixed, B_vocals, B_bass, B_drums, B_other, max_iter=100)

    # Save the separated components as audio files 
    y_vocals = lb.istft(M_vocals * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'train_{trainfile}_vocals_is.wav', y_vocals, sr)

    y_bass = lb.istft(M_bass * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'train_{trainfile}_bass_is.wav', y_bass, sr)

    y_drums = lb.istft(M_drums * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'train_{trainfile}_drums_is.wav', y_drums, sr)

    y_other = lb.istft(M_other * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'train_{trainfile}_other_is.wav', y_other, sr)
    print(f"Separation complete for {trainfile}! Audio files saved.")

C:\Users\sunre\Desktop\songs\train\1
Iteration 0: IS divergence = 1542627.6399260208
Iteration 10: IS divergence = 719292.4310526453
Iteration 20: IS divergence = 680685.7037209453
Iteration 30: IS divergence = 665327.8940088216
Iteration 40: IS divergence = 657086.5296476244
Iteration 50: IS divergence = 651955.69891151
Iteration 60: IS divergence = 648488.0733445456
Iteration 70: IS divergence = 646016.4986227519
Iteration 80: IS divergence = 644185.1076215665
Iteration 90: IS divergence = 642787.6355220728
Separation complete for 1! Audio files saved.
C:\Users\sunre\Desktop\songs\train\10
Iteration 0: IS divergence = 1370009.102314122
Iteration 10: IS divergence = 776896.1850177596
Iteration 20: IS divergence = 728591.7392643972
Iteration 30: IS divergence = 711397.7983730787
Iteration 40: IS divergence = 702399.6219012584
Iteration 50: IS divergence = 696894.7803478481
Iteration 60: IS divergence = 693233.228318208
Iteration 70: IS divergence = 690655.3524323318
Iteration 80: IS di

In [14]:
testset = r"C:\Users\sunre\Desktop\songs\test"
testfiles = os.listdir(testset)

for testfile in testfiles:
    print(os.path.join(testset, testfile))
    testpath = os.path.join(testset, testfile, 'mixture.wav')
    # Load the mixed signal's magnitude spectrogram 
    y_mixed, sr = lb.load(testpath, sr = None)
    spectrogram_mixed = lb.stft(y_mixed, n_fft = 2048, hop_length = 256, center = False, win_length = 2048)
    M_mixed = np.abs(spectrogram_mixed)
    phase_mixed = spectrogram_mixed / (M_mixed + 1e-8)

    # Separate the mixed signal into components
    M_vocals, M_bass, M_drums, M_other = separate_signals_is(M_mixed, B_vocals, B_bass, B_drums, B_other, max_iter=100)

    # Save the separated components as audio files 
    y_vocals = lb.istft(M_vocals * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'{testfile}_vocals_is.wav', y_vocals, sr)

    y_bass = lb.istft(M_bass * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'{testfile}_bass_is.wav', y_bass, sr)

    y_drums = lb.istft(M_drums * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'{testfile}_drums_is.wav', y_drums, sr)

    y_other = lb.istft(M_other * phase_mixed, hop_length=256, win_length=2048)
    sf.write(f'{testfile}_other_is.wav', y_other, sr)
    print(f"Separation complete for {testfile}! Audio files saved.")

C:\Users\sunre\Desktop\songs\test\1
Iteration 0: IS divergence = 1423354.862821032
Iteration 10: IS divergence = 725358.152798245
Iteration 20: IS divergence = 695011.874912793
Iteration 30: IS divergence = 681811.7606629932
Iteration 40: IS divergence = 674187.5732928131
Iteration 50: IS divergence = 669229.2085920303
Iteration 60: IS divergence = 665783.2127358533
Iteration 70: IS divergence = 663281.041200145
Iteration 80: IS divergence = 661403.8772797805
Iteration 90: IS divergence = 659959.8115469267
Separation complete for 1! Audio files saved.
C:\Users\sunre\Desktop\songs\test\10
Iteration 0: IS divergence = 1660158.8641820564
Iteration 10: IS divergence = 752227.6705639561
Iteration 20: IS divergence = 720636.7545058149
Iteration 30: IS divergence = 707669.7692580654
Iteration 40: IS divergence = 700243.5300319776
Iteration 50: IS divergence = 695414.1143362403
Iteration 60: IS divergence = 692052.4096229064
Iteration 70: IS divergence = 689603.9494400906
Iteration 80: IS dive