Skip to content

Commit

Permalink
Merge pull request #8464 from ellemouton/resend-shutdown-2
Browse files Browse the repository at this point in the history
multi: resend shutdown on reestablish
  • Loading branch information
ellemouton committed Feb 21, 2024
2 parents c398b0c + 2fe8520 commit 1627976
Show file tree
Hide file tree
Showing 14 changed files with 635 additions and 180 deletions.
140 changes: 140 additions & 0 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
Expand Down Expand Up @@ -121,6 +122,12 @@ var (
// broadcasted when moving the channel to state CoopBroadcasted.
coopCloseTxKey = []byte("coop-closing-tx-key")

// shutdownInfoKey points to the serialised shutdown info that has been
// persisted for a channel. The existence of this info means that we
// have sent the Shutdown message before and so should re-initiate the
// shutdown on re-establish.
shutdownInfoKey = []byte("shutdown-info-key")

// commitDiffKey stores the current pending commitment state we've
// extended to the remote party (if any). Each time we propose a new
// state, we store the information necessary to reconstruct this state
Expand Down Expand Up @@ -188,6 +195,10 @@ var (
// in the state CommitBroadcasted.
ErrNoCloseTx = fmt.Errorf("no closing tx found")

// ErrNoShutdownInfo is returned when no shutdown info has been
// persisted for a channel.
ErrNoShutdownInfo = errors.New("no shutdown info")

// ErrNoRestoredChannelMutation is returned when a caller attempts to
// mutate a channel that's been recovered.
ErrNoRestoredChannelMutation = fmt.Errorf("cannot mutate restored " +
Expand Down Expand Up @@ -1575,6 +1586,79 @@ func (c *OpenChannel) ChanSyncMsg() (*lnwire.ChannelReestablish, error) {
}, nil
}

// MarkShutdownSent serialises and persist the given ShutdownInfo for this
// channel. Persisting this info represents the fact that we have sent the
// Shutdown message to the remote side and hence that we should re-transmit the
// same Shutdown message on re-establish.
func (c *OpenChannel) MarkShutdownSent(info *ShutdownInfo) error {
c.Lock()
defer c.Unlock()

return c.storeShutdownInfo(info)
}

// storeShutdownInfo serialises the ShutdownInfo and persists it under the
// shutdownInfoKey.
func (c *OpenChannel) storeShutdownInfo(info *ShutdownInfo) error {
var b bytes.Buffer
err := info.encode(&b)
if err != nil {
return err
}

return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error {
chanBucket, err := fetchChanBucketRw(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil {
return err
}

return chanBucket.Put(shutdownInfoKey, b.Bytes())
}, func() {})
}

// ShutdownInfo decodes the shutdown info stored for this channel and returns
// the result. If no shutdown info has been persisted for this channel then the
// ErrNoShutdownInfo error is returned.
func (c *OpenChannel) ShutdownInfo() (fn.Option[ShutdownInfo], error) {
c.RLock()
defer c.RUnlock()

var shutdownInfo *ShutdownInfo
err := kvdb.View(c.Db.backend, func(tx kvdb.RTx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
switch {
case err == nil:
case errors.Is(err, ErrNoChanDBExists),
errors.Is(err, ErrNoActiveChannels),
errors.Is(err, ErrChannelNotFound):

return ErrNoShutdownInfo
default:
return err
}

shutdownInfoBytes := chanBucket.Get(shutdownInfoKey)
if shutdownInfoBytes == nil {
return ErrNoShutdownInfo
}

shutdownInfo, err = decodeShutdownInfo(shutdownInfoBytes)

return err
}, func() {
shutdownInfo = nil
})
if err != nil {
return fn.None[ShutdownInfo](), err
}

return fn.Some[ShutdownInfo](*shutdownInfo), nil
}

// isBorked returns true if the channel has been marked as borked in the
// database. This requires an existing database transaction to already be
// active.
Expand Down Expand Up @@ -4294,3 +4378,59 @@ func MakeScidRecord(typ tlv.Type, scid *lnwire.ShortChannelID) tlv.Record {
typ, scid, 8, lnwire.EShortChannelID, lnwire.DShortChannelID,
)
}

// ShutdownInfo contains various info about the shutdown initiation of a
// channel.
type ShutdownInfo struct {
// DeliveryScript is the address that we have included in any previous
// Shutdown message for a particular channel and so should include in
// any future re-sends of the Shutdown message.
DeliveryScript tlv.RecordT[tlv.TlvType0, lnwire.DeliveryAddress]

// LocalInitiator is true if we sent a Shutdown message before ever
// receiving a Shutdown message from the remote peer.
LocalInitiator tlv.RecordT[tlv.TlvType1, bool]
}

// NewShutdownInfo constructs a new ShutdownInfo object.
func NewShutdownInfo(deliveryScript lnwire.DeliveryAddress,
locallyInitiated bool) *ShutdownInfo {

return &ShutdownInfo{
DeliveryScript: tlv.NewRecordT[tlv.TlvType0](deliveryScript),
LocalInitiator: tlv.NewPrimitiveRecord[tlv.TlvType1](
locallyInitiated,
),
}
}

// encode serialises the ShutdownInfo to the given io.Writer.
func (s *ShutdownInfo) encode(w io.Writer) error {
records := []tlv.Record{
s.DeliveryScript.Record(),
s.LocalInitiator.Record(),
}

stream, err := tlv.NewStream(records...)
if err != nil {
return err
}

return stream.Encode(w)
}

// decodeShutdownInfo constructs a ShutdownInfo struct by decoding the given
// byte slice.
func decodeShutdownInfo(b []byte) (*ShutdownInfo, error) {
tlvStream := lnwire.ExtraOpaqueData(b)

var info ShutdownInfo
records := []tlv.RecordProducer{
&info.DeliveryScript,
&info.LocalInitiator,
}

_, err := tlvStream.ExtractRecords(records...)

return &info, err
}
64 changes: 64 additions & 0 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,70 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
}
}

// TestShutdownInfo tests that a channel's shutdown info can correctly be
// persisted and retrieved.
func TestShutdownInfo(t *testing.T) {
t.Parallel()

tests := []struct {
name string
localInit bool
}{
{
name: "local node initiated",
localInit: true,
},
{
name: "remote node initiated",
localInit: false,
},
}

for _, test := range tests {
test := test

t.Run(test.name, func(t *testing.T) {
t.Parallel()

testShutdownInfo(t, test.localInit)
})
}
}

func testShutdownInfo(t *testing.T, locallyInitiated bool) {
fullDB, err := MakeTestDB(t)
require.NoError(t, err, "unable to make test database")

cdb := fullDB.ChannelStateDB()

// First a test channel.
channel := createTestChannel(t, cdb)

// We haven't persisted any shutdown info for this channel yet.
_, err = channel.ShutdownInfo()
require.Error(t, err, ErrNoShutdownInfo)

// Construct a new delivery script and create a new ShutdownInfo object.
script := []byte{1, 3, 4, 5}

// Create a ShutdownInfo struct.
shutdownInfo := NewShutdownInfo(script, locallyInitiated)

// Persist the shutdown info.
require.NoError(t, channel.MarkShutdownSent(shutdownInfo))

// We should now be able to retrieve the shutdown info.
info, err := channel.ShutdownInfo()
require.NoError(t, err)
require.True(t, info.IsSome())

// Assert that the decoded values of the shutdown info are correct.
info.WhenSome(func(info ShutdownInfo) {
require.EqualValues(t, script, info.DeliveryScript.Val)
require.Equal(t, locallyInitiated, info.LocalInitiator.Val)
})
}

// TestRefresh asserts that Refresh updates the in-memory state of another
// OpenChannel to reflect a preceding call to MarkOpen on a different
// OpenChannel.
Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
a `shutdown` message if there were currently HTLCs on the channel. After this
change, the shutdown procedure should be compliant with BOLT2 requirements.

* If HTLCs are in-flight at the same time that a `shutdown` is sent and then
a re-connect happens before the coop-close is completed we now [ensure that
we re-init the `shutdown`
exchange](https://github.com/lightningnetwork/lnd/pull/8464)

* The AMP struct in payment hops will [now be populated](https://github.com/lightningnetwork/lnd/pull/7976) when the AMP TLV is set.

* [Add Taproot witness types
Expand Down
18 changes: 10 additions & 8 deletions htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ type ChannelUpdateHandler interface {
MayAddOutgoingHtlc(lnwire.MilliSatoshi) error

// EnableAdds sets the ChannelUpdateHandler state to allow
// UpdateAddHtlc's in the specified direction. It returns an error if
// the state already allowed those adds.
EnableAdds(direction LinkDirection) error

// DiableAdds sets the ChannelUpdateHandler state to allow
// UpdateAddHtlc's in the specified direction. It returns an error if
// the state already disallowed those adds.
DisableAdds(direction LinkDirection) error
// UpdateAddHtlc's in the specified direction. It returns true if the
// state was changed and false if the desired state was already set
// before the method was called.
EnableAdds(direction LinkDirection) bool

// DisableAdds sets the ChannelUpdateHandler state to allow
// UpdateAddHtlc's in the specified direction. It returns true if the
// state was changed and false if the desired state was already set
// before the method was called.
DisableAdds(direction LinkDirection) bool

// IsFlushing returns true when UpdateAddHtlc's are disabled in the
// direction of the argument.
Expand Down
66 changes: 39 additions & 27 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/invoices"
Expand Down Expand Up @@ -271,6 +272,14 @@ type ChannelLinkConfig struct {
// GetAliases is used by the link and switch to fetch the set of
// aliases for a given link.
GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

// PreviouslySentShutdown is an optional value that is set if, at the
// time of the link being started, persisted shutdown info was found for
// the channel. This value being set means that we previously sent a
// Shutdown message to our peer, and so we should do so again on
// re-establish and should not allow anymore HTLC adds on the outgoing
// direction of the link.
PreviouslySentShutdown fn.Option[lnwire.Shutdown]
}

// channelLink is the service which drives a channel's commitment update
Expand Down Expand Up @@ -618,41 +627,25 @@ func (l *channelLink) EligibleToUpdate() bool {
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns an error if the state already allowed
// those adds.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) error {
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
if linkDirection == Outgoing {
if !l.isOutgoingAddBlocked.Swap(false) {
return errors.New("outgoing adds already enabled")
}
}

if linkDirection == Incoming {
if !l.isIncomingAddBlocked.Swap(false) {
return errors.New("incoming adds already enabled")
}
return l.isOutgoingAddBlocked.Swap(false)
}

return nil
return l.isIncomingAddBlocked.Swap(false)
}

// DiableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns an error if the state already disallowed
// those adds.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) error {
// DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
if linkDirection == Outgoing {
if l.isOutgoingAddBlocked.Swap(true) {
return errors.New("outgoing adds already disabled")
}
return !l.isOutgoingAddBlocked.Swap(true)
}

if linkDirection == Incoming {
if l.isIncomingAddBlocked.Swap(true) {
return errors.New("incoming adds already disabled")
}
}

return nil
return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
Expand Down Expand Up @@ -1206,6 +1199,25 @@ func (l *channelLink) htlcManager() {
}
}

// If a shutdown message has previously been sent on this link, then we
// need to make sure that we have disabled any HTLC adds on the outgoing
// direction of the link and that we re-resend the same shutdown message
// that we previously sent.
l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
// Immediately disallow any new outgoing HTLCs.
if !l.DisableAdds(Outgoing) {
l.log.Warnf("Outgoing link adds already disabled")
}

// Re-send the shutdown message the peer. Since syncChanStates
// would have sent any outstanding CommitSig, it is fine for us
// to immediately queue the shutdown message now.
err := l.cfg.Peer.SendMessage(false, &shutdown)
if err != nil {
l.log.Warnf("Error sending shutdown message: %v", err)
}
})

// We've successfully reestablished the channel, mark it as such to
// allow the switch to forward HTLCs in the outbound direction.
l.markReestablished()
Expand Down

0 comments on commit 1627976

Please sign in to comment.