Skip to content

Commit

Permalink
Move the CQ interval by the group by offset
Browse files Browse the repository at this point in the history
This will make the period selected by the CQ system work correctly for a
query with an offset.
  • Loading branch information
jsternberg committed Aug 1, 2016
1 parent ed527ce commit 9db3738
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6543](https://github.com/influxdata/influxdb/issues/6543): Fix parseFill to check for fill ident before attempting to parse an expression.
- [#7032](https://github.com/influxdata/influxdb/pull/7032): Copy tags in influx_stress to avoid a concurrent write panic on a map.
- [#7028](https://github.com/influxdata/influxdb/pull/7028): Do not run continuous queries that have no time span.
- [#7025](https://github.com/influxdata/influxdb/issues/7025): Move the CQ interval by the group by offset.

## v0.13.0 [2016-05-12]

Expand Down
2 changes: 1 addition & 1 deletion influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,7 @@ func (s *SelectStatement) GroupByInterval() (time.Duration, error) {
}

// GroupByOffset extracts the time interval offset, if specified.
func (s *SelectStatement) GroupByOffset(opt *IteratorOptions) (time.Duration, error) {
func (s *SelectStatement) GroupByOffset() (time.Duration, error) {
interval, err := s.GroupByInterval()
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite
if interval < 0 {
interval = 0
} else if interval > 0 {
opt.Interval.Offset, err = stmt.GroupByOffset(&opt)
opt.Interval.Offset, err = stmt.GroupByOffset()
if err != nil {
return opt, err
}
Expand Down
12 changes: 9 additions & 3 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,20 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
return nil
}

// Get the group by offset.
offset, err := cq.q.GroupByOffset()
if err != nil {
return err
}

resampleEvery := interval
if cq.Resample.Every != 0 {
resampleEvery = cq.Resample.Every
}

// We're about to run the query so store the current time closest to the nearest interval.
// If all is going well, this time should be the same as nextRun.
cq.LastRun = now.Truncate(resampleEvery)
cq.LastRun = now.Add(-offset).Truncate(resampleEvery).Add(offset)
s.lastRuns[id] = cq.LastRun

// Retrieve the oldest interval we should calculate based on the next time
Expand All @@ -321,8 +327,8 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}

// Calculate and set the time range for the query.
startTime := nextRun.Add(-resampleFor).Add(interval - 1).Truncate(interval)
endTime := now.Add(-resampleEvery).Add(interval).Truncate(interval)
startTime := nextRun.Add(interval - resampleFor - offset - 1).Truncate(interval).Add(offset)
endTime := now.Add(interval - resampleEvery - offset).Truncate(interval).Add(offset)
if !endTime.After(startTime) {
// Exit early since there is no time interval.
return nil
Expand Down
48 changes: 48 additions & 0 deletions services/continuous_querier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,54 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {
}
}

func TestContinuousQueryService_GroupByOffset(t *testing.T) {
s := NewTestService(t)
mc := NewMetaClient(t)
mc.CreateDatabase("db", "")
mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m, 30s) END`)
s.MetaClient = mc

// Set RunInterval high so we can trigger using Run method.
s.RunInterval = 10 * time.Minute

done := make(chan struct{})
var expected struct {
min time.Time
max time.Time
}

// Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
s := stmt.(*influxql.SelectStatement)
min, max, err := influxql.TimeRange(s.Condition)
if err != nil {
t.Errorf("unexpected error parsing time range: %s", err)
} else if !expected.min.Equal(min) || !expected.max.Equal(max) {
t.Errorf("mismatched time range: got=(%s, %s) exp=(%s, %s)", min, max, expected.min, expected.max)
}
done <- struct{}{}
ctx.Results <- &influxql.Result{}
return nil
},
}

s.Open()
defer s.Close()

// Set the 'now' time to the start of a 10 minute interval with a 30 second offset.
// Then trigger a run. This should trigger two queries (one for the current time
// interval, one for the previous).
now := time.Now().UTC().Truncate(10 * time.Minute).Add(30 * time.Second)
expected.min = now.Add(-time.Minute)
expected.max = now.Add(-1)
s.RunCh <- &RunRequest{Now: now}

if err := wait(done, 100*time.Millisecond); err != nil {
t.Fatal(err)
}
}

// Test service when not the cluster leader (CQs shouldn't run).
func TestContinuousQueryService_NotLeader(t *testing.T) {
s := NewTestService(t)
Expand Down

0 comments on commit 9db3738

Please sign in to comment.