package core

import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sync"
"time"
math2 "github.com/ipfs/go-ipfs/thirdparty/math2"
lgbl "gx/ipfs/QmQSREHX6CMoPkT5FfuA4A9cWSFwoKY8RAzjf5m7Gpdtqu/go-libp2p-loggables"
inet "gx/ipfs/QmPtFaR7BWHLAjSwLh9kXcyrgTzDpuhcWLkx8ioa9RMYnx/go-libp2p-net"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
periodicproc "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/periodic"
peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
config "gx/ipfs/QmYyzmMnhNTtoXx5ttgUaRdHHckYnQWjPL98hgLAR2QLDD/go-ipfs-config"
pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore"
host "gx/ipfs/QmfD51tKgJiTMnW9JEiDiPwsCY4mqUoxkhKhBfyW12spTC/go-libp2p-host"
)

// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")

// BootstrapConfig specifies parameters used in an IpfsNode's network
// bootstrapping process.
type BootstrapConfig struct {
// MinPeerThreshold governs whether to bootstrap more connections. If the
	// node has fewer open connections than this number, it will open connections
// to the bootstrap nodes. From there, the routing system should be able
// to use the connections to the bootstrap nodes to connect to even more
// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
// process, which issues random queries to find more peers.
MinPeerThreshold int
	// Period governs how frequently the node will attempt to bootstrap.
	// The bootstrap process is not very expensive, so this period can
	// afford to be small (<= 30s).
Period time.Duration
// ConnectionTimeout determines how long to wait for a bootstrap
// connection attempt before cancelling it.
ConnectionTimeout time.Duration
// BootstrapPeers is a function that returns a set of bootstrap peers
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []pstore.PeerInfo
}

// DefaultBootstrapConfig specifies sane default parameters for bootstrapping.
var DefaultBootstrapConfig = BootstrapConfig{
MinPeerThreshold: 4,
Period: 30 * time.Second,
	ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
}
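
// BootstrapConfigWithPeers returns a copy of DefaultBootstrapConfig whose
// BootstrapPeers callback always returns the given peers.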
func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig {
cfg := DefaultBootstrapConfig
cfg.BootstrapPeers = func() []pstore.PeerInfo {
return pis
}
return cfg
}
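
// A caller that wants different parameters can copy the defaults and
// override individual fields (hypothetical sketch; the threshold value is
// illustrative only):
//
//	cfg := BootstrapConfigWithPeers(pis)
//	cfg.MinPeerThreshold = 8 // bootstrap whenever fewer than 8 peers are connected
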
// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically
// check the number of open connections and -- if there are too few -- initiate
// connections to well-known bootstrap peers. It also kicks off subsystem
// bootstrapping (i.e. routing).
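//
// A minimal usage sketch (hypothetical; assumes a constructed *IpfsNode n and
// a bootstrap peer list pis obtained elsewhere, e.g. via toPeerInfos):
//
//	cfg := BootstrapConfigWithPeers(pis)
//	closer, err := Bootstrap(n, cfg)
//	if err != nil {
//		return err
//	}
//	defer closer.Close() // stop periodic bootstrapping on shutdown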
func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
// make a signal to wait for one bootstrap round to complete.
doneWithRound := make(chan struct{})
if len(cfg.BootstrapPeers()) == 0 {
		// We *need* to bootstrap but have no bootstrap peers
		// configured at all, so inform the user.
log.Error("no bootstrap nodes configured: go-ipfs may have difficulty connecting to the network")
}
// the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) {
ctx := procctx.OnClosingContext(worker)
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()
if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Debugf("%s bootstrap error: %s", n.Identity, err)
}
<-doneWithRound
}
// kick off the node's periodic bootstrapping
proc := periodicproc.Tick(cfg.Period, periodic)
proc.Go(periodic) // run one right now.
// kick off Routing.Bootstrap
if n.Routing != nil {
ctx := procctx.OnClosingContext(proc)
if err := n.Routing.Bootstrap(ctx); err != nil {
proc.Close()
return nil, err
}
}
doneWithRound <- struct{}{}
close(doneWithRound) // it no longer blocks periodic
return proc, nil
}
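
// bootstrapRound performs a single bootstrap pass: if the host has fewer
// than cfg.MinPeerThreshold open connections, it dials a random subset of
// the not-yet-connected bootstrap peers to make up the difference.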
func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
defer cancel()
id := host.ID()
	// Get bootstrap peers from config. Retrieving them here ensures we
	// remain observant of changes to the client configuration.
peers := cfg.BootstrapPeers()
// determine how many bootstrap connections to open
connected := host.Network().Peers()
if len(connected) >= cfg.MinPeerThreshold {
log.Event(ctx, "bootstrapSkip", id)
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
id, len(connected), cfg.MinPeerThreshold)
return nil
}
numToDial := cfg.MinPeerThreshold - len(connected)
// filter out bootstrap nodes we are already connected to
var notConnected []pstore.PeerInfo
for _, p := range peers {
if host.Network().Connectedness(p.ID) != inet.Connected {
notConnected = append(notConnected, p)
}
}
// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
return ErrNotEnoughBootstrapPeers
}
	// connect to a random subset of bootstrap candidates
randSubset := randomSubsetOfPeers(notConnected, numToDial)
defer log.EventBegin(ctx, "bootstrapStart", id).Done()
log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
return bootstrapConnect(ctx, host, randSubset)
}
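
// bootstrapConnect dials the given peers in parallel, recording their
// addresses with a permanent TTL, and fails only if no peers were given
// or every connection attempt failed.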
func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo) error {
if len(peers) < 1 {
return ErrNotEnoughBootstrapPeers
}
errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {
		// Dial asynchronously: when dials are performed synchronously, a
		// single hanging `Connect` call makes subsequent calls more likely
		// to fail or abort due to an expiring context. Dialing in parallel
		// is also simply faster.
wg.Add(1)
go func(p pstore.PeerInfo) {
defer wg.Done()
defer log.EventBegin(ctx, "bootstrapDial", ph.ID(), p.ID).Done()
log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)
ph.Peerstore().AddAddrs(p.ID, p.Addrs, pstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Event(ctx, "bootstrapDialFailed", p.ID)
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Event(ctx, "bootstrapDialSuccess", p.ID)
log.Infof("bootstrapped with %v", p.ID)
}(p)
}
wg.Wait()
	// Our failure condition is that no connection attempt succeeded, so
	// drain the errs channel and count the failures.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
		return fmt.Errorf("failed to bootstrap: %s", err)
}
return nil
}
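
// toPeerInfos merges the configured bootstrap peers into a single PeerInfo
// per peer ID, accumulating all known transport addresses for each peer.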
func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo {
pinfos := make(map[peer.ID]*pstore.PeerInfo)
for _, bootstrap := range bpeers {
pinfo, ok := pinfos[bootstrap.ID()]
if !ok {
pinfo = new(pstore.PeerInfo)
pinfos[bootstrap.ID()] = pinfo
pinfo.ID = bootstrap.ID()
}
pinfo.Addrs = append(pinfo.Addrs, bootstrap.Transport())
}
var peers []pstore.PeerInfo
for _, pinfo := range pinfos {
peers = append(peers, *pinfo)
}
return peers
}
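
// randomSubsetOfPeers returns up to max of the given peers, chosen
// uniformly at random without replacement.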
func randomSubsetOfPeers(in []pstore.PeerInfo, max int) []pstore.PeerInfo {
n := math2.IntMin(max, len(in))
var out []pstore.PeerInfo
for _, val := range rand.Perm(len(in)) {
out = append(out, in[val])
if len(out) >= n {
break
}
}
return out
}