Skip to content

Commit

Permalink
funding: decouple funding wait from fundingLocked and chanAnn
Browse files Browse the repository at this point in the history
This commit decouples the wait for funding transaction confirmations
in the waitForFundingConfirmation function from the announcement of
the channel in the sendFundingLockedAndAnnounceChannel function.
Additionally, the sendFundingLockedAndAnnounceChannel function is
now decoupled into the sendFundingLocked and sendChannelAnnouncement
functions.
  • Loading branch information
nsa committed Sep 13, 2017
1 parent e5f3ee0 commit e004e10
Showing 1 changed file with 158 additions and 53 deletions.
211 changes: 158 additions & 53 deletions fundingmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,11 @@ func (f *fundingManager) Start() error {

f.localDiscoverySignals[chanID] = make(chan struct{})

doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})

go func(ch *channeldb.OpenChannel) {
go f.waitForFundingWithTimeout(ch, doneChan, timeoutChan)
go f.waitForFundingWithTimeout(ch, confChan, timeoutChan)

select {
case <-timeoutChan:
Expand All @@ -393,8 +393,37 @@ func (f *fundingManager) Start() error {
case <-f.quit:
// The fundingManager is shutting down, and will
// resume wait on startup.
case <-doneChan:
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("confChan was closed")
return
}
// Success, funding transaction was confirmed.

// With the channel marked open, we'll create the
// state-machine object which wraps the database
// state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, ch)
if err != nil {
fndgLog.Errorf("error creating new " +
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()

err = f.sendFundingLocked(ch, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send " +
"fundingLocked: %v", err)
return
}
err = f.sendChannelAnnouncement(ch, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}
}
}(channel)
}
Expand Down Expand Up @@ -439,8 +468,28 @@ func (f *fundingManager) Start() error {
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.sendFundingLockedAndAnnounceChannel(channel,
shortChanID)

lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, channel)
if err != nil {
fndgLog.Errorf("error creating new " +
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()

err = f.sendFundingLocked(channel, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send " +
"fundingLocked: %v", err)
return
}
err = f.sendChannelAnnouncement(channel, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send channel" +
"announcement: %v", err)
return
}
}()

case fundingLockedSent:
Expand All @@ -458,8 +507,13 @@ func (f *fundingManager) Start() error {
}
defer lnChannel.Stop()

f.sendChannelAnnouncement(channel, lnChannel,
err = f.sendChannelAnnouncement(channel, lnChannel,
shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}()

default:
Expand Down Expand Up @@ -1088,9 +1142,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation.
go func() {
doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, doneChan,
go f.waitForFundingWithTimeout(completeChan, confChan,
timeoutChan)

select {
Expand All @@ -1100,11 +1154,39 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
deleteFromDatabase()
case <-f.quit:
// The fundingManager is shutting down, will resume
// wait for funding transaction on startup.
case <-doneChan:
// wait for funding transaction on startup.
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding confirmation failed")
return
}
// Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey,
fmsg.msg.PendingChannelID)

// With the channel marked open, we'll create the
// state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel:" +
" %v", err)
return
}
defer lnChannel.Stop()

if err = f.sendFundingLocked(completeChan, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send fundingLocked: %v", err)
return
}

if err = f.sendChannelAnnouncement(completeChan, lnChannel,
shortChanID); err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}

f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
}
}()
}
Expand Down Expand Up @@ -1189,7 +1271,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}

go func() {
doneChan := make(chan struct{})
confChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{})

// In case the fundingManager is stopped at some point during
Expand All @@ -1200,13 +1282,40 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
doneChan)
confChan)
}()

select {
case <-f.quit:
return
case <-doneChan:
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding confirmation failed")
return
}

// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning " +
"channel: %v", err)
return
}
defer lnChannel.Stop()

err = f.sendFundingLocked(completeChan, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send fundingLocked: %v", err)
return
}
err = f.sendChannelAnnouncement(completeChan, lnChannel, shortChanID);
if err != nil {
fndgLog.Errorf("Failed to send channel " +
"announcement: %v", err)
return
}
}

// Finally give the caller a final update notifying them that
Expand All @@ -1230,21 +1339,22 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
// will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has
// passed from bestHeight. In the case of timeout, the timeoutChan will be
// closed. In case of confirmation or error, doneChan will be closed.
// closed. In case of error, confChan will be closed. In case of success,
// a *lnwire.ShortChannelID will be passed to confChan.
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
doneChan chan<- struct{}, timeoutChan chan<- struct{}) {
confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {

epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v",
err)
close(doneChan)
close(confChan)
return
}

defer epochClient.Cancel()

waitingDoneChan := make(chan struct{})
waitingConfChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{})

// Add this goroutine to wait group so we can be sure that it is
Expand All @@ -1253,7 +1363,7 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
waitingDoneChan)
waitingConfChan)
}()

// On block maxHeight we will cancel the funding confirmation wait.
Expand Down Expand Up @@ -1283,8 +1393,13 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup.
return
case <-waitingDoneChan:
close(doneChan)
case shortChanID, ok := <-waitingConfChan:
if !ok {
fndgLog.Errorf("waitingConfChan was closed")
close(confChan)
return
}
confChan <- shortChanID
return
}
}
Expand All @@ -1295,11 +1410,12 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// function of waitForFundingConfirmation is to wait for blockchain
// confirmation, and then to notify the other systems that must be notified
// when a channel has become active for lightning transactions.
// The wait can be canceled by closing the cancelChan.
// The wait can be canceled by closing the cancelChan. In case of success,
// a *lnwire.ShortChannelID will be passed to confChan.
func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel,
cancelChan <-chan struct{}, doneChan chan<- struct{}) {
cancelChan <-chan struct{}, confChan chan<- *lnwire.ShortChannelID) {

defer close(doneChan)
defer close(confChan)

// Register with the ChainNotifier for a notification once the funding
// transaction reaches `numConfs` confirmations.
Expand Down Expand Up @@ -1386,44 +1502,33 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
return
}

// Now that the funding transaction has the required number of
// confirmations, we send the fundingLocked message to the peer.
f.sendFundingLockedAndAnnounceChannel(completeChan, &shortChanID)
confChan <- &shortChanID
}

// sendFundingLockedAndAnnounceChannel creates and sends the fundingLocked
// message, and then the channel announcement. This should be called after the
// funding transaction has been confirmed, and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) {
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {

chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)

// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
channel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel: %v", err)
return
}
defer channel.Stop()

// Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our
// next revocation key. Without the revocation key, the remote party
// will be unable to propose state transitions.
nextRevocation, err := channel.NextRevocationKey()
if err != nil {
fndgLog.Errorf("unable to create next revocation: %v", err)
return
return err
}
fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation)

err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg)
if err != nil {
fndgLog.Errorf("unable to send fundingLocked to peer: %v", err)
return
return err
}

// As the fundingLocked message is now sent to the peer, the channel is
Expand All @@ -1435,20 +1540,20 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
if err != nil {
fndgLog.Errorf("error setting channel state to "+
"fundingLockedSent: %v", err)
return
return err
}

// TODO(roasbeef): wait 6 blocks before announcing

f.sendChannelAnnouncement(completeChan, channel, shortChanID)
return nil
}

// sendChannelAnnouncement broadcast the necessary channel announcement
// messages to the network. Should be called after the fundingLocked message
// is sent (channelState is 'fundingLockedSent') and the channel is ready to
// be used.
func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) {
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error {

// TODO(eugene) wait for 6 confirmations here

chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
fundingPoint := completeChan.FundingOutpoint
Expand All @@ -1463,7 +1568,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
*shortChanID, chanID)
if err != nil {
fndgLog.Errorf("channel announcement failed: %v", err)
return
return err
}

// After the channel is successfully announced from the
Expand All @@ -1474,7 +1579,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("error deleting channel state: %v", err)
return
return err
}

// Finally, as the local channel discovery has been fully processed,
Expand All @@ -1486,7 +1591,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
}
f.localDiscoveryMtx.Unlock()

return
return nil
}

// processFundingLocked sends a message to the fundingManager allowing it to
Expand Down

0 comments on commit e004e10

Please sign in to comment.