# 寻找单调性跳变点

In [1]:
def get_index_of_bottom_and_top_by_mk(data):
    """Split a time series at the points where its monotonic trend flips.

    The series is scanned with overlapping windows (initial size 10, stride
    5). Each window is labelled with the trend string returned by the
    Mann-Kendall test (``mk.original_test(...)[0]``). Whenever a window
    becomes 'increasing' or 'decreasing' while the previous window had a
    different label, a split point is placed at the middle of the previous
    window. Index 0 and the last index are always added as split points.

    The window size is then grown by 5 and the procedure repeated for as
    long as the variance of the segment lengths keeps strictly decreasing.
    The split from the last iteration that lowered the variance is returned.

    Args:
        data: 1-D time series (a pandas Series in the notebook) supporting
            ``.shape``, slicing and integer indexing.

    Returns:
        A tuple ``(final_win_change, top_win, win_size, step)``:
        * ``final_win_change`` -- integer array of split indices, starting
          with 0 and ending with ``len(data) - 1``.
        * ``top_win`` -- float array of the same length; 1 marks a split
          point at which a rising edge starts, 0 otherwise.
        * ``win_size`` -- the window size in effect when the search
          stopped, i.e. the rejected size, one increment larger than the
          size that produced ``final_win_change``.
        * ``step`` -- the window stride (5).
    """
    win_size = 10
    step = 5
    n_samples = data.shape[0]
    # Variance achieved by the previously accepted split; infinity guarantees
    # that the very first split is always accepted.
    prev_variance = np.inf
    final_win_change = None

    while True:
        # Number of windows that fit into the series for this window size.
        win_sum = 1 + (n_samples - win_size) // step if n_samples > win_size else 0
        # Start index of every window.
        win_start_index = np.arange(0, win_sum * step, step=step)

        # Trend label of every window ('increasing', 'decreasing', 'no trend').
        win_trend = [
            mk.original_test(data[start:start + win_size])[0]
            for start in win_start_index
        ]

        # Start indices of the windows that precede a trend change.
        change_starts = []
        for i in range(1, len(win_trend)):
            became_increasing = (
                win_trend[i] == 'increasing' and win_trend[i - 1] != 'increasing'
            )
            became_decreasing = (
                win_trend[i] == 'decreasing' and win_trend[i - 1] != 'decreasing'
            )
            if became_increasing or became_decreasing:
                change_starts.append(win_start_index[i - 1])

        # Use the middle of the window as the split position.
        win_change = np.array(change_starts, dtype=np.float64) + win_size / 2
        win_change = win_change.astype(np.int32)
        # Always split at the very beginning and the very end of the series
        # (the last valid index, hence the -1).
        win_change = np.insert(win_change, 0, 0)
        win_change = np.append(win_change, n_samples - 1)

        # Variance of the segment lengths produced by this split.
        variance = np.var(np.diff(win_change))
        # Stop as soon as the current split is no better than the previous
        # one; the previous split is then the final answer. The comparison is
        # always "current vs. previous", regardless of the iteration parity.
        if variance >= prev_variance:
            break
        prev_variance = variance
        final_win_change = win_change
        win_size = win_size + 5

    # Mark the split points at which a rising edge starts with 1.
    top_win = np.zeros(final_win_change.shape[0])
    if data[final_win_change[0]] <= data[final_win_change[1]]:
        top_win[0] = 1
    for i in range(1, final_win_change.shape[0]):
        # The preceding segment did not rise, so this point is a local bottom.
        if data[final_win_change[i - 1]] >= data[final_win_change[i]]:
            top_win[i] = 1
    return final_win_change, top_win, win_size, step

# DTW比较

调用fastdtw库即可

# 周期计数

In [10]:
def get_count(result):
    """Count the leading run of similar costs in a sorted cost array.

    Walks over neighbouring pairs of ``result``. Pairs in which either value
    is 0 are ignored. Every pair whose difference is at most 2 adds one to
    the count; the first pair with a larger difference ends the scan, since
    the caller passes the values sorted in ascending order.

    Args:
        result: 1-D numpy array of costs, expected to be sorted.

    Returns:
        The count, starting from 1.
    """
    repetitions = 1
    for previous, current in zip(result[:-1], result[1:]):
        # Zero entries carry no cost information and are skipped.
        if current == 0 or previous == 0:
            continue
        # Small gap: both entries belong to the same repeated action.
        if current - previous <= 2:
            repetitions += 1
            continue
        # Gap too large: nothing further along can belong to the run.
        return repetitions
    return repetitions

In [None]:
def _dtw_cost_per_sample(segment_a, segment_b):
    """Return the fastdtw cost of two segments divided by their total length."""
    cost, _ = fastdtw(segment_a, segment_b)
    return cost / (segment_b.shape[0] + segment_a.shape[0])


def get_dtw_mean_cost(win_change, top_win, data):
    """Estimate the repetition count from pairwise DTW costs between edges.

    Every segment ``data[win_change[i]:win_change[i + 1]]`` whose marker
    differs from the next marker is treated as a rising edge
    (``top_win[i] == 1``) or a falling edge (``top_win[i] == 0``). Each
    rising edge is compared with every later rising edge, and likewise for
    falling edges. The length-normalised DTW costs are stored in two
    symmetric matrices. Each row is then averaged, the averages are sorted,
    and ``get_count`` turns them into a count.

    Args:
        win_change: integer array of split indices into ``data``.
        top_win: array of the same length as ``win_change``; 1 marks the
            start of a rising edge, 0 the start of a falling edge.
        data: the time series being segmented.

    Returns:
        The larger of the rising-edge count and the falling-edge count.
    """
    win_change_length = win_change.shape[0]
    # Number of rising-edge markers.
    top_count = np.count_nonzero(top_win == 1)
    # Number of falling-edge markers.
    bottom_count = top_win.shape[0] - top_count
    # Pairwise cost matrices for rising and falling edges.
    avg_cost_by_dtw_top = np.zeros(shape=(top_count, top_count))
    avg_cost_by_dtw_bottom = np.zeros(shape=(bottom_count, bottom_count))
    # Row indices into the rising / falling matrices.
    k_top = 0
    k_bottom = 0
    # Column indices into the rising / falling matrices.
    g_top = 0
    g_bottom = 0
    # Stop one short of the end because every segment needs an end marker.
    for i in range(0, win_change_length - 1):
        # Consecutive identical markers: skip so no segment is compared twice.
        if top_win[i] == top_win[i + 1]:
            continue
        # The first partner of row k lives in column k + 1.
        if top_win[i] == 1:
            g_top = k_top + 1
        else:
            g_bottom = k_bottom + 1
        segment_i = data[win_change[i]:win_change[i + 1]]
        for j in range(i + 1, win_change_length - 1):
            if top_win[j] == top_win[j + 1]:
                continue
            if top_win[i] == 1 and top_win[j] == 1:
                # Both segments are rising edges.
                cost = _dtw_cost_per_sample(
                    segment_i, data[win_change[j]:win_change[j + 1]]
                )
                avg_cost_by_dtw_top[k_top][g_top] = cost
                avg_cost_by_dtw_top[g_top][k_top] = cost
                g_top = g_top + 1
            elif top_win[i] == 0 and top_win[j] == 0:
                # Both segments are falling edges.
                cost = _dtw_cost_per_sample(
                    segment_i, data[win_change[j]:win_change[j + 1]]
                )
                avg_cost_by_dtw_bottom[k_bottom][g_bottom] = cost
                avg_cost_by_dtw_bottom[g_bottom][k_bottom] = cost
                # Advance the falling-edge column so the next falling partner
                # gets its own cell instead of overwriting this one.
                g_bottom = g_bottom + 1

        if top_win[i] == 1:
            k_top = k_top + 1
        else:
            k_bottom = k_bottom + 1
    # Drop the last row and column of each matrix.
    avg_cost_by_dtw_top = avg_cost_by_dtw_top[:-1, :-1]
    avg_cost_by_dtw_bottom = avg_cost_by_dtw_bottom[:-1, :-1]
    # Replace zero entries (such as the self-comparison on the diagonal) by
    # the row sum divided by (row length - 1). Rows are views, so the
    # matrices are modified in place.
    for arr in avg_cost_by_dtw_top:
        temp = np.sum(arr) / (arr.shape[0] - 1)
        arr[arr == 0] = temp
    for arr in avg_cost_by_dtw_bottom:
        temp = np.sum(arr) / (arr.shape[0] - 1)
        arr[arr == 0] = temp
    # Mean cost of every edge against the others, sorted ascending.
    result_top = np.sort(np.mean(avg_cost_by_dtw_top, axis=1))
    result_bottom = np.sort(np.mean(avg_cost_by_dtw_bottom, axis=1))
    # Repetition counts derived from rising and from falling edges.
    count_top = get_count(result_top)
    count_bottom = get_count(result_bottom)
    # Report whichever edge type yields the larger count.
    return max(count_top, count_bottom)

In [2]:
def get_count_by_cost(file_name, class_num=-1, nature_flag=True):
    """Load one smoothed PCA series from CSV and return its repetition count.

    Args:
        file_name: name of the CSV file inside the selected directory.
        class_num: action class number, used only to build the path of
            natural recordings.
        nature_flag: True to read a natural recording, False to read a
            synthetic one.

    Returns:
        The count produced by ``get_dtw_mean_cost`` for the 'value' column.
    """
    if nature_flag:
        file_path = f'../../event_csv/compress_event_manhattan/class{class_num}/smooth_by_pca/compress_by_mean/{file_name}'
    else:
        # Synthetic data.
        # NOTE(review): 'articicial' is spelled this way in the path; confirm
        # it matches the directory name on disk.
        file_path = f'../../event_csv/compress_event_manhattan/articicial/smooth_by_pca/compress_by_mean/{file_name}'

    # The CSV holds the series obtained after PCA in its 'value' column.
    series = pd.read_csv(file_path)['value']
    segmentation = get_index_of_bottom_and_top_by_mk(series)
    # Only the split indices and the edge markers are needed here.
    win_change, top_win = segmentation[0], segmentation[1]
    return get_dtw_mean_cost(win_change, top_win, series)

In [None]:
def get_all_count(nature_flag=True):
    """Predict the repetition count for every recording and print each one.

    For natural data, iterates action classes 2..7 (class 3 is skipped) and,
    within each class, every entry of ``file_names``. For synthetic data,
    iterates ``artificial_file_names`` and the first five entries of
    ``artificial_file_suffix``.

    Args:
        nature_flag: True for natural recordings, False for synthetic ones.

    Returns:
        A float numpy array with the predicted counts in processing order.
    """
    counts = []

    if not nature_flag:
        # Synthetic data: the last four characters of each base name are
        # replaced by one of the suffixes.
        for name in artificial_file_names:
            for i in range(5):
                count = get_count_by_cost(f'{name[:-4]}{artificial_file_suffix[i]}', nature_flag=False)
                counts.append(count)
                print(f'文件名为{name[:-4]}{artificial_file_suffix[i]}的动作重复次数为：{count}')
            print('-----------------')
        return np.array(counts, dtype=np.float64)

    for class_num in range(2, 8):
        # Class 3 is not evaluated, but its separator line is still printed.
        if class_num != 3:
            for name in file_names:
                count = get_count_by_cost(f'{name}', class_num)
                counts.append(count)
                print(f'文件名为{name}中class_num={class_num}动作重复次数为：{count}')
        print('-----------------')
    return np.array(counts, dtype=np.float64)

# 结果评判标准

In [3]:
def MAE(pred_count, real_count):
    """Return the mean absolute error, normalised by the true count.

    Each absolute difference is divided by the corresponding entry of
    ``real_count`` before averaging.
    """
    absolute_error = np.abs(real_count - pred_count)
    relative_error = absolute_error / real_count
    return np.mean(relative_error)
def RMSE(pred_count, real_count):
    """Return the root mean squared error between predicted and true counts."""
    residual = real_count - pred_count
    mean_squared = np.mean(residual ** 2)
    return np.sqrt(mean_squared)
def OBO(pred_count, real_count):
    """Return the fraction of predictions that are off by at most one.

    This is the Off-By-One (OBO) measure as computed here: the share of
    entries whose absolute error is less than or equal to 1.
    """
    # Absolute error between prediction and ground truth.
    errors = np.abs(real_count - pred_count)
    # Keep only the entries whose error does not exceed one.
    within_one = errors[errors <= 1]
    return within_one.shape[0] / errors.shape[0]

# PCA主成分分析

In [1]:
def PCA_method(data):
    """Reduce a DataFrame to one principal component and smooth it.

    Args:
        data: DataFrame whose ``.values`` matrix is fed to PCA.

    Returns:
        The second value returned by the Hodrick-Prescott filter for the
        flattened one-component projection (the trend component according
        to the statsmodels documentation).
    """
    # Work on the underlying ndarray of the DataFrame.
    matrix = data.values
    # Project onto a single component and flatten the result to 1-D.
    reducer = PCA(n_components=1)
    projected = reducer.fit_transform(matrix)
    flattened = np.reshape(projected, -1)
    # HP filter: the first returned series is discarded, the second is kept.
    _discarded, smoothed = sm.tsa.filters.hpfilter(flattened)
    return smoothed

# 不同条件下指标计算

In [None]:
def all_illumination_all_action(pred_count=None, pred_count_artificial=None):
    """Print overall MAE and OBO on the synthetic data for both methods.

    Compares ``pred_count_artificial`` and ``repnet_artificial_pred_count``
    against ``artificial_real_count``.

    Args:
        pred_count: predictions for the natural data; currently unused.
        pred_count_artificial: predictions for the synthetic data.
    """
    # NOTE: the equivalent report for the natural data (MAE / OBO of
    # pred_count and repnet_nature_pred_count against nature_real_count) is
    # disabled in this version, which is why pred_count is not read.
    ours_mae = MAE(pred_count_artificial, artificial_real_count)
    print('我们的方法测得人工合成数据MAE=', ours_mae)
    repnet_mae = MAE(repnet_artificial_pred_count, artificial_real_count)
    print('repnet测得人工合成数据MAE=', repnet_mae)
    print('----------------------------')
    ours_obo = OBO(pred_count_artificial, artificial_real_count)
    print('我们的方法测得人工合成数据OBO=', ours_obo)
    repnet_obo = OBO(repnet_artificial_pred_count, artificial_real_count)
    print('repnet测得人工合成数据OBO=', repnet_obo)

In [None]:
def diff_illumination_all_action(pred_count):
    """Print MAE and OBO per illumination condition for both methods.

    For the k-th entry of ``file_names``, every fifth element starting at
    offset k is taken from the prediction and ground-truth arrays. This
    presumably relies on the counts being stored class by class with five
    recordings per class -- confirm against ``get_all_count``.

    Args:
        pred_count: predicted counts for the natural data.
    """
    for offset, name in enumerate(file_names):
        # The label is the file name without its first 7 and last 4 characters.
        label = name[7:-4]
        # Every fifth entry, starting at this file's position.
        picker = slice(offset, None, 5)
        print(f'光照条件为{label}的MAE=', MAE(pred_count[picker], nature_real_count[picker]))
        print(f'repnet中光照条件为{label}的MAE=', MAE(repnet_nature_pred_count[picker], nature_real_count[picker]))
        print(f'光照条件为{label}的OBO=', OBO(pred_count[picker], nature_real_count[picker]))
        print(f'repnet中光照条件为{label}的OBO=', OBO(repnet_nature_pred_count[picker], nature_real_count[picker]))

        print('-----------------')

In [None]:
def all_illumination_diff_action(pred_count):
    """Print MAE and OBO per action class for both methods.

    Classes 2..7 without class 3 are reported in order. Each class uses the
    next consecutive block of five entries from the prediction and
    ground-truth arrays.

    Args:
        pred_count: predicted counts for the natural data.
    """
    evaluated_classes = [c for c in range(2, 8) if c != 3]
    for block_index, class_num in enumerate(evaluated_classes):
        # Five consecutive entries belong to each evaluated class.
        window = slice(block_index * 5, block_index * 5 + 5)
        print(f'class={class_num}动作的MAE=', MAE(pred_count[window], nature_real_count[window]))
        print(f'repnet中class={class_num}动作的MAE=', MAE(repnet_nature_pred_count[window], nature_real_count[window]))
        print(f'class={class_num}动作的OBO=', OBO(pred_count[window], nature_real_count[window]))
        print(f'repnet中class={class_num}动作的OBO=', OBO(repnet_nature_pred_count[window], nature_real_count[window]))

        print('-----------------')