# Unit Test - Full Algorithm Code

In [1]:
import glob

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import scipy.io
import scipy.signal
import mpld3

def LoadTroikaDataset():
    """
    Retrieve the .mat filenames for the troika dataset.

    Review the README in ./datasets/troika/ to understand the organization of the .mat files.

    Returns:
        data_fls: Names of the .mat files that contain signal data
        ref_fls: Names of the .mat files that contain reference data
        <data_fls> and <ref_fls> are ordered correspondingly, so that ref_fls[5] is the 
            reference data for data_fls[5], etc...
    """
    data_dir = "./datasets/troika/training_data"
    data_fls = sorted(glob.glob(data_dir + "/DATA_*.mat"))
    ref_fls = sorted(glob.glob(data_dir + "/REF_*.mat"))
    return data_fls, ref_fls


def LoadTroikaDataFile(data_fl):
    """
    Loads and extracts signals from a troika data file.

    Usage:
        data_fls, ref_fls = LoadTroikaDataset()
        ppg, accx, accy, accz = LoadTroikaDataFile(data_fls[0])

    Args:
        data_fl: (str) filepath to a troika .mat file.

    Returns:
        numpy arrays for ppg, accx, accy, accz signals.
    """
    data = sp.io.loadmat(data_fl)['sig']
    return data[2:]


def LoadTroikaRefFile(ref_fl):
    """
    Loads the BPM array from a reference (label) file
    
    Usage:
        data_fls, ref_fls = LoadTroikaDataset()
        lbls = LoadTroikaRefFile(ref_fls[0])
        
    Args:
        ref_fl: (str) filepath to a troika .mat file.

    Returns:
        numpy array of BPM labels.
    """
    ref = sp.io.loadmat(ref_fl)
    return ref['BPM0']

def LoadSample(idx, sigs, refs, fs=125, train=True):
    """
    Loads and bandpasses a sample returning a dict of processed data
    
    Usage:
        data_dict = LoadSample(8)
        
    Args:
        idx: The numeric index of the sample.
        sigs: List of Troika HR signal filenames
        refs: List of Troika HR label filenames
             
    Return:
        A dict containing various data points. All points in the dict have had
        a bandpass filter applied. `ppgspec` and `accspec` are spectrograms for
        the PPG signal and the acceleromter magnitude signal, respectively.
    """
    ppg, accx, accy, accz = LoadTroikaDataFile(sigs[idx])
    ts = np.arange(len(ppg)) / fs
    
    if train:
        lbl = LoadTroikaRefFile(refs[idx])
        lblhz = (lbl / 60.0)

    bppg, baccx, baccy, baccz = bandpass(ppg), bandpass(accx), bandpass(accy), bandpass(accz)

    accmag = np.sqrt(np.square(accx) + np.square(accy) + np.square(accz))
    baccmag = bandpass(accmag)
    
    freqs, _, ppgspec = sp.signal.spectrogram(bppg, fs=fs, nperseg=fs*8, noverlap=fs*6)
    _, _, accspec = sp.signal.spectrogram(baccmag, fs=fs, nperseg=fs*8, noverlap=fs*6)
    
    sample = {}
    sample['index'] = idx
    sample['ftlen'] = len(ppgspec.T)
    sample['ppg'] = bppg
    sample['accx'] = baccx
    sample['accy'] = baccy
    sample['accz'] = baccz
    sample['accmag'] = baccmag
    sample['freqs'] = freqs
    sample['ppgspec'] = ppgspec
    sample['accspec'] = accspec
    
    if train:
        sample['labels'] = lbl
        sample['labelhz'] = lblhz
    
    return sample

def AggregateErrorMetric(pr_errors, confidence_est):
    """
    Computes an aggregate error metric based on confidence estimates.

    Computes the MAE at 90% availability. 

    Args:
        pr_errors: a numpy array of errors between pulse rate estimates and corresponding 
            reference heart rates.
        confidence_est: a numpy array of confidence estimates for each pulse rate
            error.

    Returns:
        the MAE at 90% availability
    """
    # Higher confidence means a better estimate. The best 90% of the estimates
    #    are above the 10th percentile confidence.
    percentile90_confidence = np.percentile(confidence_est, 10)

    # Find the errors of the best pulse rate estimates
    best_estimates = pr_errors[confidence_est >= percentile90_confidence]

    # Return the mean absolute error
    return np.mean(np.abs(best_estimates))


def Evaluate():
    """
    Top-level function evaluation function.

    Runs the pulse rate algorithm on the Troika dataset and returns an aggregate error metric.

    Returns:
        Pulse rate error on the Troika dataset. See AggregateErrorMetric.
    """
    # Retrieve dataset files
    data_fls, ref_fls = LoadTroikaDataset()
    errs, confs = [], []
    
    for idx in range(len(data_fls)):
        data = LoadSample(idx, data_fls, ref_fls)
        # Run the pulse rate algorithm on each trial in the dataset
        errors, confidence = RunPulseRateAlgorithm(data)
        errs.append(errors)
        confs.append(confidence)
        # Compute aggregate error metric
        
    errs = np.hstack(errs)
    confs = np.hstack(confs)
    return AggregateErrorMetric(errs, confs)


def RunPulseRateAlgorithm(data_fl, ref_fl):
    """
    Runs the algorithm on a single sample and returns per-sample errors and confidence
    
    Args:
        data: A data dict for the sample as returned by LoadSample()
        
    Returns:
        A 2-tuple of (errors, confidence) for each 2s slice of the HR signal
    """
    # A small hack to avoid undoing the changes I made to the original function signature
    data = LoadSample(0, [data_fl], [ref_fl])
    
    # Compute pulse rate estimates and estimation confidence.
    predshz = predict_series(data)
    errors = np.abs((predshz * 60.0) - data['labels'])
    confidence = confidence_series(data, predshz)

    # Return per-estimate absolute error and confidence as a 2-tuple of numpy arrays.
    return errors.reshape(-1,), confidence.reshape(-1,)

def bandpass(sig):
    """
    Applies a bandpass signal relevant to heart rate data to the input signal.
    Wrapper function for scipy.signal.butter and scipy.signal.filtfilt
    applied in that order.
    
    Usage:
        filtered_signal = bandpass(input_signal)
        
    Args:
        sig: Input signal, any value that scipy.signal.butter can take
        
    Returns:
        The bandpassed and filtered signal
    """
    lohz, hihz = 40/60.0, 240/60.0
    b, a = sp.signal.butter(3, [lohz, hihz], btype='bandpass', fs=125)
    return sp.signal.filtfilt(b, a, sig)

def hr_peaks(sig, hthresh=0.05, dist=2):
    """
    Wrapper function that finds peaks using scipy.signal.peaks 
    with values relevant to heart rate data.
    
    Usage:
        peak_indices = hr_peaks(sig)
        
    Args:
        sig: np.array containing the input signal
        
    Returns:
        Array of indices locating peaks in the signal
    """
    maxf = np.max(sig)
    minh = maxf * hthresh
    peaks, _  = sp.signal.find_peaks(sig, height=minh, distance=dist)
    
    return peaks


def predict(data,
            idx,
            pred_type='compensated',
            epsilon=0.10, 
            harmonics=lambda x: [x/2, x, x*2],
            peakthresh=0.05,
            peakdist=2):
    """
    Predicts the HR value for a single data point.
    
    Args:
        - data: The data dict as returned by LoadSample
        - idx: The index for the slice of interest within the data spectrograms
        - pred_type: 'naive' to predict the highest magnitude signal,
        'compensated' to adjust via the accelerometer data.
        - epsilon: How close an observation must be to an accelerometer peak
        for us to rule it out as a motion artifact and not a true signal.
        - harmonics: A lambda that returns an array of harmonics that we use to
        define accelerometer peaks. `x` will be the fundamental frequency of
        the accelerometer magnitude.
        - peakthresh: How tall a peak needs to be to be considered a peak, a value
        between 0.0 and 1.0 where 1.0 is the highest peak. EG: 0.5 means the peak
        must be at least half the height of the highest peak.
        - peakdist: How many samples before we can find another peak. Note that the
        Fourier transform leaves few samples so values above 3 will exclude a large
        range of the signal
        
    Returns:
        A predicted value expressed as Hz
    """
    
    ppgsig = data['ppgspec'].T[idx]
    accsig = data['accspec'].T[idx]
    freqs = data['freqs']
    
    if pred_type == 'naive':
        return freqs[np.argmax(ppgsig)]
    else:
        ppgpeakidx = hr_peaks(ppgsig, hthresh=peakthresh, dist=peakdist)
        accpeakidx = hr_peaks(accsig, hthresh=peakthresh, dist=peakdist)

        ppgpeakhz = freqs[ppgpeakidx]
        accpeakhz = freqs[accpeakidx]

        if len(ppgpeakhz) == 0:
            return freqs[np.argmax(ppgsig)]
        elif len(ppgpeakhz) == 1:
            return ppgpeakhz[0]
        else:
            candidates = ppgpeakhz
            maxacchz = accpeakhz[np.argmax(accsig[accpeakidx])]
            excludehz = np.array(harmonics(maxacchz))
            mask = (candidates == candidates)

            for fq in excludehz:
                m = np.abs(candidates - fq) > epsilon
                mask = mask & m

            filtered = candidates[mask]
            if len(filtered) == 0:
                return freqs[np.argmax(ppgsig)]
            if len(filtered) == 1:
                return filtered[0]
            else:
                peakmags = ppgsig[ppgpeakidx[mask]]
                return filtered[np.argmax(peakmags)]
            
    return None
        
        
def predict_series(data,
                   pred_type='compensated',
                   epsilon=0.10,
                   harmonics=lambda x: [x/2, x, x*2],
                   peakthresh=0.05,
                   peakdist=2):
    """
    Runs predictions on every slice of the spectrogram contained in a data dict
    
    Args:
        Refer to predict(), all args are passed through to it
        
    Returns:
        numpy array of  predictions. Shape (n,1) where n is the length of the spectrogram
    """
    
    return np.array([predict(data,
                    i,
                    pred_type=pred_type,
                    epsilon=epsilon,
                    harmonics=harmonics,
                    peakthresh=peakthresh,
                    peakdist=peakdist)
            for i in range(data['ftlen'])]).reshape(-1,1)


def confidence_series(data, preds):
    """
    Returns the confidence level in each prediction
    
    Args:
        - data: The data dict as returned by LoadSample()
        - preds: An array of predictions, as returned by predict_series()
        
    Returns:
        - An array of per-sample confidence levels of the shape (n,1) where
        n is the number of predictions/labels, matches length of the spectrogram
    """
    ppgspec = data['ppgspec']
    freqs = data['freqs']
    
    confidences = []
    for i in range(len(preds)):
        ppgsig = ppgspec.T[i]
        pred = preds[i]
        predidx = np.argmin(np.abs(pred - freqs))
        if predidx == 0:
            checkfreqs = [predidx, predidx+1]
        elif predidx == len(preds)-1:
            checkfreqs = [predidx-1, predidx]
        else:
            checkfreqs = [predidx-1, predidx, predidx+1]
        
        checkintensity = np.sum(ppgsig[checkfreqs])
        totalintensity = np.sum(ppgsig)
        confidences.append(checkintensity / totalintensity)
        
    return np.array(confidences).reshape(-1,1)


def mae(labels, preds):
    """Returns the mean average error between the labels and predictions"""
    return np.mean(np.abs(preds - labels))

# Test Results

<img src='unit_test.png'>