Skip to content

Commit

Permalink
Merge pull request #2355 from joostjager/move-invoices
Browse files Browse the repository at this point in the history
invoices: create package
  • Loading branch information
Roasbeef committed Jan 8, 2019
2 parents 0c893c6 + c1eaf60 commit 4ac54dc
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 48 deletions.
91 changes: 49 additions & 42 deletions invoiceregistry.go → invoices/invoiceregistry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main
package invoices

import (
"bytes"
"crypto/sha256"
"fmt"
"github.com/btcsuite/btcd/chaincfg"
"sync"
"sync/atomic"
"time"
Expand All @@ -18,29 +19,30 @@ import (
)

var (
// debugPre is the default debug preimage which is inserted into the
// DebugPre is the default debug preimage which is inserted into the
// invoice registry if the --debughtlc flag is activated on start up.
// All nodes initialized with the flag active will immediately settle
// any incoming HTLC whose rHash corresponds with the debug
// preimage.
debugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32))
DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32))

debugHash = chainhash.Hash(sha256.Sum256(debugPre[:]))
// DebugHash is the hash of the default preimage.
DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:]))
)

// invoiceRegistry is a central registry of all the outstanding invoices
// InvoiceRegistry is a central registry of all the outstanding invoices
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type invoiceRegistry struct {
type InvoiceRegistry struct {
sync.RWMutex

cdb *channeldb.DB

clientMtx sync.Mutex
nextClientID uint32
notificationClients map[uint32]*invoiceSubscription
notificationClients map[uint32]*InvoiceSubscription

newSubscriptions chan *invoiceSubscription
newSubscriptions chan *InvoiceSubscription
subscriptionCancels chan uint32
invoiceEvents chan *invoiceEvent

Expand All @@ -49,28 +51,33 @@ type invoiceRegistry struct {
// that *all* nodes are able to fully settle.
debugInvoices map[chainhash.Hash]*channeldb.Invoice

activeNetParams *chaincfg.Params

wg sync.WaitGroup
quit chan struct{}
}

// newInvoiceRegistry creates a new invoice registry. The invoice registry
// NewRegistry creates a new invoice registry. The invoice registry
// wraps the persistent on-disk invoice storage with an additional in-memory
// layer. The in-memory layer is in place such that debug invoices can be added
// which are volatile yet available system wide within the daemon.
func newInvoiceRegistry(cdb *channeldb.DB) *invoiceRegistry {
return &invoiceRegistry{
func NewRegistry(cdb *channeldb.DB,
activeNetParams *chaincfg.Params) *InvoiceRegistry {

return &InvoiceRegistry{
cdb: cdb,
debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice),
notificationClients: make(map[uint32]*invoiceSubscription),
newSubscriptions: make(chan *invoiceSubscription),
notificationClients: make(map[uint32]*InvoiceSubscription),
newSubscriptions: make(chan *InvoiceSubscription),
subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan *invoiceEvent, 100),
activeNetParams: activeNetParams,
quit: make(chan struct{}),
}
}

// Start starts the registry and all goroutines it needs to carry out its task.
func (i *invoiceRegistry) Start() error {
func (i *InvoiceRegistry) Start() error {
i.wg.Add(1)

go i.invoiceEventNotifier()
Expand All @@ -79,7 +86,7 @@ func (i *invoiceRegistry) Start() error {
}

// Stop signals the registry for a graceful shutdown.
func (i *invoiceRegistry) Stop() {
func (i *InvoiceRegistry) Stop() {
close(i.quit)

i.wg.Wait()
Expand All @@ -96,7 +103,7 @@ type invoiceEvent struct {
// invoiceEventNotifier is the dedicated goroutine responsible for accepting
// new notification subscriptions, cancelling old subscriptions, and
// dispatching new invoice events.
func (i *invoiceRegistry) invoiceEventNotifier() {
func (i *InvoiceRegistry) invoiceEventNotifier() {
defer i.wg.Done()

for {
Expand All @@ -110,11 +117,11 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// invoice events.
err := i.deliverBacklogEvents(newClient)
if err != nil {
ltndLog.Errorf("unable to deliver backlog invoice "+
log.Errorf("unable to deliver backlog invoice "+
"notifications: %v", err)
}

ltndLog.Infof("New invoice subscription "+
log.Infof("New invoice subscription "+
"client: id=%v", newClient.id)

// With the backlog notifications delivered (if any),
Expand All @@ -125,7 +132,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// A client no longer wishes to receive invoice notifications.
// So we'll remove them from the set of active clients.
case clientID := <-i.subscriptionCancels:
ltndLog.Infof("Cancelling invoice subscription for "+
log.Infof("Cancelling invoice subscription for "+
"client=%v", clientID)

delete(i.notificationClients, clientID)
Expand Down Expand Up @@ -157,14 +164,14 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
// instance.
case event.state == channeldb.ContractOpen &&
client.addIndex+1 != invoice.AddIndex:
ltndLog.Warnf("client=%v for invoice "+
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"add_index=%v, new add event index=%v",
clientID, client.addIndex,
invoice.AddIndex)
case event.state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex:
ltndLog.Warnf("client=%v for invoice "+
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"settle_index=%v, new settle event index=%v",
clientID, client.settleIndex,
Expand Down Expand Up @@ -192,7 +199,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {
case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex
default:
ltndLog.Errorf("unknown invoice "+
log.Errorf("unknown invoice "+
"state: %v", event.state)
}
}
Expand All @@ -205,7 +212,7 @@ func (i *invoiceRegistry) invoiceEventNotifier() {

// deliverBacklogEvents will attempts to query the invoice database for any
// notifications that the client has missed since it reconnected last.
func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) error {
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
// First, we'll query the database to see if based on the provided
// addIndex and settledIndex we need to deliver any backlog
// notifications.
Expand Down Expand Up @@ -257,7 +264,7 @@ func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) erro
// by the passed preimage. Once this invoice is added, subsystems within the
// daemon add/forward HTLCs that are able to obtain the proper preimage
// required for redemption in the case that we're the final destination.
func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) {
func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) {
paymentHash := chainhash.Hash(sha256.Sum256(preimage[:]))

invoice := &channeldb.Invoice{
Expand All @@ -272,7 +279,7 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash
i.debugInvoices[paymentHash] = invoice
i.Unlock()

ltndLog.Debugf("Adding debug invoice %v", newLogClosure(func() string {
log.Debugf("Adding debug invoice %v", newLogClosure(func() string {
return spew.Sdump(invoice)
}))
}
Expand All @@ -284,11 +291,11 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash
// redemption in the case that we're the final destination. We also return the
// addIndex of the newly created invoice which monotonically increases for each
// new invoice added.
func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) {
func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) {
i.Lock()
defer i.Unlock()

ltndLog.Debugf("Adding invoice %v", newLogClosure(func() string {
log.Debugf("Adding invoice %v", newLogClosure(func() string {
return spew.Sdump(invoice)
}))

Expand All @@ -311,7 +318,7 @@ func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error)
// according to the cltv delta.
//
// TODO(roasbeef): ignore if settled?
func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) {
func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) {
// First check the in-memory debug invoice index to see if this is an
// existing invoice added for debugging.
i.RLock()
Expand All @@ -331,7 +338,7 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice
}

payReq, err := zpay32.Decode(
string(invoice.PaymentRequest), activeNetParams.Params,
string(invoice.PaymentRequest), i.activeNetParams,
)
if err != nil {
return channeldb.Invoice{}, 0, err
Expand All @@ -343,13 +350,13 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice
// SettleInvoice attempts to mark an invoice as settled. If the invoice is a
// debug invoice, then this method is a noop as debug invoices are never fully
// settled.
func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,
func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash,
amtPaid lnwire.MilliSatoshi) error {

i.Lock()
defer i.Unlock()

ltndLog.Debugf("Settling invoice %x", rHash[:])
log.Debugf("Settling invoice %x", rHash[:])

// First check the in-memory debug invoice index to see if this is an
// existing invoice added for debugging.
Expand All @@ -366,7 +373,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,
return err
}

ltndLog.Infof("Payment received: %v", spew.Sdump(invoice))
log.Infof("Payment received: %v", spew.Sdump(invoice))

i.notifyClients(invoice, channeldb.ContractSettled)

Expand All @@ -375,7 +382,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash,

// notifyClients notifies all currently registered invoice notification clients
// of a newly added/settled invoice.
func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice,
func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice,
state channeldb.ContractState) {

event := &invoiceEvent{
Expand All @@ -389,12 +396,12 @@ func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice,
}
}

// invoiceSubscription represents an intent to receive updates for newly added
// InvoiceSubscription represents an intent to receive updates for newly added
// or settled invoices. For each newly added invoice, a copy of the invoice
// will be sent over the NewInvoices channel. Similarly, for each newly settled
// invoice, a copy of the invoice will be sent over the SettledInvoices
// channel.
type invoiceSubscription struct {
type InvoiceSubscription struct {
cancelled uint32 // To be used atomically.

// NewInvoices is a channel that we'll use to send all newly created
Expand Down Expand Up @@ -424,16 +431,16 @@ type invoiceSubscription struct {

id uint32

inv *invoiceRegistry
inv *InvoiceRegistry

cancelChan chan struct{}

wg sync.WaitGroup
}

// Cancel unregisters the invoiceSubscription, freeing any previously allocated
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
// resources.
func (i *invoiceSubscription) Cancel() {
func (i *InvoiceSubscription) Cancel() {
if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) {
return
}
Expand All @@ -449,13 +456,13 @@ func (i *invoiceSubscription) Cancel() {
i.wg.Wait()
}

// SubscribeNotifications returns an invoiceSubscription which allows the
// SubscribeNotifications returns an InvoiceSubscription which allows the
// caller to receive async notifications when any invoices are settled or
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
// by first sending out all new events with an invoice index _greater_ than
// this value. Afterwards, we'll send out real-time notifications.
func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *invoiceSubscription {
client := &invoiceSubscription{
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription {
client := &InvoiceSubscription{
NewInvoices: make(chan *channeldb.Invoice),
SettledInvoices: make(chan *channeldb.Invoice),
addIndex: addIndex,
Expand Down Expand Up @@ -495,7 +502,7 @@ func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
case channeldb.ContractSettled:
targetChan = client.SettledInvoices
default:
ltndLog.Errorf("unknown invoice "+
log.Errorf("unknown invoice "+
"state: %v", invoiceEvent.state)

continue
Expand Down
45 changes: 45 additions & 0 deletions invoices/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package invoices

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("INVC", nil))
}

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string

// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}

// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}
4 changes: 4 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/lightningnetwork/lnd/invoices"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -71,6 +72,7 @@ var (
sgnrLog = build.NewSubLogger("SGNR", backendLog.Logger)
wlktLog = build.NewSubLogger("WLKT", backendLog.Logger)
arpcLog = build.NewSubLogger("ARPC", backendLog.Logger)
invcLog = build.NewSubLogger("INVC", backendLog.Logger)
)

// Initialize package-global logger variables.
Expand All @@ -91,6 +93,7 @@ func init() {
signrpc.UseLogger(sgnrLog)
walletrpc.UseLogger(wlktLog)
autopilotrpc.UseLogger(arpcLog)
invoices.UseLogger(invcLog)
}

// subsystemLoggers maps each subsystem identifier to its associated logger.
Expand All @@ -117,6 +120,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"SGNR": sgnrLog,
"WLKT": wlktLog,
"ARPC": arpcLog,
"INVC": invcLog,
}

// initLogRotator initializes the logging rotator to write logs to logFile and
Expand Down
Loading

0 comments on commit 4ac54dc

Please sign in to comment.