-
-
Notifications
You must be signed in to change notification settings - Fork 16
/
dht.go
90 lines (79 loc) · 2.74 KB
/
dht.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//go:build !wasm
/*
Copyright 2023 Avi Zimmerman <avi.zimmerman@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package libp2p
import (
"fmt"
"sync"
"time"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/webmeshproj/webmesh/pkg/context"
)
// NewDHT creates a DHT on the given host and bootstraps it against
// bootstrapPeers. When bootstrapPeers is empty, dht.DefaultBootstrapPeers is
// used instead. connectTimeout is forwarded unchanged and applied to each
// bootstrap peer connection attempt; a value of zero or less leaves the
// attempts bounded only by ctx.
func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Multiaddr, connectTimeout time.Duration) (*dht.IpfsDHT, error) {
	peers := bootstrapPeers
	if len(peers) == 0 {
		// Nothing supplied by the caller: fall back to the library defaults.
		peers = dht.DefaultBootstrapPeers
	}
	return newDHT(ctx, host, peers, connectTimeout)
}
// newDHT constructs a DHT on the given host with dht.New and then bootstraps
// it against bootstrapPeers. It performs no substitution for an empty peer
// list; bootstrapPeers is used exactly as given.
//
// If construction fails, the error is wrapped and returned. If bootstrapping
// fails, the DHT is closed before returning so that no half-initialized
// instance is handed back; the bootstrap error is wrapped with %w, and a
// failure to close is appended to the message rather than being discarded.
func newDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Multiaddr, connectTimeout time.Duration) (*dht.IpfsDHT, error) {
	kaddht, err := dht.New(ctx, host)
	if err != nil {
		return nil, fmt.Errorf("libp2p new dht: %w", err)
	}
	if err := bootstrapDHT(ctx, host, kaddht, bootstrapPeers, connectTimeout); err != nil {
		// Close explicitly instead of via defer so the close error can be
		// reported. The bootstrap error stays the wrapped (primary) cause.
		if closeErr := kaddht.Close(); closeErr != nil {
			return nil, fmt.Errorf("libp2p bootstrap dht: %w (close dht: %v)", err, closeErr)
		}
		return nil, fmt.Errorf("libp2p bootstrap dht: %w", err)
	}
	return kaddht, nil
}
// bootstrapDHT bootstraps kaddht and then attempts to connect host to every
// address in servers, one goroutine per address, waiting for all attempts to
// finish before returning.
//
// Only a failure of kaddht.Bootstrap is returned as an error. Addresses that
// cannot be parsed and peers that cannot be connected to are logged at debug
// level (together with the address or peer concerned) and skipped, so the
// function returns nil even if no connection succeeds.
//
// When connectTimeout is greater than zero, each connection attempt gets its
// own timeout derived from ctx; otherwise attempts are bounded only by ctx.
func bootstrapDHT(ctx context.Context, host host.Host, kaddht *dht.IpfsDHT, servers []multiaddr.Multiaddr, connectTimeout time.Duration) error {
	log := context.LoggerFrom(ctx)
	if err := kaddht.Bootstrap(ctx); err != nil {
		return fmt.Errorf("libp2p dht bootstrap: %w", err)
	}
	var wg sync.WaitGroup
	for _, peerAddr := range servers {
		peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
		if err != nil {
			// fmt.Sprint rather than peerAddr.String() so that a nil entry
			// in servers is logged instead of causing a panic.
			log.Debug("Failed to parse bootstrap peer address", "address", fmt.Sprint(peerAddr), "error", err.Error())
			continue
		}
		wg.Add(1)
		// The peer info is passed as an argument so each goroutine works on
		// its own value regardless of loop-variable semantics.
		go func(info *peer.AddrInfo) {
			defer wg.Done()
			var connectCtx context.Context
			var cancel context.CancelFunc
			if connectTimeout > 0 {
				connectCtx, cancel = context.WithTimeout(ctx, connectTimeout)
			} else {
				connectCtx, cancel = context.WithCancel(ctx)
			}
			defer cancel()
			if err := host.Connect(connectCtx, *info); err != nil {
				// Best effort: an unreachable bootstrap peer is not fatal.
				log.Debug("Failed to connect to DHT bootstrap peer", "node", info.String(), "error", err.Error())
				return
			}
			log.Info("Connection established with bootstrap node", "node", info.String())
		}(peerinfo)
	}
	wg.Wait()
	return nil
}