forked from go-kit/kit
/
registrar.go
143 lines (120 loc) · 3.4 KB
/
registrar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package eureka
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/hudl/fargo"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
)
// defaultRenewalInterval is the heartbeat period used by Registrar.loop when
// the instance's LeaseInfo.RenewalIntervalInSecs is not a positive value.
// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second
// The methods of fargo.Connection used in this package. NewRegistrar accepts
// this interface rather than a concrete connection type.
type fargoConnection interface {
	// RegisterInstance is called once by Registrar.Register.
	RegisterInstance(instance *fargo.Instance) error
	// DeregisterInstance is called by Registrar.loop when a quit request arrives.
	DeregisterInstance(instance *fargo.Instance) error
	// ReregisterInstance is called by Registrar.heartbeat after a heartbeat
	// fails with a "not found" status.
	ReregisterInstance(instance *fargo.Instance) error
	// HeartBeatInstance is called by Registrar.heartbeat on every ticker tick.
	HeartBeatInstance(instance *fargo.Instance) error
	// ScheduleAppUpdates is not called by the Registrar code in this file;
	// presumably used elsewhere in the package — verify against callers.
	ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
	// GetApp is not called by the Registrar code in this file; presumably
	// used elsewhere in the package — verify against callers.
	GetApp(name string) (*fargo.Application, error)
}
// fargoUnsuccessfulHTTPResponse is an error value that carries an HTTP status
// code together with a message prefix. httpResponseStatusCode recognizes it,
// which allows errors with a chosen status code to be injected for testing.
type fargoUnsuccessfulHTTPResponse struct {
	statusCode    int
	messagePrefix string
}

// Error implements the error interface, rendering the message prefix and the
// status code as "err=<prefix> code=<status>".
func (e *fargoUnsuccessfulHTTPResponse) Error() string {
	prefix, code := e.messagePrefix, e.statusCode
	return fmt.Sprintf("err=%s code=%d", prefix, code)
}
// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
	// conn is the connection used for register, heartbeat, re-register and
	// deregister calls.
	conn fargoConnection
	// instance is the service instance passed to every conn call.
	instance *fargo.Instance
	// logger receives errors from registration, heartbeat and deregistration.
	logger log.Logger
	// quitc is non-nil while the loop goroutine is running. Deregister sends
	// a channel on it; loop closes that channel after attempting to
	// deregister the instance.
	quitc chan chan struct{}
	// Mutex is held for the full duration of Register and Deregister.
	sync.Mutex
}
// Compile-time check that *Registrar satisfies sd.Registrar.
var _ sd.Registrar = (*Registrar)(nil)
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo connection and instance. See the integration test for usage examples.
//
// The returned Registrar logs through logger with two extra key/value pairs
// attached: "service" (the instance's App) and "address" (the instance's
// IPAddr and Port joined as "ip:port"). instance must be non-nil, since its
// fields are read here.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
	address := fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)
	contextual := log.With(logger, "service", instance.App, "address", address)

	registrar := &Registrar{
		conn:     conn,
		instance: instance,
		logger:   contextual,
	}
	return registrar
}
// Register implements sd.Registrar.
//
// It registers the instance over the connection and starts the background
// loop goroutine. A registration error is logged, and the loop is started
// regardless. Calling Register while the loop is already running does
// nothing.
func (r *Registrar) Register() {
	r.Lock()
	defer r.Unlock()

	// A non-nil quitc means the loop goroutine is already running.
	if alreadyRunning := r.quitc != nil; alreadyRunning {
		return // Already in the registration loop.
	}

	err := r.conn.RegisterInstance(r.instance)
	if err != nil {
		r.logger.Log("during", "Register", "err", err)
	}

	// quitc must be set before the goroutine starts, because loop receives
	// from it.
	r.quitc = make(chan chan struct{})
	go r.loop()
}
// Deregister implements sd.Registrar.
//
// It asks the background loop to stop and blocks until the loop has attempted
// to deregister the instance. Calling Deregister when no loop is running does
// nothing.
func (r *Registrar) Deregister() {
	r.Lock()
	defer r.Unlock()

	if r.quitc == nil {
		return // Already deregistered.
	}

	// Hand the loop an acknowledgement channel and wait for it to be closed.
	done := make(chan struct{})
	r.quitc <- done
	<-done

	// Mark the loop as stopped so a later Register can start a new one.
	r.quitc = nil
}
// loop runs in its own goroutine (started by Register). It sends a heartbeat
// on every tick of the renewal interval until a quit request arrives on
// r.quitc, at which point it deregisters the instance, closes the received
// acknowledgement channel and returns. Errors from heartbeats and from
// deregistration are logged, not returned.
func (r *Registrar) loop() {
	// Use the instance's own renewal interval when it is positive, otherwise
	// fall back to the package default.
	interval := defaultRenewalInterval
	if secs := r.instance.LeaseInfo.RenewalIntervalInSecs; secs > 0 {
		interval = time.Duration(secs) * time.Second
	}

	heartbeats := time.NewTicker(interval)
	defer heartbeats.Stop()

	for {
		select {
		case done := <-r.quitc:
			if err := r.conn.DeregisterInstance(r.instance); err != nil {
				r.logger.Log("during", "Deregister", "err", err)
			}
			// Unblock the Deregister call that is waiting on this channel.
			close(done)
			return

		case <-heartbeats.C:
			if err := r.heartbeat(); err != nil {
				r.logger.Log("during", "heartbeat", "err", err)
			}
		}
	}
}
// httpResponseStatusCode extracts an HTTP status code from err. It first asks
// fargo.HTTPResponseStatusCode, then falls back to this package's own
// *fargoUnsuccessfulHTTPResponse type. present is false, and code is 0, when
// neither source yields a status code.
func httpResponseStatusCode(err error) (code int, present bool) {
	if status, found := fargo.HTTPResponseStatusCode(err); found {
		return status, true
	}

	// Allow injection of errors for testing.
	injected, isInjected := err.(*fargoUnsuccessfulHTTPResponse)
	if !isInjected {
		return 0, false
	}
	return injected.statusCode, true
}
// isNotFound reports whether err carries an HTTP status code, as determined
// by httpResponseStatusCode, and that code is http.StatusNotFound.
func isNotFound(err error) bool {
	if status, present := httpResponseStatusCode(err); present {
		return status == http.StatusNotFound
	}
	return false
}
// heartbeat sends a single heartbeat for the instance. If the heartbeat fails
// with a "not found" status, the instance is re-registered and the result of
// that call is returned. Any other heartbeat error is returned unchanged.
func (r *Registrar) heartbeat() error {
	switch err := r.conn.HeartBeatInstance(r.instance); {
	case err == nil:
		return nil
	case isNotFound(err):
		// Instance expired (e.g. network partition). Re-register.
		return r.conn.ReregisterInstance(r.instance)
	default:
		return err
	}
}