/
ratebreaker.go
78 lines (61 loc) · 1.69 KB
/
ratebreaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package circuit
import (
log "github.com/sirupsen/logrus"
"sync"
"github.com/sony/gobreaker"
)
// TODO:
// in case of the rate breaker, there are unnecessary synchronization steps due to the 3rd party gobreaker. If
// the sliding window was part of the implementation of the individual breakers, this additional syncrhonization
// would not be required.
type rateBreaker struct {
settings BreakerSettings
mx *sync.Mutex
sampler *binarySampler
gb *gobreaker.TwoStepCircuitBreaker
}
func newRate(s BreakerSettings) *rateBreaker {
b := &rateBreaker{
settings: s,
mx: &sync.Mutex{},
}
b.gb = gobreaker.NewTwoStepCircuitBreaker(gobreaker.Settings{
Name: s.Host,
MaxRequests: uint32(s.HalfOpenRequests),
Timeout: s.Timeout,
ReadyToTrip: func(gobreaker.Counts) bool { return b.readyToTrip() },
OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
log.Infof("circuit breaker %v went from %v to %v", name, from.String(), to.String())
},
})
return b
}
func (b *rateBreaker) readyToTrip() bool {
b.mx.Lock()
defer b.mx.Unlock()
if b.sampler == nil {
return false
}
return b.sampler.count >= b.settings.Failures
}
// count the failures in closed and half-open state
func (b *rateBreaker) countRate(success bool) {
b.mx.Lock()
defer b.mx.Unlock()
if b.sampler == nil {
b.sampler = newBinarySampler(b.settings.Window)
}
b.sampler.tick(!success)
}
func (b *rateBreaker) Allow() (func(bool), bool) {
done, err := b.gb.Allow()
// this error can only indicate that the breaker is not closed
closed := err == nil
if !closed {
return nil, false
}
return func(success bool) {
b.countRate(success)
done(success)
}, true
}