/
message.go
70 lines (56 loc) · 1.35 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package reducer
import "fmt"
// DROP is the tag that MessageToDrop attaches to a message to mark it as one
// to be dropped. Its value is the string "U+005C__DROP__": the %U rendering of
// the backslash rune followed by the literal suffix "__DROP__".
var DROP = fmt.Sprintf("%U", '\\') + "__DROP__"
// Message wraps the data returned by reduce functions: a byte payload plus
// optional keys and tags. The fields are unexported; they are populated through
// NewMessage, WithKeys and WithTags and read back through Value, Keys and Tags.
type Message struct {
	value      []byte   // payload bytes of the message
	keys, tags []string // message keys; tags are used for conditional forwarding
}
// NewMessage builds a Message that carries value as its payload.
// The keys and tags of the result are left unset (nil). The value slice is
// stored as given, without copying.
func NewMessage(value []byte) Message {
	var msg Message
	msg.value = value
	return msg
}
// MessageToDrop builds a Message marked for dropping: its value is an empty,
// non-nil byte slice and its only tag is DROP. No keys are set.
func MessageToDrop() Message {
	return Message{
		tags:  []string{DROP},
		value: make([]byte, 0),
	}
}
// WithKeys returns a copy of the message whose keys are replaced by keys.
// The receiver is passed by value, so the caller's original Message is not
// modified. The keys slice is stored as given, without copying.
func (m Message) WithKeys(keys []string) Message {
	keyed := m
	keyed.keys = keys
	return keyed
}
// WithTags returns a copy of the message whose tags are replaced by tags.
// Tags are used for conditional forwarding. The receiver is passed by value,
// so the caller's original Message is not modified. The tags slice is stored
// as given, without copying.
func (m Message) WithTags(tags []string) Message {
	tagged := m
	tagged.tags = tags
	return tagged
}
// Keys returns the keys stored on the message. The slice is returned as
// stored (not copied); it is nil if no keys were assigned.
func (m Message) Keys() []string { return m.keys }
// Value returns the payload bytes stored on the message. The slice is
// returned as stored (not copied).
func (m Message) Value() []byte { return m.value }
// Tags returns the tags stored on the message. The slice is returned as
// stored (not copied); it is nil if no tags were assigned.
func (m Message) Tags() []string { return m.tags }
// Messages is a list of Message values. It is created with MessagesBuilder,
// grown with Append and read back with Items.
type Messages []Message
// MessagesBuilder returns a new, empty (non-nil, zero-length) Messages list
// to which messages can be added with Append.
func MessagesBuilder() Messages {
	return make(Messages, 0)
}
// Append returns the list with msg added at the end. As with the built-in
// append, the caller must use the returned value; the receiver's own slice
// header is not updated.
func (m Messages) Append(msg Message) Messages {
	return append(m, msg)
}
// Items returns the list as a plain []Message. The returned slice is the
// same underlying slice as the receiver, not a copy.
func (m Messages) Items() []Message { return m }