## Some technical notes about audio parameters

- The sampled signal is obtained in the Linear Pulse Code Modulation (LPCM).
- The signal is stereo (`nchanells=2`), but it is only used the left-side signal.
- It is utilized 16 bits (2 bytes) per sample to encode the audio. The native data type of this data is `int16`, which is capable of storing a [range from](https://www.mathworks.com/help/matlab/ref/audioread.html) `-32768` up to `+32767`.
- The data type is converted to `float` because of the numeric precision and because the floating point in `Python` [is interpreted as](https://docs.python.org/3/library/stdtypes.html#numeric-types-int-float-complex) `double` in `C`, which is convenient.
- The float-converted raw data is then normalized by the maximum value reachable of the `int16` format, that is, `32768`. The resulting signal is the same achieved by the `audioread()` command of `matlab`.
- The original sampling rate is $44.1\;kHz$. But each recording is downsampled into two different signals, with a sampling rate of $F_s = 22.05\;kHz$.
- The audio dataset comprises five classes (the speeches "avançar", "recuar", "parar", "direita", and "esquerda"), each with 10 recordings, totalizing 50 files. With the downsampling, we have 20 recordings by class. Considering that the `.wav` file is stereo, that is, `nchannel=2`, the number of audio recordings by class is increased to 40. From each of these recordings, it is extracted a discrete-time signal, which is converted to a $N_s$-dimensional vector, being $N_s$ the number of samples of this signal.

## Some notes about the LPC (linear predictive coding) and the Yule-Walker algorithms

- The AR(p) model is implemented for `p=10`, `p=15`, and `p=20`.
- A single recording is divided into 31 frames without overlapping. The number of samples per frames, $N_f$, and the number of samples between each frame, $N_{gap}$, are given by
    $$ N_f = \frac{T_{sig}T_{f_{min}} F_s}{T_{min}} $$
    and
    $$ N_{gap} = \frac{T_{sig} F_s-31N_f}{30},$$
    where $T_{sig}$ is the signal duration, $T_{min}$ is the minimum signal duration of the dataset, and $T_{f_{min}} \triangleq 15\;ms $ is the minimum frame duration. All these variables are defined in seconds.
- The Yule-Walker equation is applied to each of the 31 frames produced from a single audio recording. The final vector is achieved by concatenating all the $31$ coefficients obtained by the Yule-Walker equation, it is given by
$$\mathbf{a}_{p} = \begin{bmatrix}
\mathbf{a}_{p,1}^\mathsf{T} & \mathbf{a}_{p,2}^\mathsf{T} & \cdots & \mathbf{a}_{p,31}^\mathsf{T}
\end{bmatrix}^\mathsf{T} \in \mathbb{R}^{31p},
$$
where $\mathbf{a}_{p,i} \in \mathbb{R}^p$ is the coefficient vector obtained form the $i$-th frame. This procedure is repeated for each of the four signals (channel a and b, samples even and odd) from a single audio recording, for each audio recording.
- For sake of clarity, it  is chosen the normalized (by its l2 norm) version of the autocorrelation function. It makes $r(\tau)$ invariant to the signal energy of the frame.
- It is chosen a split for 50%-50% for train and train dataset, as done in the article.

---

> 1. Carregar os diversos arquivos de áudio e realizar a subamostragem dos sinais de cada canal a fim
de gerar a base de dados de treino e teste.

### Initializing

In [43]:
# %reset -f
from numpy import inf, empty, concatenate, arange, inner, array, logical_and, where, identity, logspace, zeros, any
from numpy.linalg import norm, cond, matrix_rank as rank, inv
from numpy.random import normal
from numpy.fft import fft
from statsmodels.regression.linear_model import yule_walker
from scipy.io import wavfile
from scipy.linalg import toeplitz
from scipy.signal import periodogram
from math import floor, log as ln, sqrt
from warnings import warn
from os import listdir
from matplotlib import pyplot as plt
import itertools

# number of files per commands
n_files = 10
# AR(p) model order -> p = 10, 15, 20
all_p = range(10,21,5)
# all commands
all_commands = ('avancar', 'esquerda', 'direita', 'parar', 'recuar')
# a dictionary that gather all signals and the coefficient estimation for each audio file
all_data = {f'{command}_{file_number}_p{p}_s{signal}': {'signal': empty(0), 'all_a': empty(0), 'all_a+noise_01': empty(0), 'all_a+noise_0625': empty(0), 'all_a_hat': empty(0), 'all_a_hat+noise_01': empty(0), 'all_a_hat+noise_0625': empty(0), 'all_a box-cox': empty(0), 'all_a+noise_01 box-cox': empty(0), 'all_a+noise_0625 box-cox': empty(0)} for p in all_p for command in all_commands for file_number in range(1,n_files+1) for signal in ('0a', '0b', '1a', '1b')}

def get_T_min(root_dir):
    T_min = inf
    for file_name in listdir(root_dir):
        F_s, s_n = wavfile.read(root_dir+file_name)
        # signal duration
        T_sig = s_n[:,0].size * (1/F_s)
        if T_sig < T_min:
            T_min = T_sig
    return T_min

def box_cox(x, gamma):
    assert 0 <= gamma and gamma < 1, 'the hyperparameter should be 0<=gamma<1'
    return ln(x) if gamma == 0 else (x**gamma -1)/gamma

# minimum audio duration of the dataset
T_min = get_T_min('./Audio_files_TCC_Jefferson/')
# minimum frame duration, 15ms (user-defined)
T_f_min = 15e-3

### LPC and Yule-Walker algorithm

In [44]:
# %reset_selective -f all_a
for p in all_p:
    for command in all_commands:
        for file_number in range(1,n_files+1):
            file_name = f'./Audio_files_TCC_Jefferson/comando_{command}_{file_number:0>2d}.wav'
            # input audio vector, s_n -> [s[0], s[1], ..., s[N_s-1]]
            F_s, s_n = wavfile.read(file_name)
            # Number of samples
            N_s = s_n[:,0].size
            # signal duration
            T_sig = N_s/F_s
            # convert from int16 to float type and normalize it to range from -1 up to 1 (as matlab does)
            s_n = s_n.astype(float)/32768
            # downsampling: generate s0_n (even samples) and s1_n (odd samples) from s_n
            s0_n, s1_n = s_n[range(0,N_s,2),:], s_n[range(1,N_s,2),:]
            N_s //= 2
            F_s //= 2
            # number of samples per frame
            N_f = floor(T_sig*T_f_min*F_s/T_min)
            # number of samples between each frame (gap)
            N_gap = floor((N_s - 31*N_f)/30)
            # get channel a and chanell b
            s0a_n, s0b_n, s1a_n, s1b_n = s0_n[:,0], s0_n[:,1], s1_n[:,0], s1_n[:,1]

            # for each of the 4 signals from a single recording: channel a and b, samples even and odd
            for s, sig_id in zip((s0a_n, s0b_n, s1a_n, s1b_n), ('0a', '0b', '1a', '1b')):
                # save signal - no noise
                all_data[f'{command}_{file_number}_p{p}_s{sig_id}']['signal'] = s
                # set and save the one-hot-encoding
                all_data[f'{command}_{file_number}_p{p}_s{sig_id}']['d'] = array([command==c for c in all_commands])
                # generate and save signal + w ~ N(0,sd^2)
                s_01 = s + sqrt(.01)*normal(size=s.shape)
                s_0625 = s + sqrt(.0625)*normal(size=s.shape)
                all_data[f'{command}_{file_number}_p{p}_s{sig_id}']['signal+noise_01'] = s_01
                all_data[f'{command}_{file_number}_p{p}_s{sig_id}']['signal+noise_0625'] = s_0625
                # for each noisy signal
                for s_noise, noise_id in zip((s, s_01, s_0625), ('', '+noise_01', '+noise_0625')):
                    # for each frame
                    for i, n in enumerate(range(0, N_s+1, N_f)):
                        # ensure that it is get only 31 frames
                        if i == 31:
                            break
                        # s_n0 -> [s[n0], s[n0+1], ..., s[n0+N_f-1]], being n0\in\mathbb{N}
                        s_n0 = s_noise[n+i*N_gap:n+i*N_gap+N_f]
                        # compute the autocorrelation function (normalized version), r_k -> r[k] -> [r[0], r[1], ..., r[p]]
                        r_k = empty(p+1)
                        for k in range(p+1):
                            r_k[k] = inner(s_n0[:N_f-k], s_n0[k:N_f])/norm(s_n0)
                        # autocorrelation matrix
                        # r_k[:p] -> [r[0], r[1], ..., r[p-1]]
                        R = toeplitz(r_k[:p])
                        # autocorrelation vector
                        # r -> [r[1], r[2], ..., r[p]]
                        r = r_k[1:]
                        if rank(R) == R.shape[0]:
                            # Yule-Walker equation
                            a = inv(R) @ r
                            # built-in function for comparison purpose
                            a_hat, _ = yule_walker(s_n0, order=p, method='mle')
                            # save a
                            all_data[f'{command}_{file_number}_p{p}_s{sig_id}'][f'all_a{noise_id}'] = concatenate((all_data[f'{command}_{file_number}_p{p}_s{sig_id}'][f'all_a{noise_id}'], a))
                            # save a_hat
                            all_data[f'{command}_{file_number}_p{p}_s{sig_id}'][f'all_a_hat{noise_id}'] = concatenate((all_data[f'{command}_{file_number}_p{p}_s{sig_id}'][f'all_a_hat{noise_id}'], a_hat))
                        else:
                            warn(f'The autocorrelation matrix of the audio {file_name.split("/")[0]} is rank-deficient, skip over to the next audio recording.')

### Determine each voice signal using the PSD segmentation method estimated by the periodogram

In [45]:
# my periodogram estimate function - FROM SCRATCH
def my_periodogram(g, Fs):
    N = g.size
    # Nyquist theorem
    W = Fs/2
    # FFT
    G = fft(g)
    # two-sided periodogram estimate
    p = abs(G)**2 / (N*Fs)
    # transform it into one-sided
    p = p[0:1+N//2]
    p[1:-1] *= 2
    # all frequencies
    all_f = arange(0,W+Fs/N,Fs/N)[:p.size]
    
    return p, all_f

# lower and upper bound
all_lb = (0, 100, 200, 360, 500, 700, 850, 1000, 1200, 1320, 1500, 2200, 2700)
all_ub = all_lb[1:] + (3700,)

for key, data in all_data.items():
    # As the 4 signals are identical for p \in {10, 15, 20}, it is not necessary to compute x for different values of p
    if 'p10' not in key:
        continue
    for noise_id in '', '+noise_01', '+noise_0625':
        s = data[f'signal{noise_id}']
        p, all_f = my_periodogram(s, F_s)
        _, p_builtin = periodogram(s, fs=F_s)
        # for each lower and upper bound interval, get the maximum value
        psd_sampled = array([max(p[logical_and(all_f>=lb, all_f<=ub)]) for lb, ub in zip(all_lb, all_ub)])
        # save the attribute vector and all PSD
        all_data[key][f'psd{noise_id} sampled'] = psd_sampled
        # save the attribute vector and all PSD with box-cox
        all_data[key][f'psd{noise_id} sampled box-cox'] = box_cox(psd_sampled, 0.1)
        # save PSD from my periodogram estimate
        all_data[key][f'psd{noise_id}'] = p
        # save PSD from my periodogram estimate - built-in function
        all_data[key][f'psd{noise_id} built-in'] = p_builtin

### The Linear Least-Squares (LS) classifier

In [46]:
class LS_Classifier():
    def __init__(self):
        self.n_errors = []
        self.confusion_matrix_best = zeros((5, 5))
        self.confusion_matrix_worst = zeros((5, 5))
    
    # training
    def train(self, X, D):
        # regularized estimation of the linear transformation matrix of the attribute vectors
        for lambda_ in logspace(0,10,base=2):
            W_hat = D @ X.T @ inv(X @ X.T + (lambda_-1)*identity(X.shape[0]))
            # if W_hat is ill-conditioned, regularize it
            if cond(X @ X.T + (lambda_-1)*identity(X.shape[0])) <= 5e3:
                break
        self.W_hat = W_hat
    
    # testing
    def test(self, X, D):
        n_errors = 0
        confusion_matrix = zeros((5,5))
        # testing phase
        for x, d in zip(X.T, D.T): # for each column
            d_hat = self.W_hat @ x
            label = where(d == max(d))[0][0]
            guessed = where(d_hat == max(d_hat))[0][0]
            if label != guessed:
                n_errors += 1
            confusion_matrix[guessed][label] += 1
        
        # saving phase
        if self.n_errors: # if self.n_errors is not empty
            if n_errors > max(self.n_errors): # this is the worst test dataset?
                self.confusion_matrix_worst = confusion_matrix
            elif n_errors < min(self.n_errors): # this is the best test dataset?
                self.confusion_matrix_best = confusion_matrix
        else:
            self.confusion_matrix_worst = confusion_matrix
            self.confusion_matrix_best = confusion_matrix
        # save n_errors
        self.n_errors.append(n_errors)

#### K-fold cross validation: LS classifier + LPC

In [48]:
# LPC attribute vector size for LPC algorithm -> 31 frames x 10 attributes per frame
P_LPC = 310
n_folds = 10
# columns_per_fold = X_LPC.shape[1]//10
kfold_iterator = itertools.combinations(range(1,n_folds+1), n_folds//2)
ls_classifier = LS_Classifier()

for train_set in kfold_iterator:
    # generate train dataset
    X_train, D_train = empty((P_LPC, 0)), empty((5, 0))
    all_data_train = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in train_set])}
    for data in all_data_train.values():
        X_train = concatenate((X_train, data['all_a+noise_0625'][:,None]), axis=1)
        D_train = concatenate((D_train, data['d'][:,None]), axis=1)
    # generate test dataset
    X_test, D_test = empty((P_LPC, 0)), empty((5, 0))
    test_set = list(range(1,11))
    for e in train_set:
        test_set.remove(e)
    all_data_test = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in test_set])}
    for data in all_data_test.values():
        X_test = concatenate((X_test, data['all_a+noise_0625'][:,None]), axis=1)
        D_test = concatenate((D_test, data['d'][:,None]), axis=1)
    ls_classifier.train(X_train, D_train)
    ls_classifier.test(X_test, D_test)

ls_classifier.n_errors

[26,
 33,
 32,
 35,
 32,
 27,
 29,
 30,
 31,
 30,
 23,
 35,
 39,
 35,
 32,
 41,
 34,
 36,
 38,
 37,
 36,
 30,
 29,
 32,
 27,
 29,
 32,
 34,
 32,
 27,
 31,
 25,
 31,
 30,
 32,
 27,
 34,
 33,
 32,
 28,
 32,
 30,
 31,
 25,
 29,
 26,
 39,
 32,
 28,
 32,
 31,
 32,
 28,
 34,
 28,
 31,
 29,
 27,
 31,
 25,
 26,
 30,
 31,
 25,
 24,
 33,
 29,
 26,
 31,
 32,
 30,
 31,
 36,
 29,
 28,
 31,
 25,
 30,
 31,
 30,
 27,
 35,
 29,
 33,
 32,
 32,
 24,
 34,
 37,
 28,
 34,
 26,
 32,
 29,
 25,
 33,
 24,
 26,
 27,
 27,
 30,
 28,
 25,
 25,
 32,
 27,
 29,
 26,
 28,
 24,
 31,
 31,
 28,
 25,
 29,
 27,
 30,
 32,
 26,
 27,
 33,
 30,
 26,
 25,
 30,
 31,
 33,
 29,
 33,
 33,
 22,
 33,
 33,
 33,
 29,
 36,
 33,
 31,
 37,
 34,
 32,
 33,
 30,
 30,
 25,
 31,
 26,
 27,
 31,
 26,
 25,
 38,
 30,
 33,
 32,
 38,
 34,
 39,
 37,
 31,
 40,
 33,
 35,
 31,
 28,
 38,
 33,
 27,
 30,
 25,
 25,
 35,
 29,
 30,
 30,
 27,
 27,
 29,
 35,
 31,
 25,
 35,
 31,
 28,
 31,
 27,
 27,
 30,
 25,
 25,
 22,
 26,
 30,
 26,
 23,
 27,
 34,
 39,
 38,
 30,


#### K-fold cross validation: LS classifier + PSD

In [49]:
# PSD attribute vector size -> 13 frames of the PSD
P_PSD = 13
# columns_per_fold = X_LPC.shape[1]//10
kfold_iterator = itertools.combinations(range(1,n_folds+1), n_folds//2)
ls_classifier = LS_Classifier()

for train_set in kfold_iterator:
    # generate train dataset
    X_train, D_train = empty((P_PSD, 0)), empty((5, 0))
    all_data_train = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in train_set])}
    for data in all_data_train.values():
        X_train = concatenate((X_train, data['psd sampled'][:,None]), axis=1)
        D_train = concatenate((D_train, data['d'][:,None]), axis=1)
    # generate test dataset
    X_test, D_test = empty((P_PSD, 0)), empty((5, 0))
    test_set = list(range(1,11))
    for e in train_set:
        test_set.remove(e)
    all_data_test = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in test_set])}
    for data in all_data_test.values():
        X_test = concatenate((X_test, data['psd sampled'][:,None]), axis=1)
        D_test = concatenate((D_test, data['d'][:,None]), axis=1)
    ls_classifier.train(X_train, D_train)
    ls_classifier.test(X_test, D_test)

ls_classifier.n_errors

[68,
 64,
 64,
 64,
 64,
 60,
 64,
 64,
 64,
 68,
 76,
 52,
 60,
 60,
 60,
 60,
 60,
 56,
 64,
 48,
 52,
 64,
 64,
 64,
 64,
 60,
 54,
 64,
 64,
 60,
 60,
 64,
 60,
 64,
 56,
 56,
 56,
 64,
 60,
 60,
 60,
 64,
 60,
 64,
 60,
 60,
 48,
 60,
 56,
 60,
 60,
 60,
 60,
 56,
 56,
 52,
 68,
 64,
 64,
 64,
 60,
 60,
 68,
 64,
 60,
 60,
 64,
 60,
 64,
 56,
 48,
 60,
 68,
 64,
 60,
 60,
 64,
 56,
 64,
 52,
 56,
 56,
 60,
 60,
 64,
 60,
 60,
 60,
 52,
 56,
 48,
 60,
 68,
 64,
 60,
 60,
 64,
 60,
 64,
 56,
 52,
 60,
 60,
 56,
 64,
 60,
 60,
 60,
 60,
 60,
 48,
 56,
 60,
 60,
 64,
 60,
 60,
 60,
 52,
 56,
 48,
 52,
 60,
 60,
 60,
 56,
 72,
 64,
 64,
 70,
 80,
 68,
 68,
 64,
 60,
 64,
 64,
 60,
 64,
 58,
 52,
 64,
 64,
 64,
 60,
 64,
 64,
 56,
 68,
 68,
 76,
 60,
 56,
 60,
 60,
 60,
 60,
 56,
 52,
 56,
 52,
 68,
 68,
 64,
 60,
 64,
 64,
 60,
 64,
 60,
 64,
 60,
 64,
 60,
 64,
 60,
 60,
 64,
 60,
 60,
 52,
 60,
 60,
 60,
 64,
 60,
 60,
 60,
 52,
 56,
 52,
 56,
 60,
 60,
 60,
 56,
 68,
 68,
 64,
 60,


### k-fold cross validation: LS classifier + PSD + Box-Cox

In [52]:
# columns_per_fold = X_LPC.shape[1]//10
kfold_iterator = itertools.combinations(range(1,n_folds+1), n_folds//2)
ls_classifier = LS_Classifier()

for train_set in kfold_iterator:
    # generate train dataset
    X_train, D_train = empty((P_PSD, 0)), empty((5, 0))
    all_data_train = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in train_set])}
    for data in all_data_train.values():
        X_train = concatenate((X_train, data['psd sampled box-cox'][:,None]), axis=1)
        D_train = concatenate((D_train, data['d'][:,None]), axis=1)
    # generate test dataset
    X_test, D_test = empty((P_PSD, 0)), empty((5, 0))
    test_set = list(range(1,11))
    for e in train_set:
        test_set.remove(e)
    all_data_test = {key:value for key, value in all_data.items() if 'p10' in key and any([key.split('_')[1] == str(e) for e in test_set])}
    for data in all_data_test.values():
        X_test = concatenate((X_test, data['psd sampled box-cox'][:,None]), axis=1)
        D_test = concatenate((D_test, data['d'][:,None]), axis=1)
    ls_classifier.train(X_train, D_train)
    ls_classifier.test(X_test, D_test)

ls_classifier.n_errors

[42,
 33,
 33,
 38,
 44,
 46,
 40,
 41,
 43,
 39,
 40,
 35,
 41,
 38,
 33,
 34,
 41,
 37,
 45,
 39,
 52,
 31,
 33,
 34,
 41,
 38,
 24,
 32,
 37,
 32,
 32,
 39,
 39,
 46,
 41,
 55,
 29,
 38,
 34,
 28,
 32,
 34,
 34,
 37,
 38,
 48,
 33,
 34,
 34,
 37,
 35,
 41,
 40,
 38,
 48,
 53,
 29,
 31,
 37,
 40,
 39,
 24,
 29,
 35,
 30,
 32,
 40,
 34,
 47,
 40,
 53,
 25,
 32,
 30,
 23,
 27,
 30,
 29,
 36,
 36,
 41,
 30,
 31,
 29,
 33,
 32,
 39,
 37,
 36,
 47,
 51,
 27,
 35,
 35,
 34,
 36,
 43,
 39,
 49,
 44,
 52,
 32,
 36,
 34,
 43,
 39,
 48,
 47,
 45,
 54,
 58,
 28,
 30,
 31,
 33,
 32,
 38,
 36,
 36,
 44,
 48,
 41,
 41,
 42,
 48,
 52,
 22,
 35,
 33,
 40,
 41,
 17,
 23,
 27,
 25,
 27,
 32,
 33,
 36,
 39,
 55,
 22,
 19,
 27,
 20,
 20,
 28,
 32,
 30,
 32,
 42,
 30,
 29,
 28,
 31,
 31,
 36,
 38,
 37,
 45,
 48,
 16,
 25,
 26,
 26,
 32,
 34,
 39,
 37,
 41,
 53,
 31,
 33,
 32,
 37,
 31,
 42,
 42,
 38,
 49,
 54,
 29,
 27,
 26,
 29,
 30,
 32,
 36,
 39,
 43,
 45,
 39,
 38,
 39,
 42,
 50,
 15,
 20,
 24,
 24,


In [42]:
ls_classifier.confusion_matrix_worst

array([[ 4.,  0.,  0.,  0.,  8.],
       [ 0.,  8.,  4.,  0.,  4.],
       [ 0., 12., 16.,  0.,  0.],
       [16.,  0.,  0., 20.,  0.],
       [ 0.,  0.,  0.,  0.,  8.]])