Skip to content

Commit

Permalink
Add matrix stats aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
olivere committed Apr 14, 2017
1 parent b5d4d0d commit 1da5bfd
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details.
- [x] Bucket Script
- [x] Bucket Selector
- [x] Serial Differencing
- [ ] Matrix Aggregations
- [ ] Matrix Stats
- [x] Matrix Aggregations
- [x] Matrix Stats
- [x] Aggregation Metadata

### Indices APIs
Expand Down
55 changes: 55 additions & 0 deletions search_aggs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ func (a Aggregations) ExtendedStats(name string) (*AggregationExtendedStatsMetri
return nil, false
}

// MatrixStats returns matrix stats aggregation results.
// https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-aggregations-matrix-stats-aggregation.html
func (a Aggregations) MatrixStats(name string) (*AggregationMatrixStats, bool) {
if raw, found := a[name]; found {
agg := new(AggregationMatrixStats)
if raw == nil {
return agg, true
}
if err := json.Unmarshal(*raw, agg); err == nil {
return agg, true
}
}
return nil, false
}

// Percentiles returns percentiles results.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-aggregations-metrics-percentile-aggregation.html
func (a Aggregations) Percentiles(name string) (*AggregationPercentilesMetric, bool) {
Expand Down Expand Up @@ -746,6 +761,46 @@ func (a *AggregationExtendedStatsMetric) UnmarshalJSON(data []byte) error {
return nil
}

// -- Matrix Stats --

// AggregationMatrixStats is returned by a MatrixStats aggregation.
type AggregationMatrixStats struct {
Aggregations

Fields []*AggregationMatrixStatsField // `json:"field,omitempty"`
Meta map[string]interface{} // `json:"meta,omitempty"`
}

// AggregationMatrixStatsField represents running stats of a single field
// returned from MatrixStats aggregation.
type AggregationMatrixStatsField struct {
Name string `json:"name"`
Count int64 `json:"count"`
Mean float64 `json:"mean,omitempty"`
Variance float64 `json:"variance,omitempty"`
Skewness float64 `json:"skewness,omitempty"`
Kurtosis float64 `json:"kurtosis,omitempty"`
Covariance map[string]float64 `json:"covariance,omitempty"`
Correlation map[string]float64 `json:"correlation,omitempty"`
}

// UnmarshalJSON decodes JSON data and initializes an AggregationMatrixStats structure.
func (a *AggregationMatrixStats) UnmarshalJSON(data []byte) error {
var aggs map[string]*json.RawMessage
if err := json.Unmarshal(data, &aggs); err != nil {
return err
}
if v, ok := aggs["fields"]; ok && v != nil {
// RunningStats for every field
json.Unmarshal(*v, &a.Fields)
}
if v, ok := aggs["meta"]; ok && v != nil {
json.Unmarshal(*v, &a.Meta)
}
a.Aggregations = aggs
return nil
}

// -- Percentiles metric --

// AggregationPercentilesMetric is a multi-value metric, returned by a Percentiles aggregation.
Expand Down
120 changes: 120 additions & 0 deletions search_aggs_matrix_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

// MatrixMatrixStatsAggregation ...
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.3/search-aggregations-metrics-stats-aggregation.html
// for details.
type MatrixStatsAggregation struct {
fields []string
missing interface{}
format string
valueType interface{}
mode string
subAggregations map[string]Aggregation
meta map[string]interface{}
}

// NewMatrixStatsAggregation initializes a new MatrixStatsAggregation.
func NewMatrixStatsAggregation() *MatrixStatsAggregation {
return &MatrixStatsAggregation{
subAggregations: make(map[string]Aggregation),
}
}

func (a *MatrixStatsAggregation) Fields(fields ...string) *MatrixStatsAggregation {
a.fields = append(a.fields, fields...)
return a
}

// Missing configures the value to use when documents miss a value.
func (a *MatrixStatsAggregation) Missing(missing interface{}) *MatrixStatsAggregation {
a.missing = missing
return a
}

// Mode specifies how to operate. Valid values are: sum, avg, median, min, or max.
func (a *MatrixStatsAggregation) Mode(mode string) *MatrixStatsAggregation {
a.mode = mode
return a
}

func (a *MatrixStatsAggregation) Format(format string) *MatrixStatsAggregation {
a.format = format
return a
}

func (a *MatrixStatsAggregation) ValueType(valueType interface{}) *MatrixStatsAggregation {
a.valueType = valueType
return a
}

func (a *MatrixStatsAggregation) SubAggregation(name string, subAggregation Aggregation) *MatrixStatsAggregation {
a.subAggregations[name] = subAggregation
return a
}

// Meta sets the meta data to be included in the aggregation response.
func (a *MatrixStatsAggregation) Meta(metaData map[string]interface{}) *MatrixStatsAggregation {
a.meta = metaData
return a
}

// Source returns the JSON to serialize into the request, or an error.
func (a *MatrixStatsAggregation) Source() (interface{}, error) {
// Example:
// {
// "aggs" : {
// "matrixstats" : {
// "matrix_stats" : {
// "fields" : ["poverty", "income"],
// "missing": {"income": 50000},
// "mode": "avg",
// ...
// }
// }
// }
// }
// This method returns only the { "matrix_stats" : { ... } } part.

source := make(map[string]interface{})
opts := make(map[string]interface{})
source["matrix_stats"] = opts

// MatrixStatsAggregationBuilder
opts["fields"] = a.fields
if a.missing != nil {
opts["missing"] = a.missing
}
if a.format != "" {
opts["format"] = a.format
}
if a.valueType != nil {
opts["value_type"] = a.valueType
}
if a.mode != "" {
opts["mode"] = a.mode
}

// AggregationBuilder (SubAggregations)
if len(a.subAggregations) > 0 {
aggsMap := make(map[string]interface{})
source["aggregations"] = aggsMap
for name, aggregate := range a.subAggregations {
src, err := aggregate.Source()
if err != nil {
return nil, err
}
aggsMap[name] = src
}
}

// Add Meta data if available
if len(a.meta) > 0 {
source["meta"] = a.meta
}

return source, nil
}
53 changes: 53 additions & 0 deletions search_aggs_matrix_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"encoding/json"
"testing"
)

func TestMatrixStatsAggregation(t *testing.T) {
agg := NewMatrixStatsAggregation().
Fields("poverty", "income").
Missing(map[string]interface{}{
"income": 50000,
}).
Mode("avg").
Format("0000.0").
ValueType("double")
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"matrix_stats":{"fields":["poverty","income"],"format":"0000.0","missing":{"income":50000},"mode":"avg","value_type":"double"}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestMatrixStatsAggregationWithMetaData(t *testing.T) {
agg := NewMatrixStatsAggregation().
Fields("poverty", "income").
Meta(map[string]interface{}{"name": "Oliver"})
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"matrix_stats":{"fields":["poverty","income"]},"meta":{"name":"Oliver"}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}
99 changes: 99 additions & 0 deletions search_aggs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,105 @@ func TestAggsMetricsExtendedStats(t *testing.T) {
}
}

func TestAggsMatrixStats(t *testing.T) {
s := `{
"matrixstats": {
"fields": [{
"name": "income",
"count": 50,
"mean": 51985.1,
"variance": 7.383377037755103E7,
"skewness": 0.5595114003506483,
"kurtosis": 2.5692365287787124,
"covariance": {
"income": 7.383377037755103E7,
"poverty": -21093.65836734694
},
"correlation": {
"income": 1.0,
"poverty": -0.8352655256272504
}
}, {
"name": "poverty",
"count": 51,
"mean": 12.732000000000001,
"variance": 8.637730612244896,
"skewness": 0.4516049811903419,
"kurtosis": 2.8615929677997767,
"covariance": {
"income": -21093.65836734694,
"poverty": 8.637730612244896
},
"correlation": {
"income": -0.8352655256272504,
"poverty": 1.0
}
}]
}
}`

aggs := new(Aggregations)
err := json.Unmarshal([]byte(s), &aggs)
if err != nil {
t.Fatalf("expected no error decoding; got: %v", err)
}

agg, found := aggs.MatrixStats("matrixstats")
if !found {
t.Fatalf("expected aggregation to be found; got: %v", found)
}
if agg == nil {
t.Fatalf("expected aggregation != nil; got: %v", agg)
}
if want, got := 2, len(agg.Fields); want != got {
t.Fatalf("expected aggregaton len(Fields) = %v; got: %v", want, got)
}
field := agg.Fields[0]
if want, got := "income", field.Name; want != got {
t.Fatalf("expected aggregation field name == %q; got: %q", want, got)
}
if want, got := int64(50), field.Count; want != got {
t.Fatalf("expected aggregation field count == %v; got: %v", want, got)
}
if want, got := 51985.1, field.Mean; want != got {
t.Fatalf("expected aggregation field mean == %v; got: %v", want, got)
}
if want, got := 7.383377037755103e7, field.Variance; want != got {
t.Fatalf("expected aggregation field variance == %v; got: %v", want, got)
}
if want, got := 0.5595114003506483, field.Skewness; want != got {
t.Fatalf("expected aggregation field skewness == %v; got: %v", want, got)
}
if want, got := 2.5692365287787124, field.Kurtosis; want != got {
t.Fatalf("expected aggregation field kurtosis == %v; got: %v", want, got)
}
if field.Covariance == nil {
t.Fatalf("expected aggregation field covariance != nil; got: %v", nil)
}
if want, got := 7.383377037755103e7, field.Covariance["income"]; want != got {
t.Fatalf("expected aggregation field covariance == %v; got: %v", want, got)
}
if want, got := -21093.65836734694, field.Covariance["poverty"]; want != got {
t.Fatalf("expected aggregation field covariance == %v; got: %v", want, got)
}
if field.Correlation == nil {
t.Fatalf("expected aggregation field correlation != nil; got: %v", nil)
}
if want, got := 1.0, field.Correlation["income"]; want != got {
t.Fatalf("expected aggregation field correlation == %v; got: %v", want, got)
}
if want, got := -0.8352655256272504, field.Correlation["poverty"]; want != got {
t.Fatalf("expected aggregation field correlation == %v; got: %v", want, got)
}
field = agg.Fields[1]
if want, got := "poverty", field.Name; want != got {
t.Fatalf("expected aggregation field name == %q; got: %q", want, got)
}
if want, got := int64(51), field.Count; want != got {
t.Fatalf("expected aggregation field count == %v; got: %v", want, got)
}
}

func TestAggsMetricsPercentiles(t *testing.T) {
s := `{
"load_time_outlier": {
Expand Down

0 comments on commit 1da5bfd

Please sign in to comment.