In [1]:
# coding: utf-8
import io
from numbers import Number
import re

import numpy as np
from scipy import interpolate


class EmptyFileError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class FileNotSupportedError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


def open_rri(pathname_or_fileobj):
    if isinstance(pathname_or_fileobj, str):
        rri = _open_rri_from_path(pathname_or_fileobj)
    elif isinstance(pathname_or_fileobj, io.TextIOWrapper):
        rri = _open_rri_from_fileobj(pathname_or_fileobj)
    return _transform_rri(rri)


def _open_rri_from_path(pathname):
    if pathname.endswith('.txt'):
        with open(pathname, 'r') as fileobj:
            rri = _open_rri_from_fileobj(fileobj)
    elif pathname.endswith('.hrm'):
        with open(pathname, 'r') as fileobj:
            rri = _open_rri_from_fileobj(fileobj)
    else:
        raise FileNotSupportedError("File extension not supported")
    return rri


def _open_rri_from_fileobj(fileobj):
    file_content = fileobj.read()
    file_type = _identify_rri_file_type(file_content)
    if file_type == 'text':
        rri = _open_rri_from_text(file_content)
        if not rri:
            raise EmptyFileError('File without rri data')
    else:
        rri = _open_rri_from_hrm(file_content)
        if not rri:
            raise EmptyFileError('File without rri data')
    return rri


def _open_rri_from_text(file_content):##----------------------------------------
    rri = list(map(float,
                   re.findall(r'\S+' , file_content)))
    return rri


def _open_rri_from_hrm(file_content):
    rri_info_index = file_content.find('[HRData]')
    rri = None
    if rri_info_index >= 0:
        rri = list(map(float,
                       re.findall(r'\d+', file_content[rri_info_index:-1])))
    return rri


def _identify_rri_file_type(file_content):
    is_hrm_file = file_content.find('[HRData]')
    if is_hrm_file >= 0:
        file_type = 'hrm'
    else:
        rri_lines = file_content.split('\n')
        for line in rri_lines:
            current_line_number = re.findall(r'\d+', line)
            if current_line_number:
                if  current_line_number[0] == line.strip():
                    raise FileNotSupportedError('Text file not supported')
        file_type = 'text'
    return file_type


def validate_rri(func):
    def _validate(rri, *args, **kwargs):
        _validate_positive_numbers(rri)
        rri = _transform_rri(rri)
        return func(rri, *args, **kwargs)

    def _validate_positive_numbers(rri):
        if not all(map(lambda value: isinstance(value, Number) and value > 0,
                       rri)):
            raise ValueError('rri must be a list or numpy.ndarray of positive'
                             ' and non-zero numbers')

    return _validate


def _transform_rri(rri):
    rri = _transform_rri_to_miliseconds(rri)
    return np.array(rri)


def validate_frequency_domain_arguments(func):
    def _check_frequency_domain_arguments(rri, fs=4.0, method='welch',
                                          interp_method='cubic', **kwargs):
        _validate_available_methods(method)
        return func(rri, fs, method, interp_method, **kwargs)

    def _validate_available_methods(method):
        available_methods = ('welch', 'ar')
        if method not in available_methods:
            raise ValueError('Method not supported! Choose among: {}'.format(
                ', '.join(available_methods)))

    return _check_frequency_domain_arguments


def _create_time_info(rri):
    rri_time = np.cumsum(rri) / 1000.0  # make it seconds
    return rri_time - rri_time[0]   # force it to start at zero


def _transform_rri_to_miliseconds(rri):
    if np.median(rri) < 1:
        rri *= 1000
    return rri


def _interpolate_rri(rri, fs=4, interp_method='cubic'):
    if interp_method == 'cubic':
        return _interp_cubic_spline(rri, fs)
    elif interp_method == 'linear':
        return _interp_linear(rri, fs)


def _interp_cubic_spline(rri, fs):
    time_rri = _create_time_info(rri)
    time_rri_interp = _create_interp_time(rri, fs)
    tck = interpolate.splrep(time_rri, rri, s=0)
    rri_interp = interpolate.splev(time_rri_interp, tck, der=0)
    return time_rri_interp, rri_interp


def _interp_linear(rri, fs):
    time_rri = _create_time_info(rri)
    time_rri_interp = _create_interp_time(rri, fs)
    rri_interp = np.interp(time_rri_interp, time_rri, rri)
    return time_rri_interp, rri_interp


def _create_interp_time(rri, fs):
    time_rri = _create_time_info(rri)
    return np.arange(0, time_rri[-1], 1 / float(fs))



In [2]:
# coding: utf-8
import numpy as np

from scipy.signal import welch
from spectrum import pburg



@validate_rri
def time_domain(rri):
    diff_rri = np.diff(rri)
    rmssd = np.sqrt(np.mean(diff_rri ** 2))
    sdnn = np.std(rri, ddof=1)  # make it calculates N-1
    nn50 = _nn50(rri)
    pnn50 = _pnn50(rri)
    mrri = np.mean(rri)
    mhr = np.mean(60 / (rri / 1000.0))

    return dict(zip(['rmssd', 'sdnn', 'nn50', 'pnn50', 'mrri', 'mhr'],
                    [rmssd, sdnn, nn50, pnn50, mrri, mhr]))


def _nn50(rri):
    return sum(abs(np.diff(rri)) > 50)


def _pnn50(rri):
    return _nn50(rri) / len(rri) * 100


@validate_frequency_domain_arguments
@validate_rri
def frequency_domain(rri, fs, method, interp_method=None, vlf_band=(0, 0.04),
                     lf_band=(0.04, 0.15), hf_band=(0.15, 0.4), **kwargs):
    if interp_method is not None:
        time_interp, rri = _interpolate_rri(rri, fs, interp_method)

    if method == 'welch':
        fxx, pxx = welch(x=rri, fs=fs, **kwargs)
    elif method == 'ar':
        fxx, pxx = _calc_pburg_psd(rri=rri,  fs=fs, **kwargs)

    return _auc(fxx, pxx, vlf_band, lf_band, hf_band)


def _auc(fxx, pxx, vlf_band, lf_band, hf_band):
    vlf_indexes = np.logical_and(fxx >= vlf_band[0], fxx < vlf_band[1])
    lf_indexes = np.logical_and(fxx >= lf_band[0], fxx < lf_band[1])
    hf_indexes = np.logical_and(fxx >= hf_band[0], fxx < hf_band[1])

    vlf = np.trapz(y=pxx[vlf_indexes], x=fxx[vlf_indexes])
    lf = np.trapz(y=pxx[lf_indexes], x=fxx[lf_indexes])
    hf = np.trapz(y=pxx[hf_indexes], x=fxx[hf_indexes])
    total_power = vlf + lf + hf
    lf_hf = lf / hf
    lfnu = (lf / (total_power - vlf)) * 100
    hfnu = (hf / (total_power - vlf)) * 100

    return dict(zip(['total_power', 'vlf', 'lf', 'hf', 'lf_hf', 'lfnu',
                    'hfnu'], [total_power, vlf, lf, hf, lf_hf, lfnu, hfnu]))


def _calc_pburg_psd(rri, fs, order=16, nfft=None):
    burg = pburg(data=rri, order=order, NFFT=nfft, sampling=fs)
    burg.scale_by_freq = False
    burg()
    return np.array(burg.frequencies()), burg.psd


@validate_rri
def non_linear(rri):
    sd1, sd2 = _poincare(rri)
    return dict(zip(['sd1', 'sd2'], [sd1, sd2]))


def _poincare(rri):
    diff_rri = np.diff(rri)
    sd1 = np.sqrt(np.std(diff_rri, ddof=1) ** 2 * 0.5)
    sd2 = np.sqrt(2 * np.std(rri, ddof=1) ** 2 - 0.5 * np.std(diff_rri,
                                                              ddof=1) ** 2)
    return sd1, sd2

In [35]:
##將輸入資料讀入
rri = open_rri('C:/Users/wesleyhuan/Desktop/PPI.txt')

list1 = []
for i in open('C:/Users/wesleyhuan/Desktop/PPI.txt', 'r', encoding='UTF-8'):
    list1.append(float(i)*1000)
    
list1 = np.array(list1)

#print(list1)
#print(len(list1))

#bpm=60000/(sum(list1)/len(list1))
#bpm
#print(str(len(rri)) + '\n' + str(rri[0]) + '\n' + str(rri[97]))

In [4]:
import matplotlib.pyplot as plt
#plt.plot(list1,'-b')
#plt.show()

In [5]:
#算出時域特徵

#RRI = []
#for i in rri:
#    RRI.append(float(i)*10)
#RRI = np.array(RRI)

time_results = time_domain(list1)
print(time_results)

{'rmssd': 6.530348409189755, 'sdnn': 16.962779887469512, 'nn50': 0, 'pnn50': 0.0, 'mrri': 517.75135869565213, 'mhr': 116.00915005981918}


In [32]:
#算出頻域特徵
from hrv.classical import frequency_domain
results = frequency_domain(
    rri=list1,
    fs=2.0,
    method='welch',
    interp_method='cubic',
    detrend='linear'
)

print(results)


{'total_power': 222.25628564614325, 'vlf': 153.72457488583763, 'lf': 60.033961468257793, 'hf': 8.4977492920478443, 'lf_hf': 7.0646896495801901, 'lfnu': 87.600266799453905, 'hfnu': 12.399733200546105}


In [7]:
#算出非線性特徵
results = non_linear(rri)
print(results)

{'sd1': 4.6650730586365157, 'sd2': 23.509765522730707}


In [38]:
#驗證頻域的正確
results = _interpolate_rri( rri=list1 , fs=2.00 , interp_method='cubic' )
plt.plot( results[0] , results[1]  , '-b')
#plt.show()

#print(results[0])
#print(results[1])
#將切割後的RR_index寫黨
fileObject = open('C:/Users/wesleyhuan/Desktop/RRI_INDEX.txt', 'w')
for i in results[0]:  
    fileObject.write( str(i) ) 
    fileObject.write('\n')  
fileObject.close() 
#將切割後的RR_值寫黨
fileObject = open('C:/Users/wesleyhuan/Desktop/RRI.txt', 'w')
for i in results[1]:  
    fileObject.write( str(i) ) 
    fileObject.write('\n')  
fileObject.close() 

In [37]:
#將頻域結果存成TXT
#算出頻域特徵
#results = frequency_domain(
#    rri=list1,
#    fs=2.0,
#    method='welch',
#    interp_method='cubic',
#    detrend='linear'
#)

#fileObject = open('C:/Users/wesleyhuan/Desktop/frequency.txt', 'w')
  
#fileObject.write(str(results)) 
#fileObject.close() 

