Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Allow init timeout error, move traffic controller to router pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed Jul 9, 2018
1 parent 1bdf66c commit ad39c94
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 240 deletions.
99 changes: 0 additions & 99 deletions aggregator/handler/common/traffic_controller_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions aggregator/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ type dynamicBackendConfiguration struct {
Filters []consumerServiceFilterConfiguration `yaml:"filters"`

// TrafficControl configs the traffic controller.
TrafficControl *common.TrafficControllerConfiguration `yaml:"trafficControl"`
TrafficControl *router.TrafficControllerConfiguration `yaml:"trafficControl"`
}

func (c *dynamicBackendConfiguration) NewSharderRouter(
Expand Down Expand Up @@ -273,7 +273,7 @@ type staticBackendConfiguration struct {
DisableValidation bool `yaml:"disableValidation"`

// TrafficControl configs the traffic controller.
TrafficControl *common.TrafficControllerConfiguration `yaml:"trafficControl"`
TrafficControl *router.TrafficControllerConfiguration `yaml:"trafficControl"`
}

func (c *staticBackendConfiguration) Validate() error {
Expand Down Expand Up @@ -325,7 +325,7 @@ func (c *staticBackendConfiguration) NewSharderRouter(
queueOpts = queueOpts.SetQueueSize(c.QueueSize)
}
var (
tc common.TrafficController
tc router.TrafficController
err error
)
if c.TrafficControl != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,28 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common
package router

import (
"fmt"
"time"

"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3cluster/kv/util"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/watch"

"go.uber.org/atomic"
)

const (
defaultDefaultDisabled = false
defaultInitTimeout = 2 * time.Second
)

// TrafficController controls traffic.
type TrafficController interface {
// Allow returns true if traffic is allowed.
Allow() bool

// Init initializes the traffic controller to watch the runtime updates.
Init() error

// Close closes the traffic controller.
Close()
}

// TrafficControllerType defines the type of the traffic controller.
type TrafficControllerType string

// Supported types.
var (
Enabler TrafficControllerType = "enabler"
Disabler TrafficControllerType = "disabler"
// TrafficEnabler enables the traffic when the runtime value equals to true.
TrafficEnabler TrafficControllerType = "trafficEnabler"
// TrafficDisabler disables the traffic when the runtime value equals to true.
TrafficDisabler TrafficControllerType = "trafficDisabler"

validTypes = []TrafficControllerType{
Enabler,
Disabler,
TrafficEnabler,
TrafficDisabler,
}
)

Expand All @@ -84,9 +64,10 @@ func (t *TrafficControllerType) UnmarshalYAML(unmarshal func(interface{}) error)

// TrafficControllerConfiguration configures the traffic controller.
type TrafficControllerConfiguration struct {
Type TrafficControllerType `yaml:"type"`
RuntimeKey string `yaml:"runtimeKey" validate:"nonzero"`
InitTimeout *time.Duration `yaml:"initTimeout"`
Type TrafficControllerType `yaml:"type"`
DefaultValue bool `yaml:"defaultValue"`
RuntimeKey string `yaml:"runtimeKey" validate:"nonzero"`
InitTimeout *time.Duration `yaml:"initTimeout"`
}

// NewTrafficController creates a new traffic controller.
Expand All @@ -96,13 +77,14 @@ func (c *TrafficControllerConfiguration) NewTrafficController(
) (TrafficController, error) {
opts := NewTrafficControlOptions().
SetStore(store).
SetDefaultValue(c.DefaultValue).
SetRuntimeKey(c.RuntimeKey).
SetInstrumentOptions(instrumentOpts)
if c.InitTimeout != nil {
opts = opts.SetInitTimeout(*c.InitTimeout)
}
var tc TrafficController
if c.Type == Enabler {
if c.Type == TrafficEnabler {
tc = NewTrafficEnabler(opts)
} else {
tc = NewTrafficDisabler(opts)
Expand All @@ -121,6 +103,12 @@ type TrafficControlOptions interface {
// Store returns the kv store.
Store() kv.Store

// SetDefaultValue sets the default value.
SetDefaultValue(value bool) TrafficControlOptions

// DefaultValue returns the default value.
DefaultValue() bool

// SetRuntimeKey sets the runtime enable key,
// which will override the default enabled value when present.
SetRuntimeKey(value string) TrafficControlOptions
Expand Down Expand Up @@ -153,128 +141,57 @@ type trafficControlOptions struct {
// NewTrafficControlOptions creats a new TrafficControlOptions.
func NewTrafficControlOptions() TrafficControlOptions {
return &trafficControlOptions{
defaultValue: defaultDefaultDisabled,
initTimeout: defaultInitTimeout,
instrumentOpts: instrument.NewOptions(),
}
}

func (opts *trafficControlOptions) SetStore(store kv.Store) TrafficControlOptions {
o := *opts
o.store = store
return &o
}

func (opts *trafficControlOptions) Store() kv.Store {
return opts.store
}

func (opts *trafficControlOptions) SetDefaultValue(value bool) TrafficControlOptions {
o := *opts
o.defaultValue = value
return &o
}

func (opts *trafficControlOptions) DefaultValue() bool {
return opts.defaultValue
func (o *trafficControlOptions) SetStore(store kv.Store) TrafficControlOptions {
opts := *o
opts.store = store
return &opts
}

func (opts *trafficControlOptions) SetRuntimeKey(value string) TrafficControlOptions {
o := *opts
o.runtimeKey = value
return &o
func (o *trafficControlOptions) Store() kv.Store {
return o.store
}

func (opts *trafficControlOptions) RuntimeKey() string {
return opts.runtimeKey
func (o *trafficControlOptions) SetDefaultValue(value bool) TrafficControlOptions {
opts := *o
opts.defaultValue = value
return &opts
}

func (opts *trafficControlOptions) SetInitTimeout(value time.Duration) TrafficControlOptions {
o := *opts
o.initTimeout = value
return &o
func (o *trafficControlOptions) DefaultValue() bool {
return o.defaultValue
}

func (opts *trafficControlOptions) InitTimeout() time.Duration {
return opts.initTimeout
func (o *trafficControlOptions) SetRuntimeKey(value string) TrafficControlOptions {
opts := *o
opts.runtimeKey = value
return &opts
}

func (opts *trafficControlOptions) SetInstrumentOptions(value instrument.Options) TrafficControlOptions {
o := *opts
o.instrumentOpts = value
return &o
func (o *trafficControlOptions) RuntimeKey() string {
return o.runtimeKey
}

func (opts *trafficControlOptions) InstrumentOptions() instrument.Options {
return opts.instrumentOpts
func (o *trafficControlOptions) SetInitTimeout(value time.Duration) TrafficControlOptions {
opts := *o
opts.initTimeout = value
return &opts
}

type trafficEnabler struct {
enabled *atomic.Bool
value watch.Value
opts TrafficControlOptions
func (o *trafficControlOptions) InitTimeout() time.Duration {
return o.initTimeout
}

// NewTrafficEnabler creates a new traffic controller.
func NewTrafficEnabler(opts TrafficControlOptions) TrafficController {
enabled := atomic.NewBool(false)
iOpts := opts.InstrumentOptions()
newUpdatableFn := func() (watch.Updatable, error) {
w, err := opts.Store().Watch(opts.RuntimeKey())
return w, err
}
getFn := func(updatable watch.Updatable) (interface{}, error) {
return updatable.(kv.ValueWatch).Get(), nil
}
processFn := func(update interface{}) error {
b, err := util.BoolFromValue(
update.(kv.Value),
opts.RuntimeKey(),
false,
util.NewOptions().SetLogger(iOpts.Logger()),
)
if err != nil {
return err
}
enabled.Store(b)
return nil
}
vOptions := watch.NewOptions().
SetInitWatchTimeout(opts.InitTimeout()).
SetInstrumentOptions(iOpts).
SetNewUpdatableFn(newUpdatableFn).
SetGetUpdateFn(getFn).
SetProcessFn(processFn)
return &trafficEnabler{
enabled: enabled,
value: watch.NewValue(vOptions),
opts: opts,
}
}

func (c *trafficEnabler) Init() error {
return c.value.Watch()
}

func (c *trafficEnabler) Close() {
c.value.Unwatch()
}

func (c *trafficEnabler) Allow() bool {
return c.enabled.Load()
}

type trafficDisabler struct {
TrafficController
}

// NewTrafficDisabler creates a new traffic disabler.
func NewTrafficDisabler(opts TrafficControlOptions) TrafficController {
return &trafficDisabler{
TrafficController: NewTrafficEnabler(opts),
}
func (o *trafficControlOptions) SetInstrumentOptions(value instrument.Options) TrafficControlOptions {
opts := *o
opts.instrumentOpts = value
return &opts
}

func (c *trafficDisabler) Allow() bool {
return !c.TrafficController.Allow()
func (o *trafficControlOptions) InstrumentOptions() instrument.Options {
return o.instrumentOpts
}

0 comments on commit ad39c94

Please sign in to comment.