# Quantized Clean

Make clean quantized wavs to /data directories.

# 1. Import libraries

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import sys
import math
import array

import re

import scipy.io as sio
import numpy as np
import sys

import time

import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="0"

import matplotlib.pyplot as plt

print(sys.executable)
import librosa
import soundfile as sf
import librosa.display
import seaborn as sns

from skimage.restoration import unwrap_phase

/home/knayem/anaconda3/bin/python


# 2. Helping Functions (a) 

In [2]:
def replace_rev(string, old, new, times=1):
    '''
    Replace a substring (old) with another substring (new) from a string (string) 
    in total a fixed number (times) of times.
    '''
    
    ls = string.split(old)
    length = len(ls)
    
    # times can be atmost (length-1)
    times = times if (length-1)>=times else (length-1)
    
    new_string = old.join(ls[:length-times])
    
    for t in range(times,0,-1):
        new_string = new.join([new_string,ls[length-t]])
                               
    return new_string

# 3. Variables (a)

In [3]:
# .NPY FILE PATH
FILE_SAVE_PATH = '/data/knayem/Quantized_DataFiles' # store .npy data file path for quick access


# 4. Paths 

In [4]:
ROOT_PATH = '/data'
USER_PATH = 'knayem'

ROOT_USER_PATH = os.path.join(ROOT_PATH,USER_PATH)

### IEEE MALE Dataset

In [5]:
IEEE_MALE_CORPORA_PATH = os.path.join(ROOT_PATH,'SpeechCorpora/IEEE_male') # male

### IEEE FEMALE Dataset

In [None]:
IEEE_FEMALE_CORPORA_PATH = os.path.join(ROOT_PATH,'SpeechCorpora/IEEE_female') # female

In [6]:
# Train, Dev, Test Folders for Clean and Mixs
TRAIN_CLEAN_FOLDER = 'train_16k'
DEV_CLEAN_FOLDER = 'dev_16k'
TEST_CLEAN_PFOLDER = 'test_16k'

#### 4.a Clean

In [7]:
# CLEAN 16kHz PATH
CLEAN_wavs_PATH = os.path.join(ROOT_USER_PATH,'IEEE_male_clean_16k') # male
# CLEAN_wavs_PATH = os.path.join(ROOT_USER_PATH,'IEEE_female_clean_16k') # female

CLEAN_wavs_TRAIN_PATH = os.path.join(CLEAN_wavs_PATH, TRAIN_CLEAN_FOLDER)
CLEAN_wavs_DEV_PATH = os.path.join(CLEAN_wavs_PATH, DEV_CLEAN_FOLDER)
CLEAN_wavs_TEST_PATH = os.path.join(CLEAN_wavs_PATH, TEST_CLEAN_PFOLDER)

#### Path Summaray

In [8]:
print("Root path,", ROOT_PATH)
print("\t|-> Root User path,", ROOT_USER_PATH)
print()
print("IEEE Male Data Corpora path,", IEEE_MALE_CORPORA_PATH)
print("IEEE Female Data Corpora path,", IEEE_FEMALE_CORPORA_PATH)
print()
print("Clean .WAV path,", CLEAN_wavs_PATH)
print("\t|-> Train Clean .WAV path,", CLEAN_wavs_TRAIN_PATH)
print("\t|-> Dev Clean .WAV path,", CLEAN_wavs_DEV_PATH)
print("\t|-> Test Clean .WAV path,", CLEAN_wavs_TEST_PATH)
print()
print("Mix SSN .WAV path,", SSN_wavs_PATH)
print("\t|-> Train Mix SSN .WAV path,", SSN_wavs_TRAIN_PATH)
print("\t|-> Dev Mix SSN .WAV path,", SSN_wavs_DEV_PATH)
print("\t|-> Test Mix SSN .WAV path,", SSN_wavs_TEST_PATH)
print()
print("Mix CAFE .WAV path,", CAFE_MIXTURE_PATH)
print("\t|-> Train Mix CAFE .WAV path,", CAFE_wavs_TRAIN_PATH)
print("\t|-> Dev Mix CAFE .WAV path,", CAFE_wavs_DEV_PATH)
print("\t|-> Test Mix CAFE .WAV path,", CAFE_wavs_TEST_PATH)
print()
print("Mix BABBLE .WAV path,", BABBLE_MIXTURE_PATH)
print("\t|-> Train Mix BABBLE .WAV path,", BABBLE_wavs_TRAIN_PATH)
print("\t|-> Dev Mix BABBLE .WAV path,", BABBLE_wavs_DEV_PATH)
print("\t|-> Test Mix BABBLE .WAV path,", BABBLE_wavs_TEST_PATH)
print()
print("Mix FACTORY .WAV path,", FACTORY_MIXTURE_PATH)
print("\t|-> Train Mix FACTORY .WAV path,", FACTORY_wavs_TRAIN_PATH)
print("\t|-> Dev Mix FACTORY .WAV path,", FACTORY_wavs_DEV_PATH)
print("\t|-> Test Mix FACTORY .WAV path,", FACTORY_wavs_TEST_PATH)
print()
print("Enhanced .WAV path,", Enhanced_wavs_PATH)
print("\t|-> SSN Enhanced .WAV path,", SSN_Enhanced_wavs_PATH)
print("\t|-> FACTORY Enhanced .WAV path,", FACTORY_Enhanced_wavs_PATH)
print("\t|-> BABBLE Enhanced .WAV path,", BABBLE_Enhanced_wavs_PATH)
print("\t|-> FACTORY Enhanced .WAV path,", FACTORY_Enhanced_wavs_PATH)

Root path, /data
	|-> Root User path, /data/knayem

IEEE Male Data Corpora path, /data/SpeechCorpora/IEEE_male


NameError: name 'IEEE_FEMALE_CORPORA_PATH' is not defined

### TIMIT Dataset 

#### 2.a Mixture (Noisy)

#### Clean

In [None]:
# PATH = os.path.join(CLEAN_PATH,TRAIN_CLEAN_PATH) # clean train
# PATH = os.path.join(CLEAN_PATH,DEV_CLEAN_PATH) # clean dev
PATH = os.path.join(CLEAN_PATH,TEST_CLEAN_PATH) # clean test

# 5. STFT

### 5.a Parameters 

Followings are the basic parameter for calculating STFT.

In [9]:
fs = int(16e3)

n_fft = 640
win_length = int(40e-3*fs) # librosa needs scalar value
overlap = int(20e-3*fs)
hop_length = win_length - overlap # librosa needs scalar value

NUMS_PRINTS = 10

print('window: {0}, noverlap: {1}, nfft: {2}, fs: {3}, hop_length: {4}'.
      format(win_length,overlap,n_fft,fs,hop_length))

window: 640, noverlap: 320, nfft: 640, fs: 16000, hop_length: 320


### 5.b STFT function

Calculate Magnitude and Group Delay of the PATH (train, dev, test of IEEE/TIMIT) to get an overview of the data.

In [10]:
def mag_gd_phase(filename, fs, n_fft, hop_length, win_length):
    
    y, sr = librosa.load(filename, sr=fs)
    s_stft = librosa.stft(y,n_fft,hop_length,win_length)
    mag, phase = librosa.magphase(s_stft)
    angle = np.angle(phase)

    unwrap_angle = np.unwrap(angle, axis=0) # freq, MATLAB implementation
    unwrap_angle_s = np.roll(unwrap_angle, 1, axis=0) # roll across freq
    unwrap_GD = np.angle(np.exp(1j*(unwrap_angle - unwrap_angle_s))) # paper implementation

    return len(y), mag, unwrap_GD, phase, angle

## 6. Fixed step Quantization

In [11]:
def quantized_val(val, quant_boundary):
    
    proximity = abs(quant_boundary-val)
    closest_boundary_index = np.argmin(proximity)
    return quant_boundary[closest_boundary_index]

In [12]:
def quantized_matrix(matrix, QUANT_STEP, MAX_AMP=200,MIN_AMP=0):
    
    quant_boundary = np.linspace(MIN_AMP,MAX_AMP,MAX_AMP//QUANT_STEP)
    m_shape = matrix.shape
    
    quantized_list = [quantized_val(v,quant_boundary) for row in matrix for v in row]
    return np.array(quantized_list).reshape(m_shape)

In [20]:
def save_enhanced(mag, phase, fs, n_fft, hop_length, win_length, target_directory, filename, tags=None):
    
    D = mag*phase
    enhanced = librosa.istft(D,hop_length,win_length)
    
    # enhanced filename creation
    name = filename.split('.')[0]
    
    if tags is not None:
        if 'quantization_tag' in tags:
            name = "_".join([name,tags['quantization_tag'],str(tags['step'])])
        if 'avg_step' in tags:
            name = "_".join([name,str(tags['avg_step'])])

    name = ".".join([name,"wav"])

    
    # directory creation   
    if not os.path.exists(target_directory):
        print(False,target_directory)
        os.makedirs(target_directory)
    else:
        print(True,target_directory)
        pass
    
    wav_filepath = os.path.join(target_directory,name)
                                       
    # save file
    sf.write(wav_filepath, enhanced, int(fs))
    #print(wav_filepath)
    
    return wav_filepath
                                       

In [21]:
MAX_AMP, MIN_AMP = 100, 0

QUANTIZED_DIRECTORY_TAG = "Quantized"
Fixed_Step_Quantization_TAG = "FS"

In [22]:
corpora_path_list = [CLEAN_wavs_PATH]

# [0.25, 0.125, 0.0625, 0.03125, 0.015625, 0.0078125 0.00390625]
QUANT_STEP_LIST = [0.0625]

In [23]:
for QUANT_STEP in QUANT_STEP_LIST:
    
    for enum1, corpora in enumerate(sorted(corpora_path_list)) :
        print(enum1,"CORPORA:", corpora)
        QUANTIZED_DIRECTORY = corpora+"_"+QUANTIZED_DIRECTORY_TAG

        for root, dirs, files in os.walk(corpora): 
            # .wav files only
            wav_files = list( filter(lambda x: x.split('.')[-1] == 'wav', files) )
            print("ROOT:",root, ", len(DIR):", len(dirs), ", len(FILES):",len(wav_files),root.split('/')[-1])
            
            # folder name
            if len(dirs)==0:
                folder_name = root.split('/')[-1]
                QUANTIZED_DIRECTORY_PATH = os.path.join(QUANTIZED_DIRECTORY,folder_name)
                QUANTIZED_DIRECTORY_PATH = "_".join([QUANTIZED_DIRECTORY_PATH,Fixed_Step_Quantization_TAG,str(QUANT_STEP)])

                npy_list = []
#                 plt.figure()
                
            for enum2, filename in enumerate(sorted(wav_files)):
                clean_wav_full_path = os.path.join(root, filename)
                                                   
                len_y, mag, unwrap_GD, phase, angle = mag_gd_phase(clean_wav_full_path, fs, n_fft, hop_length, win_length)
                quantized_mag = quantized_matrix(mag, QUANT_STEP, MAX_AMP, MIN_AMP)

                diff_mag = abs(mag-quantized_mag)
                total_diff = np.sum(diff_mag)
#                 print(enum2,"|Error| = ", total_diff)

                D = librosa.amplitude_to_db(mag, ref=np.max)
                q_D = librosa.amplitude_to_db(quantized_mag, ref=np.max)

                quant_wav_full_path = save_enhanced(quantized_mag, phase, fs, n_fft, hop_length, win_length, 
                                                    QUANTIZED_DIRECTORY_PATH, filename,
                                                    {'quantization_tag':Fixed_Step_Quantization_TAG,'step':QUANT_STEP})

                print(clean_wav_full_path,"<->",quant_wav_full_path)
                npy_list.append( [filename, clean_wav_full_path, len_y, mag.shape[1]])
                
                
                # plot the spectrogram
#                 plt.subplot(len(wav_files), enum2+1, 1)
#                 plt.subplot(3, 1, enum2+1)
#                 librosa.display.specshow(D, y_axis='hz', x_axis='time', sr=fs)
#                 plt.colorbar(format='%+2.0f dB')
#                 plt.title(":".join([str(enum2),'mag',filename]))
#                 plt.subplots_adjust(hspace=0.5)

#                 plt.subplot(3, 2, enum2+1)
#                 librosa.display.specshow(q_D, y_axis='hz', x_axis='time', sr=fs)
#                 plt.colorbar(format='%+2.0f dB')
#                 plt.title(":".join([str(enum2),'quant-mag',quant_wav_full_path.split('/')[-1]]))
#                 plt.subplots_adjust(hspace=0.5)

#                 plt.subplot(3, 3, enum2+1)
#                 librosa.display.specshow(librosa.amplitude_to_db(librosa.amplitude_to_db(diff_mag, ref=np.max), ref=np.max), y_axis='hz', x_axis='time', sr=fs)
#                 plt.colorbar(format='%+2.0f dB')
#                 plt.title(":".join([str(enum2),'|Error|',str(total_diff)]))
#                 plt.subplots_adjust(hspace=0.5)
                
#                 plt.draw()
                
#                 if enum2>=10:
#                     break
                    
            if len(dirs)==0:    
                npy_path = os.path.join(FILE_SAVE_PATH,QUANTIZED_DIRECTORY_PATH.split('/')[-1])
#                 plt_path = os.path.join(FILE_SAVE_PATH,QUANTIZED_DIRECTORY_PATH.split('/')[-1]+".pdf")

                np.save(npy_path, npy_list)
#                 plt.savefig(plt_path,bbox_inches='tight')
        
            


0 CORPORA: /data/knayem/IEEE_male_clean_16k
ROOT: /data/knayem/IEEE_male_clean_16k , len(DIR): 3 , len(FILES): 0 IEEE_male_clean_16k
ROOT: /data/knayem/IEEE_male_clean_16k/test_16k , len(DIR): 0 , len(FILES): 110 test_16k


  This is separate from the ipykernel package so we can avoid doing imports until


0 |Error| =  433.06206890193886
False /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625
/data/knayem/IEEE_male_clean_16k/test_16k/S_62_01_16k.wav <-> /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625/S_62_01_16k_FS_0.0625.wav
1 |Error| =  452.48037723053716
True /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625
/data/knayem/IEEE_male_clean_16k/test_16k/S_62_02_16k.wav <-> /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625/S_62_02_16k_FS_0.0625.wav
2 |Error| =  560.6148941240745
True /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625
/data/knayem/IEEE_male_clean_16k/test_16k/S_62_03_16k.wav <-> /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625/S_62_03_16k_FS_0.0625.wav
3 |Error| =  456.3120123746147
True /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625
/data/knayem/IEEE_male_clean_16k/test_16k/S_62_04_16k.wav <-> /data/knayem/IEEE_male_clean_16k_Quantized/test_16k_FS_0.0625/S_62_04_16k_FS_0.0625.wav
4 |Er

In [26]:
npy_list = np.load(npy_path+".npy")

In [34]:
npy_list[:,[0,1:2]]

SyntaxError: invalid syntax (<ipython-input-34-74059c6b7878>, line 1)