-
Notifications
You must be signed in to change notification settings - Fork 50
/
producer_daemon_batcher.go
95 lines (73 loc) · 2.21 KB
/
producer_daemon_batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package stream
import (
"fmt"
"github.com/justtrackio/gosoline/pkg/encoding/json"
)
type producerDaemonBatcher struct {
maxMessages int
maxBytes int
messages []WritableMessage
size int
}
//go:generate mockery --name ProducerDaemonBatcher
type ProducerDaemonBatcher interface {
Append(msg *Message) ([]WritableMessage, error)
Flush() []WritableMessage
}
type rawJsonMessage struct {
attributes map[string]string
body []byte
}
var _ json.Marshaler = rawJsonMessage{}
func (r rawJsonMessage) MarshalToBytes() ([]byte, error) {
return r.body, nil
}
func (r rawJsonMessage) MarshalToString() (string, error) {
return string(r.body), nil
}
func (r rawJsonMessage) MarshalJSON() ([]byte, error) {
return r.body, nil
}
func (r rawJsonMessage) GetAttributes() map[string]string {
return r.attributes
}
func NewProducerDaemonBatcher(settings ProducerDaemonSettings) ProducerDaemonBatcher {
return &producerDaemonBatcher{
maxMessages: settings.BatchSize,
maxBytes: settings.BatchMaxSize,
messages: make([]WritableMessage, 0, settings.BatchSize),
size: 0,
}
}
func (b *producerDaemonBatcher) Append(msg *Message) ([]WritableMessage, error) {
encodedMessage, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("failed to encode message for batch: %w", err)
}
var result []WritableMessage = nil
// if we can't fit this message in the batch, create a new one
// subtract 1 so if we can fit it exactly so, we do that and flush after adding it
if b.needsFlush(len(encodedMessage) - 1) {
result = b.Flush()
}
b.messages = append(b.messages, rawJsonMessage{
attributes: msg.Attributes,
body: encodedMessage,
})
b.size += len(encodedMessage)
// if this batch is already full (we added a message exactly the max batch size), flush that too
if b.needsFlush(0) {
result = append(result, b.Flush()...)
}
return result, nil
}
func (b *producerDaemonBatcher) needsFlush(nextSize int) bool {
newSize := b.size + nextSize
return len(b.messages) >= b.maxMessages || (b.maxBytes != 0 && newSize >= b.maxBytes)
}
func (b *producerDaemonBatcher) Flush() []WritableMessage {
result := b.messages
b.messages = make([]WritableMessage, 0, b.maxMessages)
b.size = 0
return result
}