From 421aef44d218e5d7eeb92daf32a014ac31cf2992 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 15 Nov 2017 18:23:46 -0800 Subject: [PATCH 1/5] peer: add error chan to queueMsg --- peer.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/peer.go b/peer.go index 5518d4e8c87..33cee2aec85 100644 --- a/peer.go +++ b/peer.go @@ -48,8 +48,8 @@ const ( // a buffered channel which will be sent upon once the write is complete. This // buffered channel acts as a semaphore to be used for synchronization purposes. type outgoinMsg struct { - msg lnwire.Message - sentChan chan struct{} // MUST be buffered. + msg lnwire.Message + errChan chan error // MUST be buffered. } // newChannelMsg packages a lnwallet.LightningChannel with a channel that @@ -1027,8 +1027,8 @@ out: // callers to optionally synchronize sends with the // writeHandler. err := p.writeMessage(outMsg.msg) - if outMsg.sentChan != nil { - close(outMsg.sentChan) + if outMsg.errChan != nil { + outMsg.errChan <- err } if err != nil { @@ -1122,12 +1122,17 @@ func (p *peer) PingTime() int64 { } // queueMsg queues a new lnwire.Message to be eventually sent out on the -// wire. -func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { +// wire. If errChan is non-nil, the result of the send is delivered on it: +// nil if the message was successfully written to the wire, or an error if +// the message could not be queued or sent to the peer. +func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) { select { - case p.outgoingQueue <- outgoinMsg{msg, doneChan}: + case p.outgoingQueue <- outgoinMsg{msg, errChan}: case <-p.quit: - return + peerLog.Debugf("Peer shutting down, could not enqueue msg.") + if errChan != nil { + errChan <- fmt.Errorf("peer shutting down") + } } } From da1a850c13e4ca84f8aef59ca84105b8e5a84f58 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 15 Nov 2017 18:24:59 -0800 Subject: [PATCH 2/5] server: return error from sendPeerMessages This commit adds a return error to sendPeerMessages, making it possible to observe whether a message sent to a peer fails or succeeds. --- server.go | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/server.go b/server.go index 58c1cdda459..50e2f4e8ae7 100644 --- a/server.go +++ b/server.go @@ -950,15 +950,27 @@ func (s *server) sendToPeer(target *btcec.PublicKey, return err } - s.sendPeerMessages(targetPeer, msgs, nil) - + // Send messages to the peer and return any error from + // sending a message. + errChans := s.sendPeerMessages(targetPeer, msgs, nil) + for _, errChan := range errChans { + select { + case err := <-errChan: + if err != nil { + return err + } + case <-s.quit: + return ErrServerShuttingDown + } + } return nil } // sendPeerMessages enqueues a list of messages into the outgoingQueue of the // `targetPeer`. This method supports additional broadcast-level // synchronization by using the additional `wg` to coordinate a particular // broadcast. This method does not wait for the send results; instead it // returns a slice of error channels, one per message, that the caller can // wait on if the result of sending each message is of importance. Callers // that broadcast messages, where the individual results are not of // importance, should run this method as a goroutine (see comment below) and // ignore the returned channels.
// // NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a // go routine--both `wg` and the server's WaitGroup should be incremented @@ -968,7 +980,7 @@ func (s *server) sendToPeer(target *btcec.PublicKey, func (s *server) sendPeerMessages( targetPeer *peer, msgs []lnwire.Message, - wg *sync.WaitGroup) { + wg *sync.WaitGroup) []chan error { // If a WaitGroup is provided, we assume that this method was spawned // as a go routine, and that it is being tracked by both the server's @@ -981,9 +993,17 @@ func (s *server) sendPeerMessages( defer wg.Done() } + // We queue each message, creating a slice of error channels that + // can be inspected after every message is successfully added to + // the queue. + var errChans []chan error for _, msg := range msgs { - targetPeer.queueMsg(msg, nil) + errChan := make(chan error, 1) + targetPeer.queueMsg(msg, errChan) + errChans = append(errChans, errChan) } + + return errChans } // FindPeer will return the peer that corresponds to the passed in public key. From 953dae0b1032c85c416c72d9e91b5e783d88be4c Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 17 Nov 2017 19:21:50 -0800 Subject: [PATCH 3/5] discovery: make gossiper able to resend failed AnnounceSignatures This commit makes the gossiper track the state of a local AnnounceSignatures message, such that it can retry sending it to the remote peer if needed. It will also persist this state in the gossiper's message store, such that it can resume from this state at startup. --- discovery/gossiper.go | 430 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 379 insertions(+), 51 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 2c8bdeb4133..b8a5c6ecf9c 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2,12 +2,14 @@ package discovery import ( "bytes" + "encoding/binary" "fmt" "runtime" "sync" "sync/atomic" "time" + "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" @@ -20,6 +22,15 @@ import ( "github.com/roasbeef/btcd/wire" ) +var ( + // messageStoreKey is a key used to create a top level bucket in + // the gossiper database, used for storing messages that are to + // be sent to peers. Currently this is used for reliably sending + // AnnounceSignatures messages, by persisting them until a send + // operation has succeeded. + messageStoreKey = []byte("message-store") +) + // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { @@ -78,6 +89,11 @@ type Config struct { // messages to a particular peer identified by the target public key. SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error + // NotifyWhenOnline is a function that allows the gossiper to be + // notified when a certain peer comes online, allowing it to + // retry sending a peer message. + NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 @@ -327,6 +343,14 @@ func (d *AuthenticatedGossiper) Start() error { } d.bestHeight = height + // In case we had an AnnounceSignatures ready to be sent when the + // gossiper was last shut down, we must continue on our quest to + // deliver this message to our peer such that they can craft the + // full channel proof.
+ if err := d.resendAnnounceSignatures(); err != nil { + return err + } + d.wg.Add(1) go d.networkHandler() @@ -526,6 +550,136 @@ func (d *deDupedAnnouncements) Emit() []lnwire.Message { return announcements } +// resendAnnounceSignatures will inspect the messageStore database +// bucket for AnnounceSignatures messages that we recently tried +// to send to a peer. If the associated channels still don't have +// their full channel proofs assembled, we will try to resend them. If +// we have the full proof, we can safely delete the message from +// the messageStore. +func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { + type msgTuple struct { + peer *btcec.PublicKey + msg *lnwire.AnnounceSignatures + dbKey []byte + } + + // Fetch all the AnnounceSignatures messages that were added + // to the database. + // TODO(halseth): database access should be abstracted + // behind interface. + var msgsResend []msgTuple + if err := d.cfg.DB.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(messageStoreKey) + if bucket == nil { + return nil + } + + // Iterate over each message added to the database. + if err := bucket.ForEach(func(k, v []byte) error { + // The database value represents the encoded + // AnnounceSignatures message. + r := bytes.NewReader(v) + msg := &lnwire.AnnounceSignatures{} + if err := msg.Decode(r, 0); err != nil { + return err + } + + // The first 33 bytes of the database key are + // the peer's public key. + peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) + if err != nil { + return err + } + t := msgTuple{peer, msg, k} + + // Add the message to the slice, such that we + // can resend it after the database transaction + // is over. + msgsResend = append(msgsResend, t) + return nil + }); err != nil { + return err + } + return nil + }); err != nil { + return err + } + + // deleteMsg removes the message associated with the passed + // msgTuple from the messageStore. + deleteMsg := func(t msgTuple) error { + log.Debugf("Deleting message for chanID=%v from "+ + "messageStore", t.msg.ChannelID) + if err := d.cfg.DB.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(messageStoreKey) + if bucket == nil { + return fmt.Errorf("bucket " + + "unexpectedly did not exist") + } + + return bucket.Delete(t.dbKey[:]) + }); err != nil { + return fmt.Errorf("failed deleting message "+ + "from database: %v", err) + } + return nil + } + + // We now iterate over these messages, resending those that we + // don't have the full proof for, deleting the rest. + for _, t := range msgsResend { + // Check if the full channel proof exists in our graph. + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + t.msg.ShortChannelID) + if err != nil { + // If the channel cannot be found, it is most likely + // a leftover message for a channel that was closed. + // In this case we delete it from the message store. + log.Warnf("unable to fetch channel info for "+ + "chanID=%v from graph: %v. Will delete local "+ + "proof from database", + t.msg.ChannelID, err) + if err := deleteMsg(t); err != nil { + return err + } + continue + } + + // 1. If the full proof does not exist in the graph, + // it means that we haven't received the remote proof + // yet (or that we crashed before being able to assemble + // the full proof). Since the remote node might think they + // have delivered their proof to us, we will resend + // _our_ proof to trigger a resend on their part: + // they will then be able to assemble and send us the + // full proof.
+ if chanInfo.AuthProof == nil { + err := d.sendAnnSigReliably(t.msg, t.peer) + if err != nil { + return err + } + continue + } + + // 2. If the proof does exist in the graph, we have + // successfully received the remote proof and assembled + // the full proof. In this case we can safely delete the + // local proof from the database. In case the remote + // hasn't been able to assemble the full proof yet + // (maybe because of a crash), we will send them the full + // proof if we notice that they retry sending their half + // proof. + if chanInfo.AuthProof != nil { + log.Debugf("Deleting message for chanID=%v from "+ + "messageStore", t.msg.ChannelID) + if err := deleteMsg(t); err != nil { + return err + } + } + } + return nil +} + // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically @@ -1010,8 +1164,78 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { + // The edge will get rejected if we already + // added the same edge without AuthProof to the + // graph. If the received announcement contains + // a proof, we can add this proof to our edge. + // We can end up in this situation in the case + // where we create a channel, but for some + // reason fail to receive the remote peer's + // proof, while the remote peer is able to fully + // assemble the proof and craft the + // ChannelAnnouncement. + // TODO(halseth): the following chunk of code + // should be moved into its own method, indentation + // and readability is not exactly on point. + chanInfo, e1, e2, err2 := d.cfg.Router.GetChannelByID( + msg.ShortChannelID) + if err2 != nil { + log.Errorf("Failed fetching channel "+ + "edge: %v", err2) + nMsg.err <- err2 + return nil + } + + // If the edge already exists in the graph, but + // has no proof attached, we can add that now. + if chanInfo.AuthProof == nil && proof != nil { + chanAnn, e1Ann, e2Ann := + createChanAnnouncement(proof, + chanInfo, e1, e2) + + // Validate the assembled proof. + err := ValidateChannelAnn(chanAnn) + if err != nil { + err := errors.Errorf("assembled "+ + "channel announcement "+ + "proof for shortChanID=%v"+ + " isn't valid: %v", + msg.ShortChannelID, err) + + log.Error(err) + nMsg.err <- err + return nil + } + err = d.cfg.Router.AddProof( + msg.ShortChannelID, proof) + if err != nil { + err := errors.Errorf("unable to "+ + "add proof to "+ + "shortChanID=%v: %v", + msg.ShortChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } + announcements = append(announcements, + chanAnn) + if e1Ann != nil { + announcements = append( + announcements, e1Ann) + } + if e2Ann != nil { + announcements = append( + announcements, e2Ann) + } + + nMsg.err <- nil + return announcements + } + + // If not, this was just an outdated edge. log.Debugf("Router rejected channel edge: %v", err) + } else { log.Errorf("Router rejected channel edge: %v", err) @@ -1283,7 +1507,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // Ensure that we know of a channel with the target channel ID // before proceeding further. - chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID) if err != nil { // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof.
@@ -1320,6 +1545,72 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } + // If proof was sent by a local sub-system, then we'll + // send the announcement signature to the remote node + // so they can also reconstruct the full channel + // announcement. + if !nMsg.isRemote { + var remotePeer *btcec.PublicKey + if isFirstNode { + remotePeer = chanInfo.NodeKey2 + } else { + remotePeer = chanInfo.NodeKey1 + } + // Since the remote peer might not be online + // we'll call a method that will attempt to + // deliver the proof when it comes online. + if err := d.sendAnnSigReliably(msg, remotePeer); err != nil { + err := errors.Errorf("unable to send reliably "+ "to remote for short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } + } + + // Check if we already have the full proof for this channel. + if chanInfo.AuthProof != nil { + // If we already have the fully assembled proof, then + // the peer sending us their proof has probably not + // received our local proof yet. So be kind and send + // them the full proof. + if nMsg.isRemote { + peerID := nMsg.peer.SerializeCompressed() + log.Debugf("Got AnnounceSignatures for " + "channel with full proof.") + + d.wg.Add(1) + go func() { + defer d.wg.Done() + log.Debugf("Received half proof for "+ + "channel %v with existing "+ + "full proof. Sending full "+ + "proof to peer=%x", + msg.ChannelID, + peerID) + + chanAnn, _, _ := createChanAnnouncement( + chanInfo.AuthProof, chanInfo, e1, e2) + err := d.cfg.SendToPeer(nMsg.peer, chanAnn) + if err != nil { + log.Errorf("Failed sending "+ + "full proof to "+ + "peer=%x: %v", + peerID, err) + return + } + log.Debugf("Full proof sent to peer=%x"+ " for chanID=%v", peerID, msg.ChannelID) + }() + } + + log.Debugf("Already have proof for channel "+ + "with chanID=%v", msg.ChannelID) + nMsg.err <- nil + return nil + } + // Check that we received the opposite proof. If so, then we're // now able to construct the full proof, and create the channel // announcement. If we didn't receive the opposite half of the @@ -1346,33 +1637,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // If proof was sent by a local sub-system, then we'll - // send the announcement signature to the remote node - // so they can also reconstruct the full channel - // announcement. - if !nMsg.isRemote { - // Check that first node of the channel info - // corresponds to us. - var remotePeer *btcec.PublicKey - if isFirstNode { - remotePeer = chanInfo.NodeKey2 - } else { - remotePeer = chanInfo.NodeKey1 - } - - err := d.cfg.SendToPeer(remotePeer, msg) - if err != nil { - log.Errorf("unable to send "+ - "announcement message to peer: %x", - remotePeer.SerializeCompressed()) - } - - log.Infof("Sent channel announcement proof "+ - "for short_chan_id=%v to remote peer: "+ - "%x", shortChanID, - remotePeer.SerializeCompressed()) - } - log.Infof("1/2 of channel ann proof received for "+ "short_chan_id=%v, waiting for other half", shortChanID) @@ -1381,9 +1645,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // If we now have both halves of the channel announcement - // proof, then we'll reconstruct the initial announcement so we - // can validate it shortly below. + // We now have both halves of the channel announcement proof, + // so we'll reconstruct the initial announcement and validate + // it shortly below.
var dbProof channeldb.ChannelAuthProof if isFirstNode { dbProof.NodeSig1 = msg.NodeSignature @@ -1450,26 +1714,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l announcements = append(announcements, e2Ann) } - // If this a local announcement, then we'll send it to the - // remote side so they can reconstruct the full channel - // announcement proof. - if !nMsg.isRemote { - var remotePeer *btcec.PublicKey - if isFirstNode { - remotePeer = chanInfo.NodeKey2 - } else { - remotePeer = chanInfo.NodeKey1 - } - - log.Debugf("Sending local AnnounceSignatures message "+ - "to peer(%x)", remotePeer.SerializeCompressed()) - if err = d.cfg.SendToPeer(remotePeer, msg); err != nil { - log.Errorf("unable to send announcement "+ - "message to peer: %x", - remotePeer.SerializeCompressed()) - } - } - nMsg.err <- nil return announcements @@ -1479,6 +1723,90 @@ ... } } +// sendAnnSigReliably will try to send the provided local AnnounceSignatures +// to the remote peer, waiting for it to come online if necessary. This +// method returns after adding the message to persistent storage, such +// that the caller knows that the message will eventually be delivered. +func (d *AuthenticatedGossiper) sendAnnSigReliably( + msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { + // We first add this message to the database, such that in case + // we do not succeed in sending it to the peer, we'll fetch it + // from the DB next time we start, and retry. We use the peer ID + // + shortChannelID as key, as there may be more than one + // channel opening in progress to the same peer. + var key [41]byte + copy(key[:33], remotePeer.SerializeCompressed()) + binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64()) + + err := d.cfg.DB.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(messageStoreKey) + if err != nil { + return err + } + + // Encode the AnnounceSignatures message. + var b bytes.Buffer + if err := msg.Encode(&b, 0); err != nil { + return err + } + + // Add the encoded message to the database using the peer + // + shortChanID as key. + return bucket.Put(key[:], b.Bytes()) + + }) + if err != nil { + return err + } + + // We have succeeded in adding the message to the database. We now + // launch a goroutine that will keep trying to send the message to the + // remote peer until it succeeds, or the gossiper shuts down. In case + // of success, the message will be removed from the database. + d.wg.Add(1) + go func() { + defer d.wg.Done() + for { + log.Debugf("Sending AnnounceSignatures for channel "+ + "%v to remote peer %x", msg.ChannelID, + remotePeer.SerializeCompressed()) + err := d.cfg.SendToPeer(remotePeer, msg) + if err == nil { + // Sending succeeded, we can + // continue the flow. + break + } + + log.Errorf("unable to send AnnounceSignatures message "+ + "to peer(%x): %v. Will retry when online.", + remotePeer.SerializeCompressed(), err) + + connected := make(chan struct{}) + d.cfg.NotifyWhenOnline(remotePeer, connected) + + select { + case <-connected: + log.Infof("peer %x reconnected. Retry sending"+ + " AnnounceSignatures.", + remotePeer.SerializeCompressed()) + // Retry sending.
+ case <-d.quit: + log.Infof("Gossiper shutting down, did not send" + " AnnounceSignatures.") + return + } + } + + log.Infof("Sent channel announcement proof to remote peer: %x", + remotePeer.SerializeCompressed()) + }() + + // This method returns after the message has been added to the database, + // such that the caller doesn't have to wait until the message is + // actually delivered, but can be assured that it will eventually be + // delivered. + return nil +} + // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, From 0e28b0d6d46428900d6b2db6892018328cfbf755 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 17 Nov 2017 19:23:39 -0800 Subject: [PATCH 4/5] discovery test: update gossiper test for new database structure --- discovery/gossiper_test.go | 684 ++++++++++++++++++++++++++++++++++++- 1 file changed, 666 insertions(+), 18 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 426147c0264..ecff2b8c3e6 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -155,6 +155,11 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error { + info, ok := r.infos[chanID.ToUint64()] + if !ok { + return errors.New("channel does not exist") + } + info.AuthProof = proof return nil } @@ -341,7 +346,8 @@ func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement, } signer := mockSigner{priv} - if a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a); err != nil { + a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a) + if err != nil { return nil, err } @@ -498,7 +504,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create node announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -521,7 +531,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create channel announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -544,7 +558,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create update announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -641,8 +659,8 @@ func TestPrematureAnnouncement(t *testing.T) { } } -// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper properly -// processes partial and fully announcement signatures message. +// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper +// properly processes partial and full announcement signature messages.
func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Parallel() ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) if err != nil { t.Fatalf("can't create context: %v", err) } defer cleanup() @@ -674,7 +692,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -684,10 +707,16 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -706,7 +735,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Fatal("gossiper did not send channel update to peer") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -718,7 +752,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -743,7 +782,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Fatal("wrong number of objects in storage") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -783,7 +827,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { defer cleanup() // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. + // sent directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { @@ -805,8 +849,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process, in // this case the announcement should be added in the orphan batch - // because we haven't announced the channel yet. - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + // because we haven't announced the channel yet.
+ select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to proceed announcement: %v", err) } @@ -827,7 +876,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { t.Fatalf("unable to process: %v", err) } @@ -838,10 +893,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -860,7 +921,12 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { t.Fatal("gossiper did not send channel update to peer") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } @@ -872,11 +938,28 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // After that we process local announcement, and waiting to receive // the channel announcement. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } + // The local proof should be sent to the remote peer. + select { + case msg := <-sentMsgs: + if msg != batch.localProofAnn { + t.Fatalf("expected local proof to be sent, got %v", msg) + } + case <-time.After(2 * time.Second): + t.Fatalf("local proof was not sent to peer") + } + + // And since both remote and local announcements are processed, we + // should be broadcasting the final channel announcements. for i := 0; i < 3; i++ { select { case <-ctx.broadcastedMessage: @@ -886,6 +969,133 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(p *channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatalf("wrong number of objects in storage: %v", number) + } +} + +// TestSignatureAnnouncementRetry tests that sending AnnounceSignatures +// to the remote peer will be retried until the peer comes online.
+func TestSignatureAnnouncementRetry(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Make the SendToPeer fail, simulating the peer being offline. + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + + // We expect the gossiper to register for a notification when the peer + // comes back online, so keep track of the channel it wants to get + // notified on. + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // Since sending this local announcement proof to the remote will fail, + // the gossiper should register for a notification when the remote is + // online again. + var conChan chan<- struct{} + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + // Since both proofs are not yet exchanged, no message should be + // broadcasted yet. 
+ select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcast") + case <-time.After(2 * trickleDelay): + } + + number := 0 if err := ctx.gossiper.waitingProofs.ForAll( func(*channeldb.WaitingProof) error { number++ @@ -895,9 +1105,447 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { t.Fatalf("unable to retrieve objects from store: %v", err) } + if number != 1 { + t.Fatal("wrong number of objects in storage") + } + + // When the peer comes online, the gossiper gets notified, and should + // retry sending the AnnounceSignatures. We make the SendToPeer + // method work again. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + sentToPeer <- msg[0] + return nil + } + + // Notify that peer is now online. This should trigger a new call + // to SendToPeer. + close(conChan) + + select { + case <-sentToPeer: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer came online") + } + + // Now give the gossiper the remote proof. This should trigger a + // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } +} + +// TestSignatureAnnouncementRetryAtStartup tests that if we restart the +// gossiper, it will retry sending the AnnounceSignatures to the peer if +// it did not succeed before shutting down, and the full channel proof is +// not yet assembled. +func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes.
+ select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Make SendToPeer fail, simulating the peer being offline. + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // Since sending to the remote peer will fail, the gossiper should + // register for a notification when it comes back online. + var conChan chan<- struct{} + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcast") + case <-time.After(2 * trickleDelay): + } + + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 1 { t.Fatal("wrong number of objects in storage") } + + // Shut down the gossiper, and restart it. This should trigger a new + // attempt to send the message to the peer.
+ ctx.gossiper.Stop() + gossiper, err := New(Config{ + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + SendToPeer: func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + }, + NotifyWhenOnline: func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + }, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + DB: ctx.gossiper.cfg.DB, + }, ctx.gossiper.selfKey) + if err != nil { + t.Fatalf("unable to recreate gossiper: %v", err) + } + if err := gossiper.Start(); err != nil { + t.Fatalf("unable to start recreated gossiper: %v", err) + } + defer gossiper.Stop() + + ctx.gossiper = gossiper + + // After starting up, the gossiper will see that it has a waiting proof + // in the database, and will retry sending its part to the remote. Since + // SendToPeer will fail again, it should register for a notification + // when the peer comes online. + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + // Fix the SendToPeer method. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + select { + case sentToPeer <- msg[0]: + case <-ctx.gossiper.quit: + return fmt.Errorf("shutting down") + } + + return nil + } + // Notify that peer is now online. This should trigger a new call + // to SendToPeer. + close(conChan) + + select { + case <-sentToPeer: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer came online") + } + + // Now, after exchanging the remote channel proof, the channel + // announcement broadcast should continue as normal. + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } +} + +// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a +// remote proof is received when we already have the full proof, +// the gossiper will send the full proof (ChannelAnnouncement) to +// the remote peer. +func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes.
+ select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + select { + case <-ctx.gossiper.quit: + return fmt.Errorf("gossiper shutting down") + case sentToPeer <- msg[0]: + } + return nil + } + + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // We expect the gossiper to send this message to the remote peer. + select { + case msg := <-sentToPeer: + if msg != batch.localProofAnn { + t.Fatalf("wrong message sent to peer: %v", msg) + } + case <-time.After(2 * time.Second): + t.Fatal("did not send local proof to peer") + } + + // And all channel announcements should be broadcast. + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } + + // Now give the gossiper the remote proof yet again. This should + // trigger a send of the full ChannelAnnouncement.
+ select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // We expect the gossiper to send the full ChannelAnnouncement to the + // remote peer. + select { + case msg := <-sentToPeer: + _, ok := msg.(*lnwire.ChannelAnnouncement) + if !ok { + t.Fatalf("expected ChannelAnnouncement, instead got %T", msg) + } + case <-time.After(2 * time.Second): + t.Fatal("did not send full proof to peer") + } + } // TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct From 110a888ef955fca5ba300f908ad97c84e59a17c2 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 17 Nov 2017 19:24:21 -0800 Subject: [PATCH 5/5] server: add NotifyWhenOnline method to gossiper --- server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server.go b/server.go index 50e2f4e8ae7..ab049366f89 100644 --- a/server.go +++ b/server.go @@ -302,6 +302,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, ChainHash: *activeNetParams.GenesisHash, Broadcast: s.BroadcastMessage, SendToPeer: s.SendToPeer, + NotifyWhenOnline: s.NotifyWhenOnline, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30,
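To make the mechanism these patches build on concrete, the following is a minimal, self-contained Go sketch of the buffered-error-channel flow from patches 1 and 2: the peer's write handler reports each write's result on a per-message channel, and a sender waits on those channels to learn whether delivery succeeded. All names below (fakePeer, message, writeMessage, and so on) are illustrative stand-ins for the purposes of this sketch, not lnd's actual types.

package main

import (
	"errors"
	"fmt"
)

type message string

// outgoingMsg mirrors the patched outgoinMsg struct: the write result for
// each queued message is reported on a per-message error channel.
type outgoingMsg struct {
	msg     message
	errChan chan error // MUST be buffered so the writer never blocks.
}

type fakePeer struct {
	outgoingQueue chan outgoingMsg
	quit          chan struct{}
}

// writeHandler drains the queue and reports each write's result on the
// message's error channel, as the patched peer.writeHandler does.
func (p *fakePeer) writeHandler() {
	for {
		select {
		case out := <-p.outgoingQueue:
			err := p.writeMessage(out.msg)
			if out.errChan != nil {
				out.errChan <- err
			}
		case <-p.quit:
			return
		}
	}
}

func (p *fakePeer) writeMessage(m message) error {
	if m == "" {
		return errors.New("refusing to write empty message")
	}
	fmt.Println("wrote:", m)
	return nil
}

// queueMsg mirrors the patched signature: a nil errChan means
// fire-and-forget, while a buffered one delivers the send result.
func (p *fakePeer) queueMsg(m message, errChan chan error) {
	select {
	case p.outgoingQueue <- outgoingMsg{m, errChan}:
	case <-p.quit:
		if errChan != nil {
			errChan <- errors.New("peer shutting down")
		}
	}
}

func main() {
	p := &fakePeer{
		outgoingQueue: make(chan outgoingMsg),
		quit:          make(chan struct{}),
	}
	go p.writeHandler()

	// As in server.sendPeerMessages/sendToPeer after patch 2: queue every
	// message with its own buffered error channel, then wait on each
	// channel in turn to learn whether delivery succeeded.
	msgs := []message{"chan_update", "ann_sigs"}
	var errChans []chan error
	for _, m := range msgs {
		errChan := make(chan error, 1)
		p.queueMsg(m, errChan)
		errChans = append(errChans, errChan)
	}
	for _, errChan := range errChans {
		if err := <-errChan; err != nil {
			fmt.Println("send failed:", err)
		}
	}
	close(p.quit)
}

The "MUST be buffered" requirement is what lets the write handler report a result without ever blocking on a caller that has stopped listening, which in turn is what makes it safe for the gossiper's retry logic to treat a send error as a signal to persist the message and wait for the peer to come back online.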