Skip to content

Commit

Permalink
ci: fix panic in indexer (#1402)
Browse files Browse the repository at this point in the history
<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview

<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

3 fixes:
1.  test not using a ctx with a cancel.
2. t.Parallel() was contributing to flakiness
3. added ctx done check in long running for loop

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Refactor**
- Improved transaction processing to allow non-blocking behavior and
better handle context cancellation.
- Enhanced thread safety with the addition of mutex locks in various
methods to prevent race conditions.

- **Tests**
- Updated test functions to include context management and cancellation
capabilities for more robust testing scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
MSevey committed Dec 8, 2023
1 parent 21e37e9 commit eccdd0f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
3 changes: 3 additions & 0 deletions mempool/clist_mempool.go
Expand Up @@ -29,6 +29,7 @@ type CListMempool struct {

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txAvailMtx sync.Mutex
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty

config *config.MempoolConfig
Expand Down Expand Up @@ -501,6 +502,8 @@ func (mem *CListMempool) TxsAvailable() <-chan struct{} {
}

func (mem *CListMempool) notifyTxsAvailable() {
mem.txAvailMtx.Lock()
defer mem.txAvailMtx.Unlock()
if mem.Size() == 0 {
panic("notified txs available but mempool is empty!")
}
Expand Down
5 changes: 3 additions & 2 deletions node/full_client_test.go
Expand Up @@ -171,7 +171,9 @@ func TestGenesisChunked(t *testing.T) {
mockApp.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
n, _ := newFullNode(context.Background(), config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
n, _ := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t))

rpc := NewFullClient(n)

Expand Down Expand Up @@ -926,7 +928,6 @@ func TestStatus(t *testing.T) {
}

func TestFutureGenesisTime(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)

Expand Down
34 changes: 19 additions & 15 deletions state/txindex/indexer_service.go
Expand Up @@ -74,22 +74,26 @@ func (is *IndexerService) OnStart() error {
batch := NewBatch(numTxs)

for i := int64(0); i < numTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult

if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)

if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
select {
case <-is.ctx.Done():
return
case msg2 := <-txsSub.Out():
txResult := msg2.Data().(types.EventDataTx).TxResult

if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)

if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
return
}
}
}
Expand Down

0 comments on commit eccdd0f

Please sign in to comment.