/
message.go
115 lines (94 loc) · 2.33 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package swarm
import (
"bytes"
"encoding/gob"
)
type messageType int
const (
sharedValue messageType = iota
broadcast
)
type message struct {
Type messageType
Source string
Key string
Value interface{}
}
type outgoingMessage struct {
message *message
encoded []byte
}
type Message struct {
Source string
Value interface{}
}
type reqOutgoing struct {
overhead int
limit int
ret chan [][]byte
}
// mlDelegate is a memberlist delegate
type mlDelegate struct {
meta []byte
outgoing chan<- reqOutgoing
incoming chan<- []byte
}
type sharedValues map[string]map[string]interface{}
type valueReq struct {
key string
ret chan map[string]interface{}
}
// NodeMeta implements a memberlist delegate
func (d *mlDelegate) NodeMeta(limit int) []byte {
if len(d.meta) > limit {
// TODO: would nil better here?
// documentation is unclear
return d.meta[:limit]
}
return d.meta
}
// NotifyMsg implements a memberlist delegate
func (d *mlDelegate) NotifyMsg(m []byte) {
d.incoming <- m
}
// GetBroadcasts implements a memberlist delegate
// TODO: verify over TCP-only
func (d *mlDelegate) GetBroadcasts(overhead, limit int) [][]byte {
req := reqOutgoing{
overhead: overhead,
limit: limit,
ret: make(chan [][]byte),
}
d.outgoing <- req
return <-req.ret
}
// LocalState implements a memberlist delegate
func (d *mlDelegate) LocalState(bool) []byte { return nil }
// MergeRemoteState implements a memberlist delegate
func (d *mlDelegate) MergeRemoteState(buf []byte, join bool) {}
// the top level map is used internally, we can use it as mutable
// the leaf maps are shared, we need to clone those
func (sv sharedValues) set(source, key string, value interface{}) {
prev := sv[key]
sv[key] = make(map[string]interface{})
for s, v := range prev {
sv[key][s] = v
}
sv[key][source] = value
}
func encodeMessage(m *message) ([]byte, error) {
// we're not saving the encoder together with the connections, because
// even if the reflection info would be cached, it's very fragile and
// complicated. These messages should be small, it should be OK to pay
// this cost.
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(m)
return buf.Bytes(), err
}
func decodeMessage(b []byte) (*message, error) {
var m message
dec := gob.NewDecoder(bytes.NewBuffer(b))
err := dec.Decode(&m)
return &m, err
}