forked from go-chassis/go-chassis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bizkeeper_consumer_handle.go
executable file
·103 lines (92 loc) · 3.08 KB
/
bizkeeper_consumer_handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package handler
import (
"fmt"
"github.com/go-chassis/go-chassis/client/rest"
"github.com/go-chassis/go-chassis/control"
"github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
"io"
"io/ioutil"
"net/http"
)
// constant for bizkeeper-consumer
const (
Name = "bizkeeper-consumer"
)
// BizKeeperConsumerHandler bizkeeper consumer handler
type BizKeeperConsumerHandler struct{}
// Handle function is for to handle the chain
func (bk *BizKeeperConsumerHandler) Handle(chain *Chain, i *invocation.Invocation, cb invocation.ResponseCallBack) {
command, cmdConfig := control.DefaultPanel.GetCircuitBreaker(*i, common.Consumer)
hystrix.ConfigureCommand(command, cmdConfig)
finish := make(chan *invocation.Response, 1)
err := hystrix.Do(command, func() (err error) {
chain.Next(i, func(resp *invocation.Response) error {
err = resp.Err
select {
case finish <- resp:
default:
// means hystrix error occurred
}
return err
})
return
}, GetFallbackFun(command, common.Consumer, i, finish, cmdConfig.ForceFallback))
//if err is not nil, means fallback is nil, return original err
if err != nil {
writeErr(err, cb)
return
}
cb(<-finish)
}
// GetFallbackFun get fallback function
func GetFallbackFun(cmd, t string, i *invocation.Invocation, finish chan *invocation.Response, isForce bool) func(error) error {
enabled := config.GetFallbackEnabled(cmd, t)
if enabled || isForce {
return func(err error) error {
// if err is type of hystrix error, return a new response
if err.Error() == hystrix.ErrForceFallback.Error() || err.Error() == hystrix.ErrCircuitOpen.Error() ||
err.Error() == hystrix.ErrMaxConcurrency.Error() || err.Error() == hystrix.ErrTimeout.Error() {
// isolation happened, so lead to callback
lager.Logger.Errorf(err, fmt.Sprintf("fallback for %v", cmd))
resp := &invocation.Response{}
var code = http.StatusOK
if config.PolicyNull == config.GetPolicy(i.MicroServiceName, t) {
resp.Err = hystrix.FallbackNullError{Message: "return null"}
} else {
resp.Err = hystrix.CircuitError{Message: i.MicroServiceName + " is isolated because of error: " + err.Error()}
code = http.StatusRequestTimeout
}
switch i.Reply.(type) {
case *rest.Response:
resp := i.Reply.(*rest.Response)
resp.SetStatusCode(code)
//make sure body is empty
if resp.GetResponse().Body != nil {
io.Copy(ioutil.Discard, resp.GetResponse().Body)
resp.GetResponse().Body.Close()
}
}
select {
case finish <- resp:
default:
}
return nil //no need to return error
}
// call back success
return nil
}
}
return nil
}
// newBizKeeperConsumerHandler new bizkeeper consumer handler
func newBizKeeperConsumerHandler() Handler {
return &BizKeeperConsumerHandler{}
}
// Name is for to represent the name of bizkeeper handler
func (bk *BizKeeperConsumerHandler) Name() string {
return Name
}