In [None]:
#Import necessary packages
import numpy as np
import math

#HRV METRICS
#Signal processing
from scipy import signal
from scipy.ndimage import label
from scipy.stats import zscore
from scipy.interpolate import interp1d
from scipy.integrate import trapz
from scipy.signal import argrelextrema

#ARIMA 
from statsmodels.tsa.arima.model import ARIMA


Define the corrections methods:
- Deletion (does not need specific code)
- Linear interpolation
- Cubic interpolation
- ARIMA (does not need specific code)

In [None]:
def cubic_spline(rr,start_hole,pred_length):

    time_rr = np.cumsum(rr)
    
    x1 = time_rr[start_hole-2]
    x2 = time_rr[start_hole-1]
    x3 = time_rr[start_hole]
    x4 = time_rr[start_hole+1]
    
    y1 = rr[start_hole-2]
    y2 = rr[start_hole-1]
    y3 = rr[start_hole]
    y4 = rr[start_hole+1]

    x = np.array([x1,x2,x3,x4])
    f = interp1d(x, [y1,y2,y3,y4], kind='cubic') #In milliseconds

    # now we can sample from interpolation function
    xx = np.linspace(x2, x3,pred_length+2)
    rr_interpolated = f(xx)
    return rr_interpolated[1:-1]

def linear_interpolation(rr,start_hole,pred_length):
    time_rr = np.cumsum(rr)

    x1 = time_rr[start_hole-1]
    x2 = time_rr[start_hole]

    y1 = rr[start_hole-1]
    y2 = rr[start_hole]

    x = np.array([x1,x2])
    f = interp1d(x, [y1,y2], kind='linear')

    xx = np.linspace(x1,x2,pred_length+2)
    rr_interpolated = f(xx)
    return rr_interpolated[1:-1]


## Create detection and correction function

- Detection is based on threshold filtering of surrounding values

In [None]:
#Artifact removal and correction
#Input: rr: list of raw RR intervals
#       threshold: threshold for artifact detection
#       window: window size for median filter
#       method: method for correction
#Output: rr: list of RR intervals with artifacts removed and corrected
#        len_errors: percentage of removed artifacts

def hrv_correction(rr,threshold,window, method):
    org_len = len(rr)
    if method == 'none':
        return rr
    threshold_up = 1+threshold
    threshold_down = 1-threshold
    errors = []
    len_hole = []
    for i in range(len(rr)):
        if i<window:
            median = np.median(rr[:window])
        elif i>len(rr)-1-window:
            median = np.median(rr[len(rr)-1-window:])
        else:
            median = np.median(rr[i-window:i+window])
        
        if rr[i]>threshold_up*median or rr[i]<threshold_down*median:
            errors.append(i)

            len_hole.append(int(np.round(rr[i]/median)))
    

    len_errors = np.round((len(errors)/len(rr))*100,1)
    rr = np.delete(rr,errors)
    add_prev = 0
    if method == 'deletion':
        return rr, len_errors    
        
    elif method == 'linear':
            for i in range(len(errors)):
                
                #Position of error in rr after deletion (we need add_prev to account for corrections that have been added earlier)
                new_loc = errors[i]-i + add_prev

                #If the error is in the first point, set first point to median value
                if new_loc == 0:
                    corrected = np.ones(len_hole[i])*np.median(rr[:window])
                
                #If the error is in the last point, set last point to median value
                elif new_loc > len(rr)-1:
                    corrected = np.ones(len_hole[i])*np.median(rr[-window:])

                else: 
                    corrected = linear_interpolation(rr,new_loc,len_hole[i])

                #Add the length of the hole to the add_prev variable
                add_prev += len_hole[i]
                
                #Insert corrected in rr
                rr = np.insert(rr,new_loc,corrected)
            

    elif method == 'cubic':
        for i in range(len(errors)):
                
                new_loc = errors[i]-i + add_prev
                
                if new_loc == 0:
                    corrected = np.ones(len_hole[i])*np.median(rr[:window])
                
                #If the error is in the last point, set last point to median value
                elif new_loc > (len(rr)-2):
                    corrected = np.ones(len_hole[i])*np.median(rr[-window:])

                else:
                    corrected = cubic_spline(rr,new_loc,len_hole[i])

                
                add_prev += len_hole[i]
                #Insert corrected in rr
                rr = np.insert(rr,new_loc,corrected)

    elif method == 'arima':
        for i in range(len(errors)):
            
            new_loc = errors[i]-i + add_prev
            
            if new_loc == 0:
                    corrected = np.ones(len_hole[i])*np.median(rr[:window])

            else:
                model = ARIMA(rr[new_loc-15:new_loc], order=(3, 2, 4))  
                fitted = model.fit()
                forecast = fitted.get_forecast(len_hole[i], alpha=0.05)  # 95% conf
                corrected = forecast.predicted_mean

            
            add_prev += len_hole[i]
            #Insert corrected in rr
            rr = np.insert(rr,new_loc,corrected)

    return rr, len_errors