Skip to content

Commit

Permalink
Move service handler creation after FX creates dispatcher (#312)
Browse files Browse the repository at this point in the history
Call user handles before starting a dispatcher
  • Loading branch information
Alex committed Feb 24, 2017
1 parent 62a9f89 commit bebde66
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 51 deletions.
11 changes: 5 additions & 6 deletions examples/keyvalue/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,17 @@ import (
"go.uber.org/yarpc/api/transport"
)

type YarpcHandler struct {
type YARPCHandler struct {
sync.RWMutex

items map[string]string
}

func NewYarpcThriftHandler(service.Host) ([]transport.Procedure, error) {
handler := &YarpcHandler{items: map[string]string{}}
return kvs.New(handler), nil
func NewYARPCThriftHandler(_ service.Host) ([]transport.Procedure, error) {
return kvs.New(&YARPCHandler{items: map[string]string{}}), nil
}

func (h *YarpcHandler) GetValue(ctx context.Context, key *string) (string, error) {
func (h *YARPCHandler) GetValue(ctx context.Context, key *string) (string, error) {
h.RLock()
defer h.RUnlock()

Expand All @@ -51,7 +50,7 @@ func (h *YarpcHandler) GetValue(ctx context.Context, key *string) (string, error
return "", &kv.ResourceDoesNotExist{Key: *key}
}

func (h *YarpcHandler) SetValue(ctx context.Context, key *string, value *string) error {
func (h *YARPCHandler) SetValue(ctx context.Context, key *string, value *string) error {
h.Lock()

h.items[*key] = *value
Expand Down
2 changes: 1 addition & 1 deletion examples/keyvalue/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
svc, err := service.WithModules(
// Create a YARPC module that exposes endpoints
rpc.ThriftModule(
rpc.CreateThriftServiceFunc(NewYarpcThriftHandler),
rpc.CreateThriftServiceFunc(NewYARPCThriftHandler),
modules.WithRoles("service"),
),
).WithOptions(
Expand Down
26 changes: 1 addition & 25 deletions modules/rpc/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@
package rpc

import (
"sync"

"go.uber.org/fx/modules"
"go.uber.org/fx/service"

"github.com/pkg/errors"
"go.uber.org/yarpc/api/transport"
)

var _setupMu sync.Mutex

// CreateThriftServiceFunc creates a Thrift service from a service host
type CreateThriftServiceFunc func(svc service.Host) ([]transport.Procedure, error)

Expand All @@ -42,31 +38,11 @@ func ThriftModule(hookup CreateThriftServiceFunc, options ...modules.Option) ser
mi.Name = "rpc"
}

mod, err := newYARPCThriftModule(mi, hookup, options...)
mod, err := newYARPCModule(mi, hookup, options...)
if err != nil {
return nil, errors.Wrap(err, "unable to instantiate Thrift module")
}

return []service.Module{mod}, nil
}
}

func newYARPCThriftModule(
mi service.ModuleCreateInfo,
createService CreateThriftServiceFunc,
options ...modules.Option,
) (*YARPCModule, error) {
registrants, err := createService(mi.Host)
if err != nil {
return nil, errors.Wrap(err, "unable to create YARPC thrift handler")
}

reg := func(mod *YARPCModule) {
_setupMu.Lock()
defer _setupMu.Unlock()

mod.controller.dispatcher.Register(registrants)
}

return newYARPCModule(mi, reg, options...)
}
16 changes: 12 additions & 4 deletions modules/rpc/thrift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (h testHost) Config() config.Provider {
}

func TestThriftModule_OK(t *testing.T) {
dig.Reset()

chip := ThriftModule(okCreate, modules.WithRoles("rescue"))
dale := ThriftModule(okCreate, modules.WithRoles("ranges"))
cfg := []byte(`
Expand Down Expand Up @@ -89,11 +91,17 @@ func TestThriftModule_BadOptions(t *testing.T) {
assert.Error(t, err)
}

func TestThrfitModule_Error(t *testing.T) {
func TestThriftModule_Error(t *testing.T) {
dig.Reset()
modCreate := ThriftModule(badCreateService)
mods, err := modCreate(service.ModuleCreateInfo{})
assert.Error(t, err)
assert.Nil(t, mods)
mods, err := modCreate(service.ModuleCreateInfo{Host: testHost{
Host: service.NopHost(),
config: config.NewYAMLProviderFromBytes([]byte(``)),
}})

assert.NoError(t, err)
ready := make(chan struct{})
assert.EqualError(t, <-mods[0].Start(ready), "unable to start dispatcher: can't create service")
}

func testInitRunModule(t *testing.T, mod service.Module, mci service.ModuleCreateInfo) {
Expand Down
65 changes: 50 additions & 15 deletions modules/rpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
// register it in a dig.Graph provided with options/default graph.
type YARPCModule struct {
modules.ModuleBase
register registerServiceFunc
config yarpcConfig
log ulog.Log
statsClient *statsClient
Expand All @@ -69,8 +68,6 @@ var (
_ service.Module = &YARPCModule{}
)

type registerServiceFunc func(module *YARPCModule)

type transports struct {
inbounds []transport.Inbound
}
Expand Down Expand Up @@ -115,9 +112,10 @@ type Address struct {
Port int
}

// Stores a collection of all modules configs with a shared dispatcher
// that are safe to call from multiple go routines. All the configs must
// share the same AdvertiseName and represent a single service.
type handlerWithDispatcher func(dispatcher *yarpc.Dispatcher) error

// Stores a collection of all module configs with a shared dispatcher
// and user handles to work with the dispatcher.
type dispatcherController struct {
// sync configs
sync.RWMutex
Expand All @@ -129,17 +127,37 @@ type dispatcherController struct {
startError error

configs []*yarpcConfig
handlers []handlerWithDispatcher
dispatcher yarpc.Dispatcher
}

// Adds the config to the controller
// Adds the config to the controller.
func (c *dispatcherController) addConfig(config yarpcConfig) {
c.Lock()
defer c.Unlock()

c.configs = append(c.configs, &config)
}

// Adds the config to the controller.
func (c *dispatcherController) appendHandler(handler handlerWithDispatcher) {
c.Lock()
defer c.Unlock()

c.handlers = append(c.handlers, handler)
}

// Apply handlers to the dispatcher.
func (c *dispatcherController) applyHandlers() error {
for _, h := range c.handlers {
if err := h(&c.dispatcher); err != nil {
return err
}
}

return nil
}

// Adds the default middleware: context propagation and auth.
func (c *dispatcherController) addDefaultMiddleware(host service.Host, statsClient *statsClient) {
cfg := yarpcConfig{
Expand All @@ -158,8 +176,13 @@ func (c *dispatcherController) addDefaultMiddleware(host service.Host, statsClie
c.addConfig(cfg)
}

// Starts the dispatcher: wait until all modules call start, create a single dispatcher and then start it.
// Once started the collection will not start the dispatcher again.
// Starts the dispatcher:
// 1. Add default middleware and merge all existing configs
// 2. Create a dispatcher
// 3. Call user handles to e.g. register transport.Procedures on the dispatcher
// 4. Start the dispatcher
//
// Once started the controller will not start the dispatcher again.
func (c *dispatcherController) Start(host service.Host, statsClient *statsClient) error {
c.start.Do(func() {
c.addDefaultMiddleware(host, statsClient)
Expand All @@ -173,13 +196,19 @@ func (c *dispatcherController) Start(host service.Host, statsClient *statsClient

_dispatcherMu.Lock()
defer _dispatcherMu.Unlock()

var d *yarpc.Dispatcher
if d, err = _dispatcherFn(host, cfg); err != nil {
c.startError = err
return
}

c.dispatcher = *d
if err := c.applyHandlers(); err != nil {
c.startError = err
return
}

c.startError = _starterFn(&c.dispatcher)
})

Expand Down Expand Up @@ -232,7 +261,7 @@ func (c *dispatcherController) mergeConfigs(name string) (conf yarpc.Config, err
// The first created module defines the service name.
func newYARPCModule(
mi service.ModuleCreateInfo,
reg registerServiceFunc,
reg CreateThriftServiceFunc,
options ...modules.Option,
) (*YARPCModule, error) {
name := "yarpc"
Expand All @@ -241,9 +270,7 @@ func newYARPCModule(
}

module := &YARPCModule{
ModuleBase: *modules.NewModuleBase(name, mi.Host, []string{}),
register: reg,
statsClient: newStatsClient(mi.Host.Metrics()),
ModuleBase: *modules.NewModuleBase(name, mi.Host, []string{}),
}

module.log = ulog.Logger(context.Background()).With("moduleName", name)
Expand All @@ -258,7 +285,7 @@ func newYARPCModule(
return nil, errs.Wrap(err, "can't read inbounds")
}

// iterate over inbounds
// Iterate over inbounds.
transportsIn, err := prepareInbounds(module.config.Inbounds, mi.Host.Name())
if err != nil {
return nil, errs.Wrap(err, "can't process inbounds")
Expand Down Expand Up @@ -286,6 +313,15 @@ func newYARPCModule(
}

module.controller.addConfig(module.config)
module.controller.appendHandler(func(dispatcher *yarpc.Dispatcher) error {
t, err := reg(mi.Host)
if err != nil {
return err
}

dispatcher.Register(t)
return nil
})

module.log.Info("Module successfuly created", "inbounds", module.config.Inbounds)

Expand Down Expand Up @@ -335,7 +371,6 @@ func (m *YARPCModule) Start(readyCh chan<- struct{}) <-chan error {
return ret
}

m.register(m)
m.log.Info("Module started")

m.isRunning = true
Expand Down

0 comments on commit bebde66

Please sign in to comment.