-
Notifications
You must be signed in to change notification settings - Fork 0
/
history_claimable_balances.go
152 lines (126 loc) · 5.38 KB
/
history_claimable_balances.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package history
import (
"context"
"sort"
sq "github.com/Masterminds/squirrel"
"github.com/shantanu-hashcash/go/support/db"
"github.com/shantanu-hashcash/go/support/errors"
)
// QHistoryClaimableBalances defines claimable balance related queries
// against the history tables.
type QHistoryClaimableBalances interface {
	// CreateHistoryClaimableBalances creates rows in the history_claimable_balances
	// table for the given ids and returns a mapping of each id to its internal id.
	CreateHistoryClaimableBalances(ctx context.Context, ids []string, batchSize int) (map[string]int64, error)
	// NewOperationClaimableBalanceBatchInsertBuilder returns a builder for batching
	// operation / claimable balance rows.
	NewOperationClaimableBalanceBatchInsertBuilder() OperationClaimableBalanceBatchInsertBuilder
	// NewTransactionClaimableBalanceBatchInsertBuilder returns a builder for batching
	// transaction / claimable balance rows.
	NewTransactionClaimableBalanceBatchInsertBuilder() TransactionClaimableBalanceBatchInsertBuilder
}
// CreateHistoryClaimableBalances creates rows in the history_claimable_balances
// table for a given list of ids and returns a mapping of each id to its
// corresponding internal id in the history_claimable_balances table.
//
// Ids that are already present are left untouched: the insert is issued with
// "ON CONFLICT (claimable_balance_id) DO NOTHING", and the subsequent lookup
// returns the internal id of whichever row exists. batchSize is passed to the
// insert builder as its MaxBatchSize.
//
// The ids slice supplied by the caller is never modified; sorting is performed
// on a private copy. An empty ids slice yields an empty, non-nil map.
//
// Any error from building, executing the insert, or selecting the rows is
// returned wrapped with a short description; the map is nil in that case.
func (q *Q) CreateHistoryClaimableBalances(ctx context.Context, ids []string, batchSize int) (map[string]int64, error) {
	if len(ids) == 0 {
		// Nothing to insert and nothing to look up.
		return map[string]int64{}, nil
	}

	// Work on a copy so the caller's slice keeps its original order.
	sorted := make([]string, len(ids))
	copy(sorted, ids)

	// sort before inserting to prevent deadlocks on acquiring a ShareLock
	// https://github.com/shantanu-hashcash/go/issues/2370
	sort.Strings(sorted)

	builder := &db.BatchInsertBuilder{
		Table:        q.GetTable("history_claimable_balances"),
		MaxBatchSize: batchSize,
		Suffix:       "ON CONFLICT (claimable_balance_id) DO NOTHING",
	}

	for _, id := range sorted {
		err := builder.Row(ctx, map[string]interface{}{
			"claimable_balance_id": id,
		})
		if err != nil {
			return nil, errors.Wrap(err, "could not insert history_claimable_balances row")
		}
	}

	if err := builder.Exec(ctx); err != nil {
		return nil, errors.Wrap(err, "could not exec claimable balance insert builder")
	}

	toInternalID := make(map[string]int64, len(sorted))

	// Read the internal ids back in bounded chunks so a single IN (...) list
	// never carries more than selectBatchSize values.
	const selectBatchSize = 10000
	for start := 0; start < len(sorted); start += selectBatchSize {
		end := start + selectBatchSize
		if end > len(sorted) {
			end = len(sorted)
		}

		cbs, err := q.ClaimableBalancesByIDs(ctx, sorted[start:end])
		if err != nil {
			return nil, errors.Wrap(err, "could not select claimable balances")
		}

		for _, cb := range cbs {
			toInternalID[cb.BalanceID] = cb.InternalID
		}
	}

	return toInternalID, nil
}
// HistoryClaimableBalance is a row of data from the `history_claimable_balances` table.
type HistoryClaimableBalance struct {
	// BalanceID is loaded from the claimable_balance_id column.
	BalanceID string `db:"claimable_balance_id"`
	// InternalID is loaded from the id column.
	InternalID int64 `db:"id"`
}
// selectHistoryClaimableBalance is the base SELECT shared by the lookups in this
// file: it selects every column of history_claimable_balances under the alias hcb.
var selectHistoryClaimableBalance = sq.Select("hcb.*").From("history_claimable_balances hcb")
// ClaimableBalancesByIDs fetches the `history_claimable_balances` rows whose
// claimable_balance_id is one of the supplied ids. The rows are returned in
// dest together with any error reported by the underlying Select call.
func (q *Q) ClaimableBalancesByIDs(ctx context.Context, ids []string) (dest []HistoryClaimableBalance, err error) {
	// A slice value makes the condition render as
	// hcb.claimable_balance_id IN (...)
	byBalanceID := map[string]interface{}{
		"hcb.claimable_balance_id": ids,
	}
	query := selectHistoryClaimableBalance.Where(byBalanceID)

	err = q.Select(ctx, &dest, query)
	return dest, err
}
// ClaimableBalanceByID fetches a single `history_claimable_balances` row whose
// claimable_balance_id equals id. The row is returned in dest together with any
// error reported by the underlying Get call.
func (q *Q) ClaimableBalanceByID(ctx context.Context, id string) (dest HistoryClaimableBalance, err error) {
	query := selectHistoryClaimableBalance.
		Where("hcb.claimable_balance_id = ?", id).
		Limit(1)

	err = q.Get(ctx, &dest, query)
	return dest, err
}
// OperationClaimableBalanceBatchInsertBuilder collects operation / claimable
// balance pairs and writes them to the db in a batch.
type OperationClaimableBalanceBatchInsertBuilder interface {
	// Add queues one pair of operation id and claimable balance for insertion.
	// NOTE(review): FutureClaimableBalanceID is declared outside this file;
	// presumably it stands in for the balance's internal id - confirm.
	Add(operationID int64, claimableBalance FutureClaimableBalanceID) error
	// Exec writes all queued pairs using the given session.
	Exec(ctx context.Context, session db.SessionInterface) error
}
// operationClaimableBalanceBatchInsertBuilder is the implementation of
// OperationClaimableBalanceBatchInsertBuilder returned by
// NewOperationClaimableBalanceBatchInsertBuilder.
type operationClaimableBalanceBatchInsertBuilder struct {
	// table is the table name handed to builder.Exec.
	table string
	// builder receives the rows passed to Add and is executed by Exec.
	builder db.FastBatchInsertBuilder
}
// NewOperationClaimableBalanceBatchInsertBuilder returns a fresh, empty builder
// whose rows target the history_operation_claimable_balances table.
func (q *Q) NewOperationClaimableBalanceBatchInsertBuilder() OperationClaimableBalanceBatchInsertBuilder {
	inserter := operationClaimableBalanceBatchInsertBuilder{
		builder: db.FastBatchInsertBuilder{},
		table:   "history_operation_claimable_balances",
	}
	return &inserter
}
// Add queues one operation / claimable balance pair in the batch. The pair is
// stored under the history_operation_id and history_claimable_balance_id
// columns; any error from the underlying builder is returned as is.
func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error {
	row := make(map[string]interface{}, 2)
	row["history_operation_id"] = operationID
	row["history_claimable_balance_id"] = claimableBalance

	return i.builder.Row(row)
}
// Exec flushes all pending operation claimable balances to the db.
// It delegates to the underlying batch insert builder, passing it the given
// session and this builder's table name, and returns that call's error.
func (i *operationClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
	return i.builder.Exec(ctx, session, i.table)
}
// TransactionClaimableBalanceBatchInsertBuilder collects transaction / claimable
// balance pairs and writes them to the db in a batch.
type TransactionClaimableBalanceBatchInsertBuilder interface {
	// Add queues one pair of transaction id and claimable balance for insertion.
	// NOTE(review): FutureClaimableBalanceID is declared outside this file;
	// presumably it stands in for the balance's internal id - confirm.
	Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error
	// Exec writes all queued pairs using the given session.
	Exec(ctx context.Context, session db.SessionInterface) error
}
// transactionClaimableBalanceBatchInsertBuilder is the implementation of
// TransactionClaimableBalanceBatchInsertBuilder returned by
// NewTransactionClaimableBalanceBatchInsertBuilder.
type transactionClaimableBalanceBatchInsertBuilder struct {
	// table is the table name handed to builder.Exec.
	table string
	// builder receives the rows passed to Add and is executed by Exec.
	builder db.FastBatchInsertBuilder
}
// NewTransactionClaimableBalanceBatchInsertBuilder returns a fresh, empty builder
// whose rows target the history_transaction_claimable_balances table.
func (q *Q) NewTransactionClaimableBalanceBatchInsertBuilder() TransactionClaimableBalanceBatchInsertBuilder {
	inserter := transactionClaimableBalanceBatchInsertBuilder{
		builder: db.FastBatchInsertBuilder{},
		table:   "history_transaction_claimable_balances",
	}
	return &inserter
}
// Add queues one transaction / claimable balance pair in the batch. The pair is
// stored under the history_transaction_id and history_claimable_balance_id
// columns; any error from the underlying builder is returned as is.
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error {
	row := make(map[string]interface{}, 2)
	row["history_transaction_id"] = transactionID
	row["history_claimable_balance_id"] = claimableBalance

	return i.builder.Row(row)
}
// Exec flushes all pending transaction claimable balances to the db.
// It delegates to the underlying batch insert builder, passing it the given
// session and this builder's table name, and returns that call's error.
func (i *transactionClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
	return i.builder.Exec(ctx, session, i.table)
}