Skip to content

Commit

Permalink
Keep CreateThriftServiceFunc as before
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandr Samylkin committed Feb 23, 2017
1 parent c6202a4 commit 002778a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 24 deletions.
3 changes: 1 addition & 2 deletions examples/keyvalue/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.uber.org/fx/examples/keyvalue/kv"
kvs "go.uber.org/fx/examples/keyvalue/kv/keyvalueserver"
"go.uber.org/fx/service"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
)

Expand All @@ -37,7 +36,7 @@ type YARPCHandler struct {
items map[string]string
}

func NewYARPCThriftHandler(_ service.Host, _ *yarpc.Dispatcher) ([]transport.Procedure, error) {
func NewYARPCThriftHandler(_ service.Host) ([]transport.Procedure, error) {
return kvs.New(&YARPCHandler{items: map[string]string{}}), nil
}

Expand Down
3 changes: 1 addition & 2 deletions modules/rpc/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"go.uber.org/fx/service"

"github.com/pkg/errors"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
)

// CreateThriftServiceFunc creates a Thrift service from a service host
type CreateThriftServiceFunc func(svc service.Host, dispatcher *yarpc.Dispatcher) ([]transport.Procedure, error)
type CreateThriftServiceFunc func(svc service.Host) ([]transport.Procedure, error)

// ThriftModule creates a Thrift Module from a service func
func ThriftModule(hookup CreateThriftServiceFunc, options ...modules.Option) service.ModuleCreateFunc {
Expand Down
26 changes: 11 additions & 15 deletions modules/rpc/thrift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package rpc

import (
"errors"
"sync"
"testing"

"go.uber.org/fx/config"
Expand All @@ -49,15 +48,7 @@ func (h testHost) Config() config.Provider {
func TestThriftModule_OK(t *testing.T) {
dig.Reset()

wg := sync.WaitGroup{}
wg.Add(1)
testInbounds := func(_ service.Host, dispatcher *yarpc.Dispatcher) ([]transport.Procedure, error) {
require.Equal(t, 2, len(dispatcher.Inbounds()))
wg.Done()
return nil, nil
}

chip := ThriftModule(testInbounds, modules.WithRoles("rescue"))
chip := ThriftModule(okCreate, modules.WithRoles("rescue"))
dale := ThriftModule(okCreate, modules.WithRoles("ranges"))
cfg := []byte(`
modules:
Expand Down Expand Up @@ -87,7 +78,11 @@ modules:

testInitRunModule(t, goofy[0], mci)
testInitRunModule(t, gopher[0], mci)
wg.Wait()

// Dispatcher must be resolved in the default graph
var dispatcher *yarpc.Dispatcher
assert.NoError(t, dig.Resolve(&dispatcher))
assert.Equal(t, 2, len(dispatcher.Inbounds()))
}

func TestThriftModule_BadOptions(t *testing.T) {
Expand Down Expand Up @@ -134,12 +129,13 @@ func errorOption(_ *service.ModuleCreateInfo) error {
return errors.New("bad option")
}

func okCreate(_ service.Host, dispatcher *yarpc.Dispatcher) ([]transport.Procedure, error) {
return thrift.BuildProcedures(thrift.Service{
func okCreate(_ service.Host) ([]transport.Procedure, error) {
reg := thrift.BuildProcedures(thrift.Service{
Name: "foo",
}), nil
})
return reg, nil
}

func badCreateService(service.Host, *yarpc.Dispatcher) ([]transport.Procedure, error) {
func badCreateService(_ service.Host) ([]transport.Procedure, error) {
return nil, errors.New("can't create service")
}
17 changes: 12 additions & 5 deletions modules/rpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type dispatcherController struct {

configs []*yarpcConfig
handlers []handlerWithDispatcher
dispatcher *yarpc.Dispatcher
dispatcher yarpc.Dispatcher
}

// Adds the config to the controller.
Expand All @@ -150,7 +150,7 @@ func (c *dispatcherController) appendHandler(handler handlerWithDispatcher) {
// Apply handlers to the dispatcher.
func (c *dispatcherController) applyHandlers() error {
for _, h := range c.handlers {
if err := h(c.dispatcher); err != nil {
if err := h(&c.dispatcher); err != nil {
return err
}
}
Expand Down Expand Up @@ -197,17 +197,19 @@ func (c *dispatcherController) Start(host service.Host) error {
_dispatcherMu.Lock()
defer _dispatcherMu.Unlock()

if c.dispatcher, err = _dispatcherFn(host, cfg); err != nil {
var d *yarpc.Dispatcher
if d, err = _dispatcherFn(host, cfg); err != nil {
c.startError = err
return
}

c.dispatcher = *d
if err := c.applyHandlers(); err != nil {
c.startError = err
return
}

c.startError = _starterFn(c.dispatcher)
c.startError = _starterFn(&c.dispatcher)
})

return c.startError
Expand Down Expand Up @@ -305,11 +307,16 @@ func newYARPCModule(
if errCr := dig.Register(module.controller); errCr != nil {
return nil, errs.Wrap(errCr, "can't register a dispatcher controller")
}

// Register dispatcher
if err := dig.Register(&module.controller.dispatcher); err != nil {
return nil, errs.Wrap(err, "unable to register the dispatcher")
}
}

module.controller.addConfig(module.config)
module.controller.appendHandler(func(dispatcher *yarpc.Dispatcher) error {
t, err := reg(mi.Host, dispatcher)
t, err := reg(mi.Host)
if err != nil {
return err
}
Expand Down

0 comments on commit 002778a

Please sign in to comment.