Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
pipeline: remove cardinalities; introduce preparer sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Nov 9, 2018
1 parent 2e1b16e commit cd357ce
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 12 deletions.
2 changes: 1 addition & 1 deletion dial/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Planner interface {
// When planning starts, Next is invoked with a nil last parameter.
//
// Next is then subsequently invoked on every completed dial, providing a slice of dialed jobs and the
// last job to complete. With these two elements, in conjunction with any state that may be tracked, the Planner
// last job to complete. With these two seq, in conjunction with any state that may be tracked, the Planner
// can take decisions about what to dial next, or to finish planning altogether.
//
// When the planner is satisfied and has no more dials to request, it must signal so by closing
Expand Down
114 changes: 108 additions & 6 deletions dial/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dial

import (
"context"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -106,7 +107,7 @@ type Job struct {

callbacks []func()

// result.
// Result.
tconn tpt.Conn
err error

Expand Down Expand Up @@ -162,18 +163,119 @@ func (djs *dialJobs) sift() (success dialJobs, failed dialJobs) {

type AddConnFn func(tc tpt.Conn, dir inet.Direction) (inet.Conn, error)

type compositePreparer []Preparer
type preparerBinding struct {
name string
p Preparer
}

// PreparerSeq is a Preparer that daisy-chains the Request through an ordered list of preparers.
//
// It short-circuits the process if a Preparer completes the Request.
//
// Preparers are bound by unique names.
type PreparerSeq struct {
lk sync.Mutex
seq []preparerBinding
}

var _ Preparer = (*compositePreparer)(nil)
var _ Preparer = (*PreparerSeq)(nil)

func (crp *compositePreparer) Prepare(req *Request) {
for _, p := range *crp {
if p.Prepare(req); req.IsComplete() {
func (ps *PreparerSeq) Prepare(req *Request) {
for _, p := range ps.seq {
if p.p.Prepare(req); req.IsComplete() {
break
}
}
}

func (ps *PreparerSeq) find(name string) (i int, res *preparerBinding) {
for i, pb := range ps.seq {
if pb.name == name {
return i, &pb
}
}
return -1, nil
}

func (ps *PreparerSeq) AddFirst(name string, preparer Preparer) error {
ps.lk.Lock()
defer ps.lk.Unlock()

if _, prev := ps.find(name); prev != nil {
return fmt.Errorf("a preparer with name %s already exists", name)
}
pb := preparerBinding{name, preparer}
ps.seq = append([]preparerBinding{pb}, ps.seq...)
return nil
}

func (ps *PreparerSeq) AddLast(name string, preparer Preparer) error {
ps.lk.Lock()
defer ps.lk.Unlock()

if _, prev := ps.find(name); prev != nil {
return fmt.Errorf("a preparer with name %s already exists", name)
}
pb := preparerBinding{name, preparer}
ps.seq = append(ps.seq, pb)
return nil
}

func (ps *PreparerSeq) InsertBefore(before, name string, preparer Preparer) error {
ps.lk.Lock()
defer ps.lk.Unlock()

i, prev := ps.find(before)
if prev == nil {
return fmt.Errorf("no preparers found with name %s", name)
}

pb := preparerBinding{name, preparer}
ps.seq = append(ps.seq, pb)
copy(ps.seq[i+1:], ps.seq[i:])
ps.seq[i] = pb

return nil
}

func (ps *PreparerSeq) InsertAfter(after, name string, preparer Preparer) error {
ps.lk.Lock()
defer ps.lk.Unlock()

i, prev := ps.find(after)
if prev == nil {
return fmt.Errorf("no preparers found with name %s", name)
}

pb := preparerBinding{name, preparer}
ps.seq = append(ps.seq, pb)
copy(ps.seq[i+2:], ps.seq[i+1:])
ps.seq[i+1] = pb

return nil
}

func (ps *PreparerSeq) Replace(old, name string, preparer Preparer) error {
ps.lk.Lock()
defer ps.lk.Unlock()

i, prev := ps.find(old)
if prev == nil {
return fmt.Errorf("no preparers found with name %s", name)
}
ps.seq[i] = preparerBinding{name, preparer}
return nil
}

func (ps *PreparerSeq) Remove(name string) {
ps.lk.Lock()
defer ps.lk.Unlock()

if i, prev := ps.find(name); prev != nil {
ps.seq = append(ps.seq[:i], ps.seq[i+1:]...)
}
}

type Pipeline struct {
lk sync.RWMutex
ctx context.Context
Expand Down
48 changes: 48 additions & 0 deletions dial/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dial_test

import (
"reflect"
"testing"

"github.com/libp2p/go-libp2p-swarm/dial"
)

type testPreparer struct {
Value string
target *[]string
}

func (tp *testPreparer) Prepare(req *dial.Request) {
vals := append(*tp.target, tp.Value)
*tp.target = vals
}

var _ dial.Preparer = (*testPreparer)(nil)

func TestPreparerSequence(t *testing.T) {
expected := []string{"four", "three", "seven", "one", "five", "eight"}

var target []string
var seq dial.PreparerSeq

seq.AddLast("one", &testPreparer{"one", &target})
seq.AddLast("two", &testPreparer{"two", &target})
seq.AddFirst("three", &testPreparer{"three", &target})
seq.InsertBefore("three", "four", &testPreparer{"four", &target})
seq.InsertBefore("two", "five", &testPreparer{"five", &target})
seq.InsertAfter("two", "six", &testPreparer{"six", &target})
seq.InsertAfter("three", "seven", &testPreparer{"seven", &target})
seq.Remove("non-existent")
seq.Remove("two")
seq.Replace("six", "eight", &testPreparer{"eight", &target})

if err := seq.Replace("non-existent", "nine", &testPreparer{"nine", &target}); err == nil {
t.Fatal("expected an error when replacing non-existent preparer")
}

seq.Prepare(&dial.Request{})

if !reflect.DeepEqual(expected, target) {
t.Fatalf("unexpected result when manipulating preparer sequence; expected: %v, got: %v", expected, target)
}
}
13 changes: 8 additions & 5 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,14 @@ func (s *Swarm) defaultPipeline() *dial.Pipeline {
s.backoff = dial.NewBackoff().(*dial.Backoff)

// preparers
p.Component("validator", dial.NewValidator(bestConnFn))
p.Component("request_timeout", dial.NewRequestTimeout())
p.Component("syncer", dial.NewDialSync())
p.Component("backoff", s.backoff)
p.Component("addr_resolver", dial.NewAddrResolver(sFilters, dial.DefaultDynamicFilters()))
var seq dial.PreparerSeq
seq.AddLast("validator", dial.NewValidator(bestConnFn))
seq.AddLast("request_timeout", dial.NewRequestTimeout())
seq.AddLast("syncer", dial.NewDialSync())
seq.AddLast("backoff", s.backoff)
seq.AddLast("addr_resolver", dial.NewAddrResolver(sFilters, dial.DefaultDynamicFilters()))

p.Component("preparers", seq)

// throttler
p.Component("throttler", dial.NewDefaultThrottler())
Expand Down

0 comments on commit cd357ce

Please sign in to comment.