From eccdd0f1793a5ac532011ef4d896de9e0d8bcb9d Mon Sep 17 00:00:00 2001 From: Matthew Sevey Date: Fri, 8 Dec 2023 16:09:27 -0500 Subject: [PATCH] ci: fix panic in indexer (#1402) ## Overview 3 fixes: 1. test not using a ctx with a cancel. 2. t.Parallel() was contributing to flakiness 3. added ctx done check in long running for loop ## Summary by CodeRabbit - **Refactor** - Improved transaction processing to allow non-blocking behavior and better handle context cancellation. - Enhanced thread safety with the addition of mutex locks in various methods to prevent race conditions. - **Tests** - Updated test functions to include context management and cancellation capabilities for more robust testing scenarios. --- mempool/clist_mempool.go | 3 +++ node/full_client_test.go | 5 +++-- state/txindex/indexer_service.go | 34 ++++++++++++++++++-------------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index ed29fba08..eb074ef2b 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -29,6 +29,7 @@ type CListMempool struct { // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool + txAvailMtx sync.Mutex txsAvailable chan struct{} // fires once for each height, when the mempool is not empty config *config.MempoolConfig @@ -501,6 +502,8 @@ func (mem *CListMempool) TxsAvailable() <-chan struct{} { } func (mem *CListMempool) notifyTxsAvailable() { + mem.txAvailMtx.Lock() + defer mem.txAvailMtx.Unlock() if mem.Size() == 0 { panic("notified txs available but mempool is empty!") } diff --git a/node/full_client_test.go b/node/full_client_test.go index d8ff91f60..a073e1913 100644 --- a/node/full_client_test.go +++ b/node/full_client_test.go @@ -171,7 +171,9 @@ func TestGenesisChunked(t *testing.T) { mockApp.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil) privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader) - n, _ := newFullNode(context.Background(), config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + n, _ := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t)) rpc := NewFullClient(n) @@ -926,7 +928,6 @@ func TestStatus(t *testing.T) { } func TestFutureGenesisTime(t *testing.T) { - t.Parallel() assert := assert.New(t) require := require.New(t) diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 4e72329f8..53732616a 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -74,22 +74,26 @@ func (is *IndexerService) OnStart() error { batch := NewBatch(numTxs) for i := int64(0); i < numTxs; i++ { - msg2 := <-txsSub.Out() - txResult := msg2.Data().(types.EventDataTx).TxResult - - if err = batch.Add(&txResult); err != nil { - is.Logger.Error( - "failed to add tx to batch", - "height", height, - "index", txResult.Index, - "err", err, - ) - - if is.terminateOnError { - if err := is.Stop(); err != nil { - is.Logger.Error("failed to stop", "err", err) + select { + case <-is.ctx.Done(): + return + case msg2 := <-txsSub.Out(): + txResult := msg2.Data().(types.EventDataTx).TxResult + + if err = batch.Add(&txResult); err != nil { + is.Logger.Error( + "failed to add tx to batch", + "height", height, + "index", txResult.Index, + "err", err, + ) + + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return } - return } } }