// messages.go — taken from a fork of moby/libnetwork.

package serf
import (
"bytes"
"github.com/hashicorp/go-msgpack/codec"
"time"
)
// messageType identifies the kind of gossip message Serf will send along
// memberlist. encodeMessage writes it as the single leading byte of every
// encoded message, ahead of the msgpack body.
type messageType uint8
// Message type identifiers. Values are assigned by iota starting at zero, so
// the declaration order fixes each numeric value: inserting or reordering
// entries renumbers every constant that follows.
const (
messageLeaveType messageType = iota // 0
messageJoinType // 1
messagePushPullType // 2
messageUserEventType // 3
messageQueryType // 4
messageQueryResponseType // 5
messageConflictResponseType // 6
messageKeyRequestType // 7
messageKeyResponseType // 8
)
// Query flag bits. Each constant is 1 << iota, i.e. a distinct single bit, so
// several flags can be combined in one uint32 Flags field and tested with a
// bitwise AND (see the Ack and NoBroadcast methods).
const (
// queryFlagAck (value 1) is used to force the receiver to send an ack back.
queryFlagAck uint32 = 1 << iota
// queryFlagNoBroadcast (value 2) is used to prevent re-broadcast of a query.
// This can be used to selectively send queries to individual members.
queryFlagNoBroadcast
)
// filterType is used with a queryFilter to specify the type of
// filter we are sending. encodeFilter writes it as the single leading byte
// of an encoded filter, ahead of the msgpack body.
type filterType uint8
// Filter type identifiers, numbered by iota from zero; the declaration order
// fixes the numeric values.
const (
filterNodeType filterType = iota // 0; paired with filterNode
filterTagType // 1; paired with filterTag
)
// messageJoin is the message broadcast after we join, to
// associate the node with a lamport clock.
type messageJoin struct {
// LTime is the lamport time being associated with Node.
LTime LamportTime
// Node identifies the joining node (presumably its name — confirm against callers).
Node string
}
// messageLeave is the message broadcast to signal the intention to
// leave.
type messageLeave struct {
// LTime is the lamport time attached to the leave message.
LTime LamportTime
// Node identifies the leaving node (presumably its name — confirm against callers).
Node string
}
// messagePushPull is used when doing a state exchange. This
// is a relatively large message, but is sent infrequently.
type messagePushPull struct {
LTime LamportTime // Current node lamport time
StatusLTimes map[string]LamportTime // Maps the node to its status time (keys presumably node names — confirm)
LeftMembers []string // List of left nodes
EventLTime LamportTime // Lamport time for event clock
Events []*userEvents // Recent events
QueryLTime LamportTime // Lamport time for query clock
}
// messageUserEvent is used for user-generated events.
type messageUserEvent struct {
LTime LamportTime // Lamport time attached to the event
Name string // Event name
Payload []byte // Event payload
CC bool // "Can Coalesce". Zero value is compatible with Serf 0.1
}
// messageQuery is used for query events. Flags holds a combination of the
// queryFlag* bits; the Ack and NoBroadcast methods test the individual bits.
type messageQuery struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID, randomly generated
Addr []byte // Source address, used for a direct reply
Port uint16 // Source port, used for a direct reply
Filters [][]byte // Potential query filters (presumably each produced by encodeFilter — confirm)
Flags uint32 // Used to provide various flags (queryFlagAck, queryFlagNoBroadcast)
Timeout time.Duration // Maximum time between delivery and response
Name string // Query name
Payload []byte // Query payload
}
// Ack reports whether the queryFlagAck bit is set in the query's Flags.
func (m *messageQuery) Ack() bool {
	ackBit := m.Flags & queryFlagAck
	return ackBit != 0
}
// NoBroadcast reports whether the queryFlagNoBroadcast bit is set in the
// query's Flags.
func (m *messageQuery) NoBroadcast() bool {
	noBroadcastBit := m.Flags & queryFlagNoBroadcast
	return noBroadcastBit != 0
}
// filterNode is the filter body used with filterNodeType. It is a list
// of node names.
type filterNode []string
// filterTag is the filter body used with filterTagType. It carries a regular
// expression to apply to a tag.
type filterTag struct {
// Tag selects the tag the expression is applied to (presumably the tag's name — confirm).
Tag string
// Expr is the regular expression applied to that tag.
Expr string
}
// messageQueryResponse is used to respond to a query. Flags uses the same
// queryFlag* bits as messageQuery; the Ack method tests queryFlagAck.
type messageQueryResponse struct {
LTime LamportTime // Event lamport time
ID uint32 // Query ID
From string // Node name
Flags uint32 // Used to provide various flags
Payload []byte // Optional response payload
}
// Ack reports whether the queryFlagAck bit is set in the response's Flags.
func (m *messageQueryResponse) Ack() bool {
	ackBit := m.Flags & queryFlagAck
	return ackBit != 0
}
// decodeMessage msgpack-decodes buf into out and returns the decoder's error,
// if any. out is handed directly to the codec decoder's Decode method.
//
// NOTE(review): decoding starts at the first byte of buf, so the leading type
// byte that encodeMessage prepends is presumably stripped by the caller
// before this is invoked — confirm against call sites.
func decodeMessage(buf []byte, out interface{}) error {
	handle := codec.MsgpackHandle{}
	dec := codec.NewDecoder(bytes.NewReader(buf), &handle)
	return dec.Decode(out)
}
// encodeMessage serializes msg for sending. The result is one byte holding
// the message type t, followed by the msgpack encoding of msg.
//
// If encoding fails, a nil slice is returned together with the encoder's
// error, so callers never receive a partially written buffer.
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
	var buf bytes.Buffer
	// bytes.Buffer.WriteByte is documented to always return a nil error.
	buf.WriteByte(uint8(t))

	var handle codec.MsgpackHandle
	if err := codec.NewEncoder(&buf, &handle).Encode(msg); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// encodeFilter serializes a query filter. The result is one byte holding the
// filter type f, followed by the msgpack encoding of filt.
//
// If encoding fails, a nil slice is returned together with the encoder's
// error, so callers never receive a partially written buffer.
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
	var buf bytes.Buffer
	// bytes.Buffer.WriteByte is documented to always return a nil error.
	buf.WriteByte(uint8(f))

	var handle codec.MsgpackHandle
	if err := codec.NewEncoder(&buf, &handle).Encode(filt); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}