Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Discovery #184

Merged
merged 3 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// get the initial RPC containing all of our subscriptions to send to new peers
func (p *PubSub) getHelloPacket() *RPC {
var rpc RPC
for t := range p.myTopics {
for t := range p.mySubs {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
Subscribe: proto.Bool(true),
Expand Down
336 changes: 336 additions & 0 deletions discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
package pubsub

import (
"context"
"math/rand"
"time"

"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
discimpl "github.com/libp2p/go-libp2p-discovery"
)

var (
	// Timing knobs for the discovery poll loop. They are declared as variables
	// rather than constants, so they can be reassigned before discovery starts.

	// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
	DiscoveryPollInitialDelay = 0 * time.Millisecond
	// DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether
	// more peers are needed for any topic
	DiscoveryPollInterval = 1 * time.Second
)

// DiscoverOpt is a functional option for the discovery subsystem: it mutates the
// given discoverOptions and may report an error.
type DiscoverOpt func(*discoverOptions) error

// discoverOptions collects the settings that DiscoverOpt functions modify.
type discoverOptions struct {
	// connFactory builds the BackoffConnector for a host; discover.Start invokes it
	connFactory BackoffConnectorFactory
	// opts are libp2p discovery options, assigned by WithDiscoveryOpts
	opts []discovery.Option
}

// defaultDiscoverOptions returns the baseline discoverOptions. Only connFactory
// is populated: it builds a BackoffConnector that uses full-jitter exponential
// backoff bounded between 10 seconds and 1 hour, with a cacheSize argument of
// 100 and a 2 minute dial timeout. The opts field is left at its zero value.
func defaultDiscoverOptions() *discoverOptions {
	const (
		backoffMin  = 10 * time.Second
		backoffMax  = time.Hour
		cacheSize   = 100
		dialTimeout = 2 * time.Minute
	)

	// One random generator is created per call and captured by the factory, so
	// every connector produced by this factory shares it.
	rng := rand.New(rand.NewSource(rand.Int63()))

	newConnector := func(h host.Host) (*discimpl.BackoffConnector, error) {
		backoff := discimpl.NewExponentialBackoff(backoffMin, backoffMax, discimpl.FullJitter, time.Second, 5.0, 0, rng)
		return discimpl.NewBackoffConnector(h, cacheSize, dialTimeout, backoff)
	}

	return &discoverOptions{connFactory: newConnector}
}

// discover represents the discovery pipeline.
// The discovery pipeline handles advertising and discovery of peers
type discover struct {
	// p is the PubSub instance this pipeline is attached to; it is assigned in Start
	p *PubSub

	// discovery assists in discovering and advertising peers for a topic.
	// When it is nil the methods of discover return without doing any work.
	discovery discovery.Discovery

	// advertising tracks which topics are being advertised, mapping each topic to the
	// function that cancels its advertisement loop
	advertising map[string]context.CancelFunc

	// discoverQ handles continuing peer discovery; Start creates it with room for 32 queued requests
	discoverQ chan *discoverReq

	// ongoing tracks ongoing discovery requests, keyed by topic
	ongoing map[string]struct{}

	// done handles completion of a discovery request; it carries the topic whose request finished
	done chan string

	// connector handles connecting to new peers found via discovery
	connector *discimpl.BackoffConnector

	// options are the set of options to be used to complete struct construction in Start
	options *discoverOptions
}

// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
// The router ultimately decides the whether it is ready or not, the given size is just a suggestion.
func MinTopicSize(size int) RouterReady {
return func(rt PubSubRouter, topic string) (bool, error) {
return rt.EnoughPeers(topic, size), nil
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop
func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error {
if d.discovery == nil || p == nil {
return nil
}

d.p = p
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
d.advertising = make(map[string]context.CancelFunc)
d.discoverQ = make(chan *discoverReq, 32)
d.ongoing = make(map[string]struct{})
d.done = make(chan string)

conn, err := d.options.connFactory(p.host)
if err != nil {
return err
}
d.connector = conn

go d.discoverLoop()
go d.pollTimer()

return nil
}

func (d *discover) pollTimer() {
select {
case <-time.After(DiscoveryPollInitialDelay):
case <-d.p.ctx.Done():
return
}
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}

ticker := time.NewTicker(DiscoveryPollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
case <-d.p.ctx.Done():
return
}
}
}

// requestDiscovery queues a discovery request for every topic in the pubsub instance's myTopics
// for which the router reports that it does not have enough peers. pollTimer submits this
// function through the pubsub eval channel.
//
// Enqueueing also watches the pubsub context: once discoverLoop has exited nothing drains
// discoverQ, so an unconditional send could block the caller forever during shutdown.
func (d *discover) requestDiscovery() {
	for t := range d.p.myTopics {
		// A suggested size of 0 is passed; presumably the router then applies its own
		// threshold - verify against the router implementation.
		if d.p.rt.EnoughPeers(t, 0) {
			continue
		}

		select {
		case d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)}:
		case <-d.p.ctx.Done():
			return
		}
	}
}

func (d *discover) discoverLoop() {
for {
select {
case discover := <-d.discoverQ:
topic := discover.topic

if _, ok := d.ongoing[topic]; ok {
discover.done <- struct{}{}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
continue
}

d.ongoing[topic] = struct{}{}

go func() {
d.handleDiscovery(d.p.ctx, topic, discover.opts)
select {
case d.done <- topic:
case <-d.p.ctx.Done():
}
discover.done <- struct{}{}
}()
case topic := <-d.done:
delete(d.ongoing, topic)
case <-d.p.ctx.Done():
return
}
}
}

// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe.
func (d *discover) Advertise(topic string) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if d.discovery == nil {
return
}

advertisingCtx, cancel := context.WithCancel(d.p.ctx)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

if _, ok := d.advertising[topic]; ok {
cancel()
return
}
d.advertising[topic] = cancel

go func() {
next, err := d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}

t := time.NewTimer(next)
for {
select {
case <-t.C:
next, err = d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
}
t.Reset(next)
case <-advertisingCtx.Done():
t.Stop()
return
}
}
}()
}

// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe.
//
// It cancels the context of the topic's advertisement loop and forgets the topic. Nothing
// happens when no discovery service is configured or the topic is not being advertised.
func (d *discover) StopAdvertise(topic string) {
	if d.discovery == nil {
		return
	}

	cancel, advertised := d.advertising[topic]
	if !advertised {
		return
	}

	cancel()
	delete(d.advertising, topic)
}

// Discover searches for additional peers interested in a given topic.
//
// The request is queued for discoverLoop and Discover returns without waiting for the search
// to finish. It is a no-op when no discovery service is configured. If the pubsub context is
// done before the request can be queued, the request is dropped: discoverLoop has stopped
// draining the queue by then, and an unconditional send could block the caller forever.
func (d *discover) Discover(topic string, opts ...discovery.Option) {
	if d.discovery == nil {
		return
	}

	req := &discoverReq{topic: topic, opts: opts, done: make(chan struct{}, 1)}
	select {
	case d.discoverQ <- req:
	case <-d.p.ctx.Done():
	}
}

// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool {
if d.discovery == nil {
return true
}

t := time.NewTimer(time.Hour)
if !t.Stop() {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
<-t.C
}

for {
// Check if ready for publishing
bootstrapped := make(chan bool, 1)
select {
case d.p.eval <- func() {
done, _ := ready(d.p.rt, topic)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
bootstrapped <- done
}:
if <-bootstrapped {
return true
}
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}

// If not ready discover more peers
disc := &discoverReq{topic, opts, make(chan struct{}, 1)}
select {
case d.discoverQ <- disc:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}

select {
case <-disc.done:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to spawn a bunch of goroutines. We really do need to listen on peer join/leave events to determine when we're done bootstrapping. At the moment:

  1. This is going to fire once the DHT query completes, regardless of whether or not we're done connecting to peers.
  2. We're going to hit that 100ms sleep.
  3. We're then going to restart the loop, see that we aren't "ready" because we haven't finished connecting yet.
  4. We're going to spawn another discovery process.
  5. We're going to spawn a bunch of goroutines to connect to the same peers.

The right way to do this is:

  1. When we want more peers, signal that we want to discover more peers.
  2. When we don't need any more peers, signal that we're happy with the number of peers we have.

Note: We can probably also find a way to hack in a fix but, at the very least, we can't spawn a bunch of goroutines connecting to the same peers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emitting an "I don't need more peers" signal is likely much more invasive then just checking from the outside. However, if we're certain that the "how many peers" functions that we care about is static per PubSub instance then we can use the PeerJoin events instead of time.Sleep(100 ms) (although if the number of peers we wait on can change over time this won't work).

We're going to spawn another discovery process.

This won't be very expensive since it should be backed by a backoff cache of some sort anyhow.

We're going to spawn a bunch of goroutines to connect to the same peers.

I'm fine replacing this with storing a local cache of queried peers. I only left it this way bc I though when we talked about this previously that you didn't want to store that state locally.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this on a call. It's going to be fine as-is as long as we remember which peers we're currently dialing and/or have recently dialed.

case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}

t.Reset(time.Millisecond * 100)
select {
case <-t.C:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
}
}

// handleDiscovery performs one discovery round for topic: it asks the discovery service for
// peers and hands the resulting channel to the connector. A lookup error is logged at debug
// level and ends the round without connecting to anyone.
func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
	// Only the peer lookup is bounded by the 10 second timeout; the connector below is
	// given the caller's ctx rather than the timed one.
	findCtx, stopFind := context.WithTimeout(ctx, 10*time.Second)
	defer stopFind()

	found, err := d.discovery.FindPeers(findCtx, topic, opts...)
	if err == nil {
		d.connector.Connect(ctx, found)
		return
	}

	log.Debugf("error finding peers for topic %s: %v", topic, err)
}

// discoverReq is a single request to discover peers for a topic, passed over discover.discoverQ.
// Field order matters wherever this struct is built with a positional composite literal.
type discoverReq struct {
	// topic is the topic to find peers for
	topic string
	// opts are forwarded to the discovery service's FindPeers call
	opts []discovery.Option
	// done receives one value once the request has been handled; senders in this file
	// create it with a buffer of 1
	done chan struct{}
}

// pubSubDiscovery wraps a discovery.Discovery so that every namespace is prefixed with
// "floodsub:" and a fixed set of options is appended to each Advertise and FindPeers call.
type pubSubDiscovery struct {
	// Discovery is the wrapped discovery service that calls are delegated to
	discovery.Discovery
	// opts are appended after the per-call options on every delegated call
	opts []discovery.Option
}

// Advertise delegates to the wrapped discovery service, advertising under the namespace
// "floodsub:"+ns. The per-call opts are passed first, followed by the options stored on d.
//
// The two option lists are merged into a freshly allocated slice: appending directly to opts
// could write into the caller's backing array when that slice has spare capacity.
func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
	merged := make([]discovery.Option, 0, len(opts)+len(d.opts))
	merged = append(merged, opts...)
	merged = append(merged, d.opts...)
	return d.Discovery.Advertise(ctx, "floodsub:"+ns, merged...)
}

// FindPeers delegates to the wrapped discovery service, searching under the namespace
// "floodsub:"+ns. The per-call opts are passed first, followed by the options stored on d.
//
// The two option lists are merged into a freshly allocated slice: appending directly to opts
// could write into the caller's backing array when that slice has spare capacity.
func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
	merged := make([]discovery.Option, 0, len(opts)+len(d.opts))
	merged = append(merged, opts...)
	merged = append(merged, d.opts...)
	return d.Discovery.FindPeers(ctx, "floodsub:"+ns, merged...)
}

// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem.
// The returned option replaces any previously stored discovery options with opts and never fails.
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt {
	apply := func(cfg *discoverOptions) error {
		cfg.opts = opts
		return nil
	}
	return apply
}

// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host.
// discover.Start calls the configured factory once, with the pubsub instance's host, and
// returns any error it reports.
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)

// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers.
// The returned option stores connFactory as the connector factory, replacing the current one, and never fails.
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt {
	apply := func(cfg *discoverOptions) error {
		cfg.connFactory = connFactory
		return nil
	}
	return apply
}