-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
circuit_breaker.go
102 lines (83 loc) ยท 1.9 KB
/
circuit_breaker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package client
import (
"errors"
"sync/atomic"
"time"
)
// Sentinel errors returned by ConsecCircuitBreaker.Call.
var (
// ErrBreakerOpen is returned by Call when the breaker is not ready; in that
// case the supplied function is not invoked.
ErrBreakerOpen = errors.New("breaker open")
// ErrBreakerTimeout is returned by Call when the supplied function has not
// produced a result before the timeout passed to Call fires.
ErrBreakerTimeout = errors.New("breaker time out")
)
// ConsecCircuitBreaker is a circuit breaker that opens once the number of
// failures recorded since the last reset reaches failureThreshold. The state
// is reset by a successful call, or by a readiness check made more than
// window after the last recorded failure or reset.
//
// NOTE(review): lastFailureTime and failures are accessed through the
// sync/atomic functions. They are the first fields of the struct; sync/atomic
// only guarantees 64-bit alignment on 32-bit platforms for the first word of
// an allocated struct, so confirm before reordering fields.
type ConsecCircuitBreaker struct {
// lastFailureTime is a Unix timestamp in nanoseconds. It is written both
// when a failure is recorded and when the breaker is reset.
lastFailureTime int64
// failures counts failures recorded since the last reset.
failures uint64
// failureThreshold is the failure count at which the breaker stops
// reporting ready.
failureThreshold uint64
// window is how long after lastFailureTime the breaker waits before a
// readiness check resets it.
window time.Duration
}
// NewConsecCircuitBreaker builds a ConsecCircuitBreaker that stops reporting
// ready once failureThreshold failures have been recorded, and that is reset
// by a readiness check made more than window after the last recorded failure
// or reset. The failure counter and timestamp start at their zero values.
func NewConsecCircuitBreaker(failureThreshold uint64, window time.Duration) *ConsecCircuitBreaker {
	breaker := new(ConsecCircuitBreaker)
	breaker.failureThreshold = failureThreshold
	breaker.window = window
	return breaker
}
// Call runs fn under the protection of the breaker and records the outcome.
//
// If the breaker is not ready, fn is not invoked and ErrBreakerOpen is
// returned without recording anything.
//
// When d is zero, fn runs synchronously on the calling goroutine. Otherwise fn
// runs on a new goroutine and Call waits for its result or for a timer of
// duration d, whichever comes first. On timeout Call returns
// ErrBreakerTimeout; fn is not cancelled and keeps running, and its eventual
// result is discarded.
//
// A nil result is recorded as a success, any other result (including
// ErrBreakerTimeout) as a failure. The result is returned to the caller.
func (cb *ConsecCircuitBreaker) Call(fn func() error, d time.Duration) error {
	if !cb.ready() {
		return ErrBreakerOpen
	}

	var outcome error
	if d != 0 {
		// Capacity 1 lets the worker goroutine deliver its result and exit
		// even when nobody is receiving any more because the timer won.
		done := make(chan error, 1)
		go func() {
			done <- fn()
			close(done)
		}()

		timer := time.NewTimer(d)
		select {
		case outcome = <-done:
		case <-timer.C:
			outcome = ErrBreakerTimeout
		}
		timer.Stop()
	} else {
		outcome = fn()
	}

	if outcome != nil {
		cb.fail()
		return outcome
	}
	cb.success()
	return nil
}
// ready reports whether the breaker currently allows a call.
//
// If more than window has elapsed since lastFailureTime, the breaker is reset
// as a side effect and ready returns true. Otherwise it returns true only
// while the failure counter is below failureThreshold.
func (cb *ConsecCircuitBreaker) ready() bool {
	stamp := atomic.LoadInt64(&cb.lastFailureTime)
	elapsed := time.Since(time.Unix(0, stamp))
	if elapsed <= cb.window {
		// Still inside the window: the failure count decides.
		return atomic.LoadUint64(&cb.failures) < cb.failureThreshold
	}
	cb.reset()
	return true
}
// success records a successful call by resetting the breaker, which clears
// the failure counter and refreshes the stored timestamp.
func (cb *ConsecCircuitBreaker) success() {
cb.reset()
}
// fail records a failed call: it increments the failure counter and then
// stores the current time, in Unix nanoseconds, as lastFailureTime.
func (cb *ConsecCircuitBreaker) fail() {
	atomic.AddUint64(&cb.failures, 1)
	failedAt := time.Now().UnixNano()
	atomic.StoreInt64(&cb.lastFailureTime, failedAt)
}
// Success records a successful outcome for callers that run the protected
// operation themselves instead of going through Call. It resets the breaker.
func (cb *ConsecCircuitBreaker) Success() {
cb.success()
}
// Fail records a failed outcome for callers that run the protected operation
// themselves instead of going through Call. It increments the failure counter
// and updates the stored failure timestamp.
func (cb *ConsecCircuitBreaker) Fail() {
cb.fail()
}
// Ready reports whether the breaker currently allows a call. It is not a pure
// query: when more than window has elapsed since the stored timestamp, the
// breaker is reset as a side effect before true is returned.
func (cb *ConsecCircuitBreaker) Ready() bool {
return cb.ready()
}
// reset clears the failure counter and then stores the current time, in Unix
// nanoseconds, as lastFailureTime. After a reset that field holds the reset
// time rather than the time of an actual failure.
func (cb *ConsecCircuitBreaker) reset() {
	atomic.StoreUint64(&cb.failures, 0)
	resetAt := time.Now().UnixNano()
	atomic.StoreInt64(&cb.lastFailureTime, resetAt)
}