From 62e9c24b256820a8d9f9acec2be5cb3862ff5bea Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 8 Sep 2015 14:15:48 -0500 Subject: [PATCH 1/2] fixes #3926 --- cmd/influxd/run/server_test.go | 82 ++++++++++++++++++++++++++++++++++ tsdb/mapper.go | 19 +++++--- tsdb/query_executor.go | 7 ++- 3 files changed, 99 insertions(+), 9 deletions(-) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 12b880205e3..9f319fe4458 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -2481,6 +2481,88 @@ func TestServer_Query_AggregatesIdenticalTime(t *testing.T) { } } +// TestServer_Query_GroupByTimeCutoffs tests that a GROUP BY time() query observes the requested time range: +// buckets stay aligned to the interval, but only values inside the time range are placed in them. +func TestServer_Query_GroupByTimeCutoffs(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig(), "") + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + writes := []string{ + fmt.Sprintf(`cpu value=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu value=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()), + fmt.Sprintf(`cpu value=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:05Z").UnixNano()), + fmt.Sprintf(`cpu value=4i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:08Z").UnixNano()), + fmt.Sprintf(`cpu value=5i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:09Z").UnixNano()), + fmt.Sprintf(`cpu value=6i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()), + } + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") + + test.addQueries([]*Query{ + &Query{ + name: "sum all time", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu`, + 
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",21]]}]}]}`, + }, + &Query{ + name: "sum all time grouped by time 5s", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`, + }, + &Query{ + name: "sum all time grouped by time 5s missing first point", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:01Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`, + }, + &Query{ + name: "sum all time grouped by time 5s missing first points (null for bucket)", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:02Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",null],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`, + }, + &Query{ + name: "sum all time grouped by time 5s missing last point - 2 time intervals", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:09Z' group by time(5s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12]]}]}]}`, + }, + &Query{ + name: "sum all time grouped by time 5s missing last 2 points - 2 time intervals", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT SUM(value) FROM cpu where time >= 
'2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:08Z' group by time(5s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",7]]}]}]}`, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + func TestServer_Write_Precision(t *testing.T) { t.Parallel() s := OpenServer(NewConfig(), "") diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 2e513d84326..699f52cd5bc 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -190,7 +190,7 @@ func (lm *SelectMapper) Open() error { return nil } - // Validate that ANY GROUP BY is not a field for thie measurement. + // Validate that ANY GROUP BY is not a field for the measurement. if err := m.ValidateGroupBy(lm.selectStmt); err != nil { return err } @@ -414,12 +414,17 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { Value: make([]interface{}, 0)} } - // Always clamp tmin. This can happen as bucket-times are bucketed to the nearest - // interval, and this can be less than the times in the query. + // Always clamp tmin and tmax to the query's time range. Bucket times are aligned to the nearest + // interval, so a bucket can extend past the times in the query; clamping grabs only the "partial" buckets at the beginning and end of the time range qmin := tmin if qmin < lm.queryTMin { qmin = lm.queryTMin } + qmax := tmax + if qmax > lm.queryTMax { + // The tagset cursor's Next treats tmax as exclusive, so offset by one nanosecond to keep points at exactly queryTMax + qmax = lm.queryTMax + 1 + } tsc.pointHeap = newPointHeap() for i := range lm.mapFuncs { @@ -428,7 +433,7 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { // changes to the mapper functions, which can come later. 
// Prime the buffers. for i := 0; i < len(tsc.cursors); i++ { - k, v := tsc.cursors[i].SeekTo(tmin) + k, v := tsc.cursors[i].SeekTo(qmin) if k == -1 || k > tmax { continue } @@ -440,8 +445,8 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { heap.Push(tsc.pointHeap, p) } // Wrap the tagset cursor so it implements the mapping functions interface. - nextf := func() (time int64, value interface{}) { - k, v := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields) + nextf := func() (_ int64, value interface{}) { + k, v := tsc.Next(qmin, qmax, []string{lm.fieldNames[i]}, lm.whereFields) return k, v } @@ -768,7 +773,7 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri p := heap.Pop(tsc.pointHeap).(*pointHeapItem) // We're done if the point is outside the query's time range [tmin:tmax). - if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) { + if p.timestamp != tmin && (p.timestamp < tmin || p.timestamp >= tmax) { return -1, nil } diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 033481d52af..696bd370a8d 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -211,11 +211,14 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int) (Executor, error) { shards := map[uint64]meta.ShardInfo{} // Shards requiring mappers. + // It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now` + now := time.Now().UTC() + // Replace instances of "now()" with the current time, and check the resultant times. 
- stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()}) + stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now}) tmin, tmax := influxql.TimeRange(stmt.Condition) if tmax.IsZero() { - tmax = time.Now() + tmax = now } if tmin.IsZero() { tmin = time.Unix(0, 0) From 9d091b4362fd2aa202cf18eca880bf78dae1e511 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 8 Sep 2015 14:19:08 -0500 Subject: [PATCH 2/2] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78b7c277c1b..86b30f20e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ With this release InfluxDB is moving to Go 1.5. - [#4034](https://github.com/influxdb/influxdb/pull/4034): Rollback bolt tx on mapper open error - [#3848](https://github.com/influxdb/influxdb/issues/3848): restart influxdb causing panic - [#3881](https://github.com/influxdb/influxdb/issues/3881): panic: runtime error: invalid memory address or nil pointer dereference +- [#3926](https://github.com/influxdb/influxdb/issues/3926): First or last value of `GROUP BY time(x)` is often null. Fixed by [#4038](https://github.com/influxdb/influxdb/pull/4038) ## v0.9.3 [2015-08-26]