package pg

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"github.com/pkg/errors"
	uuid "github.com/satori/go.uuid"
	"github.com/smartcontractkit/sqlx"
	"go.uber.org/multierr"

	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// LeaseLock handles taking an exclusive lease on database access. This is not
// enforced by any database primitives, but rather voluntarily respected by
// other instances of the Chainlink application.
//
// Chainlink is designed to run as a single instance. Running multiple
// instances of Chainlink on a single database at the same time is not
// supported and likely to lead to strange errors and possibly even data
// integrity failures.
//
// With that being said, a common use case is to run multiple Chainlink
// instances in failover mode. The first instance will take some kind of lock
// on the database and subsequent instances will wait trying to take this lock
// in case the first instance disappears or dies.
//
// Traditionally Chainlink has used an advisory lock to manage this. However,
// advisory locks come with several problems, notably:
//   - Postgres does not really like it when you hold locks open for a very
//     long time (hours/days). It hampers certain internal cleanup tasks and
//     is explicitly discouraged by the postgres maintainers.
//   - The advisory lock can silently disappear on postgres upgrade.
//   - Advisory locks do not play nicely with pooling tools such as pgbouncer.
//   - If the application crashes, the advisory lock can be left hanging
//     around for a while (sometimes hours) and can require manual
//     intervention to remove it.
//
// For this reason, we now use a database leaseLock instead, which works as
// follows:
//   - Have one row in the database which is updated periodically with the
//     client ID.
//   - CL node A will run a background process on start that updates this row
//     e.g. once per second.
//   - CL node B will spinlock, checking periodically to see if the update
//     got too old. If it goes more than, say, 5s without updating, it
//     assumes that node A is dead and takes over. Now CL node B is the owner
//     of the row and it updates this every second.
//   - If CL node A comes back somehow, it will go to take out a lease and
//     realise that the database has been leased to another process, so it
//     will panic and quit immediately.
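//
// A typical startup sequence looks roughly like the following (an
// illustrative sketch only; db, appID, lggr, cfg and ctx are assumed to be
// supplied by the caller and are not defined in this file):
//
//	lock := NewLeaseLock(db, appID, lggr, cfg)
//	if err := lock.TakeAndHold(ctx); err != nil {
//		return err
//	}
//	defer lock.Release()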
type LeaseLock interface {
	// TakeAndHold blocks until the lease is acquired or ctx is cancelled.
	TakeAndHold(ctx context.Context) error
	// ClientID returns the unique ID identifying this instance as the lease holder.
	ClientID() uuid.UUID
	// Release gives up the lease and stops the background refresh goroutine.
	Release()
}

// LeaseLockConfig provides the timing parameters for the lease lock.
type LeaseLockConfig interface {
	DatabaseDefaultQueryTimeout() time.Duration
	LeaseLockDuration() time.Duration
	LeaseLockRefreshInterval() time.Duration
}
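
// A minimal implementation of LeaseLockConfig might look like this
// (illustrative sketch; the type name and values are assumptions, chosen so
// that the refresh interval is at most half the lease duration, as required
// by NewLeaseLock):
//
//	type leaseCfg struct{}
//
//	func (leaseCfg) DatabaseDefaultQueryTimeout() time.Duration { return 10 * time.Second }
//	func (leaseCfg) LeaseLockDuration() time.Duration           { return 10 * time.Second }
//	func (leaseCfg) LeaseLockRefreshInterval() time.Duration    { return time.Second }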

var _ LeaseLock = &leaseLock{}

type leaseLock struct {
	id         uuid.UUID
	db         *sqlx.DB
	conn       *sqlx.Conn
	cfg        LeaseLockConfig
	logger     logger.Logger
	stop       func()
	wgReleased sync.WaitGroup
}

// NewLeaseLock creates a "leaseLock" - an entity that tries to take an
// exclusive lease on the database.
func NewLeaseLock(db *sqlx.DB, appID uuid.UUID, lggr logger.Logger, cfg LeaseLockConfig) LeaseLock {
	refreshInterval := cfg.LeaseLockRefreshInterval()
	leaseDuration := cfg.LeaseLockDuration()
	if refreshInterval > leaseDuration/2 {
		panic("refresh interval must be <= half the lease duration")
	}
	return &leaseLock{appID, db, nil, cfg, lggr.Named("LeaseLock").With("appID", appID), func() {}, sync.WaitGroup{}}
}

// TakeAndHold will block and wait indefinitely until it can get its first
// lock or ctx is cancelled. The Release() method must be used to release the
// acquired lock.
// NOT THREAD SAFE.
func (l *leaseLock) TakeAndHold(ctx context.Context) (err error) {
	l.logger.Debug("Taking initial lease...")
	retryCount := 0
	isInitial := true
	for {
		var gotLease bool
		var err error
		err = func() error {
			qctx, cancel := context.WithTimeout(ctx, l.cfg.DatabaseDefaultQueryTimeout())
			defer cancel()
			if l.conn == nil {
				if err = l.checkoutConn(qctx); err != nil {
					return errors.Wrap(err, "lease lock failed to checkout initial connection")
				}
			}
			gotLease, err = l.getLease(qctx, isInitial)
			if errors.Is(err, sql.ErrConnDone) {
				l.logger.Warnw("DB connection was unexpectedly closed; checking out a new one", "err", err)
				l.conn = nil
			}
			// Return err so that getLease failures are not silently
			// swallowed; sql.ErrConnDone is handled by the retry logic below.
			return err
		}()
		if errors.Is(err, sql.ErrConnDone) {
			continue
		} else if err != nil {
			err = errors.Wrap(err, "failed to get lease lock")
			if l.conn != nil {
				err = multierr.Combine(err, l.conn.Close())
			}
			return err
		}
		if gotLease {
			break
		}
		isInitial = false
		l.logRetry(retryCount)
		retryCount++
		select {
		case <-ctx.Done():
			err = errors.New("stopped")
			if l.conn != nil {
				err = multierr.Combine(err, l.conn.Close())
			}
			return err
		case <-time.After(utils.WithJitter(l.cfg.LeaseLockRefreshInterval())):
		}
	}
	l.logger.Debug("Got exclusive lease on database")

	lctx, cancel := context.WithCancel(context.Background())
	l.stop = cancel
	l.wgReleased.Add(1)
	// Once the lock is acquired, the Release() method must be used to release
	// it (hence the fresh background context): Release() has exclusive
	// control over releasing the lock.
	go l.loop(lctx)
	return nil
}

// Release requests that the lock be released and blocks until it is.
// Calling Release on an already-released lock has no effect.
func (l *leaseLock) Release() {
	l.stop()
	l.wgReleased.Wait()
}
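
// Release is typically deferred right after a successful TakeAndHold (see the
// sketch in the LeaseLock doc comment above). It cancels the loop goroutine's
// context, which expires the lease row and closes the dedicated connection
// before Wait returns.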

// checkoutConn checks out a dedicated connection for the lease lock, to
// bypass any contention on the main connection pool.
func (l *leaseLock) checkoutConn(ctx context.Context) (err error) {
	newConn, err := l.db.Connx(ctx)
	if err != nil {
		return errors.Wrap(err, "failed checking out connection from pool")
	}
	l.conn = newConn
	if err = l.setInitialTimeouts(ctx); err != nil {
		return multierr.Combine(
			errors.Wrap(err, "failed to set initial timeouts"),
			l.conn.Close(),
		)
	}
	return nil
}

func (l *leaseLock) setInitialTimeouts(ctx context.Context) error {
	// Set short timeouts to prevent a pathological situation where we get
	// stuck waiting for the table lock, or hang during the transaction - we
	// do not want to leave rows locked if this process is dead.
	ms := l.cfg.LeaseLockDuration().Milliseconds()
	return multierr.Combine(
		utils.JustError(l.conn.ExecContext(ctx, fmt.Sprintf(`SET SESSION lock_timeout = %d`, ms))),
		utils.JustError(l.conn.ExecContext(ctx, fmt.Sprintf(`SET SESSION idle_in_transaction_session_timeout = %d`, ms))),
	)
}
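
// For example, with LeaseLockDuration set to 10s, ms is 10000 and the two
// statements issued above come out as (illustrative):
//
//	SET SESSION lock_timeout = 10000
//	SET SESSION idle_in_transaction_session_timeout = 10000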

// logRetry logs on the first tries, then at every power of two below 1000
// tries, and at every 1000th try thereafter, to avoid flooding the logs
// while waiting for the lease to expire.
func (l *leaseLock) logRetry(count int) {
	if count%1000 == 0 || (count < 1000 && count&(count-1) == 0) {
		l.logger.Infow("Another application is currently holding the database lease (or a previous instance exited uncleanly), waiting for lease to expire...", "tryCount", count)
	}
}

func (l *leaseLock) loop(ctx context.Context) {
	defer l.wgReleased.Done()
	refresh := time.NewTicker(l.cfg.LeaseLockRefreshInterval())
	defer refresh.Stop()
	for {
		select {
		case <-ctx.Done():
			// Expire the lease on the way out so another node can take over
			// immediately, then release the dedicated connection.
			qctx, cancel := context.WithTimeout(context.Background(), l.cfg.DatabaseDefaultQueryTimeout())
			err := multierr.Combine(
				utils.JustError(l.conn.ExecContext(qctx, `UPDATE lease_lock SET expires_at=NOW() WHERE client_id = $1 AND expires_at > NOW()`, l.id)),
				l.conn.Close(),
			)
			cancel()
			if err != nil {
				l.logger.Warnw("Error trying to release lease on cancelled ctx", "err", err)
			}
			return
		case <-refresh.C:
			qctx, cancel := context.WithTimeout(ctx, l.cfg.LeaseLockDuration())
			gotLease, err := l.getLease(qctx, false)
			if errors.Is(err, sql.ErrConnDone) {
				l.logger.Warnw("DB connection was unexpectedly closed; checking out a new one", "err", err)
				if err = l.checkoutConn(ctx); err != nil {
					l.logger.Warnw("Error trying to refresh connection", "err", err)
				}
				gotLease, err = l.getLease(ctx, false)
			}
			cancel()
			if err != nil {
				l.logger.Errorw("Error trying to refresh database lease", "err", err)
			} else if !gotLease {
				if err := l.db.Close(); err != nil {
					l.logger.Errorw("Failed to close DB", "err", err)
				}
				l.logger.Fatal("Another node has taken the lease, exiting immediately")
			}
		}
	}
}

// initialSQL is necessary because the application attempts to take the lease
// lock BEFORE running migrations.
var initialSQL = []string{
	`CREATE TABLE IF NOT EXISTS lease_lock (client_id uuid NOT NULL, expires_at timestamptz NOT NULL)`,
	`CREATE UNIQUE INDEX IF NOT EXISTS only_one_lease_lock ON lease_lock ((client_id IS NOT NULL))`,
}
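
// Note on the schema above: the unique index is an expression index on
// (client_id IS NOT NULL), which evaluates to TRUE for every row (client_id
// is declared NOT NULL). At most one row can therefore ever exist in
// lease_lock, and the same expression serves as the ON CONFLICT target for
// the upsert in getLease below.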

// getLease tries to get a lease from the DB.
// If successful, returns true.
// If the lease is currently held by someone else, returns false.
// If some other error occurred, returns the error.
func (l *leaseLock) getLease(ctx context.Context, isInitial bool) (gotLease bool, err error) {
	l.logger.Trace("Refreshing database lease")
	leaseDuration := fmt.Sprintf("%f seconds", l.cfg.LeaseLockDuration().Seconds())

	// NOTE: Uses database time for all calculations since it's conceivable
	// that node local times might be skewed compared to each other.
	err = sqlxTransactionQ(ctx, l.conn, l.logger, func(tx Queryer) error {
		if isInitial {
			for _, query := range initialSQL {
				if _, err = tx.Exec(query); err != nil {
					return errors.Wrap(err, "failed to create initial lease_lock table")
				}
			}
		}

		// Upsert the lease_lock row, only overwriting an existing one if it
		// has expired or already belongs to this client.
		var res sql.Result
		res, err = tx.Exec(`
INSERT INTO lease_lock (client_id, expires_at) VALUES ($1, NOW()+$2::interval)
ON CONFLICT ((client_id IS NOT NULL)) DO UPDATE SET
	client_id = EXCLUDED.client_id,
	expires_at = EXCLUDED.expires_at
WHERE
	lease_lock.client_id = $1
	OR lease_lock.expires_at < NOW()
`, l.id, leaseDuration)
		if err != nil {
			return errors.Wrap(err, "failed to upsert lease_lock")
		}
		var rowsAffected int64
		rowsAffected, err = res.RowsAffected()
		if err != nil {
			return errors.Wrap(err, "failed to get RowsAffected for lease lock upsert")
		}
		if rowsAffected > 0 {
			gotLease = true
		}
		return nil
	})
	return gotLease, errors.Wrap(err, "leaseLock#getLease failed")
}
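
// The net effect of the upsert above: RowsAffected is 1 when this client
// either inserted the single row, refreshed its own lease, or took over an
// expired lease; it is 0 when another client still holds an unexpired lease,
// which getLease reports as gotLease == false.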

// ClientID returns the unique client ID of this leaseLock instance.
func (l *leaseLock) ClientID() uuid.UUID {
	return l.id
}