/
synchronizer.go
201 lines (169 loc) · 6.45 KB
/
synchronizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package dataplane
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
dpconf "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/config"
)
// -----------------------------------------------------------------------------
// Dataplane Synchronizer - Public Vars
// -----------------------------------------------------------------------------
const (
// DefaultSyncSeconds is the default minimum interval, expressed in seconds
// (it is a plain float32, not a time.Duration), between updates to the
// DataplaneClient.
//
// This default was based on local testing wherein it appeared sub-second updates
// to the Admin API could be problematic (or at least operate differently) based on
// which storage backend was in use (i.e. "dbless", "postgres"). This is a workaround
// for improvements we still need to investigate upstream.
//
// See Also: https://github.com/Kong/kubernetes-ingress-controller/issues/1398
DefaultSyncSeconds float32 = 3.0
// DefaultCacheSyncWaitDuration is the default value of the Synchronizer's
// initial wait period, i.e. how long Start blocks before launching the
// update loop unless WithInitCacheSyncDuration overrides it.
DefaultCacheSyncWaitDuration = 5 * time.Second
)
// -----------------------------------------------------------------------------
// Synchronizer - Public Types
// -----------------------------------------------------------------------------
// Synchronizer is a threadsafe object which starts a goroutine to update
// the data-plane at regular intervals.
type Synchronizer struct {
// logger receives the informational and error messages of the update loop.
logger logr.Logger
// dataplane client to send updates to the Kong Admin API
dataplaneClient Client
// dbMode is the database mode reported by the client at construction time;
// IsReady consults it to decide how readiness is determined.
dbMode dpconf.DBMode
// server configuration, flow control, channels and utility attributes

// stagger is the period of syncTicker, i.e. the interval between updates.
stagger time.Duration
// syncTicker drives the update loop; it is created by Start and stopped
// when the loop's context is done.
syncTicker *time.Ticker
// configApplied is set after the first successful update and cleared
// again when the update loop shuts down.
configApplied bool
// isServerRunning is true from a successful Start until the update loop
// shuts down.
isServerRunning bool
// initWaitPeriod is how long Start waits before launching the update loop.
initWaitPeriod time.Duration
// lock guards isServerRunning and configApplied.
lock sync.RWMutex
}
// SynchronizerOption is a functional option that mutates a Synchronizer while
// it is being constructed by NewSynchronizer.
type SynchronizerOption func(*Synchronizer)
// WithStagger builds a SynchronizerOption that replaces the Synchronizer's
// stagger, the interval between consecutive data-plane updates, with period.
func WithStagger(period time.Duration) SynchronizerOption {
	applyStagger := func(target *Synchronizer) {
		target.stagger = period
	}
	return applyStagger
}
// WithInitCacheSyncDuration builds a SynchronizerOption that replaces the
// Synchronizer's initial wait period (the delay Start observes before the
// update loop is launched) with period.
func WithInitCacheSyncDuration(period time.Duration) SynchronizerOption {
	applyInitWait := func(target *Synchronizer) {
		target.initWaitPeriod = period
	}
	return applyInitWait
}
// NewSynchronizer provides a new Synchronizer that sends updates through the
// given client.
//
// The stagger defaults to DefaultSyncSeconds seconds and the initial wait
// period to DefaultCacheSyncWaitDuration; opts are applied afterwards, in
// order, and may override either. The database mode is read once from
// client.DBMode() at construction time.
//
// No goroutine is started here: call Start to launch the update loop. The
// caller is responsible for marking the context.Context passed to Start as
// "Done()" to shut the loop down. The returned error is currently always nil.
func NewSynchronizer(logger logr.Logger, client Client, opts ...SynchronizerOption) (*Synchronizer, error) {
	// DefaultSyncSeconds is a number of seconds, so it must be scaled by
	// time.Second; a bare time.Duration(DefaultSyncSeconds) would yield 3ns.
	// The scaling goes through a variable so that a fractional default is
	// truncated at run time rather than rejected as an inexact constant.
	defaultSeconds := float64(DefaultSyncSeconds)
	defaultStagger := time.Duration(defaultSeconds * float64(time.Second))

	synchronizer := &Synchronizer{
		logger:          logger,
		stagger:         defaultStagger,
		initWaitPeriod:  DefaultCacheSyncWaitDuration,
		dataplaneClient: client,
		configApplied:   false,
		dbMode:          client.DBMode(),
	}

	for _, opt := range opts {
		opt(synchronizer)
	}

	return synchronizer, nil
}
// -----------------------------------------------------------------------------
// Synchronizer - Public Methods
// -----------------------------------------------------------------------------
// Start starts the goroutine synchronization server that will perform an
// Update() on the provided dataplane.Client according to the configured
// stagger time.
//
// Start first blocks for the initial wait period (or until ctx is done) and
// only then launches the update loop in a background goroutine.
//
// It returns an error when ctx is done before the initial wait elapses, when
// the server is already running, or when the configured stagger is not a
// positive duration.
//
// To stop the server, the provided context must be Done().
func (p *Synchronizer) Start(ctx context.Context) error {
	select {
	// TODO https://github.com/Kong/kubernetes-ingress-controller/issues/2315
	// This is a temporary mitigation to allow some time for controllers to
	// populate their dataplaneClient cache.
	case <-time.After(p.initWaitPeriod):
	case <-ctx.Done():
		return fmt.Errorf("Synchronizer Start() interrupted: %w", ctx.Err())
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	if p.isServerRunning {
		return errors.New("server is already running")
	}

	// time.NewTicker panics on a non-positive duration; report a
	// misconfigured stagger as an error instead of crashing the process.
	if p.stagger <= 0 {
		return fmt.Errorf("invalid stagger period %s: must be positive", p.stagger)
	}

	p.syncTicker = time.NewTicker(p.stagger)
	// The update loop takes p.lock on shutdown, so it cannot clear
	// isServerRunning before this function has set it and returned.
	go p.startUpdateServer(ctx)
	p.isServerRunning = true

	return nil
}
// IsRunning reports whether the synchronization server is currently running.
// The flag is read under the read lock.
func (p *Synchronizer) IsRunning() bool {
	p.lock.RLock()
	running := p.isServerRunning
	p.lock.RUnlock()
	return running
}
// IsReady indicates whether the synchronizer is actively able to synchronize
// configuration to the dataplane. It is similar to IsRunning() but reports on
// whether configuration can actually be successful, and is also used as part
// of a controller-runtime Runnable interface to wait for readiness before
// starting controllers.
func (p *Synchronizer) IsReady() bool {
	p.lock.RLock()
	defer p.lock.RUnlock()

	// A proxy backed by a database is ready immediately: it loads existing
	// configuration from the database.
	if !p.dbMode.IsDBLessMode() {
		return true
	}

	// A proxy without a database has no configuration loaded until a sync
	// succeeds, so it is only ready once configuration has been applied.
	return p.configApplied
}
// NeedLeaderElection implements the controller-runtime Runnable interface. It
// tells the controller manager whether leader election is required for the
// Synchronizer, which is unconditionally the case.
func (*Synchronizer) NeedLeaderElection() bool {
	return true
}
// -----------------------------------------------------------------------------
// Synchronizer - Private Methods - Server Utilities
// -----------------------------------------------------------------------------
// startUpdateServer is the body of the background update loop: on every tick
// of syncTicker it asks the dataplane client to update the kong proxy backend.
// The first successful update marks configuration as applied. When ctx is
// done the ticker is stopped, the running and config-applied flags are
// cleared, and the loop exits.
func (p *Synchronizer) startUpdateServer(ctx context.Context) {
	var firstSuccess sync.Once

	for {
		select {
		case <-p.syncTicker.C:
			err := p.dataplaneClient.Update(ctx)
			if err == nil {
				// Only the first successful update flips the flag.
				firstSuccess.Do(p.markConfigApplied)
				continue
			}
			p.logger.Error(err, "Could not update kong admin")

		case <-ctx.Done():
			p.logger.Info("Context done: shutting down the proxy update server")
			// A plain cancellation is the expected shutdown path; anything
			// else is reported as an error.
			if cause := ctx.Err(); cause != nil && !errors.Is(cause, context.Canceled) {
				p.logger.Error(cause, "Context completed with error")
			}
			p.syncTicker.Stop()

			p.lock.Lock()
			p.isServerRunning = false
			p.configApplied = false
			p.lock.Unlock()
			return
		}
	}
}
// -----------------------------------------------------------------------------
// Synchronizer - Private Methods - Helper
// -----------------------------------------------------------------------------
// markConfigApplied records, under the write lock, that configuration has
// been applied to the dataplane.
func (p *Synchronizer) markConfigApplied() {
	p.lock.Lock()
	p.configApplied = true
	p.lock.Unlock()
}