-
Notifications
You must be signed in to change notification settings - Fork 157
/
tumbling_window.go
111 lines (94 loc) · 2.63 KB
/
tumbling_window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package flow
import (
"sync"
"time"
"github.com/reugn/go-streams"
)
// TumblingWindow assigns each element to a window of a specified window size.
// Tumbling windows have a fixed size and do not overlap.
// T indicates the incoming element type, and the outgoing element type is []T.
type TumblingWindow[T any] struct {
sync.Mutex
windowSize time.Duration
in chan any
out chan any
done chan struct{}
buffer []T
}
// Verify TumblingWindow satisfies the Flow interface.
var _ streams.Flow = (*TumblingWindow[any])(nil)
// NewTumblingWindow returns a new TumblingWindow operator.
// T specifies the incoming element type, and the outgoing element type is []T.
//
// size is the Duration of generated windows.
func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T] {
tumblingWindow := &TumblingWindow[T]{
windowSize: size,
in: make(chan any),
out: make(chan any),
done: make(chan struct{}),
}
go tumblingWindow.receive()
go tumblingWindow.emit()
return tumblingWindow
}
// Via streams data to a specified Flow and returns it.
func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow {
go tw.transmit(flow)
return flow
}
// To streams data to a specified Sink.
func (tw *TumblingWindow[T]) To(sink streams.Sink) {
tw.transmit(sink)
}
// Out returns the output channel of the TumblingWindow operator.
func (tw *TumblingWindow[T]) Out() <-chan any {
return tw.out
}
// In returns the input channel of the TumblingWindow operator.
func (tw *TumblingWindow[T]) In() chan<- any {
return tw.in
}
// transmit submits closed windows to the next Inlet.
func (tw *TumblingWindow[T]) transmit(inlet streams.Inlet) {
for window := range tw.out {
inlet.In() <- window
}
close(inlet.In())
}
// receive buffers the incoming elements.
func (tw *TumblingWindow[T]) receive() {
for element := range tw.in {
tw.Lock()
tw.buffer = append(tw.buffer, element.(T))
tw.Unlock()
}
close(tw.done)
}
// emit captures and emits a new window based on the fixed time interval.
func (tw *TumblingWindow[T]) emit() {
ticker := time.NewTicker(tw.windowSize)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tw.dispatchWindow()
case <-tw.done:
tw.dispatchWindow()
close(tw.out)
return
}
}
}
// dispatchWindow creates a window from buffered elements and resets the buffer.
// It sends the slice of elements to the output channel if the window is not empty.
func (tw *TumblingWindow[T]) dispatchWindow() {
tw.Lock()
windowElements := tw.buffer
tw.buffer = nil
tw.Unlock()
// send elements if the window is not empty
if len(windowElements) > 0 {
tw.out <- windowElements
}
}