Skip to content

Commit

Permalink
Merge d79b9f1 into 3f1893c
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal committed Mar 28, 2017
2 parents 3f1893c + d79b9f1 commit fa0dd2a
Show file tree
Hide file tree
Showing 8 changed files with 544 additions and 32 deletions.
8 changes: 8 additions & 0 deletions api/peer/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,11 @@ type ListUpdates struct {
// Removals are the identifiers that should be removed to the list
Removals []Identifier
}

// Binder is a callback, provided to peer.Bind, that accepts a peer list and
// binds it to a peer provider for the duration of the returned lifecycle.
// The lifecycle that the binder returns should start and stop binding peers to
// the list.
// The binder must not call block on updating the list, because that will
// typically block until the peer list has started.
type Binder func(List) transport.Lifecycle
45 changes: 45 additions & 0 deletions peer/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,48 @@ func (c *BoundChooser) Introspect() introspection.ChooserStatus {
}
return introspection.ChooserStatus{}
}

// BindPeers returns a binder (suitable as an argument to peer.Bind) that
// binds a peer list to a static list of peers for the duration of its
// lifecycle.
func BindPeers(ids []peer.Identifier) Binder {
return func(pl peer.List) transport.Lifecycle {
return &peersBinder{
once: intsync.Once(),
pl: pl,
ids: ids,
}
}
}

type peersBinder struct {
once intsync.LifecycleOnce
pl peer.List
ids []peer.Identifier
}

func (s *peersBinder) Start() error {
return s.once.Start(s.start)
}

func (s *peersBinder) start() error {
s.pl.Update(peer.ListUpdates{
Additions: s.ids,
})
return nil
}

func (s *peersBinder) Stop() error {
return s.once.Stop(s.stop)
}

func (s *peersBinder) stop() error {
s.pl.Update(peer.ListUpdates{
Removals: s.ids,
})
return nil
}

func (s *peersBinder) IsRunning() bool {
return s.once.IsRunning()
}
56 changes: 27 additions & 29 deletions peer/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import (

"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/peer/peertest"
"go.uber.org/yarpc/api/transport"
intsync "go.uber.org/yarpc/internal/sync"
. "go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/hostport"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand All @@ -38,37 +37,36 @@ func TestBind(t *testing.T) {
defer mockCtrl.Finish()

list := peertest.NewMockChooserList(mockCtrl)
life := &lowlife{once: intsync.Once()}

list.EXPECT().Start().Return(nil)
list.EXPECT().Update(peer.ListUpdates{})
list.EXPECT().Stop().Return(nil)
chooser := Bind(list, BindPeers([]peer.Identifier{
hostport.PeerIdentifier("x"),
hostport.PeerIdentifier("y"),
}))

binder := func(cl peer.List) transport.Lifecycle {
cl.Update(peer.ListUpdates{})
return life
}
list.EXPECT().IsRunning().Return(false)
assert.Equal(t, false, chooser.IsRunning(), "chooser should not be running")

chooser := Bind(list, binder)
assert.Equal(t, false, life.IsRunning(), "binder should not be running")
chooser.Start()
assert.Equal(t, true, life.IsRunning(), "binder should be running")
chooser.Stop()
assert.Equal(t, false, life.IsRunning(), "binder should not be running")
}

type lowlife struct {
once intsync.LifecycleOnce
}
list.EXPECT().Start().Return(nil)
list.EXPECT().Update(peer.ListUpdates{
Additions: []peer.Identifier{
hostport.PeerIdentifier("x"),
hostport.PeerIdentifier("y"),
},
})
assert.NoError(t, chooser.Start(), "start without error")

func (ll *lowlife) Start() error {
return ll.once.Start(nil)
}
list.EXPECT().IsRunning().Return(true)
assert.Equal(t, true, chooser.IsRunning(), "chooser should be running")

func (ll *lowlife) Stop() error {
return ll.once.Stop(nil)
}
list.EXPECT().Stop().Return(nil)
list.EXPECT().Update(peer.ListUpdates{
Removals: []peer.Identifier{
hostport.PeerIdentifier("x"),
hostport.PeerIdentifier("y"),
},
})
assert.NoError(t, chooser.Stop(), "stop without error")

func (ll *lowlife) IsRunning() bool {
return ll.once.IsRunning()
list.EXPECT().IsRunning().Return(false)
assert.Equal(t, false, chooser.IsRunning(), "chooser should not be running")
}
5 changes: 5 additions & 0 deletions peer/hostport/hostport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func (p PeerIdentifier) Identifier() string {
return string(p)
}

// Identify coerces a string to a PeerIdentifier
func Identify(peer string) peer.Identifier {
return PeerIdentifier(peer)
}

// NewPeer creates a new hostport.Peer from a hostport.PeerIdentifier, peer.Transport, and peer.Subscriber
func NewPeer(pid PeerIdentifier, transport peer.Transport) *Peer {
return &Peer{
Expand Down
67 changes: 66 additions & 1 deletion x/config/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ import (
// RegisterTransport function.
type Configurator struct {
knownTransports map[string]*compiledTransportSpec
knownChoosers map[string]*compiledChooserSpec
knownBinders map[string]*compiledBinderSpec
}

// New sets up a new empty Configurator. The returned Configurator does not
// know about any transports. Individual TransportSpecs must be registered
// against it using the RegisterTransport function.
func New() *Configurator {
return &Configurator{knownTransports: make(map[string]*compiledTransportSpec)}
return &Configurator{
knownTransports: make(map[string]*compiledTransportSpec),
knownChoosers: make(map[string]*compiledChooserSpec),
knownBinders: make(map[string]*compiledBinderSpec),
}
}

// RegisterTransport registers a TransportSpec with the given Configurator. An
Expand Down Expand Up @@ -80,6 +86,65 @@ func (c *Configurator) MustRegisterTransport(t TransportSpec) {
}
}

// RegisterChooser registers a ChooserSpec with the given Configurator. Returns
// an error if the ChooserSpec is invalid.
//
// If a chooser with the same name already exists, it will be replaced.
//
// Use MustRegisterChooser to panic in the case of registration failure.
func (c *Configurator) RegisterChooser(s ChooserSpec) error {
if s.Name == "" {
return errors.New("name is required")
}

spec, err := compileChooserSpec(&s)
if err != nil {
return fmt.Errorf("invalid ChooserSpec for %q: %v", s.Name, err)
}

c.knownChoosers[s.Name] = spec
return nil
}

// MustRegisterChooser registers the given ChooserSpec with the Configurator.
// This function panics if the ChooserSpec is invalid.
func (c *Configurator) MustRegisterChooser(s ChooserSpec) {
if err := c.RegisterChooser(s); err != nil {
panic(err)
}
}

// RegisterBinder registers a BinderSpec with the given Configurator. Returns
// an error if the BinderSpec is invalid.
//
// A binder enables custom peer list bindings, like DNS with SRV + A records or
// a task list file watcher.
//
// If a binder with the same name already exists, it will be replaced.
//
// Use MustRegisterBinder to panic if the registration fails.
func (c *Configurator) RegisterBinder(s BinderSpec) error {
if s.Name == "" {
return errors.New("name is required")
}

spec, err := compileBinderSpec(&s)
if err != nil {
return fmt.Errorf("invalid BinderSpec for %q: %v", s.Name, err)
}

c.knownBinders[s.Name] = spec
return nil
}

// MustRegisterBinder registers the given BinderSpec with the Configurator.
// This function panics if the BinderSpec is invalid.
func (c *Configurator) MustRegisterBinder(s BinderSpec) {
if err := c.RegisterBinder(s); err != nil {
panic(err)
}
}

// LoadConfigFromYAML loads a yarpc.Config from YAML. Use LoadConfig if you
// have your own map[string]interface{} or map[interface{}]interface{} to
// provide.
Expand Down
14 changes: 12 additions & 2 deletions x/config/kit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ package config

import "reflect"

// Kit carries tools for building transports, inbounds, and outbounds using
// plugins.
// Kit carries internal dependencies for building peer choosers.
// The kit gets threaded through transport, outbound, and inbound builders
// so they can thread the kit through functions like BuildChooser on a
// ChooserConfig.
type Kit struct {
c *Configurator
}

var _typeOfKit = reflect.TypeOf((*Kit)(nil))

func (k *Kit) binder(name string) *compiledBinderSpec {
return k.c.knownBinders[name]
}

func (k *Kit) chooser(name string) *compiledChooserSpec {
return k.c.knownChoosers[name]
}
Loading

0 comments on commit fa0dd2a

Please sign in to comment.