Skip to content

Commit

Permalink
two step circuit breaker (#6)
Browse files Browse the repository at this point in the history
* add multi step breaker control

* added documentation

* test breaker initialization with preserving coverage

* change the 2-step breaker interface to a single Allow function with a callback

* provide a separate structure for the different use case of Allow()

* update documentation

* review fixes:
- renamed twostep breaker constructor
- fixed failure test call to return an error
  • Loading branch information
aryszka authored and YoshiyukiMineo committed May 30, 2017
1 parent 7f5cbc9 commit e9556a4
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 23 deletions.
41 changes: 37 additions & 4 deletions gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ type CircuitBreaker struct {
expiry time.Time
}

// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
cb *CircuitBreaker
}

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
Expand Down Expand Up @@ -146,6 +153,13 @@ func NewCircuitBreaker(st Settings) *CircuitBreaker {
return cb
}

// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
return &TwoStepCircuitBreaker{
cb: NewCircuitBreaker(st),
}
}

const defaultTimeout = time.Duration(60) * time.Second

func defaultReadyToTrip(counts Counts) bool {
Expand Down Expand Up @@ -176,16 +190,35 @@ func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{},
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, fmt.Errorf("panic in request"))
cb.afterRequest(generation, false)
panic(e)
}
}()

result, err := req()
cb.afterRequest(generation, err)
cb.afterRequest(generation, err == nil)
return result, err
}

// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the Circuit Breaker doesn't allow
// requests it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
generation, err := tscb.cb.beforeRequest()
if err != nil {
return nil, err
}

return func(success bool) {
tscb.cb.afterRequest(generation, success)
}, nil
}

// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
return tscb.cb.State()
}

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
Expand All @@ -203,7 +236,7 @@ func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
return generation, nil
}

func (cb *CircuitBreaker) afterRequest(before uint64, err error) {
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -213,7 +246,7 @@ func (cb *CircuitBreaker) afterRequest(before uint64, err error) {
return
}

if err == nil {
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
Expand Down
122 changes: 103 additions & 19 deletions gobreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func succeedLater(cb *CircuitBreaker, delay time.Duration) <-chan error {
return ch
}

func succeed2Step(cb *TwoStepCircuitBreaker) error {
done, err := cb.Allow()
if err != nil {
return err
}

done(true)
return nil
}

func fail(cb *CircuitBreaker) error {
msg := "fail"
_, err := cb.Execute(func() (interface{}, error) { return nil, fmt.Errorf(msg) })
Expand All @@ -52,11 +62,47 @@ func fail(cb *CircuitBreaker) error {
return err
}

func fail2Step(cb *TwoStepCircuitBreaker) error {
done, err := cb.Allow()
if err != nil {
return err
}

done(false)
return nil
}

func causePanic(cb *CircuitBreaker) error {
_, err := cb.Execute(func() (interface{}, error) { panic("oops"); return nil, nil })
return err
}

func newCustom() *CircuitBreaker {
var customSt Settings
customSt.Name = "cb"
customSt.MaxRequests = 3
customSt.Interval = time.Duration(30) * time.Second
customSt.Timeout = time.Duration(90) * time.Second
customSt.ReadyToTrip = func(counts Counts) bool {
numReqs := counts.Requests
failureRatio := float64(counts.TotalFailures) / float64(numReqs)

counts.clear() // no effect on customCB.counts

return numReqs >= 3 && failureRatio >= 0.6
}
customSt.OnStateChange = func(name string, from State, to State) {
stateChange = StateChange{name, from, to}
}

return NewCircuitBreaker(customSt)
}

func init() {
defaultCB = NewCircuitBreaker(Settings{})
customCB = newCustom()
}

func TestStateConstants(t *testing.T) {
assert.Equal(t, State(0), StateClosed)
assert.Equal(t, State(1), StateHalfOpen)
Expand All @@ -69,8 +115,7 @@ func TestStateConstants(t *testing.T) {
}

func TestNewCircuitBreaker(t *testing.T) {
var defaultSt Settings
defaultCB = NewCircuitBreaker(defaultSt)
defaultCB := NewCircuitBreaker(Settings{})
assert.Equal(t, "", defaultCB.name)
assert.Equal(t, uint32(1), defaultCB.maxRequests)
assert.Equal(t, time.Duration(0), defaultCB.interval)
Expand All @@ -81,23 +126,7 @@ func TestNewCircuitBreaker(t *testing.T) {
assert.Equal(t, Counts{0, 0, 0, 0, 0}, defaultCB.counts)
assert.True(t, defaultCB.expiry.IsZero())

var customSt Settings
customSt.Name = "cb"
customSt.MaxRequests = 3
customSt.Interval = time.Duration(30) * time.Second
customSt.Timeout = time.Duration(90) * time.Second
customSt.ReadyToTrip = func(counts Counts) bool {
numReqs := counts.Requests
failureRatio := float64(counts.TotalFailures) / float64(numReqs)

counts.clear() // no effect on customCB.counts

return numReqs >= 3 && failureRatio >= 0.6
}
customSt.OnStateChange = func(name string, from State, to State) {
stateChange = StateChange{name, from, to}
}
customCB = NewCircuitBreaker(customSt)
customCB := newCustom()
assert.Equal(t, "cb", customCB.name)
assert.Equal(t, uint32(3), customCB.maxRequests)
assert.Equal(t, time.Duration(30)*time.Second, customCB.interval)
Expand Down Expand Up @@ -211,6 +240,61 @@ func TestCustomCircuitBreaker(t *testing.T) {
assert.Equal(t, StateChange{"cb", StateHalfOpen, StateClosed}, stateChange)
}

func TestTwoStepCircuitBreaker(t *testing.T) {
tscb := NewTwoStepCircuitBreaker(Settings{})
for i := 0; i < 5; i++ {
assert.Nil(t, fail2Step(tscb))
}

assert.Equal(t, StateClosed, tscb.State())
assert.Equal(t, Counts{5, 0, 5, 0, 5}, tscb.cb.counts)

assert.Nil(t, succeed2Step(tscb))
assert.Equal(t, StateClosed, tscb.State())
assert.Equal(t, Counts{6, 1, 5, 1, 0}, tscb.cb.counts)

assert.Nil(t, fail2Step(tscb))
assert.Equal(t, StateClosed, tscb.State())
assert.Equal(t, Counts{7, 1, 6, 0, 1}, tscb.cb.counts)

// StateClosed to StateOpen
for i := 0; i < 5; i++ {
assert.Nil(t, fail2Step(tscb)) // 6 consecutive failures
}
assert.Equal(t, StateOpen, tscb.State())
assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts)
assert.False(t, tscb.cb.expiry.IsZero())

assert.Error(t, succeed2Step(tscb))
assert.Error(t, fail2Step(tscb))
assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts)

pseudoSleep(tscb.cb, time.Duration(59)*time.Second)
assert.Equal(t, StateOpen, tscb.State())

// StateOpen to StateHalfOpen
pseudoSleep(tscb.cb, time.Duration(1)*time.Second) // over Timeout
assert.Equal(t, StateHalfOpen, tscb.State())
assert.True(t, tscb.cb.expiry.IsZero())

// StateHalfOpen to StateOpen
assert.Nil(t, fail2Step(tscb))
assert.Equal(t, StateOpen, tscb.State())
assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts)
assert.False(t, tscb.cb.expiry.IsZero())

// StateOpen to StateHalfOpen
pseudoSleep(tscb.cb, time.Duration(60)*time.Second)
assert.Equal(t, StateHalfOpen, tscb.State())
assert.True(t, tscb.cb.expiry.IsZero())

// StateHalfOpen to StateClosed
assert.Nil(t, succeed2Step(tscb))
assert.Equal(t, StateClosed, tscb.State())
assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts)
assert.True(t, tscb.cb.expiry.IsZero())
}

func TestPanicInRequest(t *testing.T) {
assert.Panics(t, func() { causePanic(defaultCB) })
assert.Equal(t, Counts{1, 0, 1, 0, 1}, defaultCB.counts)
Expand Down

0 comments on commit e9556a4

Please sign in to comment.