This repository has been archived by the owner on Aug 17, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
80 lines (73 loc) · 2 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package subpub
import (
"bufio"
"github.com/libp2p/go-libp2p-core/network"
"gitlab.com/vocdoni/go-dvote/log"
)
// handleStream wires a newly opened stream into the SubPub. Data arriving on
// the stream is consumed by readHandler, and the remote peer is appended to
// ps.Peers together with a queue that broadcastHandler drains into the stream.
func (ps *SubPub) handleStream(stream network.Stream) {
	// Inbound direction: a dedicated goroutine reads messages off the stream.
	go ps.readHandler(bufio.NewReader(stream))

	// Outbound direction: every peer gets its own queue holding up to 8
	// pending broadcast messages, so broadcasts can be spread concurrently
	// without blocking. Per the original design note, messages are dropped
	// when a peer is slow or disconnects (that logic lives outside this
	// function).
	// TODO(mvdan): if another peer opens a stream with us, to just send
	// us a single message (unicast), it's wasteful to also send broadcasts
	// back via that stream.
	remote := stream.Conn().RemotePeer()
	queue := make(chan []byte, 8)

	// The lock is held until the function returns, so the peer list update,
	// the optional callback and the log line all run under PeersMu.
	ps.PeersMu.Lock()
	defer ps.PeersMu.Unlock()

	ps.Peers = append(ps.Peers, peerSub{remote, queue})

	// Read the callback field once before the nil check and the call.
	callback := ps.onPeerAdd
	if callback != nil {
		callback(remote)
	}
	log.Infof("connected to peer %s", remote)

	go ps.broadcastHandler(queue, bufio.NewWriter(stream))
}
// broadcastHandler forwards every message received on write to w, flushing
// the buffered writer after each one. It returns as soon as a receive from
// ps.close succeeds, or when sending or flushing fails.
func (ps *SubPub) broadcastHandler(write <-chan []byte, w *bufio.Writer) {
	// deliver writes a single message and flushes it out. It reports whether
	// the handler should keep running.
	deliver := func(payload []byte) bool {
		if err := ps.SendMessage(w, payload); err != nil {
			log.Debugf("error writing to buffer: (%s)", err)
			return false
		}
		if err := w.Flush(); err != nil {
			log.Debugf("error flushing write buffer: (%s)", err)
			return false
		}
		return true
	}

	for {
		select {
		case <-ps.close:
			return
		case payload := <-write:
			if !deliver(payload) {
				return
			}
		}
	}
}
// readHandler reads delimiter-terminated messages from r and forwards each
// one to ps.Reader. When ps.Private is false the payload is passed through
// ps.decrypt first; messages that fail to decrypt are logged and skipped.
// The loop ends when a receive from ps.close succeeds or when reading from r
// returns an error.
func (ps *SubPub) readHandler(r *bufio.Reader) {
	for {
		// Non-blocking shutdown check before starting the next blocking read.
		select {
		case <-ps.close:
			return
		default:
			// continues below
		}

		message, err := r.ReadBytes(byte(delimiter))
		if err != nil {
			log.Debugf("error reading from buffer: %s", err)
			return
		}
		// Remove delimiter. ReadBytes returns a nil error only when the data
		// ends with the delimiter, so there is always at least one byte here.
		message = message[:len(message)-1]

		if !ps.Private {
			var ok bool
			message, ok = ps.decrypt(string(message))
			if !ok {
				log.Warn("cannot decrypt message")
				continue
			}
		}
		log.Debugf("message received: %s", message)

		// Hand the message over asynchronously so a slow consumer of
		// ps.Reader does not stall reading from the stream. The goroutine
		// also watches ps.close so it exits at shutdown instead of blocking
		// forever on a channel nobody reads anymore.
		// NOTE(review): assumes ps.close is closed (not sent on) to signal
		// shutdown, as the other receives from it in this file imply.
		go func(msg []byte) {
			select {
			case ps.Reader <- msg:
			case <-ps.close:
			}
		}(message)
	}
}