/
claimable_balance_loader.go
161 lines (142 loc) · 4.31 KB
/
claimable_balance_loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package history
import (
"context"
"database/sql/driver"
"fmt"
"sort"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)
// FutureClaimableBalanceID is a placeholder for the internal history id of a
// claimable balance. A FutureClaimableBalanceID is handed out by
// ClaimableBalanceLoader.GetFuture() and can only be resolved (through
// Value()) after Exec() has been called on the ClaimableBalanceLoader which
// created it.
type FutureClaimableBalanceID struct {
// id is the claimable balance id which was passed to GetFuture().
id string
// loader is the ClaimableBalanceLoader which created this future; Value()
// asks it to resolve id.
loader *ClaimableBalanceLoader
}
// Value implements the database/sql/driver Valuer interface by resolving the
// future through the loader which created it. An error is returned when the
// loader cannot resolve the claimable balance id (see getNow).
func (a FutureClaimableBalanceID) Value() (driver.Value, error) {
	internalID, err := a.loader.getNow(a.id)
	return internalID, err
}
// ClaimableBalanceLoader will map claimable balance ids to their internal
// history ids. If there is no existing mapping for a given claimable balance id,
// the ClaimableBalanceLoader will insert into the history_claimable_balances table to
// establish a mapping.
//
// Claimable balance ids are registered with GetFuture() and resolved in bulk
// by Exec(). Once Exec() has been called the loader is sealed and GetFuture()
// panics.
type ClaimableBalanceLoader struct {
// sealed is set to true by Exec(); it gates both GetFuture() and getNow().
sealed bool
// set holds every claimable balance id registered through GetFuture().
set set.Set[string]
// ids maps a claimable balance id to its internal history id; it is
// populated while Exec() runs.
ids map[string]int64
// stats counts the ids processed and the ids inserted by Exec().
stats LoaderStats
}
// NewClaimableBalanceLoader returns an empty, unsealed ClaimableBalanceLoader
// which is ready to register claimable balance ids through GetFuture().
func NewClaimableBalanceLoader() *ClaimableBalanceLoader {
	// sealed and stats are deliberately left at their zero values.
	loader := new(ClaimableBalanceLoader)
	loader.set = set.Set[string]{}
	loader.ids = make(map[string]int64)
	return loader
}
// GetFuture records the given claimable balance id in the loader and hands
// back a FutureClaimableBalanceID which resolves to the internal history id
// of that claimable balance once Exec() has been called.
// GetFuture panics with errSealed if Exec() was already called on the loader.
func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID {
	if a.sealed {
		panic(errSealed)
	}

	future := FutureClaimableBalanceID{loader: a, id: id}
	a.set.Add(id)
	return future
}
// getNow resolves the given claimable balance id to its internal history id.
// It is only meant to be called for ids which were registered by GetFuture().
// An error is returned if Exec() has not sealed the loader yet, or if the
// loader holds no mapping for the id.
func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
	if !a.sealed {
		return 0, fmt.Errorf(`invalid claimable balance loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
	}

	internalID, found := a.ids[id]
	if !found {
		return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id)
	}
	return internalID, nil
}
// lookupKeys queries the given claimable balance ids in batches of at most
// loaderLookupBatchSize and records the internal history id of every row
// returned in a.ids. Ids for which the query returns no row are left unmapped.
func (a *ClaimableBalanceLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error {
	total := len(ids)
	for start := 0; start < total; start += loaderLookupBatchSize {
		// The last batch may be shorter than loaderLookupBatchSize.
		stop := ordered.Min(total, start+loaderLookupBatchSize)
		batch := ids[start:stop]

		rows, err := q.ClaimableBalancesByIDs(ctx, batch)
		if err != nil {
			return errors.Wrap(err, "could not select claimable balances")
		}
		for _, row := range rows {
			a.ids[row.BalanceID] = row.InternalID
		}
	}
	return nil
}
// Exec seals the loader and resolves the internal history id of every
// claimable balance registered through GetFuture(). Ids which have no
// mapping after the first lookup are inserted into the
// history_claimable_balances table and then looked up again so that their
// internal ids are resolved as well.
func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInterface) error {
	a.sealed = true
	if len(a.set) == 0 {
		return nil
	}

	q := &Q{session}
	registered := make([]string, 0, len(a.set))
	for id := range a.set {
		registered = append(registered, id)
	}

	if err := a.lookupKeys(ctx, q, registered); err != nil {
		return err
	}
	a.stats.Total += len(registered)

	// Filter in place (reusing the backing array) so that only the ids which
	// are still unmapped after the first lookup remain.
	missing := registered[:0]
	for _, id := range registered {
		if _, known := a.ids[id]; !known {
			missing = append(missing, id)
		}
	}
	if len(missing) == 0 {
		return nil
	}

	// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
	// https://github.com/stellar/go/issues/2370
	sort.Strings(missing)

	fields := []bulkInsertField{
		{
			name:    "claimable_balance_id",
			dbType:  "text",
			objects: missing,
		},
	}
	if err := bulkInsert(
		ctx,
		q,
		"history_claimable_balances",
		[]string{"claimable_balance_id"},
		fields,
	); err != nil {
		return err
	}
	a.stats.Inserted += len(missing)

	// A second lookup picks up the internal ids of the rows inserted above.
	return a.lookupKeys(ctx, q, missing)
}
// Stats reports the loader's counters: the number of claimable balances
// processed by Exec() and the number of claimable balances which were
// inserted into the history_claimable_balances table.
func (a *ClaimableBalanceLoader) Stats() LoaderStats {
	current := a.stats
	return current
}
// Name returns the name of this loader type, "ClaimableBalanceLoader".
func (a *ClaimableBalanceLoader) Name() string {
	const loaderName = "ClaimableBalanceLoader"
	return loaderName
}