-
Notifications
You must be signed in to change notification settings - Fork 16
/
app.go
419 lines (381 loc) · 14.5 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
package vochain
import (
"encoding/hex"
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
abcitypes "github.com/cometbft/cometbft/abci/types"
tmnode "github.com/cometbft/cometbft/node"
tmcli "github.com/cometbft/cometbft/rpc/client/local"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
ethcommon "github.com/ethereum/go-ethereum/common"
lru "github.com/hashicorp/golang-lru/v2"
"go.vocdoni.io/dvote/crypto/zk/circuit"
"go.vocdoni.io/dvote/test/testcommon/testutil"
"go.vocdoni.io/dvote/vochain/ist"
vstate "go.vocdoni.io/dvote/vochain/state"
"go.vocdoni.io/dvote/vochain/transaction"
"go.vocdoni.io/dvote/vochain/transaction/vochaintx"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/proto/build/go/models"
)
const (
// recheckTxHeightInterval is the number of blocks after which the mempool is
// checked for transactions to be rechecked.
recheckTxHeightInterval = 6 * 5 // 30 blocks: ~5 minutes, assuming 6 blocks per minute
// transactionBlocksTTL is the number of blocks after which a transaction is
// removed from the mempool.
transactionBlocksTTL = 6 * 10 // 60 blocks: ~10 minutes, assuming 6 blocks per minute
// maxPendingTxAttempts is the number of times a transaction can be included in a block
// and fail before being removed from the mempool.
maxPendingTxAttempts = 3
)
var (
// ErrTransactionNotFound is returned when the transaction is not found in the blockstore.
ErrTransactionNotFound = fmt.Errorf("transaction not found")
// Compile-time check that *BaseApplication implements abcitypes.Application.
_ abcitypes.Application = (*BaseApplication)(nil)
)
// BaseApplication reflects the ABCI application implementation.
// It bundles the vochain state, the internal state transition controller, the
// transaction handler and the callbacks used to reach the blockchain node.
type BaseApplication struct {
State *vstate.State
Istc *ist.Controller
Node *tmnode.Node
NodeClient *tmcli.Local
NodeAddress ethcommon.Address
TransactionHandler *transaction.TransactionHandler
// isSynchronizingFn, when set, is queried by beginBlock to refresh isSynchronizing.
isSynchronizingFn func() bool
// tendermint WaitSync() function is racy, so the sync status is cached in an
// atomic value (refreshed by beginBlock) in order to avoid data races when
// querying about the sync status of the blockchain.
isSynchronizing atomic.Bool
// Callback blockchain functions
fnGetBlockByHeight func(height int64) *tmtypes.Block
fnGetBlockByHash func(hash []byte) *tmtypes.Block
fnSendTx func(tx []byte) (*ctypes.ResultBroadcastTx, error)
fnGetTx func(height uint32, txIndex int32) (*models.SignedTx, error)
fnGetTxHash func(height uint32, txIndex int32) (*models.SignedTx, []byte, error)
fnMempoolSize func() int
fnMempoolPrune func(txKey [32]byte) error
// blockCache is the LRU cache used by GetBlockByHeight, keyed by block height.
blockCache *lru.Cache[int64, *tmtypes.Block]
// txReferences is a map indexed by transaction ID. It stores the height where the
// transaction was seen for the first time and the number of failed attempts to
// include it into a block (presumably as pendingTxReference values; confirm
// against the mempool code).
txReferences sync.Map
// endBlockTimestamp is the Unix timestamp (seconds) stored by endBlock for the
// last executed block.
endBlockTimestamp atomic.Int64
// startBlockTimestamp is the Unix timestamp (seconds) stored by beginBlock for
// the block currently being executed.
startBlockTimestamp atomic.Int64
chainID string
circuitConfigTag string
dataDir string
genesisInfo *tmtypes.GenesisDoc
// lastDeliverTxResponse is used to store the last DeliverTxResponse, so validators
// can skip block re-execution on FinalizeBlock call.
lastDeliverTxResponse []*DeliverTxResponse
// lastRootHash is used to store the last root hash of the current on-going state,
// it is used by validators to skip block re-execution on FinalizeBlock call.
lastRootHash []byte
// lastBlockHash stores the last cometBFT block hash
lastBlockHash []byte
// prepareProposalLock is used to avoid concurrent calls between Prepare/Process Proposal and FinalizeBlock
prepareProposalLock sync.Mutex
// testMockBlockStore is used for testing purposes only
testMockBlockStore *testutil.MockBlockStore
}
// pendingTxReference is used to store the block height where the transaction was accepted by the mempool, and the number
// of times it has been included in a block but failed.
type pendingTxReference struct {
// height is the block height at which the transaction was accepted by the mempool.
height uint32
// failedCount is the number of times the transaction was included in a block and failed.
failedCount int
}
// DeliverTxResponse is the response returned by DeliverTx after executing the transaction.
type DeliverTxResponse struct {
// Code is 0 when the transaction was accepted; deliverTx sets it to 1 on failure.
Code uint32
// Log is the log message reported by the transaction handler on success.
Log string
// Info is the hex-encoded transaction hash reported by the transaction handler on success.
Info string
// Data is the handler response data on success, or the error message on failure.
Data []byte
}
// ExecuteBlockResponse is the response returned by ExecuteBlock after executing the block.
// InvalidTransactions holds one entry per transaction that failed during execution,
// so a non-empty slice means that at least one transaction in the block was invalid.
type ExecuteBlockResponse struct {
// Responses contains one DeliverTxResponse per transaction, in block order.
Responses []*DeliverTxResponse
// Root is the state root returned by the state PrepareCommit call.
Root []byte
InvalidTransactions [][32]byte
}
// NewBaseApplication creates a new BaseApplication backed by a state database
// of type dbType stored at dbpath.
//
// It creates the state, the internal state transition controller and the
// transaction handler (whose data lives under dbpath/txHandler), loads the
// default zk circuit and allocates the block cache.
// Node still needs to be initialized with SetNode.
// Callback functions still need to be initialized.
//
// Every failure is returned wrapped with %w, so callers can inspect the
// underlying cause with errors.Is / errors.As.
func NewBaseApplication(dbType, dbpath string) (*BaseApplication, error) {
	// blockCacheSize is the number of blocks kept by the GetBlockByHeight LRU cache.
	const blockCacheSize = 32

	state, err := vstate.NewState(dbType, dbpath)
	if err != nil {
		// The message text is unchanged; %w (instead of %v) keeps the cause unwrappable.
		return nil, fmt.Errorf("cannot create state: (%w)", err)
	}
	istc := ist.NewISTC(state)

	// Create the transaction handler for checking and processing transactions
	transactionHandler := transaction.NewTransactionHandler(
		state,
		istc,
		filepath.Join(dbpath, "txHandler"),
	)
	// Load or download the zk verification keys
	if err := transactionHandler.LoadZkCircuit(circuit.DefaultCircuitConfigurationTag); err != nil {
		return nil, fmt.Errorf("cannot load zk circuit: %w", err)
	}

	blockCache, err := lru.New[int64, *tmtypes.Block](blockCacheSize)
	if err != nil {
		return nil, fmt.Errorf("cannot create block cache: %w", err)
	}

	return &BaseApplication{
		State:              state,
		Istc:               istc,
		TransactionHandler: transactionHandler,
		blockCache:         blockCache,
		dataDir:            dbpath,
		circuitConfigTag:   circuit.DefaultCircuitConfigurationTag,
		genesisInfo:        &tmtypes.GenesisDoc{},
	}, nil
}
// ExecuteBlock delivers a block of transactions to the Application.
// It modifies the state according to the transactions and returns the resulting Merkle root hash.
// It returns a list of DeliverTxResponse, one for each transaction in the block,
// plus the keys of the transactions whose execution failed.
// This call rollbacks the current state (beginBlock discards any uncommitted changes).
//
// An error is returned only when the internal state transition commit or the
// state PrepareCommit fails; individual transaction failures are reported
// through the response codes and InvalidTransactions.
func (app *BaseApplication) ExecuteBlock(txs [][]byte, height uint32, blockTime time.Time) (*ExecuteBlockResponse, error) {
	responses := make([]*DeliverTxResponse, 0, len(txs))
	invalidTxs := [][32]byte{}

	app.beginBlock(blockTime, height)
	for _, tx := range txs {
		resp := app.deliverTx(tx)
		if resp.Code != 0 {
			log.Warnw("deliverTx failed",
				"code", resp.Code,
				"data", string(resp.Data),
				"info", resp.Info,
				"log", resp.Log)
			// Record the CometBFT key of the raw transaction so the failing
			// transaction can actually be identified; an all-zero placeholder
			// would make every entry indistinguishable.
			invalidTxs = append(invalidTxs, [32]byte(tmtypes.Tx(tx).Key()))
		}
		responses = append(responses, resp)
	}

	// execute internal state transition commit
	if err := app.Istc.Commit(height); err != nil {
		return nil, fmt.Errorf("cannot execute ISTC commit: %w", err)
	}
	app.endBlock(blockTime, height)

	root, err := app.State.PrepareCommit()
	if err != nil {
		return nil, fmt.Errorf("cannot prepare commit: %w", err)
	}
	return &ExecuteBlockResponse{
		Responses:           responses,
		Root:                root,
		InvalidTransactions: invalidTxs,
	}, nil
}
// CommitState persists the current state and returns the resulting hash.
// app.State.PrepareCommit() must have been called before saving.
func (app *BaseApplication) CommitState() ([]byte, error) {
	// snapshotsEnabled keeps the periodic state snapshot switched off (DISABLED).
	const snapshotsEnabled = false

	// Only blocks that carried transactions are worth a debug line.
	if app.State.TxCounter() > 0 {
		log.Debugw("commit block", "height", app.Height(), "txs", app.State.TxCounter())
	}

	stateHash, err := app.State.Save()
	if err != nil {
		return nil, fmt.Errorf("cannot save state: %w", err)
	}

	// perform state snapshot (DISABLED)
	if snapshotsEnabled && app.Height()%50000 == 0 && !app.IsSynchronizing() {
		begin := time.Now()
		log.Infof("performing a state snapshot on block %d", app.Height())
		if _, snapErr := app.State.Snapshot(); snapErr != nil {
			return stateHash, fmt.Errorf("cannot make state snapshot: %w", snapErr)
		}
		log.Infof("snapshot created successfully, took %s", time.Since(begin))
		log.Debugf("%+v", app.State.ListSnapshots())
	}
	return stateHash, nil
}
// deliverTx decodes rawTx, validates it through the transaction handler and,
// when accepted, notifies the state event listeners.
// A response with Code 1 and the error text in Data is returned when the
// transaction cannot be decoded or is rejected by the handler.
func (app *BaseApplication) deliverTx(rawTx []byte) *DeliverTxResponse {
	// The counter is bumped on return rather than up front, since index 0 is valid.
	defer app.State.TxCounterAdd()

	// rejected builds the response used for any transaction that is not applied.
	rejected := func(cause error) *DeliverTxResponse {
		return &DeliverTxResponse{Code: 1, Data: []byte(cause.Error())}
	}

	vtx := new(vochaintx.Tx)
	if err := vtx.Unmarshal(rawTx, app.ChainID()); err != nil {
		return rejected(err)
	}
	log.Debugw("deliver tx",
		"hash", fmt.Sprintf("%x", vtx.TxID),
		"type", vtx.TxModelType,
		"height", app.Height(),
		"tx", vtx.Tx,
	)

	// check tx is correct on the current state
	checked, err := app.TransactionHandler.CheckTx(vtx, true)
	if err != nil {
		log.Errorw(err, "rejected tx")
		return rejected(err)
	}
	app.txReferences.Delete(vtx.TxID)

	// call event listeners
	for _, listener := range app.State.EventListeners() {
		listener.OnNewTx(vtx, app.Height(), app.State.TxCounter())
	}

	accepted := &DeliverTxResponse{
		Code: 0,
		Data: checked.Data,
		Info: fmt.Sprintf("%x", checked.TxHash),
		Log:  checked.Log,
	}
	return accepted
}
// beginBlock runs at the start of every block: it refreshes the cached sync
// status, rolls back the state, records the block start time and height, and
// notifies the state that a new block begins.
func (app *BaseApplication) beginBlock(t time.Time, height uint32) {
	// Refresh the cached synchronization flag when a provider is configured.
	if syncFn := app.isSynchronizingFn; syncFn != nil {
		app.isSynchronizing.Store(syncFn())
	}

	app.State.Rollback()
	app.startBlockTimestamp.Store(t.Unix())
	app.State.SetHeight(height)

	// The cache purge is launched in its own goroutine.
	go app.State.CachePurge(height)

	blockStart := vstate.BeginBlock{
		Height: int64(height),
		Time:   t,
	}
	app.State.OnBeginBlock(blockStart)
}
// endBlock runs at the end of every block and records t, as Unix seconds, as
// the last block end timestamp. The height h is currently unused.
func (app *BaseApplication) endBlock(t time.Time, h uint32) {
	unixSeconds := t.Unix()
	app.endBlockTimestamp.Store(unixSeconds)
}
// GetBlockByHeight returns the full block stored at the given height, or nil
// when the block is not available or the lookup callback is not assigned.
// Lookups go through an LRU cache, so for high load operations this method is
// generally preferable to GetBlockByHash(), which is not cached.
func (app *BaseApplication) GetBlockByHeight(height int64) *tmtypes.Block {
	if app.fnGetBlockByHeight == nil {
		log.Errorw(fmt.Errorf("method not assigned"), "getBlockByHeight")
		return nil
	}

	cached, found := app.blockCache.Get(height)
	if found {
		return cached
	}

	fetched := app.fnGetBlockByHeight(height)
	// Only blockstore hits are cached. Caching a miss would keep returning nil
	// for a height even after the blockstore eventually has that block.
	//
	// TODO: we could cache blockstore misses as long as we remove a block's cache entry
	// when a block appears in the chain.
	if fetched != nil {
		app.blockCache.Add(height, fetched)
	}
	return fetched
}
// GetBlockByHash retrieves a full block indexed by its hash.
// It returns nil (and logs an error) when the lookup callback is not assigned.
func (app *BaseApplication) GetBlockByHash(hash []byte) *tmtypes.Block {
	lookup := app.fnGetBlockByHash
	if lookup == nil {
		log.Errorw(fmt.Errorf("method not assigned"), "getBlockByHash")
		return nil
	}
	return lookup(hash)
}
// GetTx retrieves a vochain transaction from the blockstore, addressed by
// block height and transaction index within that block.
// It returns an error, instead of panicking on a nil function call, when the
// lookup callback has not been assigned yet.
func (app *BaseApplication) GetTx(height uint32, txIndex int32) (*models.SignedTx, error) {
	if app.fnGetTx == nil {
		return nil, fmt.Errorf("getTx: method not assigned")
	}
	return app.fnGetTx(height, txIndex)
}
// GetTxHash retrieves a vochain transaction, with its hash, from the blockstore,
// addressed by block height and transaction index within that block.
// It returns an error, instead of panicking on a nil function call, when the
// lookup callback has not been assigned yet.
func (app *BaseApplication) GetTxHash(height uint32, txIndex int32) (*models.SignedTx, []byte, error) {
	if app.fnGetTxHash == nil {
		return nil, nil, fmt.Errorf("getTxHash: method not assigned")
	}
	return app.fnGetTxHash(height, txIndex)
}
// SendTx sends a transaction to the mempool (sync).
// When the send callback has not been assigned, an error is returned so the
// failure is visible to the caller through err rather than as a nil result
// paired with a nil error.
func (app *BaseApplication) SendTx(tx []byte) (*ctypes.ResultBroadcastTx, error) {
	if app.fnSendTx == nil {
		return nil, fmt.Errorf("sendTx: method not assigned")
	}
	return app.fnSendTx(tx)
}
// ChainID reports the chain identifier currently configured on the application.
func (app *BaseApplication) ChainID() string {
	id := app.chainID
	return id
}
// SetChainID stores chainID on the application and propagates the same value
// to the underlying state.
func (app *BaseApplication) SetChainID(chainID string) {
	app.chainID = chainID
	app.State.SetChainID(app.chainID)
}
// MempoolDeleteTx asks the mempool to drop the transaction identified by txID.
// When no prune callback is configured (the mempool implementation does not
// allow it) this is a no-op. Failures are logged and never returned.
func (app *BaseApplication) MempoolDeleteTx(txID [32]byte) {
	prune := app.fnMempoolPrune
	if prune == nil {
		return
	}
	err := prune(txID)
	if err == nil {
		return
	}
	log.Warnw("could not remove mempool tx", "txID", hex.EncodeToString(txID[:]), "err", err)
}
// Genesis returns the tendermint genesis document held by the application.
func (app *BaseApplication) Genesis() *tmtypes.GenesisDoc {
	doc := app.genesisInfo
	return doc
}
// SetCircuitConfigTag loads the circuit configuration identified by tag into
// the transaction handler and, on success, records tag as the current circuit
// config tag of the application. On failure the current tag is left untouched
// and the wrapped load error is returned.
// The available circuit config tags are defined in
// /crypto/zk/circuit/config.go
func (app *BaseApplication) SetCircuitConfigTag(tag string) error {
	// The handler must load the circuit first; the tag is stored only afterwards.
	err := app.TransactionHandler.LoadZkCircuit(tag)
	if err != nil {
		return fmt.Errorf("cannot load zk circuit: %w", err)
	}
	app.circuitConfigTag = tag
	return nil
}
// CircuitConfigurationTag reports the circuit configuration tag currently set
// on the application.
func (app *BaseApplication) CircuitConfigurationTag() string {
	tag := app.circuitConfigTag
	return tag
}
// isSynchronizingTendermint asks the node's consensus reactor whether the
// blockchain is still synchronizing. While no Node is attached it reports true.
func (app *BaseApplication) isSynchronizingTendermint() bool {
	if node := app.Node; node != nil {
		return node.ConsensusReactor().WaitSync()
	}
	return true
}
// IsSynchronizing reports whether the blockchain is synchronizing.
// It returns the cached flag, which is refreshed on every new block.
func (app *BaseApplication) IsSynchronizing() bool {
	syncing := app.isSynchronizing.Load()
	return syncing
}
// Height reports the current blockchain height as tracked by the state,
// including the latest (under construction) block.
func (app *BaseApplication) Height() uint32 {
	current := app.State.CurrentHeight()
	return current
}
// Timestamp reports the end timestamp recorded for the last block.
func (app *BaseApplication) Timestamp() int64 {
	lastEnd := app.endBlockTimestamp.Load()
	return lastEnd
}
// TimestampStartBlock reports the start timestamp recorded for the current block.
func (app *BaseApplication) TimestampStartBlock() int64 {
	currentStart := app.startBlockTimestamp.Load()
	return currentStart
}
// TimestampFromBlock returns the timestamp for a specific block height.
// If the block is not found, it returns nil.
// If the block is the current block, it returns the current block start timestamp.
//
// The returned pointer always refers to a fresh copy, so callers can neither
// modify a block held in the block cache nor keep a whole block alive through
// a pointer to its Time field.
func (app *BaseApplication) TimestampFromBlock(height int64) *time.Time {
	if int64(app.Height()) == height {
		t := time.Unix(app.TimestampStartBlock(), 0)
		return &t
	}
	blk := app.GetBlockByHeight(height)
	if blk == nil {
		return nil
	}
	// Copy the value instead of returning &blk.Time: blk may be shared via the LRU cache.
	t := blk.Time
	return &t
}
// MempoolSize returns the size of the transaction mempool.
// It returns 0, instead of panicking on a nil function call, when the mempool
// size callback has not been assigned yet.
func (app *BaseApplication) MempoolSize() int {
	if app.fnMempoolSize == nil {
		return 0
	}
	return app.fnMempoolSize()
}