Skip to content

Commit

Permalink
Decoupled the sending of FundingLocked and channel announcement from …
Browse files Browse the repository at this point in the history
…each other and from waitForFundingConfirmation
  • Loading branch information
nsa committed Sep 13, 2017
1 parent 737eeed commit 14e9aed
Showing 1 changed file with 159 additions and 53 deletions.
212 changes: 159 additions & 53 deletions fundingmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,11 @@ func (f *fundingManager) Start() error {

f.localDiscoverySignals[chanID] = make(chan struct{})

doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})

go func(ch *channeldb.OpenChannel) {
go f.waitForFundingWithTimeout(ch, doneChan, timeoutChan)
go f.waitForFundingWithTimeout(ch, confChan, timeoutChan)

select {
case <-timeoutChan:
Expand All @@ -393,8 +393,37 @@ func (f *fundingManager) Start() error {
case <-f.quit:
// The fundingManager is shutting down, and will
// resume wait on startup.
case <-doneChan:
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("confChan was closed")
return
}
// Success, funding transaction was confirmed.

// With the channel marked open, we'll create the
// state-machine object which wraps the database
// state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, ch)
if err != nil {
fndgLog.Errorf("error creating new " +
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()

if err = f.sendFundingLocked(ch, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send " +
"fundingLocked: %v", err)
return
}
if err = f.sendChannelAnnouncement(ch, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}
}
}(channel)
}
Expand Down Expand Up @@ -439,8 +468,31 @@ func (f *fundingManager) Start() error {
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.sendFundingLockedAndAnnounceChannel(channel,
shortChanID)

// With the channel marked open, we'll create
// the state-machine object which wraps the
// database state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, channel)
if err != nil {
fndgLog.Errorf("error creating new " +
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()

if err = f.sendFundingLocked(channel, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send " +
"fundingLocked: %v", err)
return
}
if err = f.sendChannelAnnouncement(channel,
lnChannel, shortChanID); err != nil {
fndgLog.Errorf("Failed to send channel" +
"announcement: %v", err)
return
}
}()

case fundingLockedSent:
Expand All @@ -458,8 +510,13 @@ func (f *fundingManager) Start() error {
}
defer lnChannel.Stop()

f.sendChannelAnnouncement(channel, lnChannel,
err = f.sendChannelAnnouncement(channel, lnChannel,
shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}()

default:
Expand Down Expand Up @@ -1088,9 +1145,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation.
go func() {
doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, doneChan,
go f.waitForFundingWithTimeout(completeChan, confChan,
timeoutChan)

select {
Expand All @@ -1100,11 +1157,39 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
deleteFromDatabase()
case <-f.quit:
// The fundingManager is shutting down, will resume
// wait for funding transaction on startup.
case <-doneChan:
// wait for funding transaction on startup.
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("confChan was closed")
return
}
// Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey,
fmsg.msg.PendingChannelID)

// With the channel marked open, we'll create the
// state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel:" +
" %v", err)
return
}
defer lnChannel.Stop()

if err = f.sendFundingLocked(completeChan, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send fundingLocked: %v", err)
return
}

if err = f.sendChannelAnnouncement(completeChan, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}

f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
}
}()
}
Expand Down Expand Up @@ -1189,7 +1274,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}

go func() {
doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{})

// In case the fundingManager is stopped at some point during
Expand All @@ -1200,13 +1285,40 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
doneChan)
confChan)
}()

select {
case <-f.quit:
return
case <-doneChan:
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("confChan was closed")
return
}

// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning " +
"channel: %v", err)
return
}
defer lnChannel.Stop()

if err = f.sendFundingLocked(completeChan, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send fundingLocked: %v", err)
return
}
if err = f.sendChannelAnnouncement(completeChan,
lnChannel, shortChanID); err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}
}

// Finally give the caller a final update notifying them that
Expand All @@ -1230,21 +1342,21 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
// will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has
// passed from bestHeight. In the case of timeout, the timeoutChan will be
// closed. In case of confirmation or error, doneChan will be closed.
// closed. In case of confirmation or error, confChan will be closed.
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
doneChan chan<- struct{}, timeoutChan chan<- struct{}) {
confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {

epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v",
err)
close(doneChan)
close(confChan)
return
}

defer epochClient.Cancel()

waitingDoneChan := make(chan struct{})
waitingConfChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{})

// Add this goroutine to wait group so we can be sure that it is
Expand All @@ -1253,7 +1365,7 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
waitingDoneChan)
waitingConfChan)
}()

// On block maxHeight we will cancel the funding confirmation wait.
Expand Down Expand Up @@ -1283,8 +1395,13 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup.
return
case <-waitingDoneChan:
close(doneChan)
case shortChanID, ok := <-waitingConfChan:
if !ok {
fndgLog.Errorf("waitingConfChan was closed")
close(confChan)
return
}
confChan <- shortChanID
return
}
}
Expand All @@ -1297,9 +1414,9 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// when a channel has become active for lightning transactions.
// The wait can be canceled by closing the cancelChan.
func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel,
cancelChan <-chan struct{}, doneChan chan<- struct{}) {
cancelChan <-chan struct{}, confChan chan<- *lnwire.ShortChannelID) {

defer close(doneChan)
defer close(confChan)

// Register with the ChainNotifier for a notification once the funding
// transaction reaches `numConfs` confirmations.
Expand Down Expand Up @@ -1386,44 +1503,33 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
return
}

// Now that the funding transaction has the required number of
// confirmations, we send the fundingLocked message to the peer.
f.sendFundingLockedAndAnnounceChannel(completeChan, &shortChanID)
confChan <- &shortChanID
}

// sendFundingLockedAndAnnounceChannel creates and sends the fundingLocked
// message, and then the channel announcement. This should be called after the
// funding transaction has been confirmed, and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) {
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {

chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)

// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
channel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel: %v", err)
return
}
defer channel.Stop()

// Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our
// next revocation key. Without the revocation key, the remote party
// will be unable to propose state transitions.
nextRevocation, err := channel.NextRevocationKey()
if err != nil {
fndgLog.Errorf("unable to create next revocation: %v", err)
return
return err
}
fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation)

err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg)
if err != nil {
fndgLog.Errorf("unable to send fundingLocked to peer: %v", err)
return
return err
}

// As the fundingLocked message is now sent to the peer, the channel is
Expand All @@ -1435,20 +1541,20 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
if err != nil {
fndgLog.Errorf("error setting channel state to "+
"fundingLockedSent: %v", err)
return
return err
}

// TODO(roasbeef): wait 6 blocks before announcing

f.sendChannelAnnouncement(completeChan, channel, shortChanID)
return nil
}

// sendChannelAnnouncement broadcast the necessary channel announcement
// messages to the network. Should be called after the fundingLocked message
// is sent (channelState is 'fundingLockedSent') and the channel is ready to
// be used.
func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) {
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error {

// TODO(eugene) wait for 6 confirmations here

chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
fundingPoint := completeChan.FundingOutpoint
Expand All @@ -1463,7 +1569,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
*shortChanID, chanID)
if err != nil {
fndgLog.Errorf("channel announcement failed: %v", err)
return
return err
}

// After the channel is successfully announced from the
Expand All @@ -1474,7 +1580,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("error deleting channel state: %v", err)
return
return err
}

// Finally, as the local channel discovery has been fully processed,
Expand All @@ -1486,7 +1592,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
}
f.localDiscoveryMtx.Unlock()

return
return nil
}

// processFundingLocked sends a message to the fundingManager allowing it to
Expand Down Expand Up @@ -2105,4 +2211,4 @@ func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) err
}
return nil
})
}
}

0 comments on commit 14e9aed

Please sign in to comment.