In [None]:
import plotly as plt
import pandas as pd
import pickle
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.io as pio
from scipy import signal
import plotly.offline
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import gc

In [None]:
 # 替换为文件路径
file_path1 = r"Z:\LabFiles\Data\Mouse\ECG\230913_SJY023_1SNP_0.txt" 
file_path2 = r"Z:\LabFiles\Data\Mouse\ECG\230913_SJY023_1SNP_1.txt" 
# 使用loadtxt函数读取文本文件数据到NumPy数组
# data_array = np.loadtxt(file_path1, delimiter='   ',skiprows=1,)
data_array1 = np.loadtxt(file_path1, delimiter='   ',skiprows=1,)
data_array2 = np.loadtxt(file_path2, delimiter='   ',skiprows=1,)  
# 打印NumPy数组

data_array = np.append(data_array1,data_array2,axis=0)
print(np.shape(data_array),np.shape(data_array1),np.shape(data_array2))

In [None]:
def Preprocessing(data,timerange,flip,lower_limit,upper_limit,show_figure=True):
    #读取数据
    t = data[:timerange,0]
    value = data[:timerange,1]
    #滤波器设置
    order = 1  # 滤波器阶数
    cutoff_freq = 0.2  # 截止频率
    b, a = signal.butter(order, cutoff_freq, btype='high', analog=False, output='ba')
    # 应用滤波器
    processed_data = signal.lfilter(b, a, value)
    #判断信号正负
    if flip:
        prsdata = -processed_data
    else : 
        prsdata = processed_data
    #FindPeaks
    peaks, _ = signal.find_peaks(prsdata,height=(lower_limit, upper_limit))
    
    if show_figure:
    # 创建Plotly折线图
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=t, y=processed_data, name='原始波形', marker = dict(color = '#0099e5')))
        fig.add_trace(go.Scatter(x=t, y=prsdata, name='反转后波形', marker = dict(color = '#ff4c4c')))
        fig.add_trace(go.Scatter(x=t[peaks], y=prsdata[peaks], mode='markers', name='峰值', marker = dict(color = '#34bf49')))
        # 设置图表布局和样式
        fig.update_layout(
            title='findpeaks',
            xaxis_title='time,s',
            yaxis_title='potential,mV',
            template="simple_white",    
        )
        # 显示图表
        fig.show()        
    return(t,prsdata,peaks)

def TrimDataTosecond(data):

    # 计算要保留的数据的长度
    new_length = len(data) - (len(data) % 1000)

    # 对数组进行切片以删除多余的数据
    trimmed_data = data[:new_length]
    return(trimmed_data)

def calculateHR(peaks,window_size,start_time,end_time):
    # 初始化结果列表
    event_counts = []
    heartratebpm = []
    # 计算每个时间范围内的事件数量
    for t in range(start_time, end_time, window_size):
        # 使用 count_nonzero 计算在时间范围内的事件数量
        count = np.count_nonzero((peaks >= t) & (peaks < t + window_size))
        event_counts.append(count)
        heartratebpm.append(count*60)
         
    return(event_counts,heartratebpm)

#数据存储
def SaveData(data,name):
    
    f_save = open(name,'wb')
    pickle.dump(data,f_save)
    f_save.close()

def LoadData(name):
    f_read = open(name,'rb')
    data = pickle.load(f_read)
    f_read.close()
    return(data)



#画图相关
def plot_line_chart(time,value):
    fig = px.line(x=time, y=value)
    fig.update_layout(
        xaxis_title='Time',
        yaxis_title='voltage',
        title='ECG',
        template="simple_white",
    )
    fig.show()
    return(fig)

def plot_split_violin(data1,data2):

    # 假设您有一个名为numpy_array的NumPy数组
    # 创建标签列
    group = {}
    labels1 = np.full(len(data1), 'Pre')

    # 创建Pandas DataFrame
    df1 = pd.DataFrame({'group': labels1, 'heart rate': data1})

    # 创建标签列
    labels2 = np.full(len(data2), 'Post')

    # 创建Pandas DataFrame
    df2 = pd.DataFrame({'group': labels2, 'heart rate': data2})


    merged_df = pd.concat([df1, df2], axis=0, ignore_index=True)

    # 打印合并后的DataFrame
    fig1 = go.Figure()
    exp = 'Post'
    ctr = 'Pre'
    fig1.add_trace(go.Violin(
                            y=merged_df['heart rate'][ merged_df['group'] == ctr ],
                            width = 0 ,
                            side='negative',
                            pointpos=-1.3,
                            legendgroup=ctr, scalegroup= ctr, name='measurements',
                            line_color='#caccd1', points="all",box_width = 0.8
                            )
                )
    fig1.add_trace(go.Violin(
                            y=merged_df['heart rate'][ merged_df['group'] == exp ],
                            width = 0 ,
                            side='positive',
                            pointpos=1.3,
                            legendgroup= exp, scalegroup= exp, name='measurements',
                            line_color='#ff4c4c', points="all",box_width = 0.8
                            )
                )

    fig1.update_traces(marker_size=10 ,box_visible=True, meanline_visible=True)
    fig1.update_layout(template= 'simple_white',
                    autosize = False,
                    width = 600,
                    height = 800,
                    )

    plotly.offline.plot(fig1, image_filename='pre-post heart rate', image='svg',image_width = 600,image_height = 800)
    fig1.show()
    return(merged_df)

def plot_defined_sampletime(x,y,timestart,timeend,ticktime,ytick,ystart,yend,width,height):

    # 创建图表对象
    layout = go.Layout(
    xaxis=dict(
        tickmode='linear',   # 刻度模式为线性模式
        dtick=ticktime,  # 每个刻度代表的数值间隔
        range=[timestart,timeend],
    ),
    yaxis=dict(
        tickmode='linear',   # 刻度模式为线性模式
        dtick=ytick,          # 每个刻度代表的数值间隔
        range=[ystart,yend],
    ),
    xaxis_title='Time,s',
    yaxis_title='ECG signal,mV',
    template="simple_white",
    width = width,
    height = height,
    )
    fig = go.Figure(data=go.Scatter(x=x, y=y), layout=layout)
    fig.show()
    return(fig)

def splittime(data,timepoint,timeduration):
    df= pd.DataFrame(data)
    # T = len(data)-timepoint
    pre = df.loc[timepoint-timeduration+1:timepoint].astype(int)
    post = df.loc[timepoint:timepoint+timeduration-1].astype(int)
    return(pre,post)



In [None]:
t,dataf,pks = Preprocessing(data_array,10000,flip = False,lower_limit=0.1,upper_limit=0.5,show_figure=True)

In [None]:
data_array_trimmed = TrimDataTosecond(data_array)
print(np.shape(data_array),'timmed:',np.shape(data_array_trimmed))

In [None]:
t1,dataf1,pks1 = Preprocessing(data_array,len(data_array_trimmed),flip = False,lower_limit=0.1,upper_limit=0.5,show_figure=False)

In [None]:
heartratehz,heartratebpm= calculateHR(pks1,1000,0,len(data_array_trimmed))

print(np.shape(heartratehz),np.shape(heartratebpm))

In [None]:
fig = px.line(heartratebpm)
fig.update_layout(
    xaxis_title='Time',
    yaxis_title='heartrate,bpm',
    title='ECG recorded heart rate',
    template="simple_white",
)
fig.show()

In [None]:
pre = data_array[:600000]
post1 = data_array[600000:1200000]
post2 = data_array[1200000:]

In [None]:
t1,dataf1,peaks1= Preprocessing(pre,timerange=600000,flip=False,lower_limit=0.1,upper_limit=0.4,show_figure=False)
t2,dataf2,peaks2= Preprocessing(post1,timerange=600000,flip=False,lower_limit=0.1,upper_limit=0.4,show_figure=False)
t3,dataf3,peaks3= Preprocessing(post2,timerange=600000,flip=False,lower_limit=0.1,upper_limit=0.4,show_figure=False)

In [None]:
heartratehz1,heartratebpm1 = calculateHR(peaks1,1000,0,len(t1))
heartratehz2,heartratebpm2 = calculateHR(peaks2,1000,0,len(t2))
heartratehz3,heartratebpm3 = calculateHR(peaks2,1000,0,len(t3))


In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=heartratebpm1, name='pre', marker = dict(color = '#0099e5')))
fig.add_trace(go.Scatter(y=heartratebpm2, name='post1', marker = dict(color = '#ff4c4c')))
fig.add_trace(go.Scatter(y=heartratebpm3, name='post2', marker = dict(color = '#34bf49')))
fig.update_layout(
    xaxis_title='Time',
    yaxis_title='heartrate,bpm',
    title='ECG recorded heart rate',
    template="simple_white",
)
fig.show()

In [None]:
import numpy as np

# 示例数据，您需要将这部分替换为您的实际数据
data = np.array([800, 1200, 900, 1100, 1300, 950, 1050, 1400, 750])

# 创建一个新的数组来存储结果
result = data.copy()

# 遍历数组并替换大于1000的元素
for i in range(len(data)):
    if data[i] > 1000:
        # 获取左边元素的索引（确保不越界）
        left_idx = i - 1 if i - 1 >= 0 else i
        # 获取右边元素的索引（确保不越界）
        right_idx = i + 1 if i + 1 < len(data) else i

        # 计算左右两个元素的平均值
        average = (data[left_idx] + data[right_idx]) / 2.0

        # 将大于1000的元素替换为平均值
        result[i] = average

# 打印结果
print("替换后的数组：", result)


In [None]:
def removeabnormalnumber(data,threshold):
    result = data.copy()
    ## 需要检查整个数组里有没有超过threshold的元素，有就进行一遍循环，如果循环后再做一次检查
    for i in range(len(data)):
        if data[i] > threshold:
            # 获取左边元素的索引（确保不越界）
            left_idx = i - 1 if i - 1 >= 0 else i
            # 获取右边元素的索引（确保不越界）
            right_idx = i + 1 if i + 1 < len(data) else i

            # 计算左右两个元素的平均值
            average = (data[left_idx] + data[right_idx]) / 2.0
            # 将大于1000的元素替换为平均值
            result[i] = average
    

    return(result)
    

In [None]:
heartratebpm11 = removeabnormalnumber(heartratebpm1,1200)
heartratebpm22 = removeabnormalnumber(heartratebpm2,1200)
heartratebpm33 = removeabnormalnumber(heartratebpm3,1200)



In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=heartratebpm1, name='raw', marker = dict(color = '#0099e5')))
fig.add_trace(go.Scatter(y=heartratebpm11, name='smooth', marker = dict(color = '#34bf49')))
fig.update_layout(
    xaxis_title='Time',
    yaxis_title='heartrate,bpm',
    title='ECG recorded heart rate',
    template="simple_white",
)
fig.show()

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=heartratebpm2, name='raw', marker = dict(color = '#0099e5')))
fig.add_trace(go.Scatter(y=heartratebpm22, name='smooth', marker = dict(color = '#34bf49')))
fig.update_layout(
    xaxis_title='Time',
    yaxis_title='heartrate,bpm',
    title='ECG recorded heart rate',
    template="simple_white",
)
fig.show()

In [None]:
def moving_average(data, window_size):
    window = np.ones(window_size) / window_size
    smoothed_data = np.convolve(data, window, mode='valid')
    return smoothed_data

In [None]:
window_size = 100

# 计算平滑数据
smoothed_result60 = moving_average(heartratebpm22, 60)
smoothed_result100 = moving_average(heartratebpm22, 100)
smoothed_result300 = moving_average(heartratebpm22, 300)

fig = go.Figure()
fig.add_trace(go.Scatter(y=smoothed_result60, name='raw', marker = dict(color = '#0099e5')))
fig.add_trace(go.Scatter(y=smoothed_result100, name='smooth', marker = dict(color = '#34bf49')))
fig.add_trace(go.Scatter(y=smoothed_result300, name='smooth', marker = dict(color = '#ff4c4c')))

fig.update_layout(
    xaxis_title='Time',
    yaxis_title='heartrate,bpm',
    title='ECG recorded heart rate',
    template="simple_white",
)
fig.show()
