# Load data in

In [None]:
import os
from pathlib import Path

current_dir = Path('./')
data_dir = current_dir / 'speech_commands_v0.01'
train_file_path = data_dir / "train_digit_list.txt"
test_file_path = data_dir / "testing_digit_list.txt"

train_file = open(train_file_path, "r")
training_list = [data_dir / x for x in train_file.read().splitlines()]

test_file = open(test_file_path, "r")
testing_list = [data_dir / x for x in test_file.read().splitlines()]

In [None]:
import os
from pathlib import Path
import numpy as np

with open('X_train_original.npy', 'rb') as f:
    X_train_org = np.load(f)

with open('X_train_reverb_random.npy', 'rb') as f:
    X_train_reverb = np.load(f)
    
with open('X_test_reverb_random.npy', 'rb') as f:
    X_test_reverb = np.load(f)

In [None]:
print(X_train_org[2])

In [None]:
import matplotlib.pyplot as plt

sample_num = 25
x = np.arange(0, 16000)

fig, axs = plt.subplots(3)
axs[0].plot(x, X_train_org[sample_num])
axs[1].plot(x, X_train_reverb[sample_num])
axs[2].plot(x, X_train_reverb_random[sample_num])
plt.show()

# Filter bank and Mel cooef

In [None]:
from scipy.fft import dct

def emphaize(signal, plot=True):
    pre_emphasis = 0.97
    emphasized_signal = np.append(signal[0], signal[1:] - pre_emphasis * signal[:-1])
    if plot:
        x = np.arange(0, 1., 6.25e-5)
        plt.plot(x, signal, label='original')
        plt.plot(x, emphasized_signal, label='emphasized')
        plt.legend()
        plt.title('Emphasized signal')
        plt.savefig('img/e_signal.png')
        plt.show()


    return emphasized_signal

def framing(emphasized_signal, size, stride, sample_rate=16000):
    #Framing
    frame_length, frame_step = size * sample_rate, stride * sample_rate 
    signal_length = len(emphasized_signal)
    frame_length = int(round(frame_length))
    frame_step = int(round(frame_step))
    num_frames = int(np.ceil(float(np.abs(signal_length - frame_length)) / frame_step)) 

    pad_signal_length = num_frames * frame_step + frame_length
    z = np.zeros((pad_signal_length - signal_length))
    pad_signal = np.append(emphasized_signal, z)

    indices = np.tile(np.arange(0, frame_length), (num_frames, 1)) + np.tile(np.arange(0, num_frames * frame_step, frame_step), (frame_length, 1)).T
    frames = pad_signal[indices.astype(np.int32, copy=False)]
    
    #Windowing
    frames *= np.hamming(frame_length)
    return frames

def ftps(frames, NFFT = 512):
    mag_frames = np.absolute(np.fft.rfft(frames, NFFT))  # Magnitude of the FFT
    pow_frames = ((1.0 / NFFT) * ((mag_frames) ** 2))  # Power Spectrum
    
    plt.show()
    
    return mag_frames, pow_frames


def filter_bank(pow_frames, sample_rate = 16000, NFFT = 512, nfilt=40):
    low_freq_mel = 0
    high_freq_mel = (2595 * np.log10(1 + (sample_rate / 2) / 700))  # Convert Hz to Mel
    mel_points = np.linspace(low_freq_mel, high_freq_mel, nfilt + 2)  # Equally spaced in Mel scale
    hz_points = (700 * (10**(mel_points / 2595) - 1))  # Convert Mel to Hz
    bin = np.floor((NFFT + 1) * hz_points / sample_rate)

    fbank = np.zeros((nfilt, int(np.floor(NFFT / 2 + 1))))
    for m in range(1, nfilt + 1):
        f_m_minus = int(bin[m - 1])   # left
        f_m = int(bin[m])             # center
        f_m_plus = int(bin[m + 1])    # right

        for k in range(f_m_minus, f_m):
            fbank[m - 1, k] = (k - bin[m - 1]) / (bin[m] - bin[m - 1])
        for k in range(f_m, f_m_plus):
            fbank[m - 1, k] = (bin[m + 1] - k) / (bin[m + 1] - bin[m])
            
    filter_banks = np.dot(pow_frames, fbank.T)
    filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)  # Numerical Stability
    filter_banks = 20 * np.log10(filter_banks)  # dB
    
    filter_banks -= (np.mean(filter_banks, axis=0) + 1e-8)

    return filter_banks

def mfcc(filter_banks, num_ceps = 12, cep_lifter = 22):
    mfcc = dct(filter_banks, type=2, axis=1, norm='ortho')[:, 1 : (num_ceps + 1)] 
    (nframes, ncoeff) = mfcc.shape
    n = np.arange(ncoeff)
    lift = 1 + (cep_lifter / 2) * np.sin(np.pi * n / cep_lifter)
    mfcc *= lift  #*
    
    mfcc -= (np.mean(mfcc, axis=0) + 1e-8)

    return mfcc

def extract_filter_bank_mfcc(signal, plot=True):
    e_signal = emphaize(signal, plot)
    frames = framing(e_signal, 0.025, 0.01)
    mag_frames, pow_frames = ftps(frames)
    f_bank = filter_bank(pow_frames)
    mel = mfcc(f_bank)
    
    if plot:
        plt.imshow(f_bank.T, origin='lower')
        plt.title('Filter bank')
        plt.savefig('img/fb.png')
        plt.show()

        plt.imshow(mel.T, origin='lower')
        plt.title('MFCC')
        plt.savefig('img/mfcc.png')
        plt.show()
    
    return np.concatenate((mel, f_bank), axis=1)



In [None]:
a = extract_filter_bank_mfcc(X_train_org[sample_num])
plt.imshow(a.T)

In [None]:
extract_filter_bank_mfcc(X_train_reverb[sample_num])

In [None]:
extract_filter_bank_mfcc(X_train_org[sample_num])

# Extract feature for all signal

In [None]:
from tqdm.notebook import tnrange

train_data = []
for i in tnrange(len(X_train_reverb)):
    x = extract_filter_bank_mfcc(X_train_reverb[i], plot=False).T
    train_data.append(x)
    
train_data = np.asarray(train_data)

In [None]:
save_dir = Path('./FE_data')
np.save(save_dir / "MFCC_train_reverb.npy", train_data)

In [None]:
test_data = []
for i in tnrange(len(X_test_reverb)):
    x = extract_filter_bank_mfcc(X_test_reverb[i], plot=False).T
    test_data.append(x)
    
test_data = np.asarray(test_data)

In [None]:
save_dir = Path('./FE_data')
np.save(save_dir / "MFCC_test_reverb.npy", test_data)