Skip to content

Commit

Permalink
Report model change annotations via result stream (elastic#1247)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Jun 10, 2020
1 parent 37b96b4 commit ef1dcff
Show file tree
Hide file tree
Showing 44 changed files with 648 additions and 37 deletions.
5 changes: 5 additions & 0 deletions docs/CHANGELOG.asciidoc
Expand Up @@ -30,6 +30,11 @@

== {es} version 7.9.0

=== New Features

* Report significant changes to anomaly detection models in annotations of the results.
(See {ml-pull}1247[#1247], {pull}56342[#56342], {pull}56417[#56417], {pull}57144[#57144], {pull}57278[#57278], {pull}57539[#57539].)

=== Enhancements

* Add support for larger forecasts in memory via max_model_memory setting.
Expand Down
50 changes: 50 additions & 0 deletions include/api/CAnnotationJsonWriter.h
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
#ifndef INCLUDED_ml_api_CAnnotationJsonWriter_h
#define INCLUDED_ml_api_CAnnotationJsonWriter_h

#include <core/CJsonOutputStreamWrapper.h>
#include <core/CNonCopyable.h>
#include <core/CRapidJsonConcurrentLineWriter.h>
#include <core/CoreTypes.h>

#include <api/ImportExport.h>

#include <model/CAnnotation.h>

#include <rapidjson/document.h>

#include <iosfwd>
#include <sstream>
#include <string>

#include <stdint.h>

namespace ml {
namespace api {

//! \brief
//! Write annotation result as a JSON document
class API_EXPORT CAnnotationJsonWriter final : private core::CNonCopyable {
public:
//! Constructor that causes to be written to the specified stream
explicit CAnnotationJsonWriter(core::CJsonOutputStreamWrapper& outStream);

void writeResult(const std::string& jobId, const model::CAnnotation& annotation);

private:
void populateAnnotationObject(const std::string& jobId,
const model::CAnnotation& annotation,
rapidjson::Value& obj);

private:
//! JSON line writer
core::CRapidJsonConcurrentLineWriter m_Writer;
};
}
}

#endif // INCLUDED_ml_api_CAnnotationJsonWriter_h
4 changes: 4 additions & 0 deletions include/api/CAnomalyJob.h
Expand Up @@ -111,6 +111,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
std::pair<model::CSearchKey::TStrCRefKeyCRefPr, TAnomalyDetectorPtr>;
using TKeyCRefAnomalyDetectorPtrPrVec = std::vector<TKeyCRefAnomalyDetectorPtrPr>;
using TModelPlotDataVec = model::CAnomalyDetector::TModelPlotDataVec;
using TAnnotationVec = model::CAnomalyDetector::TAnnotationVec;

struct API_EXPORT SRestoredStateDetail {
ERestoreStateStatus s_RestoredStateStatus;
Expand Down Expand Up @@ -334,6 +335,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! choosing: either file or streamed to the API
void writeOutModelPlot(const TModelPlotDataVec& modelPlotData);

//! Write the annotations to the output stream.
void writeOutAnnotations(const TAnnotationVec& annotations);

//! Persist one detector to a stream.
//! This method is static so that there is no danger of it accessing
//! the member variables of an object. This makes it safer to call
Expand Down
9 changes: 9 additions & 0 deletions include/maths/CModel.h
Expand Up @@ -121,6 +121,12 @@ class MATHS_EXPORT CModelAddSamplesParams {
//! Get the prior sample weights.
const TDouble2VecWeightsAryVec& priorWeights() const;

//! Set the model annotation callback.
CModelAddSamplesParams&
annotationCallback(const maths_t::TModelAnnotationCallback& modelAnnotationCallback);
//! Get the model annotation callback.
const maths_t::TModelAnnotationCallback& annotationCallback() const;

private:
//! The data type.
maths_t::EDataType m_Type = maths_t::E_MixedData;
Expand All @@ -132,6 +138,9 @@ class MATHS_EXPORT CModelAddSamplesParams {
const TDouble2VecWeightsAryVec* m_TrendWeights = nullptr;
//! The prior sample weights.
const TDouble2VecWeightsAryVec* m_PriorWeights = nullptr;
//! The model change callback.
maths_t::TModelAnnotationCallback m_ModelAnnotationCallback =
[](core_t::TTime, const std::string&) {};
};

//! \brief The extra parameters needed by CModel::probability.
Expand Down
11 changes: 7 additions & 4 deletions include/maths/CTimeSeriesDecomposition.h
Expand Up @@ -106,10 +106,13 @@ class MATHS_EXPORT EMPTY_BASE_OPT CTimeSeriesDecomposition
//! and it's local variance.
//! \param[in] componentChangeCallback Called if the components
//! change as a result of adding the data point.
virtual void addPoint(core_t::TTime time,
double value,
const maths_t::TDoubleWeightsAry& weights = TWeights::UNIT,
const TComponentChangeCallback& componentChangeCallback = noop);
//! \param[in] modelAnnotationCallback Called if the model changes as a result of adding the data point.
virtual void
addPoint(core_t::TTime time,
double value,
const maths_t::TDoubleWeightsAry& weights = TWeights::UNIT,
const TComponentChangeCallback& componentChangeCallback = noopComponentChange,
const maths_t::TModelAnnotationCallback& modelAnnotationCallback = noopModelAnnotation);

//! Apply \p change at \p time.
//!
Expand Down
9 changes: 7 additions & 2 deletions include/maths/CTimeSeriesDecompositionDetail.h
Expand Up @@ -359,7 +359,8 @@ class MATHS_EXPORT CTimeSeriesDecompositionDetail : private CTimeSeriesDecomposi
class CScopeAttachComponentChangeCallback {
public:
CScopeAttachComponentChangeCallback(CComponents& components,
TComponentChangeCallback callback);
TComponentChangeCallback componentChangeCallback,
maths_t::TModelAnnotationCallback modelAnnotationCallback);
~CScopeAttachComponentChangeCallback();
CScopeAttachComponentChangeCallback(const CScopeAttachComponentChangeCallback&) = delete;
CScopeAttachComponentChangeCallback&
Expand Down Expand Up @@ -768,7 +769,8 @@ class MATHS_EXPORT CTimeSeriesDecompositionDetail : private CTimeSeriesDecomposi
std::size_t maxSize() const;

//! Add new seasonal components to \p components.
bool addSeasonalComponents(const CPeriodicityHypothesisTestsResult& result,
bool addSeasonalComponents(core_t::TTime time,
const CPeriodicityHypothesisTestsResult& result,
const CExpandingWindow& window,
const TPredictor& predictor);

Expand Down Expand Up @@ -865,6 +867,9 @@ class MATHS_EXPORT CTimeSeriesDecompositionDetail : private CTimeSeriesDecomposi
//! Called if the components change.
TComponentChangeCallback m_ComponentChangeCallback;

//! Called if the model change annotation is reported.
maths_t::TModelAnnotationCallback m_ModelAnnotationCallback;

//! Set to true when testing for a change.
bool m_TestingForChange = false;

Expand Down
6 changes: 4 additions & 2 deletions include/maths/CTimeSeriesDecompositionInterface.h
Expand Up @@ -96,7 +96,8 @@ class MATHS_EXPORT CTimeSeriesDecompositionInterface : public CTimeSeriesDecompo
addPoint(core_t::TTime time,
double value,
const maths_t::TDoubleWeightsAry& weights = TWeights::UNIT,
const TComponentChangeCallback& componentChangeCallback = noop) = 0;
const TComponentChangeCallback& componentChangeCallback = noopComponentChange,
const maths_t::TModelAnnotationCallback& modelAnnotationCallback = noopModelAnnotation) = 0;

//! Apply \p change at \p time.
//!
Expand Down Expand Up @@ -193,7 +194,8 @@ class MATHS_EXPORT CTimeSeriesDecompositionInterface : public CTimeSeriesDecompo
virtual core_t::TTime lastValueTime() const = 0;

protected:
static void noop(TFloatMeanAccumulatorVec) {}
static void noopComponentChange(TFloatMeanAccumulatorVec) {}
static void noopModelAnnotation(core_t::TTime, const std::string&) {}
};
}
}
Expand Down
10 changes: 6 additions & 4 deletions include/maths/CTimeSeriesDecompositionStub.h
Expand Up @@ -41,10 +41,12 @@ class MATHS_EXPORT CTimeSeriesDecompositionStub : public CTimeSeriesDecompositio
virtual void testingForChange(bool value);

//! No-op returning false.
virtual void addPoint(core_t::TTime time,
double value,
const maths_t::TDoubleWeightsAry& weights = TWeights::UNIT,
const TComponentChangeCallback& componentChangeCallback = noop);
virtual void
addPoint(core_t::TTime time,
double value,
const maths_t::TDoubleWeightsAry& weights = TWeights::UNIT,
const TComponentChangeCallback& componentChangeCallback = noopComponentChange,
const maths_t::TModelAnnotationCallback& modelAnnotationCallback = noopModelAnnotation);

//! No-op returning false.
virtual bool applyChange(core_t::TTime time, double value, const SChangeDescription& change);
Expand Down
8 changes: 4 additions & 4 deletions include/maths/CTimeSeriesModel.h
Expand Up @@ -234,8 +234,8 @@ class MATHS_EXPORT CUnivariateTimeSeriesModel : public CModel {
EUpdateResult applyChange(const SChangeDescription& change);

//! Update the trend with \p samples.
EUpdateResult updateTrend(const TTimeDouble2VecSizeTrVec& samples,
const TDouble2VecWeightsAryVec& trendWeights);
EUpdateResult updateTrend(const CModelAddSamplesParams& params,
const TTimeDouble2VecSizeTrVec& samples);

//! Update the various model decay rates based on the prediction errors
//! for \p samples.
Expand Down Expand Up @@ -689,8 +689,8 @@ class MATHS_EXPORT CMultivariateTimeSeriesModel : public CModel {

private:
//! Update the trend with \p samples.
EUpdateResult updateTrend(const TTimeDouble2VecSizeTrVec& samples,
const TDouble2VecWeightsAryVec& trendWeights);
EUpdateResult updateTrend(const CModelAddSamplesParams& params,
const TTimeDouble2VecSizeTrVec& samples);

//! Update the various model decay rates based on the prediction errors
//! for \p samples.
Expand Down
4 changes: 4 additions & 0 deletions include/maths/MathsTypes.h
Expand Up @@ -9,6 +9,7 @@

#include <core/CFloatStorage.h>
#include <core/CSmallVector.h>
#include <core/CoreTypes.h>

#include <maths/ImportExport.h>

Expand Down Expand Up @@ -79,6 +80,9 @@ using TDouble2VecWeightsAry1Vec = core::CSmallVector<TDouble2VecWeightsAry, 1>;
using TDouble10VecWeightsAry = TWeightsAry<TDouble10Vec>;
using TDouble10VecWeightsAry1Vec = core::CSmallVector<TDouble10VecWeightsAry, 1>;

// Functional type used for reporting model change annotations to higher-level layers.
using TModelAnnotationCallback = std::function<void(core_t::TTime, const std::string&)>;

namespace maths_types_detail {

//! \brief Constructs a unit weight.
Expand Down
57 changes: 57 additions & 0 deletions include/model/CAnnotation.h
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_model_CAnnotation_h
#define INCLUDED_ml_model_CAnnotation_h

#include <model/ImportExport.h>

#include <model/ModelTypes.h>

#include <string>

namespace ml {
namespace model {

//! \brief Data necessary to create an annotation
class MODEL_EXPORT CAnnotation {
public:
CAnnotation() = default;
CAnnotation(core_t::TTime time,
const std::string& annotation,
int detectorIndex,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& overFieldName,
const std::string& overFieldValue,
const std::string& byFieldName,
const std::string& byFieldValue);
core_t::TTime time() const;
const std::string& annotation() const;
const std::string& event() const;
int detectorIndex() const;
const std::string& partitionFieldName() const;
const std::string& partitionFieldValue() const;
const std::string& overFieldName() const;
const std::string& overFieldValue() const;
const std::string& byFieldName() const;
const std::string& byFieldValue() const;

private:
core_t::TTime m_Time = 0;
std::string m_Annotation;
int m_DetectorIndex = -1;
std::string m_PartitionFieldName;
std::string m_PartitionFieldValue;
std::string m_OverFieldName;
std::string m_OverFieldValue;
std::string m_ByFieldName;
std::string m_ByFieldValue;
};
}
}

#endif // INCLUDED_ml_model_CAnnotation_h
6 changes: 6 additions & 0 deletions include/model/CAnomalyDetector.h
Expand Up @@ -63,6 +63,7 @@ class MODEL_EXPORT CAnomalyDetector : public CMonitoredResource {
using TStrVec = std::vector<std::string>;
using TStrCPtrVec = std::vector<const std::string*>;
using TModelPlotDataVec = std::vector<CModelPlotData>;
using TAnnotationVec = CAnomalyDetectorModel::TAnnotationVec;
using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;
Expand Down Expand Up @@ -224,6 +225,11 @@ class MODEL_EXPORT CAnomalyDetector : public CMonitoredResource {
const TStrSet& terms,
TModelPlotDataVec& modelPlots) const;

//! Generate the annotations.
void generateAnnotations(core_t::TTime bucketStartTime,
core_t::TTime bucketEndTime,
TAnnotationVec& annotations) const;

//! Generate ForecastPrerequistes, e.g. memory requirements
CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const;

Expand Down
6 changes: 6 additions & 0 deletions include/model/CAnomalyDetectorModel.h
Expand Up @@ -15,6 +15,7 @@
#include <maths/CTimeSeriesModel.h>
#include <maths/MathsTypes.h>

#include <model/CAnnotation.h>
#include <model/CMemoryUsageEstimator.h>
#include <model/CModelParams.h>
#include <model/CPartitioningFields.h>
Expand Down Expand Up @@ -50,6 +51,7 @@ class CAttributeFrequencyGreaterThan;
class CInterimBucketCorrector;
class CDataGatherer;
class CHierarchicalResults;
class CAnnotation;
class CModelDetailsView;
class CPersonFrequencyGreaterThan;
class CResourceMonitor;
Expand Down Expand Up @@ -164,6 +166,7 @@ class MODEL_EXPORT CAnomalyDetectorModel {
using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
using TModelDetailsViewUPtr = std::unique_ptr<CModelDetailsView>;
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;
using TAnnotationVec = std::vector<CAnnotation>;

public:
//! A value used to indicate a time variable is unset
Expand Down Expand Up @@ -483,6 +486,9 @@ class MODEL_EXPORT CAnomalyDetectorModel {
//! Get the descriptions of any occurring scheduled event descriptions for the bucket time
virtual const TStr1Vec& scheduledEventDescriptions(core_t::TTime time) const;

//! Get the annotations produced by this model.
virtual const TAnnotationVec& annotations() const = 0;

protected:
using TStrCRef = std::reference_wrapper<const std::string>;
using TSizeSize1VecUMap = boost::unordered_map<std::size_t, TSize1Vec>;
Expand Down
6 changes: 6 additions & 0 deletions include/model/CCountingModel.h
Expand Up @@ -219,6 +219,9 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
//! Get the descriptions of any occurring scheduled event descriptions for the bucket time
const TStr1Vec& scheduledEventDescriptions(core_t::TTime time) const override;

//! Get the annotations produced by this model.
const TAnnotationVec& annotations() const override;

protected:
//! Get the start time of the current bucket.
core_t::TTime currentBucketStartTime() const override;
Expand Down Expand Up @@ -293,6 +296,9 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
//! Calculates corrections for interim buckets.
TInterimBucketCorrectorPtr m_InterimBucketCorrector;

//! Annotations produced by this model.
TAnnotationVec m_Annotations;

friend struct CCountingModelTest::testCheckScheduledEvents;
};
}
Expand Down
5 changes: 5 additions & 0 deletions include/model/CEventRateModel.h
Expand Up @@ -77,6 +77,8 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
//! The key is <feature, pid, pid> for non-correlated corrections
//! or <feature, pid, correlated_pid> for correlated corrections
mutable TFeatureSizeSizeTripleDouble1VecUMap s_InterimCorrections;
//! Annotations produced by this model.
TAnnotationVec s_Annotations;
};

public:
Expand Down Expand Up @@ -270,6 +272,9 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
const TFeatureData*
featureData(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const;

//! Get the annotations produced by this model.
const TAnnotationVec& annotations() const override;

private:
//! Get the start time of the current bucket.
core_t::TTime currentBucketStartTime() const override;
Expand Down
5 changes: 5 additions & 0 deletions include/model/CEventRatePopulationModel.h
Expand Up @@ -98,6 +98,8 @@ class MODEL_EXPORT CEventRatePopulationModel : public CPopulationModel {
TFeatureSizeSizePrFeatureDataPrVecMap s_FeatureData;
//! A cache of the corrections applied to interim results.
mutable TCorrectionKeyDouble1VecUMap s_InterimCorrections;
//! Annotations produced by this model.
TAnnotationVec s_Annotations;
};

//! Lift the overloads of currentBucketValue into the class scope.
Expand Down Expand Up @@ -308,6 +310,9 @@ class MODEL_EXPORT CEventRatePopulationModel : public CPopulationModel {
const TSizeSizePrFeatureDataPrVec& featureData(model_t::EFeature feature,
core_t::TTime time) const;

//! Get the annotations produced by this model.
const TAnnotationVec& annotations() const override;

private:
//! Initialize the feature models.
void initialize(const TFeatureMathsModelSPtrPrVec& newFeatureModels,
Expand Down

0 comments on commit ef1dcff

Please sign in to comment.