In [None]:
import math
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import datetime
from chinese_calendar import is_workday, is_holiday
import pickle as pk

In [None]:
# Load the CSV at ../../data/data_10m_in.csv, using its first column as the row index.
data10_in = pd.read_csv('../../data/data_10m_in.csv',index_col=[0])

# Show the loaded frame as this cell's output.
data10_in

In [None]:
def get_common_seq(best_path, threshold=1):
    """Return the lengths of the diagonal runs found in a warping path.

    A run is a maximal stretch of consecutive path points in which both
    coordinates advance by exactly one, i.e. ``(i, j) -> (i + 1, j + 1)``.
    Any other step closes the current run and starts a new one of length 1.

    Args:
        best_path: Sequence of ``(row, col)`` index pairs in path order.
        threshold: Only runs strictly longer than this value are returned.

    Returns:
        A list with the length (counted in path points) of every run longer
        than ``threshold``, in the order the runs occur. An empty path yields
        an empty list.
    """
    if len(best_path) == 0:
        # Nothing to scan. Indexing best_path[0] here used to raise IndexError.
        return []

    run_lengths = []
    length = 1
    for pre, cur in zip(best_path, best_path[1:]):
        if cur[0] == pre[0] + 1 and cur[1] == pre[1] + 1:
            length += 1
        else:
            run_lengths.append(length)
            length = 1
    # The run that is still open when the path ends must be recorded too.
    run_lengths.append(length)
    return [run for run in run_lengths if run > threshold]


def calculate_attenuate_weight(seqLen, com_ls):
    """Convert diagonal-run lengths into an attenuation factor.

    Every run in ``com_ls`` contributes the square of its length divided by
    the square of ``seqLen``; the factor is one minus the square root of the
    sum of those contributions.

    Args:
        seqLen: Length the run lengths are normalised by.
        com_ls: Iterable of run lengths.

    Returns:
        ``1 - sqrt(sum(run**2 / seqLen**2))``; ``1.0`` when ``com_ls`` is empty.
    """
    squared_share = 0
    for run in com_ls:
        # Accumulate term by term so the floating-point rounding order is fixed.
        squared_share += (run * run) / (seqLen * seqLen)
    return 1 - math.sqrt(squared_share)


def best_path(paths):
    """Trace the cheapest route back through a warping-paths matrix.

    The walk starts at the bottom-right cell and, while both indices are
    positive, moves to whichever of the diagonal, upper or left neighbour
    holds the smallest value (ties resolved in that order). Each visited
    cell whose value is not ``-1`` is recorded with both indices shifted
    down by one. The last recorded point is discarded and the remaining
    points are returned from start to end.

    Args:
        paths: 2-D array of accumulated costs.

    Returns:
        List of ``(row, col)`` tuples ordered from the beginning of the path.
    """
    row, col = int(paths.shape[0] - 1), int(paths.shape[1] - 1)
    # Backward offsets for the diagonal, upper and left neighbour, in the
    # order in which their costs are compared.
    moves = ((1, 1), (1, 0), (0, 1))
    trail = []
    if paths[row, col] != -1:
        trail.append((row - 1, col - 1))
    while row > 0 and col > 0:
        candidates = [paths[row - d_row, col - d_col] for d_row, d_col in moves]
        d_row, d_col = moves[np.argmin(candidates)]
        row, col = row - d_row, col - d_col
        if paths[row, col] != -1:
            trail.append((row - 1, col - 1))
    # Drop the final recorded point before flipping the order.
    trail.pop()
    return trail[::-1]


def TimeSeriesSimilarity(s1, s2):
    """Compute a dynamic-time-warping distance between two sequences.

    An accumulated-cost matrix of shape ``(len(s1) + 1, len(s2) + 1)`` is
    filled with the squared difference of each element pair plus the smallest
    of the three neighbouring accumulated costs. The square root of the
    matrix is then taken element-wise.

    Args:
        s1: First sequence, indexable by integer position.
        s2: Second sequence, indexable by integer position.

    Returns:
        A tuple ``(distance, paths)`` where ``distance`` is the rooted value
        in the final cell and ``paths`` is the rooted matrix transposed, i.e.
        of shape ``(len(s2) + 1, len(s1) + 1)``.
    """
    n_rows, n_cols = len(s1), len(s2)
    acc = np.full((n_rows + 1, n_cols + 1), np.inf)  # every cell starts at infinity
    acc[0, 0] = 0
    for r in range(1, n_rows + 1):
        for c in range(1, n_cols + 1):
            gap = s1[r - 1] - s2[c - 1]
            acc[r, c] = gap ** 2 + min(acc[r - 1, c], acc[r, c - 1], acc[r - 1, c - 1])

    rooted = np.sqrt(acc)
    return rooted[n_rows, n_cols], rooted.T


if __name__ == '__main__':
    # Sample series for the demonstration
    s1 = np.array([1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1])
    s2 = np.array([0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2])
    s3 = np.array([0.8, 1.5, 0, 1.2, 0, 0, 0.6, 1, 1.2, 0, 0, 1, 0.2, 2.4, 0.5, 0.4])

    # Baseline algorithm: distance and rooted cost matrix for each pair
    distance12, paths12 = TimeSeriesSimilarity(s1, s2)
    distance13, paths13 = TimeSeriesSimilarity(s1, s3)

    print("更新前s1和s2距离：", distance12, sep="")
    print("更新前s1和s3距离：", distance13, sep="")

    # s1 vs s2: warping path -> diagonal runs -> attenuation factor
    best_path12 = best_path(paths12)
    com_ls1 = get_common_seq(best_path12)
    weight12 = calculate_attenuate_weight(len(best_path12), com_ls1)

    # s1 vs s3: same three steps
    best_path13 = best_path(paths13)
    com_ls2 = get_common_seq(best_path13)
    weight13 = calculate_attenuate_weight(len(best_path13), com_ls2)

    # Distances after scaling by the attenuation factor
    print("更新后s1和s2距离：", distance12 * weight12, sep="")
    print("更新后s1和s3距离：", distance13 * weight13, sep="")

In [None]:
# First column as an (n_rows, 1) column vector. The -1 lets NumPy infer the
# row count, so this no longer fails when the CSV does not have exactly
# 2808 rows.
x = data10_in.iloc[:, 0].values.reshape(-1, 1)
x.shape

In [None]:
import numpy as np
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

# x = np.array([[1,1], [2,2], [3,3], [4,4], [5,5]])
# y = np.array([[2,2], [3,3], [4,4]])
# Compare the first two columns of data10_in with fastdtw.
x = data10_in.iloc[:,0]
y = data10_in.iloc[:,1]

# fastdtw is unpacked into a distance and a path; only the distance is printed.
# NOTE(review): x and y are 1-D here, so the dist callable presumably receives
# scalar samples; some SciPy releases make euclidean reject non-1-D input.
# Confirm this runs on the installed versions.
distance, path = fastdtw(x, y, dist=euclidean)
print(distance)

In [None]:
import numpy as np
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

# x = np.array([[1,1], [2,2], [3,3], [4,4], [5,5]])
# y = np.array([[2,2], [3,3], [4,4]])
# Same two columns as the previous cell.
x = data10_in.iloc[:,0]
y = data10_in.iloc[:,1]

# Arguments are swapped here (y first, then x), so the printed value can be
# compared with the previous cell to see whether the order matters.
distance, path = fastdtw(y, x, dist=euclidean)
print(distance)

In [None]:
# Number of series compared pairwise; also the side length of the matrix.
num_sensors = 81
# Square float32 matrix, already all zeros, with one cell per ordered pair.
dist_mx = np.zeros(shape=(num_sensors, num_sensors), dtype=np.float32)
dist_mx

In [None]:
# Fill dist_mx with the fastdtw distance for every ordered pair of columns.
# The loop bounds come from num_sensors instead of a repeated literal 81, so
# they cannot drift out of sync with the shape dist_mx was allocated with.
for i in range(num_sensors):
    x = data10_in.iloc[:, i]
    for j in range(num_sensors):
        y = data10_in.iloc[:, j]
        distance, path = fastdtw(x, y, dist=euclidean)
        # One [row, col] index instead of the chained dist_mx[i][j].
        dist_mx[i, j] = distance

In [None]:
# Preview of the 27 task ids (0..26) that the multiprocessing cell below hands to the pool.
np.arange(0,27)

In [None]:
import datetime
import multiprocessing as mp


def cal(task, managed_dict, managed_locker, cols_per_task=3):
    """Compute one vertical slice of the pairwise fastdtw distance matrix.

    Task ``t`` handles columns ``t * cols_per_task`` up to (but excluding)
    ``(t + 1) * cols_per_task``, clipped to ``num_sensors``. With the default
    of 3 columns per task, task ids 0..26 together cover 81 columns, which is
    presumably the intended split.

    The previous inner loop was ``range(task*3+j)``, which reads ``j`` before
    the loop has bound it, so every call raised ``UnboundLocalError``.

    Relies on the module-level names ``num_sensors``, ``data10_in``,
    ``fastdtw`` and ``euclidean``.

    Args:
        task: Integer task id selecting which block of columns to fill.
        managed_dict: Accepted for call compatibility; not used here.
        managed_locker: Accepted for call compatibility; not used here.
        cols_per_task: Number of consecutive columns handled by one task.

    Returns:
        A ``(num_sensors, num_sensors)`` float32 array that is zero except in
        this task's columns, where cell ``[i, j]`` holds the fastdtw distance
        between column ``i`` and column ``j`` of ``data10_in``.
    """
    dist_mx1 = np.zeros((num_sensors, num_sensors), dtype=np.float32)
    first_col = int(task) * cols_per_task
    # Clip so an over-large task id cannot index past the matrix edge.
    last_col = min(first_col + cols_per_task, num_sensors)
    for i in range(num_sensors):
        x = data10_in.iloc[:, i]
        for j in range(first_col, last_col):
            y = data10_in.iloc[:, j]
            distance, _ = fastdtw(x, y, dist=euclidean)
            dist_mx1[i, j] = distance
    return dist_mx1
            

# def train_on_parameter(name, param, result_dict, result_lock):
#     result = 0
#     for num in param:
#         result += math.sqrt(num * math.tanh(num) / math.log2(num) / math.log10(num))
#     with result_lock:
#         result_dict[name] = result
#     return


if __name__ == '__main__':

    start_t = datetime.datetime.now()

    num_cores = int(mp.cpu_count())
    print("本地计算机有: " + str(num_cores) + " 核心")

    # Keep one core free, but never request fewer than one worker:
    # mp.Pool(0) raises ValueError on a single-core machine.
    # The with-block also tears the pool down when a task raises, so worker
    # processes are not left behind in the kernel.
    with mp.Pool(max(1, num_cores - 1)) as pool:
        # The manager is left running after this cell so that managed_dict
        # stays usable from later cells.
        manager = mp.Manager()
        managed_locker = manager.Lock()
        managed_dict = manager.dict()
        # Submit all 27 tasks first, then block on each result in order.
        results = [pool.apply_async(cal, args=(task, managed_dict, managed_locker)) for task in np.arange(0,27)]
        results = [p.get() for p in results]

    print(managed_dict)

    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("多线程计算 共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")

In [None]:
import math
import datetime
import multiprocessing as mp


def train_on_parameter(name, param, result_dict, result_lock):
    """Sum a fixed math expression over ``param`` and store it under ``name``.

    For every number the term ``sqrt(n * tanh(n) / log2(n) / log10(n))`` is
    added to a running total. The total is written to ``result_dict[name]``
    while ``result_lock`` is held.

    Args:
        name: Key under which the total is stored; also the return value.
        param: Iterable of numbers to process.
        result_dict: Mapping that receives the total.
        result_lock: Context-manager lock guarding the write.

    Returns:
        ``name``, unchanged.
    """
    total = 0
    for value in param:
        inner = value * math.tanh(value) / math.log2(value) / math.log10(value)
        total += math.sqrt(inner)
    with result_lock:
        result_dict[name] = total
    return name


if __name__ == '__main__':

    start_t = datetime.datetime.now()

    num_cores = int(mp.cpu_count())
    print("本地计算机有: " + str(num_cores) + " 核心")

    # Eight contiguous integer ranges, one per task. range objects replace the
    # former list(range(...)) values: the worker only iterates over its
    # parameter, and a range avoids building and pickling 30-million-element
    # lists for every task.
    param_dict = {'task1': range(10, 30000000),
                  'task2': range(30000000, 60000000),
                  'task3': range(60000000, 90000000),
                  'task4': range(90000000, 120000000),
                  'task5': range(120000000, 150000000),
                  'task6': range(150000000, 180000000),
                  'task7': range(180000000, 210000000),
                  'task8': range(210000000, 240000000)}

    # The with-block tears the pool down even if a task raises, so worker
    # processes are not left behind in the kernel.
    with mp.Pool(num_cores) as pool:
        # The manager is left running after this cell so that managed_dict
        # stays usable from later cells.
        manager = mp.Manager()
        managed_locker = manager.Lock()
        managed_dict = manager.dict()
        # Submit every task first, then block on each result in order.
        results = [pool.apply_async(train_on_parameter, args=(name, param, managed_dict, managed_locker)) for name, param in param_dict.items()]
        results = [p.get() for p in results]

    print(managed_dict)

    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("多线程计算 共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")


In [None]:
# First entry of `results`.
# NOTE(review): assumes `results` was last assigned by the cell above, where
# each entry is the task name returned by train_on_parameter.
results[0]

In [None]:
import math
import datetime
import multiprocessing as mp


def train_on_parameter(name, param):
    """Run the summation workload over ``param`` and return ``name``.

    For every number the term ``sqrt(n * tanh(n) / log2(n) / log10(n))`` is
    added to a running total. The total is neither stored nor returned; only
    the task name comes back to the caller.

    Args:
        name: Identifier returned unchanged.
        param: Iterable of numbers to process.

    Returns:
        ``name``, unchanged.
    """
    total = 0
    for value in param:
        inner = value * math.tanh(value) / math.log2(value) / math.log10(value)
        total += math.sqrt(inner)
    return name


if __name__ == '__main__':

    start_t = datetime.datetime.now()

    num_cores = int(mp.cpu_count())
    print("本地计算机有: " + str(num_cores) + " 核心")

    # Eight contiguous integer ranges, one per task. range objects replace the
    # former list(range(...)) values: the worker only iterates over its
    # parameter, and a range avoids building and pickling 30-million-element
    # lists for every task.
    param_dict = {'task1': range(10, 30000000),
                  'task2': range(30000000, 60000000),
                  'task3': range(60000000, 90000000),
                  'task4': range(90000000, 120000000),
                  'task5': range(120000000, 150000000),
                  'task6': range(150000000, 180000000),
                  'task7': range(180000000, 210000000),
                  'task8': range(210000000, 240000000)}

    # The with-block tears the pool down even if a task raises, so worker
    # processes are not left behind in the kernel.
    with mp.Pool(num_cores) as pool:
        # Submit every task first, then block on each result in order.
        results = [pool.apply_async(train_on_parameter, args=(name, param)) for name, param in param_dict.items()]
        results = [p.get() for p in results]

    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("多进程计算 共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")


In [None]:
# Show the full `results` list.
# NOTE(review): assumes `results` was last assigned by the cell above, where
# each entry is the task name returned by train_on_parameter.
results