-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
orm.go
90 lines (75 loc) · 2.64 KB
/
orm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package upkeepstate
import (
"context"
"math/big"
"time"
"github.com/lib/pq"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)
// orm is the data-access object for the evm.upkeep_states table. Every query
// it issues is restricted to a single chain ID.
type orm struct {
// chainID is bound as evm_chain_id in every statement this type runs.
chainID *ubig.Big
// ds is the data source all statements are executed against.
ds sqlutil.DataSource
}
// persistedStateRecord is one upkeep state row as stored in evm.upkeep_states.
// It is the input element of BatchInsertRecords and the scan target of
// SelectStatesByWorkIDs. The chain ID is not part of the record; the orm
// supplies it.
type persistedStateRecord struct {
// UpkeepID is stored in the upkeep_id column.
UpkeepID *ubig.Big
// WorkID is stored in the work_id column; together with the chain ID it is
// the conflict key used on insert.
WorkID string
// CompletionState is stored in the completion_state column.
// NOTE(review): the meaning of the numeric codes is defined outside this file.
CompletionState uint8
// BlockNumber is stored in the block_number column.
BlockNumber int64
// IneligibilityReason is stored in the ineligibility_reason column.
// NOTE(review): the meaning of the numeric codes is defined outside this file.
IneligibilityReason uint8
// InsertedAt is stored in the inserted_at column, which DeleteExpired
// compares against its cutoff.
InsertedAt time.Time
}
// NewORM returns an orm that runs its statements against ds and limits all of
// them to the chain identified by chainID. The chain ID is wrapped with
// ubig.New before being stored.
func NewORM(chainID *big.Int, ds sqlutil.DataSource) *orm {
	o := new(orm)
	o.ds = ds
	o.chainID = ubig.New(chainID)
	return o
}
// BatchInsertRecords writes the given upkeep states to evm.upkeep_states with
// one NamedExecContext call, stamping every row with the orm's chain ID.
//
// The statement ends in ON CONFLICT (evm_chain_id, work_id) DO NOTHING, so
// rows that already exist are left untouched and repeating a call with the
// same records is harmless. An empty or nil state slice returns nil without
// touching the data source. Any error from the data source is returned as-is.
func (o *orm) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error {
	if len(state) == 0 {
		return nil
	}

	// row carries one record plus the chain ID. Its field names feed the
	// :named parameters in the statement below, so they are kept as-is.
	// NOTE(review): the name-to-parameter mapping is configured on the data
	// source, outside this file — verify before renaming any field.
	type row struct {
		EvmChainId          *ubig.Big
		WorkId              string
		CompletionState     uint8
		BlockNumber         int64
		InsertedAt          time.Time
		UpkeepId            *ubig.Big
		IneligibilityReason uint8
	}

	// The final length is known up front, so allocate the backing array once
	// instead of letting append grow it repeatedly.
	rows := make([]row, 0, len(state))
	for _, record := range state {
		rows = append(rows, row{
			EvmChainId:          o.chainID,
			WorkId:              record.WorkID,
			CompletionState:     record.CompletionState,
			BlockNumber:         record.BlockNumber,
			InsertedAt:          record.InsertedAt,
			UpkeepId:            record.UpkeepID,
			IneligibilityReason: record.IneligibilityReason,
		})
	}

	_, err := o.ds.NamedExecContext(ctx, `INSERT INTO evm.upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
	return err
}
// SelectStatesByWorkIDs loads the stored upkeep states whose work_id is one of
// workIDs and whose evm_chain_id matches the orm's chain ID.
//
// If the query fails, the returned slice is nil and the error from the data
// source is passed through unchanged.
func (o *orm) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) (states []persistedStateRecord, err error) {
	const query = `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at
FROM evm.upkeep_states
WHERE work_id = ANY($1) AND evm_chain_id = $2::NUMERIC`

	if err = o.ds.SelectContext(ctx, &states, query, pq.Array(workIDs), o.chainID); err != nil {
		return nil, err
	}
	return states, nil
}
// DeleteExpired removes the stored states for the orm's chain ID whose
// inserted_at is at or before the expired cutoff. Any error from the data
// source is returned unchanged.
func (o *orm) DeleteExpired(ctx context.Context, expired time.Time) error {
	const stmt = `DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`

	if _, err := o.ds.ExecContext(ctx, stmt, expired, o.chainID); err != nil {
		return err
	}
	return nil
}