-
Notifications
You must be signed in to change notification settings - Fork 350
/
routesrv.go
234 lines (201 loc) · 6.25 KB
/
routesrv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package routesrv
import (
"context"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/zalando/skipper"
"github.com/zalando/skipper/dataclients/kubernetes"
"github.com/zalando/skipper/filters/auth"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/tracing"
)
// RouteServer is used to serve eskip-formatted routes,
// that originate from the polled data source.
type RouteServer struct {
metrics metrics.Metrics
server *http.Server
supportServer *http.Server
poller *poller
wg *sync.WaitGroup
}
// New returns an initialized route server according to the passed options.
// This call does not start data source updates automatically. Kept routes
// will stay in an uninitialized state, till StartUpdates is called and
// in effect data source is queried and routes initialized/updated.
func New(opts skipper.Options) (*RouteServer, error) {
if opts.PrometheusRegistry == nil {
opts.PrometheusRegistry = prometheus.NewRegistry()
}
mopt := metrics.Options{
Format: metrics.PrometheusKind,
Prefix: "routesrv",
PrometheusRegistry: opts.PrometheusRegistry,
EnableDebugGcMetrics: true,
EnableRuntimeMetrics: true,
EnableProfile: opts.EnableProfile,
BlockProfileRate: opts.BlockProfileRate,
MutexProfileFraction: opts.MutexProfileFraction,
MemProfileRate: opts.MemProfileRate,
}
m := metrics.NewMetrics(mopt)
metricsHandler := metrics.NewHandler(mopt, m)
rs := &RouteServer{
metrics: m,
}
opentracingOpts := opts.OpenTracing
if len(opentracingOpts) == 0 {
opentracingOpts = []string{"noop"}
}
tracer, err := tracing.InitTracer(opentracingOpts)
if err != nil {
return nil, err
}
b := &eskipBytes{
tracer: tracer,
metrics: m,
now: time.Now,
}
bs := &eskipBytesStatus{
b: b,
}
mux := http.NewServeMux()
mux.Handle("/health", bs)
mux.Handle("/routes", b)
supportHandler := http.NewServeMux()
supportHandler.Handle("/metrics", metricsHandler)
supportHandler.Handle("/metrics/", metricsHandler)
if opts.EnableProfile {
supportHandler.Handle("/debug/pprof", metricsHandler)
supportHandler.Handle("/debug/pprof/", metricsHandler)
}
dataclient, err := kubernetes.New(opts.KubernetesDataClientOptions())
if err != nil {
return nil, err
}
var oauthConfig *auth.OAuthConfig
if opts.EnableOAuth2GrantFlow /* explicitly enable grant flow */ {
oauthConfig = &auth.OAuthConfig{}
oauthConfig.CallbackPath = opts.OAuth2CallbackPath
}
var rh *RedisHandler
// in case we have kubernetes dataclient and we can detect redis instances, we patch redisOptions
if opts.KubernetesRedisServiceNamespace != "" && opts.KubernetesRedisServiceName != "" {
log.Infof("Use endpoints %s/%s to fetch updated redis shards", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
rh = &RedisHandler{}
_, err := dataclient.LoadAll()
if err != nil {
return nil, err
}
rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient, m)
mux.Handle("/swarm/redis/shards", rh)
}
rs.server = &http.Server{
Addr: opts.Address,
Handler: mux,
ReadTimeout: 1 * time.Minute,
ReadHeaderTimeout: 1 * time.Minute,
}
rs.supportServer = &http.Server{
Addr: opts.SupportListener,
Handler: supportHandler,
ReadTimeout: 1 * time.Minute,
ReadHeaderTimeout: 1 * time.Minute,
}
rs.poller = &poller{
client: dataclient,
timeout: opts.SourcePollTimeout,
b: b,
quit: make(chan struct{}),
defaultFilters: opts.DefaultFilters,
editRoute: opts.EditRoute,
cloneRoute: opts.CloneRoute,
oauth2Config: oauthConfig,
tracer: tracer,
metrics: m,
}
rs.wg = &sync.WaitGroup{}
return rs, nil
}
// StartUpdates starts the data source polling process.
func (rs *RouteServer) StartUpdates() {
rs.wg.Add(1)
go rs.poller.poll(rs.wg)
}
// StopUpdates stop the data source polling process.
func (rs *RouteServer) StopUpdates() {
rs.poller.quit <- struct{}{}
}
// ServeHTTP serves kept eskip-formatted routes under /routes
// endpoint. Additionally it provides a simple health check under
// /health and Prometheus-compatible metrics under /metrics.
func (rs *RouteServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rs.server.Handler.ServeHTTP(w, r)
}
func (rs *RouteServer) startSupportListener() {
if rs.supportServer != nil {
err := rs.supportServer.ListenAndServe()
if err != nil {
log.Errorf("Failed support listener: %v", err)
}
}
}
func newShutdownFunc(rs *RouteServer) func(delay time.Duration) {
once := sync.Once{}
rs.wg.Add(1)
return func(delay time.Duration) {
once.Do(func() {
defer rs.wg.Done()
defer rs.StopUpdates()
log.Infof("shutting down the server in %s...", delay)
time.Sleep(delay)
if rs.supportServer != nil {
if err := rs.supportServer.Shutdown(context.Background()); err != nil {
log.Error("unable to shut down the support server: ", err)
}
log.Info("supportServer shut down")
}
if err := rs.server.Shutdown(context.Background()); err != nil {
log.Error("unable to shut down the server: ", err)
}
log.Info("server shut down")
})
}
}
func run(rs *RouteServer, opts skipper.Options, sigs chan os.Signal) error {
var err error
shutdown := newShutdownFunc(rs)
signal.Notify(sigs, syscall.SIGTERM)
go func() {
<-sigs
shutdown(opts.WaitForHealthcheckInterval)
}()
rs.StartUpdates()
go rs.startSupportListener()
if err = rs.server.ListenAndServe(); err != http.ErrServerClosed {
go shutdown(0)
} else {
err = nil
}
rs.wg.Wait()
return err
}
// Run starts a route server set up according to the passed options.
// It is a blocking call designed to be used as a single call/entry point,
// when running the route server as a standalone binary. It returns, when
// the server is closed, which can happen due to server startup errors or
// gracefully handled SIGTERM signal. In case of a server startup error,
// the error is returned as is.
func Run(opts skipper.Options) error {
rs, err := New(opts)
if err != nil {
return err
}
sigs := make(chan os.Signal, 1)
return run(rs, opts, sigs)
}