From e5ab82941058e4443073f10190c62c393179d922 Mon Sep 17 00:00:00 2001 From: xichen2020 Date: Fri, 8 Feb 2019 20:25:06 -0500 Subject: [PATCH] Implement MarshalJSON for query results (#114) --- calculation/calculation_op.go | 30 +- calculation/result.go | 87 ++--- calculation/value.go | 40 ++- document/field/value.go | 23 ++ integration/client.go | 15 +- integration/query_result.go | 5 + ...tion_test.go => raw_query_orderby_test.go} | 4 +- query/grouped_result.go | 16 +- query/multi_key_result_groups.go | 61 +++- query/multi_key_result_groups_test.go | 115 +++++++ query/raw_result.go | 38 ++- query/raw_result_test.go | 82 +++++ query/result_group.go | 69 ++-- query/single_key_result_groups.go | 298 ++++++++++++------ query/single_key_result_groups_test.go | 114 +++++++ query/time_bucket_result.go | 32 +- query/time_bucket_result_test.go | 21 ++ server/http/handlers/service.go | 2 +- 18 files changed, 842 insertions(+), 210 deletions(-) create mode 100644 integration/query_result.go rename integration/{integration_test.go => raw_query_orderby_test.go} (91%) create mode 100644 query/multi_key_result_groups_test.go create mode 100644 query/single_key_result_groups_test.go create mode 100644 query/time_bucket_result_test.go diff --git a/calculation/calculation_op.go b/calculation/calculation_op.go index 720d86b..67e7bf3 100644 --- a/calculation/calculation_op.go +++ b/calculation/calculation_op.go @@ -132,32 +132,32 @@ var ( // For operators that do not require fields. newResultFnsByOpsNoType = map[Op]newResultFn{ - Count: newCountResult, + Count: NewCountResult, } // For operators that require fields. 
newResultFnsByOpsAndType = map[Op]map[field.ValueType]newResultFn{ Sum: map[field.ValueType]newResultFn{ - field.IntType: newSumResult, - field.DoubleType: newSumResult, - field.TimeType: newSumResult, + field.IntType: NewSumResult, + field.DoubleType: NewSumResult, + field.TimeType: NewSumResult, }, Avg: map[field.ValueType]newResultFn{ - field.IntType: newAvgResult, - field.DoubleType: newAvgResult, - field.TimeType: newAvgResult, + field.IntType: NewAvgResult, + field.DoubleType: NewAvgResult, + field.TimeType: NewAvgResult, }, Min: map[field.ValueType]newResultFn{ - field.IntType: newMinNumberResult, - field.DoubleType: newMinNumberResult, - field.StringType: newMinStringResult, - field.TimeType: newMinNumberResult, + field.IntType: NewMinNumberResult, + field.DoubleType: NewMinNumberResult, + field.StringType: NewMinStringResult, + field.TimeType: NewMinNumberResult, }, Max: map[field.ValueType]newResultFn{ - field.IntType: newMaxNumberResult, - field.DoubleType: newMaxNumberResult, - field.StringType: newMaxStringResult, - field.TimeType: newMaxNumberResult, + field.IntType: NewMaxNumberResult, + field.DoubleType: NewMaxNumberResult, + field.StringType: NewMaxStringResult, + field.TimeType: NewMaxNumberResult, }, } diff --git a/calculation/result.go b/calculation/result.go index ed01b08..e33d174 100644 --- a/calculation/result.go +++ b/calculation/result.go @@ -1,6 +1,7 @@ package calculation import ( + "encoding/json" "errors" "fmt" "math" @@ -25,6 +26,9 @@ type Result interface { // Value returns the result value. Value() ValueUnion + + // MarshalJSON marshals the result as a JSON object. + MarshalJSON() ([]byte, error) } var ( @@ -43,11 +47,12 @@ type countResult struct { v int } -func newCountResult() Result { +// NewCountResult creates a new count result. 
+func NewCountResult() Result { return &countResult{} } -func (r *countResult) New() Result { return newCountResult() } +func (r *countResult) New() Result { return NewCountResult() } func (r *countResult) Add(ValueUnion) { r.v++ } @@ -60,17 +65,18 @@ func (r *countResult) MergeInPlace(other Result) error { return nil } -func (r *countResult) Value() ValueUnion { return newNumberUnion(float64(r.v)) } +func (r *countResult) Value() ValueUnion { return NewNumberUnion(float64(r.v)) } + +func (r *countResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } type sumResult struct { v float64 } -func newSumResult() Result { - return &sumResult{} -} +// NewSumResult creates a new sum result. +func NewSumResult() Result { return &sumResult{} } -func (r *sumResult) New() Result { return newSumResult() } +func (r *sumResult) New() Result { return NewSumResult() } func (r *sumResult) Add(v ValueUnion) { r.v += v.NumberVal } @@ -83,18 +89,19 @@ func (r *sumResult) MergeInPlace(other Result) error { return nil } -func (r *sumResult) Value() ValueUnion { return newNumberUnion(r.v) } +func (r *sumResult) Value() ValueUnion { return NewNumberUnion(r.v) } + +func (r *sumResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } type avgResult struct { s float64 c int } -func newAvgResult() Result { - return &avgResult{} -} +// NewAvgResult creates a new average result. 
+func NewAvgResult() Result { return &avgResult{} } -func (r *avgResult) New() Result { return newAvgResult() } +func (r *avgResult) New() Result { return NewAvgResult() } func (r *avgResult) Add(v ValueUnion) { r.s += v.NumberVal @@ -111,18 +118,19 @@ func (r *avgResult) MergeInPlace(other Result) error { return nil } -func (r *avgResult) Value() ValueUnion { return newNumberUnion(r.s / float64(r.c)) } +func (r *avgResult) Value() ValueUnion { return NewNumberUnion(r.s / float64(r.c)) } + +func (r *avgResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } type minNumberResult struct { hasValues bool v float64 } -func newMinNumberResult() Result { - return &minNumberResult{} -} +// NewMinNumberResult creates a new minimum number result. +func NewMinNumberResult() Result { return &minNumberResult{} } -func (r *minNumberResult) New() Result { return newMinNumberResult() } +func (r *minNumberResult) New() Result { return NewMinNumberResult() } func (r *minNumberResult) Add(v ValueUnion) { if !r.hasValues { @@ -155,21 +163,22 @@ func (r *minNumberResult) MergeInPlace(other Result) error { func (r *minNumberResult) Value() ValueUnion { if !r.hasValues { - return newNumberUnion(nan) + return NewNumberUnion(nan) } - return newNumberUnion(r.v) + return NewNumberUnion(r.v) } +func (r *minNumberResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } + type minStringResult struct { hasValues bool v string } -func newMinStringResult() Result { - return &minStringResult{} -} +// NewMinStringResult creates a new minimum string result. 
+func NewMinStringResult() Result { return &minStringResult{} } -func (r *minStringResult) New() Result { return newMinStringResult() } +func (r *minStringResult) New() Result { return NewMinStringResult() } func (r *minStringResult) Add(v ValueUnion) { if !r.hasValues { @@ -202,21 +211,22 @@ func (r *minStringResult) MergeInPlace(other Result) error { func (r *minStringResult) Value() ValueUnion { if !r.hasValues { - return newStringUnion(emptyString) + return NewStringUnion(emptyString) } - return newStringUnion(r.v) + return NewStringUnion(r.v) } +func (r *minStringResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } + type maxNumberResult struct { hasValues bool v float64 } -func newMaxNumberResult() Result { - return &maxNumberResult{} -} +// NewMaxNumberResult creates a new maximum number result. +func NewMaxNumberResult() Result { return &maxNumberResult{} } -func (r *maxNumberResult) New() Result { return newMaxNumberResult() } +func (r *maxNumberResult) New() Result { return NewMaxNumberResult() } func (r *maxNumberResult) Add(v ValueUnion) { if !r.hasValues { @@ -249,21 +259,22 @@ func (r *maxNumberResult) MergeInPlace(other Result) error { func (r *maxNumberResult) Value() ValueUnion { if !r.hasValues { - return newNumberUnion(nan) + return NewNumberUnion(nan) } - return newNumberUnion(r.v) + return NewNumberUnion(r.v) } +func (r *maxNumberResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } + type maxStringResult struct { hasValues bool v string } -func newMaxStringResult() Result { - return &maxStringResult{} -} +// NewMaxStringResult creates a new maximum string result. 
+func NewMaxStringResult() Result { return &maxStringResult{} } -func (r *maxStringResult) New() Result { return newMaxStringResult() } +func (r *maxStringResult) New() Result { return NewMaxStringResult() } func (r *maxStringResult) Add(v ValueUnion) { if !r.hasValues { @@ -296,11 +307,13 @@ func (r *maxStringResult) MergeInPlace(other Result) error { func (r *maxStringResult) Value() ValueUnion { if !r.hasValues { - return newStringUnion(emptyString) + return NewStringUnion(emptyString) } - return newStringUnion(r.v) + return NewStringUnion(r.v) } +func (r *maxStringResult) MarshalJSON() ([]byte, error) { return json.Marshal(r.Value()) } + // ResultArray is an array of calculation result. type ResultArray []Result diff --git a/calculation/value.go b/calculation/value.go index 5abb81a..3e43498 100644 --- a/calculation/value.go +++ b/calculation/value.go @@ -1,7 +1,9 @@ package calculation import ( + "encoding/json" "fmt" + "math" "github.com/xichen2020/eventdb/document/field" "github.com/xichen2020/eventdb/x/compare" @@ -16,6 +18,10 @@ const ( StringType ) +var ( + nullBytes = []byte("null") +) + // ValueUnion is a value union. type ValueUnion struct { Type ValueType @@ -23,11 +29,29 @@ type ValueUnion struct { StringVal string } -func newNumberUnion(v float64) ValueUnion { +// MarshalJSON marshals value as a JSON object. +func (u ValueUnion) MarshalJSON() ([]byte, error) { + switch u.Type { + case NumberType: + // NaN cannot be marshalled as JSON, so marshal it as null. + if math.IsNaN(u.NumberVal) { + return nullBytes, nil + } + return json.Marshal(u.NumberVal) + case StringType: + return json.Marshal(u.StringVal) + default: + return nil, fmt.Errorf("unexpected value type %v", u.Type) + } +} + +// NewNumberUnion creates a new number union. +func NewNumberUnion(v float64) ValueUnion { return ValueUnion{Type: NumberType, NumberVal: v} } -func newStringUnion(v string) ValueUnion { +// NewStringUnion creates a new string union. 
+func NewStringUnion(v string) ValueUnion { return ValueUnion{Type: StringType, StringVal: v} } @@ -100,7 +124,7 @@ func AsValueFns(fieldTypes field.OptionalTypeArray) ([]FieldValueToValueFn, erro } func nullToValue(v *field.ValueUnion) ValueUnion { - return newNumberUnion(0) + return NewNumberUnion(0) } func boolToValue(v *field.ValueUnion) ValueUnion { @@ -108,23 +132,23 @@ func boolToValue(v *field.ValueUnion) ValueUnion { if v.BoolVal { val = 1 } - return newNumberUnion(val) + return NewNumberUnion(val) } func intToValue(v *field.ValueUnion) ValueUnion { - return newNumberUnion(float64(v.IntVal)) + return NewNumberUnion(float64(v.IntVal)) } func doubleToValue(v *field.ValueUnion) ValueUnion { - return newNumberUnion(v.DoubleVal) + return NewNumberUnion(v.DoubleVal) } func stringToValue(v *field.ValueUnion) ValueUnion { - return newStringUnion(v.StringVal) + return NewStringUnion(v.StringVal) } func timeToValue(v *field.ValueUnion) ValueUnion { - return newNumberUnion(float64(v.TimeNanosVal)) + return NewNumberUnion(float64(v.TimeNanosVal)) } var ( diff --git a/document/field/value.go b/document/field/value.go index 639dbe4..d1e1375 100644 --- a/document/field/value.go +++ b/document/field/value.go @@ -1,6 +1,7 @@ package field import ( + "encoding/json" "fmt" "math" @@ -27,6 +28,8 @@ const ( var ( // NumValidFieldTypes returns the number of valid field types. NumValidFieldTypes = len(validTypes) + + nullBytes = []byte("null") ) // IsValid returns true if this is a valid value type. @@ -197,6 +200,26 @@ func NewTimeUnion(v int64) ValueUnion { } } +// MarshalJSON marshals value as a JSON object. 
+func (v ValueUnion) MarshalJSON() ([]byte, error) { + switch v.Type { + case NullType: + return nullBytes, nil + case BoolType: + return json.Marshal(v.BoolVal) + case IntType: + return json.Marshal(v.IntVal) + case DoubleType: + return json.Marshal(v.DoubleVal) + case StringType: + return json.Marshal(v.StringVal) + case TimeType: + return json.Marshal(v.TimeNanosVal) + default: + return nil, fmt.Errorf("unknown value type: %v", v.Type) + } +} + // Equal returns true if two value unions are considered equal. func (v *ValueUnion) Equal(other *ValueUnion) bool { if v == nil && other == nil { diff --git a/integration/client.go b/integration/client.go index 7e21f0b..ffc3cbb 100644 --- a/integration/client.go +++ b/integration/client.go @@ -8,7 +8,6 @@ import ( "net/http" "strings" - "github.com/xichen2020/eventdb/query" "github.com/xichen2020/eventdb/server/http/handlers" ) @@ -53,21 +52,21 @@ func (c client) write(data []byte) error { return err } -func (c client) query(data []byte) ([]query.RawResult, error) { +func (c client) queryRaw(data []byte) (rawQueryResults, error) { req, err := http.NewRequest(http.MethodPost, c.queryURL, bytes.NewReader(data)) if err != nil { - return nil, err + return rawQueryResults{}, err } resp, err := c.do(req) if err != nil { - return nil, err + return rawQueryResults{}, err } - var result []query.RawResult - err = json.Unmarshal(resp, &result) + var results rawQueryResults + err = json.Unmarshal(resp, &results) if err != nil { - return nil, fmt.Errorf("unable to unmarshal response: %v", err) + return rawQueryResults{}, fmt.Errorf("unable to unmarshal response: %v", err) } - return result, nil + return results, nil } func (c client) do(req *http.Request) ([]byte, error) { diff --git a/integration/query_result.go b/integration/query_result.go new file mode 100644 index 0000000..328afe4 --- /dev/null +++ b/integration/query_result.go @@ -0,0 +1,5 @@ +package integration + +type rawQueryResults struct { + Raw []string `json:"raw"` +} 
diff --git a/integration/integration_test.go b/integration/raw_query_orderby_test.go similarity index 91% rename from integration/integration_test.go rename to integration/raw_query_orderby_test.go index 937ad60..0dde3d5 100644 --- a/integration/integration_test.go +++ b/integration/raw_query_orderby_test.go @@ -57,9 +57,9 @@ func TestRawQueryOrderBy(t *testing.T) { require.NoError(t, client.write([]byte(strings.TrimSpace(testData1)))) for _, test := range tests { - resp, err := client.query([]byte(test.queryJSON)) + resp, err := client.queryRaw([]byte(test.queryJSON)) assert.NoError(t, err) // TODO(wjang): Allow actually comparing results. - assert.Len(t, resp, test.expectedResults) + assert.Len(t, resp.Raw, test.expectedResults) } } diff --git a/query/grouped_result.go b/query/grouped_result.go index e1e4b93..95c3bd5 100644 --- a/query/grouped_result.go +++ b/query/grouped_result.go @@ -36,7 +36,6 @@ const ( ) // GroupedResults is a collection of result groups. -// TODO(xichen): Add JSON marshaling / unmarshaling. type GroupedResults struct { // GroupBy contains a list of field paths to group results by. GroupBy [][]string @@ -173,6 +172,21 @@ func (r *GroupedResults) TrimIfNeeded() { r.trim() } +// MarshalJSON marshals the grouped results as a JSON object. +func (r *GroupedResults) MarshalJSON() ([]byte, error) { + if r.IsEmpty() { + return nil, nil + } + var ( + limit = r.Limit + topNRequired = r.IsOrdered() + ) + if r.HasSingleKey() { + return r.SingleKeyGroups.MarshalJSON(limit, topNRequired) + } + return r.MultiKeyGroups.MarshalJSON(limit, topNRequired) +} + // Only trim the results if this is an ordered query. For unordered query, the group limit // is the same as the result limit, and the number of groups will never exceed the group limit, // and as such no trimming is ever required. 
diff --git a/query/multi_key_result_groups.go b/query/multi_key_result_groups.go index 2ca6317..5e8a274 100644 --- a/query/multi_key_result_groups.go +++ b/query/multi_key_result_groups.go @@ -1,6 +1,8 @@ package query import ( + "encoding/json" + "github.com/xichen2020/eventdb/calculation" "github.com/xichen2020/eventdb/document/field" ) @@ -105,22 +107,57 @@ func (m *MultiKeyResultGroups) Clear() { m.topNGroups = nil } -// trimToTopN trims the number of result groups to the target size. -// Precondition: `m.groupReverseLessThanFn` is not nil. -func (m *MultiKeyResultGroups) trimToTopN(targetSize int) { - if m.Len() <= targetSize { - return +type multiKeyResultGroupsJSON struct { + Groups []multiKeyResultGroup `json:"groups"` +} + +// MarshalJSON marshals `numGroups` result groups as a JSON object. +// If `topNRequired` is true, top N groups are selected based on `m.groupReverseLessThanFn`. +// Otherwise, an arbitrary set of groups is selected. +func (m *MultiKeyResultGroups) MarshalJSON( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil } + var res []multiKeyResultGroup + if topNRequired { + m.computeTopN(numGroups) + res = m.topNGroups.SortInPlace() + } else { + res = make([]multiKeyResultGroup, 0, numGroups) + iter := m.results.Iter() + for _, entry := range iter { + group := multiKeyResultGroup{Key: entry.Key(), Values: entry.Value()} + res = append(res, group) + } + } + gj := multiKeyResultGroupsJSON{Groups: res} + return json.Marshal(gj) +} - // Find the top N groups. +// computeTopN computes the top N groups and stores them in `topNGroups`. 
+func (m *MultiKeyResultGroups) computeTopN(targetSize int) { if m.topNGroups == nil || m.topNGroups.Cap() < targetSize { m.topNGroups = newTopNMultiKeyResultGroup(targetSize, m.groupReverseLessThanFn) } iter := m.results.Iter() for _, entry := range iter { - group := multiKeyResultGroup{key: entry.Key(), value: entry.Value()} + group := multiKeyResultGroup{Key: entry.Key(), Values: entry.Value()} m.topNGroups.Add(group, multiKeyResultGroupAddOptions{}) } +} + +// trimToTopN trims the number of result groups to the target size. +// Precondition: `m.groupReverseLessThanFn` is not nil. +func (m *MultiKeyResultGroups) trimToTopN(targetSize int) { + if m.Len() <= targetSize { + return + } + + // Find the top N groups. + m.computeTopN(targetSize) // Allocate a new map and insert the top n groups into the map. m.results = NewValuesResultArrayMap(targetSize) @@ -130,7 +167,7 @@ func (m *MultiKeyResultGroups) trimToTopN(targetSize int) { } data := m.topNGroups.RawData() for i := 0; i < len(data); i++ { - m.results.SetUnsafe(data[i].key, data[i].value, setOpts) + m.results.SetUnsafe(data[i].Key, data[i].Values, setOpts) data[i] = emptyMultiKeyResultGroup } m.topNGroups.Reset() @@ -138,8 +175,8 @@ func (m *MultiKeyResultGroups) trimToTopN(targetSize int) { // multiKeyResultGroup is a multi-key result group. 
type multiKeyResultGroup struct { - key field.Values - value calculation.ResultArray + Key field.Values `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyMultiKeyResultGroup multiKeyResultGroup @@ -174,9 +211,9 @@ func newMultiKeyResultGroupReverseLessThanFn(orderBy []OrderBy) (multiKeyResultG for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareFieldValueFns[i](g1.key[ob.FieldIndex], g2.key[ob.FieldIndex]) + res = compareFieldValueFns[i](g1.Key[ob.FieldIndex], g2.Key[ob.FieldIndex]) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { return true diff --git a/query/multi_key_result_groups_test.go b/query/multi_key_result_groups_test.go new file mode 100644 index 0000000..7c3e86e --- /dev/null +++ b/query/multi_key_result_groups_test.go @@ -0,0 +1,115 @@ +package query + +import ( + "testing" + + "github.com/xichen2020/eventdb/calculation" + "github.com/xichen2020/eventdb/document/field" + + "github.com/stretchr/testify/require" +) + +func TestUnorderedMultiKeyResultGroupsMarshalJSON(t *testing.T) { + results := calculation.ResultArray{ + calculation.NewCountResult(), + calculation.NewAvgResult(), + calculation.NewMaxStringResult(), + } + groups, err := NewMultiKeyResultGroups(results, nil, 10, 10) + require.NoError(t, err) + + // Add some data. 
+ keys1 := []field.ValueUnion{ + field.NewIntUnion(12), + field.NewStringUnion("aa"), + } + res, status := groups.GetOrInsert(keys1) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(32)) + res[2].Add(calculation.NewStringUnion("foo")) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(16)) + res[2].Add(calculation.NewStringUnion("bar")) + + keys2 := []field.ValueUnion{ + field.NewIntUnion(30), + field.NewStringUnion("cc"), + } + res, status = groups.GetOrInsert(keys2) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("baz")) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("must")) + + b, err := groups.MarshalJSON(5, false) + require.NoError(t, err) + expectedVals := map[string]struct{}{ + `{"groups":[{"key":[12,"aa"],"values":[2,24,"foo"]},{"key":[30,"cc"],"values":[2,null,"must"]}]}`: struct{}{}, + `{"groups":[{"key":[30,"cc"],"values":[2,null,"must"]},{"key":[12,"aa"],"values":[2,24,"foo"]}]}`: struct{}{}, + } + _, exists := expectedVals[string(b)] + require.True(t, exists) +} + +func TestOrderedMultiKeyResultGroupsMarshalJSON(t *testing.T) { + results := calculation.ResultArray{ + calculation.NewCountResult(), + calculation.NewAvgResult(), + calculation.NewMaxStringResult(), + } + orderBys := []OrderBy{ + { + FieldType: GroupByField, + FieldIndex: 1, + SortOrder: Descending, + }, + { + FieldType: CalculationField, + FieldIndex: 2, + SortOrder: Ascending, + }, + } + groups, err := NewMultiKeyResultGroups(results, orderBys, 10, 10) + require.NoError(t, err) + + // Add some data. 
+ keys1 := []field.ValueUnion{ + field.NewIntUnion(12), + field.NewStringUnion("aa"), + } + res, status := groups.GetOrInsert(keys1) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(32)) + res[2].Add(calculation.NewStringUnion("foo")) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(16)) + res[2].Add(calculation.NewStringUnion("bar")) + + keys2 := []field.ValueUnion{ + field.NewIntUnion(30), + field.NewStringUnion("cc"), + } + res, status = groups.GetOrInsert(keys2) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("baz")) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("must")) + + keys3 := []field.ValueUnion{ + field.NewIntUnion(12), + field.NewStringUnion("bb"), + } + res, status = groups.GetOrInsert(keys3) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("cat")) + + b, err := groups.MarshalJSON(5, true) + require.NoError(t, err) + expected := `{"groups":[{"key":[30,"cc"],"values":[2,null,"must"]},{"key":[12,"bb"],"values":[1,null,"cat"]},{"key":[12,"aa"],"values":[2,24,"foo"]}]}` + require.Equal(t, expected, string(b)) +} diff --git a/query/raw_result.go b/query/raw_result.go index 4eafe66..723aefc 100644 --- a/query/raw_result.go +++ b/query/raw_result.go @@ -1,23 +1,27 @@ package query import ( + "encoding/json" "fmt" "github.com/xichen2020/eventdb/document/field" ) // RawResult is a single raw result returned from a raw query. -// TODO(xichen): Implement `MarshalJSON` to only marshal the `Data` field without the `data` tag. type RawResult struct { Data string // This is the raw doc source data OrderByValues field.Values // For ordering purposes, empty for unsorted raw results } +// MarshalJSON marshals the raw results as a JSON object using the data field. 
+func (r RawResult) MarshalJSON() ([]byte, error) { + return json.Marshal(r.Data) +} + // RawResultLessThanFn compares two raw results. type RawResultLessThanFn func(v1, v2 RawResult) bool // RawResults is a collection of raw results. -// TODO(xichen): Add JSON marshaling / unmarshaling. type RawResults struct { OrderBy []OrderBy Limit int @@ -179,16 +183,14 @@ func (r *RawResults) MergeInPlace(other *RawResults) error { return r.mergeOrderedInPlace(other) } -// FinalData returns the final data for the accumulated results. This is called immediately -// before sending results to the caller. -func (r *RawResults) FinalData() []RawResult { - if r.Len() == 0 { - return nil - } - if !r.IsOrdered() { - return r.Unordered - } - return r.Ordered.SortInPlace() +type rawResultsJSON struct { + Raw []RawResult `json:"raw"` +} + +// MarshalJSON marshals the raw results as a JSON object. +func (r *RawResults) MarshalJSON() ([]byte, error) { + rj := rawResultsJSON{Raw: r.finalData()} + return json.Marshal(rj) } func (r *RawResults) mergeUnorderedInPlace(other *RawResults) { @@ -238,3 +240,15 @@ func (r *RawResults) mergeOrderedInPlace(other *RawResults) error { other.Clear() return nil } + +// finalData returns the final data for the accumulated results. This is called immediately +// before sending results to the caller. 
+func (r *RawResults) finalData() []RawResult { + if r.Len() == 0 { + return nil + } + if !r.IsOrdered() { + return r.Unordered + } + return r.Ordered.SortInPlace() +} diff --git a/query/raw_result_test.go b/query/raw_result_test.go index 8befcd8..3369266 100644 --- a/query/raw_result_test.go +++ b/query/raw_result_test.go @@ -1,6 +1,7 @@ package query import ( + "encoding/json" "testing" "github.com/xichen2020/eventdb/document/field" @@ -49,3 +50,84 @@ func TestRawResultHeapSortInPlace(t *testing.T) { expected := []RawResult{input[0], input[3]} require.Equal(t, expected, sortedResults) } + +func TestEmptyRawResultsMarshalJSON(t *testing.T) { + input := &RawResults{} + b, err := json.Marshal(input) + require.NoError(t, err) + expected := `{"raw":null}` + require.Equal(t, expected, string(b)) +} + +func TestUnorderedRawResultsMarshalJSON(t *testing.T) { + input := &RawResults{ + Unordered: []RawResult{ + { + Data: "foo", + }, + { + Data: "bar", + }, + { + Data: "baz", + }, + }, + } + + b, err := json.Marshal(input) + require.NoError(t, err) + expected := `{"raw":["foo","bar","baz"]}` + require.Equal(t, expected, string(b)) +} + +func TestOrderedRawResultsMarshalJSON(t *testing.T) { + input := []RawResult{ + { + Data: "foo", + OrderByValues: field.Values{ + field.NewStringUnion("o1"), + }, + }, + { + Data: "bar", + OrderByValues: field.Values{ + field.NewStringUnion("o3"), + }, + }, + { + Data: "baz", + OrderByValues: field.Values{ + field.NewStringUnion("o4"), + }, + }, + { + Data: "cat", + OrderByValues: field.Values{ + field.NewStringUnion("o2"), + }, + }, + } + valuesReverseLessThanFn := func(v1, v2 field.Values) bool { + return v1[0].StringVal > v2[0].StringVal + } + rawResultLessThanFn := func(v1, v2 RawResult) bool { + return valuesReverseLessThanFn(v1.OrderByValues, v2.OrderByValues) + } + h := NewTopNRawResults(4, rawResultLessThanFn) + for _, r := range input { + h.Add(r, RawResultAddOptions{}) + } + res := &RawResults{ + OrderBy: []OrderBy{ + { + 
FieldType: RawField, + FieldPath: []string{"foo", "bar"}, + }, + }, + Ordered: h, + } + b, err := json.Marshal(res) + require.NoError(t, err) + expected := `{"raw":["foo","cat","bar","baz"]}` + require.Equal(t, expected, string(b)) +} diff --git a/query/result_group.go b/query/result_group.go index 2ecfa53..5d68672 100644 --- a/query/result_group.go +++ b/query/result_group.go @@ -5,15 +5,28 @@ import ( "github.com/xichen2020/eventdb/x/compare" ) +type nullResultGroup struct { + Key *string `json:"key"` // This should always be a nil pointer to represent null key + Values calculation.ResultArray `json:"values"` +} + +type nullResultGroupsJSON struct { + Groups []nullResultGroup `json:"groups"` +} + type boolResultGroup struct { - key bool - value calculation.ResultArray + Key bool `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyBoolResultGroup boolResultGroup type boolResultGroupLessThanFn func(v1, v2 boolResultGroup) bool +type boolResultGroupsJSON struct { + Groups []boolResultGroup `json:"groups"` +} + func newBoolResultGroupReverseLessThanFn(orderBy []OrderBy) (boolResultGroupLessThanFn, error) { if len(orderBy) == 0 { return nil, nil @@ -42,9 +55,9 @@ func newBoolResultGroupReverseLessThanFn(orderBy []OrderBy) (boolResultGroupLess for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareBoolFns[i](g1.key, g2.key) + res = compareBoolFns[i](g1.Key, g2.Key) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { return true @@ -59,14 +72,18 @@ func newBoolResultGroupReverseLessThanFn(orderBy []OrderBy) (boolResultGroupLess } type intResultGroup struct { - key int - value calculation.ResultArray + Key int `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyIntResultGroup intResultGroup type intResultGroupLessThanFn func(v1, 
v2 intResultGroup) bool +type intResultGroupsJSON struct { + Groups []intResultGroup `json:"groups"` +} + func newIntResultGroupReverseLessThanFn(orderBy []OrderBy) (intResultGroupLessThanFn, error) { if len(orderBy) == 0 { return nil, nil @@ -95,9 +112,9 @@ func newIntResultGroupReverseLessThanFn(orderBy []OrderBy) (intResultGroupLessTh for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareIntFns[i](g1.key, g2.key) + res = compareIntFns[i](g1.Key, g2.Key) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { return true @@ -112,14 +129,18 @@ func newIntResultGroupReverseLessThanFn(orderBy []OrderBy) (intResultGroupLessTh } type doubleResultGroup struct { - key float64 - value calculation.ResultArray + Key float64 `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyDoubleResultGroup doubleResultGroup type doubleResultGroupLessThanFn func(v1, v2 doubleResultGroup) bool +type doubleResultGroupsJSON struct { + Groups []doubleResultGroup `json:"groups"` +} + func newDoubleResultGroupReverseLessThanFn(orderBy []OrderBy) (doubleResultGroupLessThanFn, error) { if len(orderBy) == 0 { return nil, nil @@ -148,9 +169,9 @@ func newDoubleResultGroupReverseLessThanFn(orderBy []OrderBy) (doubleResultGroup for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareDoubleFns[i](g1.key, g2.key) + res = compareDoubleFns[i](g1.Key, g2.Key) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { return true @@ -165,14 +186,18 @@ func newDoubleResultGroupReverseLessThanFn(orderBy []OrderBy) (doubleResultGroup } type stringResultGroup struct { - key string - value 
calculation.ResultArray + Key string `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyStringResultGroup stringResultGroup type stringResultGroupLessThanFn func(v1, v2 stringResultGroup) bool +type stringResultGroupsJSON struct { + Groups []stringResultGroup `json:"groups"` +} + func newStringResultGroupReverseLessThanFn(orderBy []OrderBy) (stringResultGroupLessThanFn, error) { if len(orderBy) == 0 { return nil, nil @@ -201,9 +226,9 @@ func newStringResultGroupReverseLessThanFn(orderBy []OrderBy) (stringResultGroup for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareStringFns[i](g1.key, g2.key) + res = compareStringFns[i](g1.Key, g2.Key) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { return true @@ -218,14 +243,18 @@ func newStringResultGroupReverseLessThanFn(orderBy []OrderBy) (stringResultGroup } type timeResultGroup struct { - key int64 - value calculation.ResultArray + Key int64 `json:"key"` + Values calculation.ResultArray `json:"values"` } var emptyTimeResultGroup timeResultGroup type timeResultGroupLessThanFn func(v1, v2 timeResultGroup) bool +type timeResultGroupsJSON struct { + Groups []timeResultGroup `json:"groups"` +} + func newTimeResultGroupReverseLessThanFn(orderBy []OrderBy) (timeResultGroupLessThanFn, error) { if len(orderBy) == 0 { return nil, nil @@ -254,9 +283,9 @@ func newTimeResultGroupReverseLessThanFn(orderBy []OrderBy) (timeResultGroupLess for i, ob := range orderBy { var res int if ob.FieldType == GroupByField { - res = compareTimeFns[i](g1.key, g2.key) + res = compareTimeFns[i](g1.Key, g2.Key) } else { - res = compareCalcValueFns[i](g1.value[ob.FieldIndex].Value(), g2.value[ob.FieldIndex].Value()) + res = compareCalcValueFns[i](g1.Values[ob.FieldIndex].Value(), g2.Values[ob.FieldIndex].Value()) } if res > 0 { 
return true diff --git a/query/single_key_result_groups.go b/query/single_key_result_groups.go index d2a9c33..d2d5d37 100644 --- a/query/single_key_result_groups.go +++ b/query/single_key_result_groups.go @@ -1,6 +1,7 @@ package query import ( + "encoding/json" "fmt" "github.com/xichen2020/eventdb/calculation" @@ -13,8 +14,8 @@ type ForEachSingleKeyResultGroupFn func(k field.ValueUnion, v calculation.Result type lenFn func() int type getOrInsertSingleKeyFn func(k *field.ValueUnion) (calculation.ResultArray, InsertionStatus) -type forEachSingleKeyGroupFn func(fn ForEachSingleKeyResultGroupFn) type mergeSingleKeyGroupInPlaceFn func(other *SingleKeyResultGroups) +type marshalJSONFn func(numGroups int, topNRequired bool) ([]byte, error) type trimToTopNFn func(targetSize int) // SingleKeyResultGroups stores the result mappings keyed on values from a single field @@ -25,8 +26,8 @@ type SingleKeyResultGroups struct { sizeLimit int lenFn lenFn getOrInsertFn getOrInsertSingleKeyFn - forEachGroupFn forEachSingleKeyGroupFn mergeInPlaceFn mergeSingleKeyGroupInPlaceFn + marshalJSONFn marshalJSONFn trimToTopNFn trimToTopNFn boolGroupReverseLessThanFn boolResultGroupLessThanFn intGroupReverseLessThanFn intResultGroupLessThanFn @@ -67,47 +68,47 @@ func NewSingleKeyResultGroups( case field.NullType: m.lenFn = m.getNullLen m.getOrInsertFn = m.getOrInsertNull - m.forEachGroupFn = m.forEachNullGroup m.mergeInPlaceFn = m.mergeNullGroups + m.marshalJSONFn = m.marshalJSONNullGroups m.trimToTopNFn = m.trimNullToTopN case field.BoolType: m.boolResults = make(map[bool]calculation.ResultArray, initCapacity) m.lenFn = m.getBoolLen m.getOrInsertFn = m.getOrInsertBool - m.forEachGroupFn = m.forEachBoolGroup m.mergeInPlaceFn = m.mergeBoolGroups + m.marshalJSONFn = m.marshalJSONBoolGroups m.trimToTopNFn = m.trimBoolToTopN m.boolGroupReverseLessThanFn, err = newBoolResultGroupReverseLessThanFn(orderBy) case field.IntType: m.intResults = make(map[int]calculation.ResultArray, initCapacity) m.lenFn = 
m.getIntLen m.getOrInsertFn = m.getOrInsertInt - m.forEachGroupFn = m.forEachIntGroup m.mergeInPlaceFn = m.mergeIntGroups + m.marshalJSONFn = m.marshalJSONIntGroups m.trimToTopNFn = m.trimIntToTopN m.intGroupReverseLessThanFn, err = newIntResultGroupReverseLessThanFn(orderBy) case field.DoubleType: m.doubleResults = make(map[float64]calculation.ResultArray, initCapacity) m.lenFn = m.getDoubleLen m.getOrInsertFn = m.getOrInsertDouble - m.forEachGroupFn = m.forEachDoubleGroup m.mergeInPlaceFn = m.mergeDoubleGroups + m.marshalJSONFn = m.marshalJSONDoubleGroups m.trimToTopNFn = m.trimDoubleToTopN m.doubleGroupReverseLessThanFn, err = newDoubleResultGroupReverseLessThanFn(orderBy) case field.StringType: m.stringResults = make(map[string]calculation.ResultArray, initCapacity) m.lenFn = m.getStringLen m.getOrInsertFn = m.getOrInsertString - m.forEachGroupFn = m.forEachStringGroup m.mergeInPlaceFn = m.mergeStringGroups + m.marshalJSONFn = m.marshalJSONStringGroups m.trimToTopNFn = m.trimStringToTopN m.stringGroupReverseLessThanFn, err = newStringResultGroupReverseLessThanFn(orderBy) case field.TimeType: m.timeResults = make(map[int64]calculation.ResultArray, initCapacity) m.lenFn = m.getTimeLen m.getOrInsertFn = m.getOrInsertTime - m.forEachGroupFn = m.forEachTimeGroup m.mergeInPlaceFn = m.mergeTimeGroups + m.marshalJSONFn = m.marshalJSONTimeGroups m.trimToTopNFn = m.trimTimeToTopN m.timeGroupReverseLessThanFn, err = newTimeResultGroupReverseLessThanFn(orderBy) default: @@ -140,12 +141,6 @@ func (m *SingleKeyResultGroups) GetOrInsertNoCheck( return m.getOrInsertFn(key) } -// ForEach applies the function against each result group, and stops iterating -// as soon as the function returns false. -func (m *SingleKeyResultGroups) ForEach(fn ForEachSingleKeyResultGroupFn) { - m.forEachGroupFn(fn) -} - // MergeInPlace merges the other result gruops into the current groups in place. // The other result groups become invalid after the merge. 
// Precondition: The two result groups collect results for the same query, and @@ -163,12 +158,21 @@ func (m *SingleKeyResultGroups) MergeInPlace(other *SingleKeyResultGroups) { other.Clear() } +// MarshalJSON marshals `numGroups` result groups as a JSON object. +// If `topNRequired` is true, top N groups are selected based on the corresponding +// `ReverseLessThanFn`. Otherwise, an arbitrary set of groups is selected. +func (m *SingleKeyResultGroups) MarshalJSON(numGroups int, topNRequired bool) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + return m.marshalJSONFn(numGroups, topNRequired) +} + // Clear clears the result groups. func (m *SingleKeyResultGroups) Clear() { m.resultArrayProtoType = nil m.lenFn = nil m.getOrInsertFn = nil - m.forEachGroupFn = nil m.mergeInPlaceFn = nil m.trimToTopNFn = nil m.boolGroupReverseLessThanFn = nil @@ -300,43 +304,6 @@ func (m *SingleKeyResultGroups) getOrInsertTime( return arr, Inserted } -func (m *SingleKeyResultGroups) forEachNullGroup(fn ForEachSingleKeyResultGroupFn) { - if m.nullResults == nil { - return - } - fn(field.NullUnion, m.nullResults) -} - -func (m *SingleKeyResultGroups) forEachBoolGroup(fn ForEachSingleKeyResultGroupFn) { - for k, v := range m.boolResults { - fn(field.NewBoolUnion(k), v) - } -} - -func (m *SingleKeyResultGroups) forEachIntGroup(fn ForEachSingleKeyResultGroupFn) { - for k, v := range m.intResults { - fn(field.NewIntUnion(k), v) - } -} - -func (m *SingleKeyResultGroups) forEachDoubleGroup(fn ForEachSingleKeyResultGroupFn) { - for k, v := range m.doubleResults { - fn(field.NewDoubleUnion(k), v) - } -} - -func (m *SingleKeyResultGroups) forEachStringGroup(fn ForEachSingleKeyResultGroupFn) { - for k, v := range m.stringResults { - fn(field.NewStringUnion(k), v) - } -} - -func (m *SingleKeyResultGroups) forEachTimeGroup(fn ForEachSingleKeyResultGroupFn) { - for k, v := range m.timeResults { - fn(field.NewTimeUnion(k), v) - } -} - func (m *SingleKeyResultGroups) mergeNullGroups(other 
*SingleKeyResultGroups) { if len(other.nullResults) == 0 { return @@ -463,6 +430,183 @@ func (m *SingleKeyResultGroups) mergeTimeGroups(other *SingleKeyResultGroups) { } } +func (m *SingleKeyResultGroups) marshalJSONNullGroups(numGroups int, _ bool) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + groups := nullResultGroupsJSON{ + Groups: []nullResultGroup{ + {Values: m.nullResults}, + }, + } + return json.Marshal(groups) +} + +func (m *SingleKeyResultGroups) marshalJSONBoolGroups( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + var res []boolResultGroup + if topNRequired { + m.computeTopNBoolGroups(numGroups) + res = m.topNBools.SortInPlace() + } else { + res = make([]boolResultGroup, 0, numGroups) + for k, v := range m.boolResults { + group := boolResultGroup{Key: k, Values: v} + res = append(res, group) + } + } + groups := boolResultGroupsJSON{Groups: res} + return json.Marshal(groups) +} + +func (m *SingleKeyResultGroups) marshalJSONIntGroups( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + var res []intResultGroup + if topNRequired { + m.computeTopNIntGroups(numGroups) + res = m.topNInts.SortInPlace() + } else { + res = make([]intResultGroup, 0, numGroups) + for k, v := range m.intResults { + group := intResultGroup{Key: k, Values: v} + res = append(res, group) + } + } + groups := intResultGroupsJSON{Groups: res} + return json.Marshal(groups) +} + +func (m *SingleKeyResultGroups) marshalJSONDoubleGroups( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + var res []doubleResultGroup + if topNRequired { + m.computeTopNDoubleGroups(numGroups) + res = m.topNDoubles.SortInPlace() + } else { + res = make([]doubleResultGroup, 0, numGroups) + for k, v := range m.doubleResults { + group := doubleResultGroup{Key: k, Values: v} + res = append(res, group) + } + } + groups := 
doubleResultGroupsJSON{Groups: res} + return json.Marshal(groups) +} + +func (m *SingleKeyResultGroups) marshalJSONStringGroups( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + var res []stringResultGroup + if topNRequired { + m.computeTopNStringGroups(numGroups) + res = m.topNStrings.SortInPlace() + } else { + res = make([]stringResultGroup, 0, numGroups) + for k, v := range m.stringResults { + group := stringResultGroup{Key: k, Values: v} + res = append(res, group) + } + } + groups := stringResultGroupsJSON{Groups: res} + return json.Marshal(groups) +} + +func (m *SingleKeyResultGroups) marshalJSONTimeGroups( + numGroups int, + topNRequired bool, +) ([]byte, error) { + if numGroups <= 0 { + return nil, nil + } + var res []timeResultGroup + if topNRequired { + m.computeTopNTimeGroups(numGroups) + res = m.topNTimes.SortInPlace() + } else { + res = make([]timeResultGroup, 0, numGroups) + for k, v := range m.timeResults { + group := timeResultGroup{Key: k, Values: v} + res = append(res, group) + } + } + groups := timeResultGroupsJSON{Groups: res} + return json.Marshal(groups) +} + +// computeTopNBoolGroups computes the top N bool groups and stores them in `topNBools`. +func (m *SingleKeyResultGroups) computeTopNBoolGroups(targetSize int) { + if m.topNBools == nil || m.topNBools.Cap() < targetSize { + m.topNBools = newTopNBools(targetSize, m.boolGroupReverseLessThanFn) + } + for k, v := range m.boolResults { + group := boolResultGroup{Key: k, Values: v} + m.topNBools.Add(group, boolAddOptions{}) + } +} + +// computeTopNIntGroups computes the top N int groups and stores them in `topNInts`. 
+func (m *SingleKeyResultGroups) computeTopNIntGroups(targetSize int) { + if m.topNInts == nil || m.topNInts.Cap() < targetSize { + m.topNInts = newTopNInts(targetSize, m.intGroupReverseLessThanFn) + } + for k, v := range m.intResults { + group := intResultGroup{Key: k, Values: v} + m.topNInts.Add(group, intAddOptions{}) + } +} + +// computeTopNDoubleGroups computes the top N double groups and stores them in `topNDoubles`. +func (m *SingleKeyResultGroups) computeTopNDoubleGroups(targetSize int) { + if m.topNDoubles == nil || m.topNDoubles.Cap() < targetSize { + m.topNDoubles = newTopNDoubles(targetSize, m.doubleGroupReverseLessThanFn) + } + for k, v := range m.doubleResults { + group := doubleResultGroup{Key: k, Values: v} + m.topNDoubles.Add(group, doubleAddOptions{}) + } +} + +// computeTopNStringGroups computes the top N string groups and stores them in `topNStrings`. +func (m *SingleKeyResultGroups) computeTopNStringGroups(targetSize int) { + if m.topNStrings == nil || m.topNStrings.Cap() < targetSize { + m.topNStrings = newTopNStrings(targetSize, m.stringGroupReverseLessThanFn) + } + for k, v := range m.stringResults { + group := stringResultGroup{Key: k, Values: v} + m.topNStrings.Add(group, stringAddOptions{}) + } +} + +// computeTopNTimeGroups computes the top N time groups and stores them in `topNTimes`. +func (m *SingleKeyResultGroups) computeTopNTimeGroups(targetSize int) { + if m.topNTimes == nil || m.topNTimes.Cap() < targetSize { + m.topNTimes = newTopNTimes(targetSize, m.timeGroupReverseLessThanFn) + } + for k, v := range m.timeResults { + group := timeResultGroup{Key: k, Values: v} + m.topNTimes.Add(group, timeAddOptions{}) + } +} + func (m *SingleKeyResultGroups) trimNullToTopN(targetSize int) { if m.Len() <= targetSize { return @@ -476,19 +620,13 @@ func (m *SingleKeyResultGroups) trimBoolToTopN(targetSize int) { } // Find the top N groups. 
- if m.topNBools == nil || m.topNBools.Cap() < targetSize { - m.topNBools = newTopNBools(targetSize, m.boolGroupReverseLessThanFn) - } - for k, v := range m.boolResults { - group := boolResultGroup{key: k, value: v} - m.topNBools.Add(group, boolAddOptions{}) - } + m.computeTopNBoolGroups(targetSize) // Allocate a new map and insert the top n bools into the map. m.boolResults = make(map[bool]calculation.ResultArray, targetSize) data := m.topNBools.RawData() for i := 0; i < len(data); i++ { - m.boolResults[data[i].key] = data[i].value + m.boolResults[data[i].Key] = data[i].Values data[i] = emptyBoolResultGroup } m.topNBools.Reset() @@ -500,19 +638,13 @@ func (m *SingleKeyResultGroups) trimIntToTopN(targetSize int) { } // Find the top N groups. - if m.topNInts == nil || m.topNInts.Cap() < targetSize { - m.topNInts = newTopNInts(targetSize, m.intGroupReverseLessThanFn) - } - for k, v := range m.intResults { - group := intResultGroup{key: k, value: v} - m.topNInts.Add(group, intAddOptions{}) - } + m.computeTopNIntGroups(targetSize) // Allocate a new map and insert the top n ints into the map. m.intResults = make(map[int]calculation.ResultArray, targetSize) data := m.topNInts.RawData() for i := 0; i < len(data); i++ { - m.intResults[data[i].key] = data[i].value + m.intResults[data[i].Key] = data[i].Values data[i] = emptyIntResultGroup } m.topNInts.Reset() @@ -524,19 +656,13 @@ func (m *SingleKeyResultGroups) trimDoubleToTopN(targetSize int) { } // Find the top N groups. - if m.topNDoubles == nil || m.topNDoubles.Cap() < targetSize { - m.topNDoubles = newTopNDoubles(targetSize, m.doubleGroupReverseLessThanFn) - } - for k, v := range m.doubleResults { - group := doubleResultGroup{key: k, value: v} - m.topNDoubles.Add(group, doubleAddOptions{}) - } + m.computeTopNDoubleGroups(targetSize) // Allocate a new map and insert the top n doubles into the map. 
m.doubleResults = make(map[float64]calculation.ResultArray, targetSize) data := m.topNDoubles.RawData() for i := 0; i < len(data); i++ { - m.doubleResults[data[i].key] = data[i].value + m.doubleResults[data[i].Key] = data[i].Values data[i] = emptyDoubleResultGroup } m.topNDoubles.Reset() @@ -548,19 +674,13 @@ func (m *SingleKeyResultGroups) trimStringToTopN(targetSize int) { } // Find the top N groups. - if m.topNStrings == nil || m.topNStrings.Cap() < targetSize { - m.topNStrings = newTopNStrings(targetSize, m.stringGroupReverseLessThanFn) - } - for k, v := range m.stringResults { - group := stringResultGroup{key: k, value: v} - m.topNStrings.Add(group, stringAddOptions{}) - } + m.computeTopNStringGroups(targetSize) // Allocate a new map and insert the top n strings into the map. m.stringResults = make(map[string]calculation.ResultArray, targetSize) data := m.topNStrings.RawData() for i := 0; i < len(data); i++ { - m.stringResults[data[i].key] = data[i].value + m.stringResults[data[i].Key] = data[i].Values data[i] = emptyStringResultGroup } m.topNStrings.Reset() @@ -572,19 +692,13 @@ func (m *SingleKeyResultGroups) trimTimeToTopN(targetSize int) { } // Find the top N groups. - if m.topNTimes == nil || m.topNTimes.Cap() < targetSize { - m.topNTimes = newTopNTimes(targetSize, m.timeGroupReverseLessThanFn) - } - for k, v := range m.timeResults { - group := timeResultGroup{key: k, value: v} - m.topNTimes.Add(group, timeAddOptions{}) - } + m.computeTopNTimeGroups(targetSize) // Allocate a new map and insert the top n times into the map. 
m.timeResults = make(map[int64]calculation.ResultArray, targetSize) data := m.topNTimes.RawData() for i := 0; i < len(data); i++ { - m.timeResults[data[i].key] = data[i].value + m.timeResults[data[i].Key] = data[i].Values data[i] = emptyTimeResultGroup } m.topNTimes.Reset() diff --git a/query/single_key_result_groups_test.go b/query/single_key_result_groups_test.go new file mode 100644 index 0000000..472ac32 --- /dev/null +++ b/query/single_key_result_groups_test.go @@ -0,0 +1,114 @@ +package query + +import ( + "testing" + + "github.com/xichen2020/eventdb/calculation" + "github.com/xichen2020/eventdb/document/field" + + "github.com/stretchr/testify/require" +) + +func TestUnorderedSingleKeyResultGroupMarshalJSON(t *testing.T) { + results := calculation.ResultArray{ + calculation.NewCountResult(), + calculation.NewAvgResult(), + calculation.NewMaxStringResult(), + } + groups, err := NewSingleKeyResultGroups(field.StringType, results, nil, 10, 10) + require.NoError(t, err) + + key1 := &field.ValueUnion{ + Type: field.StringType, + StringVal: "foo", + } + res, status := groups.GetOrInsertNoCheck(key1) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(32)) + res[2].Add(calculation.NewStringUnion("foo")) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(16)) + res[2].Add(calculation.NewStringUnion("bar")) + + key2 := &field.ValueUnion{ + Type: field.StringType, + StringVal: "bar", + } + res, status = groups.GetOrInsertNoCheck(key2) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("baz")) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("must")) + + b, err := groups.MarshalJSON(5, false) + require.NoError(t, err) + expectedVals := map[string]struct{}{ + `{"groups":[{"key":"foo","values":[2,24,"foo"]},{"key":"bar","values":[2,null,"must"]}]}`: struct{}{}, + 
`{"groups":[{"key":"bar","values":[2,null,"must"]},{"key":"foo","values":[2,24,"foo"]}]}`: struct{}{}, + } + _, exists := expectedVals[string(b)] + require.True(t, exists) +} + +func TestOrderedSingleKeyResultGroupMarshalJSON(t *testing.T) { + results := calculation.ResultArray{ + calculation.NewCountResult(), + calculation.NewAvgResult(), + calculation.NewMaxStringResult(), + } + orderBys := []OrderBy{ + { + FieldType: GroupByField, + FieldIndex: 0, + SortOrder: Descending, + }, + { + FieldType: CalculationField, + FieldIndex: 2, + SortOrder: Ascending, + }, + } + groups, err := NewSingleKeyResultGroups(field.IntType, results, orderBys, 10, 10) + require.NoError(t, err) + + // Add some data. + key1 := &field.ValueUnion{ + Type: field.IntType, + IntVal: 10, + } + res, status := groups.GetOrInsertNoCheck(key1) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(32)) + res[2].Add(calculation.NewStringUnion("foo")) + res[0].Add(calculation.ValueUnion{}) + res[1].Add(calculation.NewNumberUnion(16)) + res[2].Add(calculation.NewStringUnion("bar")) + + key2 := &field.ValueUnion{ + Type: field.IntType, + IntVal: 30, + } + res, status = groups.GetOrInsertNoCheck(key2) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("baz")) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("must")) + + key3 := &field.ValueUnion{ + Type: field.IntType, + IntVal: 14, + } + res, status = groups.GetOrInsertNoCheck(key3) + require.Equal(t, Inserted, status) + res[0].Add(calculation.ValueUnion{}) + res[2].Add(calculation.NewStringUnion("cat")) + + b, err := groups.MarshalJSON(5, true) + require.NoError(t, err) + expected := `{"groups":[{"key":30,"values":[2,null,"must"]},{"key":14,"values":[1,null,"cat"]},{"key":10,"values":[2,24,"foo"]}]}` + require.Equal(t, expected, string(b)) +} diff --git a/query/time_bucket_result.go 
b/query/time_bucket_result.go index 0f91aa0..be700fc 100644 --- a/query/time_bucket_result.go +++ b/query/time_bucket_result.go @@ -1,6 +1,7 @@ package query import ( + "encoding/json" "fmt" ) @@ -12,7 +13,7 @@ type TimeBucketResults struct { NumBuckets int // buckets records the document count in each bucket. - buckets []int64 + buckets []int } // IsEmpty returns true if the result has not collected any values. @@ -26,7 +27,7 @@ func (r *TimeBucketResults) Clear() { // AddAt adds a document at the given timestamp. func (r *TimeBucketResults) AddAt(timestampNanos int64) { if r.buckets == nil { - r.buckets = make([]int64, r.NumBuckets) + r.buckets = make([]int, r.NumBuckets) } bucketIdx := (timestampNanos - r.StartBucketNanos) / r.BucketSizeNanos r.buckets[bucketIdx]++ @@ -60,3 +61,30 @@ func (r *TimeBucketResults) MergeInPlace(other *TimeBucketResults) error { other.Clear() return nil } + +// MarshalJSON marshals the time bucket results as a JSON object. +func (r *TimeBucketResults) MarshalJSON() ([]byte, error) { + buckets := make([]timeBucketJSON, 0, len(r.buckets)) + for i := 0; i < len(r.buckets); i++ { + bucket := timeBucketJSON{ + StartAtNanos: r.StartBucketNanos + r.BucketSizeNanos*int64(i), + Value: r.buckets[i], + } + buckets = append(buckets, bucket) + } + res := timeBucketResultsJSON{ + Granularity: r.BucketSizeNanos, + Buckets: buckets, + } + return json.Marshal(res) +} + +type timeBucketJSON struct { + StartAtNanos int64 `json:"startAtNanos"` // Start time of the bucket in nanoseconds + Value int `json:"value"` // Count +} + +type timeBucketResultsJSON struct { + Granularity int64 `json:"granularity"` + Buckets []timeBucketJSON `json:"buckets"` +} diff --git a/query/time_bucket_result_test.go b/query/time_bucket_result_test.go new file mode 100644 index 0000000..3db8299 --- /dev/null +++ b/query/time_bucket_result_test.go @@ -0,0 +1,21 @@ +package query + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +func 
TestTimeBucketResultsMarshalJSON(t *testing.T) { + res := &TimeBucketResults{ + StartBucketNanos: 10000, + BucketSizeNanos: 2000, + NumBuckets: 5, + buckets: []int{12, 0, 78, 0, 15}, + } + b, err := json.Marshal(res) + require.NoError(t, err) + expected := `{"granularity":2000,"buckets":[{"startAtNanos":10000,"value":12},{"startAtNanos":12000,"value":0},{"startAtNanos":14000,"value":78},{"startAtNanos":16000,"value":0},{"startAtNanos":18000,"value":15}]}` + require.Equal(t, expected, string(b)) +} diff --git a/server/http/handlers/service.go b/server/http/handlers/service.go index 7ffa041..4fe9921 100644 --- a/server/http/handlers/service.go +++ b/server/http/handlers/service.go @@ -230,7 +230,7 @@ func (s *service) queryRaw( writeErrorResponse(w, err) return err } - writeResponse(w, res.FinalData(), nil) + writeResponse(w, res, nil) return nil }