In [2]:
# Mount Google Drive and move into the project folder.
from google.colab import drive # Colab helper used below to mount the drive
 
ROOT = "/content/drive"     # mount point for the drive
print(ROOT)                 # echo the mount point (optional)
 
drive.mount(ROOT)           # mount Google Drive at ROOT
# Make the project folder the working directory: the data and result paths
# used in later cells ("../PFE/DATA/...", "results/...") are relative to it.
%cd "/content/drive/My Drive/Deep Unfolded NMF"
%pwd

/content/drive
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive/Deep Unfolded NMF


'/content/drive/My Drive/Deep Unfolded NMF'

## Result: at SMR = 0 dB this notebook reached SDR = 2.57 (speech) and 2.45 (music)

In [4]:
# List the working directory to check which project files are present.
!ls 

DR-NMF.ipynb  helpers2.py


In [3]:
import numpy as np
import os
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter, freqz

from scipy.io.wavfile import read, write
from scipy import signal
from sklearn.decomposition import NMF
from sklearn.preprocessing import MinMaxScaler

from numpy import linalg as LA
from numpy.linalg import inv
from helpers2 import Reconstruct, Viz_Y,SMR,get_mixed_signal,SDR,ReconstructSoft,butter_lowpass_filter
import seaborn as sns
import warnings
import math
from tqdm import tqdm
warnings.simplefilter('ignore')

In [4]:
# Training excerpts: speech from minute 1 to 20 (noted as the best range),
# music from minute 1 to 5. Only the first channel of each file is kept.
# The sample offsets are derived from the sample rate stored in each file
# rather than assuming 44.1 kHz, so the minute marks stay correct for any file.

samplerate_s, data_speech = read("../PFE/DATA/Conversation.wav")
start = 1 * 60 * samplerate_s
end = 20 * 60 * samplerate_s
speech=data_speech[start:end,0]
length=speech.shape[0]/samplerate_s
print('Shape of the speech {} ... Length : {:.2f}s ... Sample rate : {}'.format(speech.shape[0],length,samplerate_s))

samplerate_m, data_music = read("../PFE/DATA/Bigmusic2.wav")
start = 1 * 60 * samplerate_m
end = 5 * 60 * samplerate_m
music=data_music[start:end,0]
length=music.shape[0]/samplerate_m
print('Shape of the music {} ... Length : {:.2f}s ... Sample rate : {}'.format(music.shape[0],length,samplerate_m))


Shape of the speech 50274000 ... Length : 1140.00s ... Sample rate : 44100
Shape of the music 10584000 ... Length : 240.00s ... Sample rate : 44100


In [5]:
fs = 16000  # target sample rate (Hz) after downsampling

# One decimation ratio is applied to both recordings below, which is only
# valid when the two files share the same native sample rate.
assert samplerate_s == samplerate_m, "speech and music files must share one sample rate"
rate = samplerate_s / fs

# Material for the training mixture: minutes 1-20 of both recordings, first
# channel only. Offsets follow the files' sample rate instead of a literal 44100.
start = 1 * 60 * samplerate_s
end = 20 * 60 * samplerate_s

speech_t = data_speech[start:end, 0]
music_t = data_music[start:end, 0]

speech_t = signal.resample(speech_t, int(speech_t.shape[0] / rate))
music_t = signal.resample(music_t, int(music_t.shape[0] / rate))
# Resampling by samplerate / fs lands on fs by construction; the previous
# int(samplerate_m / rate) could truncate to fs - 1 through float rounding.
samplerate = fs
length = music_t.shape[0] / samplerate

print('Shape of the test {} ... Length : {:.2f}s ... Sample rate : {}'.format(music_t.shape[0],length,samplerate))

# NOTE: `speech` and `music` are overwritten in place, so re-running this cell
# without re-running the loading cell would resample them a second time.
speech = signal.resample(speech, int(speech.shape[0] / rate))
music = signal.resample(music, int(music.shape[0] / rate))

print('Downsampled rate = {}'.format(samplerate))

# Low-pass all four signals with a 5000 Hz cutoff (helper from helpers2).
speech = butter_lowpass_filter(speech, 5000, fs)
music = butter_lowpass_filter(music, 5000, fs)

music_t = butter_lowpass_filter(music_t, 5000, fs)
speech_t = butter_lowpass_filter(speech_t, 5000, fs)

Shape of the test 18240000 ... Length : 1140.00s ... Sample rate : 16000
Downsampled rate = 16000


## Training STFT :


In [6]:
# STFT analysis settings, reused by every stft/istft call in the notebook.
WINDOW = 'hamming'
WINDOW_SIZE = 480
OVERLAP = int(0.8 * WINDOW_SIZE)  # 80 % overlap, kept as an integer sample count
NFFT = 512

# Magnitude spectrograms of the clean training signals.
f, t, Y = signal.stft(speech, samplerate, window=WINDOW, nperseg=WINDOW_SIZE, noverlap=OVERLAP, nfft=NFFT)
Yabs_s = np.abs(Y)
f, t, Y = signal.stft(music, samplerate, window=WINDOW, nperseg=WINDOW_SIZE, noverlap=OVERLAP, nfft=NFFT)
Yabs_m = np.abs(Y)

# Training mixture built by the helpers2 routine at the requested SMR.
SMR_db = 0
mix, speech_mix, music_mix = get_mixed_signal(speech_t, music_t, SMR_db)

f, t, Ymix = signal.stft(mix, samplerate, window=WINDOW, nperseg=WINDOW_SIZE, noverlap=OVERLAP, nfft=NFFT)
Yabs_mix = np.abs(Ymix)

# Replace exact zeros in the magnitude by a small positive floor.
Yabs_mix[Yabs_mix == 0] = 0.00001

# Export the mixture as 16-bit PCM. The output folder is created if missing,
# and the float samples are clipped to the int16 range first so that
# out-of-range values saturate instead of wrapping around in the cast.
os.makedirs("results", exist_ok=True)
pcm_range = np.iinfo(np.int16)
write("results/MixX.wav", samplerate, np.clip(mix, pcm_range.min, pcm_range.max).astype(np.int16))



SMR = 0.00


## Test STFT :

In [7]:
fs = 16000  # target sample rate (Hz)

rate = samplerate_s / fs

# Test excerpt: n_segments consecutive 30-second segments starting at minute 15,
# first channel of each recording. Offsets follow the file sample rate.
# NOTE(review): this excerpt lies inside the minute 1-20 range used for the
# training mixture in the downsampling cell - confirm the overlap is intended.
n_segments = 1
start = 15 * 60 * samplerate_s
step = int(0.5 * 60 * samplerate_s)
stop = start + n_segments * step

# Consecutive segments are contiguous, so one slice replaces growing an array
# with hstack inside a loop; the float cast matches the dtype hstack produced.
test_s = data_speech[start:stop, 0].astype(float)
test_m = data_music[start:stop, 0].astype(float)

test_s = signal.resample(test_s, int(test_s.shape[0] / rate))
test_m = signal.resample(test_m, int(test_m.shape[0] / rate))
# The signals are now at fs; int(samplerate_m / rate) could truncate to fs - 1.
samplerate = fs
# Duration of the test excerpt (previously computed from music_t by mistake).
length = test_m.shape[0] / samplerate

# Same 5000 Hz low-pass as applied to the training signals.
test_s = butter_lowpass_filter(test_s, 5000, fs)
test_m = butter_lowpass_filter(test_m, 5000, fs)

# Test mixture built by the helpers2 routine at the requested SMR.
SMR_db = 0
test, speech_test, music_test = get_mixed_signal(test_s, test_m, SMR_db)

f, t, Ytest = signal.stft(test, samplerate, window=WINDOW, nperseg=WINDOW_SIZE, noverlap=OVERLAP, nfft=NFFT)
Yabs_test = np.abs(Ytest)

# Replace exact zeros in the magnitude by a small positive floor.
Yabs_test[Yabs_test == 0] = 0.00001


SMR = -0.00


In [54]:
# Show the shape of the mixed test signal.
test.shape

(4800000,)

# Train First NMF on Clean Speech :

In [288]:
def softmax(x):
    """Softmax of ``x`` along axis 0 (each column sums to one).

    The maximum along axis 0 is subtracted before exponentiating. This does
    not change the mathematical result, but it keeps ``np.exp`` from
    overflowing to ``inf`` (and the ratio from turning into ``nan``) when the
    input contains large values.

    Parameters
    ----------
    x : array_like
        Input scores; normalisation runs over the first axis.

    Returns
    -------
    numpy.ndarray
        Array of the same shape as ``x`` with non-negative entries that sum
        to one along axis 0.
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalise; np.max would raise on an empty reduction.
        return np.exp(x)

    shifted = x - np.max(x, axis=0, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / e_x.sum(axis=0)

In [13]:
Nc = 8  # number of atoms learned from clean speech
Nm = 8  # number of extra atoms learned later from the mixture

# The former `alpha=0.0` argument is omitted: 0.0 was its default, and the
# keyword no longer exists in recent scikit-learn releases, where passing it
# raises a TypeError.
model = NMF(n_components=Nc, init='random', beta_loss='frobenius', solver="mu", max_iter=100, random_state=7)
model.fit(np.transpose(Yabs_s))
# components_ is (Nc, n_freq); transpose so that each column is one atom.
Dc = np.transpose(model.components_)
#Dc=softmax(Dc)
# Min-max scale each atom (column) of the clean-speech dictionary.
scaler = MinMaxScaler()
Dc = scaler.fit_transform(Dc)


In [300]:
# Reconstruction error reported by the most recently fitted `model`.
model.reconstruction_err_

216548.88619941662

# Train NMF on Noisy Speech :

In [14]:
def nmf(X, Dc, Nn, lamb=0.1, maxit=100, random_state=None):
    """Factorise ``X`` as ``D @ H`` with a partly fixed, column-normalised dictionary.

    The dictionary is ``[Dc | Dn]``: the columns taken from ``Dc`` are never
    updated, while ``Nn`` additional columns ``Dn`` are learned together with
    the activations ``H`` through multiplicative updates. Every update works
    on the dictionary with unit L2-norm columns.

    Parameters
    ----------
    X : ndarray, shape (F, T)
        Matrix to factorise.
    Dc : ndarray, shape (F, Nc)
        Fixed dictionary atoms (one per column).
    Nn : int
        Number of extra atoms to learn.
    lamb : float, default 0.1
        Constant added to the denominator of the ``H`` update; larger values
        shrink the activations.
    maxit : int, default 100
        Number of update iterations.
    random_state : int or None, default None
        Seed for the random initialisation of ``H`` and ``Dn``. ``None`` keeps
        the previous behaviour of drawing from NumPy's global random state.

    Returns
    -------
    Dnorm : ndarray, shape (F, Nc + Nn)
        Column-normalised dictionary; the first ``Nc`` columns are ``Dc``
        rescaled to unit norm.
    H : ndarray, shape (Nc + Nn, T)
        Activations.
    hist : list of float
        Frobenius norm of ``X - Dnorm @ H`` after each iteration.
    """
    Nc = Dc.shape[1]
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    # Same draw order as before: activations first, then the free atoms.
    H = rng.rand(Nc + Nn, X.shape[1])
    Dn = rng.rand(X.shape[0], Nn)
    print(f"Shape of Dc {Dc.shape} Shape of Dn {Dn.shape}")
    D = np.hstack([Dc, Dn])
    Dnorm = D / np.sum(D**2, axis=0)**(.5)

    print(f'Dnorm shape {Dnorm.shape} and X shape {X.shape} and H shape {H.shape}')
    hist = []
    # `tqdm` is the progress-bar callable itself and must wrap an iterable,
    # hence tqdm(range(maxit)) rather than tqdm.tqdm(maxit).
    for _ in tqdm(range(maxit)):
        H = H * (np.matmul(Dnorm.T, X)) / (np.matmul(np.matmul(Dnorm.T, Dnorm), H) + lamb)

        XHt = np.matmul(X, H.T)
        DHHt = np.matmul(Dnorm, np.matmul(H, H.T))
        # ones((F, F)) @ A just repeats the column sums of A on every row, so
        # a keepdims sum gives the same values without the F x F matrix.
        numer = XHt + Dnorm * np.sum(DHHt * Dnorm, axis=0, keepdims=True)
        denom = DHHt + Dnorm * np.sum(XHt * Dnorm, axis=0, keepdims=True)

        # Only the learned atoms are overwritten; the Dc columns stay fixed.
        D[:, Nc:] = (Dnorm * numer / denom)[:, Nc:]
        Dnorm = D / np.sum(D**2, axis=0)**(.5)
        hist.append(np.linalg.norm(X - np.matmul(Dnorm, H)))
    #Dnorm[:,Dc.shape[1]:] = softmax(Dnorm[:,Dc.shape[1]:])
    return Dnorm, H, hist

In [15]:
# Learn Nm extra atoms and the activations on the training-mixture
# spectrogram while the clean-speech atoms Dc stay fixed.
D, H, hist = nmf(Yabs_mix, Dc, Nm)

# Min-max scale every column (atom) of the combined dictionary.
scaler = MinMaxScaler().fit(D)
D = scaler.transform(D)

Shape of Dc (257, 8) Shape of Dn (257, 8)
Dnorm shape (257, 16) and X shape (257, 190001) and H shape (16, 190001)


# Test NMF :

In [16]:
# Fit once on the test spectrogram so the estimator is initialised, then
# replace its components by the learned dictionary D and use transform() to
# obtain the activations of the test mixture on that dictionary.
# NOTE(review): the `alpha` keyword is not accepted by recent scikit-learn
# releases; its replacement changes the regularisation, so it is left as is.
model_test = NMF(
    n_components=Nc + Nm,
    init='nndsvd',
    alpha=0.1,
    beta_loss='frobenius',
    solver="mu",
    max_iter=100,
    random_state=7,
)
model_test.fit(Yabs_test.T)

model_test.components_ = D.T
G_test = model_test.transform(Yabs_test.T).T

In [17]:
# Separate the test mixture with the helpers2 routine. The complex STFT
# `Ytest` is what gets passed as `Yabs`.
Sources, Masks = Reconstruct(B=D, G=G_test, Ns=Dc.shape[1], Nm=Nm, Yabs=Ytest, p=0.5)

print('Reconstruction Step .... Done')

# Inverse STFT with the same settings as the analysis transform.
istft_settings = dict(window=WINDOW, nperseg=WINDOW_SIZE, noverlap=OVERLAP, nfft=NFFT)
speech_est, music_est = Sources[0], Sources[1]
_, speech_est = signal.istft(speech_est, samplerate, **istft_settings)
_, music_est = signal.istft(music_est, samplerate, **istft_settings)

# Score each estimate against the corresponding clean test signal.
sdr_speech = SDR(s_est=speech_est, s=test_s)
sdr_music = SDR(s_est=music_est, s=test_m)

print(f'Speech SDR = {sdr_speech}')
print(f'Music SDR = {sdr_music}')

Reconstruction Step .... Done
Speech SDR = 2.5721198978779314
Music SDR = 2.457712416849758


In [37]:
# Export the separated signals as 16-bit PCM. The output folder is created if
# missing, and the float samples are clipped to the int16 range first so that
# out-of-range values saturate instead of wrapping around in the cast.
os.makedirs("./results", exist_ok=True)
pcm_range = np.iinfo(np.int16)
write("./results/SpeechX.wav", samplerate, np.clip(speech_est, pcm_range.min, pcm_range.max).astype(np.int16))
write("./results/MusicX.wav", samplerate, np.clip(music_est, pcm_range.min, pcm_range.max).astype(np.int16))
