Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invoices: refactor InvoiceDB to eliminate ScanInvoices #8081

Merged
merged 5 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 95 additions & 42 deletions channeldb/invoice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,64 +1013,58 @@ func TestSettleIndexAmpPayments(t *testing.T) {
require.Nil(t, err)
}

// TestScanInvoices tests that ScanInvoices scans through all stored invoices
// correctly.
func TestScanInvoices(t *testing.T) {
// TestFetchPendingInvoices tests that we can fetch all pending invoices from
// the database using the FetchPendingInvoices method.
func TestFetchPendingInvoices(t *testing.T) {
t.Parallel()

db, err := MakeTestInvoiceDB(t)
db, err := MakeTestInvoiceDB(t, OptionClock(testClock))
require.NoError(t, err, "unable to make test db")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add a case here to test the empty slice returned from FetchPendingInvoices?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good idea! Done.

var invoices map[lntypes.Hash]*invpkg.Invoice
callCount := 0
resetCount := 0

// reset is used to reset/initialize results and is called once
// upon calling ScanInvoices and when the underlying transaction is
// retried.
reset := func() {
invoices = make(map[lntypes.Hash]*invpkg.Invoice)
callCount = 0
resetCount++
}

scanFunc := func(paymentHash lntypes.Hash,
invoice *invpkg.Invoice) error {

invoices[paymentHash] = invoice
callCount++

return nil
}

ctxb := context.Background()
// With an empty DB we expect to not scan any invoices.
require.NoError(t, db.ScanInvoices(ctxb, scanFunc, reset))
require.Equal(t, 0, len(invoices))
require.Equal(t, 0, callCount)
require.Equal(t, 1, resetCount)

numInvoices := 5
testInvoices := make(map[lntypes.Hash]*invpkg.Invoice)
// Make sure that fetching pending invoices from an empty database
// returns an empty result and no errors.
pending, err := db.FetchPendingInvoices(ctxb)
require.NoError(t, err)
require.Empty(t, pending)

const numInvoices = 20
var settleIndex uint64 = 1
pendingInvoices := make(map[lntypes.Hash]invpkg.Invoice)

// Now populate the DB and check if we can get all invoices with their
// payment hashes as expected.
for i := 1; i <= numInvoices; i++ {
invoice, err := randInvoice(lnwire.MilliSatoshi(i))
amt := lnwire.MilliSatoshi(i * 1000)
invoice, err := randInvoice(amt)
require.NoError(t, err)

invoice.CreationDate = invoice.CreationDate.Add(
time.Duration(i-1) * time.Second,
)

paymentHash := invoice.Terms.PaymentPreimage.Hash()
testInvoices[paymentHash] = invoice

_, err = db.AddInvoice(ctxb, invoice, paymentHash)
require.NoError(t, err)

// Settle every second invoice.
if i%2 == 0 {
pendingInvoices[paymentHash] = *invoice
continue
}

ref := invpkg.InvoiceRefByHash(paymentHash)
_, err = db.UpdateInvoice(ctxb, ref, nil, getUpdateInvoice(amt))
require.NoError(t, err)

settleTestInvoice(invoice, settleIndex)
yyforyongyu marked this conversation as resolved.
Show resolved Hide resolved
settleIndex++
}

resetCount = 0
require.NoError(t, db.ScanInvoices(ctxb, scanFunc, reset))
require.Equal(t, numInvoices, callCount)
require.Equal(t, testInvoices, invoices)
require.Equal(t, 1, resetCount)
// Fetch all pending invoices.
pending, err = db.FetchPendingInvoices(ctxb)
require.NoError(t, err)
require.Equal(t, pendingInvoices, pending)
}

// TestDuplicateSettleInvoice tests that if we add a new invoice and settle it
Expand Down Expand Up @@ -3092,6 +3086,65 @@ func TestDeleteInvoices(t *testing.T) {
assertInvoiceCount(0)
}

// TestDeleteCanceledInvoices tests that deleting canceled invoices with the
// specific DeleteCanceledInvoices method works correctly.
func TestDeleteCanceledInvoices(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be great if we could add tests to cover more branches in this method, but guess it's too much work as most stuff here is not mocked.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe we should return to the test coverage question later on once we have both SQL and KV implementations run against this test suite since I'm afraid many of the branches we'd test will go away as we deprecate the KV implementation. Similarly TestDeleteInvoices is also very barebones, so perhaps a bigger overhaul is necessary.

t.Parallel()

db, err := MakeTestInvoiceDB(t)
require.NoError(t, err, "unable to make test db")

// Updatefunc is used to cancel an invoice.
updateFunc := func(invoice *invpkg.Invoice) (
*invpkg.InvoiceUpdateDesc, error) {

return &invpkg.InvoiceUpdateDesc{
UpdateType: invpkg.CancelInvoiceUpdate,
State: &invpkg.InvoiceStateUpdateDesc{
NewState: invpkg.ContractCanceled,
},
}, nil
}

// Add some invoices to the test db.
ctxb := context.Background()
var invoices []invpkg.Invoice
for i := 0; i < 10; i++ {
invoice, err := randInvoice(lnwire.MilliSatoshi(i + 1))
require.NoError(t, err)

paymentHash := invoice.Terms.PaymentPreimage.Hash()
_, err = db.AddInvoice(ctxb, invoice, paymentHash)
require.NoError(t, err)

// Cancel every second invoice.
if i%2 == 0 {
invoice, err = db.UpdateInvoice(
ctxb, invpkg.InvoiceRefByHash(paymentHash), nil,
updateFunc,
)
require.NoError(t, err)
} else {
invoices = append(invoices, *invoice)
}
}

// Delete canceled invoices.
require.NoError(t, db.DeleteCanceledInvoices(ctxb))

// Query to collect all invoices.
query := invpkg.InvoiceQuery{
IndexOffset: 0,
NumMaxInvoices: math.MaxUint64,
}

dbInvoices, err := db.QueryInvoices(ctxb, query)
require.NoError(t, err)

// Check that we really have the expected invoices.
require.Equal(t, invoices, dbInvoices.Invoices)
}

// TestAddInvoiceInvalidFeatureDeps asserts that inserting an invoice with
// invalid transitive feature dependencies fails with the appropriate error.
func TestAddInvoiceInvalidFeatureDeps(t *testing.T) {
Expand Down
134 changes: 121 additions & 13 deletions channeldb/invoices.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,17 +433,18 @@ func fetchInvoiceNumByRef(invoiceIndex, payAddrIndex, setIDIndex kvdb.RBucket,
}
}

// ScanInvoices scans through all invoices and calls the passed scanFunc for
// for each invoice with its respective payment hash. Additionally a reset()
// closure is passed which is used to reset/initialize partial results and also
// to signal if the kvdb.View transaction has been retried.
func (d *DB) ScanInvoices(_ context.Context, scanFunc invpkg.InvScanFunc,
reset func()) error {

return kvdb.View(d, func(tx kvdb.RTx) error {
// FetchPendingInvoices returns all invoices that have not yet been settled or
// canceled. The returned map is keyed by the payment hash of each respective
// invoice.
func (d *DB) FetchPendingInvoices(_ context.Context) (
map[lntypes.Hash]invpkg.Invoice, error) {

result := make(map[lntypes.Hash]invpkg.Invoice)

err := kvdb.View(d, func(tx kvdb.RTx) error {
invoices := tx.ReadBucket(invoiceBucket)
if invoices == nil {
return invpkg.ErrNoInvoicesCreated
return nil
}

invoiceIndex := invoices.NestedReadBucket(invoiceIndexBucket)
Expand Down Expand Up @@ -472,12 +473,23 @@ func (d *DB) ScanInvoices(_ context.Context, scanFunc invpkg.InvScanFunc,
return err
}

var paymentHash lntypes.Hash
copy(paymentHash[:], k)
if invoice.IsPending() {
var paymentHash lntypes.Hash
copy(paymentHash[:], k)
result[paymentHash] = invoice
}

return scanFunc(paymentHash, &invoice)
return nil
})
}, reset)
}, func() {
result = make(map[lntypes.Hash]invpkg.Invoice)
})

if err != nil {
return nil, err
}

return result, nil
}

// QueryInvoices allows a caller to query the invoice database for invoices
Expand Down Expand Up @@ -2702,6 +2714,102 @@ func delAMPSettleIndex(invoiceNum []byte, invoices,
return nil
}

// DeleteCanceledInvoices deletes all canceled invoices from the database.
func (d *DB) DeleteCanceledInvoices(_ context.Context) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use batch here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably not a good idea since we scan through all the invoices here and bolt's Batch method assumes that the closure can be executed multiple times since it'll only try to parallelize optimistically (given bolt has no idea about what we do here).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it depends on whether there's an error and how we handle the error, from this bench test, if we distinguish the errors here, each closure is executed exactly once.

invoices := tx.ReadWriteBucket(invoiceBucket)
if invoices == nil {
return nil
}

invoiceIndex := invoices.NestedReadWriteBucket(
invoiceIndexBucket,
)
if invoiceIndex == nil {
return invpkg.ErrNoInvoicesCreated
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we should unify the behaviors here - in FetchPendingInvoices we'd ignore this error, and we should. do the same here? In general I think DeleteCanceledInvoices should be a NOOP if there are no invoices in the db, instead of returning an error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is different though right? We do return nil above when we "open" the read-write bucket invoices however if that exist, we should assume that both the the invoice index and add index exists.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could it be the case that this is just an empty bucket? like all the invoices are deleted?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if the bucket is empty we'd still get back the handle.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool - then I'm curious in what case will this ErrNoInvoicesCreated be returned?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's there to give us some reassurance that about expected existence of those buckets. Similar checks are done for other channeldb methods too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool then I interpret it as a db corruption check.

}

invoiceAddIndex := invoices.NestedReadWriteBucket(
addIndexBucket,
)
if invoiceAddIndex == nil {
return invpkg.ErrNoInvoicesCreated
}

payAddrIndex := tx.ReadWriteBucket(payAddrIndexBucket)

return invoiceIndex.ForEach(func(k, v []byte) error {
// Skip the special numInvoicesKey as that does not
// point to a valid invoice.
if bytes.Equal(k, numInvoicesKey) {
return nil
}

// Skip sub-buckets.
if v == nil {
return nil
}

invoice, err := fetchInvoice(v, invoices)
if err != nil {
return err
}

if invoice.State != invpkg.ContractCanceled {
return nil
}

// Delete the payment hash from the invoice index.
err = invoiceIndex.Delete(k)
if err != nil {
return err
}

// Delete payment address index reference if there's a
// valid payment address.
if invoice.Terms.PaymentAddr != invpkg.BlankPayAddr {
// To ensure consistency check that the already
// fetched invoice key matches the one in the
// payment address index.
key := payAddrIndex.Get(
invoice.Terms.PaymentAddr[:],
)
if bytes.Equal(key, k) {
// Delete from the payment address
// index.
if err := payAddrIndex.Delete(
invoice.Terms.PaymentAddr[:],
); err != nil {
return err
}
}
}

// Remove from the add index.
var addIndexKey [8]byte
byteOrder.PutUint64(addIndexKey[:], invoice.AddIndex)
err = invoiceAddIndex.Delete(addIndexKey[:])
if err != nil {
return err
}

// Note that we don't need to delete the invoice from
// the settle index as it is not added until the
// invoice is settled.

// Now remove all sub invoices.
err = delAMPInvoices(k, invoices)
if err != nil {
return err
}

// Finally remove the serialized invoice from the
// invoice bucket.
return invoices.Delete(k)
})
}, func() {})
}

// DeleteInvoice attempts to delete the passed invoices from the database in
// one transaction. The passed delete references hold all keys required to
// delete the invoices without also needing to deserialze them.
Expand Down
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
a context parameter to all `InvoiceDB` methods which is a pre-requisite for
the SQL implementation.

* [Refactor InvoiceDB](https://github.com/lightningnetwork/lnd/pull/8081) to
eliminate the use of `ScanInvoices`.

## Code Health
## Tooling and Documentation

Expand Down
22 changes: 8 additions & 14 deletions invoices/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/lightningnetwork/lnd/record"
)

// InvScanFunc is a helper type used to specify the type used in the
// ScanInvoices methods (part of the InvoiceDB interface).
type InvScanFunc func(lntypes.Hash, *Invoice) error

// InvoiceDB is the database that stores the information about invoices.
type InvoiceDB interface {
// AddInvoice inserts the targeted invoice into the database.
Expand Down Expand Up @@ -45,16 +41,10 @@ type InvoiceDB interface {
// payment.
LookupInvoice(ctx context.Context, ref InvoiceRef) (Invoice, error)

// ScanInvoices scans through all invoices and calls the passed scanFunc
// for each invoice with its respective payment hash. Additionally a
// reset() closure is passed which is used to reset/initialize partial
// results and also to signal if the kvdb.View transaction has been
// retried.
//
// TODO(positiveblue): abstract this functionality so it makes sense for
// other backends like sql.
ScanInvoices(ctx context.Context, scanFunc InvScanFunc,
reset func()) error
// FetchPendingInvoices returns all invoices that have not yet been
// settled or canceled.
FetchPendingInvoices(ctx context.Context) (map[lntypes.Hash]Invoice,
error)

// QueryInvoices allows a caller to query the invoice database for
// invoices within the specified add index range.
Expand Down Expand Up @@ -90,6 +80,10 @@ type InvoiceDB interface {
// deserialze them.
DeleteInvoice(ctx context.Context,
invoicesToDelete []InvoiceDeleteRef) error

// DeleteCanceledInvoices removes all canceled invoices from the
// database.
DeleteCanceledInvoices(ctx context.Context) error
}

// Payload abstracts access to any additional fields provided in the final hop's
Expand Down