forked from perlin-network/noise
/
protocol.go
141 lines (109 loc) · 3.32 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Package gossip is a simple implementation of a gossip protocol for noise. It keeps track of a cache of messages
// sent/received to/from peers to avoid re-gossiping particular messages to specific peers.
package gossip
import (
"context"
"sync"
"github.com/VictoriaMetrics/fastcache"
"github.com/mbilal92/noise"
"github.com/mbilal92/noise/kademlia"
)
const (
relayChanSize = 64
)
// Protocol implements a simple gossiping protocol that avoids resending messages to peers that it already believes
// is aware of particular messages that are being gossiped.
type Protocol struct {
node *noise.Node
overlay *kademlia.Protocol
events Events
broadcastChan chan Message
seen *fastcache.Cache
}
// New returns a new instance of a gossip protocol with 32MB of in-memory cache instantiated.
func New(overlay *kademlia.Protocol, opts ...Option) *Protocol {
p := &Protocol{
overlay: overlay,
seen: fastcache.New(32 << 20),
broadcastChan: make(chan Message, relayChanSize),
}
for _, opt := range opts {
opt(p)
}
return p
}
// Protocol returns a noise.Protocol that may registered to a node via (*noise.Node).Bind.
func (p *Protocol) Protocol() noise.Protocol {
return noise.Protocol{
VersionMajor: 0,
VersionMinor: 0,
VersionPatch: 0,
Bind: p.Bind,
}
}
// Bind registers a single message gossip.Message, and handles them by registering the (*Protocol).Handle Handler.
func (p *Protocol) Bind(node *noise.Node) error {
p.node = node
node.RegisterMessage(Message{}, UnmarshalMessage)
node.Handle(p.Handle)
return nil
}
// Push gossips a single message concurrently to all peers this node is aware of, on the condition that this node
// believes that the aforementioned peer has not received data before. A context may be provided to cancel Push, as it
// blocks the current goroutine until the gossiping of a single message is done. Any errors pushing a message to a
// particular peer is ignored.
func (p *Protocol) Push(ctx context.Context, data []byte) {
p.seen.Set(p.hash(p.node.ID(), data), nil)
peers := p.overlay.Table().Entries()
var wg sync.WaitGroup
wg.Add(len(peers))
for _, id := range peers {
id, key := id, p.hash(id, data)
go func() {
defer wg.Done()
if p.seen.Has(key) {
return
}
if err := p.node.SendMessage(ctx, id.Address, Message(data)); err != nil {
return
}
p.seen.Set(key, nil)
}()
}
wg.Wait()
}
// Handle implements noise.Protocol and handles gossip.Message messages.
func (p *Protocol) Handle(ctx noise.HandlerContext) error {
// fmt.Println("Gosip Handle")
if ctx.IsRequest() {
return nil
}
obj, err := ctx.DecodeMessage()
if err != nil {
return nil
}
msg, ok := obj.(Message)
if !ok {
return nil
}
p.seen.Set(p.hash(ctx.ID(), msg), nil) // Mark that the sender already has this data.
self := p.hash(p.node.ID(), msg)
if p.seen.Has(self) {
return nil
}
p.seen.Set(self, nil) // Mark that we already have this data.
p.broadcastChan <- msg
// if p.events.OnGossipReceived != nil {
// if err := p.events.OnGossipReceived(p.node, ctx.ID(), msg); err != nil {
// return err
// }
// }
p.Push(context.Background(), msg)
return nil
}
func (p *Protocol) hash(id noise.ID, data []byte) []byte {
return append(id.ID[:], data...)
}
func (p *Protocol) GetBroadcastChan() chan Message {
return p.broadcastChan
}