WIP Dialer v2: Pluggable and composable dialer #88

@@ -0,0 +1,67 @@

package dial

import (
    addrutil "github.com/libp2p/go-addr-util"
    ma "github.com/multiformats/go-multiaddr"
)

type AddrResolver struct {
    sFilters []func(ma.Multiaddr) bool
    dFilters []func(req *Request) func(ma.Multiaddr) bool
}

func DefaultStaticFilters() []func(ma.Multiaddr) bool {
    return []func(ma.Multiaddr) bool{
        addrutil.AddrOverNonLocalIP,
    }
}

func DefaultDynamicFilters() []func(req *Request) func(ma.Multiaddr) bool {
    excludeOurAddrs := func(req *Request) func(ma.Multiaddr) bool {
        lisAddrs, _ := req.net.InterfaceListenAddresses()
        var ourAddrs []ma.Multiaddr
        for _, addr := range lisAddrs {
            protos := addr.Protocols()
            if len(protos) == 2 && (protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6) {
                // we're only sure about filtering out /ip4 and /ip6 addresses, so far
                ourAddrs = append(ourAddrs, addr)
            }
        }
        return addrutil.SubtractFilter(ourAddrs...)
    }

    return []func(req *Request) func(ma.Multiaddr) bool{
        excludeOurAddrs,
    }
}

type AddrFilterFactory func(req *Request) []func(ma.Multiaddr) bool

var _ RequestPreparer = (*AddrResolver)(nil)

func NewAddrResolver(staticFilters []func(ma.Multiaddr) bool, dynamicFilters []func(req *Request) func(ma.Multiaddr) bool) RequestPreparer {
    return &AddrResolver{
        sFilters: staticFilters,
        dFilters: dynamicFilters,
    }
}

func (m *AddrResolver) Prepare(req *Request) {
    req.addrs = req.net.Peerstore().Addrs(req.id)
    if len(req.addrs) == 0 {
        return
    }

    // apply the static filters.
    req.addrs = addrutil.FilterAddrs(req.addrs, m.sFilters...)
    if len(m.dFilters) == 0 {
        return
    }

    // apply the dynamic filters.
    dFilters := make([]func(ma.Multiaddr) bool, 0, len(m.dFilters))
    for _, df := range m.dFilters {
        dFilters = append(dFilters, df(req))
    }
    req.addrs = addrutil.FilterAddrs(req.addrs, dFilters...)
}
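
For orientation, a minimal sketch of how this preparer could be instantiated with the default filters. The registration call on the pipeline is hypothetical and may not match the PR's actual wiring:

    resolver := NewAddrResolver(DefaultStaticFilters(), DefaultDynamicFilters())

    // Hypothetical registration; the real Pipeline API may differ.
    pipeline.AddRequestPreparer(resolver)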

@@ -0,0 +1,147 @@

package dial

import (
    "errors"
    "sync"
    "time"

    logging "github.com/ipfs/go-log"
    "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("swarm")

// ErrDialBackoff is returned by the backoff code when a given peer has
// been dialed too frequently.
var ErrDialBackoff = errors.New("dial backoff")

// BackoffBase is the base amount of time to backoff (default: 5s).
var BackoffBase = time.Second * 5

// BackoffCoef is the backoff coefficient (default: 1s).
var BackoffCoef = time.Second

// BackoffMax is the maximum backoff time (default: 5m).
var BackoffMax = time.Minute * 5

// Backoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (The single goroutine that is actually dialing continues to
// dial.) If a dial is successful, the peer is removed from backoff.
//
// Example:
//
//    for {
//        if ok, wait := dialsync.Lock(p); !ok {
//            if backoff.Backoff(p) {
//                return errDialFailed
//            }
//            <-wait
//            continue
//        }
//        defer dialsync.Unlock(p)
//        c, err := actuallyDial(p)
//        if err != nil {
//            dialbackoff.AddBackoff(p)
//            continue
//        }
//        dialbackoff.Clear(p)
//    }
//
// Notes on usage:
//
//    * It's safe to use its zero value.
//    * It's thread-safe.
//    * It's *not* safe to move this type after using.
type Backoff struct {
    entries map[peer.ID]*backoffPeer
    lock    sync.RWMutex
}

func NewBackoff() RequestPreparer {
    return &Backoff{}
}

func (db *Backoff) Prepare(req *Request) {
    req.AddCallback(func() {
        // a failed dial enters backoff; a successful one clears it.
        if req.err != nil {
            db.AddBackoff(req.id)
            return
        }
        db.ClearBackoff(req.id)
    })

    // if this peer has been backed off, complete the dial immediately
    if !db.Backoff(req.id) {
        return
    }
    log.Event(req.ctx, "swarmDialBackoff", req.id)
    req.Complete(nil, ErrDialBackoff)
}

var _ RequestPreparer = (*Backoff)(nil)

type backoffPeer struct {
    tries int
    until time.Time
}

func (db *Backoff) init() {
    if db.entries == nil {
        db.entries = make(map[peer.ID]*backoffPeer)
    }
}

// Backoff returns whether the client should back off from dialing peer p.
func (db *Backoff) Backoff(p peer.ID) (backoff bool) {
    db.lock.Lock()
    defer db.lock.Unlock()
    db.init()
    bp, found := db.entries[p]
    if found && time.Now().Before(bp.until) {
        return true
    }
    return false
}

Review comment: "lets other nodes know"? That doesn't seem right.

Reply: Agree it's confusing; copy-pasted from the original implementation, though (Line 129 in ccc587c). While we are here, @whyrusleeping: the Swarm currently exposes the Backoff via an accessor that's not part of the Network interface (Line 435 in 4bf3943). I think it's there mostly for testing purposes, or do we want to allow other components to fiddle with the backoff logic? With it now being just another […]

// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//    BackoffBase + BackoffCoef * PriorBackoffs^2
//
// where PriorBackoffs is the number of previous backoffs.
func (db *Backoff) AddBackoff(p peer.ID) {
    db.lock.Lock()
    defer db.lock.Unlock()
    db.init()
    bp, ok := db.entries[p]
    if !ok {
        db.entries[p] = &backoffPeer{
            tries: 1,
            until: time.Now().Add(BackoffBase),
        }
        return
    }

    backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
    if backoffTime > BackoffMax {
        backoffTime = BackoffMax
    }
    bp.until = time.Now().Add(backoffTime)
    bp.tries++
}

// ClearBackoff removes a backoff record. Clients should call this after a
// successful Dial.
func (db *Backoff) ClearBackoff(p peer.ID) {
    db.lock.Lock()
    defer db.lock.Unlock()
    db.init()
    delete(db.entries, p)
}
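
To make the schedule concrete: the first failure backs a peer off for BackoffBase (5s); with n prior tries recorded, the next backoff lasts BackoffBase + BackoffCoef·n², capped at BackoffMax. A standalone sketch (not part of the PR) that prints the curve under the default constants:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        base, coef, max := 5*time.Second, time.Second, 5*time.Minute
        for n := 1; n <= 5; n++ {
            d := base + coef*time.Duration(n*n)
            if d > max {
                d = max
            }
            fmt.Printf("after %d prior tries: %s\n", n, d) // 6s, 9s, 14s, 21s, 30s
        }
    }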

@@ -0,0 +1,78 @@

package dial

import (
    "context"
    "errors"
    "fmt"
)

// ErrNoTransport is returned when we don't know a transport for the
// given multiaddr.
var ErrNoTransport = errors.New("no transport for protocol")

type executor struct {
    resolver TransportResolverFn

    localCloseCh chan struct{}
}

var _ Executor = (*executor)(nil)

func NewExecutor(resolver TransportResolverFn) Executor {
    return &executor{
        resolver:     resolver,
        localCloseCh: make(chan struct{}),
    }
}

func (e *executor) Start(ctx context.Context, dialCh <-chan *Job) {
    for {
        select {
        case j := <-dialCh:
            go e.processDial(j)
        case <-ctx.Done():
            return
        case <-e.localCloseCh:
            return
        }
    }
}

func (e *executor) Close() error {
    close(e.localCloseCh)
    return nil
}

func (e *executor) processDial(job *Job) {
    defer func() {
        job.completeCh <- job
    }()

    addr, id := job.addr, job.req.id

    log.Debugf("%s swarm dialing %s %s", job.req.net.LocalPeer(), id, addr)

    tpt := e.resolver(addr)
    if tpt == nil {
        job.err = ErrNoTransport
        return
    }

    tconn, err := tpt.Dial(job.req.ctx, addr, id)
    if err != nil {
        err = fmt.Errorf("%s --> %s dial attempt failed: %s", job.req.net.LocalPeer(), id, err)
        job.Complete(tconn, err)
        return
    }

    // Trust the transport? Yeah... right.
    if tconn.RemotePeer() != id {
        tconn.Close()
        err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", tpt, id, tconn.RemotePeer())
        log.Error(err)
        job.Complete(nil, err)
        return
    }

    job.Complete(tconn, err)
}
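
A rough sketch of how this executor might be wired up; transportForAddr is a hypothetical helper and the channel buffer size is arbitrary:

    resolver := func(a ma.Multiaddr) tpt.Transport {
        return transportForAddr(a) // e.g. look up a registered transport for this multiaddr
    }

    exec := NewExecutor(resolver)
    dialCh := make(chan *Job, 16) // fed by the Throttler in the real pipeline
    go exec.Start(context.Background(), dialCh)
    defer exec.Close()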

@@ -0,0 +1,78 @@

package dial

import (
    "context"
    "io"

    "github.com/libp2p/go-libp2p-net"
    "github.com/libp2p/go-libp2p-peer"
    tpt "github.com/libp2p/go-libp2p-transport"
    ma "github.com/multiformats/go-multiaddr"
)

// A RequestPreparer can perform operations on a dial Request before it is sent to the Planner.
// Examples include validation, deduplication, backoff, etc.
//
// A RequestPreparer may preemptively complete the dial, either in error or in success, by calling Complete() on the Request.
type RequestPreparer interface {
    Prepare(req *Request)
}

Review comment: Any reason not to have this return a new, modified request, for easier debuggability? (edit: GitHub was glitching)

// A JobPreparer can perform operations on a dial Job as soon as it is emitted from the Planner, and before
// it is sent to the Throttler. Examples include timeout setting, address rewriting, dial filtering, etc.
type JobPreparer interface {
    Prepare(job *Job)
}

// Dial planners take a Request (populated with multiaddrs) and emit dial jobs on dialCh for the addresses
// they want dialed. The pipeline will call the planner once, as well as every time a dial job completes.
//
// For more information on the choreography, read the docs on Next().
type Planner interface {

    // Next requests the planner to send new dial jobs on dialCh, if appropriate.
    //
    // When planning starts, Next is invoked with a nil last parameter.
    //
    // Next is then subsequently invoked on every completed dial, providing a slice of dialed jobs and the
    // last job to complete. With these two elements, in conjunction with any state it may track, the Planner
    // can decide what to dial next, or finish planning altogether.
    //
    // When the planner is satisfied and has no more dials to request, it must signal so by closing
    // the dialCh channel.
    Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
}

Review comment: How about:

    type Planner interface {
        Plan(req *Request) (Plan, error)
    }

    type Plan interface {
        Complete(j *Job) error // may need to pass in a connection?
        Next() (*Job, error)   // or maybe a channel?
    }

Not sure about the actual interface definitions, I just think breaking this up will help.

Reply: Agree, good point!

Reply: Seconding this; I think this general structure highlights the FLOW as well.
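
Setting the proposed refactor aside, a toy Planner written against the current Next contract may help ground it: it plans one job per address on the first invocation and immediately signals completion. Sketch only; constructing a Job this way is an assumption:

    type greedyPlanner struct{}

    func (p *greedyPlanner) Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error {
        if last == nil {
            // first invocation: plan a dial for every known address, then finish.
            jobs := make(dialJobs, 0, len(req.addrs))
            for _, a := range req.addrs {
                jobs = append(jobs, &Job{req: req, addr: a})
            }
            dialCh <- jobs
            close(dialCh)
        }
        return nil
    }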

// A Throttler is a goroutine that applies a throttling process to dial jobs requested by the Planner.
type Throttler interface {
    io.Closer

    // Start spawns the goroutine that is in charge of throttling. It receives planned jobs via inCh and emits
    // jobs to execute on dialCh. The throttler can apply any logic in-between: it may throttle jobs based on
    // system resources, time, inflight dials, network conditions, fail rate, etc.
    Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
}

Review comment: Any reason not to have: Start(ctx context.Context, inCh <-chan *Job) <-chan *Job?

Reply: Since these are buffered channels, I preferred the pipeline to be responsible for them.
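
To illustrate the contract, a toy Throttler that rate-limits dials to one per fixed interval; a sketch living in the dial package, not the PR's actual throttler:

    // rateThrottler releases at most one job per interval to the executor.
    type rateThrottler struct {
        interval time.Duration
        closeCh  chan struct{}
    }

    func (t *rateThrottler) Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job) {
        tick := time.NewTicker(t.interval)
        defer tick.Stop()
        for {
            select {
            case j := <-inCh:
                <-tick.C // wait for the next slot before releasing the job
                dialCh <- j
            case <-ctx.Done():
                return
            case <-t.closeCh:
                return
            }
        }
    }

    func (t *rateThrottler) Close() error {
        close(t.closeCh)
        return nil
    }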

// An Executor is a goroutine responsible for ultimately carrying out the network dial to an addr.
type Executor interface {
    io.Closer

    // Start spawns the goroutine responsible for executing network dials. Jobs sent to dialCh have already
    // been subjected to the throttler. Once a dial finishes, the Executor must send the completed job to
    // completeCh, where it'll be received by the pipeline.
    //
    // In terms of concurrency, the Executor should behave like a dispatcher, in turn spawning individual
    // goroutines, or maintaining a finite set of child workers, to carry out the dials.
    // The Executor must never block.
    Start(ctx context.Context, dialCh <-chan *Job)
}

// Once the Planner is satisfied with the result of the dials, and all inflight dials have finished executing,
// the Selector picks the optimal successful connection to return to the consumer. The Pipeline takes care of
// closing unselected successful connections.
type Selector interface {
    Select(successful dialJobs) (tpt.Conn, error)
}

Review comment: So, the planner needs to be able to determine when to finish anyway. I'd be surprised if it didn't have enough information to perform the selection as well. What's the motivation?

Reply: The […] In (1), I do agree that the selection heuristic should be part of the Planner by design; if it's deliberately seeking a better something, it must know what that something is. With (2), I'm not so sure it's part of the […]
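
As a concrete example of the contract, a trivial Selector that returns the first successful connection; a sketch only, assuming Job exposes its connection as tconn (as the executor code suggests):

    type firstSelector struct{}

    func (s *firstSelector) Select(successful dialJobs) (tpt.Conn, error) {
        for _, j := range successful {
            if j.tconn != nil {
                return j.tconn, nil
            }
        }
        return nil, errors.New("no successful connection")
    }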

type TransportResolverFn func(a ma.Multiaddr) tpt.Transport
type BestConnFn func(p peer.ID) inet.Conn

Review comment: Can you make some type definitions for the funcs? The declaration is almost comical.

Reply: The main culprit is […]. A definition like:

    type MultiaddrFilter func(ma.Multiaddr) bool

would go a long way towards simplifying.
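
For illustration, a sketch of how that alias could clean up the resolver's signature. The MultiaddrFilterFactory companion for the request-scoped (dynamic) filters is my own assumption, as is retyping the AddrResolver fields to match:

    type MultiaddrFilter func(ma.Multiaddr) bool
    type MultiaddrFilterFactory func(req *Request) MultiaddrFilter

    // The constructor would then read:
    func NewAddrResolver(staticFilters []MultiaddrFilter, dynamicFilters []MultiaddrFilterFactory) RequestPreparer {
        return &AddrResolver{sFilters: staticFilters, dFilters: dynamicFilters}
    }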