New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerated the computation of singular spectrum transformation by Krylov subspace #6

Merged
merged 5 commits into from Feb 3, 2019
Merged
Diff settings

Always

Just for now

Copy path View file
@@ -41,6 +41,11 @@ raw_data = pd.read_csv('./tests/test_data/periodic_wave.csv')
data = raw_data['y']
```

SST processing can be accelerated using the Lanczos method which is one of Krylov subspace methods by specifying `True` for the `is_lanczos` argument like below.
```python
results = model.detect(data, is_lanczos=True)
```

## Real-time monitoring with Bokeh
Banpei is developed with the goal of constructing the environment of real-time abnormality monitoring. In order to achieve the goal, Banpei has the function corresponded to the streaming data. With the help of Bokeh, which is great visualization library, we can construct the simple monitoring tool.
Here's a simple demonstration movie of change-point detection of the data trends.
Copy path View file
@@ -1,6 +1,6 @@
import numpy as np
from banpei.base.model import BaseModel

from .utils import *

class SST(BaseModel):
def __init__(self, w, m=2, k=None, L=None):
@@ -27,14 +27,16 @@ def __init__(self, w, m=2, k=None, L=None):
else:
self.L = L

def detect(self, data):
def detect(self, data, is_lanczos=False):
"""
Batch mode detection
Parameters
----------
data : array_like
Input array or object that can be converted to an array.
is_lanczos : boolean
If true, the change score is calculated based on the lanczos method
Returns
-------
@@ -67,18 +69,23 @@ def detect(self, data):
test_matrix = self._extract_matrix(data, start_test, end_test, self.w)

# Calculate the score by singular value decomposition(SVD)
change_scores[t] = self._calculate_score(tra_matrix, test_matrix)
if is_lanczos:
change_scores[t] = self._calculate_score_by_lanczos(tra_matrix, test_matrix)
else:
change_scores[t] = self._calculate_score_by_svd(tra_matrix, test_matrix)

return change_scores

def stream_detect(self, data):
def stream_detect(self, data, is_lanczos=False):
"""
Stream mode detection for live monitoring.
Parameters
----------
data : array_like
Input array or object that can be converted to an array.
is_lanczos : boolean
If true, the change score is calculated based on the lanczos method
Returns
-------
@@ -107,19 +114,28 @@ def stream_detect(self, data):
end_test = end_tra + self.L
test_matrix = self._extract_matrix(data, start_test, end_test, self.w)

# Calculate the score by singular value decomposition(SVD)
score = self._calculate_score(tra_matrix, test_matrix)

return score
# Calculate the change score
if is_lanczos:
return self._calculate_score_by_lanczos(tra_matrix, test_matrix)
else:
return self._calculate_score_by_svd(tra_matrix, test_matrix)

def _calculate_score(self, tra_matrix, test_matrix):
def _calculate_score_by_svd(self, tra_matrix, test_matrix):
U_tra, _, _ = np.linalg.svd(tra_matrix, full_matrices=False)
U_test, _, _ = np.linalg.svd(test_matrix, full_matrices=False)
U_tra_m = U_tra[:, :self.m]
U_test_m = U_test[:, :self.m]
s = np.linalg.svd(np.dot(U_tra_m.T, U_test_m), full_matrices=False, compute_uv=False)
return 1 - s[0]

def _calculate_score_by_lanczos(self, tra_matrix, test_matrix):
m, _, _ = power_method(test_matrix)
k = 2 * self.m if self.m % 2 == 0 else 2 * self.m - 1
P = np.dot(tra_matrix, tra_matrix.T)
T = tridiagonalize_by_lanczos(P, m, k)
eigenvalue, eigenvectors = tridiag_eigen(T)
return 1 - np.sum(eigenvectors[0, np.argsort(eigenvalue)[::-1][:self.m]] ** 2)

def _extract_matrix(self, data, start, end, w):
row = w
column = end - start + 1
Copy path View file
@@ -33,3 +33,116 @@ def power_method(A, iter_num=1):
u = Av / s

return u, s, v

def tridiagonalize_by_lanczos(P, m, k):
"""
Tridiagonalize matrix by lanczos method
Parameters
----------
P : numpy array
Target matrix
q : numpy array
Initial vector
k : int
Size of the tridiagonal matrix
Returns
-------
T : numpy array
tridiagonal matrix
"""
# Initialize variables
T = np.zeros((k, k))
r0 = m
beta0 = 1
q0 = np.zeros(m.shape)

for i in range(k):
q1 = r0 / beta0
C = np.dot(P, q1)
alpha1 = np.dot(q1, C)
r1 = C - alpha1 * q1 - beta0 * q0
beta1 = np.linalg.norm(r1)

T[i, i] = alpha1
if i + 1 < k:
T[i, i + 1] = beta1
T[i + 1, i] = beta1

q0 = q1
beta0 = beta1
r0 = r1

return T

def tridiag_eigen(T, iter_num=1, tol=1e-3):
"""
Calculate eigenvalues and eigenvectors of tridiagonal matrix
Parameters
----------
P : numpy array
Target matrix (tridiagonal)
iter_num : int
Number of iterations
tol : float
Stop iteration if the target matrix converges to a diagonal matrix with acceptable tolerance `tol`
Returns
-------
eigenvalue : numpy array
Calculated eigenvalues
eigenvectors : numpy array
Calculated eigenvectors
"""
eigenvectors = np.identity(T.shape[0])

for i in range(iter_num):
Q, R = tridiag_qr_decomposition(T)
T = np.dot(R, Q)
eigenvectors = np.dot(eigenvectors, Q)
eigenvalue = np.diag(T)
if np.all((T - np.diag(eigenvalue) < tol)):
break

return eigenvalue, eigenvectors

def tridiag_qr_decomposition(T):
"""
QR decomposition for a tridiagonal matrix
Ref. http://www.ericmart.in/blog/optimizing_julia_tridiag_qr
Parameters
----------
T : numpy array
Target matrix (tridiagonal)
Returns
-------
Qt.T : numpy array
R : numpy array
"""
R = T.copy()
Qt = np.eye(T.shape[0])

for i in range(T.shape[0] - 1):
u = householder(R[i:i + 2, i])
M = np.outer(u, u)
R[i:i + 2, :(i + 3)] -= 2 * np.dot(M, R[i:i + 2, :(i + 3)])
Qt[i:i + 2, :(i + 3)] -= 2 * np.dot(M, Qt[i:i + 2, :(i + 3)])

return Qt.T, R

def householder(x):
"""
Householder projection for vector.
Parameters
----------
x : numpy array
Target vector
Returns
-------
x : numpy array
"""
x[0] = x[0] + np.sign(x[0]) * np.linalg.norm(x)
x = x / np.linalg.norm(x)
return x
Copy path View file
@@ -2,7 +2,7 @@

setup(
name='banpei',
version='0.0.1',
version='0.1.0',
description='Anomaly detection library with Python',
author='Hirofumi Tsuruta',
packages=find_packages(exclude=('tests', 'docs', 'demo')),
Copy path View file
@@ -9,11 +9,16 @@ def setUp(self):
self.raw_data = pd.read_csv('tests/test_data/periodic_wave.csv')
self.data = self.raw_data['y']

def test_detect(self):
def test_detect_by_svd(self):
model = SST(w=50)
results = model.detect(self.data)
self.assertEqual(len(self.data), len(results))

def test_detect_by_lanczos(self):
model = SST(w=50)
results = model.detect(self.data, is_lanczos=True)
self.assertEqual(len(self.data), len(results))

def test_stream_detect(self):
model = SST(w=50)
result = model.stream_detect(self.data)
ProTip! Use n and p to navigate between commits in a pull request.