forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
132 lines (118 loc) · 3.61 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package pipeline
import (
"time"
)
// SourceBatchNode is a node that handles creating several child BatchNodes.
// Each call to `query` creates a child batch node that
// can be configured further. See BatchNode.
// The `batch` variable in batch tasks is an instance of
// a SourceBatchNode.
//
// Example:
//    var errors = batch
//                  .query('SELECT value from errors')
//                  ...
//    var views = batch
//                  .query('SELECT value from views')
//                  ...
//
type SourceBatchNode struct {
	// Embedded base node; newSourceBatchNode fills in its desc, wants and provides fields.
	node
}
// newSourceBatchNode builds the root node of a batch pipeline.
// The embedded node is described as "srcbatch", wants NoEdge and
// provides BatchEdge.
func newSourceBatchNode() *SourceBatchNode {
	src := new(SourceBatchNode)
	src.node = node{desc: "srcbatch", wants: NoEdge, provides: BatchEdge}
	return src
}
// Query sets the query to execute and returns the child BatchNode that will run it.
// The query must not contain a time condition in the `WHERE` clause
// or contain a `GROUP BY` clause.
// The time conditions are added dynamically according to the period, offset and schedule.
// The `GROUP BY` clause is added dynamically according to the dimensions
// passed to the `groupBy` method.
//
// Every call creates a new BatchNode, stores q in its QueryStr field and
// links it as a child of this source node.
func (b *SourceBatchNode) Query(q string) *BatchNode {
	child := newBatchNode()
	child.QueryStr = q
	b.linkChild(child)
	return child
}
// A BatchNode defines a source and a schedule for
// processing batch data. The data is queried from
// an InfluxDB database and then passed into the data pipeline.
//
// Example:
//    batch
//        .query('''
//            SELECT mean("value")
//            FROM "telegraf"."default".cpu_usage_idle
//            WHERE "host" = 'serverA'
//        ''')
//        .period(1m)
//        .every(20s)
//        .groupBy(time(10s), 'cpu')
//        ...
//
// In the above example InfluxDB is queried every 20 seconds; the window of time returned
// spans 1 minute and is grouped into 10 second buckets.
type BatchNode struct {
	// Embedded chain node; initialized by newBatchNode.
	chainnode

	// The query text
	//tick:ignore
	QueryStr string

	// The period or length of time that will be queried from InfluxDB
	Period time.Duration

	// How often to query InfluxDB.
	//
	// The Every property is mutually exclusive with the Cron property.
	Every time.Duration

	// Define a schedule using a cron syntax.
	//
	// The specific cron implementation is documented here:
	// https://github.com/gorhill/cronexpr#implementation
	//
	// The Cron property is mutually exclusive with the Every property.
	Cron string

	// How far back in time to query from the current time
	//
	// For example, given an Offset of 2 hours and an Every of 5m,
	// Kapacitor will query InfluxDB every 5 minutes for the window of data from 2 hours ago.
	//
	// This applies to Cron schedules as well. If the cron specifies to run every Sunday at
	// 1 AM and the Offset is 1 hour, then at 1 AM on Sunday the data from 12 AM will be queried.
	Offset time.Duration

	// The list of dimensions for the group-by clause.
	// Set through the GroupBy method.
	//tick:ignore
	Dimensions []interface{}

	// Fill the data.
	// Options are:
	//
	//   - Any numerical value
	//   - null - exhibits the same behavior as the default
	//   - previous - reports the value of the previous window
	//   - none - suppresses timestamps and values where the value is null
	Fill interface{}
}
// newBatchNode allocates a BatchNode whose embedded chain node is created by
// newBasicChainNode with the description "batch" and BatchEdge passed for both
// edge arguments. All other fields are left at their zero values.
func newBatchNode() *BatchNode {
	bn := new(BatchNode)
	bn.chainnode = newBasicChainNode("batch", BatchEdge, BatchEdge)
	return bn
}
// GroupBy groups the data by a set of dimensions.
// One of the dimensions may be a time dimension.
//
// This property adds a `GROUP BY` clause to the query,
// so all the normal behaviors when querying InfluxDB with a `GROUP BY` apply.
// More details: https://influxdb.com/docs/v0.9/query_language/data_exploration.html#the-group-by-clause
//
// Example:
//    batch
//        .groupBy(time(10s), 'tag1', 'tag2')
//
// Each call replaces any previously set dimensions; the argument slice is
// stored as passed, without copying. The node itself is returned so that
// calls can be chained.
//
// tick:property
func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode {
	b.Dimensions = d
	return b
}