Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add traffic controller to non sharded backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed Jul 7, 2018
1 parent 4971b2f commit 3215094
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions aggregator/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,6 @@ type flushHandlerConfiguration struct {

// DynamicBackend configures the dynamic backend.
DynamicBackend *dynamicBackendConfiguration `yaml:"dynamicBackend"`

// TrafficControl configures the traffic controller.
TrafficControl *trafficControlConfiguration `yaml:"trafficControl"`
}

func (c flushHandlerConfiguration) Validate() error {
Expand Down Expand Up @@ -230,14 +227,20 @@ func (c *dynamicBackendConfiguration) NewSharderRouter(
p.RegisterFilter(sid, f)
logger.Infof("registered filter for consumer service: %s", sid.String())
}
sr := SharderRouter{
SharderID: sharding.NewSharderID(c.HashType, c.TotalShards),
Router: router.NewWithAckRouter(p),
}
if c.TrafficControl == nil {
return sr, nil
r := router.NewWithAckRouter(p)
if c.TrafficControl != nil {
if r, err = c.TrafficControl.NewTrafficControlledSharderRouter(
r,
store,
instrumentOpts.SetMetricsScope(scope),
); err != nil {
return SharderRouter{}, err
}
}
return c.TrafficControl.NewTrafficControlledSharderRouter(sr, store, instrumentOpts.SetMetricsScope(scope))
return SharderRouter{
SharderID: sharding.NewSharderID(c.HashType, c.TotalShards),
Router: r,
}, nil
}

type consumerServiceFilterConfiguration struct {
Expand Down Expand Up @@ -331,16 +334,36 @@ func (c *staticBackendConfiguration) NewSharderRouter(
return SharderRouter{}, err
}
router := router.NewAllowAllRouter(queue)
if c.TrafficControl != nil {
if router, err = c.TrafficControl.NewTrafficControlledSharderRouter(
router,
store,
instrumentOpts.SetMetricsScope(backendScope),
); err != nil {
return SharderRouter{}, err
}
}
return SharderRouter{SharderID: sharding.NoShardingSharderID, Router: router}, nil
}

// Sharded backend.
routerScope := backendScope.SubScope("router")
sr, err := c.Sharded.NewSharderRouter(routerScope, queueOpts)
if c.TrafficControl == nil {
return sr, err
if err != nil {
return SharderRouter{}, err
}
if c.TrafficControl != nil {
router, err := c.TrafficControl.NewTrafficControlledSharderRouter(
sr.Router,
store,
instrumentOpts.SetMetricsScope(backendScope),
)
if err != nil {
return SharderRouter{}, err
}
sr.Router = router
}
return c.TrafficControl.NewTrafficControlledSharderRouter(sr, store, instrumentOpts.SetMetricsScope(routerScope))
return sr, nil
}

type shardedConfiguration struct {
Expand Down Expand Up @@ -440,10 +463,10 @@ type trafficControlConfiguration struct {
}

func (c *trafficControlConfiguration) NewTrafficControlledSharderRouter(
sr SharderRouter,
r router.Router,
store kv.Store,
instrumentOpts instrument.Options,
) (SharderRouter, error) {
) (router.Router, error) {
opts := common.NewTrafficControlOptions().
SetStore(store).
SetDefaultEnabled(c.DefaultEnabled).
Expand All @@ -452,12 +475,11 @@ func (c *trafficControlConfiguration) NewTrafficControlledSharderRouter(
if c.InitTimeout != nil {
opts = opts.SetInitTimeout(*c.InitTimeout)
}
sr.Router = router.NewTrafficControlledRouter(
common.NewTrafficController(opts),
sr.Router,
instrumentOpts,
)
return sr, nil
tc := common.NewTrafficController(opts)
if err := tc.Init(); err != nil {
return SharderRouter{}, err
}
return router.NewTrafficControlledRouter(tc, r, instrumentOpts), nil
}

type connectionConfiguration struct {
Expand Down

0 comments on commit 3215094

Please sign in to comment.