-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
orm.go
233 lines (212 loc) · 8.85 KB
/
orm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package keeper
import (
"math/rand"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
// ORM implements the keeper persistence layer on top of PostgreSQL.
type ORM struct {
// q is the query helper through which every method of ORM issues its SQL.
q pg.Q
// logger is the logger assigned by NewORM (named "KeeperORM").
logger logger.Logger
}
// NewORM builds a keeper ORM backed by db.
//
// The supplied logger is scoped with the name "KeeperORM"; that scoped logger
// is both stored on the ORM and handed to pg.NewQ together with config.
func NewORM(db *sqlx.DB, lggr logger.Logger, config pg.QConfig) ORM {
	scoped := lggr.Named("KeeperORM")

	var korm ORM
	korm.q = pg.NewQ(db, scoped, config)
	korm.logger = scoped
	return korm
}
// Q returns the pg.Q query helper held by this ORM.
func (korm ORM) Q() pg.Q {
return korm.q
}
// Registries returns all registries
func (korm ORM) Registries() ([]Registry, error) {
var registries []Registry
err := korm.q.Select(®istries, `SELECT * FROM keeper_registries ORDER BY id ASC`)
return registries, errors.Wrap(err, "failed to get registries")
}
// RegistryByContractAddress returns a single registry based on provided address
func (korm ORM) RegistryByContractAddress(registryAddress ethkey.EIP55Address) (Registry, error) {
var registry Registry
err := korm.q.Get(®istry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress)
return registry, errors.Wrap(err, "failed to get registry")
}
// RegistryForJob returns a specific registry for a job with the given ID
func (korm ORM) RegistryForJob(jobID int32) (Registry, error) {
var registry Registry
err := korm.q.Get(®istry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID)
return registry, errors.Wrapf(err, "failed to get registry with job_id %d", jobID)
}
// UpsertRegistry inserts registry into keeper_registries, or, when a row with
// the same job_id already exists, overwrites that row's keeper_index,
// check_gas, block_count_per_turn, num_keepers and keeper_index_map.
//
// registry is handed to GetNamed twice: once as the source of the named bind
// values and once as the destination for the RETURNING * row. Any failure is
// returned wrapped with "failed to upsert registry".
func (korm ORM) UpsertRegistry(registry *Registry) error {
	const upsertSQL = `
INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_address, check_gas, block_count_per_turn, num_keepers, keeper_index_map) VALUES (
:job_id, :keeper_index, :contract_address, :from_address, :check_gas, :block_count_per_turn, :num_keepers, :keeper_index_map
) ON CONFLICT (job_id) DO UPDATE SET
keeper_index = :keeper_index,
check_gas = :check_gas,
block_count_per_turn = :block_count_per_turn,
num_keepers = :num_keepers,
keeper_index_map = :keeper_index_map
RETURNING *
`
	if err := korm.q.GetNamed(upsertSQL, registry, registry); err != nil {
		return errors.Wrap(err, "failed to upsert registry")
	}
	return nil
}
// UpsertUpkeep inserts registration into upkeep_registrations, or, when a row
// with the same (registry_id, upkeep_id) already exists, overwrites that row's
// execute_gas, check_data and positioning_constant. last_run_block_height is
// written on insert only; the conflict branch leaves it alone.
//
// registration is handed to GetNamed twice: once as the source of the named
// bind values and once as the destination for the RETURNING * row. Any failure
// is returned wrapped with "failed to upsert upkeep".
func (korm ORM) UpsertUpkeep(registration *UpkeepRegistration) error {
	const upsertSQL = `
INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_id, positioning_constant, last_run_block_height) VALUES (
:registry_id, :execute_gas, :check_data, :upkeep_id, :positioning_constant, :last_run_block_height
) ON CONFLICT (registry_id, upkeep_id) DO UPDATE SET
execute_gas = :execute_gas,
check_data = :check_data,
positioning_constant = :positioning_constant
RETURNING *
`
	if err := korm.q.GetNamed(upsertSQL, registration, registration); err != nil {
		return errors.Wrap(err, "failed to upsert upkeep")
	}
	return nil
}
// UpdateUpkeepLastKeeperIndex records which keeper last handled an upkeep.
//
// For the upkeep identified by upkeepID within the registry belonging to
// jobID, last_keeper_index is set to the value stored under fromAddress.Hex()
// in that registry's keeper_index_map, cast to int. Any failure is returned
// wrapped with "UpdateUpkeepLastKeeperIndex failed".
func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, fromAddress ethkey.EIP55Address) error {
	// Placeholders: $1 job id, $2 upkeep id, $3 hex form of the sender address.
	const updateSQL = `
UPDATE upkeep_registrations
SET
last_keeper_index = CAST((SELECT keeper_index_map -> $3 FROM keeper_registries WHERE job_id = $1) AS int)
WHERE upkeep_id = $2 AND
registry_id = (SELECT id FROM keeper_registries WHERE job_id = $1)`

	if _, err := korm.q.Exec(updateSQL, jobID, upkeepID, fromAddress.Hex()); err != nil {
		return errors.Wrap(err, "UpdateUpkeepLastKeeperIndex failed")
	}
	return nil
}
// BatchDeleteUpkeepsForJob removes, from the registries that belong to jobID,
// every upkeep whose upkeep_id is listed in upkeepIDs.
//
// It returns the number of rows the DELETE affected. Errors from the statement
// itself and from reading the affected-row count are wrapped with distinct
// messages so the failing step can be told apart.
func (korm ORM) BatchDeleteUpkeepsForJob(jobID int32, upkeepIDs []big.Big) (int64, error) {
	// The IDs are bound as their string representations. The slice is always
	// non-nil, even when upkeepIDs is empty.
	idStrings := make([]string, 0, len(upkeepIDs))
	for i := range upkeepIDs {
		idStrings = append(idStrings, upkeepIDs[i].String())
	}

	const deleteSQL = `
DELETE FROM upkeep_registrations WHERE registry_id IN (
SELECT id FROM keeper_registries WHERE job_id = $1
) AND upkeep_id = ANY($2)
`
	result, execErr := korm.q.Exec(deleteSQL, jobID, idStrings)
	if execErr != nil {
		return 0, errors.Wrap(execErr, "BatchDeleteUpkeepsForJob failed to delete")
	}

	deleted, countErr := result.RowsAffected()
	if countErr != nil {
		return 0, errors.Wrap(countErr, "BatchDeleteUpkeepsForJob failed to get RowsAffected")
	}
	return deleted, nil
}
// EligibleUpkeepsForRegistry returns, in random order, the upkeeps of the
// registry at registryAddress that this keeper should currently process. Each
// returned upkeep has its Registry field populated.
//
// Only registries with num_keepers > 0 are considered. For every upkeep a
// "turn" value is derived by XOR-ing the 32 least significant bits of the
// upkeep_id with the 32 least significant bits of binaryHash. Both are 256-bit
// numbers, but 32 bits are enough to spread upkeeps over the keepers as long
// as there are fewer than 4294967296 of them. An upkeep is eligible when either:
//
//   - it is this keeper's turn (keeper_index = turn % num_keepers) AND either
//     this keeper was not the last one recorded for the upkeep, or it was but
//     last_run_block_height + gracePeriod < blockNumber; or
//   - it is the buddy's turn (the keeper at (keeper_index + 1) % num_keepers),
//     that buddy was the last keeper recorded for the upkeep, and the registry
//     has more than one keeper.
//
// On failure the (possibly partially filled) slice is returned together with
// a wrapped error naming the step that failed.
func (korm ORM) EligibleUpkeepsForRegistry(registryAddress ethkey.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) {
	// Placeholders: $1 registry address, $2 grace period, $3 block number, $4 binary hash.
	const eligibleSQL = `
SELECT upkeep_registrations.*
FROM upkeep_registrations
INNER JOIN keeper_registries ON keeper_registries.id = upkeep_registrations.registry_id,
LATERAL ABS(
(least_significant(uint256_to_bit(upkeep_registrations.upkeep_id), 32) # least_significant($4, 32))::bigint
) AS turn
WHERE keeper_registries.contract_address = $1
AND keeper_registries.num_keepers > 0
AND
(
(
-- my turn
keeper_registries.keeper_index = turn % keeper_registries.num_keepers
AND
(
-- last keeper != me
upkeep_registrations.last_keeper_index IS DISTINCT FROM keeper_registries.keeper_index
OR
-- last keeper == me AND its past the grace period
(upkeep_registrations.last_keeper_index IS NOT DISTINCT FROM
keeper_registries.keeper_index AND
upkeep_registrations.last_run_block_height + $2 < $3)
)
)
OR
(
-- my buddy's turn
(keeper_registries.keeper_index + 1) % keeper_registries.num_keepers =
turn % keeper_registries.num_keepers
AND
-- last keeper == my buddy
upkeep_registrations.last_keeper_index IS NOT DISTINCT FROM
(keeper_registries.keeper_index + 1) % keeper_registries.num_keepers
-- buddy system only active if we have multiple keeper nodes
AND keeper_registries.num_keepers > 1
)
)
`
	err = korm.q.Select(&upkeeps, eligibleSQL, registryAddress, gracePeriod, blockNumber, binaryHash)
	if err != nil {
		return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to get upkeep_registrations")
	}

	err = loadUpkeepsRegistry(korm.q, upkeeps)
	if err != nil {
		return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to load Registry on upkeeps")
	}

	// Randomise the processing order of the eligible upkeeps.
	swap := func(a, b int) {
		upkeeps[a], upkeeps[b] = upkeeps[b], upkeeps[a]
	}
	rand.Shuffle(len(upkeeps), swap)

	return upkeeps, nil
}
func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
registryIDM := make(map[int64]*Registry)
var registryIDs []int64
for _, upkeep := range upkeeps {
if _, exists := registryIDM[upkeep.RegistryID]; !exists {
registryIDM[upkeep.RegistryID] = new(Registry)
registryIDs = append(registryIDs, upkeep.RegistryID)
}
}
var registries []*Registry
err := q.Select(®istries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs))
if err != nil {
return errors.Wrap(err, "loadUpkeepsRegistry failed")
}
for _, reg := range registries {
registryIDM[reg.ID] = reg
}
for i, upkeep := range upkeeps {
upkeeps[i].Registry = *registryIDM[upkeep.RegistryID]
}
return nil
}
// AllUpkeepIDsForRegistry returns the upkeep_id of every upkeep registration
// whose registry_id equals regID.
//
// A query failure is returned wrapped with "allUpkeepIDs failed".
func (korm ORM) AllUpkeepIDsForRegistry(regID int64) (upkeeps []big.Big, err error) {
	const selectSQL = `
SELECT upkeep_id
FROM upkeep_registrations
WHERE registry_id = $1
`
	if err = korm.q.Select(&upkeeps, selectSQL, regID); err != nil {
		return upkeeps, errors.Wrap(err, "allUpkeepIDs failed")
	}
	return upkeeps, nil
}
// SetLastRunInfoForUpkeepOnJob stores the block height of the latest run of an
// upkeep together with the index of the keeper that performed it.
//
// The upkeep is identified by upkeepID within the registry belonging to jobID.
// The row is only touched when its stored last_run_block_height is less than
// or equal to height, so an older height never overwrites a newer one.
// last_keeper_index is taken from the registry's keeper_index_map entry for
// fromAddress.Hex(), cast to int. qopts are applied to the query helper before
// the statement runs.
//
// It returns the number of rows affected. Errors from the statement itself
// and from reading the affected-row count are wrapped with distinct messages.
func (korm ORM) SetLastRunInfoForUpkeepOnJob(jobID int32, upkeepID *big.Big, height int64, fromAddress ethkey.EIP55Address, qopts ...pg.QOpt) (int64, error) {
	// Placeholders: $1 height, $2 upkeep id, $3 job id, $4 hex form of the sender address.
	const updateSQL = `
UPDATE upkeep_registrations
SET last_run_block_height = $1,
last_keeper_index = CAST((SELECT keeper_index_map -> $4 FROM keeper_registries WHERE job_id = $3) AS int)
WHERE upkeep_id = $2 AND
registry_id = (SELECT id FROM keeper_registries WHERE job_id = $3) AND
last_run_block_height <= $1`

	q := korm.q.WithOpts(qopts...)
	result, execErr := q.Exec(updateSQL, height, upkeepID, jobID, fromAddress.Hex())
	if execErr != nil {
		return 0, errors.Wrap(execErr, "SetLastRunInfoForUpkeepOnJob failed")
	}

	updated, countErr := result.RowsAffected()
	if countErr != nil {
		return 0, errors.Wrap(countErr, "SetLastRunInfoForUpkeepOnJob failed to get RowsAffected")
	}
	return updated, nil
}