Skip to content

Commit

Permalink
update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
yzhao062 committed Jul 29, 2019
1 parent bbf5e50 commit 104d275
Show file tree
Hide file tree
Showing 9 changed files with 693 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ v<0.0.4>, <07/21/2019> -- Add code maintainability.
v<0.0.5>, <07/27/2019> -- Add median combination and score_to_proba function.
v<0.0.5>, <07/28/2019> -- Add Stacking (meta ensembling).
v<0.0.6>, <07/29/2019> -- Enable Appveyor integration.
v<0.0.6>, <07/29/2019> -- Update requirements file.
v<0.0.6>, <07/29/2019> -- Update requirements file.
v<0.0.6>, <07/29/2019> -- Add simple outlier detector combination methods.
126 changes: 126 additions & 0 deletions combo/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
from collections import defaultdict
from abc import ABC, abstractmethod

import numpy as np
from numpy import percentile
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import check_classification_targets

from .sklearn_base import _pprint
from ..utils.utility import _sklearn_version_21

Expand Down Expand Up @@ -86,6 +93,125 @@ def predict_proba(self, X):
"""
pass

def _process_decision_scores(self):
    """Derive the fitted attributes used by outlier detector combination.

    Reads ``decision_scores_`` and ``contamination`` from the instance and
    sets:

    - threshold_: the ``100 * (1 - contamination)`` percentile of the
      training scores, used to decide the binary label
    - labels_: binary labels of the training data (1 where the score is
      strictly above ``threshold_``)
    - _mu, _sigma: mean and standard deviation of the training scores,
      kept for the 'unify' conversion in predict_proba()

    Returns
    -------
    self
    """
    train_scores = self.decision_scores_

    # Percentile rank that leaves a `contamination` share of scores above.
    cut_rank = 100 * (1 - self.contamination)
    self.threshold_ = percentile(train_scores, cut_rank)

    above_cut = train_scores > self.threshold_
    self.labels_ = above_cut.astype('int').ravel()

    # Summary statistics consumed later by predict_proba().
    self._mu = np.mean(train_scores)
    self._sigma = np.std(train_scores)

    return self

def _detector_predict(self, X):
    """Label each sample as inlier (0) or outlier (1).

    A sample is flagged as an outlier when its ``decision_function`` score
    is strictly greater than the fitted ``threshold_``.

    Parameters
    ----------
    X : numpy array of shape (n_samples, n_features)
        The input samples.

    Returns
    -------
    outlier_labels : numpy array of shape (n_samples,)
        For each observation, tells whether or not
        it should be considered as an outlier according to the
        fitted model. 0 stands for inliers and 1 for outliers.
    """
    fitted_attributes = ['decision_scores_', 'threshold_', 'labels_']
    check_is_fitted(self, fitted_attributes)

    scores = self.decision_function(X)
    flagged = scores > self.threshold_
    return flagged.astype('int').ravel()

def _detector_predict_proba(self, X, proba_method='linear'):
    """Predict the probability of a sample being outlier. Two approaches
    are possible:

    1. simply use Min-max conversion to linearly transform the outlier
       scores into the range of [0,1]. The scaler is fitted on the
       training scores (``decision_scores_``), so the model must be
       fitted first.
    2. use unifying scores, see :cite:`kriegel2011interpreting`.

    Parameters
    ----------
    X : numpy array of shape (n_samples, n_features)
        The input samples.

    proba_method : str, optional (default='linear')
        Probability conversion method. It must be one of
        'linear' or 'unify'.

    Returns
    -------
    probs : numpy array of shape (n_samples, n_classes)
        Column 1 holds the outlier probability of each sample, in [0,1];
        column 0 holds its complement. Any further columns (when more
        than two classes were registered) are left as zeros.

    Raises
    ------
    ValueError
        If `proba_method` is neither 'linear' nor 'unify'.
    """
    # Reject a bad method name up front, before the (potentially costly)
    # scoring of X by every base estimator.
    if proba_method not in ('linear', 'unify'):
        raise ValueError(
            "{method} is not a valid probability conversion "
            "method".format(method=proba_method))

    check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
    train_scores = self.decision_scores_

    test_scores = self.decision_function(X)

    probs = np.zeros([X.shape[0], int(self._classes)])
    if proba_method == 'linear':
        # Map scores linearly using the training min/max; test scores
        # outside the training range are clipped to [0, 1].
        scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
        outlier_proba = scaler.transform(
            test_scores.reshape(-1, 1)).ravel().clip(0, 1)
    else:
        # 'unify': Gaussian scaling with the training mean/std followed
        # by the error function; negative values are clipped to 0.
        pre_erf_score = (test_scores - self._mu) / (
            self._sigma * np.sqrt(2))
        outlier_proba = erf(pre_erf_score).clip(0, 1).ravel()

    probs[:, 1] = outlier_proba
    probs[:, 0] = 1 - probs[:, 1]
    return probs

def _set_n_classes(self, y):
    """Record the number of classes on the instance as ``_classes``.

    The value defaults to 2 (binary classification). If `y` is presented,
    which is not expected, it is validated as classification targets, the
    number of distinct values in it is used instead, and a warning is
    issued. It could be useful for multi-class outlier detection.

    Parameters
    ----------
    y : numpy array of shape (n_samples,)
        Ground truth.

    Returns
    -------
    self
    """
    # Binary by default; assigned before any validation of y takes place.
    self._classes = 2
    if y is None:
        return self

    check_classification_targets(y)
    distinct_labels = np.unique(y)
    self._classes = len(distinct_labels)
    warnings.warn(
        "y should not be presented in unsupervised learning.")
    return self

def __len__(self):
    """Return how many base estimators the ensemble holds."""
    estimators = self.base_estimators
    return len(estimators)
Expand Down
240 changes: 240 additions & 0 deletions combo/models/detector_comb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
# -*- coding: utf-8 -*-
"""A collection of methods for combining detectors
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause


import numpy as np

from sklearn.utils import check_array
from sklearn.utils import column_or_1d
from sklearn.utils.validation import check_is_fitted
from pyod.utils.utility import standardizer

from .base import BaseAggregator
from .score_comb import average, maximization, median
from ..utils.utility import check_parameter
from ..utils.utility import score_to_proba


class SimpleDetectorAggregator(BaseAggregator):
    """A collection of simple detector combination methods.

    Parameters
    ----------
    base_estimators: list or numpy array (n_estimators,)
        A list of base detectors.

    method : str, optional (default='average')
        Combination method: {'average', 'maximization',
        'median'}. Pass in weights of detector for weighted version.

    threshold : float in (0, 1), optional (default=0.5)
        Cut-off value to convert scores into binary labels. It is
        validated and stored, but the methods of this class derive binary
        labels from ``threshold_`` (based on ``contamination``) instead.

    contamination : float in (0., 0.5], optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    standardization : bool, optional (default=True)
        If True, perform standardization first to convert
        prediction score to zero mean and unit variance. The mean and
        standard deviation of each detector's scores are learned in `fit`
        and reused when scoring new samples.
        See http://scikit-learn.org/stable/auto_examples/preprocessing/plot_scaling_importance.html

    weights : numpy array of shape (n_detectors,), optional (default=None)
        Detector weights, used by the 'average' method. They are rescaled
        so that they sum to the number of detectors. If None, all
        detectors are weighted equally.

    pre_fitted : bool, optional (default=False)
        Whether the base detectors are trained. If True, `fit`
        process may be skipped.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : numpy array of shape (n_samples,)
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, base_estimators, method='average', threshold=0.5,
                 contamination=0.1, standardization=True,
                 weights=None, pre_fitted=False):

        super(SimpleDetectorAggregator, self).__init__(
            base_estimators=base_estimators, pre_fitted=pre_fitted)

        # validate input parameters
        if method not in ['average', 'maximization', 'median']:
            raise ValueError("{method} is not a valid parameter.".format(
                method=method))
        self.method = method

        if not (0. < contamination <= 0.5):
            raise ValueError("contamination must be in (0, 0.5], "
                             "got: %f" % contamination)
        self.contamination = contamination

        self.standardization = standardization

        check_parameter(threshold, 0, 1, include_left=False,
                        include_right=False, param_name='threshold')
        self.threshold = threshold

        if weights is None:
            self.weights = np.ones([1, self.n_base_estimators_])
        else:
            self.weights = column_or_1d(weights).reshape(1, -1)

            # Raise explicitly: an `assert` would vanish under `python -O`.
            if self.weights.shape[1] != self.n_base_estimators_:
                raise ValueError(
                    "weights must have one entry per base estimator: "
                    "expected {expected}, got {got}.".format(
                        expected=self.n_base_estimators_,
                        got=self.weights.shape[1]))

            # adjust probability by a factor for integrity
            adjust_factor = self.weights.shape[1] / np.sum(self.weights)
            self.weights = self.weights * adjust_factor

    def fit(self, X, y=None):
        """Fit detector. y is optional for unsupervised methods.

        Unless ``pre_fitted`` is True, every base estimator is fitted on
        (X, y). The combined training scores, the threshold and the
        training labels are then computed.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        self
        """

        # Validate inputs X and y
        X = check_array(X)
        self._set_n_classes(y)

        if self.pre_fitted:
            print("Training skipped")
        else:
            for clf in self.base_estimators:
                clf.fit(X, y)
                clf.fitted_ = True

        # Learn the standardization statistics on the training scores so
        # that scores of new samples end up on the same scale as
        # `threshold_`.
        self.decision_scores_ = self._create_scores(X, fit_scaler=True)
        self._process_decision_scores()

        return self

    def _create_scores(self, X, fit_scaler=False):
        """Internal function to generate and combine scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        fit_scaler : bool, optional (default=False)
            If True (and ``standardization`` is enabled), learn the
            per-detector mean and standard deviation from the scores of
            `X` and store them for later calls.

        Returns
        -------
        agg_score: numpy array of shape (n_samples,)
            Aggregated scores.

        Raises
        ------
        ValueError
            If a base estimator has no ``decision_function`` or if
            ``method`` is not a supported combination method.
        """
        all_scores = np.zeros([X.shape[0], self.n_base_estimators_])

        for i, clf in enumerate(self.base_estimators):
            if not hasattr(clf, 'decision_function'):
                raise ValueError(
                    "{clf} does not have decision_function.".format(clf=clf))
            all_scores[:, i] = clf.decision_function(X)

        if self.standardization:
            if fit_scaler:
                self._score_mean_ = np.mean(all_scores, axis=0)
                score_std = np.std(all_scores, axis=0)
                # A constant column would otherwise divide by zero.
                score_std[score_std == 0.] = 1.
                self._score_std_ = score_std

            if hasattr(self, '_score_mean_'):
                # Reuse the training statistics: standardizing a batch by
                # its own mean/std would put its scores on a different
                # scale than `threshold_` and would map every single-row
                # batch to a score of zero.
                all_scores = ((all_scores - self._score_mean_) /
                              self._score_std_)
            else:
                # No statistics were learned (e.g. standardization was
                # switched on after fitting): fall back to per-batch
                # standardization.
                all_scores = standardizer(all_scores)

        if self.method == 'average':
            return average(all_scores, estimator_weights=self.weights)
        if self.method == 'maximization':
            return maximization(all_scores)
        if self.method == 'median':
            return median(all_scores)

        # Only reachable if `method` was changed after construction.
        raise ValueError("{method} is not a valid parameter.".format(
            method=self.method))

    def decision_function(self, X):
        """Predict raw anomaly scores of X using the fitted detector.

        The anomaly score of an input sample is computed based on the fitted
        detector. For consistency, outliers are assigned with
        higher anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """

        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)

        return self._create_scores(X)

    def predict(self, X):
        """Predict if a particular sample is an outlier or not.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict(X)

    def predict_proba(self, X, proba_method='linear'):
        """Predict the probability of a sample being outlier. Two approaches
        are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        proba_method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.

        Returns
        -------
        outlier_probability : numpy array
            The result of ``_detector_predict_proba``: the outlier
            probability of each observation according to the fitted
            model, ranging in [0,1].
        """

        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict_proba(X, proba_method)
Loading

0 comments on commit 104d275

Please sign in to comment.