diff --git a/dht.go b/dht.go index 8c848be90..d3ee3a8c4 100644 --- a/dht.go +++ b/dht.go @@ -42,6 +42,10 @@ import ( var logger = logging.Logger("dht") +// DynamicModeSwitchDebouncePeriod is the amount of time to wait before making a dynamic dht mode switch effective. +// It helps mitigate flapping from routability events. +var DynamicModeSwitchDebouncePeriod = 1 * time.Minute + // NumBootstrapQueries defines the number of random dht queries to do to // collect members of the routing table. const NumBootstrapQueries = 5 @@ -83,10 +87,6 @@ type IpfsDHT struct { modeLk sync.Mutex bucketSize int - - subscriptions struct { - evtPeerIdentification event.Subscription - } } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -117,15 +117,13 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er } } - dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { - if dht.subscriptions.evtPeerIdentification != nil { - _ = dht.subscriptions.evtPeerIdentification.Close() - } - return nil - }) + dht.proc = goprocessctx.WithContext(ctx) // register for network notifs. - dht.proc.AddChild((*subscriberNotifee)(dht).Process()) + dht.proc.Go((*subscriberNotifee)(dht).subscribe) + + // switch modes dynamically based on local routability changes. + dht.proc.Go(dht.dynamicModeSwitching) // handle protocol changes dht.proc.Go(dht.handleProtocolChanges) @@ -184,13 +182,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p bucketSize: bucketSize, } - var err error - evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}} - dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256)) - if err != nil { - logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) - } - dht.ctx = dht.newContextWithLocalTags(ctx) return dht @@ -495,6 +486,66 @@ func (dht *IpfsDHT) SetMode(m DHTMode) error { } } +func (dht *IpfsDHT) dynamicModeSwitching(proc goprocess.Process) { + evts := []interface{}{ + &event.EvtLocalRoutabilityPublic{}, + &event.EvtLocalRoutabilityPrivate{}, + &event.EvtLocalRoutabilityUnknown{}, + } + + sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to local routability events; dynamic mode switching will not work; err: %s", err) + } + defer sub.Close() + + var ( + debouncer = time.NewTimer(0) + target DHTMode + ) + defer debouncer.Stop() + + stopTimer := func() { + if debouncer.Stop() { + return + } + select { + case <-debouncer.C: + default: + } + } + + stopTimer() + + for { + select { + case ev := <-sub.Out(): + switch ev.(type) { + case event.EvtLocalRoutabilityPrivate, event.EvtLocalRoutabilityUnknown: + target = ModeClient + case event.EvtLocalRoutabilityPublic: + target = ModeServer + } + stopTimer() + debouncer.Reset(DynamicModeSwitchDebouncePeriod) + logger.Infof("processed event %T; scheduled dht mode switch", ev) + + case <-debouncer.C: + err := dht.SetMode(target) + // NOTE: the mode will be printed out as a decimal. + if err == nil { + logger.Infof("switched DHT mode successfully; new mode: %d", target) + } else { + logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err) + } + target = 0 + + case <-proc.Closing(): + return + } + } +} + func (dht *IpfsDHT) moveToServerMode() error { dht.mode = ModeServer for _, p := range dht.protocols { diff --git a/dht_test.go b/dht_test.go index 0d9320d05..f0ce614ed 100644 --- a/dht_test.go +++ b/dht_test.go @@ -12,11 +12,12 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - multistream "github.com/multiformats/go-multistream" + "github.com/multiformats/go-multistream" "golang.org/x/xerrors" @@ -26,12 +27,12 @@ import ( opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" - ci "github.com/libp2p/go-libp2p-testing/ci" + "github.com/libp2p/go-libp2p-testing/ci" travisci "github.com/libp2p/go-libp2p-testing/ci/travis" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ma "github.com/multiformats/go-multiaddr" @@ -1405,3 +1406,86 @@ func TestModeChange(t *testing.T) { err = clientOnly.Ping(ctx, clientToServer.PeerID()) assert.NotNil(t, err) } + +func TestDynamicModeSwitching(t *testing.T) { + // remove the debounce period. + DynamicModeSwitchDebouncePeriod = 0 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prober := setupDHT(ctx, t, true) // our test harness + node := setupDHT(ctx, t, true) // the node under test + prober.Host().Peerstore().AddAddrs(node.PeerID(), node.Host().Addrs(), peerstore.AddressTTL) + if _, err := prober.Host().Network().DialPeer(ctx, node.PeerID()); err != nil { + t.Fatal(err) + } + + var err error + var emitters struct { + evtLocalRoutabilityPrivate event.Emitter + evtLocalRoutabilityPublic event.Emitter + evtLocalRoutabilityUnknown event.Emitter + } + + emitters.evtLocalRoutabilityPublic, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic)) + if err != nil { + t.Fatal(err) + } + emitters.evtLocalRoutabilityPrivate, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityPrivate)) + if err != nil { + t.Fatal(err) + } + emitters.evtLocalRoutabilityUnknown, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityUnknown)) + if err != nil { + t.Fatal(err) + } + + assertDHTClient := func() { + err = prober.Ping(ctx, node.PeerID()) + assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) + if l := len(prober.RoutingTable().ListPeers()); l != 0 { + t.Errorf("expected routing table length to be 0; instead is %d", l) + } + } + + assertDHTServer := func() { + err = prober.Ping(ctx, node.PeerID()) + assert.Nil(t, err) + if l := len(prober.RoutingTable().ListPeers()); l != 1 { + t.Errorf("expected routing table length to be 1; instead is %d", l) + } + } + + emitters.evtLocalRoutabilityPrivate.Emit(event.EvtLocalRoutabilityPrivate{}) + time.Sleep(500 * time.Millisecond) + + assertDHTClient() + + emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) + time.Sleep(500 * time.Millisecond) + + assertDHTServer() + + emitters.evtLocalRoutabilityUnknown.Emit(event.EvtLocalRoutabilityUnknown{}) + time.Sleep(500 * time.Millisecond) + + assertDHTClient() + + ////////////////////////////////////////////////////////////// + // let's activate the debounce, to check we do not flap. + // receiving a "routability public" event should have no immediate effect now. + DynamicModeSwitchDebouncePeriod = 2 * time.Second + + emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) + time.Sleep(500 * time.Millisecond) + + // the debounce has prevented us from switching modes too soon. + assertDHTClient() + + // wait so that the debounce fires. + time.Sleep(1750 * time.Millisecond) + + // after the debounce period elapses, we will have switched modes. + assertDHTServer() +} diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 9e83cf04a..f54b2ee7a 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -2,6 +2,7 @@ package dht import ( "github.com/jbenet/goprocess" + "github.com/libp2p/go-eventbus" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" ma "github.com/multiformats/go-multiaddr" @@ -16,23 +17,27 @@ func (nn *subscriberNotifee) DHT() *IpfsDHT { return (*IpfsDHT)(nn) } -func (nn *subscriberNotifee) Process() goprocess.Process { +func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { dht := nn.DHT() - proc := goprocess.Go(nn.subscribe) dht.host.Network().Notify(nn) - proc.SetTeardown(func() error { - dht.host.Network().StopNotify(nn) - return nil - }) - return proc -} + defer dht.host.Network().StopNotify(nn) + + var err error + evts := []interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerIdentificationFailed{}, + } + + sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) + } + defer sub.Close() -func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { - dht := nn.DHT() for { select { - case evt, more := <-dht.subscriptions.evtPeerIdentification.Out(): + case evt, more := <-sub.Out(): if !more { return }