-
Notifications
You must be signed in to change notification settings - Fork 19
/
echo.go
98 lines (80 loc) · 2.2 KB
/
echo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package service
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
// log is the package-level logger, obtained from go-log using ProtocolID as
// the logger name.
var log = logging.Logger(ProtocolID)
const (
// ProtocolID is the libp2p protocol identifier used both when registering
// the inbound stream handler and when opening outbound echo streams.
ProtocolID = "/kava-relayer/echo/1.0.0"
// ServiceName is a name for this service. It is not referenced by the code
// visible in this file.
ServiceName = "kava-relayer.echo"
)
// EchoService implements a simple echo protocol on top of a libp2p host: it
// answers inbound echo streams and can send echo requests to other peers.
//
// NOTE(review): peers is read and written by the stream handler without any
// locking. If the host invokes handlers concurrently this is a data race —
// confirm and guard the map with a mutex if so.
type EchoService struct {
// Host is the libp2p host used to open outbound echo streams.
Host host.Host
// peers is the set of remote peers that have opened an echo stream to us.
peers map[peer.ID]bool
// minPeers is the number of distinct peers in peers at which done is signaled.
minPeers int
// done receives true from the stream handler each time a request is handled
// while len(peers) >= minPeers.
done chan bool
}
// NewEchoService creates an EchoService bound to host h and registers the
// service's echo handler on h for ProtocolID.
//
// done is the channel the handler sends true on once at least minPeers
// distinct peers have opened an echo stream. The returned service starts with
// an empty peer set.
func NewEchoService(h host.Host, done chan bool, minPeers int) *EchoService {
	svc := new(EchoService)
	svc.Host = h
	svc.peers = map[peer.ID]bool{}
	svc.minPeers = minPeers
	svc.done = done

	// TODO: SetStreamHandler is labeled (Thread-safe), but double check whether
	// EchoService needs a sync.Mutex to protect the state that the handler
	// (onEchoRequest) modifies.
	h.SetStreamHandler(ProtocolID, svc.onEchoRequest)

	return svc
}
// onEchoRequest is the stream handler registered for ProtocolID. It records
// the remote peer of the inbound stream, echoes one line back via doEcho, and
// then releases the stream: Reset if the echo failed, Close otherwise.
//
// After handling the stream, if at least minPeers distinct peers have been
// recorded, it sends true on the done channel. The peer is recorded before the
// echo is attempted, so a failed echo still counts toward minPeers.
//
// NOTE(review): the send on done blocks until a receiver is ready, and it is
// repeated for every request once the threshold is met — confirm the consumer
// keeps receiving (or the channel is buffered), otherwise handlers will pile
// up. The peers map is also mutated here without locking; confirm whether
// handlers can run concurrently.
func (es *EchoService) onEchoRequest(s network.Stream) {
	remote := s.Conn().RemotePeer()
	log.Debugw("listener received new echo stream", "peerID", remote)
	es.peers[remote] = true

	if err := doEcho(s); err != nil {
		log.Error(err)
		// Reset closes both ends of the stream. A failure here is logged at
		// debug level because it is not an important error, and only when
		// there actually is one (previously a nil error was logged too).
		if resetErr := s.Reset(); resetErr != nil {
			log.Debugf("error closing stream: %s", resetErr)
		}
	} else if closeErr := s.Close(); closeErr != nil {
		// Surface close failures instead of silently dropping them.
		log.Debugf("error closing stream: %s", closeErr)
	}

	if len(es.peers) >= es.minPeers {
		es.done <- true
	}
}
// Echo opens a new stream to peerID for ProtocolID, writes payload to it,
// reads until the stream reports EOF, and checks that the bytes read back are
// identical to payload.
//
// On success it returns the echoed bytes formatted with %q (a quoted, escaped
// string) and a nil error. It returns "" and a non-nil error if the stream
// cannot be opened, if writing or reading fails, or if the echoed data does
// not match payload. Write and read errors wrap the underlying error so
// callers can inspect it with errors.Is / errors.As.
//
// The stream is always released before returning: closed on success, reset on
// any failure after it was opened.
//
// NOTE(review): the responder in this file (doEcho) reads up to the first
// '\n', so payload presumably has to end with a newline for the remote to
// answer — confirm against callers.
func (es *EchoService) Echo(ctx context.Context, peerID peer.ID, payload string) (string, error) {
	stream, err := es.Host.NewStream(network.WithUseTransient(ctx, "echo"), peerID, ProtocolID)
	if err != nil {
		return "", err
	}

	// Make sure the stream does not leak, whichever path we leave by.
	succeeded := false
	defer func() {
		if succeeded {
			// Best effort: the exchange is complete, so a close failure does
			// not change the result handed to the caller.
			_ = stream.Close()
			return
		}
		// Best effort: abort both directions; the original error is what the
		// caller needs to see.
		_ = stream.Reset()
	}()

	if _, err = stream.Write([]byte(payload)); err != nil {
		return "", fmt.Errorf("failed to write to stream: %w", err)
	}

	out, err := ioutil.ReadAll(stream)
	if err != nil {
		return "", fmt.Errorf("failed to read from stream: %w", err)
	}

	if !bytes.Equal(out, []byte(payload)) {
		return "", fmt.Errorf("echoed data was %s, expected %s", string(out), payload)
	}

	succeeded = true
	return fmt.Sprintf("%q", out), nil
}
// doEcho reads a single line (up to and including the first '\n') from the
// stream and writes that same data back to it.
//
// It returns the read error if no complete line could be read, in which case
// nothing is written back; otherwise it returns the result of the write.
func doEcho(s network.Stream) error {
	reader := bufio.NewReader(s)

	line, readErr := reader.ReadString('\n')
	if readErr != nil {
		return readErr
	}
	log.Debugf("read: %s", line)

	if _, writeErr := s.Write([]byte(line)); writeErr != nil {
		return writeErr
	}
	return nil
}