Skip to content

Commit

Permalink
Merge pull request #1788 from cfromknecht/disable-height-hint-cache
Browse files Browse the repository at this point in the history
Disable height hint cache
  • Loading branch information
Roasbeef committed Aug 27, 2018
2 parents 26f68da + 28a5936 commit f1256ba
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 22 deletions.
2 changes: 1 addition & 1 deletion chainntnfs/bitcoindnotify/bitcoind_test.go
Expand Up @@ -25,7 +25,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
hintCache, err := chainntnfs.NewHeightHintCache(db, true)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/btcdnotify/btcd_test.go
Expand Up @@ -23,7 +23,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
hintCache, err := chainntnfs.NewHeightHintCache(db, true)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
34 changes: 31 additions & 3 deletions chainntnfs/height_hint_cache.go
Expand Up @@ -85,7 +85,8 @@ type ConfirmHintCache interface {
// ConfirmHintCache interfaces backed by a channeldb DB instance where the hints
// will be stored.
type HeightHintCache struct {
db *channeldb.DB
db *channeldb.DB
disabled bool
}

// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache
Expand All @@ -94,8 +95,11 @@ var _ SpendHintCache = (*HeightHintCache)(nil)
var _ ConfirmHintCache = (*HeightHintCache)(nil)

// NewHeightHintCache returns a new height hint cache backed by a database.
func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) {
cache := &HeightHintCache{db}
func NewHeightHintCache(db *channeldb.DB, disable bool) (*HeightHintCache, error) {
cache := &HeightHintCache{
db: db,
disabled: disable,
}
if err := cache.initBuckets(); err != nil {
return nil, err
}
Expand All @@ -119,6 +123,10 @@ func (c *HeightHintCache) initBuckets() error {

// CommitSpendHint commits a spend hint for the outpoints to the cache.
func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error {
if c.disabled {
return nil
}

Log.Tracef("Updating spend hint to height %d for %v", height, ops)

return c.db.Batch(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -153,6 +161,10 @@ func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) e
// ErrSpendHintNotFound is returned if a spend hint does not exist within the
// cache for the outpoint.
func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) {
if c.disabled {
return 0, ErrSpendHintNotFound
}

var hint uint32
err := c.db.View(func(tx *bolt.Tx) error {
spendHints := tx.Bucket(spendHintBucket)
Expand Down Expand Up @@ -181,6 +193,10 @@ func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) {

// PurgeSpendHint removes the spend hint for the outpoints from the cache.
func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error {
if c.disabled {
return nil
}

Log.Tracef("Removing spend hints for %v", ops)

return c.db.Batch(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -208,6 +224,10 @@ func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error {

// CommitConfirmHint commits a confirm hint for the transactions to the cache.
func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error {
if c.disabled {
return nil
}

Log.Tracef("Updating confirm hints to height %d for %v", height, txids)

return c.db.Batch(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -242,6 +262,10 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Ha
// ErrConfirmHintNotFound is returned if a confirm hint does not exist within
// the cache for the transaction hash.
func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) {
if c.disabled {
return 0, ErrConfirmHintNotFound
}

var hint uint32
err := c.db.View(func(tx *bolt.Tx) error {
confirmHints := tx.Bucket(confirmHintBucket)
Expand Down Expand Up @@ -271,6 +295,10 @@ func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error)
// PurgeConfirmHint removes the confirm hint for the transactions from the
// cache.
func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error {
if c.disabled {
return nil
}

Log.Tracef("Removing confirm hints for %v", txids)

return c.db.Batch(func(tx *bolt.Tx) error {
Expand Down
81 changes: 77 additions & 4 deletions chainntnfs/height_hint_cache_test.go
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
)

func initHintCache(t *testing.T) *HeightHintCache {
func initHintCache(t *testing.T, disable bool) *HeightHintCache {
t.Helper()

tempDir, err := ioutil.TempDir("", "kek")
Expand All @@ -21,7 +21,7 @@ func initHintCache(t *testing.T) *HeightHintCache {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := NewHeightHintCache(db)
hintCache, err := NewHeightHintCache(db, disable)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand All @@ -34,7 +34,7 @@ func initHintCache(t *testing.T) *HeightHintCache {
func TestHeightHintCacheConfirms(t *testing.T) {
t.Parallel()

hintCache := initHintCache(t)
hintCache := initHintCache(t, false)

// Querying for a transaction hash not found within the cache should
// return an error indication so.
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestHeightHintCacheConfirms(t *testing.T) {
func TestHeightHintCacheSpends(t *testing.T) {
t.Parallel()

hintCache := initHintCache(t)
hintCache := initHintCache(t, false)

// Querying for an outpoint not found within the cache should return an
// error indication so.
Expand Down Expand Up @@ -146,3 +146,76 @@ func TestHeightHintCacheSpends(t *testing.T) {
}
}
}

// TestHeightHintCacheDisabled asserts that a disabled height hint cache never
// returns spend or confirm hints that are committed.
func TestHeightHintCacheDisabled(t *testing.T) {
t.Parallel()

const height uint32 = 100

// Create a disabled height hint cache.
hintCache := initHintCache(t, true)

// Querying a disabled cache w/ no spend hint should return not found.
var outpoint wire.OutPoint
_, err := hintCache.QuerySpendHint(outpoint)
if err != ErrSpendHintNotFound {
t.Fatalf("expected ErrSpendHintNotFound, got: %v", err)
}

// Commit a spend hint to the disabled cache, which should be a noop.
if err := hintCache.CommitSpendHint(height, outpoint); err != nil {
t.Fatalf("unable to commit spend hint: %v", err)
}

// Querying a disabled cache after commit noop should return not found.
_, err = hintCache.QuerySpendHint(outpoint)
if err != ErrSpendHintNotFound {
t.Fatalf("expected ErrSpendHintNotFound, got: %v", err)
}

// Reenable the cache, this time actually committing a spend hint.
hintCache.disabled = false
if err := hintCache.CommitSpendHint(height, outpoint); err != nil {
t.Fatalf("unable to commit spend hint: %v", err)
}

// Disable the cache again, spend hint should not be found.
hintCache.disabled = true
_, err = hintCache.QuerySpendHint(outpoint)
if err != ErrSpendHintNotFound {
t.Fatalf("expected ErrSpendHintNotFound, got: %v", err)
}

// Querying a disabled cache w/ no conf hint should return not found.
var txid chainhash.Hash
_, err = hintCache.QueryConfirmHint(txid)
if err != ErrConfirmHintNotFound {
t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err)
}

// Commit a conf hint to the disabled cache, which should be a noop.
if err := hintCache.CommitConfirmHint(height, txid); err != nil {
t.Fatalf("unable to commit spend hint: %v", err)
}

// Querying a disabled cache after commit noop should return not found.
_, err = hintCache.QueryConfirmHint(txid)
if err != ErrConfirmHintNotFound {
t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err)
}

// Reenable the cache, this time actually committing a conf hint.
hintCache.disabled = false
if err := hintCache.CommitConfirmHint(height, txid); err != nil {
t.Fatalf("unable to commit spend hint: %v", err)
}

// Disable the cache again, conf hint should not be found.
hintCache.disabled = true
_, err = hintCache.QueryConfirmHint(txid)
if err != ErrConfirmHintNotFound {
t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err)
}
}
2 changes: 1 addition & 1 deletion chainntnfs/interface_test.go
Expand Up @@ -1572,7 +1572,7 @@ func TestInterfaces(t *testing.T) {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
hintCache, err := chainntnfs.NewHeightHintCache(db, true)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions chainregistry.go
Expand Up @@ -181,8 +181,8 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
cleanUp func()
)

// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(chanDB)
// Initialize disabled height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(chanDB, true)
if err != nil {
return nil, nil, fmt.Errorf("unable to initialize height hint "+
"cache: %v", err)
Expand Down
17 changes: 12 additions & 5 deletions lnd_test.go
Expand Up @@ -2411,15 +2411,22 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf(err.Error())
}

// The htlc funds will still be shown as limbo, since they are still in
// their first stage. The commitment funds will have been recovered
// after the commit txn was included in the last block.
// The commitment funds will have been recovered after the commit txn
// was included in the last block. The htlc funds will not be shown in
// limbo, since they are still in their first stage and the nursery
// hasn't received them from the contract court.
forceClose, err := findForceClosedChannel(pendingChanResp, &op)
if err != nil {
t.Fatalf(err.Error())
}
if forceClose.LimboBalance == 0 {
t.Fatalf("htlc funds should still be in limbo")
err = checkPendingChannelNumHtlcs(forceClose, 0)
if err != nil {
t.Fatalf("expected 0 pending htlcs, found %d",
len(forceClose.PendingHtlcs))
}
if forceClose.LimboBalance != 0 {
t.Fatalf("expected 0 funds in limbo, found %d",
forceClose.LimboBalance)
}

// Compute the height preceding that which will cause the htlc CLTV
Expand Down
2 changes: 1 addition & 1 deletion lnwallet/interface_test.go
Expand Up @@ -2076,7 +2076,7 @@ func TestLightningWallet(t *testing.T) {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := chainntnfs.NewHeightHintCache(db)
hintCache, err := chainntnfs.NewHeightHintCache(db, true)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
Expand Down
10 changes: 6 additions & 4 deletions peer.go
Expand Up @@ -804,14 +804,16 @@ func (ms *msgStream) msgConsumer() {

// AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access.
func (ms *msgStream) AddMsg(msg lnwire.Message) {
func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) {
// First, we'll attempt to receive from the producerSema struct. This
// acts as a sempahore to prevent us from indefinitely buffering
// incoming items from the wire. Either the msg queue isn't full, and
// we'll not block, or the queue is full, and we'll block until either
// we're signalled to quit, or a slot is freed up.
select {
case <-ms.producerSema:
case <-quit:
return
case <-ms.quit:
return
}
Expand Down Expand Up @@ -1020,7 +1022,7 @@ out:
// forward the error to all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID:
for chanID, chanStream := range chanMsgStreams {
chanStream.AddMsg(nextMsg)
chanStream.AddMsg(nextMsg, p.quit)

// Also marked this channel as failed,
// so we won't try to restart it on
Expand Down Expand Up @@ -1082,7 +1084,7 @@ out:
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:

discStream.AddMsg(msg)
discStream.AddMsg(msg, p.quit)

default:
peerLog.Errorf("unknown message %v received from peer "+
Expand All @@ -1105,7 +1107,7 @@ out:

// With the stream obtained, add the message to the
// stream so we can continue processing message.
chanStream.AddMsg(nextMsg)
chanStream.AddMsg(nextMsg, p.quit)
}

idleTimer.Reset(idleTimeout)
Expand Down

0 comments on commit f1256ba

Please sign in to comment.