diff --git a/fundingmanager.go b/fundingmanager.go index 24f2cdba8ea..5039f2af538 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -724,15 +724,27 @@ type pendingChansReq struct { // currently pending at the last state of the funding workflow. func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { respChan := make(chan []*pendingChannel, 1) - errChan := make(chan error) + errChan := make(chan error, 1) req := &pendingChansReq{ resp: respChan, err: errChan, } - f.queries <- req - return <-respChan, <-errChan + select { + case f.queries <- req: + case <-f.quit: + return nil, fmt.Errorf("fundingmanager shutting down") + } + + select { + case resp := <-respChan: + return resp, nil + case err := <-errChan: + return nil, err + case <-f.quit: + return nil, fmt.Errorf("fundingmanager shutting down") + } } // CancelPeerReservations cancels all active reservations associated with the @@ -763,13 +775,7 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) { "node=%x: %v", nodePub[:], err) } - if resCtx.err != nil { - select { - case resCtx.err <- fmt.Errorf("peer disconnected"): - default: - } - } - + resCtx.err <- fmt.Errorf("peer disconnected") delete(nodeReservations, pendingID) } @@ -787,6 +793,20 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) { func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, tempChanID [32]byte, fundingErr error) { + fndgLog.Debugf("Failing funding flow for pendingID=%x: %v", + tempChanID, fundingErr) + + ctx, err := f.cancelReservationCtx(peer, tempChanID) + if err != nil { + fndgLog.Errorf("unable to cancel reservation: %v", err) + } + + // In case the case where the reservation existed, send the funding + // error on the error channel. + if ctx != nil { + ctx.err <- fundingErr + } + // We only send the exact error if it is part of out whitelisted set of // errors (lnwire.ErrorCode or lnwallet.ReservationError). var msg lnwire.ErrorData @@ -810,20 +830,11 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, Data: msg, } - fndgLog.Errorf("Failing funding flow: %v (%v)", fundingErr, - spew.Sdump(errMsg)) - - if _, err := f.cancelReservationCtx(peer, tempChanID); err != nil { - fndgLog.Errorf("unable to cancel reservation: %v", err) - } - - err := f.cfg.SendToPeer(peer, errMsg) - if err != nil { + fndgLog.Debugf("Sending funding error to peer (%x): %v", + peer.SerializeCompressed(), spew.Sdump(errMsg)) + if err := f.cfg.SendToPeer(peer, errMsg); err != nil { fndgLog.Errorf("unable to send error message to peer %v", err) - return } - - return } // reservationCoordinator is the primary goroutine tasked with progressing the @@ -880,7 +891,6 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { dbPendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels() if err != nil { - msg.resp <- nil msg.err <- err return } @@ -898,7 +908,6 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { } msg.resp <- pendingChannels - msg.err <- nil } // processFundingOpen sends a message to the fundingManager allowing it to @@ -1216,7 +1225,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fmsg.peerAddress.IdentityKey, err) f.failFundingFlow(fmsg.peerAddress.IdentityKey, msg.PendingChannelID, err) - resCtx.err <- err return } @@ -1261,7 +1269,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fndgLog.Errorf("Unable to parse signature: %v", err) f.failFundingFlow(fmsg.peerAddress.IdentityKey, msg.PendingChannelID, err) - resCtx.err <- err return } err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated) @@ -1269,7 +1276,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fndgLog.Errorf("Unable to send funding complete message: %v", err) f.failFundingFlow(fmsg.peerAddress.IdentityKey, msg.PendingChannelID, err) - resCtx.err <- err return } } @@ -1431,6 +1437,11 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { case <-timeoutChan: // We did not see the funding confirmation before // timeout, so we forget the channel. + err := fmt.Errorf("timeout waiting for funding tx "+ + "(%v) to confirm", completeChan.FundingOutpoint) + fndgLog.Warnf(err.Error()) + f.failFundingFlow(fmsg.peerAddress.IdentityKey, + pendingChanID, err) deleteFromDatabase() return case <-f.quit: @@ -1488,9 +1499,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { err := fmt.Errorf("Unable to find signed reservation for "+ "chan_id=%x", fmsg.msg.ChanID) fndgLog.Warnf(err.Error()) - // TODO: add ErrChanNotFound? f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + fmsg.msg.ChanID, err) return } @@ -1522,7 +1532,6 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig) if err != nil { fndgLog.Errorf("Unable to complete reservation sign complete: %v", err) - resCtx.err <- err f.failFundingFlow(fmsg.peerAddress.IdentityKey, pendingChanID, err) return @@ -2671,13 +2680,13 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { peerKey := fmsg.peerAddress.IdentityKey chanID := fmsg.err.ChanID - // First, we'll attempt to retrieve the funding workflow that this - // error was tied to. If we're unable to do so, then we'll exit early - // as this was an unwarranted error. - resCtx, err := f.getReservationCtx(peerKey, chanID) + // First, we'll attempt to retrieve and cancel the funding workflow + // that this error was tied to. If we're unable to do so, then we'll + // exit early as this was an unwarranted error. + resCtx, err := f.cancelReservationCtx(peerKey, chanID) if err != nil { fndgLog.Warnf("Received error for non-existent funding "+ - "flow: %v", spew.Sdump(protocolErr)) + "flow: %v (%v)", err, spew.Sdump(protocolErr)) return } @@ -2691,21 +2700,17 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { // If this isn't a simple error code, then we'll display the entire // thing. if len(protocolErr.Data) > 1 { - resCtx.err <- grpc.Errorf( + err = grpc.Errorf( lnErr.ToGrpcCode(), string(protocolErr.Data), ) } else { // Otherwise, we'll attempt to display just the error code // itself. - resCtx.err <- grpc.Errorf( + err = grpc.Errorf( lnErr.ToGrpcCode(), lnErr.String(), ) } - - if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil { - fndgLog.Warnf("unable to delete reservation: %v", err) - return - } + resCtx.err <- err } // pruneZombieReservations loops through all pending reservations and fails the @@ -2744,10 +2749,21 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+ "chan_id=%x", peerKey.SerializeCompressed(), pendingChanID[:]) - ctx, err := f.getReservationCtx(peerKey, pendingChanID) - if err != nil { - return nil, errors.Errorf("unable to find reservation: %v", - err) + peerIDKey := newSerializedKey(peerKey) + f.resMtx.Lock() + defer f.resMtx.Unlock() + + nodeReservations, ok := f.activeReservations[peerIDKey] + if !ok { + // No reservations for this node. + return nil, errors.Errorf("no active reservations for peer(%x)", + peerIDKey[:]) + } + + ctx, ok := nodeReservations[pendingChanID] + if !ok { + return nil, errors.Errorf("unknown channel (id: %x) for "+ + "peer(%x)", pendingChanID[:], peerIDKey[:]) } if err := ctx.reservation.Cancel(); err != nil { @@ -2755,7 +2771,13 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, err) } - f.deleteReservationCtx(peerKey, pendingChanID) + delete(nodeReservations, pendingChanID) + + // If this was the last active reservation for this peer, delete the + // peer's entry altogether. + if len(nodeReservations) == 0 { + delete(f.activeReservations, peerIDKey) + } return ctx, nil } @@ -2768,8 +2790,20 @@ func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey, // channelManager? peerIDKey := newSerializedKey(peerKey) f.resMtx.Lock() - delete(f.activeReservations[peerIDKey], pendingChanID) - f.resMtx.Unlock() + defer f.resMtx.Unlock() + + nodeReservations, ok := f.activeReservations[peerIDKey] + if !ok { + // No reservations for this node. + return + } + delete(nodeReservations, pendingChanID) + + // If this was the last active reservation for this peer, delete the + // peer's entry altogether. + if len(nodeReservations) == 0 { + delete(f.activeReservations, peerIDKey) + } } // getReservationCtx returns the reservation context for a particular pending @@ -2783,8 +2817,8 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey, f.resMtx.RUnlock() if !ok { - return nil, errors.Errorf("unknown channel (id: %x)", - pendingChanID[:]) + return nil, errors.Errorf("unknown channel (id: %x) for "+ + "peer(%x)", pendingChanID[:], peerIDKey[:]) } return resCtx, nil diff --git a/fundingmanager_test.go b/fundingmanager_test.go index d74c2054f3a..a1f7686cd56 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -1447,6 +1447,9 @@ func TestFundingManagerFundingTimeout(t *testing.T) { Height: fundingBroadcastHeight + 288, } + // Bob should have sent an Error message to Alice. + assertErrorSent(t, bob.msgChan) + // Should not be pending anymore. assertNumPendingChannelsBecomes(t, bob, 0) } @@ -1511,6 +1514,9 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { // Since Alice was the initiator, the channel should not have timed out assertNumPendingChannelsRemains(t, alice, 1) + // Bob should have sent an Error message to Alice. + assertErrorSent(t, bob.msgChan) + // Since Bob was not the initiator, the channel should timeout assertNumPendingChannelsBecomes(t, bob, 0) } diff --git a/server.go b/server.go index 49027332eb1..080f2539dd9 100644 --- a/server.go +++ b/server.go @@ -2006,7 +2006,11 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey, fundingFeePerVSize lnwallet.SatPerVByte, private bool, remoteCsvDelay uint16) (chan *lnrpc.OpenStatusUpdate, chan error) { - updateChan := make(chan *lnrpc.OpenStatusUpdate, 1) + // The updateChan will have a buffer of 2, since we expect a + // ChanPending + a ChanOpen update, and we want to make sure the + // funding process is not blocked if the caller is not reading the + // updates. + updateChan := make(chan *lnrpc.OpenStatusUpdate, 2) errChan := make(chan error, 1) var (