-
Notifications
You must be signed in to change notification settings - Fork 343
/
polling.go
129 lines (113 loc) · 3.27 KB
/
polling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package routesrv
import (
"context"
"fmt"
"sort"
"sync"
"time"
ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters/auth"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/tracing"
)
// Log messages written by the poller's polling loop. They are exported,
// presumably so that other packages or tests can match on them (not visible
// from this file).
const (
// LogPollingStarted is logged once, before the first poll iteration.
LogPollingStarted = "starting polling"
// LogPollingStopped is logged when the quit channel fires and the loop exits.
LogPollingStopped = "polling stopped"
// LogRoutesFetchingFailed is logged when the data client returns an error.
LogRoutesFetchingFailed = "failed to fetch routes"
// LogRoutesEmpty is logged when a poll yields zero routes; nothing is stored.
LogRoutesEmpty = "received empty routes; ignoring"
// LogRoutesInitialized is logged when storing routes reports initialization.
LogRoutesInitialized = "routes initialized"
// LogRoutesUpdated is logged when routes are stored on any later poll.
LogRoutesUpdated = "routes updated"
)
// Prometheus gauges tracking polling milestones. All of them live in the
// "routesrv" namespace and are created through promauto.
var (
// pollingStarted is set to the current time when poll begins.
pollingStarted = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "polling_started_timestamp",
Help: "UNIX time when the routes polling has started",
})
// routesInitialized is set to the current time when storing routes
// reports initialization.
routesInitialized = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "routes_initialized_timestamp",
Help: "UNIX time when the first routes were received and stored",
})
// routesUpdated is set to the current time after every successful store
// of a non-empty route list, including the initial one.
routesUpdated = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "routes_updated_timestamp",
Help: "UNIX time of the last routes update (initial load counts as well)",
})
)
// poller periodically loads routes from a data client, runs the optional
// preprocessors over them and stores the result in b.
type poller struct {
// client is the source of routes; poll calls its LoadAll method.
client routing.DataClient
// b receives the sorted routes via formatAndSet.
b *eskipBytes
// timeout is the wait between two consecutive polls.
timeout time.Duration
// quit stops the polling loop when it becomes readable.
quit chan struct{}
// Preprocessors applied to the loaded routes, in this order; each one is
// skipped when nil.
defaultFilters *eskip.DefaultFilters
oauth2Config *auth.OAuthConfig
// tracer is used to create one "poll_routes" span per poll iteration.
tracer ot.Tracer
}
// poll runs the route polling loop until p.quit becomes readable.
//
// Each iteration opens a "poll_routes" tracing span, loads all routes from
// p.client and applies the optional preprocessors (default filters, then the
// OAuth2 grant preprocessor). The outcome is handled as follows:
//
//   - fetch error: logged and recorded on the span; nothing is stored.
//   - zero routes: logged and recorded on the span; nothing is stored.
//   - otherwise: routes are sorted by ID, stored via p.b.formatAndSet, and
//     the corresponding log line, span tags and gauges are updated.
//
// After each iteration the loop waits for p.timeout or for p.quit, whichever
// comes first. wg.Done is called when the loop exits.
func (p *poller) poll(wg *sync.WaitGroup) {
	defer wg.Done()

	var (
		routesCount, routesBytes int
		initialized              bool
	)

	log.WithField("timeout", p.timeout).Info(LogPollingStarted)
	pollingStarted.SetToCurrentTime()

	for {
		span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer)

		routes, err := p.client.LoadAll()
		if p.defaultFilters != nil {
			routes = p.defaultFilters.Do(routes)
		}
		if p.oauth2Config != nil {
			routes = p.oauth2Config.NewGrantPreprocessor().Do(routes)
		}
		routesCount = len(routes)

		switch {
		case err != nil:
			log.WithError(err).Error(LogRoutesFetchingFailed)

			span.SetTag("error", true)
			span.LogKV(
				"event", "error",
				"message", fmt.Sprintf("%s: %s", LogRoutesFetchingFailed, err),
			)
		case routesCount == 0:
			log.Error(LogRoutesEmpty)

			span.SetTag("error", true)
			// Record the same text that is logged; the span message was
			// previously an always-empty string.
			span.LogKV(
				"event", "error",
				"message", LogRoutesEmpty,
			)
		case routesCount > 0:
			// Sort by ID so that the same set of routes always serializes
			// identically; otherwise differing orders would lead to different
			// etag values for the same route list.
			sort.SliceStable(routes, func(i, j int) bool {
				return routes[i].Id < routes[j].Id
			})

			routesBytes, initialized = p.b.formatAndSet(routes)
			logger := log.WithFields(log.Fields{"count": routesCount, "bytes": routesBytes})
			if initialized {
				logger.Info(LogRoutesInitialized)
				span.SetTag("routes.initialized", true)
				routesInitialized.SetToCurrentTime()
			} else {
				logger.Info(LogRoutesUpdated)
			}
			routesUpdated.SetToCurrentTime()

			span.SetTag("routes.count", routesCount)
			span.SetTag("routes.bytes", routesBytes)
		}

		span.Finish()

		select {
		case <-p.quit:
			log.Info(LogPollingStopped)
			return
		case <-time.After(p.timeout):
		}
	}
}