In [1]:
import json
import numpy as np
import scipy.io.wavfile as wf
from sys import exit
from IPython.display import Audio

In [112]:
# Input recordings and the template store. The paths look like a mounted
# Google Drive inside Colab -- adjust them when running elsewhere.
path_sample = '/content/drive/My Drive/Colab Notebooks/PG/data/1.wav'
path_record = '/content/drive/My Drive/Colab Notebooks/PG/data/record-1.wav'
# JSON file read by load_templates() and written by save_templates().
path_templates = '/content/templates.txt'

# Window name; the valid keys are those of the table inside
# apply_windowing(): 'hanning', 'hamming' or 'none'.
window_fun = 'hanning'
# Frame length in samples (also fixes the number of DFT bins: window_len // 2 + 1).
window_len = 2048
# Hop between consecutive frames in milliseconds
# (apply_slicing() converts it with rate * hop_size // 1000).
hop_size = 15
# Number of triangular mel filters built by get_filter_points()/get_filters().
mel_filter_num = 10
# Number of DCT rows, i.e. cepstral coefficients produced per frame by apply_dct().
dct_filter_num = 5

In [None]:
# 1. load wav
path = path_sample #path = input("WAV path: ")
data, rate = load_wav(path)

# 2. extract words
samples = apply_endpointing(data, rate)

# 3. play each word
while True:
    answer = input("Sample to play [{}..{}] (type 0 to exit): ".format(1, len(samples)))
    try:
        i = int(answer)
    except ValueError:
        # A typo must not abort the whole cell; just ask again.
        print("Please enter a number")
        continue
    if i <= 0:
        break
    if i > len(samples):
        print("No such sample: {}".format(i))
        continue
    play_wav(samples[i-1], rate)

# 4. save each word as template
path = path_templates #path = input("Templates path: ")
templates = load_templates(path)
for s in samples:
    name = input("Sample name: ")
    # Work on a float copy: WAV data is usually int16, and the feature
    # extractors must neither overflow nor modify the extracted segment.
    signal = np.array(s, dtype=np.float64)
    templates.append({
        'name': name,
        # .tolist() turns the numpy results into plain lists so that the
        # template file stays valid JSON.
        'lpcc': get_lpcc(signal).tolist(),
        # get_mfcc() returns one row per coefficient; DTW walks the first
        # axis, so templates are stored with one row per frame instead.
        'mfcc': get_mfcc(signal, rate).T.tolist(),
    })
save_templates(path, templates)

In [None]:
# 1. load wav
path = path_sample #path = input("WAV path: ")
data, rate = load_wav(path)

# 2. extract words
samples = apply_endpointing(data, rate)

# 3. perform dtw for each word
path = path_templates #path = input("Templates path: ")
templates = load_templates(path)
coef = input("Coefficients [lpcc, mfcc]: ").strip()

# Feature extractors keyed by the template field they are compared against.
# MFCCs are transposed to one row per frame because DTW aligns along axis 0.
coders = {
    'lpcc': lambda signal: get_lpcc(signal),
    'mfcc': lambda signal: get_mfcc(signal, rate).T,
}
if coef not in coders:
    raise SystemExit("Error: Invalid coefficients")

for i, s in enumerate(samples):
    # Compare like with like: encode the word first instead of matching raw
    # audio samples against stored coefficients. The float copy keeps the
    # extracted segment untouched and avoids integer overflow.
    features = coders[coef](np.array(s, dtype=np.float64))
    matches = [(t['name'], apply_dtw(features, t[coef])) for t in templates]
    matches.sort(key=lambda match: match[1])
    print("Sample:", i + 1)
    for name, dtw in matches:
        print("Match: {}\t({})".format(name, dtw))

In [3]:
def load_wav(path):
    """Read the WAV file at ``path`` and return ``(samples, sample_rate)``."""
    # scipy returns (rate, data); callers here expect the opposite order.
    sample_rate, samples = wf.read(path)
    return samples, sample_rate


def play_wav(data, rate):
    """Show an inline audio player for ``data`` sampled at ``rate``."""
    player = Audio(data, rate=rate)
    display(player)


def load_templates(path):
    """Return the JSON content stored at ``path``.

    A missing file is not an error: it yields an empty list.
    """
    try:
        fd = open(path, 'r')
    except FileNotFoundError:
        return []

    with fd:
        return json.load(fd)


def save_templates(path, templates):
    """Write ``templates`` to ``path`` as indented JSON.

    Numpy arrays and numpy scalars inside ``templates`` are converted to
    plain lists / Python numbers, so feature vectors can be stored directly.

    Raises:
        TypeError: if ``templates`` contains an object that cannot be
            represented in JSON. The file at ``path`` is left untouched then.
    """
    def _to_builtin(obj):
        # Called by json only for objects it cannot serialise itself.
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(
            "Object of type {} is not JSON serializable".format(type(obj).__name__))

    # Serialise before opening the file: opening with 'w' truncates it, so a
    # failure half-way through would otherwise destroy the existing templates.
    payload = json.dumps(templates, indent=4, default=_to_builtin)
    with open(path, 'w') as fd:
        fd.write(payload)


def create_scaffolds(windows, curr, sub, length):
    """Find spans that start at a `curr` element and end at the next `sub`.

    `windows` is a list that is MODIFIED IN PLACE:
      * if its last element is not `sub`, a `sub` sentinel is appended (and
        left there) so that every span has a terminating element;
      * every span shorter than `length` elements is overwritten with `sub`.

    Returns a list of `(start, end)` index pairs for the spans that were at
    least `length` long; `end` is the index of the terminating `sub`, i.e. it
    is exclusive.

    Raises IndexError for an empty list (because of `windows[-1]`).
    """
    i, lmt = 0, len(windows)
    
    # Guarantee that windows.index(sub, ...) below always finds a terminator.
    # NOTE(review): the scan limit is reduced by one in this case, so a span
    # that begins exactly at the last original element is never examined --
    # confirm that this is intended.
    if windows[-1] != sub:
        windows.append(sub)
        lmt -= 1
    
    scaffolds = []

    while i < lmt:
        if windows[i] == curr:
            # j is the first `sub` after i, so the span covers windows[i:j].
            j = windows.index(sub, i + 1)
            if j - i < length:
                # Too short: erase the span by filling it with `sub`.
                windows[i:j] = [sub] * (j - i)
            else:
                scaffolds.append((i, j))
            # Jump to the terminator; the increment below steps past it.
            i = j
        i += 1
    
    return scaffolds


def apply_endpointing(data, rate):
    """Split a recording into segments whose level exceeds the noise floor.

    The first 0.1 s of ``data`` is taken as background noise; the threshold
    is its mean absolute amplitude plus two standard deviations. The signal
    is then classified in 10 ms windows, quiet gaps shorter than ``p``
    windows are bridged, and loud runs shorter than ``q`` windows are dropped.

    Args:
        data: array of samples as returned by load_wav().
        rate: sample rate in Hz.

    Returns:
        A list of slices of ``data``, one per detected segment.

    Raises:
        SystemExit: if no segment is found (including empty input).
        ValueError: if ``rate`` is too low for a 10 ms window to hold a sample.
    """
    # Minimum gap / segment lengths, counted in 10 ms windows (25 -> 0.25 s).
    p, q = 25, 25

    dur_noise, dur_window = 0.1, 0.01
    n_noise, n_window = int(rate * dur_noise), int(rate * dur_window)

    if len(data) == 0:
        raise SystemExit("Error: Only silence detected")
    if n_window <= 0:
        raise ValueError("Sample rate {} is too low for {} s windows".format(rate, dur_window))

    # Float magnitudes, computed once: np.abs() on int16 wraps for -32768.
    magnitude = np.abs(np.asarray(data, dtype=np.float64))

    noise = magnitude[:n_noise]
    lmt = np.mean(noise) + 2 * np.std(noise)

    windows = [1 if np.mean(magnitude[i:i+n_window]) > lmt else 0
               for i in range(0, len(data), n_window)]

    # First pass bridges short quiet gaps (result is the in-place edit of
    # `windows`); second pass collects the loud runs that are long enough.
    create_scaffolds(windows, 0, 1, p)
    scaffolds = create_scaffolds(windows, 1, 0, q)

    if len(scaffolds) == 0:
        # `sys` itself is never imported in this notebook, so raise directly.
        raise SystemExit("Error: Only silence detected")

    return [data[i*n_window:j*n_window] for i, j in scaffolds]


def apply_slicing(data, rate, window_len, hop_size):
    """Cut ``data`` into overlapping frames of ``window_len`` samples.

    The signal is first reflect-padded by half a window on both sides.
    ``hop_size`` is the distance between frame starts in milliseconds.
    Returns an array with one frame per row.
    """
    half_window = window_len // 2
    padded = np.pad(data, half_window, mode='reflect')

    # Hop expressed in samples.
    hop_len = rate * hop_size // 1000
    frame_count = (padded.shape[0] - window_len) // hop_len + 1

    frames = []
    for index in range(frame_count):
        start = index * hop_len
        frames.append(padded[start:start + window_len])

    return np.array(frames)


def apply_windowing(data, rate, fun, size):
    """Multiply every frame (row) of ``data`` by a window function.

    Args:
        data: 2-D array, one frame per row.
        rate: unused; kept for interface compatibility.
        fun: 'hanning', 'hamming' or 'none'.
        size: unused; the window length is taken from the frame length.

    Returns:
        The windowed frames. Floating-point input is modified in place and
        returned (as before); integer input cannot hold the result, so a new
        float array is returned and the input is left unchanged.

    Raises:
        KeyError: if ``fun`` is not a known window name.
    """
    funs = {'hanning': np.hanning, 'hamming': np.hamming, 'none': np.ones}

    # All frames share one length, so the window is built once, not per frame.
    window = funs[fun](data.shape[1])

    product_dtype = np.result_type(data.dtype, window.dtype)
    if np.can_cast(product_dtype, data.dtype, casting='same_kind'):
        data *= window
        return data

    # e.g. int16 frames: an in-place multiply would raise a casting error.
    return data * window


def apply_dft(data, window_len):
    """Return the power spectrum of every frame in ``data``.

    Only the first ``window_len // 2 + 1`` DFT bins are kept. The result has
    one row per bin and one column per frame.
    """
    bin_count = window_len // 2 + 1
    spectrum = np.empty((bin_count, data.shape[0]), dtype=np.complex64)

    for column, frame in enumerate(data):
        spectrum[:, column] = np.fft.fft(frame)[:bin_count]

    magnitude = np.abs(spectrum)
    return np.square(magnitude)


def freq_to_mel(freq):
    """Convert a frequency in Hz to the mel scale."""
    ratio = freq / 700.0
    return 2595.0 * np.log10(1.0 + ratio)


def mel_to_freq(mels):
    """Convert a mel-scale value back to a frequency in Hz."""
    exponent = mels / 2595.0
    return 700.0 * (10.0**exponent - 1.0)


def get_filter_points(rate, filter_num, window_len):
    """Place ``filter_num + 2`` filter edges evenly on the mel scale.

    The edges span 0 Hz to ``rate // 2``. Returns ``(points, freqs)``:
    the edges as integer DFT bin indices and as frequencies in Hz.
    """
    nyquist = rate // 2
    mel_edges = np.linspace(freq_to_mel(0), freq_to_mel(nyquist), filter_num + 2)
    freqs = mel_to_freq(mel_edges)

    # Hz -> bin index.
    bins_per_hz = (window_len + 1) / rate
    points = np.floor(bins_per_hz * freqs).astype(int)

    return points, freqs


def get_filters(filter_points, window_len):
    """Build triangular filters from consecutive triples of bin indices.

    Filter ``k`` rises from 0 to 1 over ``filter_points[k]:filter_points[k+1]``
    and falls from 1 to 0 over ``filter_points[k+1]:filter_points[k+2]``.
    Returns an array with one filter per row and ``window_len // 2 + 1``
    columns.
    """
    filter_num = filter_points.shape[0] - 2
    bank = np.zeros((filter_num, window_len // 2 + 1))

    edges = zip(filter_points[:-2], filter_points[1:-1], filter_points[2:])
    for row, (left, peak, right) in zip(bank, edges):
        # `row` is a view, so these assignments fill `bank` directly.
        row[left:peak] = np.linspace(0, 1, peak - left)
        row[peak:right] = np.linspace(1, 0, right - peak)

    return bank


def apply_filters(data, filter_num, window_len, sample_rate=None):
    """Apply an area-normalised mel filter bank to a power spectrum.

    Args:
        data: power spectrum with ``window_len // 2 + 1`` rows (bins) and one
            column per frame.
        filter_num: number of mel filters.
        window_len: frame length the spectrum was computed from.
        sample_rate: sample rate in Hz. When omitted, the notebook-level
            global ``rate`` is used, which is what this function always did
            before the parameter existed.

    Returns:
        Array with one row per filter and one column per frame.
    """
    if sample_rate is None:
        sample_rate = rate

    filter_points, mel_freqs = get_filter_points(sample_rate, filter_num, window_len)
    filters = get_filters(filter_points, window_len)

    # Scale each triangle by 2 / (its width in Hz).
    enorm = 2.0 / (mel_freqs[2:filter_num+2] - mel_freqs[:filter_num])
    filters *= enorm[:, np.newaxis]

    return np.dot(filters, data)


def apply_dct(data, filter_num, filter_len):
    """Take the log (in dB) of ``data`` and project it onto DCT-II basis rows.

    Args:
        data: array with ``filter_len`` rows, one column per frame.
        filter_num: number of DCT rows (output coefficients per frame).
        filter_len: length of each DCT basis vector.

    Returns:
        Array with ``filter_num`` rows and one column per frame.
    """
    samples = np.arange(1, 2 * filter_len, 2) * np.pi / (2.0 * filter_len)

    filters = np.empty((filter_num, filter_len))
    filters[0] = 1.0 / np.sqrt(filter_len)
    # Row-by-row fill also works when filter_num == 1 (no cosine rows at all).
    for k in range(1, filter_num):
        filters[k] = np.cos(k * samples) * np.sqrt(2.0 / filter_len)

    # Zero energy would give log10(0) = -inf and then NaN coefficients.
    floored = np.maximum(data, np.finfo(float).eps)

    return np.dot(filters, 10.0 * np.log10(floored))


def get_mfcc(data, rate):
    """Compute mel-frequency cepstral coefficients for a 1-D signal.

    Framing and filter-bank settings come from the notebook-level globals
    ``window_len``, ``hop_size``, ``window_fun``, ``mel_filter_num`` and
    ``dct_filter_num``.

    Args:
        data: 1-D array of samples (any numeric dtype); it is not modified.
        rate: sample rate in Hz.

    Returns:
        Array with ``dct_filter_num`` rows and one column per frame.

    Raises:
        ValueError: if ``data`` is empty.
    """
    # Float copy: normalising in place would alter the caller's array and
    # fails outright for integer WAV data.
    signal = np.array(data, dtype=np.float64)
    if signal.size == 0:
        raise ValueError("get_mfcc() needs at least one sample")

    peak = np.max(np.abs(signal))
    if peak > 0:
        # An all-zero signal is left as is instead of dividing by zero.
        signal /= peak

    data_slices = apply_slicing(signal, rate, window_len, hop_size)
    data_windows = apply_windowing(data_slices, rate, window_fun, window_len)
    data_dft = apply_dft(data_windows, window_len)
    data_filtered = apply_filters(data_dft, mel_filter_num, window_len, sample_rate=rate)
    mfcc = apply_dct(data_filtered, dct_filter_num, mel_filter_num)

    return mfcc


def get_lpcc(data, order=None):
    """Linear-prediction coefficients via the Levinson-Durbin recursion.

    Args:
        data: 1-D array of samples (any numeric dtype); it is not modified.
        order: prediction order. Defaults to ``len(data)``, which is what the
            function always used before this parameter existed.

    Returns:
        Array ``[1, a1, ..., a_order]`` of ``order + 1`` coefficients. For an
        all-zero signal every ``a_k`` is 0.

    Raises:
        ValueError: if ``data`` is empty or not 1-D, or ``order`` < 1.
    """
    # Float64 first: dot products of int16 WAV data wrap around silently.
    signal = np.asarray(data, dtype=np.float64)
    if signal.ndim != 1 or signal.shape[0] == 0:
        raise ValueError("get_lpcc() needs a non-empty 1-D signal")

    n = signal.shape[0]
    if order is None:
        order = n
    if order < 1:
        raise ValueError("order must be at least 1")

    # Autocorrelation for lags 0..order; lags >= n have no overlap and stay 0.
    r = np.zeros(order + 1)
    r[0] = signal.dot(signal)
    for lag in range(1, min(order, n - 1) + 1):
        r[lag] = signal[lag:].dot(signal[:-lag])

    if r[0] == 0.0:
        # Silence: nothing to predict, and r[1] / r[0] would be 0 / 0.
        a = np.zeros(order + 1)
        a[0] = 1.0
        return a

    a = np.array([1.0, -r[1] / r[0]])
    e = r[0] + r[1] * a[1]

    for i in range(1, order):
        # Reflection coefficient; the small constant guards against e == 0.
        alpha = -a[:i+1].dot(r[i+1:0:-1]) / (e + 1e-16)
        a = np.append(a, 0.0)
        a = a + alpha * a[::-1]
        e *= 1 - alpha**2

    return a


def apply_coding(coef, data=None, rate=None):
    """Encode one audio segment with the requested coefficient type.

    Args:
        coef: 'lpcc' or 'mfcc'.
        data: 1-D array of samples to encode.
        rate: sample rate in Hz; required for 'mfcc' only.

    Returns:
        The result of get_lpcc(data) or get_mfcc(data, rate).

    Raises:
        SystemExit: if ``coef`` is not a known coefficient type.
        TypeError: if ``data`` (or ``rate`` for 'mfcc') is missing.
    """
    if coef not in ('lpcc', 'mfcc'):
        # `sys` itself is never imported in this notebook, so raise directly.
        raise SystemExit("Error: Invalid coefficients")

    if data is None:
        raise TypeError("apply_coding() needs the sample data to encode")

    if coef == 'lpcc':
        return get_lpcc(data)

    if rate is None:
        raise TypeError("apply_coding() needs the sample rate for 'mfcc'")
    return get_mfcc(data, rate)


def apply_dtw(s, t):
    """Dynamic-time-warping distance between two sequences.

    Each sequence is walked along its first axis; elements may be scalars or
    equally sized feature vectors. The local cost is the Euclidean norm of
    the element difference and the allowed steps are (i-1, j), (i, j-1) and
    (i-1, j-1).

    Args:
        s, t: array-likes (lists are accepted, e.g. templates loaded from JSON).

    Returns:
        The accumulated cost of the cheapest alignment of all of ``s`` with
        all of ``t``.

    Raises:
        ValueError: if either sequence is empty.
    """
    s = np.asarray(s, dtype=np.float64)
    t = np.asarray(t, dtype=np.float64)
    n, m = len(s), len(t)
    if n == 0 or m == 0:
        raise ValueError("apply_dtw() needs two non-empty sequences")

    # One extra row and column hold the boundary condition, so that the first
    # elements of both sequences are scored like every other pair.
    mat = np.full((n + 1, m + 1), np.inf)
    mat[0, 0] = 0.0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            curr = np.linalg.norm(s[i-1] - t[j-1])
            prev = min(mat[i-1, j], mat[i, j-1], mat[i-1, j-1])
            mat[i, j] = prev + curr

    return mat[n, m]