-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffered.go
138 lines (113 loc) · 2.6 KB
/
buffered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package send
import (
"context"
"sync"
"time"
"github.com/tychoish/grip/message"
)
const minInterval = 5 * time.Second
type bufferedSender struct {
mu sync.Mutex
cancel context.CancelFunc
buffer []message.Composer
size int
lastFlush time.Time
closed bool
Sender
}
// MakeBuffered provides a Sender implementation that wraps an existing
// Sender sending messages in batches, on a specified buffer size or after an
// interval has passed.
//
// If the interval is 0, the constructor sets an interval of 1 minute, and if
// it is less than 5 seconds, the constructor sets it to 5 seconds. If the
// size threshold is 0, then the constructor sets a threshold of 100.
//
// This Sender does not own the underlying Sender, so users are responsible for
// closing the underlying Sender if/when it is appropriate to release its
// resources.
func MakeBuffered(sender Sender, interval time.Duration, size int) Sender {
if interval == 0 {
interval = time.Minute
} else if interval < minInterval {
interval = minInterval
}
if size <= 0 {
size = 100
}
ctx, cancel := context.WithCancel(context.Background())
s := &bufferedSender{
Sender: sender,
cancel: cancel,
buffer: []message.Composer{},
size: size,
}
go s.intervalFlush(ctx, interval)
return s
}
func (s *bufferedSender) Unwrap() Sender { return s.Sender }
func (s *bufferedSender) Send(msg message.Composer) {
if !ShouldLog(s, msg) {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.buffer = append(s.buffer, msg)
if len(s.buffer) >= s.size {
s.flush()
}
}
func (s *bufferedSender) Flush(_ context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.closed {
s.flush()
}
return nil
}
// Close writes any buffered messages to the underlying Sender. This does not
// close the underlying sender.
func (s *bufferedSender) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return nil
}
s.cancel()
if len(s.buffer) > 0 {
s.flush()
}
s.closed = true
return nil
}
func (s *bufferedSender) intervalFlush(ctx context.Context, interval time.Duration) {
timer := time.NewTimer(interval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
func() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.buffer) > 0 && time.Since(s.lastFlush) >= interval {
s.flush()
}
}()
_ = timer.Reset(interval)
}
}
}
func (s *bufferedSender) flush() {
if len(s.buffer) == 1 {
s.Sender.Send(s.buffer[0])
} else {
s.Sender.Send(message.MakeGroupComposer(s.buffer))
}
s.buffer = []message.Composer{}
s.lastFlush = time.Now()
}