Skip to content

Commit

Permalink
Merge pull request #4038 from influxdb/issue-3926
Browse files Browse the repository at this point in the history
Fix group by time intervals
  • Loading branch information
corylanou committed Sep 8, 2015
2 parents 8a3d7e5 + 9d091b4 commit dea5814
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#4034](https://github.com/influxdb/influxdb/pull/4034): Rollback bolt tx on mapper open error
- [#3848](https://github.com/influxdb/influxdb/issues/3848): restart influxdb causing panic
- [#3881](https://github.com/influxdb/influxdb/issues/3881): panic: runtime error: invalid memory address or nil pointer dereference
- [#3926](https://github.com/influxdb/influxdb/issues/3926): First or last value of `GROUP BY time(x)` is often null. Fixed by [#4038](https://github.com/influxdb/influxdb/pull/4038)

## v0.9.3 [2015-08-26]

Expand Down
82 changes: 82 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2481,6 +2481,88 @@ func TestServer_Query_AggregatesIdenticalTime(t *testing.T) {
}
}

// This will test that when using a group by, that it observes the time you asked for
// but will only put the values in the bucket that match the time range
func TestServer_Query_GroupByTimeCutoffs(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu value=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu value=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu value=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:05Z").UnixNano()),
fmt.Sprintf(`cpu value=4i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:08Z").UnixNano()),
fmt.Sprintf(`cpu value=5i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:09Z").UnixNano()),
fmt.Sprintf(`cpu value=6i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")

test.addQueries([]*Query{
&Query{
name: "sum all time",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",21]]}]}]}`,
},
&Query{
name: "sum all time grouped by time 5s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
},
&Query{
name: "sum all time grouped by time 5s missing first point",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:01Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
},
&Query{
name: "sum all time grouped by time 5s missing first points (null for bucket)",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:02Z' and time <= '2000-01-01T00:00:10Z' group by time(5s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",null],["2000-01-01T00:00:05Z",12],["2000-01-01T00:00:10Z",6]]}]}]}`,
},
&Query{
name: "sum all time grouped by time 5s missing last point - 2 time intervals",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:09Z' group by time(5s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",12]]}]}]}`,
},
&Query{
name: "sum all time grouped by time 5s missing last 2 points - 2 time intervals",
params: url.Values{"db": []string{"db0"}},
command: `SELECT SUM(value) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T00:00:08Z' group by time(5s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:05Z",7]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Write_Precision(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
Expand Down
19 changes: 12 additions & 7 deletions tsdb/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (lm *SelectMapper) Open() error {
return nil
}

// Validate that ANY GROUP BY is not a field for thie measurement.
// Validate that ANY GROUP BY is not a field for the measurement.
if err := m.ValidateGroupBy(lm.selectStmt); err != nil {
return err
}
Expand Down Expand Up @@ -414,12 +414,17 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
Value: make([]interface{}, 0)}
}

// Always clamp tmin. This can happen as bucket-times are bucketed to the nearest
// interval, and this can be less than the times in the query.
// Always clamp tmin and tmax. This can happen as bucket-times are bucketed to the nearest
// interval. This is necessary to grab the "partial" buckets at the beginning and end of the time range
qmin := tmin
if qmin < lm.queryTMin {
qmin = lm.queryTMin
}
qmax := tmax
if qmax > lm.queryTMax {
// Need to offset by one nanosecond for the logic to work properly in the tagset cursor Next
qmax = lm.queryTMax + 1
}

tsc.pointHeap = newPointHeap()
for i := range lm.mapFuncs {
Expand All @@ -428,7 +433,7 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
// changes to the mapper functions, which can come later.
// Prime the buffers.
for i := 0; i < len(tsc.cursors); i++ {
k, v := tsc.cursors[i].SeekTo(tmin)
k, v := tsc.cursors[i].SeekTo(qmin)
if k == -1 || k > tmax {
continue
}
Expand All @@ -440,8 +445,8 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
heap.Push(tsc.pointHeap, p)
}
// Wrap the tagset cursor so it implements the mapping functions interface.
nextf := func() (time int64, value interface{}) {
k, v := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields)
nextf := func() (_ int64, value interface{}) {
k, v := tsc.Next(qmin, qmax, []string{lm.fieldNames[i]}, lm.whereFields)
return k, v
}

Expand Down Expand Up @@ -768,7 +773,7 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri
p := heap.Pop(tsc.pointHeap).(*pointHeapItem)

// We're done if the point is outside the query's time range [tmin:tmax).
if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) {
if p.timestamp != tmin && (p.timestamp < tmin || p.timestamp >= tmax) {
return -1, nil
}

Expand Down
7 changes: 5 additions & 2 deletions tsdb/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,14 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int) (Executor, error) {
shards := map[uint64]meta.ShardInfo{} // Shards requiring mappers.

// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()

// Replace instances of "now()" with the current time, and check the resultant times.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: now})
tmin, tmax := influxql.TimeRange(stmt.Condition)
if tmax.IsZero() {
tmax = time.Now()
tmax = now
}
if tmin.IsZero() {
tmin = time.Unix(0, 0)
Expand Down

0 comments on commit dea5814

Please sign in to comment.