Skip to content

Commit

Permalink
Merge pull request #4261 from carlaKC/4164-indexpayments
Browse files Browse the repository at this point in the history
channeldb: Index payments by sequence number
  • Loading branch information
cfromknecht committed Jun 10, 2020
2 parents 38b8e54 + ab594ea commit d47d17b
Show file tree
Hide file tree
Showing 13 changed files with 1,265 additions and 128 deletions.
10 changes: 10 additions & 0 deletions channeldb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func WriteElement(w io.Writer, element interface{}) error {
return err
}

case paymentIndexType:
if err := binary.Write(w, byteOrder, e); err != nil {
return err
}

case lnwire.FundingFlag:
if err := binary.Write(w, byteOrder, e); err != nil {
return err
Expand Down Expand Up @@ -406,6 +411,11 @@ func ReadElement(r io.Reader, element interface{}) error {
return err
}

case *paymentIndexType:
if err := binary.Read(r, byteOrder, e); err != nil {
return err
}

case *lnwire.FundingFlag:
if err := binary.Read(r, byteOrder, e); err != nil {
return err
Expand Down
15 changes: 15 additions & 0 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
mig "github.com/lightningnetwork/lnd/channeldb/migration"
"github.com/lightningnetwork/lnd/channeldb/migration12"
"github.com/lightningnetwork/lnd/channeldb/migration13"
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -144,6 +145,19 @@ var (
number: 14,
migration: mig.CreateTLB(payAddrIndexBucket),
},
{
// Initialize payment index bucket which will be used
// to index payments by sequence number. This index will
// be used to allow more efficient ListPayments queries.
number: 15,
migration: mig.CreateTLB(paymentsIndexBucket),
},
{
// Add our existing payments to the index bucket created
// in migration 15.
number: 16,
migration: migration16.MigrateSequenceIndex,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down Expand Up @@ -257,6 +271,7 @@ var topLevelBuckets = [][]byte{
fwdPackagesKey,
invoiceBucket,
payAddrIndexBucket,
paymentsIndexBucket,
nodeInfoBucket,
nodeBucket,
edgeBucket,
Expand Down
12 changes: 12 additions & 0 deletions channeldb/invoice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,18 @@ func TestQueryInvoices(t *testing.T) {
// still pending.
expected: pendingInvoices[len(pendingInvoices)-15:],
},
// Fetch all invoices paginating backwards, with an index offset
// that is beyond our last offset. We expect all invoices to be
// returned.
{
query: InvoiceQuery{
IndexOffset: numInvoices * 2,
PendingOnly: false,
Reversed: true,
NumMaxInvoices: numInvoices,
},
expected: invoices,
},
}

for i, testCase := range testCases {
Expand Down
90 changes: 26 additions & 64 deletions channeldb/invoices.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,85 +839,47 @@ func (d *DB) QueryInvoices(q InvoiceQuery) (InvoiceSlice, error) {
if invoices == nil {
return ErrNoInvoicesCreated
}

// Get the add index bucket which we will use to iterate through
// our indexed invoices.
invoiceAddIndex := invoices.NestedReadBucket(addIndexBucket)
if invoiceAddIndex == nil {
return ErrNoInvoicesCreated
}

// keyForIndex is a helper closure that retrieves the invoice
// key for the given add index of an invoice.
keyForIndex := func(c kvdb.RCursor, index uint64) []byte {
var keyIndex [8]byte
byteOrder.PutUint64(keyIndex[:], index)
_, invoiceKey := c.Seek(keyIndex[:])
return invoiceKey
}

// nextKey is a helper closure to determine what the next
// invoice key is when iterating over the invoice add index.
nextKey := func(c kvdb.RCursor) ([]byte, []byte) {
if q.Reversed {
return c.Prev()
}
return c.Next()
}

// We'll be using a cursor to seek into the database and return
// a slice of invoices. We'll need to determine where to start
// our cursor depending on the parameters set within the query.
c := invoiceAddIndex.ReadCursor()
invoiceKey := keyForIndex(c, q.IndexOffset+1)

// If the query is specifying reverse iteration, then we must
// handle a few offset cases.
if q.Reversed {
switch q.IndexOffset {

// This indicates the default case, where no offset was
// specified. In that case we just start from the last
// invoice.
case 0:
_, invoiceKey = c.Last()

// This indicates the offset being set to the very
// first invoice. Since there are no invoices before
// this offset, and the direction is reversed, we can
// return without adding any invoices to the response.
case 1:
return nil

// Otherwise we start iteration at the invoice prior to
// the offset.
default:
invoiceKey = keyForIndex(c, q.IndexOffset-1)
}
}

// If we know that a set of invoices exists, then we'll begin
// our seek through the bucket in order to satisfy the query.
// We'll continue until either we reach the end of the range, or
// reach our max number of invoices.
for ; invoiceKey != nil; _, invoiceKey = nextKey(c) {
// If our current return payload exceeds the max number
// of invoices, then we'll exit now.
if uint64(len(resp.Invoices)) >= q.NumMaxInvoices {
break
}
// Create a paginator which reads from our add index bucket with
// the parameters provided by the invoice query.
paginator := newPaginator(
invoiceAddIndex.ReadCursor(), q.Reversed, q.IndexOffset,
q.NumMaxInvoices,
)

invoice, err := fetchInvoice(invoiceKey, invoices)
// accumulateInvoices looks up an invoice based on the index we
// are given, adds it to our set of invoices if it has the right
// characteristics for our query and returns the number of items
// we have added to our set of invoices.
accumulateInvoices := func(_, indexValue []byte) (bool, error) {
invoice, err := fetchInvoice(indexValue, invoices)
if err != nil {
return err
return false, err
}

// Skip any settled or canceled invoices if the caller is
// only interested in pending ones.
// Skip any settled or canceled invoices if the caller
// is only interested in pending ones.
if q.PendingOnly && !invoice.IsPending() {
continue
return false, nil
}

// At this point, we've exhausted the offset, so we'll
// begin collecting invoices found within the range.
resp.Invoices = append(resp.Invoices, invoice)
return true, nil
}

// Query our paginator using accumulateInvoices to build up a
// set of invoices.
if err := paginator.query(accumulateInvoices); err != nil {
return err
}

// If we iterated through the add index in reverse order, then
Expand Down
2 changes: 2 additions & 0 deletions channeldb/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
mig "github.com/lightningnetwork/lnd/channeldb/migration"
"github.com/lightningnetwork/lnd/channeldb/migration12"
"github.com/lightningnetwork/lnd/channeldb/migration13"
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
)

Expand Down Expand Up @@ -33,4 +34,5 @@ func UseLogger(logger btclog.Logger) {
migration_01_to_11.UseLogger(logger)
migration12.UseLogger(logger)
migration13.UseLogger(logger)
migration16.UseLogger(logger)
}
14 changes: 14 additions & 0 deletions channeldb/migration16/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package migration16

import (
"github.com/btcsuite/btclog"
)

// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled

// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

0 comments on commit d47d17b

Please sign in to comment.