In [None]:
import scipy.io.wavfile
from fastdtw import fastdtw
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

import time
import pydub
import noisereduce as nr

# Load Data

Each set contains recordings of the spoken digits 0 to 9. The training data consists of multiple sets, while the test data has a single set.

In [None]:
# Assumes all sample rates are the same: the first file read fixes the
# expected rate and every later file is checked against it.
sample_rate = None

test_cnt = 2   # number of training sets (directories base/0, base/1, ...)
dig_cnt = 10   # recordings per set (files 0.wav ... 9.wav)

# train[t][i] holds the normalised samples of digit i from training set t.
train = [None]*test_cnt
for t in range(test_cnt):
    train[t] = [None]*dig_cnt
    for i in range(dig_cnt):
        filename = f'base/{t}/{i}.wav'
        instance_sample_rate, pcm_data = scipy.io.wavfile.read(filename)
        # Scale so that the largest sample value becomes 1.0.
        pcm_data = np.array(pcm_data) / np.max(pcm_data)
        if sample_rate is None:
            sample_rate = instance_sample_rate
        assert sample_rate == instance_sample_rate, (
            f'{filename}: sample rate {instance_sample_rate} != {sample_rate}')

        train[t][i] = pcm_data

# test[i] holds the normalised samples of digit i from the single test set.
# Sized by dig_cnt (not a literal 10) so train and test always stay aligned.
test = [None]*dig_cnt
for i in range(dig_cnt):
    filename = f'test1/{i}.wav'
    instance_sample_rate, pcm_data = scipy.io.wavfile.read(filename)
    pcm_data = np.array(pcm_data) / np.max(pcm_data)
    if sample_rate is None:
        sample_rate = instance_sample_rate
    assert sample_rate == instance_sample_rate, (
        f'{filename}: sample rate {instance_sample_rate} != {sample_rate}')

    test[i] = pcm_data


In [None]:
# Common functions used by multiple cells

# Fraction of each analysis window shared with the next one; the hop between
# windows is floor(window_length * (1 - overlap_pct)) samples.
overlap_pct = 0.4
# Analysis window length in milliseconds (converted to samples via sample_rate).
window_ms = 15

def get_window():
    """Return the periodic Hann window used to frame the audio.

    The window is ``window_ms`` milliseconds long, expressed in samples at the
    notebook-wide ``sample_rate``.
    """
    samples_per_window = floor(window_ms/1000*sample_rate)
    # sym=False requests the periodic (rather than symmetric) variant.
    return hann(samples_per_window, sym=False)

def create_overlapping_blocks(x, w, overlap=None):
    """Split ``x`` into overlapping blocks, each multiplied by the window ``w``.

    x       - 1-D array of samples
    w       - window array; its length is the block length
    overlap - fraction of a block shared with the next block; defaults to
              the notebook-wide ``overlap_pct``

    Returns an array of shape (number_of_blocks, len(w)). Trailing samples
    that do not fill a whole block are dropped, and an input shorter than one
    window yields an empty (0, len(w)) array instead of an error.

    Raises ValueError when the window/overlap combination gives a hop of
    less than one sample.
    """
    if overlap is None:
        overlap = overlap_pct

    n = len(x)
    nw = len(w)
    step = floor(nw * (1 - overlap))
    if step < 1:
        raise ValueError(
            f"window of {nw} samples with overlap {overlap} gives a hop of "
            f"{step} samples; the hop must be at least 1")

    # Clamp at zero: for n well below nw the formula goes negative, which
    # np.zeros would reject.
    nb = max(0, floor((n - nw) / step) + 1)

    B = np.zeros((nb, nw))
    for i in range(nb):
        offset = i * step
        B[i, :] = w * x[offset : nw + offset]

    return B

def plot_line(ax, ys, marks, rate=None):
    """Plot a signal against time in milliseconds with vertical marker lines.

    ax    - matplotlib axes to draw on
    ys    - signal samples
    marks - sample indices at which a red vertical line is drawn
    rate  - sample rate in Hz used to convert indices to milliseconds;
            defaults to the notebook-wide ``sample_rate``
    """
    if rate is None:
        # Use the validated global rate rather than a variable that happens to
        # be left over from the data-loading loop.
        rate = sample_rate

    xs = np.arange(len(ys)) / rate * 1000
    for m in marks:
        ax.axvline(m / rate * 1000, color='red')
    ax.plot(xs, ys)

def transform_data(train, test, tr):
    """Apply ``tr`` to every recording of the training and test data.

    train - list of training sets, each indexable by digit
    test  - list of test recordings, one per digit
    tr    - function applied to a single recording

    Returns ``(transformed_train, transformed_test)`` with the same nesting as
    the inputs. The number of digits is taken from ``len(test)``.
    """
    digit_count = len(test)
    digits = range(digit_count)

    transformed_train = [
        [tr(train[set_idx][d]) for d in digits]
        for set_idx in range(len(train))
    ]
    transformed_test = [tr(test[d]) for d in digits]

    return (transformed_train, transformed_test)

def experiment(training_data, test_data, cost_f):
    """Compute the mean cost between every test item and every training class.

    training_data - list of training sets; training_data[t][j] is class j of set t
    test_data     - list of test items, one per class
    cost_f        - function (training_item, test_item) -> cost

    Returns a square array where entry [i][j] is the cost between test item i
    and class j, averaged over all training sets. The elapsed wall-clock time
    is printed.
    """
    started = time.time()

    class_count = len(test_data)
    set_count = len(training_data)
    results = np.empty(shape=(class_count, class_count))

    for row, probe in enumerate(test_data):
        for col in range(class_count):
            summed = sum(
                cost_f(training_data[s][col], probe) for s in range(set_count)
            )
            results[row][col] = summed / set_count

    print("Elapsed time: %s s" % (time.time() - started))

    return results

def make_table(results):
    columns = list(range(len(results)))
    df = pd.DataFrame(results, columns)

    def is_min(v, x):
        minr = np.min(x)
        return v <= minr + 1e-6

    row_id = [0]
    def get_style(x):
        minx = np.min(x)
        style = []
        for i in range(len(x)):
            v = x[i]
            cell_style = ""
            if v <= minx + 1e-6:
                if i == row_id[0]:
                    cell_style = "background: #B4F8C8"
                else:
                    cell_style = "background: #FFAEBC"
            style.append(cell_style)
        row_id[0] = row_id[0] + 1
        return style

    df = df.style.apply(lambda x: get_style(x), axis=1)
    return df

# Spectrogram

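Investigate how well we can detect silence by working in the frequency domain. Assuming noise is spectrally flat, the magnitude spectrum of a silent window has low variance across frequency bins, so we compute that variance for each window and trim the leading and trailing windows whose variance falls below a threshold.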

In [None]:
from scipy.signal.windows import hann
from math import floor
from scipy.fft import fft, rfft, ifft, irfft, fftfreq, rfftfreq

def get_frequency_map(samples):
    """Return the FFT magnitudes of each analysis window over a fixed bin range.

    The signal is framed with ``get_window`` and ``create_overlapping_blocks``;
    for every block the magnitudes of FFT bins 2 to 19 are kept.

    Returns a list with one magnitude array per block.
    """
    blocks = create_overlapping_blocks(samples, get_window())

    # indices corresponding to the human frequency range
    # FIXME: this is coupled w/ the sample rate of 14.4kHz
    lo_bin, hi_bin = 2, 20

    return [np.abs(fft(block)[lo_bin:hi_bin]) for block in blocks]

def trim_low_variance(varis):
    """Find the first and last entries exceeding 1% of the maximum variance.

    varis - sequence of per-window variances

    Returns ``[li, ri]``: ``li`` is the index of the first entry above the
    cutoff (``len(varis)`` when there is none) and ``ri`` the index of the last
    such entry, searched only to the right of ``li``.
    """
    threshold = 0.01
    cutoff = np.max(varis) * threshold
    count = len(varis)

    # First index from the left that rises above the cutoff.
    li = next((k for k, v in enumerate(varis) if v > cutoff), count)

    # Last index above the cutoff, scanning right-to-left and stopping at li.
    ri = next(
        (k for k in range(count - 1, li, -1) if varis[k] > cutoff),
        min(li, count - 1),
    )
    return [li, ri]

def detect_silence_spectral(samples):
    """Locate the non-silent part of ``samples`` from per-window spectral variance.

    The variance of each window's magnitude spectrum is computed and the
    low-variance windows at both ends are trimmed with ``trim_low_variance``.

    Returns ``[start, end]`` in samples: the offset of the first kept window
    and the offset of the last kept window plus one window length.
    """
    w = get_window()
    fm = get_frequency_map(samples)

    varis = [np.var(fs) for fs in fm]

    li, ri = trim_low_variance(varis)

    # Convert window indices to sample offsets using the framing hop size.
    step = floor(len(w) * (1 - overlap_pct))
    # The end mark includes the full last window; its length is len(w)
    # (the previous code referred to an undefined name here).
    return [li * step, ri * step + len(w)]

def spectrogram(samples):
    """Plot the band-limited spectrogram of ``samples`` and its silence marks.

    Draws two figures: a pcolormesh of the per-window FFT magnitudes returned
    by ``get_frequency_map``, and the waveform with the boundaries found by
    ``detect_silence_spectral``.
    """
    w = get_window()
    fm = get_frequency_map(samples)

    N = len(fm[0])   # frequency bins kept per window
    M = len(fm)      # number of windows
    Sxx = np.empty(shape=(N, M))
    for i in range(M):
        Sxx[:, i] = fm[i]

    plt.figure(1, figsize=(18, 5))

    step = floor(len(w) * (1 - overlap_pct))
    xs = np.array(range(M)) * step
    # Frequency of each kept bin: bin k of a len(w)-point FFT sits at
    # k * sample_rate / len(w). The first kept bin must mirror fi_min (2) in
    # get_frequency_map.
    first_bin = 2
    fs = (first_bin + np.arange(N)) * sample_rate / len(w)
    plt.pcolormesh(xs, fs, Sxx, shading='gouraud')
    plt.xlabel('window offset [samples]')
    plt.ylabel('frequency [Hz]')

    fig, ax = plt.subplots(1, 1, figsize=(18, 3))
    marks = detect_silence_spectral(samples)
    plot_line(ax, samples, marks)


# Show the spectrogram and the spectrally detected silence marks for test recording 2.
spectrogram(test[2])



# Trimming Silence

In [None]:
def detect_silence_mov_avg(samples, wlen=None, threshold_ratio=0.02):
    """Locate the non-silent part of ``samples`` with a moving average.

    A window of ``wlen`` samples slides inwards from each end until the mean
    absolute amplitude inside it exceeds ``threshold_ratio`` times the peak
    absolute amplitude of the signal.

    samples         - 1-D array of samples
    wlen            - window length in samples; defaults to ``window_ms``
                      milliseconds at the notebook-wide ``sample_rate``
    threshold_ratio - silence threshold as a fraction of the peak amplitude

    Returns ``[start, end]`` where ``end`` is exclusive, so that
    ``samples[start:end]`` is the detected non-silent region.
    """
    if wlen is None:
        wlen = int(sample_rate / 1000 * window_ms)

    abs_samples = np.abs(samples)
    # Peak of the absolute signal: the signed maximum underestimates the
    # amplitude when the largest excursion is negative.
    threshold = threshold_ratio * np.max(abs_samples)
    n = len(abs_samples)

    # from the left: window covers [li, li + wlen - 1]
    wsum = np.sum(abs_samples[0:wlen])
    li = 0
    while li + wlen < n and wsum / wlen <= threshold:
        wsum += abs_samples[li + wlen] - abs_samples[li]
        li += 1

    # from the right: window covers [ri + 1, ri + wlen]
    wsum = np.sum(abs_samples[-wlen:])
    ri = n - 1 - wlen
    while ri > 0 and wsum / wlen <= threshold:
        wsum += abs_samples[ri] - abs_samples[ri + wlen]
        ri -= 1

    # ri + wlen is the last sample inside the window; add one to make the end
    # mark exclusive so slicing does not drop that sample.
    return [li, ri + wlen + 1]


def trim_silence(samples):
    """Denoise ``samples`` and cut away the leading and trailing silence.

    Returns the slice of the noise-reduced signal between the marks found by
    ``detect_silence_mov_avg``.
    """
    denoised = nr.reduce_noise(y=samples, sr=sample_rate)
    start, end = detect_silence_mov_avg(denoised)
    return denoised[start:end]
    
def plot_silence(ax, samples):
    """Plot the noise-reduced signal on ``ax`` with its detected silence marks."""
    denoised = nr.reduce_noise(y=samples, sr=sample_rate)
    plot_line(ax, denoised, detect_silence_mov_avg(denoised))

# Strip the silence from every training and test recording
trimmed_train, trimmed_test = transform_data(train, test, trim_silence)

# Display where we are detecting the silence for some data
# (columns: training set 0, training set 1, test set)
fig, axs = plt.subplots(10, 3, figsize=(18, 20))
for i in range(10):
    for ax, signal in zip(axs[i], (train[0][i], train[1][i], test[i])):
        plot_silence(ax, signal)


# Linear Predictive Coding

In [None]:
def make_matrix_X(x, p):
    """Build the (len(x) - 1, p) matrix of lagged samples used by the LPC solver.

    Entry (i, j) is ``x[i - j]`` when ``i >= j`` and 0 otherwise, so row i
    holds the p samples preceding ``x[i + 1]``, most recent first.
    """
    n = len(x)
    X = np.zeros((n - 1, p))

    # Fill column by column: column `lag` is the signal delayed by `lag`
    # samples, zero-padded at the top.
    for lag in range(min(p, n)):
        X[lag:, lag] = x[:n - 1 - lag]
    return X

"""
An implementation of LPC.

A detailed explanation can be found at
https://ccrma.stanford.edu/~hskim08/lpc/

x - a vector representing the time-series signal
p - the polynomial order of the all-pole filter

a - the coefficients to the all-pole filter (returned)
g - the variance (power) of the source (scalar, returned)
e - the full error signal (computed internally, not returned)

NOTE: This is not the most efficient implementation of LPC.
Matlab's own implementation uses the FFT-based auto-correlation method,
which is noticeably faster (O(n log(n)) vs O(n^2)).
"""
def solve_lpc(x, p, ii):
    """Fit an order-``p`` linear predictor to ``x`` by least squares.

    x  - 1-D array holding one (windowed) block of the signal
    p  - number of predictor coefficients
    ii - index of the block; unused, kept for caller compatibility

    Returns ``[a, g]``: the ``p`` predictor coefficients and the variance of
    the prediction error.
    """
    # Targets: every sample except the first is predicted from its predecessors.
    b = x[1:]

    X = make_matrix_X(x, p)

    # rcond=None selects NumPy's current default cutoff explicitly, which
    # avoids the FutureWarning older NumPy versions emit on every call.
    a = np.linalg.lstsq(X, b, rcond=None)[0]

    e = b - np.dot(X, a)
    g = np.var(e)

    return [a, g]

"""
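Encodes the input signal into LPC coefficients, one set per overlapping
window (the overlap is set by `overlap_pct` in `create_overlapping_blocks`)

x - single channel input signal
p - lpc order
w - window function; its length is the block length
 
A - the coefficients
G - the signal power
E - the full source (error) signal (not returned by this implementation)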
"""
def lpc_encode(x, p, w):
    """Compute LPC coefficients and error power for every windowed block of ``x``.

    x - single channel input signal
    p - LPC order
    w - window array applied to each block

    Returns ``[A, G]`` where ``A`` has shape (p, blocks) and ``G`` has shape
    (1, blocks); column i belongs to block i.
    """
    blocks = create_overlapping_blocks(x, w)
    block_count = blocks.shape[0]

    A = np.zeros((p, block_count))
    G = np.zeros((1, block_count))

    for idx in range(block_count):
        coeffs, power = solve_lpc(blocks[idx, :], p, idx)
        A[:, idx] = coeffs
        G[:, idx] = power

    return [A, G]

# Turns multi-series into single series w/ multi y-value
def convert_series(series_set):
    """Regroup several equally long series into one series of vectors.

    series_set - sequence of series; the length of the first one is used

    Returns a list whose element t is a float array holding the t-th value
    of every input series.
    """
    series_cnt = len(series_set)
    series_len = len(series_set[0])
    return [
        np.array([series_set[i][t] for i in range(series_cnt)], dtype=np.float64)
        for t in range(series_len)
    ]

def get_lpc_coeff(samples):
    """Return the LPC coefficient matrix of ``samples``.

    Uses 6 poles and a periodic Hann window of ``0.03 * sample_rate`` samples.
    The signal-power output of ``lpc_encode`` is discarded.
    """
    pole_count = 6  # number of poles
    # sym=False requests the periodic variant of the window.
    frame = hann(floor(0.03*sample_rate), sym=False)
    coefficients, _power = lpc_encode(samples, pole_count, frame)
    return coefficients

def get_as_lpc(samples):
    """Return the LPC coefficients of ``samples`` as a list of per-window vectors."""
    coefficients = get_lpc_coeff(samples)
    return convert_series(coefficients)

# Replace every silence-trimmed recording by its sequence of per-window LPC coefficient vectors.
(lpc_coeff_train, lpc_coeff_test) = transform_data(trimmed_train, trimmed_test, get_as_lpc)
    

def lpc_heatmap(ax, data):
    """Draw the LPC coefficient matrix of ``data`` as an image on ``ax``."""
    coefficients = get_lpc_coeff(data)
    ax.imshow(coefficients, aspect='auto')

# One row per digit; columns: training set 0, training set 1, test set
fig, axs = plt.subplots(dig_cnt, 3, figsize=(18, 25))
for i in range(dig_cnt):
    recordings = (trimmed_train[0][i], trimmed_train[1][i], trimmed_test[i])
    for ax, signal in zip(axs[i], recordings):
        lpc_heatmap(ax, signal)


In [None]:
# DTW over LPC coefficients
def fastdtw_cost(s1, s2):
    """Return the FastDTW cost between two sequences; the warp path is discarded."""
    cost, path_ = fastdtw(s1, s2)
    return cost

# The LPC-encoded training data is named lpc_coeff_train (the previous name
# used here was never defined).
results = experiment(lpc_coeff_train, lpc_coeff_test, fastdtw_cost)
df = make_table(results)
df

In [None]:
# Direct DTW (no processing)
start_time = time.time()

def fastdtw_cost(s1, s2):
    """Return the FastDTW cost between two sequences; the warp path is discarded."""
    return fastdtw(s1, s2)[0]

# The raw, unprocessed training recordings live in `train` (the previous name
# used here was never defined).
results = experiment(train, test, fastdtw_cost)
df = make_table(results)

# Total for the whole cell, including building the table.
print("Elapsed time: %s s" % (time.time() - start_time))

df

In [None]:
# DTW after silence removal

def fastdtw_cost(s1, s2):
    """Return the FastDTW cost between two sequences; the warp path is discarded."""
    return fastdtw(s1, s2)[0]

# Compare the silence-trimmed test recordings against the trimmed training sets
results = experiment(trimmed_train, trimmed_test, fastdtw_cost)
df = make_table(results)

df