-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
locked_db.go
148 lines (126 loc) · 3.8 KB
/
locked_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package pg
import (
"context"
"net/url"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/smartcontractkit/sqlx"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
)
// LockedDB ties a database connection together with the database locks that
// guard it, so both are acquired and released as a unit.
type LockedDB interface {
	// Open establishes the connection and acquires any configured locks.
	Open(ctx context.Context) error
	// Close releases what Open acquired.
	Close() error
	// DB exposes the underlying connection handle.
	DB() *sqlx.DB
}
// LockedDBConfig is the configuration a LockedDB needs in order to connect.
type LockedDBConfig interface {
	// ConnectionConfig is declared elsewhere in this package; the config value
	// is handed to NewConnection when the connection is opened.
	ConnectionConfig

	// URL returns the database connection URL.
	URL() url.URL
	// DefaultQueryTimeout returns the timeout forwarded to the lease lock
	// configuration.
	DefaultQueryTimeout() time.Duration
	// Dialect returns the dialect name used when creating the connection.
	Dialect() dialects.DialectName
}
// lockedDb is the package's LockedDB implementation. The first four fields are
// fixed at construction time; the remaining ones are populated by Open and
// reset to nil by Close.
type lockedDb struct {
	// appID is passed to openDB and to the lease lock.
	appID uuid.UUID
	// cfg supplies the connection settings and the default query timeout.
	cfg LockedDBConfig
	// lockCfg supplies the locking mode and the lease timings.
	lockCfg config.Lock
	// lggr is the logger, named "LockedDB" by the constructor.
	lggr logger.Logger

	// db is the open connection; nil until Open succeeds.
	db *sqlx.DB
	// leaseLock is only set when the locking mode is "lease".
	leaseLock LeaseLock
	// statsReporter is created in Open from the connection's Stats function.
	statsReporter *StatsReporter
}
// NewLockedDB builds a LockedDB that is not yet connected; call Open on the
// result to connect and take locks. The supplied logger is given the
// "LockedDB" name.
func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr logger.Logger) LockedDB {
	ldb := new(lockedDb)
	ldb.appID = appID
	ldb.cfg = cfg
	ldb.lockCfg = lockCfg
	ldb.lggr = lggr.Named("LockedDB")
	return ldb
}
// OpenUnlockedDB opens a plain DB connection and takes no DB locks at all.
// Use it with care, and only where locking is known to be unnecessary.
// At the time of writing the RebroadcastTransactions command is the only user.
func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
	db, err = openDB(appID, cfg)
	return db, err
}
// Open connects to the DB, starts the stats reporter and, when the configured
// locking mode is "lease", takes the database lease before returning.
// If taking the lease fails, everything acquired so far is released again via
// Close and the wrapped lease error is returned.
// The call blocks and may take a long time while waiting for the lease.
// Calling Open on an instance that is already open is a programming error and
// is reported through the logger's Panic.
// NOT THREAD SAFE
func (l *lockedDb) Open(ctx context.Context) (err error) {
	// A non-nil db means an earlier Open succeeded and Close has not run since.
	if l.db != nil {
		l.lggr.Panic("calling Open() twice")
	}

	// Step 1: open DB connection
	if l.db, err = openDB(l.appID, l.cfg); err != nil {
		// openDB is expected to hand back a nil db together with the error.
		return errors.Wrap(err, "failed to open db")
	}

	// Step 2: start the stat reporter
	l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
	l.statsReporter.Start(ctx)

	// Step 3: acquire DB locks
	mode := l.lockCfg.LockingMode()
	l.lggr.Debugf("Using database locking mode: %s", mode)
	if mode != "lease" {
		// No other mode takes a lock here.
		return nil
	}

	// Take the lease before any other DB operations
	leaseCfg := LeaseLockConfig{
		DefaultQueryTimeout:  l.cfg.DefaultQueryTimeout(),
		LeaseDuration:        l.lockCfg.LeaseDuration(),
		LeaseRefreshInterval: l.lockCfg.LeaseRefreshInterval(),
	}
	l.leaseLock = NewLeaseLock(l.db, l.appID, l.lggr, leaseCfg)
	err = l.leaseLock.TakeAndHold(ctx)
	if err == nil {
		return nil
	}

	// Undo the partial setup once the return value has been computed. The
	// caller receives the lease error; a failure during cleanup is only logged.
	defer func() {
		if closeErr := l.Close(); closeErr != nil {
			l.lggr.Errorf("failed to cleanup LockedDB: %v", closeErr)
		}
	}()
	return errors.Wrap(err, "failed to take initial lease on database")
}
// Close stops the stats reporter, releases the lease lock (if Open took one)
// and closes the DB connection, in that order. The returned error is the one
// from closing the connection.
// Calling Close on an instance that is not open does nothing and returns nil.
// NOT THREAD SAFE
func (l *lockedDb) Close() error {
	// However this function exits, leave the instance in its unopened state so
	// that Open may be called again.
	defer func() {
		l.db, l.leaseLock, l.statsReporter = nil, nil, nil
	}()

	// Step 0: stop the stat reporter
	if reporter := l.statsReporter; reporter != nil {
		reporter.Stop()
	}

	// Step 1: release DB locks
	if lock := l.leaseLock; lock != nil {
		lock.Release()
	}

	// Step 2: close DB connection
	if l.db == nil {
		return nil
	}
	return l.db.Close()
}
// DB returns the DB connection held by this instance. It is the connection
// stored by Open, and nil once Close has run or before Open was ever called.
//
// The pointer receiver matches Open and Close, so every method of the type
// uses the same receiver kind and the struct is not copied on each call.
func (l *lockedDb) DB() *sqlx.DB {
	return l.db
}
// openDB creates a DB connection from cfg without taking any locks.
// The connection URL is first passed through static.SetConsumerName together
// with the "App" label and appID (presumably so the connection can be
// attributed to this application instance — confirm against that helper), and
// the result is handed to NewConnection along with the configured dialect.
func openDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
	// Work on a local copy so its address can be passed to SetConsumerName.
	connURL := cfg.URL()
	static.SetConsumerName(&connURL, "App", &appID)

	dialectName := cfg.Dialect()
	return NewConnection(connURL.String(), dialectName, cfg)
}