-
Notifications
You must be signed in to change notification settings - Fork 19
/
node.go
129 lines (104 loc) · 3.11 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package p2p
import (
"context"
"fmt"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/kava-labs/kava-bridge/relayer/p2p/service"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/pnet"
noise "github.com/libp2p/go-libp2p-noise"
"github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr"
)
// log is the package-level logger, created under the name "p2p".
var log = logging.Logger("p2p")
// Node groups a libp2p host with the echo service created for it and the
// done channel supplied to NewNode.
type Node struct {
// Host is the libp2p host the Node's methods operate on.
Host host.Host
// EchoService is the echo service built for Host in NewNode; EchoPeers
// sends its requests through it.
EchoService *service.EchoService
// done is the channel passed to NewNode, which also hands it to the echo
// service. NOTE(review): who sends on or closes it is not visible in this
// file — confirm against the service package and callers.
done chan bool
}
// NewNode builds a libp2p host from options and wraps it, together with an
// echo service, in a Node.
//
// The host listens on /ip4/0.0.0.0/tcp/<options.Port> using the TCP
// transport, is configured with options.NetworkPrivateKey as its private
// network key and options.NodePrivateKey as its identity, has relaying
// disabled, and uses noise for connection security. done is stored on the
// returned Node and passed to the echo service.
//
// If libp2p.New fails, its error is returned unchanged and the Node is nil.
func NewNode(options NodeOptions, done chan bool) (*Node, error) {
	libp2pOpts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", options.Port)),
		libp2p.Transport(tcp.NewTCPTransport),
		libp2p.PrivateNetwork(options.NetworkPrivateKey),
		libp2p.Identity(options.NodePrivateKey),
		libp2p.DisableRelay(),
		libp2p.Security(noise.ID, noise.New),
	}

	// Package-level libp2p switch, set before the host is constructed.
	// NOTE(review): presumably makes libp2p refuse to start a host without a
	// private network key — confirm against the pnet package docs.
	pnet.ForcePrivateNetwork = true

	// Named h rather than host so the imported host package is not shadowed.
	h, err := libp2p.New(libp2pOpts...)
	if err != nil {
		return nil, err
	}

	node := &Node{
		Host: h,
		// Sets stream handler. The service is constructed exactly once so
		// that the instance stored on the Node is the only one created for
		// this host.
		EchoService: service.NewEchoService(h, done, options.EchoRequiredPeers),
		done:        done,
	}

	registerNotifiees(h)

	return node, nil
}
// GetMultiAddress packs the host's ID and the addresses the host reports
// into a peer.AddrInfo and returns the result of converting that with
// peer.AddrInfoToP2pAddrs, including any error the conversion yields.
func (n Node) GetMultiAddress() ([]ma.Multiaddr, error) {
	return peer.AddrInfoToP2pAddrs(&peer.AddrInfo{
		ID:    n.Host.ID(),
		Addrs: n.Host.Addrs(),
	})
}
// ConnectToPeers walks peerAddrInfos in order. For every entry whose ID is
// not this host's own ID it records the peer's addresses in the host's
// peerstore and then connects to the peer, with the connect call driven
// through retry(10, time.Second, ...).
//
// The first peer for which retry returns an error stops the loop and that
// error is returned; remaining peers are not attempted. nil is returned once
// all entries have been handled.
func (n Node) ConnectToPeers(ctx context.Context, peerAddrInfos []*peer.AddrInfo) error {
	for _, info := range peerAddrInfos {
		if info.ID == n.Host.ID() {
			// Ignore self.
			continue
		}

		// TODO: Determine TTL for peer
		n.Host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.RecentlyConnectedAddrTTL)

		// Peers may start later than this node, so the connect is retried.
		// Using a service would also connect to the peer, but connecting up
		// front confirms this node can actually reach it before any
		// requests are made.
		dial := func() error {
			return n.Host.Connect(ctx, *info)
		}
		if err := retry(10, time.Second, dial); err != nil {
			return err
		}
	}

	return nil
}
// EchoPeers sends the message "hello world!\n" through the echo service to
// every peer ID in the host's peerstore except the host's own ID, logging
// each response at info level.
//
// It stops at the first peer whose Echo call fails and returns that error;
// otherwise it returns nil after the last peer.
func (n Node) EchoPeers(ctx context.Context) error {
	// The logged count subtracts one on the assumption that the peerstore
	// also lists this host itself.
	known := len(n.Host.Peerstore().Peers())
	log.Debugf("sending echo to %d peers", known-1)

	for _, target := range n.Host.Peerstore().Peers() {
		if target == n.Host.ID() {
			// Skip self.
			continue
		}

		reply, err := n.EchoService.Echo(ctx, target, "hello world!\n")
		if err != nil {
			return err
		}

		log.Info("received echo response: ", reply)
	}

	return nil
}
// Close stops the node by closing its underlying host. Any error reported by
// the host's Close is passed back to the caller; nil means the host closed
// without error.
func (n Node) Close() error {
	if err := n.Host.Close(); err != nil {
		return err
	}
	return nil
}
func registerNotifiees(host host.Host) {
var notifee network.NotifyBundle
notifee.ConnectedF = func(net network.Network, conn network.Conn) {
log.Info("connected to peer: ", conn.RemotePeer())
}
notifee.DisconnectedF = func(net network.Network, conn network.Conn) {
log.Info("disconnected from peer: ", conn.RemotePeer())
}
host.Network().Notify(¬ifee)
}