Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with holt winters and tmax #915

Merged
merged 3 commits into from
Sep 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

- [#889](https://github.com/influxdata/kapacitor/issues/889): Some typo in the default config file
- [#914](https://github.com/influxdata/kapacitor/pull/914): Change |log() output to be in JSON format so its self documenting structure.
- [#915](https://github.com/influxdata/kapacitor/pull/915): Fix issue with TMax and the Holt-Winters method.

## v1.0.0 [2016-09-02]

Expand Down
43 changes: 29 additions & 14 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,26 @@ func (s *BatchNode) Abort() {
}

type BatchQueries struct {
Queries []string
Queries []*Query
Cluster string
GroupByMeasurement bool
}

func (s *BatchNode) Queries(start, stop time.Time) []BatchQueries {
func (s *BatchNode) Queries(start, stop time.Time) ([]BatchQueries, error) {
queries := make([]BatchQueries, len(s.children))
for i, b := range s.children {
qn := b.(*QueryNode)
qs, err := qn.Queries(start, stop)
if err != nil {
return nil, err
}
queries[i] = BatchQueries{
Queries: qn.Queries(start, stop),
Queries: qs,
Cluster: qn.Cluster(),
GroupByMeasurement: qn.GroupByMeasurement(),
}
}
return queries
return queries, nil
}

// Do not add the source batch node to the dot output
Expand Down Expand Up @@ -227,29 +231,34 @@ func (b *QueryNode) Cluster() string {
return b.b.Cluster
}

func (b *QueryNode) Queries(start, stop time.Time) []string {
func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error) {
now := time.Now()
if stop.IsZero() {
stop = now
}
// Crons are sensitive to timezones.
// Make sure we are using local time.
start = start.Local()
queries := make([]string, 0)
queries := make([]*Query, 0)
for {
start = b.ticker.Next(start)
if start.IsZero() || start.After(stop) {
break
}
b.query.Start(start)
qstop := start.Add(b.b.Period)
qstart := start.Add(-1 * b.b.Offset)
q, err := b.query.Clone()
if err != nil {
return nil, err
}
q.SetStartTime(qstart)
qstop := qstart.Add(b.b.Period)
if qstop.After(now) {
break
}
b.query.Stop(qstop)
queries = append(queries, b.query.String())
q.SetStopTime(qstop)
queries = append(queries, q)
}
return queries
return queries, nil
}

// Query InfluxDB and collect batches on batch collector.
Expand Down Expand Up @@ -282,8 +291,8 @@ func (b *QueryNode) doQuery() error {

// Update times for query
stop := now.Add(-1 * b.b.Offset)
b.query.Start(stop.Add(-1 * b.b.Period))
b.query.Stop(stop)
b.query.SetStartTime(stop.Add(-1 * b.b.Period))
b.query.SetStopTime(stop)

b.logger.Println("D! starting next batch query:", b.query.String())

Expand Down Expand Up @@ -411,6 +420,7 @@ type ticker interface {

type timeTicker struct {
every time.Duration
align bool
alignChan chan time.Time
stopping chan struct{}
ticker *time.Ticker
Expand All @@ -420,6 +430,7 @@ type timeTicker struct {

func newTimeTicker(every time.Duration, align bool) *timeTicker {
t := &timeTicker{
align: align,
every: every,
}
if align {
Expand Down Expand Up @@ -480,7 +491,11 @@ func (t *timeTicker) Stop() {
}

func (t *timeTicker) Next(now time.Time) time.Time {
return now.Add(t.every)
next := now.Add(t.every)
if t.align {
next = next.Round(t.every)
}
return next
}

type cronTicker struct {
Expand Down
12 changes: 12 additions & 0 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (e *floatPointEmitter) EmitBatch() models.Batch {
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}
if t.After(b.TMax) {
b.TMax = t
}
}
return b
}
Expand Down Expand Up @@ -477,6 +480,9 @@ func (e *integerPointEmitter) EmitBatch() models.Batch {
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}
if t.After(b.TMax) {
b.TMax = t
}
}
return b
}
Expand Down Expand Up @@ -710,6 +716,9 @@ func (e *stringPointEmitter) EmitBatch() models.Batch {
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}
if t.After(b.TMax) {
b.TMax = t
}
}
return b
}
Expand Down Expand Up @@ -943,6 +952,9 @@ func (e *booleanPointEmitter) EmitBatch() models.Batch {
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}
if t.After(b.TMax) {
b.TMax = t
}
}
return b
}
Expand Down
3 changes: 3 additions & 0 deletions influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (e *{{.name}}PointEmitter) EmitBatch() models.Batch {
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}
if t.After(b.TMax) {
b.TMax = t
}
}
return b
}
Expand Down
60 changes: 58 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,71 @@ func (q *Query) DBRPs() ([]DBRP, error) {
}

// Set the start time of the query
func (q *Query) Start(s time.Time) {
func (q *Query) StartTime() time.Time {
return q.startTL.Val
}

// Set the stop time of the query
func (q *Query) StopTime() time.Time {
return q.stopTL.Val
}

// Set the start time of the query
func (q *Query) SetStartTime(s time.Time) {
q.startTL.Val = s
}

// Set the stop time of the query
func (q *Query) Stop(s time.Time) {
func (q *Query) SetStopTime(s time.Time) {
q.stopTL.Val = s
}

// Deep clone this query
func (q *Query) Clone() (*Query, error) {
n := &Query{
stmt: q.stmt.Clone(),
}
// Find the start/stop time literals
var err error
influxql.WalkFunc(n.stmt.Condition, func(qlNode influxql.Node) {
if bn, ok := qlNode.(*influxql.BinaryExpr); ok {
switch bn.Op {
case influxql.GTE:
if vf, ok := bn.LHS.(*influxql.VarRef); !ok || vf.Val != "time" {
return
}
if tl, ok := bn.RHS.(*influxql.TimeLiteral); ok {
// We have a "time" >= 'time literal'
if n.startTL == nil {
n.startTL = tl
} else {
err = errors.New("invalid query, found multiple start time conditions")
}
}
case influxql.LT:
if vf, ok := bn.LHS.(*influxql.VarRef); !ok || vf.Val != "time" {
return
}
if tl, ok := bn.RHS.(*influxql.TimeLiteral); ok {
// We have a "time" < 'time literal'
if n.stopTL == nil {
n.stopTL = tl
} else {
err = errors.New("invalid query, found multiple stop time conditions")
}
}
}
}
})
if n.startTL == nil {
err = errors.New("invalid query, missing start time condition")
}
if n.stopTL == nil {
err = errors.New("invalid query, missing stop time condition")
}
return n, err
}

// Set the dimensions on the query
func (q *Query) Dimensions(dims []interface{}) error {
q.stmt.Dimensions = q.stmt.Dimensions[:0]
Expand Down
7 changes: 3 additions & 4 deletions replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,10 @@ func replayBatchFromChan(clck clock.Clock, batches <-chan models.Batch, collecto
lastTime = b.Points[len(b.Points)-1].Time.Add(diff).UTC()
}
clck.Until(lastTime)
if b.TMax.IsZero() {
b.TMax = b.Points[len(b.Points)-1].Time
} else {
b.TMax = b.TMax.UTC()
if lpt := b.Points[len(b.Points)-1].Time; b.TMax.Before(lpt) {
b.TMax = lpt
}
b.TMax = b.TMax.UTC()
tmax = b.TMax
collector.CollectBatch(b)
}
Expand Down
6 changes: 4 additions & 2 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
for batchIndex, batchQueries := range batches {
source := make(chan models.Batch)
sources[batchIndex] = source
go func(cluster string, queries []string, groupByName bool) {
go func(cluster string, queries []*kapacitor.Query, groupByName bool) {
defer close(source)

// Connect to the cluster
Expand All @@ -1349,7 +1349,7 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
// Run queries
for _, q := range queries {
query := influxdb.Query{
Command: q,
Command: q.String(),
}
resp, err := con.Query(query)
if err != nil {
Expand All @@ -1367,6 +1367,8 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
return
}
for _, b := range batches {
// Set stop time based off query bounds
b.TMax = q.StopTime()
source <- b
}
}
Expand Down
3 changes: 1 addition & 2 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,7 @@ func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, er
if err != nil {
return nil, err
}

return batcher.Queries(start, stop), nil
return batcher.Queries(start, stop)
}

// Check that the task allows access to DBRPs
Expand Down