Skip to content

Commit

Permalink
Merge pull request #3721 from Roasbeef/complete-abandon-channel
Browse files Browse the repository at this point in the history
rpc+cnct: update AbandonChannel to also remove cnct and channel graph state
  • Loading branch information
Roasbeef committed Nov 21, 2019
2 parents 8755a35 + 32965fd commit 1ad6817
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 53 deletions.
48 changes: 48 additions & 0 deletions channeldb/db.go
Expand Up @@ -1094,6 +1094,54 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
return dedupedAddrs, nil
}

// AbandonChannel attempts to remove the target channel from the open channel
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(*chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := d.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}

// If the channel was already closed, then we don't return an
// error as we'd like fro this step to be repeatable.
return nil
case err != nil:
return err
}

// Now that we've found the channel, we'll populate a close summary for
// the channel, so we can store as much information for this abounded
// channel as possible. We also ensure that we set Pending to false, to
// indicate that this channel has been "fully" closed.
summary := &ChannelCloseSummary{
CloseType: Abandoned,
ChanPoint: *chanPoint,
ChainHash: dbChan.ChainHash,
CloseHeight: bestHeight,
RemotePub: dbChan.IdentityPub,
Capacity: dbChan.Capacity,
SettledBalance: dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
ShortChanID: dbChan.ShortChanID(),
RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
RemoteNextRevocation: dbChan.RemoteNextRevocation,
LocalChanConfig: dbChan.LocalChanCfg,
}

// Finally, we'll close the channel in the DB, and return back to the
// caller.
return dbChan.CloseChannel(summary)
}

// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
Expand Down
64 changes: 64 additions & 0 deletions channeldb/db_test.go
Expand Up @@ -469,3 +469,67 @@ func TestRestoreChannelShells(t *testing.T) {
t.Fatalf("only a single edge should be inserted: %v", err)
}
}

// TestAbandonChannel tests that the AbandonChannel method is able to properly
// remove a channel from the database and add a close channel summary. If
// called after a channel has already been removed, the method shouldn't return
// an error.
func TestAbandonChannel(t *testing.T) {
t.Parallel()

cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()

// If we attempt to abandon the state of a channel that doesn't exist
// in the open or closed channel bucket, then we should receive an
// error.
err = cdb.AbandonChannel(&wire.OutPoint{}, 0)
if err == nil {
t.Fatalf("removing non-existent channel should have failed")
}

// We'll now create a new channel to abandon shortly.
chanState, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
err = chanState.SyncPending(addr, 10)
if err != nil {
t.Fatalf("unable to sync pending channel: %v", err)
}

// We should now be able to abandon the channel without any errors.
closeHeight := uint32(11)
err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
if err != nil {
t.Fatalf("unable to abandon channel: %v", err)
}

// At this point, the channel should no longer be found in the set of
// open channels.
_, err = cdb.FetchChannel(chanState.FundingOutpoint)
if err != ErrChannelNotFound {
t.Fatalf("channel should not have been found: %v", err)
}

// However we should be able to retrieve a close channel summary for
// the channel.
_, err = cdb.FetchClosedChannel(&chanState.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch closed channel: %v", err)
}

// Finally, if we attempt to abandon the channel again, we should get a
// nil error as the channel has already been abandoned.
err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight)
if err != nil {
t.Fatalf("unable to abandon channel: %v", err)
}
}
56 changes: 36 additions & 20 deletions contractcourt/chain_arbitrator.go
Expand Up @@ -296,7 +296,7 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
}

arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog)
return c.ResolveContract(chanPoint)
}

// Finally, we'll need to construct a series of htlc Sets based on all
Expand All @@ -321,11 +321,10 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
), nil
}

// resolveContract marks a contract as fully resolved within the database.
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
arbLog ArbitratorLog) error {
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {

log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

Expand All @@ -338,27 +337,44 @@ func (c *ChainArbitrator) resolveContract(chanPoint wire.OutPoint,
return err
}

if arbLog != nil {
// Once this has been marked as resolved, we'll wipe the log
// that the channel arbitrator was using to store its
// persistent state. We do this after marking the channel
// resolved, as otherwise, the arbitrator would be re-created,
// and think it was starting from the default state.
if err := arbLog.WipeHistory(); err != nil {
return err
}
}

// Now that the channel has been marked as fully closed, we'll stop
// both the channel arbitrator and chain watcher for this channel if
// they're still active.
var arbLog ArbitratorLog
c.Lock()
chainArb := c.activeChannels[chanPoint]
delete(c.activeChannels, chanPoint)

chainWatcher, ok := c.activeWatchers[chanPoint]
if ok {
chainWatcher.Stop()
}
chainWatcher := c.activeWatchers[chanPoint]
delete(c.activeWatchers, chanPoint)
c.Unlock()

if chainArb != nil {
arbLog = chainArb.log

if err := chainArb.Stop(); err != nil {
log.Warnf("unable to stop ChannelArbitrator(%v): %v",
chanPoint, err)
}
}
if chainWatcher != nil {
if err := chainWatcher.Stop(); err != nil {
log.Warnf("unable to stop ChainWatcher(%v): %v",
chanPoint, err)
}
}

// Once this has been marked as resolved, we'll wipe the log that the
// channel arbitrator was using to store its persistent state. We do
// this after marking the channel resolved, as otherwise, the
// arbitrator would be re-created, and think it was starting from the
// default state.
if arbLog != nil {
if err := arbLog.WipeHistory(); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -491,7 +507,7 @@ func (c *ChainArbitrator) Start() error {
return err
}
arbCfg.MarkChannelResolved = func() error {
return c.resolveContract(chanPoint, chanLog)
return c.ResolveContract(chanPoint)
}

// We can also leave off the set of HTLC's here as since the
Expand Down
102 changes: 102 additions & 0 deletions contractcourt/chain_arbitrator_test.go
Expand Up @@ -115,3 +115,105 @@ func TestChainArbitratorRepublishCommitment(t *testing.T) {
t.Fatalf("unexpected tx published")
}
}

// TestResolveContract tests that if we have an active channel being watched by
// the chain arb, then a call to ResolveContract will mark the channel as fully
// closed in the database, and also clean up all arbitrator state.
func TestResolveContract(t *testing.T) {
t.Parallel()

// To start with, we'll create a new temp DB for the duration of this
// test.
tempPath, err := ioutil.TempDir("", "testdb")
if err != nil {
t.Fatalf("unable to make temp dir: %v", err)
}
defer os.RemoveAll(tempPath)
db, err := channeldb.Open(tempPath)
if err != nil {
t.Fatalf("unable to open db: %v", err)
}
defer db.Close()

// With the DB created, we'll make a new channel, and mark it as
// pending open within the database.
newChannel, _, cleanup, err := lnwallet.CreateTestChannels(true)
if err != nil {
t.Fatalf("unable to make new test channel: %v", err)
}
defer cleanup()
channel := newChannel.State()
channel.Db = db
addr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := channel.SyncPending(addr, 101); err != nil {
t.Fatalf("unable to write channel to db: %v", err)
}

// With the channel inserted into the database, we'll now create a new
// chain arbitrator that should pick up these new channels and launch
// resolver for them.
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
return nil
},
}
chainArb := NewChainArbitrator(
chainArbCfg, db,
)
if err := chainArb.Start(); err != nil {
t.Fatal(err)
}
defer func() {
if err := chainArb.Stop(); err != nil {
t.Fatal(err)
}
}()

channelArb := chainArb.activeChannels[channel.FundingOutpoint]

// While the resolver are active, we'll now remove the channel from the
// database (mark is as closed).
err = db.AbandonChannel(&channel.FundingOutpoint, 4)
if err != nil {
t.Fatalf("unable to remove channel: %v", err)
}

// With the channel removed, we'll now manually call ResolveContract.
// This stimulates needing to remove a channel from the chain arb due
// to any possible external consistency issues.
err = chainArb.ResolveContract(channel.FundingOutpoint)
if err != nil {
t.Fatalf("unable to resolve contract: %v", err)
}

// The shouldn't be an active chain watcher or channel arb for this
// channel.
if len(chainArb.activeChannels) != 0 {
t.Fatalf("expected zero active channels, instead have %v",
len(chainArb.activeChannels))
}
if len(chainArb.activeWatchers) != 0 {
t.Fatalf("expected zero active watchers, instead have %v",
len(chainArb.activeWatchers))
}

// At this point, the channel's arbitrator log should also be empty as
// well.
_, err = channelArb.log.FetchContractResolutions()
if err != errScopeBucketNoExist {
t.Fatalf("channel arb log state should have been "+
"removed: %v", err)
}

// If we attempt to call this method again, then we should get a nil
// error, as there is no more state to be cleaned up.
err = chainArb.ResolveContract(channel.FundingOutpoint)
if err != nil {
t.Fatalf("second resolve call shouldn't fail: %v", err)
}
}

0 comments on commit 1ad6817

Please sign in to comment.