Skip to content

Commit

Permalink
set tmax as stop time in replays
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 19, 2016
1 parent 0232f25 commit 56a136d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 18 deletions.
32 changes: 20 additions & 12 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,26 @@ func (s *BatchNode) Abort() {
}

type BatchQueries struct {
Queries []string
Queries []*Query
Cluster string
GroupByMeasurement bool
}

func (s *BatchNode) Queries(start, stop time.Time) []BatchQueries {
func (s *BatchNode) Queries(start, stop time.Time) ([]BatchQueries, error) {
queries := make([]BatchQueries, len(s.children))
for i, b := range s.children {
qn := b.(*QueryNode)
qs, err := qn.Queries(start, stop)
if err != nil {
return nil, err
}
queries[i] = BatchQueries{
Queries: qn.Queries(start, stop),
Queries: qs,
Cluster: qn.Cluster(),
GroupByMeasurement: qn.GroupByMeasurement(),
}
}
return queries
return queries, nil
}

// Do not add the source batch node to the dot output
Expand Down Expand Up @@ -227,30 +231,34 @@ func (b *QueryNode) Cluster() string {
return b.b.Cluster
}

func (b *QueryNode) Queries(start, stop time.Time) []string {
func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error) {
now := time.Now()
if stop.IsZero() {
stop = now
}
// Crons are sensitive to timezones.
// Make sure we are using local time.
start = start.Local()
queries := make([]string, 0)
queries := make([]*Query, 0)
for {
start = b.ticker.Next(start)
if start.IsZero() || start.After(stop) {
break
}
qstart := start.Add(-1 * b.b.Offset)
b.query.Start(qstart)
q, err := b.query.Clone()
if err != nil {
return nil, err
}
q.SetStartTime(qstart)
qstop := qstart.Add(b.b.Period)
if qstop.After(now) {
break
}
b.query.Stop(qstop)
queries = append(queries, b.query.String())
q.SetStopTime(qstop)
queries = append(queries, q)
}
return queries
return queries, nil
}

// Query InfluxDB and collect batches on batch collector.
Expand Down Expand Up @@ -283,8 +291,8 @@ func (b *QueryNode) doQuery() error {

// Update times for query
stop := now.Add(-1 * b.b.Offset)
b.query.Start(stop.Add(-1 * b.b.Period))
b.query.Stop(stop)
b.query.SetStartTime(stop.Add(-1 * b.b.Period))
b.query.SetStopTime(stop)

b.logger.Println("D! starting next batch query:", b.query.String())

Expand Down
60 changes: 58 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,71 @@ func (q *Query) DBRPs() ([]DBRP, error) {
}

// Set the start time of the query
func (q *Query) Start(s time.Time) {
func (q *Query) StartTime() time.Time {
return q.startTL.Val
}

// Set the stop time of the query
func (q *Query) StopTime() time.Time {
return q.stopTL.Val
}

// Set the start time of the query
func (q *Query) SetStartTime(s time.Time) {
q.startTL.Val = s
}

// Set the stop time of the query
func (q *Query) Stop(s time.Time) {
func (q *Query) SetStopTime(s time.Time) {
q.stopTL.Val = s
}

// Deep clone this query
func (q *Query) Clone() (*Query, error) {
n := &Query{
stmt: q.stmt.Clone(),
}
// Find the start/stop time literals
var err error
influxql.WalkFunc(n.stmt.Condition, func(qlNode influxql.Node) {
if bn, ok := qlNode.(*influxql.BinaryExpr); ok {
switch bn.Op {
case influxql.GTE:
if vf, ok := bn.LHS.(*influxql.VarRef); !ok || vf.Val != "time" {
return
}
if tl, ok := bn.RHS.(*influxql.TimeLiteral); ok {
// We have a "time" >= 'time literal'
if n.startTL == nil {
n.startTL = tl
} else {
err = errors.New("invalid query, found multiple start time conditions")
}
}
case influxql.LT:
if vf, ok := bn.LHS.(*influxql.VarRef); !ok || vf.Val != "time" {
return
}
if tl, ok := bn.RHS.(*influxql.TimeLiteral); ok {
// We have a "time" < 'time literal'
if n.stopTL == nil {
n.stopTL = tl
} else {
err = errors.New("invalid query, found multiple stop time conditions")
}
}
}
}
})
if n.startTL == nil {
err = errors.New("invalid query, missing start time condition")
}
if n.stopTL == nil {
err = errors.New("invalid query, missing stop time condition")
}
return n, err
}

// Set the dimensions on the query
func (q *Query) Dimensions(dims []interface{}) error {
q.stmt.Dimensions = q.stmt.Dimensions[:0]
Expand Down
6 changes: 4 additions & 2 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
for batchIndex, batchQueries := range batches {
source := make(chan models.Batch)
sources[batchIndex] = source
go func(cluster string, queries []string, groupByName bool) {
go func(cluster string, queries []*kapacitor.Query, groupByName bool) {
defer close(source)

// Connect to the cluster
Expand All @@ -1349,7 +1349,7 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
// Run queries
for _, q := range queries {
query := influxdb.Query{
Command: q,
Command: q.String(),
}
resp, err := con.Query(query)
if err != nil {
Expand All @@ -1367,6 +1367,8 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
return
}
for _, b := range batches {
// Set stop time based off query bounds
b.TMax = q.StopTime()
source <- b
}
}
Expand Down
3 changes: 1 addition & 2 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,7 @@ func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, er
if err != nil {
return nil, err
}

return batcher.Queries(start, stop), nil
return batcher.Queries(start, stop)
}

// Check that the task allows access to DBRPs
Expand Down

0 comments on commit 56a136d

Please sign in to comment.