forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
window.go
147 lines (134 loc) · 4.24 KB
/
window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package pipeline
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/influxdata/influxdb/influxql"
)
// A `window` node caches data within a moving time range.
// The `period` property of `window` defines the time range covered by `window`.
//
// The `every` property of `window` defines the frequency at which the window
// is emitted to the next node in the pipeline.
//
// The `align` property of `window` defines how to align the window edges.
// (By default, the edges are defined relative to the first data point the `window`
// node receives.)
//
// Example:
//    stream
//        |window()
//            .period(10m)
//            .every(5m)
//        |httpOut('recent')
//
// This example emits the last `10 minute` period every `5 minutes` to the pipeline's `httpOut` node.
// Because `every` is less than `period`, each time the window is emitted it contains `5 minutes` of
// new data and `5 minutes` of the previous period's data.
//
// NOTE: Because no `align` property is defined, the `window` edge is defined relative to the first data point.
//
// A window can alternatively be sized by number of points using `periodCount` and `everyCount`.
// Validation rejects a node that sets both `period` and `periodCount`, that combines
// `periodCount` with `align`, or that sets `periodCount` without an `everyCount` greater than zero.
type WindowNode struct {
// Embedded chain node; excluded from the JSON representation.
chainnode `json:"-"`
// The period, or length in time, of the window.
// Encoded in JSON as a duration string rather than a number.
Period time.Duration `json:"period"`
// How often the current window is emitted into the pipeline.
// If equal to zero, then every new point will emit the current window.
// Encoded in JSON as a duration string rather than a number.
Every time.Duration `json:"every"`
// Whether to align the window edges with the zero time.
// Set through the `align` property; cannot be combined with `periodCount`.
// tick:ignore
AlignFlag bool `json:"align" tick:"Align"`
// Whether to wait till the period is full before the first emit.
// Set through the `fillPeriod` property.
// tick:ignore
FillPeriodFlag bool `json:"fillPeriod" tick:"FillPeriod"`
// PeriodCount is the number of points per window.
// It cannot be set together with `period`.
PeriodCount int64 `json:"periodCount"`
// EveryCount determines how often the window is emitted based on the count of points.
// A value of 1 means that every new point will emit the window.
// It must be greater than zero whenever `periodCount` is set.
EveryCount int64 `json:"everyCount"`
}
// newWindowNode allocates a WindowNode whose embedded chain node is named
// "window" and is built from StreamEdge and BatchEdge (presumably the wanted
// and provided edge types; confirm against newBasicChainNode).
// All window properties start at their zero values.
func newWindowNode() *WindowNode {
	node := new(WindowNode)
	node.chainnode = newBasicChainNode("window", StreamEdge, BatchEdge)
	return node
}
// MarshalJSON encodes the WindowNode as JSON. The output carries the node
// type ("window") and the node ID alongside the window properties, and writes
// Period and Every as duration strings produced by influxql.FormatDuration
// instead of as integers.
// tick:ignore
func (w *WindowNode) MarshalJSON() ([]byte, error) {
	// Alias has the fields of WindowNode but none of its methods, so encoding
	// it does not call back into this MarshalJSON.
	type Alias WindowNode
	// The outer Period and Every fields shadow the embedded ones because they
	// sit at a shallower depth, which swaps in the string encoding.
	type payload struct {
		TypeOf
		*Alias
		Period string `json:"period"`
		Every  string `json:"every"`
	}

	out := payload{
		TypeOf: TypeOf{Type: "window", ID: w.ID()},
		Alias:  (*Alias)(w),
		Period: influxql.FormatDuration(w.Period),
		Every:  influxql.FormatDuration(w.Every),
	}
	return json.Marshal(&out)
}
// UnmarshalJSON decodes JSON into the WindowNode. It returns an error if the
// data is not valid JSON, if the encoded node type is not "window", or if the
// "period" or "every" value cannot be parsed by influxql.ParseDuration.
// On success the node's ID is set from the decoded data.
// tick:ignore
func (w *WindowNode) UnmarshalJSON(data []byte) error {
	// Alias has the fields of WindowNode but none of its methods, so decoding
	// into it does not call back into this UnmarshalJSON.
	type Alias WindowNode
	// The outer Period and Every fields shadow the embedded ones, capturing
	// the duration strings so they can be parsed explicitly below.
	type payload struct {
		TypeOf
		*Alias
		Period string `json:"period"`
		Every  string `json:"every"`
	}

	in := payload{Alias: (*Alias)(w)}
	if err := json.Unmarshal(data, &in); err != nil {
		return err
	}
	if in.Type != "window" {
		return fmt.Errorf("error unmarshaling node %d of type %s as WindowNode", in.ID, in.Type)
	}

	var err error
	if w.Period, err = influxql.ParseDuration(in.Period); err != nil {
		return err
	}
	if w.Every, err = influxql.ParseDuration(in.Every); err != nil {
		return err
	}

	w.setID(in.ID)
	return nil
}
// Align the window edges to the `every` property instead of to the time of
// the first data point the `window` node receives, which is the default.
// With `align` set, the window time edges are truncated to `every`. For example,
// if a data point's time is 12:06 and the `every` property is `5m`, then the
// data point's window will range from 12:05 to 12:10.
// Alignment applies to time-based windows only; it cannot be combined with `periodCount`.
// tick:property
func (w *WindowNode) Align() *WindowNode {
	w.AlignFlag = true
	return w
}
// FillPeriod makes the WindowNode wait until a full period has elapsed before
// it emits its first batch.
// It only has an effect when the period is greater than the every value.
// tick:property
func (w *WindowNode) FillPeriod() *WindowNode {
	w.FillPeriodFlag = true
	return w
}
// validate checks the property combinations that apply to count-based windows.
// A node with PeriodCount equal to zero always passes. Otherwise it returns an
// error, checked in this order, when Period is also set, when AlignFlag is
// set, or when EveryCount is not greater than zero.
func (w *WindowNode) validate() error {
	// Every rule below concerns count-based windows only.
	if w.PeriodCount == 0 {
		return nil
	}

	switch {
	case w.Period != 0:
		return errors.New("cannot specify both period and periodCount")
	case w.AlignFlag:
		return errors.New("can only align windows based off time, not count")
	case w.EveryCount <= 0:
		return errors.New("everyCount must be greater than zero")
	}
	return nil
}