-
Notifications
You must be signed in to change notification settings - Fork 73
/
config_poller.go
147 lines (121 loc) · 4.3 KB
/
config_poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package configpoller
import (
"context"
"net/http"
"time"
"github.com/hashicorp/go-retryablehttp"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/internal/cdn"
"github.com/wundergraph/cosmo/router/internal/controlplane"
"go.uber.org/zap"
)
// Option mutates a configPoller while it is being constructed by New.
type Option func(cp *configPoller)
// ConfigPoller provides access to the router config: a one-off fetch for the
// initial config and a subscription for subsequent updates.
type ConfigPoller interface {
// Subscribe subscribes to the config poller with a handler function that will be invoked
// with the latest router config and the previous version string. If the handler takes longer than the poll interval
// to execute, the next invocation will be skipped.
Subscribe(ctx context.Context, handler func(newConfig *nodev1.RouterConfig, oldVersion string) error)
// GetRouterConfig returns the latest router config from the CDN and updates the latest
// router config version. If the returned config is nil, no new config is available and
// the current config should be used. This method is only used for the initial config.
GetRouterConfig(ctx context.Context) (*nodev1.RouterConfig, error)
// Stop stops the config poller. After calling stop, the config poller cannot be used again.
Stop(ctx context.Context) error
}
// configPoller is the ConfigPoller implementation returned by New.
type configPoller struct {
// graphApiToken is the token argument passed to New.
graphApiToken string
// controlplaneEndpoint is the endpoint argument passed to New.
controlplaneEndpoint string
// logger is set via WithLogger; New falls back to a no-op logger when it is nil.
logger *zap.Logger
// latestRouterConfigVersion is the version of the last config that was accepted.
// It is passed to the CDN client on every fetch.
latestRouterConfigVersion string
// poller is created in New from pollInterval and drives the Subscribe callback.
poller controlplane.Poller
// pollInterval is set via WithPollInterval; it stays at the zero value when the option is not supplied.
pollInterval time.Duration
// cdnConfigClient is set via WithCDNClient and is used to fetch the router config.
cdnConfigClient *cdn.RouterConfigClient
}
// New builds a ConfigPoller for the given controlplane endpoint and graph API
// token. Options are applied in the order given; afterwards a no-op logger is
// installed if none was supplied, and the underlying poller is created from the
// configured poll interval.
func New(endpoint, token string, opts ...Option) ConfigPoller {
	cp := &configPoller{
		graphApiToken:        token,
		controlplaneEndpoint: endpoint,
	}

	// Let the caller-supplied options run before any defaults are filled in.
	for i := range opts {
		opts[i](cp)
	}

	// Guarantee a usable logger so later log calls never touch a nil pointer.
	if cp.logger == nil {
		cp.logger = zap.NewNop()
	}

	// NOTE(review): this retry client is configured but never stored on the
	// poller or handed to anything else in this function — confirm whether it
	// is still needed.
	retryClient := retryablehttp.NewClient()
	retryClient.Logger = nil
	retryClient.RetryMax = 5
	retryClient.RetryWaitMax = 60 * time.Second
	retryClient.Backoff = retryablehttp.DefaultBackoff
	retryClient.RequestLogHook = func(_ retryablehttp.Logger, _ *http.Request, attempt int) {
		// Only retries are logged; the hook stays silent when the counter is zero.
		if attempt <= 0 {
			return
		}
		cp.logger.Info("Fetch router config from controlplane", zap.Int("retry", attempt))
	}

	cp.poller = controlplane.NewPoll(cp.pollInterval)

	return cp
}
// Version reports the router config version most recently recorded by the poller.
// It is the empty string until a version has been recorded.
func (c *configPoller) Version() string {
	current := c.latestRouterConfigVersion
	return current
}
// Stop shuts down the underlying poller and returns whatever error it reports.
// The context argument is ignored.
func (c *configPoller) Stop(_ context.Context) error {
	stopErr := c.poller.Stop()
	return stopErr
}
// Subscribe registers a callback on the underlying poller. Each time the
// callback runs it fetches the router config and, when a config with a version
// different from the last accepted one is returned, calls handler with the new
// config and the previously accepted version.
//
// The handler is not invoked when the fetch fails, when no config is returned,
// or when the version is unchanged. The stored version only advances after the
// handler returns nil, so a failed handler is retried with the same config on
// a later run.
func (c *configPoller) Subscribe(ctx context.Context, handler func(newConfig *nodev1.RouterConfig, _ string) error) {
	c.poller.Subscribe(ctx, func() {
		cfg, err := c.getRouterConfig(ctx)
		if err != nil {
			// Attach the error as a field so the cause of the failed fetch is not lost.
			c.logger.With(zap.Error(err)).Sugar().Errorf("Could not fetch for config update. Trying again in %s", c.pollInterval.String())
			return
		}

		if cfg == nil {
			c.logger.Sugar().Debugf("No new router config available. Trying again in %s", c.pollInterval.String())
			return
		}

		newVersion := cfg.GetVersion()
		previousVersion := c.latestRouterConfigVersion

		// If the version hasn't changed, don't invoke the handler.
		if newVersion == previousVersion {
			c.logger.Info("Router config version has not changed, skipping handler invocation")
			return
		}

		if err := handler(cfg, previousVersion); err != nil {
			c.logger.Error("Error invoking config poll handler", zap.Error(err))
			return
		}

		// Only record the new version once the handler has accepted the config.
		c.latestRouterConfigVersion = newVersion
	})
}
// getRouterConfig asks the CDN client for the router config, passing along the
// latest known version. It does not modify the stored version. On failure the
// returned config is always nil.
func (c *configPoller) getRouterConfig(ctx context.Context) (*nodev1.RouterConfig, error) {
	routerConfig, fetchErr := c.cdnConfigClient.RouterConfig(ctx, c.latestRouterConfigVersion)
	if fetchErr != nil {
		return nil, fetchErr
	}

	return routerConfig, nil
}
// GetRouterConfig fetches the router config from the CDN and, when a config is
// returned, records its version as the latest known version.
//
// A nil config with a nil error means no new config is available; in that case
// the stored version is left untouched. On error the returned config is nil
// and the stored version is likewise unchanged.
//
// Not safe for concurrent use.
func (c *configPoller) GetRouterConfig(ctx context.Context) (*nodev1.RouterConfig, error) {
	cfg, err := c.getRouterConfig(ctx)
	if err != nil {
		return nil, err
	}

	// Only advance the version when a config was actually delivered; otherwise
	// the known version would be overwritten with that of a nil config.
	if cfg != nil {
		c.latestRouterConfigVersion = cfg.GetVersion()
	}

	return cfg, nil
}
// WithLogger returns an Option that sets the logger used by the config poller.
func WithLogger(logger *zap.Logger) Option {
	apply := func(cp *configPoller) {
		cp.logger = logger
	}
	return apply
}
// WithPollInterval returns an Option that sets the interval handed to the
// underlying poller and reported in the poller's log messages.
func WithPollInterval(interval time.Duration) Option {
	apply := func(cp *configPoller) {
		cp.pollInterval = interval
	}
	return apply
}
// WithCDNClient returns an Option that sets the CDN client used to fetch the
// router config.
func WithCDNClient(cdnConfigClient *cdn.RouterConfigClient) Option {
	apply := func(cp *configPoller) {
		cp.cdnConfigClient = cdnConfigClient
	}
	return apply
}