-
Notifications
You must be signed in to change notification settings - Fork 211
/
store.go
178 lines (153 loc) · 5.81 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package txs
import (
"context"
"fmt"
"time"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/sql/transactions"
)
// store is the database-backed storage used by this package. Its methods
// delegate to the query helpers in the sql/layers and sql/transactions packages.
type store struct {
	// db is the database handle that every query and DB transaction runs against.
	db *sql.Database
}
// newStore builds a store whose queries run against db.
func newStore(db *sql.Database) *store {
	st := new(store)
	st.db = db
	return st
}
// LastAppliedLayer reports the most recently applied layer in the mesh, as
// recorded by layers.GetLastApplied.
func (s *store) LastAppliedLayer() (types.LayerID, error) {
	// The transactions table is deliberately not consulted for the max applied
	// layer: a layer may be empty (contain no transactions), so that table can
	// lag behind the real value.
	lid, err := layers.GetLastApplied(s.db)
	return lid, err
}
// GetMeshHash looks up the aggregated layer hash for layer lid via
// layers.GetAggregatedHash.
func (s *store) GetMeshHash(lid types.LayerID) (types.Hash32, error) {
	hash, err := layers.GetAggregatedHash(s.db, lid)
	return hash, err
}
// Add stores tx in the database together with the time it was received.
func (s *store) Add(tx *types.Transaction, received time.Time) error {
	err := transactions.Add(s.db, tx, received)
	return err
}
// Has reports whether a transaction with ID tid is already present in the database.
func (s *store) Has(tid types.TransactionID) (bool, error) {
	found, err := transactions.Has(s.db, tid)
	return found, err
}
// Get fetches the transaction identified by tid from the database.
func (s *store) Get(tid types.TransactionID) (*types.MeshTransaction, error) {
	mtx, err := transactions.Get(s.db, tid)
	return mtx, err
}
// GetBlob fetches the transaction identified by tid as a raw byte slice.
func (s *store) GetBlob(tid types.TransactionID) ([]byte, error) {
	blob, err := transactions.GetBlob(s.db, tid)
	return blob, err
}
// GetByAddress lists the transactions for `address` whose layers fall in the
// range [from, to], as selected by transactions.GetByAddress.
func (s *store) GetByAddress(from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) {
	mtxs, err := transactions.GetByAddress(s.db, from, to, address)
	return mtxs, err
}
// DiscardNonceBelow discards the pending transactions of addr whose nonce is
// lower than `nonce`.
func (s *store) DiscardNonceBelow(addr types.Address, nonce uint64) error {
	err := transactions.DiscardNonceBelow(s.db, addr, nonce)
	return err
}
// SetNextLayerBlock sets the next applicable layer/block for transaction tid,
// given layer lid, and returns the chosen layer and block. The selection is
// performed by transactions.SetNextLayer.
func (s *store) SetNextLayerBlock(tid types.TransactionID, lid types.LayerID) (types.LayerID, types.BlockID, error) {
	nextLayer, nextBlock, err := transactions.SetNextLayer(s.db, tid, lid)
	return nextLayer, nextBlock, err
}
// GetAllPending loads every pending transaction, across all accounts, from
// the database.
func (s *store) GetAllPending() ([]*types.MeshTransaction, error) {
	pending, err := transactions.GetAllPending(s.db)
	return pending, err
}
// GetAcctPendingFromNonce loads the pending transactions of account addr,
// using `from` as the nonce bound passed to transactions.GetAcctPendingFromNonce.
//
// NOTE(review): the previous comment described the selection as
// "nonce <= `from`", while the function name suggests transactions starting
// at nonce `from`; confirm the direction against the query in sql/transactions.
func (s *store) GetAcctPendingFromNonce(addr types.Address, from uint64) ([]*types.MeshTransaction, error) {
	pending, err := transactions.GetAcctPendingFromNonce(s.db, addr, from)
	return pending, err
}
// runInDBTransaction opens a database transaction, runs fn inside it and
// commits when fn succeeds. If opening the transaction or fn fails, that error
// is returned unchanged and Commit is not called. Release is always invoked on
// the way out.
func (s *store) runInDBTransaction(fn func(*sql.Tx) error) error {
	tx, err := s.db.Tx(context.Background())
	if err != nil {
		return err
	}
	defer tx.Release()

	if fnErr := fn(tx); fnErr != nil {
		return fnErr
	}
	return tx.Commit()
}
// AddToProposal records, within a single database transaction, that the
// transactions tids belong to proposal pid in layer lid.
func (s *store) AddToProposal(lid types.LayerID, pid types.ProposalID, tids []types.TransactionID) error {
	write := func(dbtx *sql.Tx) error {
		return addToProposal(dbtx, lid, pid, tids)
	}
	return s.runInDBTransaction(write)
}
// addToProposal links each transaction in tids to proposal pid at layer lid
// and then calls transactions.UpdateIfBetter for it with an empty block ID.
// It stops at the first failure and returns that error wrapped.
func addToProposal(dbtx *sql.Tx, lid types.LayerID, pid types.ProposalID, tids []types.TransactionID) error {
	for _, tid := range tids {
		err := transactions.AddToProposal(dbtx, tid, lid, pid)
		if err != nil {
			return fmt.Errorf("add2prop %w", err)
		}
		// The first result of UpdateIfBetter is not needed here.
		if _, err = transactions.UpdateIfBetter(dbtx, tid, lid, types.EmptyBlockID); err != nil {
			return fmt.Errorf("add2prop update %w", err)
		}
	}
	return nil
}
// AddToBlock records, within a single database transaction, that the
// transactions tids belong to block bid in layer lid.
func (s *store) AddToBlock(lid types.LayerID, bid types.BlockID, tids []types.TransactionID) error {
	write := func(dbtx *sql.Tx) error {
		return addToBlock(dbtx, lid, bid, tids)
	}
	return s.runInDBTransaction(write)
}
// addToBlock links each transaction in tids to block bid at layer lid and then
// calls transactions.UpdateIfBetter for it with the same layer and block.
// It stops at the first failure and returns that error wrapped.
func addToBlock(dbtx *sql.Tx, lid types.LayerID, bid types.BlockID, tids []types.TransactionID) error {
	for _, tid := range tids {
		err := transactions.AddToBlock(dbtx, tid, lid, bid)
		if err != nil {
			return fmt.Errorf("add2block %w", err)
		}
		// The first result of UpdateIfBetter is not needed here.
		if _, err = transactions.UpdateIfBetter(dbtx, tid, lid, bid); err != nil {
			return fmt.Errorf("add2block update %w", err)
		}
	}
	return nil
}
// ApplyLayer marks transactions as applied or discarded for account addr, and
// records lid as the layer at which they were applied/discarded. The whole
// update runs inside one database transaction.
func (s *store) ApplyLayer(lid types.LayerID, bid types.BlockID, addr types.Address, appliedByNonce map[uint64]types.TransactionID) error {
	write := func(dbtx *sql.Tx) error {
		return applyLayer(dbtx, lid, bid, addr, appliedByNonce)
	}
	return s.runInDBTransaction(write)
}
// applyLayer walks appliedByNonce (nonce -> transaction ID). For each entry it
// marks the transaction as applied in layer lid / block bid, then calls
// transactions.DiscardByAcctNonce for addr and that nonce. An Apply call that
// updates zero rows is reported as an error.
func applyLayer(dbtx *sql.Tx, lid types.LayerID, bid types.BlockID, addr types.Address, appliedByNonce map[uint64]types.TransactionID) error {
	// Map iteration order is random; nonce order doesn't matter here.
	for nonce, tid := range appliedByNonce {
		switch updated, err := transactions.Apply(dbtx, tid, lid, bid); {
		case err != nil:
			return fmt.Errorf("apply %w", err)
		case updated == 0:
			return fmt.Errorf("tx not applied %v", tid)
		}
		if err := transactions.DiscardByAcctNonce(dbtx, tid, lid, addr, nonce); err != nil {
			return fmt.Errorf("apply discard %w", err)
		}
	}
	return nil
}
// UndoLayers resets all transactions that were applied/discarded between
// `from` and the most recent layer, and resets their layers if they were
// included in a proposal/block. The whole operation runs inside one database
// transaction.
func (s *store) UndoLayers(from types.LayerID) error {
	revert := func(dbtx *sql.Tx) error {
		return undoLayers(dbtx, from)
	}
	return s.runInDBTransaction(revert)
}
// undoLayers calls transactions.UndoLayers for `from` and then, for every
// transaction ID it returns, calls transactions.SetNextLayer with the layer
// just before `from`. It stops at the first failure and returns that error wrapped.
func undoLayers(dbtx *sql.Tx, from types.LayerID) error {
	undone, undoErr := transactions.UndoLayers(dbtx, from)
	if undoErr != nil {
		return fmt.Errorf("undo %w", undoErr)
	}
	for _, tid := range undone {
		// from.Sub(1) is evaluated per transaction on purpose: it is never
		// computed when there is nothing to reset.
		_, _, err := transactions.SetNextLayer(dbtx, tid, from.Sub(1))
		if err != nil {
			return fmt.Errorf("reset for undo %w", err)
		}
	}
	return nil
}