-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
470 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Example of using COF for outlier detection | ||
""" | ||
# Author: Yahya Almardeny <almardeny@gmail.com> | ||
# License: MIT | ||
|
||
from __future__ import division | ||
from __future__ import print_function | ||
|
||
import os | ||
import sys | ||
|
||
# temporary solution for relative imports in case pyod is not installed | ||
# if pyod is installed, no need to use the following line | ||
sys.path.append( | ||
os.path.abspath(os.path.join(os.path.dirname("__file__"), '..'))) | ||
|
||
import matplotlib.pyplot as plt | ||
|
||
from pyod.models.cof import COF | ||
from pyod.utils.data import generate_data | ||
from pyod.utils.data import get_outliers_inliers | ||
from pyod.utils.data import check_consistent_shape | ||
from pyod.utils.data import evaluate_print | ||
|
||
|
||
def visualize(clf_name, X_train, y_train, X_test, y_test, y_train_pred,
              y_test_pred, show_figure=True, save_figure=False):
    """Plot ground truth against predictions for a 2-d train/test split.

    Draws a 2x2 grid: the top row shows the training set (ground truth,
    then prediction) and the bottom row shows the test set in the same
    order. Internal use only.

    Parameters
    ----------
    clf_name : str
        The name of the detector; used in the figure title and in the
        file name when the figure is saved.

    X_train : numpy array of shape (n_samples, 2)
        The training samples.

    y_train : list or array of shape (n_samples,)
        The ground truth of training samples.

    X_test : numpy array of shape (n_samples, 2)
        The test samples.

    y_test : list or array of shape (n_samples,)
        The ground truth of test samples.

    y_train_pred : list or array of shape (n_samples,)
        The predicted binary labels of the training samples.

    y_test_pred : list or array of shape (n_samples,)
        The predicted binary labels of the test samples.

    show_figure : bool, optional (default=True)
        If set to True, show the figure.

    save_figure : bool, optional (default=False)
        If set to True, save the figure as ``<clf_name>.png`` in the
        current working directory.

    Raises
    ------
    ValueError
        If the training data does not have exactly two features.
    """

    def _scatter_panel(inlier_points, outlier_points, title,
                       inlier_color, outlier_color):
        """Draw inliers and outliers on the currently active subplot.

        Parameters
        ----------
        inlier_points : numpy array of shape (n_samples, 2)
            Inliers.

        outlier_points : numpy array of shape (n_samples, 2)
            Outliers.

        title : str
            Subplot title.

        inlier_color : str
            The color of inliers.

        outlier_color : str
            The color of outliers.
        """
        plt.axis("equal")
        plt.scatter(inlier_points[:, 0], inlier_points[:, 1],
                    label='inliers', color=inlier_color, s=40)
        plt.scatter(outlier_points[:, 0], outlier_points[:, 1],
                    label='outliers', color=outlier_color, s=50, marker='^')
        plt.title(title, fontsize=15)
        plt.xticks([])
        plt.yticks([])
        plt.legend(loc=3, prop={'size': 10})

    # check input data shapes are consistent
    checked = check_consistent_shape(X_train, y_train, X_test, y_test,
                                     y_train_pred, y_test_pred)
    X_train, y_train, X_test, y_test, y_train_pred, y_test_pred = checked

    n_features = X_train.shape[1]
    if n_features != 2:
        raise ValueError("Input data has to be 2-d for visualization. The "
                         "input data has {shape}.".format(shape=X_train.shape))

    # (subplot position, samples, labels, title, inlier color, outlier color)
    panel_specs = (
        (221, X_train, y_train, 'Train Set Ground Truth', 'blue', 'orange'),
        (222, X_train, y_train_pred, 'Train Set Prediction', 'blue',
         'orange'),
        (223, X_test, y_test, 'Test Set Ground Truth', 'green', 'red'),
        (224, X_test, y_test_pred, 'Test Set Prediction', 'green', 'red'),
    )

    # Split every panel's data before any figure is created, so a failure
    # while splitting does not leave an empty figure behind.
    panels = []
    for position, samples, labels, title, in_color, out_color in panel_specs:
        outlier_points, inlier_points = get_outliers_inliers(samples, labels)
        panels.append((position, inlier_points, outlier_points, title,
                       in_color, out_color))

    # plot ground truth vs. predicted results
    fig = plt.figure(figsize=(12, 10))
    plt.suptitle("Demo of {clf_name} Detector".format(clf_name=clf_name),
                 fontsize=15)

    for position, inlier_points, outlier_points, title, in_color, \
            out_color in panels:
        fig.add_subplot(position)
        _scatter_panel(inlier_points, outlier_points, title,
                       in_color, out_color)

    if save_figure:
        plt.savefig('{clf_name}.png'.format(clf_name=clf_name), dpi=300)

    if show_figure:
        plt.show()

    return
|
||
|
||
if __name__ == "__main__":
    # experiment settings
    contamination = 0.1  # percentage of outliers
    n_train = 200  # number of training points
    n_test = 100  # number of testing points

    # Generate a reproducible 2-d sample data set
    dataset = generate_data(n_train=n_train,
                            n_test=n_test,
                            n_features=2,
                            contamination=contamination,
                            random_state=42)
    X_train, y_train, X_test, y_test = dataset

    # train COF detector
    clf_name = 'COF'
    clf = COF()
    clf.fit(X_train)

    # training data: binary labels (0: inliers, 1: outliers) and raw scores
    y_train_pred = clf.labels_
    y_train_scores = clf.decision_scores_

    # test data: outlier labels (0 or 1) and outlier scores
    y_test_pred = clf.predict(X_test)
    y_test_scores = clf.decision_function(X_test)

    # evaluate and print the results
    for heading, y_true, y_scores in (
            ("\nOn Training Data:", y_train, y_train_scores),
            ("\nOn Test Data:", y_test, y_test_scores)):
        print(heading)
        evaluate_print(clf_name, y_true, y_scores)

    # visualize the results
    visualize(clf_name, X_train, y_train, X_test, y_test, y_train_pred,
              y_test_pred, show_figure=True, save_figure=False)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Connectivity-Based Outlier Factor (COF) Algorithm | ||
""" | ||
# Author: Yahya Almardeny <almardeny@gmail.com> | ||
# License: MIT | ||
from operator import itemgetter | ||
import numpy as np | ||
import pandas as pd | ||
from scipy.spatial import distance_matrix | ||
from sklearn.utils import check_array | ||
from pyod.utils import check_parameter | ||
from .base import BaseDetector | ||
|
||
|
||
class COF(BaseDetector):
    """Connectivity-Based Outlier Factor (COF) detector.

    Computes the Connectivity-Based Outlier Factor as an outlier score for
    observations. The implementation is based on the work of:

    Tang, J., Chen, Z., Fu, A. W. C., & Cheung, D. W. (2002).
    Enhancing Effectiveness of Outlier Detections for Low Density Patterns.
    In Pacific-Asia Conf. on Knowledge Discovery and Data Mining (PAKDD).
    Taipei. pp. 535-548. DOI: 10.1007/3-540-47887-6_53

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_neighbors : int, optional (default=10)
        Number of neighbors to use by default for k neighbors queries.
        Note that n_neighbors should be less than the number of samples.
        If n_neighbors is larger than or equal to the number of samples
        provided, ``n_samples - 1`` neighbors are used instead.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    n_neighbors_ : int
        Number of neighbors actually used for k neighbors queries. It
        starts as ``n_neighbors`` and is lowered by ``fit`` when the
        training set has too few samples.
    """

    def __init__(self, contamination=0.1, n_neighbors=10):
        super(COF, self).__init__(contamination=contamination)
        if isinstance(n_neighbors, int):
            check_parameter(n_neighbors,
                            low=1,
                            param_name='n_neighbors')
        else:
            raise TypeError(
                "n_neighbors should be int. Got %s" % type(n_neighbors))
        # Also keep the value under the constructor argument's own name:
        # scikit-learn style parameter introspection looks attributes up by
        # the __init__ argument name.
        # NOTE(review): assumes BaseDetector follows that convention --
        # confirm against pyod.models.base.
        self.n_neighbors = n_neighbors
        self.n_neighbors_ = n_neighbors
        self.decision_scores_ = None

    def fit(self, X, y=None):
        """Fit detector. y is optional for unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        self : object
            The fitted detector.

        Raises
        ------
        ValueError
            If fewer than two samples are provided.
        """
        X = check_array(X)
        n_samples = X.shape[0]
        if n_samples < 2:
            raise ValueError(
                "COF needs at least 2 samples. Got %d" % n_samples)
        # A sample can have at most n_samples - 1 neighbors.
        if self.n_neighbors_ >= n_samples:
            self.n_neighbors_ = n_samples - 1
        self._set_n_classes(y)
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Note that the scores are computed from the samples in ``X`` alone:
        every sample is compared with its nearest neighbors inside ``X``.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.

        Raises
        ------
        ValueError
            If fewer than two samples are provided.
        """
        X = check_array(X)
        return self._cof(X)

    def _cof(self, X):
        """Compute the Connectivity-Based Outlier Factor of every sample.

        For each sample the ``k`` nearest neighbors are ordered by distance
        (the set based nearest path). The cost of step ``j`` is the smallest
        distance between the ``j``-th element of the path and any earlier
        element. The average chaining distance is the weighted sum of these
        costs, with earlier steps weighted more heavily. The COF of a sample
        is its average chaining distance divided by the mean average
        chaining distance of its neighbors.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        cof : numpy array of shape (n_samples,)
            COF scores for observations. The greater the COF, the greater
            the outlierness.

        Raises
        ------
        ValueError
            If fewer than two samples are provided.
        """
        n_samples = X.shape[0]
        if n_samples < 2:
            raise ValueError(
                "COF needs at least 2 samples. Got %d" % n_samples)
        # Never ask for more neighbors than X can offer; this matters when
        # decision_function is called on fewer samples than were fitted.
        k = min(self.n_neighbors_, n_samples - 1)

        dist = distance_matrix(X, X)
        # A stable sort breaks distance ties by sample index.
        order = np.argsort(dist, axis=1, kind='stable')
        # Row i: the first element of the ordering followed by k neighbors.
        sbn_path = order[:, :k + 1]
        neighbors = sbn_path[:, 1:]

        # Weight of step h (1-based) in the average chaining distance.
        # The 2.0 forces true division on every Python version.
        steps = np.arange(1, k + 1)
        weights = 2.0 * (k + 1 - steps) / ((k + 1) * k)

        ac_dist = np.empty(n_samples)
        for i in range(n_samples):
            # Only the (k + 1) x (k + 1) distances between path elements
            # are needed, so extract just that sub-matrix.
            path = sbn_path[i]
            path_dist = dist[np.ix_(path, path)]
            costs = np.array([path_dist[j, :j].min()
                              for j in range(1, k + 1)])
            ac_dist[i] = np.sum(weights * costs)

        # If all neighbors have a zero chaining distance (e.g. duplicated
        # samples) the ratio is undefined and numpy yields nan or inf.
        return (ac_dist * k) / ac_dist[neighbors].sum(axis=1)
Oops, something went wrong.