diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 2c8bdeb4133..b8a5c6ecf9c 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -2,12 +2,14 @@ package discovery
 
 import (
 	"bytes"
+	"encoding/binary"
 	"fmt"
 	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/boltdb/bolt"
 	"github.com/davecgh/go-spew/spew"
 	"github.com/go-errors/errors"
 	"github.com/lightningnetwork/lnd/chainntnfs"
@@ -20,6 +22,15 @@ import (
 	"github.com/roasbeef/btcd/wire"
 )
 
+var (
+	// messageStoreKey is a key used to create a top level bucket in
+	// the gossiper database, used for storing messages that are to
+	// be sent to peers. Currently this is used for reliably sending
+	// AnnounceSignatures messages, by persisting them until a send
+	// operation has succeeded.
+	messageStoreKey = []byte("message-store")
+)
+
 // networkMsg couples a routing related wire message with the peer that
 // originally sent it.
 type networkMsg struct {
@@ -78,6 +89,11 @@ type Config struct {
 	// messages to a particular peer identified by the target public key.
 	SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
 
+	// NotifyWhenOnline is a function that allows the gossiper to be
+	// notified when a certain peer comes online, allowing it to
+	// retry sending a peer message.
+	NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
+
 	// ProofMatureDelta the number of confirmations which is needed before
 	// exchange the channel announcement proofs.
 	ProofMatureDelta uint32
@@ -327,6 +343,14 @@ func (d *AuthenticatedGossiper) Start() error {
 	}
 	d.bestHeight = height
 
+	// In case we had an AnnounceSignatures ready to be sent when the
+	// gossiper was last shut down, we must continue on our quest to
+	// deliver this message to our peer such that they can craft the
+	// full channel proof.
+	if err := d.resendAnnounceSignatures(); err != nil {
+		return err
+	}
+
 	d.wg.Add(1)
 	go d.networkHandler()
 
@@ -526,6 +550,136 @@ func (d *deDupedAnnouncements) Emit() []lnwire.Message {
 	return announcements
 }
 
+// resendAnnounceSignatures will inspect the messageStore database
+// bucket for AnnounceSignatures messages that we recently tried
+// to send to a peer. If the associated channels still don't have
+// the full channel proofs assembled, we will try to resend them. If
+// we have the full proof, we can safely delete the message from
+// the messageStore.
+func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
+	type msgTuple struct {
+		peer  *btcec.PublicKey
+		msg   *lnwire.AnnounceSignatures
+		dbKey []byte
+	}
+
+	// Fetch all the AnnounceSignatures messages that were added
+	// to the database.
+	// TODO(halseth): database access should be abstracted
+	// behind interface.
+	var msgsResend []msgTuple
+	if err := d.cfg.DB.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(messageStoreKey)
+		if bucket == nil {
+			return nil
+		}
+
+		// Iterate over each message added to the database.
+		if err := bucket.ForEach(func(k, v []byte) error {
+			// The database value represents the encoded
+			// AnnounceSignatures message.
+			r := bytes.NewReader(v)
+			msg := &lnwire.AnnounceSignatures{}
+			if err := msg.Decode(r, 0); err != nil {
+				return err
+			}
+
+			// The first 33 bytes of the database key is
+			// the peer's public key.
+			peer, err := btcec.ParsePubKey(k[:33], btcec.S256())
+			if err != nil {
+				return err
+			}
+			t := msgTuple{peer, msg, k}
+
+			// Add the message to the slice, such that we
+			// can resend it after the database transaction
+			// is over.
+			msgsResend = append(msgsResend, t)
+			return nil
+		}); err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	// deleteMsg removes the message associated with the passed
+	// msgTuple from the messageStore.
+	deleteMsg := func(t msgTuple) error {
+		log.Debugf("Deleting message for chanID=%v from "+
+			"messageStore", t.msg.ChannelID)
+		if err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
+			bucket := tx.Bucket(messageStoreKey)
+			if bucket == nil {
+				return fmt.Errorf("bucket " +
+					"unexpectedly did not exist")
+			}
+
+			return bucket.Delete(t.dbKey[:])
+		}); err != nil {
+			return fmt.Errorf("failed deleting message "+
+				"from database: %v", err)
+		}
+		return nil
+	}
+
+	// We now iterate over these messages, resending those that we
+	// don't have the full proof for, deleting the rest.
+	for _, t := range msgsResend {
+		// Check if the full channel proof exists in our graph.
+		chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
+			t.msg.ShortChannelID)
+		if err != nil {
+			// If the channel cannot be found, it is most likely
+			// a leftover message for a channel that was closed.
+			// In this case we delete it from the message store.
+			log.Warnf("unable to fetch channel info for "+
+				"chanID=%v from graph: %v. Will delete local "+
+				"proof from database",
+				t.msg.ChannelID, err)
+			if err := deleteMsg(t); err != nil {
+				return err
+			}
+			continue
+		}
+
+		// 1. If the full proof does not exist in the graph, it
+		// means that we haven't received the remote proof yet
+		// (or that we crashed before being able to assemble the
+		// full proof). Since the remote node might think they
+		// have delivered their proof to us, we will resend
+		// _our_ proof to trigger a resend on their part:
+		// they will then be able to assemble and send us the
+		// full proof.
+		if chanInfo.AuthProof == nil {
+			err := d.sendAnnSigReliably(t.msg, t.peer)
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		// 2. If the proof does exist in the graph, we have
+		// successfully received the remote proof and assembled
+		// the full proof. In this case we can safely delete the
+		// local proof from the database. In case the remote
+		// hasn't been able to assemble the full proof yet
+		// (maybe because of a crash), we will send them the full
+		// proof if we notice that they retry sending their half
+		// proof.
+		if chanInfo.AuthProof != nil {
+			log.Debugf("Deleting message for chanID=%v from "+
+				"messageStore", t.msg.ChannelID)
+			if err := deleteMsg(t); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // networkHandler is the primary goroutine that drives this service. The roles
 // of this goroutine includes answering queries related to the state of the
 // network, syncing up newly connected peers, and also periodically
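The 41-byte database key parsed above (and written by `sendAnnSigReliably` further down in this patch) is simply the remote peer's 33-byte compressed public key followed by the 8-byte big-endian short channel ID. A minimal sketch of that layout, using the same btcec package the gossiper already imports — the helper names here are illustrative and not part of the patch:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/roasbeef/btcd/btcec"
)

// makeStoreKey builds a message-store key: a 33-byte compressed peer
// pubkey followed by the 8-byte big-endian short channel ID.
func makeStoreKey(peer *btcec.PublicKey, shortChanID uint64) [41]byte {
	var key [41]byte
	copy(key[:33], peer.SerializeCompressed())
	binary.BigEndian.PutUint64(key[33:], shortChanID)
	return key
}

// parseStoreKey reverses makeStoreKey, mirroring what
// resendAnnounceSignatures does when walking the bucket.
func parseStoreKey(key []byte) (*btcec.PublicKey, uint64, error) {
	if len(key) != 41 {
		return nil, 0, fmt.Errorf("want 41-byte key, got %d", len(key))
	}
	peer, err := btcec.ParsePubKey(key[:33], btcec.S256())
	if err != nil {
		return nil, 0, err
	}
	return peer, binary.BigEndian.Uint64(key[33:]), nil
}

func main() {
	priv, _ := btcec.NewPrivateKey(btcec.S256())
	key := makeStoreKey(priv.PubKey(), 42)

	peer, cid, err := parseStoreKey(key[:])
	fmt.Println(peer != nil, cid, err) // true 42 <nil>
}
```

Keying on peer plus short channel ID means concurrent channel openings to the same peer each get their own slot, which is exactly why the key isn't the peer's public key alone.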
@@ -1010,8 +1164,78 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 
 		if routing.IsError(err, routing.ErrOutdated,
 			routing.ErrIgnored) {
+			// The edge will get rejected if we already
+			// added the same edge without AuthProof to the
+			// graph. If the received announcement contains
+			// a proof, we can add this proof to our edge.
+			// We can end up in this situation in the case
+			// where we create a channel, but for some
+			// reason fail to receive the remote peer's
+			// proof, while the remote peer is able to fully
+			// assemble the proof and craft the
+			// ChannelAnnouncement.
+			// TODO(halseth): the following chunk of code
+			// should be moved into its own method, indentation
+			// and readability is not exactly on point.
+			chanInfo, e1, e2, err2 := d.cfg.Router.GetChannelByID(
+				msg.ShortChannelID)
+			if err2 != nil {
+				log.Errorf("Failed fetching channel "+
+					"edge: %v", err2)
+				nMsg.err <- err2
+				return nil
+			}
+
+			// If the edge already exists in the graph, but
+			// has no proof attached, we can add that now.
+			if chanInfo.AuthProof == nil && proof != nil {
+				chanAnn, e1Ann, e2Ann :=
+					createChanAnnouncement(proof,
+						chanInfo, e1, e2)
+
+				// Validate the assembled proof.
+				err := ValidateChannelAnn(chanAnn)
+				if err != nil {
+					err := errors.Errorf("assembled "+
+						"channel announcement "+
+						"proof for shortChanID=%v"+
+						" isn't valid: %v",
+						msg.ShortChannelID, err)
+
+					log.Error(err)
+					nMsg.err <- err
+					return nil
+				}
+				err = d.cfg.Router.AddProof(
+					msg.ShortChannelID, proof)
+				if err != nil {
+					err := errors.Errorf("unable to "+
+						"add proof to "+
+						"shortChanID=%v: %v",
+						msg.ShortChannelID, err)
+					log.Error(err)
+					nMsg.err <- err
+					return nil
+				}
+				announcements = append(announcements,
+					chanAnn)
+				if e1Ann != nil {
+					announcements = append(
+						announcements, e1Ann)
+				}
+				if e2Ann != nil {
+					announcements = append(
+						announcements, e2Ann)
+				}
+
+				nMsg.err <- nil
+				return announcements
+			}
+
+			// If not, this was just an outdated edge.
 			log.Debugf("Router rejected channel edge: %v",
 				err)
+
 		} else {
 			log.Errorf("Router rejected channel edge: %v",
 				err)
@@ -1283,7 +1507,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 
 		// Ensure that we know of a channel with the target channel ID
 		// before proceeding further.
-		chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
+		chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
+			msg.ShortChannelID)
 		if err != nil {
 			// TODO(andrew.shvv) this is dangerous because remote
 			// node might rewrite the waiting proof.
@@ -1320,6 +1545,72 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 			return nil
 		}
 
+		// If proof was sent by a local sub-system, then we'll
+		// send the announcement signature to the remote node
+		// so they can also reconstruct the full channel
+		// announcement.
+		if !nMsg.isRemote {
+			var remotePeer *btcec.PublicKey
+			if isFirstNode {
+				remotePeer = chanInfo.NodeKey2
+			} else {
+				remotePeer = chanInfo.NodeKey1
+			}
+
+			// Since the remote peer might not be online
+			// we'll call a method that will attempt to
+			// deliver the proof when it comes online.
+			if err := d.sendAnnSigReliably(msg, remotePeer); err != nil {
+				err := errors.Errorf("unable to send reliably "+
+					"to remote for short_chan_id=%v: %v",
+					shortChanID, err)
+				log.Error(err)
+				nMsg.err <- err
+				return nil
+			}
+		}
+
+		// Check if we already have the full proof for this channel.
+		if chanInfo.AuthProof != nil {
+			// If we already have the fully assembled proof, then
+			// the peer sending us their proof has probably not
+			// received our local proof yet. So be kind and send
+			// them the full proof.
+			if nMsg.isRemote {
+				peerID := nMsg.peer.SerializeCompressed()
+				log.Debugf("Got AnnounceSignatures for " +
+					"channel with full proof.")
+
+				d.wg.Add(1)
+				go func() {
+					defer d.wg.Done()
+					log.Debugf("Received half proof for "+
+						"channel %v with existing "+
+						"full proof. Sending full "+
+						"proof to peer=%x",
+						msg.ChannelID,
+						peerID)
+
+					chanAnn, _, _ := createChanAnnouncement(
+						chanInfo.AuthProof, chanInfo, e1, e2)
+					err := d.cfg.SendToPeer(nMsg.peer, chanAnn)
+					if err != nil {
+						log.Errorf("Failed sending "+
+							"full proof to "+
+							"peer=%x: %v",
+							peerID, err)
+						return
+					}
+					log.Debugf("Full proof sent to peer=%x"+
+						" for chanID=%v", peerID,
+						msg.ChannelID)
+				}()
+			}
+
+			log.Debugf("Already have proof for channel "+
+				"with chanID=%v", msg.ChannelID)
+			nMsg.err <- nil
+			return nil
+		}
+
 		// Check that we received the opposite proof. If so, then we're
 		// now able to construct the full proof, and create the channel
 		// announcement. If we didn't receive the opposite half of the
@@ -1346,33 +1637,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 			return nil
 		}
 
-		// If proof was sent by a local sub-system, then we'll
-		// send the announcement signature to the remote node
-		// so they can also reconstruct the full channel
-		// announcement.
-		if !nMsg.isRemote {
-			// Check that first node of the channel info
-			// corresponds to us.
-			var remotePeer *btcec.PublicKey
-			if isFirstNode {
-				remotePeer = chanInfo.NodeKey2
-			} else {
-				remotePeer = chanInfo.NodeKey1
-			}
-
-			err := d.cfg.SendToPeer(remotePeer, msg)
-			if err != nil {
-				log.Errorf("unable to send "+
-					"announcement message to peer: %x",
-					remotePeer.SerializeCompressed())
-			}
-
-			log.Infof("Sent channel announcement proof "+
-				"for short_chan_id=%v to remote peer: "+
-				"%x", shortChanID,
-				remotePeer.SerializeCompressed())
-		}
-
 		log.Infof("1/2 of channel ann proof received for "+
 			"short_chan_id=%v, waiting for other half",
 			shortChanID)
@@ -1381,9 +1645,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 		return nil
 	}
 
-	// If we now have both halves of the channel announcement
-	// proof, then we'll reconstruct the initial announcement so we
-	// can validate it shortly below.
+	// We now have both halves of the channel announcement proof,
+	// so we'll reconstruct the initial announcement so we can
+	// validate it shortly below.
 	var dbProof channeldb.ChannelAuthProof
 	if isFirstNode {
 		dbProof.NodeSig1 = msg.NodeSignature
@@ -1450,26 +1714,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 		announcements = append(announcements, e2Ann)
 	}
 
-	// If this a local announcement, then we'll send it to the
-	// remote side so they can reconstruct the full channel
-	// announcement proof.
-	if !nMsg.isRemote {
-		var remotePeer *btcec.PublicKey
-		if isFirstNode {
-			remotePeer = chanInfo.NodeKey2
-		} else {
-			remotePeer = chanInfo.NodeKey1
-		}
-
-		log.Debugf("Sending local AnnounceSignatures message "+
-			"to peer(%x)", remotePeer.SerializeCompressed())
-		if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
-			log.Errorf("unable to send announcement "+
-				"message to peer: %x",
-				remotePeer.SerializeCompressed())
-		}
-	}
-
 	nMsg.err <- nil
 	return announcements
 
@@ -1479,6 +1723,90 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 	}
 }
 
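Before the full method below, it may help to see the delivery pattern this patch introduces in isolation. The following is a hedged model, not lnd code: `send` and `notifyOnline` stand in for the gossiper's `SendToPeer` and `NotifyWhenOnline` config hooks.

```go
package main

import (
	"errors"
	"fmt"
)

// sendWithRetry condenses the loop sendAnnSigReliably runs in its
// goroutine: the send is retried each time the peer comes back
// online, until it succeeds or quit closes.
func sendWithRetry(send func() error,
	notifyOnline func(chan<- struct{}), quit <-chan struct{}) bool {

	for {
		if err := send(); err == nil {
			return true
		}

		// The peer is likely offline; ask to be notified when it
		// reconnects, then wait for that or for shutdown.
		connected := make(chan struct{})
		notifyOnline(connected)

		select {
		case <-connected:
			// Peer is back; loop around and retry the send.
		case <-quit:
			return false
		}
	}
}

func main() {
	attempts := 0
	send := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("peer offline")
		}
		return nil
	}
	// Simulate the peer coming online immediately on registration.
	notify := func(c chan<- struct{}) { close(c) }

	fmt.Println(sendWithRetry(send, notify, make(chan struct{}))) // true
}
```

Note the design point: registration for the online notification happens only after a failed send, so a peer that stays online the whole time incurs no extra bookkeeping.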
+// sendAnnSigReliably will try to send the provided local AnnounceSignatures
+// to the remote peer, waiting for it to come online if necessary. This
+// method returns after adding the message to persistent storage, such
+// that the caller knows that the message will be delivered at some point.
+func (d *AuthenticatedGossiper) sendAnnSigReliably(
+	msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error {
+
+	// We first add this message to the database, such that in case
+	// we do not succeed in sending it to the peer, we'll fetch it
+	// from the DB next time we start, and retry. We use the peer ID
+	// + shortChannelID as key, as there possibly is more than one
+	// channel opening in progress to the same peer.
+	var key [41]byte
+	copy(key[:33], remotePeer.SerializeCompressed())
+	binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64())
+
+	err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists(messageStoreKey)
+		if err != nil {
+			return err
+		}
+
+		// Encode the AnnounceSignatures message.
+		var b bytes.Buffer
+		if err := msg.Encode(&b, 0); err != nil {
+			return err
+		}
+
+		// Add the encoded message to the database using the peer
+		// + shortChanID as key.
+		return bucket.Put(key[:], b.Bytes())
+	})
+	if err != nil {
+		return err
+	}
+
+	// We have succeeded adding the message to the database. We now launch
+	// a goroutine that will keep trying to send the message to the remote
+	// peer until it succeeds, or the gossiper shuts down. In case of
+	// success, the message will be removed from the database.
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+		for {
+			log.Debugf("Sending AnnounceSignatures for channel "+
+				"%v to remote peer %x", msg.ChannelID,
+				remotePeer.SerializeCompressed())
+			err := d.cfg.SendToPeer(remotePeer, msg)
+			if err == nil {
+				// Sending succeeded, we can
+				// continue the flow.
+				break
+			}
+
+			log.Errorf("unable to send AnnounceSignatures message "+
+				"to peer(%x): %v. Will retry when online.",
+				remotePeer.SerializeCompressed(), err)
+
+			connected := make(chan struct{})
+			d.cfg.NotifyWhenOnline(remotePeer, connected)
+
+			select {
+			case <-connected:
+				log.Infof("peer %x reconnected. Retry sending "+
+					"AnnounceSignatures.",
+					remotePeer.SerializeCompressed())
+				// Retry sending.
+			case <-d.quit:
+				log.Infof("Gossiper shutting down, did not " +
+					"send AnnounceSignatures.")
+				return
+			}
+		}
+
+		log.Infof("Sent channel announcement proof to remote peer: %x",
+			remotePeer.SerializeCompressed())
+	}()
+
+	// This method returns after the message has been added to the
+	// database, such that the caller doesn't have to wait until the
+	// message is actually delivered, but can be assured that it will
+	// eventually be delivered.
+	return nil
+}
+
 // updateChannel creates a new fully signed update for the channel, and updates
 // the underlying graph with the new state.
 func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index 426147c0264..ecff2b8c3e6 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -155,6 +155,11 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) {
 
 func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID,
 	proof *channeldb.ChannelAuthProof) error {
+	info, ok := r.infos[chanID.ToUint64()]
+	if !ok {
+		return errors.New("channel does not exist")
+	}
+
+	info.AuthProof = proof
 	return nil
 }
 
@@ -341,7 +346,8 @@ func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement,
 	}
 
 	signer := mockSigner{priv}
-	if a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a); err != nil {
+	a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a)
+	if err != nil {
 		return nil, err
 	}
 
@@ -498,7 +504,11 @@ func TestProcessAnnouncement(t *testing.T) {
 		t.Fatalf("can't create node announcement: %v", err)
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID):
+	case <-time.After(2 * time.Second):
+		t.Fatal("remote announcement not processed")
+	}
 	if err != nil {
 		t.Fatalf("can't process remote announcement: %v", err)
 	}
@@ -521,7 +531,11 @@ func TestProcessAnnouncement(t *testing.T) {
 		t.Fatalf("can't create channel announcement: %v", err)
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID):
+	case <-time.After(2 * time.Second):
+		t.Fatal("remote announcement not processed")
+	}
 	if err != nil {
 		t.Fatalf("can't process remote announcement: %v", err)
 	}
@@ -544,7 +558,11 @@ func TestProcessAnnouncement(t *testing.T) {
 		t.Fatalf("can't create update announcement: %v", err)
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID):
+	case <-time.After(2 * time.Second):
+		t.Fatal("remote announcement not processed")
+	}
 	if err != nil {
 		t.Fatalf("can't process remote announcement: %v", err)
 	}
@@ -641,8 +659,8 @@ func TestPrematureAnnouncement(t *testing.T) {
 	}
 }
 
-// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper properly
-// processes partial and fully announcement signatures message.
+// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper
+// properly processes partial and full announcement signature messages.
 func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	t.Parallel()
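Every announcement-processing call in the hunks above — and in the rest of this test diff — is wrapped in the same select-with-timeout scaffolding. A small helper could collapse the pattern; the following is a hypothetical sketch, not part of the patch, assuming the test file's existing `testing` and `time` imports:

```go
package discovery

import (
	"testing"
	"time"
)

// waitForProcessed drains a gossiper result channel, failing the test
// if no result arrives within two seconds.
func waitForProcessed(t *testing.T, errChan chan error, what string) {
	t.Helper()

	select {
	case err := <-errChan:
		if err != nil {
			t.Fatalf("unable to process %s: %v", what, err)
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("did not process %s", what)
	}
}
```

A call site would then shrink to `waitForProcessed(t, ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID), "remote announcement")`.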
@@ -674,7 +692,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
@@ -684,10 +707,16 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
+
 	select {
 	case <-ctx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
@@ -706,7 +735,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 		t.Fatal("gossiper did not send channel update to peer")
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
@@ -718,7 +752,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
@@ -743,7 +782,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 		t.Fatal("wrong number of objects in storage")
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
@@ -783,7 +827,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	defer cleanup()
 
 	// Set up a channel that we can use to inspect the messages
-	// sent directly fromn the gossiper.
+	// sent directly from the gossiper.
 	sentMsgs := make(chan lnwire.Message, 10)
 	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
 		msg ...lnwire.Message) error {
 		select {
@@ -805,8 +849,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process, in
 	// this case the announcement should be added in the orphan batch
 	// because we haven't announced the channel yet.
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to proceed announcement: %v", err)
 	}
@@ -827,7 +876,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+
 	if err != nil {
 		t.Fatalf("unable to process: %v", err)
 	}
@@ -838,10 +893,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process: %v", err)
 	}
+
 	select {
 	case <-ctx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
@@ -860,7 +921,12 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 		t.Fatal("gossiper did not send channel update to peer")
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey)
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process: %v", err)
 	}
@@ -872,11 +938,28 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 
 	// After that we process local announcement, and waiting to receive
 	// the channel announcement.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
 	if err != nil {
 		t.Fatalf("unable to process: %v", err)
 	}
 
+	// The local proof should be sent to the remote peer.
+	select {
+	case msg := <-sentMsgs:
+		if msg != batch.localProofAnn {
+			t.Fatalf("expected local proof to be sent, got %v", msg)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatalf("local proof was not sent to peer")
+	}
+
+	// And since both remote and local announcements are processed, we
+	// should be broadcasting the final channel announcements.
 	for i := 0; i < 3; i++ {
 		select {
 		case <-ctx.broadcastedMessage:
@@ -886,6 +969,133 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	}
 
 	number = 0
+	if err := ctx.gossiper.waitingProofs.ForAll(
+		func(p *channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+	); err != nil {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 0 {
+		t.Fatalf("wrong number of objects in storage: %v", number)
+	}
+}
+
+// TestSignatureAnnouncementRetry tests that sending AnnounceSignatures to a
+// remote peer will be retried until the peer comes online.
+func TestSignatureAnnouncementRetry(t *testing.T) {
+	t.Parallel()
+
+	ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
+	if err != nil {
+		t.Fatalf("can't create context: %v", err)
+	}
+	defer cleanup()
+
+	batch, err := createAnnouncements(0)
+	if err != nil {
+		t.Fatalf("can't generate announcements: %v", err)
+	}
+
+	localKey := batch.nodeAnn1.NodeID
+	remoteKey := batch.nodeAnn2.NodeID
+
+	// Recreate lightning network topology. Initialize router with channel
+	// between two nodes.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// Make the SendToPeer fail, simulating the peer being offline.
+	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
+		msg ...lnwire.Message) error {
+		return fmt.Errorf("intentional error in SendToPeer")
+	}
+
+	// We expect the gossiper to register for a notification when the peer
+	// comes back online, so keep track of the channel it wants to get
+	// notified on.
+	notifyPeers := make(chan chan<- struct{}, 1)
+	ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
+		connectedChan chan<- struct{}) {
+		notifyPeers <- connectedChan
+	}
+
+	// Pretending that we receive local channel announcement from funding
+	// manager, thereby kick off the announcement exchange process.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	// Since sending this local announcement proof to the remote will fail,
+	// the gossiper should register for a notification when the remote is
+	// online again.
+	var conChan chan<- struct{}
+	select {
+	case conChan = <-notifyPeers:
+	case <-time.After(2 * time.Second):
+		t.Fatalf("gossiper did not ask to get notified when " +
+			"peer is online")
+	}
+
+	// Since both proofs are not yet exchanged, no message should be
+	// broadcasted yet.
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("announcements were broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	number := 0
 	if err := ctx.gossiper.waitingProofs.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
 		},
@@ -895,9 +1105,447 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	); err != nil {
 		t.Fatalf("unable to retrieve objects from store: %v", err)
 	}
 
+	if number != 1 {
+		t.Fatal("wrong number of objects in storage")
+	}
+
+	// When the peer comes online, the gossiper gets notified, and should
+	// retry sending the AnnounceSignatures. We make the SendToPeer
+	// method work again.
+	sentToPeer := make(chan lnwire.Message, 1)
+	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
+		msg ...lnwire.Message) error {
+		sentToPeer <- msg[0]
+		return nil
+	}
+
+	// Notify that peer is now online. This should trigger a new call
+	// to SendToPeer.
+	close(conChan)
+
+	select {
+	case <-sentToPeer:
+	case <-time.After(2 * time.Second):
+		t.Fatalf("gossiper did not send message when peer came online")
+	}
+
+	// Now give the gossiper the remote proof. This should trigger a
+	// broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate).
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	for i := 0; i < 3; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-time.After(time.Second):
+			t.Fatal("announcement wasn't broadcast")
+		}
+	}
+
+	number = 0
+	if err := ctx.gossiper.waitingProofs.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+	); err != nil && err != channeldb.ErrWaitingProofNotFound {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+	if number != 0 {
+		t.Fatal("waiting proof should be removed from storage")
+	}
+}
+
+// TestSignatureAnnouncementRetryAtStartup tests that if we restart the
+// gossiper, it will retry sending the AnnounceSignatures to the peer if
+// it did not succeed before shutting down, and the full channel proof is
+// not yet assembled.
+func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
+	t.Parallel()
+
+	ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
+	if err != nil {
+		t.Fatalf("can't create context: %v", err)
+	}
+	defer cleanup()
+
+	batch, err := createAnnouncements(0)
+	if err != nil {
+		t.Fatalf("can't generate announcements: %v", err)
+	}
+
+	localKey := batch.nodeAnn1.NodeID
+	remoteKey := batch.nodeAnn2.NodeID
+
+	// Recreate lightning network topology. Initialize router with channel
+	// between two nodes.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// Make SendToPeer fail, simulating the peer being offline.
+	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
+		msg ...lnwire.Message) error {
+		return fmt.Errorf("intentional error in SendToPeer")
+	}
+	notifyPeers := make(chan chan<- struct{}, 1)
+	ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
+		connectedChan chan<- struct{}) {
+		notifyPeers <- connectedChan
+	}
+
+	// Pretending that we receive local channel announcement from funding
+	// manager, thereby kick off the announcement exchange process.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	// Since sending to the remote peer will fail, the gossiper should
+	// register for a notification when it comes back online.
+	var conChan chan<- struct{}
+	select {
+	case conChan = <-notifyPeers:
+	case <-time.After(2 * time.Second):
+		t.Fatalf("gossiper did not ask to get notified when " +
+			"peer is online")
+	}
+
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("announcements were broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	number := 0
+	if err := ctx.gossiper.waitingProofs.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+	); err != nil {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 1 {
+		t.Fatal("wrong number of objects in storage")
+	}
+
+	// Shut down gossiper, and restart. This should trigger a new attempt
+	// to send the message to the peer.
+	ctx.gossiper.Stop()
+	gossiper, err := New(Config{
+		Notifier:  ctx.gossiper.cfg.Notifier,
+		Broadcast: ctx.gossiper.cfg.Broadcast,
+		SendToPeer: func(target *btcec.PublicKey,
+			msg ...lnwire.Message) error {
+			return fmt.Errorf("intentional error in SendToPeer")
+		},
+		NotifyWhenOnline: func(peer *btcec.PublicKey,
+			connectedChan chan<- struct{}) {
+			notifyPeers <- connectedChan
+		},
+		Router:           ctx.gossiper.cfg.Router,
+		TrickleDelay:     trickleDelay,
+		RetransmitDelay:  retransmitDelay,
+		ProofMatureDelta: proofMatureDelta,
+		DB:               ctx.gossiper.cfg.DB,
+	}, ctx.gossiper.selfKey)
+	if err != nil {
+		t.Fatalf("unable to recreate gossiper: %v", err)
+	}
+	if err := gossiper.Start(); err != nil {
+		t.Fatalf("unable to start recreated gossiper: %v", err)
+	}
+	defer gossiper.Stop()
+
+	ctx.gossiper = gossiper
+
+	// After starting up, the gossiper will see the stored AnnounceSignatures
+	// message in the database, and will retry sending its part of the proof
+	// to the remote. Since SendToPeer will fail again, it should register
+	// for a notification when the peer comes online.
+	select {
+	case conChan = <-notifyPeers:
+	case <-time.After(2 * time.Second):
+		t.Fatalf("gossiper did not ask to get notified when " +
+			"peer is online")
+	}
+
+	// Fix the SendToPeer method.
+	sentToPeer := make(chan lnwire.Message, 1)
+	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
+		msg ...lnwire.Message) error {
+		select {
+		case sentToPeer <- msg[0]:
+		case <-ctx.gossiper.quit:
+			return fmt.Errorf("shutting down")
+		}
+
+		return nil
+	}
+	// Notify that peer is now online. This should trigger a new call
+	// to SendToPeer.
+	close(conChan)
+
+	select {
+	case <-sentToPeer:
+	case <-time.After(2 * time.Second):
+		t.Fatalf("gossiper did not send message when peer came online")
+	}
+
+	// Now exchange the remote channel proof; the channel announcement
+	// broadcast should then continue as normal.
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	for i := 0; i < 3; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-time.After(time.Second):
+			t.Fatal("announcement wasn't broadcast")
+		}
+	}
+
+	number = 0
+	if err := ctx.gossiper.waitingProofs.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+	); err != nil && err != channeldb.ErrWaitingProofNotFound {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 0 {
+		t.Fatal("waiting proof should be removed from storage")
+	}
+}
+
+// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a
+// remote proof is received when we already have the full proof,
+// the gossiper will send the full proof (ChannelAnnouncement) to
+// the remote peer.
+func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
+	t.Parallel()
+
+	ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
+	if err != nil {
+		t.Fatalf("can't create context: %v", err)
+	}
+	defer cleanup()
+
+	batch, err := createAnnouncements(0)
+	if err != nil {
+		t.Fatalf("can't generate announcements: %v", err)
+	}
+
+	localKey := batch.nodeAnn1.NodeID
+	remoteKey := batch.nodeAnn2.NodeID
+
+	// Recreate lightning network topology. Initialize router with channel
+	// between two nodes.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+	select {
+	case <-ctx.broadcastedMessage:
+		t.Fatal("channel update announcement was broadcast")
+	case <-time.After(2 * trickleDelay):
+	}
+
+	// Set up a channel we can use to inspect messages sent by the
+	// gossiper to the remote peer.
+	sentToPeer := make(chan lnwire.Message, 1)
+	ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
+		msg ...lnwire.Message) error {
+		select {
+		case <-ctx.gossiper.quit:
+			return fmt.Errorf("gossiper shutting down")
+		case sentToPeer <- msg[0]:
+		}
+		return nil
+	}
+
+	notifyPeers := make(chan chan<- struct{}, 1)
+	ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
+		connectedChan chan<- struct{}) {
+		notifyPeers <- connectedChan
+	}
+
+	// Pretending that we receive local channel announcement from funding
+	// manager, thereby kick off the announcement exchange process.
+	select {
+	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
+		localKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process local announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	// We expect the gossiper to send this message to the remote peer.
+	select {
+	case msg := <-sentToPeer:
+		if msg != batch.localProofAnn {
+			t.Fatalf("wrong message sent to peer: %v", msg)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not send local proof to peer")
+	}
+
+	// And all channel announcements should be broadcast.
+	for i := 0; i < 3; i++ {
+		select {
+		case <-ctx.broadcastedMessage:
+		case <-time.After(time.Second):
+			t.Fatal("announcement wasn't broadcast")
+		}
+	}
+
+	number := 0
+	if err := ctx.gossiper.waitingProofs.ForAll(
+		func(*channeldb.WaitingProof) error {
+			number++
+			return nil
+		},
+	); err != nil && err != channeldb.ErrWaitingProofNotFound {
+		t.Fatalf("unable to retrieve objects from store: %v", err)
+	}
+
+	if number != 0 {
+		t.Fatal("waiting proof should be removed from storage")
+	}
+
+	// Now give the gossiper the remote proof yet again. This should
+	// trigger a send of the full ChannelAnnouncement.
+	select {
+	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
+		remoteKey):
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote announcement")
+	}
+	if err != nil {
+		t.Fatalf("unable to process :%v", err)
+	}
+
+	// We expect the gossiper to send this message to the remote peer.
+	select {
+	case msg := <-sentToPeer:
+		_, ok := msg.(*lnwire.ChannelAnnouncement)
+		if !ok {
+			t.Fatalf("expected ChannelAnnouncement, instead got %T", msg)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not send local proof to peer")
+	}
+}
 
 // TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct
diff --git a/peer.go b/peer.go
index 5518d4e8c87..33cee2aec85 100644
--- a/peer.go
+++ b/peer.go
@@ -48,8 +48,8 @@ const (
 // a buffered channel which will be sent upon once the write is complete. This
 // buffered channel acts as a semaphore to be used for synchronization purposes.
 type outgoinMsg struct {
-	msg      lnwire.Message
-	sentChan chan struct{} // MUST be buffered.
+	msg     lnwire.Message
+	errChan chan error // MUST be buffered.
 }
 
 // newChannelMsg packages a lnwallet.LightningChannel with a channel that
@@ -1027,8 +1027,8 @@ out:
 			// callers to optionally synchronize sends with the
 			// writeHandler.
 			err := p.writeMessage(outMsg.msg)
-			if outMsg.sentChan != nil {
-				close(outMsg.sentChan)
+			if outMsg.errChan != nil {
+				outMsg.errChan <- err
 			}
 
 			if err != nil {
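The change above replaces the old close-only `sentChan` semaphore with a buffered error channel, so the writeHandler can report why a send failed rather than just that it finished. A self-contained model of the handoff, with simplified types that are not lnd's actual API:

```go
package main

import "fmt"

// outMsg mirrors the patched outgoinMsg: errChan MUST be buffered so
// the writer never blocks on a reader that has gone away.
type outMsg struct {
	msg     string
	errChan chan error
}

func main() {
	queue := make(chan outMsg, 1)

	// Caller side, as in sendPeerMessages: enqueue with a buffered
	// error channel, then wait on it for the write result.
	errChan := make(chan error, 1)
	queue <- outMsg{msg: "AnnounceSignatures", errChan: errChan}

	// Writer side, as in the patched writeHandler: report the
	// outcome of the wire write, whether nil or not.
	out := <-queue
	var writeErr error // stands in for the result of writeMessage
	if out.errChan != nil {
		out.errChan <- writeErr
	}

	if err := <-errChan; err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("send succeeded")
}
```

The capacity-one buffer is what makes the "MUST be buffered" comment load-bearing: the writeHandler deposits the result and moves on even if the producer has stopped listening.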
@@ -1122,12 +1122,17 @@ func (p *peer) PingTime() int64 {
 }
 
 // queueMsg queues a new lnwire.Message to be eventually sent out on the
-// wire.
-func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {
+// wire. An error is sent on errChan if we fail to queue the message, or
+// if the message fails being sent to the peer; nil is sent on errChan
+// otherwise.
+func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
 	select {
-	case p.outgoingQueue <- outgoinMsg{msg, doneChan}:
+	case p.outgoingQueue <- outgoinMsg{msg, errChan}:
 	case <-p.quit:
-		return
+		peerLog.Debugf("Peer shutting down, could not enqueue msg.")
+		if errChan != nil {
+			errChan <- fmt.Errorf("peer shutting down")
+		}
 	}
 }
diff --git a/server.go b/server.go
index 58c1cdda459..ab049366f89 100644
--- a/server.go
+++ b/server.go
@@ -302,6 +302,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 		ChainHash:        *activeNetParams.GenesisHash,
 		Broadcast:        s.BroadcastMessage,
 		SendToPeer:       s.SendToPeer,
+		NotifyWhenOnline: s.NotifyWhenOnline,
 		ProofMatureDelta: 0,
 		TrickleDelay:     time.Millisecond * time.Duration(cfg.TrickleDelay),
 		RetransmitDelay:  time.Minute * 30,
@@ -950,15 +951,27 @@ func (s *server) sendToPeer(target *btcec.PublicKey,
 		return err
 	}
 
-	s.sendPeerMessages(targetPeer, msgs, nil)
-
+	// Send messages to the peer and return any error from
+	// sending a message.
+	errChans := s.sendPeerMessages(targetPeer, msgs, nil)
+	for _, errChan := range errChans {
+		select {
+		case err := <-errChan:
+			if err != nil {
+				return err
+			}
+		case <-s.quit:
+			return ErrServerShuttingDown
+		}
+	}
 	return nil
 }
 
 // sendPeerMessages enqueues a list of messages into the outgoingQueue of the
 // `targetPeer`. This method supports additional broadcast-level
 // synchronization by using the additional `wg` to coordinate a particular
-// broadcast.
+// broadcast. The method returns a slice of error channels, one per queued
+// message, which the caller can block on to learn the result of each send.
+// When used for broadcasting messages, where the result of sending the
+// messages is not of importance, it should be run as a goroutine (see
+// comment below) and the returned channels ignored.
 //
 // NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
 // go routine--both `wg` and the server's WaitGroup should be incremented
@@ -968,7 +981,7 @@ func (s *server) sendToPeer(target *btcec.PublicKey,
 func (s *server) sendPeerMessages(
 	targetPeer *peer,
 	msgs []lnwire.Message,
-	wg *sync.WaitGroup) {
+	wg *sync.WaitGroup) []chan error {
 
 	// If a WaitGroup is provided, we assume that this method was spawned
 	// as a go routine, and that it is being tracked by both the server's
@@ -981,9 +994,17 @@ func (s *server) sendPeerMessages(
 		defer wg.Done()
 	}
 
+	// We queue each message, creating a slice of error channels that
+	// can be inspected after every message has been added to the queue.
+	var errChans []chan error
 	for _, msg := range msgs {
-		targetPeer.queueMsg(msg, nil)
+		errChan := make(chan error, 1)
+		targetPeer.queueMsg(msg, errChan)
+		errChans = append(errChans, errChan)
 	}
+
+	return errChans
 }
 
 // FindPeer will return the peer that corresponds to the passed in public key.
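Taken together, server.go now has two consumers of `sendPeerMessages`: `sendToPeer`, which drains every error channel before returning, and the broadcast path, which ignores them. A compact sketch of the draining loop under those assumptions — `drainSendErrors` is a made-up name, and `ErrServerShuttingDown` is modeled with a plain error:

```go
package main

import (
	"errors"
	"fmt"
)

// drainSendErrors mirrors the loop added to sendToPeer: the first
// failed write aborts the send, and a closed quit channel maps to a
// shutdown error.
func drainSendErrors(errChans []chan error, quit <-chan struct{}) error {
	for _, errChan := range errChans {
		select {
		case err := <-errChan:
			if err != nil {
				return err
			}
		case <-quit:
			return errors.New("server shutting down")
		}
	}
	return nil
}

func main() {
	ok := make(chan error, 1)
	ok <- nil
	bad := make(chan error, 1)
	bad <- errors.New("write failed")

	quit := make(chan struct{})
	fmt.Println(drainSendErrors([]chan error{ok, bad}, quit))
	// Output: write failed
}
```

This synchronous draining is what lets the gossiper's `sendAnnSigReliably` goroutine treat a nil return from `SendToPeer` as proof the message actually hit the wire, rather than merely proof it was queued.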