diff --git a/.gitignore b/.gitignore
index 86bbbe7..284e830 100644
--- a/.gitignore
+++ b/.gitignore
@@ -103,3 +103,7 @@ venv.bak/
 .mypy_cache/
 .idea
+data
+lb_kim.result
+.swp
+*.lprof
diff --git a/CHANGELOG b/CHANGELOG
index 85e944a..818cfd9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -5,9 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [PEP440](https://www.python.org/dev/peps/pep-0440/)
 
 ## [Unreleased]
+...
+
+1.2.0dev1 - 2019-10-20
+---
 ### Added
-- pandas display more columns/col_width
-- test speed up
+- UCR DTW implementation
 
 1.1.0 - 2019-09-29
 ---
diff --git a/docs/time_series.md b/docs/time_series.md
new file mode 100644
index 0000000..8ee57d6
--- /dev/null
+++ b/docs/time_series.md
@@ -0,0 +1,43 @@
+# Time Series Analysis Tool
+
+## Python implementation of the famous UCR DTW algorithm
+
+For theoretical details,
+please read the [paper](https://www.cs.ucr.edu/~eamonn/SIGKDD_trillion.pdf) or this [article]()
+
+Example usage:
+
+```python
+from kinoko.time_series.dtw import UCR_DTW
+import numpy as np
+import pytest
+
+# `dist_cb` is the "callback function for distance"; you can also try `abs(x-y)` etc.
+# `window_frac` is the fraction of the query length used as the window size in the various LB calculations;
+# for plain "Euclidean Distance" instead of DTW, just set window_frac to zero
+ucr_dtw = UCR_DTW(dist_cb=lambda x, y: (x - y) ** 2, window_frac=0.05)
+
+x1 = np.linspace(0, 50, 100, endpoint=False)
+y1 = 3.1 * np.sin(x1 / 1.5) + 3.5
+
+x2 = np.linspace(0, 25, 50, endpoint=False)  # the first half of x1's range
+y2 = 3.1 * np.sin((x2 + 4) / 1.5) + 3.5  # same function, but shifted 4 units to the left
+
+# `content` can be an `Iterable` (stream) of float/int, of any length
+# `query` should be a fixed-length sequence, which will be loaded into RAM
+loc, dist, _stat = ucr_dtw.search(content=y1, query=y2)
+assert 8 == loc  # 4 units / 0.5 step = 8
+assert pytest.approx(0) == dist  # almost zero
+```
+
+!!!caution "Known Issues"
+
+    - Currently, it only supports ==float/int== sequences
+        - vectors or even `object`s cannot be uniformly `norm`ed
+        - a hook mechanism for that seems like "Premature Optimization"
+
+    - Speed is approximately the same as [another Python implementation](https://github.com/JozeeLin/ucr-suite-python/blob/master/DTW.ipynb).
+      A complete time/memory efficiency comparison against the [original C implementation](https://github.com/klon/ucrdtw/blob/master/src/ucrdtw.c) has not been conducted.
+
+    - It is ==NOT production-ready==, use it with caution.
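+
+## Computing a plain DTW distance
+
+Besides `search()`, the underlying `dtw_distance()` method can also be called directly when both
+sequences are already in memory and no subsequence search is needed. Below is a minimal sketch based
+on how the unit tests invoke it; the `max_stray` / `rolling_level` values are merely illustrative.
+Note that, unlike `search()`, this method does not z-normalize its inputs and returns the raw
+accumulated cost under `dist_cb`, so take the square root yourself when using the squared-distance callback.
+
+```python
+import numpy as np
+from kinoko.time_series.dtw import UCR_DTW
+
+ucr_dtw = UCR_DTW()  # defaults: dist_cb=(x - y) ** 2, window_frac=0.05
+
+t = np.linspace(0, 50, 100, endpoint=False)
+a = 3.1 * np.sin(t / 1.5) + 3.5
+b = 2.2 * np.sin(t / 3.5 + 2.4) + 3.2
+
+# `max_stray` is the Sakoe-Chiba band width in cells;
+# `rolling_level` selects the DP memory optimization (0, 1 and 2 give identical results)
+cost = ucr_dtw.dtw_distance(a, b, max_stray=10, rolling_level=2)
+print(np.sqrt(cost))  # DTW distance under the squared-distance callback
+```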
diff --git a/kinoko/time_series/__init__.py b/kinoko/time_series/__init__.py
new file mode 100644
index 0000000..f3c641a
--- /dev/null
+++ b/kinoko/time_series/__init__.py
@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 expandtab number
+"""
+Sub-package for time series analysis tools (UCR DTW / ED)
+
+Authors: qianweishuo
+Date: 2019/10/17 14:27
+"""
+
diff --git a/kinoko/time_series/dtw.py b/kinoko/time_series/dtw.py
new file mode 100644
index 0000000..355bb5a
--- /dev/null
+++ b/kinoko/time_series/dtw.py
@@ -0,0 +1,450 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 expandtab number
+"""
+Python implementation of the UCR_DTW & UCR_ED algorithm
+
+ref:
+
+- [code-python] https://github.com/JozeeLin/ucr-suite-python/blob/master/DTW.ipynb
+- [code-C] https://github.com/klon/ucrdtw/blob/master/src/ucrdtw.c
+- [paper] https://www.cs.ucr.edu/~eamonn/SIGKDD_trillion.pdf
+
+Authors: qianweishuo
+Date: 2019/10/17 14:27
+"""
+
+from __future__ import unicode_literals, division, print_function
+
+from collections import Counter
+
+from typing import Sequence, Callable, TypeVar, Iterable, Tuple, Any, Union
+import numpy as np
+from numpy import ndarray
+from functional import seq
+from scipy.sparse import dok_matrix
+from six.moves import map as map
+from six.moves import zip as zip
+from six.moves import xrange as range
+
+from sklearn.preprocessing import StandardScaler
+
+from kinoko.func import profile
+from kinoko.misc.log_writer import init_log
+
+INF = float('inf')
+NAN = np.nan
+logger = init_log(__name__, level='DEBUG')
+
+
+def square_dist_fn(v1, v2):
+    return (v1 - v2) ** 2
+
+
+class MovingStatistics(object):
+    def __init__(self, mean=NAN, std=NAN):
+        self.cum_x = 0
+        self.cum_x2 = 0
+        self.n_points = 0
+
+        self.mean = mean
+        self.std = std
+
+    def feed(self, x):
+        self.cum_x += x
+        self.cum_x2 += x ** 2
+        self.n_points += 1
+
+    def drop(self, x):
+        self.cum_x -= x
+        self.cum_x2 -= x ** 2
+        self.n_points -= 1
+
+    def mv_mean(self):
+        return self.cum_x / self.n_points
+
+    def mv_std(self):
+        return np.sqrt(self.cum_x2 / self.n_points - self.mv_mean() ** 2)
+
+    def snapshot(self):
+        self.mean = self.mv_mean()
+        self.std = self.mv_std()
+
+    def znorm(self, *args):
+        # type: (Any) -> Union[float, ndarray]
+        if len(args) == 1:
+            return (args[0] - self.mean) / self.std
+        else:
+            return (np.array(args) - self.mean) / self.std
+
+
+class UCR_DTW(object):
+    # reset_period = 5000  # for debug
+    reset_period = 100000  # to "flush out" any accumulated floating-point error
+
+    def __init__(self, dist_cb=square_dist_fn, window_frac=0.05):
+        # type: (Callable[[float, float], float], float) -> None
+        """
+        :param dist_cb: callback function to calculate the distance between two floats
+        :param window_frac: window_size will be calculated by `int(len(query) * window_frac)`;
+            read chapter 3.1 of the KDD paper for Sakoe-Chiba-Band related concepts
+        """
+        self.dist_cb = dist_cb
+        self.window_frac = window_frac  # type: float
+
+        # a few internal states for `search()`
+        self.buffer = []
+        self.loc = None  # type: int
+        self.best_so_far = INF
+
+    @profile
+    def search(self, content, query, window_frac=None):
+        # type: (Iterable[float], Sequence[float], Union[float, None]) -> Tuple[int, float, dict]
+        """
+        Search for `query` within `content` and find its nearest matching subsequence;
+        returns that subsequence's starting index, its DTW distance, and the running details
+
+        :param content: a sequence of floats, which can be used to compute distance by `self.dist_cb()`
+        :param query: a
(typically) shorter sequence than content, which need to be searched for + :param window_frac: overwrite object variable if needed; see `__init__()` for detail + :return: tuple of: location, DTW_distance, running_details_as_dict + """ + Q = len(query) + self.best_so_far = INF # reset best cost for a new search + q_norm = StandardScaler().fit_transform(query[:, None]).flatten() # z-norm the q + + # create envelops for normalized query (viz. LB_Keogh_EQ) + window_size = int(Q * (window_frac or self.window_frac)) + q_norm_L, q_norm_U = self._lower_upper_lemire(q_norm, r=window_size) + q_argidx = q_norm.__abs__().argsort()[::-1] # decreasing order + q_norm_dec, q_norm_L_dec, q_norm_U_dec = q_norm[q_argidx], q_norm_L[q_argidx], q_norm_U[q_argidx] + + idx_buf = 0 + done = False + prune_cnt = Counter(kim=0, eg=0, ec=0) # pruning counters for each phase + while not done: + # use the last `m-1` points if available + self.buffer = [] if idx_buf == 0 else self.buffer[-(Q - 1):] + self.buffer += seq(content).take(self.reset_period - len(self.buffer)).to_list() + # CAUTION: `self.buffer` is huge, DO NOT PUT IT INNER LOOP + buf_L, buf_U = self._lower_upper_lemire(self.buffer, r=window_size) # for calc LB_Keogh_EC + + if len(self.buffer) <= Q - 1: + break + + C_stat = MovingStatistics() # start calculating online z-norm for points in buffer + # a circular array for keeping current content region; double size for avoiding "%" operator + C = np.zeros(Q * 2) # candidate C sequence + + for idx_p, p in enumerate(self.buffer): + C_stat.feed(p) + C[(idx_p % Q) + Q] = C[idx_p % Q] = p + if idx_p < Q - 1: + continue + + C_stat.snapshot() + i = (idx_p + 1) % Q # index in C + + # ----- LB_KimFL + lb_kim = self._lb_kim_hierarchy(C, i, C_stat, q_norm) + if lb_kim >= self.best_so_far: + prune_cnt['kim'] += 1 + # reduce obsolute points from sum and sum square + C_stat.drop(C[i]) # recall: `i = (idx_p + 1) % Q` ; circularly map to the left neighbor + continue # CAUTION: DO NOT FORGET TO `drop` BEFORE `continue` + + # ----- LB_Keogh_EQ + lb_keogh_eg, cb_eg = self._lb_keogh_online(C_stat, q_argidx, q_norm_L_dec, q_norm_U_dec, C=C[i:]) + if lb_keogh_eg >= self.best_so_far: + prune_cnt['eg'] += 1 + C_stat.drop(C[i]) + continue + + # ----- LB_Keogh_EC + idx_in_query = idx_p - (Q - 1) # start location of the data in `query` + lb_keogh_ec, cb_ec = self._lb_keogh_online(C_stat, q_argidx, # CAUTION: keep ordered beforehand + buf_L[idx_in_query:][q_argidx], + buf_U[idx_in_query:][q_argidx], q_norm=q_norm) + if lb_keogh_ec >= self.best_so_far: + prune_cnt['ec'] += 1 + C_stat.drop(C[i]) + continue + + # ----- DTW + # backward cumsum cb_eg & cb_ec to use in early abandoning DTW + cb_backcum = np.cumsum((cb_ec if lb_keogh_ec > lb_keogh_eg else cb_eg)[::-1])[::-1] + c = self.dtw_distance(C_stat.znorm(C[i:i + Q]), q_norm, max_stray=window_size, cb_backcum=cb_backcum) + if c < self.best_so_far: + self.best_so_far = c + self.loc = idx_buf * (self.reset_period - Q + 1) + idx_p - Q + 1 + C_stat.drop(C[i]) + logger.debug((idx_buf, idx_p, c, self.best_so_far)) + + # if idx_buf >= 2: # for debug + # done = True + if len(self.buffer) < self.reset_period: + done = True + else: + idx_buf += 1 + + logger.info("#################### %d %d ####################", idx_buf, len(self.buffer)) + + n_scanned = idx_buf * (self.reset_period - Q + 1) + len(self.buffer) + result_json = { + "location": self.loc, "dtw_distance": np.sqrt(self.best_so_far), + "n_scanned: ": n_scanned, "n_prunes": prune_cnt, "n_calc_dtw": (n_scanned - sum(prune_cnt.values())) + } 
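+        # `result_json` summarizes the run: how many points were scanned, how many candidates each
+        # lower bound pruned ('kim' / 'eg' / 'ec'), and how many full DTW computations were actually performed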
+        return self.loc, np.sqrt(self.best_so_far), result_json
+
+    @staticmethod
+    def _lower_upper_lemire(s, r):
+        # type: (Sequence[float], int) -> Tuple[ndarray, ndarray]
+        """
+        Find the envelope of min and max values for LB_Keogh_EQ or EC.
+        The implementation idea was introduced by Daniel Lemire in his paper
+        "Faster Retrieval with a Two-Pass Dynamic-Time-Warping Lower Bound", Pattern Recognition 42(9), 2009
+        """
+        size = len(s)
+        L, U = np.empty(size), np.empty(size)
+        for j in range(size):  # should replace loop with vectorization for speed
+            region = s[max(j - r, 0): min(j + r + 1, size)]  # CAUTION: `r+1`, right-open interval
+            L[j], U[j] = min(region), max(region)
+        return L, U
+
+    def _lb_kim_hierarchy(self, C, i, C_stat, query):
+        # type: (ndarray, int, MovingStatistics, Sequence[float]) -> float
+        """
+        Usually, LB_Kim takes O(m) time to find the top, bottom, first and last points;
+        however, because of z-normalization, the top and bottom cannot give significant benefit,
+        while using the first and last points can be computed in constant time.
+        The pruning power of LB_Kim is non-trivial, especially when the query is not long, say of length 128.
+
+        :param C: candidate array C, un-normed
+        :param i: starting index in C
+        :param C_stat: moving statistics of the current candidate window, for on-the-fly z-normalization
+        :param query: the query, already normed
+        :return: the accumulated lower bound
+        """
+        Q = len(query)
+        # 1 point at front and back
+        x0, y0 = C_stat.znorm(C[i], C[(i + Q - 1)])
+        lb = self.dist_cb(x0, query[0]) + self.dist_cb(y0, query[Q - 1])
+        if lb >= self.best_so_far:
+            return lb
+
+        # 2 points at front
+        x1 = C_stat.znorm(C[i + 1])
+        lb += min(self.dist_cb(x1, query[0]), self.dist_cb(x0, query[1]), self.dist_cb(x1, query[1]))
+        if lb >= self.best_so_far:
+            return lb
+
+        # 2 points at back
+        y1 = C_stat.znorm(C[(i + Q - 2)])
+        lb += min(self.dist_cb(y1, query[Q - 1]), self.dist_cb(y0, query[Q - 2]), self.dist_cb(y1, query[Q - 2]))
+        if lb >= self.best_so_far:
+            return lb
+
+        # 3 points at front
+        x2 = C_stat.znorm(C[(i + 2)])
+        lb += min(self.dist_cb(x0, query[2]), self.dist_cb(x1, query[2]),
+                  self.dist_cb(x2, query[2]), self.dist_cb(x2, query[1]), self.dist_cb(x2, query[0]))
+        if lb >= self.best_so_far:
+            return lb
+
+        # 3 points at back
+        y2 = C_stat.znorm(C[(i + Q - 3)])
+        lb += min(self.dist_cb(y0, query[Q - 3]), self.dist_cb(y1, query[Q - 3]),
+                  self.dist_cb(y2, query[Q - 3]), self.dist_cb(y2, query[Q - 2]), self.dist_cb(y2, query[Q - 1]))
+        return lb
+
+    @profile
+    def _lb_keogh_online(self, C_stat, q_argidx, L, U, C=None, q_norm=None):
+        # type: (MovingStatistics, ndarray, ndarray, ndarray, ndarray, ndarray) -> Tuple[float, ndarray]
+        """
+        LB_Keogh for EG/EC: create the envelope for the query or the data
+
+        :param C_stat: statistical info of C
+        :param q_argidx: pseudo-code: map(abs, query).argidx().reverse()
+        :param L: in the "LB_Keogh_EG" case, the lower envelope of the normalized `query`;
+                  in the "...EC" case, the lower envelope of the `buffer` region (un-normed, but reordered by q_argidx)
+        :param U: the corresponding upper envelope
+ :param C: needed in "EG" case + :param q_norm: needed in "EC" case + :return: a tuple of: lowest_bound, current_bound_for_each_position + """ + assert not (C is None and q_norm is None), 'C and q_norm should not be None together' + lb = 0.0 + Q = len(U) + cb = np.zeros(Q) + + for i, l, u in zip(q_argidx, L, U): + if lb >= self.best_so_far: + break + + if C is not None: # EG case + x = C_stat.znorm(C[i]) # elements of C need to be normed "on the fly" + # `u, l` of query (in EG case) are already normed + else: + x = q_norm[i] + # noinspection PyPep8 + u, l = C_stat.znorm(u, l) + + if x > u: + d = self.dist_cb(x, u) + elif x < l: + d = self.dist_cb(x, l) + else: + d = 0 + lb += d + cb[i] = d # for usage in Early Abandoning of DTW + return lb, cb + + @profile + def dtw_distance(self, content, query, max_stray=None, cb_backcum=None, rolling_level=2): + # type: (Sequence[float], Sequence[float], int, ndarray, int) -> float + """ + calculate the DTW distance between two sequences: `content` & `query` + + :param content: to which compare against + :param query: to which compare for + :param max_stray: how many cells should not the warping path deviate beyond. c.f. Sakoe-Chiba Band + default use a tight value: abs(len(C)-len(Q)) + :param cb_backcum: current_lower_bound which is cumsum'ed backward; + approximates the lower bound of `dtw_cost(current_position -> north_east_corner)`. + qv. Figure 6, right subplot in the KDD paper. + :param rolling_level: level of DP rolling trick for memory-optimization + :return: DTW distance + """ + C, Q = len(content), len(query) + assert C > 0, 'Empty content to compare against' + assert Q > 0, 'Empty query to compare for' + if max_stray is None: + max_stray = abs(C - Q) # default + assert 0 <= max_stray <= min(C, Q) - 1, 'invalid max_stray' + if cb_backcum is not None: + assert len(content) == len(query), 'len(content) != len(query), `cb` should be None' + + def early_abandon_by_cb_backcum(i, mc): # inner function to save arguments + """ + Consider a "higher"(northern) point, say `(x, y)` where x > i; + whose DP path cost consists of two parts: + + 1. `(0,0) --> (x,y)` + 2. `(x,y) --> (C-1,Q-1)` + + For the first part, define `min_cost(x)` as `min(dp[x,:])`. + Since cost is mono-increasing w.r.t. row index, and `x > i`; + we known that `min_cost(x) >= min_cost(i)` + + For the second part, just use `cb_backcum` as a lower bound, read Figure 6 in KDD paper for detail. 
+ we deduce that cost of `(x,y) --> (C-1,Q-1)` >= cb_backcum[x] + + Put it together, we conclude that: + cost( (0,0) -> (x,y) -> (C-1, Q-1) ) + = cost( (0,0) -> (x,y) ) + cost( (x,y) -> (C-1, Q-1) ) + >= min(dp[x,:]) + cb_backcum(x) + >= min(dp[i,:]) + cb_backcum(x) + """ + x = i + max_stray + 1 # take `x` as large as possibly allowed, for "earlier" abandon + if cb_backcum is not None and x < C and mc + cb_backcum[x] >= self.best_so_far: + return mc + cb_backcum[x] + else: + return None + + if rolling_level == 0: # naive version, for easy understanding; O(C*Q) space + # dp[i, j] defined as: DTW_distance between content[:i+1] & query[:j+1], excluding right bound + dp = dok_matrix((C, Q), dtype=float) + for i in range(C): + mc = INF # ignore for now; means the `min(dp[i,:])` concept as pseudo code below + + for j in range(Q): + if abs(j - i) > max_stray: + dp[i, j] = INF + continue + if i == j == 0: # can not go back beyond 0 + base = 0 + elif i == 0: + base = dp[i, j - 1] + elif j == 0: + base = dp[i - 1, j] + else: + base = min(dp[i - 1, j], dp[i, j - 1], dp[i - 1, j - 1]) + dp[i, j] = base + self.dist_cb(content[i], query[j]) + mc = min(mc, dp[i, j]) + + r = early_abandon_by_cb_backcum(i, mc) + if r is not None: + return r + return dp[-1, -1] + + elif rolling_level == 1: # O(Q) space + # rolling DP now defined as: current row of DTW distance + dp = np.cumsum([self.dist_cb(content[0], q) if j <= max_stray else INF + for j, q in enumerate(query)]) + for i in range(1, C): # starting from the second row, and goes north + mc = INF # see above `rolling_level == 0` case + for j in range(Q): # going east + if not i - max_stray <= j <= i + max_stray: + # keep current state of `dp[j]` as south_west for next j + south_west = dp[j] # CAUTION: before modification by current j: `dp[j] = INF` + dp[j] = INF + continue + + if j == 0: # there is no west or south_west for the head + base = dp[j] + else: + # noinspection PyUnboundLocalVariable + base = min(dp[j - 1], south_west, dp[j]) # base comes from: min(west, south_west, south) + # CAUTION: + # - after usage (see above): `... 
= min(..., south_west, ...)` + # - before modification (see below): `dp[j] = base + ...` + south_west = dp[j] + dp[j] = base + self.dist_cb(content[i], query[j]) # we can modify it now, after keeping + mc = min(mc, dp[j]) + + r = early_abandon_by_cb_backcum(i, mc) + if r is not None: + return r + return dp[-1] + + elif rolling_level == 2: # O(max_stray) space + # when calculating cell (i,j), we only need last row's cache of `[i-1, j-max_stray: j+max_stray+1]` + # thus, achieving O(max_stray) space + # IT IS NOT REALLY INTUITIVE; PLEASE READ C-CODE BELOW FOR DETAIL + # https://github.com/klon/ucrdtw/blob/afc6ff0ac83746378da8cc493bfb09f259f988ee/src/ucrdtw.c#L281 + cost = [INF] * (2 * max_stray + 1) + cost_prev = [INF] * (2 * max_stray + 1) + for i in range(C): + k = max(0, max_stray - i) + mc = INF + + for j in range(max(0, i - max_stray), min(Q, i + max_stray + 1)): + if i == j == 0: + base = 0 + else: + u = INF if i < 1 or k + 1 > 2 * max_stray else cost_prev[k + 1] + v = INF if j < 1 or k < 1 else cost[k - 1] + w = INF if i < 1 or j < 1 else cost_prev[k] + base = min(u, v, w) + cost[k] = base + self.dist_cb(content[i], query[j]) + if cost[k] < mc: + mc = cost[k] + k += 1 + + r = early_abandon_by_cb_backcum(i, mc) + if r is not None: + return r + cost, cost_prev = cost_prev, cost + k -= 1 + return cost_prev[k] + + else: + raise ValueError('invalid value for rolling_level: {}'.format(rolling_level)) + + +def _demo_entry(): + model = UCR_DTW() + model.search(content=map(eval, open('data/Data_new.txt')), query=np.loadtxt('data/Query_new.txt')) + +# if __name__ == '__main__': +# _demo_entry() diff --git a/kinoko/time_series/old_dtw.py b/kinoko/time_series/old_dtw.py new file mode 100644 index 0000000..db9e490 --- /dev/null +++ b/kinoko/time_series/old_dtw.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 expandtab number +""" +[DO NOT USE IT] +reference python implementation for developing and test + +https://github.com/JozeeLin/ucr-suite-python/blob/master/DTW.ipynb + +Authors: qianweishuo +Date: 2019/10/17 下午10:20 +""" +from __future__ import print_function, division, unicode_literals + +import math +import re +import time + +from kinoko.func import profile + + +def ConvertELogStrToValue(eLogStr): + """ + convert string of natural logarithm base of E to value + return (convertOK, convertedValue) + eg: + input: -1.1694737e-03 + output: -0.001169 + input: 8.9455025e-04 + output: 0.000895 + """ + + convertOK, convertedValue = (False, 0.0) + foundEPower = re.search("(?P-?\d+\.\d+)e(?P[-+]\d+)", eLogStr, re.I) + # print "foundEPower=",foundEPower + if (foundEPower): + coefficientPart = foundEPower.group("coefficientPart") + ePowerPart = foundEPower.group("ePowerPart") + # print "coefficientPart=%s,ePower=%s"%(coefficientPart, ePower) + coefficientValue = float(coefficientPart) + ePowerValue = float(ePowerPart) + # print "coefficientValue=%f,ePowerValue=%f"%(coefficientValue, ePowerValue) + # math.e= 2.71828182846 + # wholeOrigValue = coefficientValue * math.pow(math.e, ePowerValue) + wholeOrigValue = coefficientValue * math.pow(10, ePowerValue) + + # print "wholeOrigValue=",wholeOrigValue; + + (convertOK, convertedValue) = (True, wholeOrigValue) + else: + (convertOK, convertedValue) = (False, 0.0) + + return (convertOK, convertedValue) + + +class Old_UCR_DTW(object): + + def __init__(self, data_file=None, query_file=None, m=128, R=0.05): + self.fp = open(data_file, 'r') # data file path + self.qp = open(query_file, 'r') # query file path + 
self.result = open("lb_kim.result", 'w') + self.bsf = float('inf') # best-so-far + self.m = m # the size of the query + self.t = [None] * (self.m * 2) # candidate C sequence + self.q = [0] * self.m # query array + self.order = [None] * self.m # new order of the query + self.u, self.l = [None] * self.m, [None] * self.m # LB_Keogh Upper,Lower bound + self.qo = [None] * self.m + self.uo = [None] * self.m + self.lo = [None] * self.m + self.tz = [None] * self.m # 标准化后的C + self.cb = [0] * self.m # the cumulative lower bound + self.cb1 = [0] * self.m # the cumulative lower bound + self.cb2 = [0] * self.m # the cumulative lower bound + self.u_d = [None] * self.m + self.l_d = [None] * self.m + + """ + for every EPOCH points, all cummulative values, such as ex(sum),ex2, + will be restarted for reducing the floating point error + """ + # self.EPOCH = 100000 # for “flush out” any accumulated floating error + self.EPOCH = 5000 # for debug + + self.d = None # distance + self.ex, self.ex2, self.mean, self.std = 0.0, 0.0, 0.0, 0.0 + self.r = int(self.m * R) # LB_Keogh window,warping windows for LB_Keogh lower bound + self.loc = 0 + self.t1, self.t2 = None, None # start clock,end clock for time cost calculation + self.kim = 0 + self.keogh = 0 # LB_Keogh_EQ + self.keogh2 = 0 # LB_Keogh_EC + self.buffer = [0.0] * self.EPOCH + self.u_buff = [0.0] * self.EPOCH + self.l_buff = [0.0] * self.EPOCH + + def print_result(self, i): + print("Location: ", self.loc) + print("Distance: ", math.sqrt(self.bsf)) + print("Data Scanned: ", i) + print("Total Execution Time: ", (self.t2 - self.t1), " sec") + print("Pruned by LB_Kim : %.2f" % ((self.kim / i) * 100), '%') + print("Pruned by LB_Keogh : %.2f" % ((self.keogh / i) * 100), '%') + print("Pruned by LB_Keogh2: %.2f" % ((self.keogh2 / i) * 100), '%') + print("DTW Calculation : %.2f" % ((100 - self.kim + self.keogh + self.keogh2) / i * 100), '%') + + @profile + def main_run(self): + self.t1 = time.time() # start the clock + + self.Q_normalize() + # 求解标准化后的Q对应的lower bound;create envelop of the query,LB_Keogh_EQ self.l and self.u + self.lower_upper_lemire(self.q, self.m, self.r, self.l, self.u) + + self.sort_query_order() + + i = 0 # current index of the data in current chunk of size EPOCH + j = 0 # the starting index of the data in the circular array, t + it = 0 # 用于表示self.buffer的长度是否为self.EPOCH,如果是,则设置为1,否则设置为0 + done = False + while not done: + # read buffer of size EPOCH or when all data has been read + ep = self.buffer_init(it) + + # data are read in chunk of size EPOCH + # when there is nothing to read, the loop is end + if ep <= self.m - 1: + # 如果data file中的数据长度小于m-1,则直接返回,不需要进行后续的操作 + break + + # 这里的buffer并没有进行normalization处理(所以,在lb_keogh_data_cumulative函数中进行标准化处理),完整求出整个chunk中的lower bound + self.lower_upper_lemire(self.buffer, ep, self.r, self.l_buff, self.u_buff) # LB_Keogh_EC lower bound计算 + + # 开始计算online z-normalize + ex, ex2, mean, std = 0.0, 0.0, 0.0, 0.0 + for i in range(ep): + # A bunch of data has been read and pick one of them at a time to use + d = self.buffer[i] + # Calcualte sum and sum square + ex += d + ex2 += d * d + + # t is a circular array for keeping current data + self.t[i % self.m] = d + # double the size for avoiding using module "%" operator + self.t[(i % self.m) + self.m] = d + if i < self.m - 1: + continue + + # start the task when there are more than m-1 points in the current chunk + # 公式(5) + mean, std = self.Compute_Mean_Std(ex, ex2) + + # 候选子序列的两个索引信息 + # compute the start location of the data in the current circular array,self.t中的起始位置 
+ j = (i + 1) % self.m + # the start location of the data in the current chunk,在self.buffer中的位置 + I = i - (self.m - 1) + + # LB_KimFL + lb_kim = self.lb_kim_hierarchy(self.t, self.q, j, self.m, mean, std, self.bsf) + print("lb_kim:%f best_so_far:%f" % (lb_kim, self.bsf), file=self.result) + # 级联lower bound策略 + if lb_kim < self.bsf: + # LB_Keogh_EQ + lb_k = self.lb_keogh_cumulative(self.order, self.t, self.uo, self.lo, self.cb1, j, self.m, mean, + std, self.bsf) + print("lb_k:%f best_so_far:%f" % (lb_k, self.bsf), file=self.result) + if lb_k < self.bsf: + for k in range(self.m): + # z-normalization of t will be computed on the fly + self.tz[k] = (self.t[(k + j)] - mean) / std # 对完整的C序列进行标准化处理 + + # LB_Keogh_EC + lb_k2 = self.lb_keogh_data_cumulative(self.order, self.tz, self.qo, self.cb2, self.l_buff[I:], + self.u_buff[I:], self.m, mean, std, self.bsf) + print("lb_k2:%f best_so_far:%f" % (lb_k2, self.bsf), file=self.result) + if lb_k2 < self.bsf: + # choose better lower bound between lb_keogh and lb_keogh2 to be used in early abandoning DTW + # Note that cb and cb2 will be cumulative summed here + self.LB_Keogh_for_abandon_DTW(lb_k, lb_k2) + self.Early_Abandon_DTW(it, i) + else: + self.keogh2 += 1 + else: + self.keogh += 1 + else: + self.kim += 1 + + # reduce obsolute points from sum and sum square,公式(5)的第二项 + ex -= self.t[j] + ex2 -= self.t[j] * self.t[j] + + if it >= 2: # for debug + done = True + if ep < self.EPOCH: + done = True + else: + it += 1 + print("#" * 20, it, ep, "#" * 20) + + print(self.kim, self.keogh, self.keogh2) + + i = it * (self.EPOCH - self.m + 1) + ep + self.fp.close() + self.result.close() + self.t2 = time.time() + self.print_result(i) + + def buffer_init(self, it): + # read first m-1 points + if it == 0: + # 把data file中的m-1个数据读进buffer + for k in range(self.m - 1): + try: + line = self.line_to_float(next(self.fp)) + self.buffer[k] = line + except: + break + else: + for k in range(self.m - 1): + # 把最后的m-1个时间点移到buffer的前面 + self.buffer[k] = self.buffer[self.EPOCH - self.m + 1 + k] + + # fill the rest of self.buffer + # read buffer of size EPOCH or when all data has been read + ep = self.m - 1 + while ep < self.EPOCH: + try: + line = self.line_to_float(next(self.fp)) # 把data file中的数据读入buffer中 + self.buffer[ep] = line + except: + break + ep += 1 + return ep + + def Compute_Mean_Std(self, ex, ex2): + mean = ex / self.m + std = ex2 / self.m + std = math.sqrt(std - mean * mean) + return mean, std + + def Early_Abandon_DTW(self, it, i): + # compute DTW and early abandoning if possible + dist = self.dtw(self.tz, self.q, self.cb, self.m, self.r, self.bsf) + print("dtw-dist:%f best_so_far:%f" % (dist, self.bsf), file=self.result) + if dist < self.bsf: + # update bsf + # loc is the real starting locatin of the nearest neighbor in the file + self.bsf = dist + self.loc = it * (self.EPOCH - self.m + 1) + i - self.m + 1 + print((it, i, dist, self.bsf)) # for debug + + def LB_Keogh_for_abandon_DTW(self, lb_k, lb_k2): + # 结合两个LB_Keogh来进行early abandoning DTW + if lb_k > lb_k2: + self.cb[self.m - 1] = self.cb1[self.m - 1] + for k in range(self.m - 2, -1, -1): + self.cb[k] = self.cb[k + 1] + self.cb1[k] + else: + self.cb[self.m - 1] = self.cb2[self.m - 1] + for k in range(self.m - 2, -1, -1): + self.cb[k] = self.cb[k + 1] + self.cb2[k] + + def line_to_float(self, line): + return ConvertELogStrToValue(line.strip())[1] + + def sort_query_order(self): + """ + 对标准化后的Q的所有元素取绝对值,然后对这些时间点的绝对值进行排序,获取对应的时间点索引序列 + """ + Q_tmp = {} + # 求解获取连续最大子序列的order + # sort the query one time by 
abs(z-norm(q[i])) + for i in range(self.m): + Q_tmp[i] = abs(self.q[i]) + sorted_index = [item[0] for item in sorted(Q_tmp.items(), key=lambda x: x[1], reverse=True)] + # also create another arrays for keeping sorted envelop + self.order = sorted_index + for i, ind in enumerate(sorted_index): + self.qo[i] = self.q[ind] + self.uo[i] = self.u[ind] + self.lo[i] = self.l[ind] + + def Q_normalize(self): + """ + 对Query进行标准化处理 + """ + i = 0 + ex = 0.0 + ex2 = 0.0 + while i < self.m: + line = self.line_to_float(next(self.qp)) + ex += line + ex2 += line ** 2 + self.q[i] = line + i += 1 + + # [Query z-normalize] Do z-normalize the query, keep in same array, self.q + mean, std = self.Compute_Mean_Std(ex, ex2) + # print("[Q_normalize]ex=%f,ex2=%f,mean=%f,std=%f"%(ex,ex2,mean,std),file=self.result) + for i in range(self.m): + old = self.q[i] + self.q[i] = (self.q[i] - mean) / std + # print("old_q[i]=%f;q[%d]=%f"%(old,i,self.q[i]),file=self.result) + self.qp.close() + + def lower_upper_lemire(self, t, lenght, r, l, u): + """ + Finding the envelop of min and max value for LB_Keogh_EQ + Implementation idea is introducted by Danial Lemire in his paper + "Faster Retrieval with a Two-Pass Dynamic-Time-Warping Lower Bound",Pattern Recognition 42(9) 2009 + 参数: + t: the query + lenght: query lenght + r: window + l: lower + u: upper + """ + for ind in range(lenght): # CAUTION: 右开区间 + l[ind] = min(t[(ind - r if ind - r >= 0 else 0):(ind + r + 1 if ind + r + 1 < lenght else lenght)]) + u[ind] = max(t[(ind - r if ind - r >= 0 else 0):(ind + r + 1 if ind + r + 1 < lenght else lenght)]) + + def lb_kim_hierarchy(self, t, q, j, lenght, mean, std, bsf=float('inf')): + """ + Calculate quick lower bound + Usually, LB_Kim take time O(m) for finding top,bottom,first and last + however, because of z-normalization the top and bottom cannot give significant benefits + and using the first and last points can be computed in constant time + the prunning power of LB_Kim is non-trivial,especially when the query is not long,say in lenght 128 + 参数: + t: C-unnormalized + q: Q-normalized + j: t中的起始索引号 + lenght: q的长度 + mean: 均值 + std: 方差 + bsf: best-so-far + """ + # 1 point at front and back + x0 = (t[j] - mean) / std # 第一个元素进行标准化处理 + y0 = (t[(lenght - 1 + j)] - mean) / std # 最后一个元素进行标准化处理 + # 只计算first和last两个时间点之间距离之和 + lb = self.dist(x0, q[0]) + self.dist(y0, q[lenght - 1]) + if lb >= bsf: return lb + + # 2 points at front + x1 = (t[j + 1] - mean) / std + d = min(self.dist(x1, q[0]), self.dist(x0, q[1])) + d = min(d, self.dist(x1, q[1])) + lb += d + if lb >= bsf: return lb + + # 2 points at back + y1 = (t[(lenght - 2 + j)] - mean) / std + d = min(self.dist(y1, q[lenght - 1]), self.dist(y0, q[lenght - 2])) + d = min(d, self.dist(y1, q[lenght - 2])) + lb += d + if lb >= bsf: return lb + + # 3 points at front + x2 = (t[(j + 2)] - mean) / std + d = min(self.dist(x0, q[2]), self.dist(x1, q[2])) + d = min(d, self.dist(x2, q[2])) + d = min(d, self.dist(x2, q[1])) + d = min(d, self.dist(x2, q[0])) + lb += d + if lb >= bsf: return lb + + # 3 points at back + y2 = (t[(lenght - 3 + j)] - mean) / std + d = min(self.dist(y0, q[lenght - 3]), self.dist(y1, q[lenght - 3])) + d = min(d, self.dist(y2, q[lenght - 3])) + d = min(d, self.dist(y2, q[lenght - 2])) + d = min(d, self.dist(y2, q[lenght - 1])) + lb += d + + return lb + + def lb_keogh_cumulative(self, order, t, uo, lo, cb, j, lenght, mean, std, best_so_far=float('inf')): + """ + LB_Keogh 1: Create Envelop for the query + Note that because the query is known, envelop can be created once at the 
begenining. + + 参数: + order: sorted indices for the query + uo,lo: upper and lower envelops for the query,which already sorted + t : C (candidate subsequence) a circular array keeping the current data + j : index of the starting location in t + cb : (output)current bound at each position.It will be used later for early abandoning in DTW + """ + lb = 0 + print("lb_keogh_cumulative:mean=%f;std=%f" % (mean, std), file=self.result) + + for i in range(lenght): + if lb >= best_so_far: + break + + x = (t[order[i] + j] - mean) / std # 进行标准化 + d = 0 + # 计算重新排序后的Q的lower bound之间的距离 + if x > uo[i]: + d = self.dist(x, uo[i]) + elif x < lo[i]: + d = self.dist(x, lo[i]) + lb += d + cb[order[i]] = d # 把每个距离都记录下来,提供给后面Early Abandoning of DTW 使用 + print( + "lb_keogh_cumulative:j=%d;order[i]=%d,q[order[i]]=%f;x=%f;d=%f" % (j, order[i], self.q[order[i]], x, d), + file=self.result) + return lb + + def lb_keogh_data_cumulative(self, order, tz, qo, cb, l, u, lenght, mean, std, best_so_far=float('inf')): + ''' + LB_Keogh 2: Create Envelop for the data + Note that the envelops have bean created (in main function) when each data point has been read. + + 参数: + order: 根据标准化后的Q进行的排序对应的index列表 + tz:Z-normalized C + qo:sorted query + cb:(cb2)current bound at each position.Used later for early abandoning in DTW + l,u:lower and upper envelop of the current data(l_buff,u_buff) + ''' + lb = 0 + for i in range(lenght): + if lb >= best_so_far: + break + uu = (u[order[i]] - mean) / std # z-normalization,对lower bound进行标准化处理(用于应对对chunk中的所有数据进行标准化) + ll = (l[order[i]] - mean) / std + d = 0 + if qo[i] > uu: + d = self.dist(qo[i], uu) + elif qo[i] < ll: + d = self.dist(qo[i], ll) + lb += d + cb[order[i]] = d + return lb + + def dtw(self, A, B, cb, m, r, bsf=float('inf')): + """ + Calculate Dynamic Time Wrapping distance + A,B: data and query + cb: cummulative bound used for early abandoning + r: size of Sakoe-Chiba warpping band + """ + x, y, z, min_cost = 0.0, 0.0, 0.0, 0.0 + + # instead of using matrix of size O(m^2) or O(mr),we will reuse two array of size O(r) + cost = [float('inf')] * (2 * r + 1) + cost_prev = [float('inf')] * (2 * r + 1) + for i in range(m): + k = max(0, r - i) + min_cost = float('inf') + + for j in range(max(0, i - r), min(m - 1, i + r) + 1): + # Initialize all row and column + if i == 0 and j == 0: + cost[k] = self.dist(A[0], B[0]) + min_cost = cost[k] + k += 1 + continue + + if j - 1 < 0 or k - 1 < 0: + y = float('inf') + else: + y = cost[k - 1] + + if i - 1 < 0 or k + 1 > 2 * r: + x = float('inf') + else: # CAUTION: 改成python时,这里漏了 + x = cost_prev[k + 1] + + if i - 1 < 0 or j - 1 < 0: + z = float('inf') + else: + z = cost_prev[k] + + # Classic DTW calculation + cost[k] = min(min(x, y), z) + self.dist(A[i], B[j]) + # Find minimum cost in row for early abandoning(possibly to use column instead of row) + if cost[k] < min_cost: + min_cost = cost[k] + k += 1 + + # we can abandon early if the current cummulative distance with lower bound together are larger than bsf + if i + r < m - 1 and min_cost + cb[i + r + 1] >= bsf: + return min_cost + cb[i + r + 1] + + # Move current array to previous array + cost, cost_prev = cost_prev, cost + k -= 1 + + # the DTW distance is in the last cell in the matrix of size O(m^2) or at the middle of our array + final_dtw = cost_prev[k] + return final_dtw + + def dist(self, x, y): + return (x - y) ** 2 + + +if __name__ == '__main__': + model = Old_UCR_DTW('data/Data_new.txt', 'data/Query_new.txt') + model.main_run() diff --git a/mkdocs.yml b/mkdocs.yml index 5ed063d..dac2ade 100644 
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -66,5 +66,6 @@ nav:
   - Bash Utils: bash.md
   - Functional Utils: func.md
   - Visual Utils: visual.md
+  - Time Series Analysis Tool: time_series.md
   - Release Notes: changelog.md
   - Format Specimen: format_specimen.md
diff --git a/requirements.txt b/requirements.txt
index f8c67fc..1ed79a3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -19,3 +19,5 @@ tornado==4.3; python_version < "3.4"
 tornado; python_version >= "3.4"
 tqdm
 typing; python_version < "3.4"
+pathlib
+pandas<=0.24.0
diff --git a/setup.cfg b/setup.cfg
index cfdf7d5..61093c8 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -4,7 +4,7 @@
 name = kinoko
 author = qianweishuo
 author_email = qzy922@gmail.com
-version = 1.1.0
+version = 1.2.0dev1
 description = python package for Japanese NLP and many other utils
 url = https://github.com/koyo922/kinoko
 license = MIT
@@ -37,9 +37,11 @@ install_requires =
     six
     tqdm
     typing; python_version < "3.4"
+    numpy==1.16.5; python_version < "3.4"
     tornado<=4.4.2; python_version < "3.4"
     tornado; python_version >= "3.4"
-    numpy==1.16.5; python_version < "3.4"
+    scipy<=1.2.2; python_version < "3.4"
+    scikit-learn<=0.20.4; python_version < "3.4"
     pandas==0.24.0; python_version < "3.4"
     pandas; python_version >= "3.4"
     IPython<=5.8.0; python_version < "3.4"
diff --git a/tests/test_time_series.py b/tests/test_time_series.py
new file mode 100644
index 0000000..e8fde4e
--- /dev/null
+++ b/tests/test_time_series.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 expandtab number
+"""
+Authors: qianweishuo
+Date: 2019/10/17 16:02
+"""
+from __future__ import unicode_literals, division
+
+import pandas as pd
+import numpy as np
+import pytest
+
+from kinoko.time_series.dtw import UCR_DTW, square_dist_fn
+
+ucr_dtw = UCR_DTW(dist_cb=square_dist_fn)
+
+x = np.linspace(0, 50, 100)
+ts1 = pd.Series(3.1 * np.sin(x / 1.5) + 3.5)
+ts2 = pd.Series(2.2 * np.sin(x / 3.5 + 2.4) + 3.2)
+ts3 = pd.Series(0.04 * x + 3.0)
+
+
+def _FastDTWDistance(s1, s2, w):
+    """ Reference implementation used as the test oracle: https://github.com/JozeeLin/ucr-suite-python/blob/master/DTW.ipynb """
+    DTW = {}
+
+    w = max(w, abs(len(s1) - len(s2)))  # window
+
+    for i in range(-1, len(s1)):
+        for j in range(-1, len(s2)):
+            DTW[(i, j)] = float('inf')
+    DTW[(-1, -1)] = 0
+
+    for i in range(len(s1)):
+        for j in range(max(0, i - w), min(len(s2), i + w + 1)):  # NOTE: range() is right-open, hence w + 1
+            dist = (s1[i] - s2[j]) ** 2
+            DTW[(i, j)] = dist + min(DTW[(i - 1, j)], DTW[(i, j - 1)], DTW[(i - 1, j - 1)])
+
+    return np.sqrt(DTW[len(s1) - 1, len(s2) - 1])
+
+
+@pytest.mark.parametrize("content, query", [(ts1, ts2), (ts1, ts3)])
+def test_dtw_distance(content, query):
+    assert (pytest.approx(_FastDTWDistance(content, query, 10) ** 2) ==
+            ucr_dtw.dtw_distance(content, query, max_stray=10, rolling_level=0) ==
+            ucr_dtw.dtw_distance(content, query, max_stray=10, rolling_level=1) ==
+            ucr_dtw.dtw_distance(content, query, max_stray=10, rolling_level=2)
+            )
+
+
+def test_dtw_search_sin():
+    dtw = UCR_DTW(dist_cb=lambda x, y: (x - y) ** 2, window_frac=0.05)
+
+    x1 = np.linspace(0, 50, 100, endpoint=False)
+    y1 = 3.1 * np.sin(x1 / 1.5) + 3.5
+
+    x2 = np.linspace(0, 25, 50, endpoint=False)  # the first half of x1's range
+    y2 = 3.1 * np.sin((x2 + 4) / 1.5) + 3.5  # same function, but shifted 4 units to the left
+
+    loc, dist, _stat = dtw.search(y1, y2)
+    assert 8 == loc  # 4 units / 0.5 step = 8
+    assert pytest.approx(0) == dist  # almost zero
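+
+
+# Extra sanity-check sketch: it assumes `search()` really accepts any Iterable of floats for
+# `content`, as its docstring states, so streaming the very same points through a plain generator
+# should reproduce the ndarray result above.
+def test_dtw_search_stream_content():
+    x1 = np.linspace(0, 50, 100, endpoint=False)
+    y1 = 3.1 * np.sin(x1 / 1.5) + 3.5
+    x2 = np.linspace(0, 25, 50, endpoint=False)
+    y2 = 3.1 * np.sin((x2 + 4) / 1.5) + 3.5
+
+    loc_arr, dist_arr, _ = UCR_DTW(dist_cb=square_dist_fn, window_frac=0.05).search(content=y1, query=y2)
+    # same data, but streamed through a generator instead of an ndarray
+    loc_gen, dist_gen, _ = UCR_DTW(dist_cb=square_dist_fn, window_frac=0.05).search(
+        content=(v for v in y1), query=y2)
+
+    assert loc_arr == loc_gen
+    assert pytest.approx(dist_arr) == dist_gen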