From d82b3624b94893339476cf2afc7b12b6d182b21c Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 11 Nov 2025 23:36:03 -0300 Subject: [PATCH 1/2] sweepbatcher: fix code format --- sweepbatcher/sweep_batch.go | 1 + sweepbatcher/sweep_batcher_test.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 4415d3fc4..58cdf19dc 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -2439,6 +2439,7 @@ func (b *batch) isComplete() bool { if err != nil && err != ErrBatchShuttingDown { return false } + return b.state == Confirmed } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 009e8da02..2fccd17f2 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -4149,9 +4149,9 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore, go func() { defer addWG.Done() - // After this goroutine completes, stop the goroutine that handles - // registrations as well. Give it one second to finish the last - // AddSweep to prevent goroutine leaks. + // After this goroutine completes, stop the goroutine that + // handles registrations as well. Give it one second to finish + // the last AddSweep to prevent goroutine leaks. defer time.AfterFunc(time.Second, confCancel) for { From 37eb3462e56ab80610108f4a4cdf9d5c86efb52e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 12 Nov 2025 00:54:00 -0300 Subject: [PATCH 2/2] sweepbatcher: handle batch shutdown during re-add Started from CI failure of TestSweepBatcherHandleSweepRace/loopdb [1] that surfaced ErrBatcherShuttingDown when the AddSweep loop caught a batch right as it finished. Reproduced via: go test ./sweepbatcher -run TestSweepBatcherHandleSweepRace/loopdb -cpu=12,4,8 -count=1 Discovered that handleSweeps treated ErrBatchShuttingDown from batch.addSweeps as fatal, so the batcher exited even though the sweep was already confirmed. 
Fixed by treating that error as "batch already done" so we fall through to the persisted status/monitorSpend path, and added a regression test that deterministically simulates the shutdown window. New regression test added: go test ./sweepbatcher -run TestSweepBatcherHandleBatchShutdown [1] https://github.com/lightninglabs/loop/actions/runs/19282552307/job/55136597881?pr=1041 --- sweepbatcher/sweep_batcher.go | 10 ++- sweepbatcher/sweep_batcher_test.go | 137 +++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index ec5453a8e..dd5d01751 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -906,7 +906,15 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, for _, batch := range b.batches { if batch.sweepExists(sweep.outpoint) { accepted, err := batch.addSweeps(ctx, sweeps) - if err != nil && !errors.Is(err, ErrBatchShuttingDown) { + if errors.Is(err, ErrBatchShuttingDown) { + // The batch finished while we were trying to + // re-add the sweep. Leave it in the map for + // lazy cleanup and fall through to the + // monitorSpendAndNotify path below.
+ break + } + + if err != nil { return err } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 2fccd17f2..2d35541b2 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -13,6 +13,7 @@ import ( "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" @@ -4262,6 +4263,136 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore, checkBatcherError(t, runErr) } +// testSweepBatcherHandleBatchShutdown simulates a sweep that is re-added after +// its batch has already completed and begun shutting down. Before the fix, +// addSweeps surfaced ErrBatchShuttingDown, causing handleSweeps to return an +// error and, ultimately, AddSweep to deliver ErrBatcherShuttingDown back to +// the caller. The regression test ensures we fall back to the persisted sweep +// status and silently switch to the monitor path. +func testSweepBatcherHandleBatchShutdown(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + ctx := context.Background() + swapHash := lntypes.Hash{2, 2, 2} + sweepOutpoint := wire.OutPoint{ + Hash: chainhash.Hash{0, 0, 0, 2}, + Index: 1, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 144, + AmountRequested: 1_000, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + Preimage: lntypes.Preimage{2}, + }, + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: confTarget, + } + err := store.CreateLoopOut(ctx, swapHash, swap) + require.NoError(t, err) + + // Insert a confirmed batch/sweep pair directly into the store so + // GetSweepStatus/GetParentBatch report that the swap already finished.
+ dbEntry := &dbBatch{} + batchID, err := batcherStore.InsertSweepBatch(ctx, dbEntry) + require.NoError(t, err) + + dbEntry.ID = batchID + dbEntry.Confirmed = true + require.NoError(t, batcherStore.UpdateSweepBatch(ctx, dbEntry)) + + err = batcherStore.UpsertSweep(ctx, &dbSweep{ + BatchID: batchID, + SwapHash: swapHash, + Outpoint: sweepOutpoint, + Amount: 1_000, + Completed: true, + }) + require.NoError(t, err) + + // Build a minimal batch that already contains the sweep. Its event-loop + // channels are serviced by a helper goroutine so scheduleNextCall can + // run without spinning up the full batch.Run machinery. + testCfg := &batchConfig{ + maxTimeoutDistance: defaultMaxTimeoutDistance, + } + completedBatch := &batch{ + id: batchID, + state: Confirmed, + primarySweepID: sweepOutpoint, + sweeps: map[wire.OutPoint]sweep{ + sweepOutpoint: { + swapHash: swapHash, + outpoint: sweepOutpoint, + value: 1_000, + confTarget: 6, + minFeeRate: 1, + }, + }, + callEnter: make(chan struct{}), + callLeave: make(chan struct{}), + stopping: make(chan struct{}), + finished: make(chan struct{}), + quit: make(chan struct{}), + cfg: testCfg, + store: batcherStore, + } + completedBatch.setLog(batchPrefixLogger("test-shutdown")) + + // scheduleNextCall interacts with callEnter/callLeave to serialize + // access to the batch state. We don't run the full batch.Run loop in + // this test, so we spin up a helper goroutine that grants and releases + // those slots whenever a test handler grabs them via scheduleNextCall. + // The helper also closes b.stopping the first time it runs to mimic the + // behavior of a batch whose Run method already exited (which is what + // causes ErrBatchShuttingDown). 
+ var once sync.Once + callLoopDone := make(chan struct{}) + go func() { + defer close(callLoopDone) + + for range completedBatch.callEnter { + once.Do(func() { + close(completedBatch.stopping) + }) + + <-completedBatch.callLeave + } + }() + defer func() { + // Stop the helper loop to avoid leaking the goroutine once the + // test completes. Closing callEnter unblocks the goroutine, and + // waiting on callLoopDone ensures it has drained callLeave + // before we return. + close(completedBatch.callEnter) + <-callLoopDone + }() + + testBatcher := &Batcher{ + batches: map[int32]*batch{batchID: completedBatch}, + store: batcherStore, + chainParams: &chaincfg.TestNet3Params, + clock: clock.NewTestClock(time.Unix(0, 0)), + initialDelayProvider: zeroInitialDelay, + } + + testSweep := &sweep{ + swapHash: swapHash, + outpoint: sweepOutpoint, + confTarget: 6, + minFeeRate: 1, + value: 1_000, + } + + err = testBatcher.handleSweeps(ctx, []*sweep{testSweep}, nil, false) + require.NoError(t, err) +} + // testCustomSignMuSig2 tests the operation with custom musig2 signer. func testCustomSignMuSig2(t *testing.T, store testStore, batcherStore testBatcherStore) { @@ -5208,6 +5339,12 @@ func TestSweepBatcherHandleSweepRace(t *testing.T) { runTests(t, testSweepBatcherHandleSweepRace) } +// TestSweepBatcherHandleBatchShutdown covers the regression where re-adding a +// sweep whose batch is shutting down must not surface ErrBatcherShuttingDown. +func TestSweepBatcherHandleBatchShutdown(t *testing.T) { + runTests(t, testSweepBatcherHandleBatchShutdown) +} + // TestCustomSignMuSig2 tests the operation with custom musig2 signer. func TestCustomSignMuSig2(t *testing.T) { runTests(t, testCustomSignMuSig2)