forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
48 lines (43 loc) · 1.4 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package query
import (
"context"
"time"
)
// MonitorFunc is a function that will be called to check if a query
// is currently healthy. If the query needs to be interrupted for some reason,
// the error should be returned by this function.
type MonitorFunc func(<-chan struct{}) error
// Monitor monitors the status of a query and returns whether the query should
// be aborted with an error.
type Monitor interface {
// Monitor starts a new goroutine that will monitor a query. The function
// will be passed in a channel to signal when the query has been finished
// normally. If the function returns with an error and the query is still
// running, the query will be terminated.
Monitor(fn MonitorFunc)
}
// MonitorFromContext returns a Monitor embedded within the Context
// if one exists.
func MonitorFromContext(ctx context.Context) Monitor {
v, _ := ctx.Value(monitorContextKey).(Monitor)
return v
}
// PointLimitMonitor is a query monitor that exits when the number of points
// emitted exceeds a threshold.
func PointLimitMonitor(cur Cursor, interval time.Duration, limit int) MonitorFunc {
return func(closing <-chan struct{}) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
stats := cur.Stats()
if stats.PointN >= limit {
return ErrMaxSelectPointsLimitExceeded(stats.PointN, limit)
}
case <-closing:
return nil
}
}
}
}