diff --git a/channeldb/channel.go b/channeldb/channel.go index d5764759455..81bd14788f3 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -78,6 +78,12 @@ var ( // is retained. historicalChannelBucket = []byte("historical-chan-bucket") + // pendingCleanupBucket stores information about channels that have been + // closed but whose data (revocation logs, forwarding packages) has not + // yet been deleted. This is used by SQL backends to defer heavy cleanup + // operations to startup. + pendingCleanupBucket = []byte("pending-cleanup-bucket") + // chanInfoKey can be accessed within the bucket for a channel // (identified by its chanPoint). This key stores all the static // information for a channel which is decided at the end of the @@ -3759,6 +3765,57 @@ const ( Abandoned ClosureType = 5 ) +// PendingCleanupInfo contains the information needed to clean up a channel's +// data after it has been closed. This is used by SQL backends to defer heavy +// deletion operations to startup. +type PendingCleanupInfo struct { + // ChanPoint is the funding outpoint of the channel. + ChanPoint wire.OutPoint + + // ShortChanID is the short channel ID of the channel. + ShortChanID lnwire.ShortChannelID + + // NodePub is the compressed public key of the remote node. + NodePub [33]byte + + // ChainHash is the hash of the chain this channel belongs to. + ChainHash chainhash.Hash +} + +// Encode serializes the PendingCleanupInfo to the given writer. +func (p *PendingCleanupInfo) Encode(w io.Writer) error { + if err := WriteElements(w, p.ChanPoint, p.ShortChanID); err != nil { + return err + } + + if _, err := w.Write(p.NodePub[:]); err != nil { + return err + } + + if _, err := w.Write(p.ChainHash[:]); err != nil { + return err + } + + return nil +} + +// Decode deserializes the PendingCleanupInfo from the given reader. 
+func (p *PendingCleanupInfo) Decode(r io.Reader) error { + if err := ReadElements(r, &p.ChanPoint, &p.ShortChanID); err != nil { + return err + } + + if _, err := io.ReadFull(r, p.NodePub[:]); err != nil { + return err + } + + if _, err := io.ReadFull(r, p.ChainHash[:]); err != nil { + return err + } + + return nil +} + // ChannelCloseSummary contains the final state of a channel at the point it // was closed. Once a channel is closed, all the information pertaining to that // channel within the openChannelBucket is deleted, and a compact summary is @@ -3853,6 +3910,10 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, c.Lock() defer c.Unlock() + // Check if the backend prefers deferring heavy operations to startup. + // Postgres backends return true here to avoid lock contention. + deferCleanup := kvdb.ShouldDeferHeavyOperations(c.Db.backend) + return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error { openChanBucket := tx.ReadWriteBucket(openChannelBucket) if openChanBucket == nil { @@ -3893,37 +3954,25 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, return err } - // Delete all the forwarding packages stored for this particular - // channel. - if err = chanState.Packager.Wipe(tx); err != nil { - return err - } - - // Now that the index to this channel has been deleted, purge - // the remaining channel metadata from the database. - err = deleteOpenChannel(chanBucket) - if err != nil { - return err - } - - // We'll also remove the channel from the frozen channel bucket - // if we need to. - if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() { - err := deleteThawHeight(chanBucket) + if deferCleanup { + // For postgres backends, store cleanup info and defer + // the heavy deletion operations to startup. + err = storePendingCleanup( + tx, c, nodePub, chanKey, + ) + if err != nil { + return err + } + } else { + // For non-postgres backends (bbolt, sqlite), perform + // immediate cleanup. 
+ err = performImmediateCleanup( + tx, chanState, chanBucket, chainBucket, + chanPointBuf.Bytes(), + ) if err != nil { return err } - } - - // With the base channel data deleted, attempt to delete the - // information stored within the revocation log. - if err := deleteLogBucket(chanBucket); err != nil { - return err - } - - err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes()) - if err != nil { - return err } // Fetch the outpoint bucket to see if the outpoint exists or @@ -4733,6 +4782,71 @@ func deleteOpenChannel(chanBucket kvdb.RwBucket) error { return nil } +// storePendingCleanup stores cleanup info for a channel to be processed at +// startup. This is used by postgres backends to defer heavy deletion +// operations. +func storePendingCleanup(tx kvdb.RwTx, c *OpenChannel, nodePub []byte, + chanKey []byte) error { + + cleanupBucket, err := tx.CreateTopLevelBucket(pendingCleanupBucket) + if err != nil { + return err + } + + var nodePubKey [33]byte + copy(nodePubKey[:], nodePub) + + cleanupInfo := &PendingCleanupInfo{ + ChanPoint: c.FundingOutpoint, + ShortChanID: c.ShortChannelID, + NodePub: nodePubKey, + ChainHash: c.ChainHash, + } + + var cleanupBuf bytes.Buffer + if err := cleanupInfo.Encode(&cleanupBuf); err != nil { + return err + } + + return cleanupBucket.Put(chanKey, cleanupBuf.Bytes()) +} + +// performImmediateCleanup handles the cleanup operations that are performed +// immediately during channel close for non-postgres backends (bbolt, sqlite). +// This includes wiping forwarding packages, deleting channel data, thaw height, +// revocation logs, and the channel bucket itself. +func performImmediateCleanup(tx kvdb.RwTx, chanState *OpenChannel, + chanBucket kvdb.RwBucket, chainBucket kvdb.RwBucket, + chanKey []byte) error { + + // Delete all the forwarding packages stored for this channel. + if err := chanState.Packager.Wipe(tx); err != nil { + return err + } + + // Purge the remaining channel metadata from the database. 
+ if err := deleteOpenChannel(chanBucket); err != nil { + return err + } + + // Remove the channel from the frozen channel bucket if needed. + if chanState.ChanType.IsFrozen() || + chanState.ChanType.HasLeaseExpiration() { + + if err := deleteThawHeight(chanBucket); err != nil { + return err + } + } + + // Delete the information stored within the revocation log. + if err := deleteLogBucket(chanBucket); err != nil { + return err + } + + // Delete the channel bucket itself. + return chainBucket.DeleteNestedBucket(chanKey) +} + // makeLogKey converts a uint64 into an 8 byte array. func makeLogKey(updateNum uint64) [8]byte { var key [8]byte diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 47504067780..70e942da136 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -509,6 +509,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) { t.Fatalf("unable to close channel: %v", err) } + // For postgres backends, cleanup is deferred. Process it now. + if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // As the channel is now closed, attempting to fetch all open channels // for our fake node ID should return an empty slice. openChans, err := cdb.FetchOpenChannels(state.IdentityPub) @@ -955,6 +961,12 @@ func TestChannelStateTransition(t *testing.T) { t.Fatalf("unable to delete updated channel: %v", err) } + // For postgres backends, cleanup is deferred. Process it now. + if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // If we attempt to fetch the target channel again, it shouldn't be // found. 
channels, err := cdb.FetchOpenChannels(channel.IdentityPub) diff --git a/channeldb/db.go b/channeldb/db.go index 00b29f65f9f..c4a4609dbc5 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -2023,6 +2023,157 @@ func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome( }, func() {}) } +// CleanupPendingCloses processes any channels that were closed but whose heavy +// cleanup operations (deleting revocation logs, forwarding packages) were +// deferred to startup. This is used by postgres backends to avoid lock +// contention during normal operation. +func (c *ChannelStateDB) CleanupPendingCloses() error { + // First, collect all the pending cleanup entries. + var cleanupEntries []*PendingCleanupInfo + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + cleanupBucket := tx.ReadBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + return cleanupBucket.ForEach(func(k, v []byte) error { + info := &PendingCleanupInfo{} + if err := info.Decode(bytes.NewReader(v)); err != nil { + return err + } + + cleanupEntries = append(cleanupEntries, info) + + return nil + }) + }, func() { + cleanupEntries = nil + }) + if err != nil { + return err + } + + if len(cleanupEntries) == 0 { + return nil + } + + log.Infof("Processing %d deferred channel cleanups", + len(cleanupEntries)) + + // Process each cleanup entry. + for _, info := range cleanupEntries { + err := c.cleanupChannel(info) + if err != nil { + log.Warnf("Failed to cleanup channel %v: %v", + info.ChanPoint, err) + continue + } + + log.Debugf("Cleaned up deferred channel data for %v", + info.ChanPoint) + } + + return nil +} + +// cleanupChannel performs the actual cleanup for a single channel. +func (c *ChannelStateDB) cleanupChannel(info *PendingCleanupInfo) error { + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + // Get the open channel bucket structure. 
+ openChanBucket := tx.ReadWriteBucket(openChannelBucket) + if openChanBucket == nil { + // If there's no open channel bucket, nothing to clean. + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + nodeChanBucket := openChanBucket.NestedReadWriteBucket( + info.NodePub[:], + ) + if nodeChanBucket == nil { + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + chainBucket := nodeChanBucket.NestedReadWriteBucket( + info.ChainHash[:], + ) + if chainBucket == nil { + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + var chanPointBuf bytes.Buffer + err := graphdb.WriteOutpoint(&chanPointBuf, &info.ChanPoint) + if err != nil { + return err + } + chanKey := chanPointBuf.Bytes() + + chanBucket := chainBucket.NestedReadWriteBucket(chanKey) + if chanBucket == nil { + // Channel bucket doesn't exist, just remove the + // pending cleanup entry. + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + // Fetch the channel state to get the packager. + chanState, err := fetchOpenChannel( + chanBucket, &info.ChanPoint, + ) + if err != nil { + return err + } + + // Delete all the forwarding packages stored for this channel. + if err := chanState.Packager.Wipe(tx); err != nil { + return err + } + + // Purge the remaining channel metadata from the database. + if err := deleteOpenChannel(chanBucket); err != nil { + return err + } + + // Remove the channel from the frozen channel bucket if needed. + if chanState.ChanType.IsFrozen() || + chanState.ChanType.HasLeaseExpiration() { + + if err := deleteThawHeight(chanBucket); err != nil { + return err + } + } + + // Delete the information stored within the revocation log. + if err := deleteLogBucket(chanBucket); err != nil { + return err + } + + // Delete the channel bucket itself. + if err := chainBucket.DeleteNestedBucket(chanKey); err != nil { + return err + } + + // Finally, remove the pending cleanup entry. 
+ return c.removePendingCleanup(tx, &info.ChanPoint) + }, func() {}) +} + +// removePendingCleanup removes a channel's entry from the pending cleanup +// bucket. +func (c *ChannelStateDB) removePendingCleanup(tx kvdb.RwTx, + chanPoint *wire.OutPoint) error { + + cleanupBucket := tx.ReadWriteBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + var chanPointBuf bytes.Buffer + if err := graphdb.WriteOutpoint(&chanPointBuf, chanPoint); err != nil { + return err + } + + return cleanupBucket.Delete(chanPointBuf.Bytes()) +} + // MakeTestInvoiceDB is used to create a test invoice database for testing // purposes. It simply calls into MakeTestDB so the same modifiers can be used. func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) ( diff --git a/channeldb/db_test.go b/channeldb/db_test.go index e2e9a197004..c7802cf5594 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -1,6 +1,7 @@ package channeldb import ( + "bytes" "image/color" "math" "math/rand" @@ -455,6 +456,12 @@ func TestAbandonChannel(t *testing.T) { err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight) require.NoError(t, err, "unable to abandon channel") + // For postgres backends, cleanup is deferred. Process it now. + if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // At this point, the channel should no longer be found in the set of // open channels. _, err = cdb.FetchChannel(chanState.FundingOutpoint) @@ -833,3 +840,107 @@ func createTestVertex(t *testing.T) *models.Node { return createNode(priv) } + +// TestPendingCleanupInfoEncodeDecode tests that PendingCleanupInfo can be +// properly encoded and decoded. +func TestPendingCleanupInfoEncodeDecode(t *testing.T) { + t.Parallel() + + // Create a test PendingCleanupInfo. 
+ var nodePub [33]byte + copy(nodePub[:], bytes.Repeat([]byte{0x02}, 33)) + + chanPoint := wire.OutPoint{ + Hash: chainhash.Hash{0x01, 0x02, 0x03}, + Index: 42, + } + shortChanID := lnwire.NewShortChanIDFromInt(123456) + chainHash := chainhash.Hash{0x0a, 0x0b, 0x0c} + + info := &PendingCleanupInfo{ + ChanPoint: chanPoint, + ShortChanID: shortChanID, + NodePub: nodePub, + ChainHash: chainHash, + } + + // Encode it. + var buf bytes.Buffer + err := info.Encode(&buf) + require.NoError(t, err) + + // Decode it. + decoded := &PendingCleanupInfo{} + err = decoded.Decode(&buf) + require.NoError(t, err) + + // Verify all fields match. + require.Equal(t, info.ChanPoint, decoded.ChanPoint) + require.Equal(t, info.ShortChanID, decoded.ShortChanID) + require.Equal(t, info.NodePub, decoded.NodePub) + require.Equal(t, info.ChainHash, decoded.ChainHash) +} + +// TestCleanupPendingClosesEmpty tests that CleanupPendingCloses works +// correctly when there are no pending cleanups. +func TestCleanupPendingClosesEmpty(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + + // Calling CleanupPendingCloses when there's nothing to clean should + // succeed without error. + err = cdb.CleanupPendingCloses() + require.NoError(t, err) +} + +// TestImmediateCleanupOnClose tests that for non-postgres backends (like +// bbolt and sqlite), channel close performs immediate cleanup without +// deferring to the pending cleanup bucket. +func TestImmediateCleanupOnClose(t *testing.T) { + t.Parallel() + + // Skip this test for postgres as it defers cleanup. + if kvdb.PostgresBackend { + t.Skip("Skipping test for postgres backend") + } + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + + // Create an open channel. + channel := createTestChannel(t, cdb, openChannelOption()) + + // Close the channel. 
+ err = channel.CloseChannel(&ChannelCloseSummary{ + ChanPoint: channel.FundingOutpoint, + RemotePub: channel.IdentityPub, + SettledBalance: btcutil.Amount(500), + }) + require.NoError(t, err) + + // For non-postgres backends, the pending cleanup bucket should be + // empty (or not exist). + var pendingCleanupCount int + err = kvdb.View(fullDB.Backend, func(tx kvdb.RTx) error { + cleanupBucket := tx.ReadBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + return cleanupBucket.ForEach(func(k, v []byte) error { + pendingCleanupCount++ + return nil + }) + }, func() { + pendingCleanupCount = 0 + }) + require.NoError(t, err) + require.Zero(t, pendingCleanupCount, + "expected no pending cleanup entries for non-postgres backend") +} diff --git a/config_builder.go b/config_builder.go index 7ce63041ee2..bed1437ab4e 100644 --- a/config_builder.go +++ b/config_builder.go @@ -1094,11 +1094,28 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( return nil, nil, err } + // Process any deferred channel cleanups. For postgres backends, heavy + // cleanup operations (deleting revocation logs, forwarding packages) + // are deferred to startup to avoid lock contention. For other backends + // (bbolt, sqlite), this is a no-op as they perform immediate cleanup. + if kvdb.ShouldDeferHeavyOperations(databaseBackends.ChanStateDB) { + err = dbs.ChanStateDB.ChannelStateDB().CleanupPendingCloses() + if err != nil { + cleanUp() + + err = fmt.Errorf("unable to cleanup pending "+ + "closes: %w", err) + d.logger.Error(err) + return nil, nil, err + } + } + // The graph store implementation we will use depends on whether // native SQL is enabled or not. var graphStore graphdb.V1Store // Instantiate a native SQL store if the flag is set. 
+	//nolint:nestif
 	if d.cfg.DB.UseNativeSQL {
 		migrations := sqldb.GetMigrations()

diff --git a/docs/release-notes/release-notes-0.20.1.md b/docs/release-notes/release-notes-0.20.1.md
index 1963db61b45..c84e32893ad 100644
--- a/docs/release-notes/release-notes-0.20.1.md
+++ b/docs/release-notes/release-notes-0.20.1.md
@@ -43,6 +43,15 @@

 ## Performance Improvements

+* [Defer deletion of closed
+  channel data](https://github.com/lightningnetwork/lnd/pull/10390) to the
+  next restart for postgres backends. Normally deleting everything as soon as
+  the channel is closed is ok, but for the kvdb backend for postgres this can
+  cause severe stress on the kv table, so we introduce this performance
+  improvement until the native sql schema for channels and revocation logs are
+  deployed.
+
+
 ## Deprecations

 # Technical and Architectural Updates
diff --git a/go.mod b/go.mod
index 7728da649ef..752fe63afa1 100644
--- a/go.mod
+++ b/go.mod
@@ -205,6 +205,9 @@ require (

 // TODO(elle): remove once the gossip V2 sqldb changes have been made.
 replace github.com/lightningnetwork/lnd/sqldb => ./sqldb

+// TODO: remove once the deferred cleanup changes have been released.
+replace github.com/lightningnetwork/lnd/kvdb => ./kvdb + // This replace is for https://github.com/advisories/GHSA-25xm-hr59-7c27 replace github.com/ulikunitz/xz => github.com/ulikunitz/xz v0.5.11 diff --git a/go.sum b/go.sum index fed066fb800..954a1efbc36 100644 --- a/go.sum +++ b/go.sum @@ -378,8 +378,6 @@ github.com/lightningnetwork/lnd/fn/v2 v2.0.9 h1:ZytG4ltPac/sCyg1EJDn10RGzPIDJeye github.com/lightningnetwork/lnd/fn/v2 v2.0.9/go.mod h1:aPUJHJ31S+Lgoo8I5SxDIjnmeCifqujaiTXKZqpav3w= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= -github.com/lightningnetwork/lnd/kvdb v1.4.16 h1:9BZgWdDfjmHRHLS97cz39bVuBAqMc4/p3HX1xtUdbDI= -github.com/lightningnetwork/lnd/kvdb v1.4.16/go.mod h1:HW+bvwkxNaopkz3oIgBV6NEnV4jCEZCACFUcNg4xSjM= github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI= github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4= github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM= diff --git a/itest/list_on_bbolt_test.go b/itest/list_on_bbolt_test.go new file mode 100644 index 00000000000..b9ac114c037 --- /dev/null +++ b/itest/list_on_bbolt_test.go @@ -0,0 +1,19 @@ +//go:build integration && !kvdb_postgres + +package itest + +import "github.com/lightningnetwork/lnd/lntest" + +// immediateCleanupTestCases is a list of tests that are only run when using +// bbolt or sqlite backends. These backends perform immediate cleanup during +// channel close, unlike postgres which defers cleanup to startup. +var immediateCleanupTestCases = []*lntest.TestCase{ + { + Name: "wipe forwarding packages", + TestFunc: testWipeForwardingPackages, + }, +} + +func init() { + allTestCases = append(allTestCases, immediateCleanupTestCases...) 
+} diff --git a/itest/list_on_sql_test.go b/itest/list_on_sql_test.go new file mode 100644 index 00000000000..598969cb577 --- /dev/null +++ b/itest/list_on_sql_test.go @@ -0,0 +1,18 @@ +//go:build integration && kvdb_postgres + +package itest + +import "github.com/lightningnetwork/lnd/lntest" + +// postgresTestCases is a list of tests that are only run when using postgres +// backend. These tests verify postgres-specific behavior like deferred cleanup. +var postgresTestCases = []*lntest.TestCase{ + { + Name: "wipe forwarding packages", + TestFunc: testWipeForwardingPackagesPostgres, + }, +} + +func init() { + allTestCases = append(allTestCases, postgresTestCases...) +} diff --git a/itest/list_on_test.go b/itest/list_on_test.go index f5961147d3a..530f341794f 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -383,10 +383,6 @@ var allTestCases = []*lntest.TestCase{ Name: "single hop invoice", TestFunc: testSingleHopInvoice, }, - { - Name: "wipe forwarding packages", - TestFunc: testWipeForwardingPackages, - }, { Name: "switch circuit persistence", TestFunc: testSwitchCircuitPersistence, diff --git a/itest/lnd_wipe_fwdpkgs_sql_test.go b/itest/lnd_wipe_fwdpkgs_sql_test.go new file mode 100644 index 00000000000..260cb3b2a52 --- /dev/null +++ b/itest/lnd_wipe_fwdpkgs_sql_test.go @@ -0,0 +1,103 @@ +//go:build integration && kvdb_postgres + +package itest + +import ( + "github.com/lightningnetwork/lnd/chainreg" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" + "github.com/stretchr/testify/require" +) + +// testWipeForwardingPackagesPostgres tests that for postgres backends, +// forwarding packages are deleted when CleanupPendingCloses runs at startup +// (not during CloseChannel like bbolt/sqlite). This test verifies the deferred +// cleanup behavior that avoids lock contention during normal operation. 
+func testWipeForwardingPackagesPostgres(ht *lntest.HarnessTest) { + const ( + chanAmt = 10e6 + paymentAmt = 10e4 + finalCTLVDelta = chainreg.DefaultBitcoinTimeLockDelta + numInvoices = 3 + ) + + chanPoints, nodes := ht.CreateSimpleNetwork( + [][]string{nil, nil, nil}, + lntest.OpenChannelParams{Amt: chanAmt}, + ) + chanPointAB, chanPointBC := chanPoints[0], chanPoints[1] + alice, bob, carol := nodes[0], nodes[1], nodes[2] + + // Before we continue, make sure Alice has seen the channel between Bob + // and Carol. + ht.AssertChannelInGraph(alice, chanPointBC) + + // Alice sends several payments to Carol through Bob, which triggers + // Bob to create forwarding packages. + for i := 0; i < numInvoices; i++ { + // Add an invoice for Carol. + invoice := &lnrpc.Invoice{Memo: "testing", Value: paymentAmt} + resp := carol.RPC.AddInvoice(invoice) + + // Alice sends a payment to Carol through Bob. + ht.CompletePaymentRequests(alice, []string{resp.PaymentRequest}) + } + + flakePaymentStreamReturnEarly() + + // Firstly, Bob force closes the channel. + ht.CloseChannelAssertPending(bob, chanPointAB, true) + + // Now that the channel has been force closed, it should show up in + // bob's PendingChannels RPC under the waiting close section. + pendingAB := ht.AssertChannelWaitingClose(bob, chanPointAB).Channel + + // Check that Bob has created forwarding packages. We don't care the + // exact number here as long as these packages are deleted when the + // channel is closed. + require.NotZero(ht, pendingAB.NumForwardingPackages) + + // Secondly, Bob coop closes the channel. + ht.CloseChannelAssertPending(bob, chanPointBC, false) + + // Now that the channel has been coop closed, it should show up in + // bob's PendingChannels RPC under the waiting close section. + pendingBC := ht.AssertChannelWaitingClose(bob, chanPointBC).Channel + + // Check that Bob has created forwarding packages. 
We don't care the
+	// exact number here as long as these packages are deleted when the
+	// channel is closed.
+	require.NotZero(ht, pendingBC.NumForwardingPackages)
+
+	// Since it's a coop close, Carol should see the waiting close channel
+	// too.
+	pendingBC = ht.AssertChannelWaitingClose(carol, chanPointBC).Channel
+	require.NotZero(ht, pendingBC.NumForwardingPackages)
+
+	// Mine 1 block to get the two closing transactions confirmed.
+	ht.MineBlocksAndAssertNumTxes(1, 2)
+
+	// For SQL backends, the forwarding packages are NOT deleted during
+	// CloseChannel - they are deferred to CleanupPendingCloses at startup.
+	// Restart the nodes to trigger the cleanup.
+	ht.RestartNode(bob)
+	ht.RestartNode(alice)
+
+	// Now that the closing transaction is confirmed, the above waiting
+	// close channel should now become pending force closed channel.
+	pendingAB = ht.AssertChannelPendingForceClose(bob, chanPointAB).Channel
+
+	// Check the forwarding packages are deleted after the restart.
+	require.Zero(ht, pendingAB.NumForwardingPackages)
+
+	// For Alice, the forwarding packages should have been wiped too.
+	pending := ht.AssertChannelPendingForceClose(alice, chanPointAB)
+	pendingAB = pending.Channel
+	require.Zero(ht, pendingAB.NumForwardingPackages)
+
+	// Alice should have one pending sweep.
+	ht.AssertNumPendingSweeps(alice, 1)
+
+	// Mine 1 block to get Alice's sweeping tx confirmed.
+	ht.MineBlocksAndAssertNumTxes(1, 1)
+}
diff --git a/kvdb/interface.go b/kvdb/interface.go
index 5fd1ecd16a9..d095b3b7caf 100644
--- a/kvdb/interface.go
+++ b/kvdb/interface.go
@@ -145,6 +145,31 @@ func RootBucket(t RTx) RBucket {
 	return nil
 }

+// DeferrableBackend is an optional interface that backends can implement to
+// indicate they prefer deferring heavy operations (like bulk deletes) to
+// startup rather than executing them inline. SQL-based backends typically
+// implement this interface since concurrent large transactions can cause
+// lock contention and timeouts.
+type DeferrableBackend interface { + // ShouldDeferHeavyOperations returns true if the backend prefers to + // defer expensive operations (like deleting thousands of revocation + // log entries) to startup rather than executing them inline during + // normal operations. + ShouldDeferHeavyOperations() bool +} + +// ShouldDeferHeavyOperations checks if the backend implements +// DeferrableBackend and if so, returns the result of +// ShouldDeferHeavyOperations(). Returns false for backends that don't +// implement the interface (like bbolt). +func ShouldDeferHeavyOperations(backend Backend) bool { + if db, ok := backend.(DeferrableBackend); ok { + return db.ShouldDeferHeavyOperations() + } + + return false +} + var ( // ErrBucketNotFound is returned when trying to access a bucket that // has not been created yet. diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index 8ff7f979aff..e0c13d7918b 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -95,6 +95,16 @@ type db struct { // Enforce db implements the walletdb.DB interface. var _ walletdb.DB = (*db)(nil) +// ShouldDeferHeavyOperations returns true for postgres backends, indicating +// they prefer deferring heavy operations (like bulk deletes of revocation logs) +// to startup. This helps avoid lock contention and transaction timeouts that +// can occur when multiple channels are closed concurrently. SQLite uses the +// same immediate cleanup approach as bbolt since it doesn't have the same +// concurrency constraints. +func (db *db) ShouldDeferHeavyOperations() bool { + return db.cfg.DriverName == "pgx" +} + var ( // dbConns is a global set of database connections. dbConns *dbConnSet