From 847b06446186ed4cee41cf5e8721751d6d20a278 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:16 -0800 Subject: [PATCH 1/8] discovery/message_store: add gossip message store In this commit, we add a new store within the database that'll be responsible for storing gossip messages which we need to reliably send to peers. This aims to replace the current messageStore that exists within the gossiper, so much of this logic is borrowed from there. One of the main differences between the two is that we now index messages with a new key format in which we take into account the message's type. This allows us to store different messages for a specific channel with a peer. The old key format is still supported in order to prevent a database migration. --- discovery/message_store.go | 294 ++++++++++++++++++++++++++ discovery/message_store_test.go | 351 ++++++++++++++++++++++++++++++++ 2 files changed, 645 insertions(+) create mode 100644 discovery/message_store.go create mode 100644 discovery/message_store_test.go diff --git a/discovery/message_store.go b/discovery/message_store.go new file mode 100644 index 00000000000..e0c10a865ac --- /dev/null +++ b/discovery/message_store.go @@ -0,0 +1,294 @@ +package discovery + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // messageStoreBucket is a key used to create a top level bucket in the + // gossiper database, used for storing messages that are to be sent to + // peers. Upon restarts, these messages will be read and resent to their + // respective peers. + // + // maps: + // pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg + messageStoreBucket = []byte("message-store") + + // ErrUnsupportedMessage is an error returned when we attempt to add a + // message to the store that is not supported. + ErrUnsupportedMessage = errors.New("unsupported message type") + + // ErrCorruptedMessageStore indicates that the on-disk bucketing + // structure has altered since the gossip message store instance was + // initialized. + ErrCorruptedMessageStore = errors.New("gossip message store has been " + + "corrupted") +) + +// GossipMessageStore is a store responsible for storing gossip messages which +// we should reliably send to our peers. +type GossipMessageStore interface { + // AddMessage adds a message to the store for this peer. + AddMessage(lnwire.Message, [33]byte) error + + // DeleteMessage deletes a message from the store for this peer. + DeleteMessage(lnwire.Message, [33]byte) error + + // Messages returns the total set of messages that exist within the + // store for all peers. + Messages() (map[[33]byte][]lnwire.Message, error) + + // Peers returns the public key of all peers with messages within the + // store. + Peers() (map[[33]byte]struct{}, error) + + // MessagesForPeer returns the set of messages that exists within the + // store for the given peer. + MessagesForPeer([33]byte) ([]lnwire.Message, error) +} + +// MessageStore is an implementation of the GossipMessageStore interface backed +// by a channeldb instance. By design, this store will only keep the latest +// version of a message (like in the case of multiple ChannelUpdate's) for a +// channel with a peer. +type MessageStore struct { + db *channeldb.DB +} + +// A compile-time assertion to ensure messageStore implements the +// GossipMessageStore interface. 
+var _ GossipMessageStore = (*MessageStore)(nil) + +// NewMessageStore creates a new message store backed by a channeldb instance. +func NewMessageStore(db *channeldb.DB) (*MessageStore, error) { + err := db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(messageStoreBucket) + return err + }) + if err != nil { + return nil, fmt.Errorf("unable to create required buckets: %v", + err) + } + + return &MessageStore{db}, nil +} + +// msgShortChanID retrieves the short channel ID of the message. +func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) { + var shortChanID lnwire.ShortChannelID + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + shortChanID = msg.ShortChannelID + case *lnwire.ChannelUpdate: + shortChanID = msg.ShortChannelID + default: + return shortChanID, ErrUnsupportedMessage + } + + return shortChanID, nil +} + +// messageStoreKey constructs the database key for the message to be stored. +func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) { + shortChanID, err := msgShortChanID(msg) + if err != nil { + return nil, err + } + + var k [33 + 8 + 2]byte + copy(k[:33], peerPubKey[:]) + binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64()) + binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType())) + + return k[:], nil +} + +// AddMessage adds a message to the store for this peer. +func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error { + // Construct the key for which we'll find this message with in the store. + msgKey, err := messageStoreKey(msg, peerPubKey) + if err != nil { + return err + } + + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + return s.db.Batch(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.Put(msgKey, b.Bytes()) + }) +} + +// DeleteMessage deletes a message from the store for this peer. +func (s *MessageStore) DeleteMessage(msg lnwire.Message, + peerPubKey [33]byte) error { + + // Construct the key for which we'll find this message with in the + // store. + msgKey, err := messageStoreKey(msg, peerPubKey) + if err != nil { + return err + } + + return s.db.Batch(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + // In the event that we're attempting to delete a ChannelUpdate + // from the store, we'll make sure that we're actually deleting + // the correct one as it can be overwritten. + if msg, ok := msg.(*lnwire.ChannelUpdate); ok { + // Deleting a value from a bucket that doesn't exist + // acts as a NOP, so we'll return if a message doesn't + // exist under this key. + v := messageStore.Get(msgKey) + if v == nil { + return nil + } + + dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0) + if err != nil { + return err + } + + // If the timestamps don't match, then the update stored + // should be the latest one, so we'll avoid deleting it. + if msg.Timestamp != dbMsg.(*lnwire.ChannelUpdate).Timestamp { + return nil + } + } + + return messageStore.Delete(msgKey) + }) +} + +// readMessage reads a message from its serialized form and ensures its +// supported by the current version of the message store. 
+func readMessage(msgBytes []byte) (lnwire.Message, error) { + msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0) + if err != nil { + return nil, err + } + + // Check if the message is supported by the store. We can reuse the + // check for ShortChannelID as its a dependency on messages stored. + if _, err := msgShortChanID(msg); err != nil { + return nil, err + } + + return msg, nil +} + +// Messages returns the total set of messages that exist within the store for +// all peers. +func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { + msgs := make(map[[33]byte][]lnwire.Message) + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.ForEach(func(k, v []byte) error { + var pubKey [33]byte + copy(pubKey[:], k[:33]) + + // Deserialize the message from its raw bytes and filter + // out any which are not currently supported by the + // store. + msg, err := readMessage(v) + if err == ErrUnsupportedMessage { + return nil + } + if err != nil { + return err + } + + msgs[pubKey] = append(msgs[pubKey], msg) + return nil + }) + }) + if err != nil { + return nil, err + } + + return msgs, nil +} + +// MessagesForPeer returns the set of messages that exists within the store for +// the given peer. +func (s *MessageStore) MessagesForPeer( + peerPubKey [33]byte) ([]lnwire.Message, error) { + + var msgs []lnwire.Message + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + c := messageStore.Cursor() + k, v := c.Seek(peerPubKey[:]) + for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() { + // Deserialize the message from its raw bytes and filter + // out any which are not currently supported by the + // store. + msg, err := readMessage(v) + if err == ErrUnsupportedMessage { + continue + } + if err != nil { + return err + } + + msgs = append(msgs, msg) + } + + return nil + }) + if err != nil { + return nil, err + } + + return msgs, nil +} + +// Peers returns the public key of all peers with messages within the store. 
+func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) { + peers := make(map[[33]byte]struct{}) + err := s.db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return ErrCorruptedMessageStore + } + + return messageStore.ForEach(func(k, _ []byte) error { + var pubKey [33]byte + copy(pubKey[:], k[:33]) + peers[pubKey] = struct{}{} + return nil + }) + }) + if err != nil { + return nil, err + } + + return peers, nil +} diff --git a/discovery/message_store_test.go b/discovery/message_store_test.go new file mode 100644 index 00000000000..a106ad22566 --- /dev/null +++ b/discovery/message_store_test.go @@ -0,0 +1,351 @@ +package discovery + +import ( + "bytes" + "io/ioutil" + "math/rand" + "os" + "reflect" + "testing" + + "github.com/btcsuite/btcd/btcec" + "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +func createTestMessageStore(t *testing.T) (*MessageStore, func()) { + t.Helper() + + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + os.RemoveAll(tempDir) + t.Fatalf("unable to open db: %v", err) + } + + cleanUp := func() { + db.Close() + os.RemoveAll(tempDir) + } + + store, err := NewMessageStore(db) + if err != nil { + cleanUp() + t.Fatalf("unable to initialize message store: %v", err) + } + + return store, cleanUp +} + +func randPubKey(t *testing.T) *btcec.PublicKey { + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("unable to create private key: %v", err) + } + + return priv.PubKey() +} + +func randCompressedPubKey(t *testing.T) [33]byte { + t.Helper() + + pubKey := randPubKey(t) + + var compressedPubKey [33]byte + copy(compressedPubKey[:], pubKey.SerializeCompressed()) + + return compressedPubKey +} + +func randAnnounceSignatures() *lnwire.AnnounceSignatures { + return &lnwire.AnnounceSignatures{ + ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()), + } +} + +func randChannelUpdate() *lnwire.ChannelUpdate { + return &lnwire.ChannelUpdate{ + ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()), + } +} + +// TestMessageStoreMessages ensures that messages can be properly queried from +// the store. +func TestMessageStoreMessages(t *testing.T) { + t.Parallel() + + // We'll start by creating our test message store. + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // We'll then create some test messages for two test peers, and none for + // an additional test peer. 
+ channelUpdate1 := randChannelUpdate() + announceSignatures1 := randAnnounceSignatures() + peer1 := randCompressedPubKey(t) + if err := msgStore.AddMessage(channelUpdate1, peer1); err != nil { + t.Fatalf("unable to add message: %v", err) + } + if err := msgStore.AddMessage(announceSignatures1, peer1); err != nil { + t.Fatalf("unable to add message: %v", err) + } + expectedPeerMsgs1 := map[uint64]lnwire.MessageType{ + channelUpdate1.ShortChannelID.ToUint64(): channelUpdate1.MsgType(), + announceSignatures1.ShortChannelID.ToUint64(): announceSignatures1.MsgType(), + } + + channelUpdate2 := randChannelUpdate() + peer2 := randCompressedPubKey(t) + if err := msgStore.AddMessage(channelUpdate2, peer2); err != nil { + t.Fatalf("unable to add message: %v", err) + } + expectedPeerMsgs2 := map[uint64]lnwire.MessageType{ + channelUpdate2.ShortChannelID.ToUint64(): channelUpdate2.MsgType(), + } + + peer3 := randCompressedPubKey(t) + expectedPeerMsgs3 := map[uint64]lnwire.MessageType{} + + // assertPeerMsgs is a helper closure that we'll use to ensure we + // retrieve the correct set of messages for a given peer. + assertPeerMsgs := func(peerMsgs []lnwire.Message, + expected map[uint64]lnwire.MessageType) { + + t.Helper() + + if len(peerMsgs) != len(expected) { + t.Fatalf("expected %d pending messages, got %d", + len(expected), len(peerMsgs)) + } + for _, msg := range peerMsgs { + var shortChanID uint64 + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + shortChanID = msg.ShortChannelID.ToUint64() + case *lnwire.ChannelUpdate: + shortChanID = msg.ShortChannelID.ToUint64() + default: + t.Fatalf("found unexpected message type %T", msg) + } + + msgType, ok := expected[shortChanID] + if !ok { + t.Fatalf("retrieved message with unexpected ID "+ + "%d from store", shortChanID) + } + if msgType != msg.MsgType() { + t.Fatalf("expected message of type %v, got %v", + msg.MsgType(), msgType) + } + } + } + + // Then, we'll query the store for the set of messages for each peer and + // ensure it matches what we expect. + peers := [][33]byte{peer1, peer2, peer3} + expectedPeerMsgs := []map[uint64]lnwire.MessageType{ + expectedPeerMsgs1, expectedPeerMsgs2, expectedPeerMsgs3, + } + for i, peer := range peers { + peerMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + assertPeerMsgs(peerMsgs, expectedPeerMsgs[i]) + } + + // Finally, we'll query the store for all of its messages of every peer. + // Again, each peer should have a set of messages that match what we + // expect. + // + // We'll construct the expected response. Only the first two peers will + // have messages. 
+ totalPeerMsgs := make(map[[33]byte]map[uint64]lnwire.MessageType, 2) + for i := 0; i < 2; i++ { + totalPeerMsgs[peers[i]] = expectedPeerMsgs[i] + } + + msgs, err := msgStore.Messages() + if err != nil { + t.Fatalf("unable to retrieve all peers with pending messages: "+ + "%v", err) + } + if len(msgs) != len(totalPeerMsgs) { + t.Fatalf("expected %d peers with messages, got %d", + len(totalPeerMsgs), len(msgs)) + } + for peer, peerMsgs := range msgs { + expected, ok := totalPeerMsgs[peer] + if !ok { + t.Fatalf("expected to find pending messages for peer %x", + peer) + } + + assertPeerMsgs(peerMsgs, expected) + } + + peerPubKeys, err := msgStore.Peers() + if err != nil { + t.Fatalf("unable to retrieve all peers with pending messages: "+ + "%v", err) + } + if len(peerPubKeys) != len(totalPeerMsgs) { + t.Fatalf("expected %d peers with messages, got %d", + len(totalPeerMsgs), len(peerPubKeys)) + } + for peerPubKey := range peerPubKeys { + if _, ok := totalPeerMsgs[peerPubKey]; !ok { + t.Fatalf("expected to find peer %x", peerPubKey) + } + } +} + +// TestMessageStoreUnsupportedMessage ensures that we are not able to add a +// message which is unsupported, and if a message is found to be unsupported by +// the current version of the store, that it is properly filtered out from the +// response. +func TestMessageStoreUnsupportedMessage(t *testing.T) { + t.Parallel() + + // We'll start by creating our test message store. + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // Create a message that is known to not be supported by the store. + peer := randCompressedPubKey(t) + unsupportedMsg := &lnwire.Error{} + + // Attempting to add it to the store should result in + // ErrUnsupportedMessage. + err := msgStore.AddMessage(unsupportedMsg, peer) + if err != ErrUnsupportedMessage { + t.Fatalf("expected ErrUnsupportedMessage, got %v", err) + } + + // We'll now pretend that the message is actually supported in a future + // version of the store, so it's able to be added successfully. To + // replicate this, we'll add the message manually rather than through + // the existing AddMessage method. + msgKey := peer[:] + var rawMsg bytes.Buffer + if _, err := lnwire.WriteMessage(&rawMsg, unsupportedMsg, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + err = msgStore.db.Update(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + return messageStore.Put(msgKey, rawMsg.Bytes()) + }) + if err != nil { + t.Fatalf("unable to add unsupported message to store: %v", err) + } + + // Finally, we'll check that the store can properly filter out messages + // that are currently unknown to it. We'll make sure this is done for + // both Messages and MessagesForPeer. + totalMsgs, err := msgStore.Messages() + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + if len(totalMsgs) != 0 { + t.Fatalf("expected to filter out unsupported message") + } + peerMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve peer messages: %v", err) + } + if len(peerMsgs) != 0 { + t.Fatalf("expected to filter out unsupported message") + } +} + +// TestMessageStoreDeleteMessage ensures that we can properly delete messages +// from the store. +func TestMessageStoreDeleteMessage(t *testing.T) { + t.Parallel() + + msgStore, cleanUp := createTestMessageStore(t) + defer cleanUp() + + // assertMsg is a helper closure we'll use to ensure a message + // does/doesn't exist within the store. 
+ assertMsg := func(msg lnwire.Message, peer [33]byte, exists bool) { + t.Helper() + + storeMsgs, err := msgStore.MessagesForPeer(peer) + if err != nil { + t.Fatalf("unable to retrieve messages: %v", err) + } + + found := false + for _, storeMsg := range storeMsgs { + if reflect.DeepEqual(msg, storeMsg) { + found = true + } + } + + if found != exists { + str := "find" + if !exists { + str = "not find" + } + t.Fatalf("expected to %v message %v", str, + spew.Sdump(msg)) + } + } + + // An AnnounceSignatures message should exist within the store after + // adding it, and should no longer exists after deleting it. + peer := randCompressedPubKey(t) + annSig := randAnnounceSignatures() + if err := msgStore.AddMessage(annSig, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(annSig, peer, true) + if err := msgStore.DeleteMessage(annSig, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(annSig, peer, false) + + // The store allows overwriting ChannelUpdates, since there can be + // multiple versions, so we'll test things slightly different. + // + // The ChannelUpdate message should exist within the store after adding + // it. + chanUpdate := randChannelUpdate() + if err := msgStore.AddMessage(chanUpdate, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(chanUpdate, peer, true) + + // Now, we'll create a new version for the same ChannelUpdate message. + // Adding this one to the store will overwrite the previous one, so only + // the new one should exist. + newChanUpdate := randChannelUpdate() + newChanUpdate.ShortChannelID = chanUpdate.ShortChannelID + newChanUpdate.Timestamp = chanUpdate.Timestamp + 1 + if err := msgStore.AddMessage(newChanUpdate, peer); err != nil { + t.Fatalf("unable to add message: %v", err) + } + assertMsg(chanUpdate, peer, false) + assertMsg(newChanUpdate, peer, true) + + // Deleting the older message should act as a NOP and should NOT delete + // the newer version as the older no longer exists. + if err := msgStore.DeleteMessage(chanUpdate, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(chanUpdate, peer, false) + assertMsg(newChanUpdate, peer, true) + + // The newer version should no longer exist within the store after + // deleting it. + if err := msgStore.DeleteMessage(newChanUpdate, peer); err != nil { + t.Fatalf("unable to delete message: %v", err) + } + assertMsg(newChanUpdate, peer, false) +} From 9febc9cc0450ec87dba6ba9f783a9baf384a29c8 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:20 -0800 Subject: [PATCH 2/8] channeldb: add gossiper message store key migration In this commit, we introduce a migration for the message store sub-bucket that will migrate all keys within it to a new key format. This new key format is composed of the peer's public key, followed by the short channel ID, followed by the message type. This migration is needed in order to provide backwards-compatibility with messages that were previously stored before the introduction of the new key format. 
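As an illustrative sketch only (not part of the patch itself), the new key layout described above amounts to the following, assuming a compressed peer public key pubKey [33]byte, a shortChanID lnwire.ShortChannelID, and the message's msgType lnwire.MessageType:

	// pubKey (33 bytes) || shortChanID (8 bytes) || msgType (2 bytes)
	var key [33 + 8 + 2]byte
	copy(key[:33], pubKey[:])
	binary.BigEndian.PutUint64(key[33:41], shortChanID.ToUint64())
	binary.BigEndian.PutUint16(key[41:43], uint16(msgType))

The old 41-byte keys (which lack the trailing message type) are what this migration rewrites into the 43-byte form sketched above.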
--- channeldb/db.go | 7 +++ channeldb/meta_test.go | 2 +- channeldb/migrations.go | 73 +++++++++++++++++++++++++++ channeldb/migrations_test.go | 96 ++++++++++++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 1 deletion(-) diff --git a/channeldb/db.go b/channeldb/db.go index c4262ffa84f..51f3fe9aa4e 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -90,6 +90,13 @@ var ( number: 7, migration: migrateOptionalChannelCloseSummaryFields, }, + { + // The DB version that changes the gossiper's message + // store keys to account for the message's type and + // ShortChannelID. + number: 8, + migration: migrateGossipMessageStoreKeys, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index 921e55c546c..76d0cb257c4 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), if err == nil && shouldFail { t.Fatal("error wasn't received on migration stage") } else if err != nil && !shouldFail { - t.Fatal("error was received on migration stage") + t.Fatalf("error was received on migration stage: %v", err) } // afterMigration usually used for checking the database state and diff --git a/channeldb/migrations.go b/channeldb/migrations.go index 5d4919db9e2..f86e416b965 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lnwire" ) // migrateNodeAndEdgeUpdateIndex is a migration function that will update the @@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error { return nil } + +var messageStoreBucket = []byte("message-store") + +// migrateGossipMessageStoreKeys migrates the key format for gossip messages +// found in the message store to a new one that takes into consideration the of +// the message being stored. +func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error { + // We'll start by retrieving the bucket in which these messages are + // stored within. If there isn't one, there's nothing left for us to do + // so we can avoid the migration. + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return nil + } + + log.Info("Migrating to the gossip message store new key format") + + // Otherwise we'll proceed with the migration. We'll start by coalescing + // all the current messages within the store, which are indexed by the + // public key of the peer which they should be sent to, followed by the + // short channel ID of the channel for which the message belongs to. We + // should only expect to find channel announcement signatures as that + // was the only support message type previously. + msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures) + err := messageStore.ForEach(func(k, v []byte) error { + var msgKey [33 + 8]byte + copy(msgKey[:], k) + + msg := &lnwire.AnnounceSignatures{} + if err := msg.Decode(bytes.NewReader(v), 0); err != nil { + return err + } + + msgs[msgKey] = msg + + return nil + + }) + if err != nil { + return err + } + + // Then, we'll go over all of our messages, remove their previous entry, + // and add another with the new key format. Once we've done this for + // every message, we can consider the migration complete. + for oldMsgKey, msg := range msgs { + if err := messageStore.Delete(oldMsgKey[:]); err != nil { + return err + } + + // Construct the new key for which we'll find this message with + // in the store. 
It'll be the same as the old, but we'll also + // include the message type. + var msgType [2]byte + binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType())) + newMsgKey := append(oldMsgKey[:], msgType[:]...) + + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil { + return err + } + } + + log.Info("Migration to the gossip message store new key format complete!") + + return nil +} diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index ed2829c0e55..9223108d333 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" ) // TestPaymentStatusesMigration checks that already completed payments will have @@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { false) } } + +// TestMigrateGossipMessageStoreKeys ensures that the migration to the new +// gossip message store key format is successful/unsuccessful under various +// scenarios. +func TestMigrateGossipMessageStoreKeys(t *testing.T) { + t.Parallel() + + // Construct the message which we'll use to test the migration, along + // with its old and new key formats. + shortChanID := lnwire.ShortChannelID{BlockHeight: 10} + msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID} + + var oldMsgKey [33 + 8]byte + copy(oldMsgKey[:33], pubKey.SerializeCompressed()) + binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64()) + + var newMsgKey [33 + 8 + 2]byte + copy(newMsgKey[:41], oldMsgKey[:]) + binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType())) + + // Before the migration, we'll create the bucket where the messages + // should live and insert them. + beforeMigration := func(db *DB) { + var b bytes.Buffer + if err := msg.Encode(&b, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + + err := db.Update(func(tx *bbolt.Tx) error { + messageStore, err := tx.CreateBucketIfNotExists( + messageStoreBucket, + ) + if err != nil { + return err + } + + return messageStore.Put(oldMsgKey[:], b.Bytes()) + }) + if err != nil { + t.Fatal(err) + } + } + + // After the migration, we'll make sure that: + // 1. We cannot find the message under its old key. + // 2. We can find the message under its new key. + // 3. The message matches the original. 
+ afterMigration := func(db *DB) { + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatalf("unable to fetch db version: %v", err) + } + if meta.DbVersionNumber != 1 { + t.Fatalf("migration should have succeeded but didn't") + } + + var rawMsg []byte + err = db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return errors.New("message store bucket not " + + "found") + } + rawMsg = messageStore.Get(oldMsgKey[:]) + if rawMsg != nil { + t.Fatal("expected to not find message under " + + "old key, but did") + } + rawMsg = messageStore.Get(newMsgKey[:]) + if rawMsg == nil { + return fmt.Errorf("expected to find message " + + "under new key, but didn't") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0) + if err != nil { + t.Fatalf("unable to deserialize raw message: %v", err) + } + if !reflect.DeepEqual(msg, gotMsg) { + t.Fatalf("expected message: %v\ngot message: %v", + spew.Sdump(msg), spew.Sdump(gotMsg)) + } + } + + applyMigration( + t, beforeMigration, afterMigration, + migrateGossipMessageStoreKeys, false, + ) +} From 2277535e6b81430b22c8a8d0c0ec38ae036165d5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:27 -0800 Subject: [PATCH 3/8] server+discovery: replace gossiper message store with MessageStore --- discovery/gossiper.go | 205 +++++++++++-------------------------- discovery/gossiper_test.go | 41 +------- discovery/mock_test.go | 133 ++++++++++++++++++++++++ server.go | 6 ++ 4 files changed, 202 insertions(+), 183 deletions(-) create mode 100644 discovery/mock_test.go diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a8f154400eb..f1195d45b31 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2,7 +2,6 @@ package discovery import ( "bytes" - "encoding/binary" "errors" "fmt" "runtime" @@ -13,7 +12,6 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -25,13 +23,6 @@ import ( ) var ( - // messageStoreKey is a key used to create a top level bucket in - // the gossiper database, used for storing messages that are to - // be sent to peers. Currently this is used for reliably sending - // AnnounceSignatures messages, by persisting them until a send - // operation has succeeded. - messageStoreKey = []byte("message-store") - // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") @@ -137,6 +128,10 @@ type Config struct { // proof storage to make waiting proofs persistent. DB *channeldb.DB + // MessageStore is a persistent storage of gossip messages which we will + // use to determine which messages need to be resent for a given peer. + MessageStore GossipMessageStore + // AnnSigner is an instance of the MessageSigner interface which will // be used to manually sign any outgoing channel updates. The signer // implementation should be backed by the public key of the backing @@ -814,126 +809,69 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { // will try to resend them. If we have the full proof, we can safely delete the // message from the messageStore. 
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { - type msgTuple struct { - peer *btcec.PublicKey - msg *lnwire.AnnounceSignatures - dbKey []byte + peerMsgsToResend, err := d.cfg.MessageStore.Messages() + if err != nil { + return err } - // Fetch all the AnnounceSignatures messages that was added to the - // database. - // - // TODO(halseth): database access should be abstracted - // behind interface. - var msgsResend []msgTuple - if err := d.cfg.DB.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return nil + // We now iterate over these messages, resending those that we don't + // have the full proof for, deleting the rest. + for peer, msgsToResend := range peerMsgsToResend { + pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256()) + if err != nil { + return err } - if err := bucket.ForEach(func(k, v []byte) error { - // The database value represents the encoded - // AnnounceSignatures message. - r := bytes.NewReader(v) - msg := &lnwire.AnnounceSignatures{} - if err := msg.Decode(r, 0); err != nil { - return err - } + for _, msg := range msgsToResend { + msg := msg.(*lnwire.AnnounceSignatures) - // The first 33 bytes of the database key is the peer's - // public key. - peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) + // Check if the full channel proof exists in our graph. + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID) if err != nil { - return err - } - - // Make a copy of the database key corresponding to - // these AnnounceSignatures. - dbKey := make([]byte, len(k)) - copy(dbKey, k) - - t := msgTuple{peer, msg, dbKey} - - // Add the message to the slice, such that we can - // resend it after the database transaction is over. - msgsResend = append(msgsResend, t) - return nil - }); err != nil { - return err - } - return nil - }); err != nil { - return err - } - - // deleteMsg removes the message associated with the passed msgTuple - // from the messageStore. - deleteMsg := func(t msgTuple) error { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(messageStoreKey) - if bucket == nil { - return fmt.Errorf("bucket " + - "unexpectedly did not exist") + // If the channel cannot be found, it is most likely a + // leftover message for a channel that was closed. In + // this case we delete it from the message store. + log.Warnf("unable to fetch channel info for "+ + "chanID=%v from graph: %v. Will delete local"+ + "proof from database", + msg.ChannelID, err) + err = d.cfg.MessageStore.DeleteMessage(msg, peer) + if err != nil { + return err + } + continue } - return bucket.Delete(t.dbKey[:]) - }); err != nil { - return fmt.Errorf("Failed deleting message "+ - "from database: %v", err) - } - return nil - } - - // We now iterate over these messages, resending those that we don't - // have the full proof for, deleting the rest. - for _, t := range msgsResend { - // Check if the full channel proof exists in our graph. - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - t.msg.ShortChannelID) - if err != nil { - // If the channel cannot be found, it is most likely a - // leftover message for a channel that was closed. In - // this case we delete it from the message store. - log.Warnf("unable to fetch channel info for "+ - "chanID=%v from graph: %v. 
Will delete local"+ - "proof from database", - t.msg.ChannelID, err) - if err := deleteMsg(t); err != nil { - return err - } - continue - } - - // 1. If the full proof does not exist in the graph, it means - // that we haven't received the remote proof yet (or that we - // crashed before able to assemble the full proof). Since the - // remote node might think they have delivered their proof to - // us, we will resend _our_ proof to trigger a resend on their - // part: they will then be able to assemble and send us the - // full proof. - if chanInfo.AuthProof == nil { - err := d.sendAnnSigReliably(t.msg, t.peer) - if err != nil { - return err + // 1. If the full proof does not exist in the graph, it means + // that we haven't received the remote proof yet (or that we + // crashed before able to assemble the full proof). Since the + // remote node might think they have delivered their proof to + // us, we will resend _our_ proof to trigger a resend on their + // part: they will then be able to assemble and send us the + // full proof. + if chanInfo.AuthProof == nil { + err := d.sendAnnSigReliably(msg, pubKey) + if err != nil { + return err + } + continue } - continue - } - // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled the - // full proof. In this case we can safely delete the local - // proof from the database. In case the remote hasn't been able - // to assemble the full proof yet (maybe because of a crash), - // we will send them the full proof if we notice that they - // retry sending their half proof. - if chanInfo.AuthProof != nil { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", t.msg.ChannelID) - if err := deleteMsg(t); err != nil { - return err + // 2. If the proof does exist in the graph, we have + // successfully received the remote proof and assembled the + // full proof. In this case we can safely delete the local + // proof from the database. In case the remote hasn't been able + // to assemble the full proof yet (maybe because of a crash), + // we will send them the full proof if we notice that they + // retry sending their half proof. + if chanInfo.AuthProof != nil { + log.Debugf("Deleting message for chanID=%v from "+ + "messageStore", msg.ChannelID) + err := d.cfg.MessageStore.DeleteMessage(msg, peer) + if err != nil { + return err + } } } } @@ -2434,31 +2372,10 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( // We first add this message to the database, such that in case // we do not succeed in sending it to the peer, we'll fetch it - // from the DB next time we start, and retry. We use the peer ID - // + shortChannelID as key, as there possibly is more than one - // channel opening in progress to the same peer. - var key [41]byte - copy(key[:33], remotePeer.SerializeCompressed()) - binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64()) - - err := d.cfg.DB.Update(func(tx *bbolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(messageStoreKey) - if err != nil { - return err - } - - // Encode the AnnounceSignatures message. - var b bytes.Buffer - if err := msg.Encode(&b, 0); err != nil { - return err - } - - // Add the encoded message to the database using the peer - // + shortChanID as key. - return bucket.Put(key[:], b.Bytes()) - - }) - if err != nil { + // from the DB next time we start, and retry. 
+ var remotePubKey [33]byte + copy(remotePubKey[:], remotePeer.SerializeCompressed()) + if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil { return err } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index be029878ef5..d32ca0af97a 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -619,6 +619,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { RetransmitDelay: retransmitDelay, ProofMatureDelta: proofMatureDelta, DB: db, + MessageStore: newMockMessageStore(), }, nodeKeyPub1) if err != nil { cleanUpDb() @@ -1655,6 +1656,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { RetransmitDelay: retransmitDelay, ProofMatureDelta: proofMatureDelta, DB: ctx.gossiper.cfg.DB, + MessageStore: ctx.gossiper.cfg.MessageStore, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -2861,42 +2863,3 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { t.Fatalf("unable to process announcement: %v", err) } } - -// mockPeer implements the lnpeer.Peer interface and is used to test the -// gossiper's interaction with peers. -type mockPeer struct { - pk *btcec.PublicKey - sentMsgs chan lnwire.Message - quit chan struct{} -} - -var _ lnpeer.Peer = (*mockPeer)(nil) - -func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { - if p.sentMsgs == nil && p.quit == nil { - return nil - } - - for _, msg := range msgs { - select { - case p.sentMsgs <- msg: - case <-p.quit: - } - } - - return nil -} -func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { - return nil -} -func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } -func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } -func (p *mockPeer) PubKey() [33]byte { - var pubkey [33]byte - copy(pubkey[:], p.pk.SerializeCompressed()) - return pubkey -} -func (p *mockPeer) Address() net.Addr { return nil } -func (p *mockPeer) QuitSignal() <-chan struct{} { - return p.quit -} diff --git a/discovery/mock_test.go b/discovery/mock_test.go new file mode 100644 index 00000000000..85c6c4f308c --- /dev/null +++ b/discovery/mock_test.go @@ -0,0 +1,133 @@ +package discovery + +import ( + "net" + "sync" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// mockPeer implements the lnpeer.Peer interface and is used to test the +// gossiper's interaction with peers. 
+type mockPeer struct { + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} +} + +var _ lnpeer.Peer = (*mockPeer)(nil) + +func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { + if p.sentMsgs == nil && p.quit == nil { + return nil + } + + for _, msg := range msgs { + select { + case p.sentMsgs <- msg: + case <-p.quit: + } + } + + return nil +} +func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { + return nil +} +func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } +func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } +func (p *mockPeer) PubKey() [33]byte { + var pubkey [33]byte + copy(pubkey[:], p.pk.SerializeCompressed()) + return pubkey +} +func (p *mockPeer) Address() net.Addr { return nil } +func (p *mockPeer) QuitSignal() <-chan struct{} { + return p.quit +} + +// mockMessageStore is an in-memory implementation of the MessageStore interface +// used for the gossiper's unit tests. +type mockMessageStore struct { + sync.Mutex + messages map[[33]byte]map[lnwire.Message]struct{} +} + +func newMockMessageStore() *mockMessageStore { + return &mockMessageStore{ + messages: make(map[[33]byte]map[lnwire.Message]struct{}), + } +} + +var _ GossipMessageStore = (*mockMessageStore)(nil) + +func (s *mockMessageStore) AddMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + if _, ok := s.messages[pubKey]; !ok { + s.messages[pubKey] = make(map[lnwire.Message]struct{}) + } + + s.messages[pubKey][msg] = struct{}{} + + return nil +} + +func (s *mockMessageStore) DeleteMessage(msg lnwire.Message, pubKey [33]byte) error { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil + } + + delete(peerMsgs, msg) + return nil +} + +func (s *mockMessageStore) Messages() (map[[33]byte][]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + msgs := make(map[[33]byte][]lnwire.Message, len(s.messages)) + for peer, peerMsgs := range s.messages { + for msg := range peerMsgs { + msgs[peer] = append(msgs[peer], msg) + } + } + return msgs, nil +} + +func (s *mockMessageStore) Peers() (map[[33]byte]struct{}, error) { + s.Lock() + defer s.Unlock() + + peers := make(map[[33]byte]struct{}, len(s.messages)) + for peer := range s.messages { + peers[peer] = struct{}{} + } + return peers, nil +} + +func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, error) { + s.Lock() + defer s.Unlock() + + peerMsgs, ok := s.messages[pubKey] + if !ok { + return nil, nil + } + + msgs := make([]lnwire.Message, 0, len(peerMsgs)) + for msg := range peerMsgs { + msgs = append(msgs, msg) + } + + return msgs, nil +} diff --git a/server.go b/server.go index 38df7e34871..13210c6e234 100644 --- a/server.go +++ b/server.go @@ -583,6 +583,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, s.chanDB.ChannelGraph(), ) + gossipMessageStore, err := discovery.NewMessageStore(s.chanDB) + if err != nil { + return nil, err + } + s.authGossiper, err = discovery.New(discovery.Config{ Router: s.chanRouter, Notifier: s.cc.chainNotifier, @@ -598,6 +603,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30, DB: chanDB, + MessageStore: gossipMessageStore, AnnSigner: s.nodeSigner, }, s.identityPriv.PubKey(), From 73b4bc4b6810b76e71d1095e2e1dab901daa9ed2 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 
Feb 2019 17:18:34 -0800 Subject: [PATCH 4/8] server+discovery: remove channeldb.DB reference within the gossiper Now that we've replaced the built-in messageStore with the channeldb.GossipMessageStore, the reference to channeldb.DB is no longer needed. --- discovery/gossiper.go | 40 +++++++++++-------------- discovery/gossiper_test.go | 61 ++++++++++++++++++++------------------ server.go | 23 +++++++------- 3 files changed, 62 insertions(+), 62 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f1195d45b31..d11f5299098 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -124,9 +124,14 @@ type Config struct { // should check if we need re-broadcast any of our personal channels. RetransmitDelay time.Duration - // DB is a global boltdb instance which is needed to pass it in waiting - // proof storage to make waiting proofs persistent. - DB *channeldb.DB + // WaitingProofStore is a persistent storage of partial channel proof + // announcement messages. We use it to buffer half of the material + // needed to reconstruct a full authenticated channel announcement. + // Once we receive the other half the channel proof, we'll be able to + // properly validate it and re-broadcast it out to the network. + // + // TODO(wilmer): make interface to prevent channeldb dependency. + WaitingProofStore *channeldb.WaitingProofStore // MessageStore is a persistent storage of gossip messages which we will // use to determine which messages need to be resent for a given peer. @@ -188,13 +193,6 @@ type AuthenticatedGossiper struct { prematureChannelUpdates map[uint64][]*networkMsg pChanUpdMtx sync.Mutex - // waitingProofs is a persistent storage of partial channel proof - // announcement messages. We use it to buffer half of the material - // needed to reconstruct a full authenticated channel announcement. - // Once we receive the other half the channel proof, we'll be able to - // properly validate it and re-broadcast it out to the network. - waitingProofs *channeldb.WaitingProofStore - // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. @@ -229,12 +227,7 @@ type AuthenticatedGossiper struct { // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. -func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { - storage, err := channeldb.NewWaitingProofStore(cfg.DB) - if err != nil { - return nil, err - } - +func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { return &AuthenticatedGossiper{ selfKey: selfKey, cfg: &cfg, @@ -243,11 +236,10 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg), - waitingProofs: storage, channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), peerSyncers: make(map[routing.Vertex]*gossipSyncer), - }, nil + } } // SynchronizeNode sends a message to the service indicating it should @@ -2084,7 +2076,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. 
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2198,7 +2191,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // proof than we should store it this one, and wait for // opposite to be received. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey()) + oppositeProof, err := d.cfg.WaitingProofStore.Get( + proof.OppositeKey(), + ) if err != nil && err != channeldb.ErrWaitingProofNotFound { err := fmt.Errorf("unable to get "+ "the opposite proof for short_chan_id=%v: %v", @@ -2209,7 +2204,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } if err == channeldb.ErrWaitingProofNotFound { - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2278,7 +2274,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - err = d.waitingProofs.Remove(proof.OppositeKey()) + err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) if err != nil { err := fmt.Errorf("unable remove opposite proof "+ "for the channel with chanID=%v: %v", diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index d32ca0af97a..0e2e51f4f48 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -593,8 +593,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil, nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(db) + if err != nil { + cleanUpDb() + return nil, nil, err + } + broadcastedMessage := make(chan msgWithSenders, 10) - gossiper, err := New(Config{ + gossiper := New(Config{ Notifier: notifier, Broadcast: func(senders map[routing.Vertex]struct{}, msgs ...lnwire.Message) error { @@ -614,17 +620,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, nil, nil}, nil }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: db, - MessageStore: newMockMessageStore(), + Router: router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), }, nodeKeyPub1) - if err != nil { - cleanUpDb() - return nil, nil, fmt.Errorf("unable to create router %v", err) - } + if err := gossiper.Start(); err != nil { cleanUpDb() return nil, nil, fmt.Errorf("unable to start router: %v", err) @@ -993,7 +996,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1026,7 +1029,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1096,7 +1099,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( 
func(*channeldb.WaitingProof) error { number++ return nil @@ -1236,7 +1239,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(p *channeldb.WaitingProof) error { number++ return nil @@ -1406,7 +1409,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1461,7 +1464,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1624,7 +1627,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1640,7 +1643,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Shut down gossiper, and restart. This should trigger a new attempt // to send the message to the peer. ctx.gossiper.Stop() - gossiper, err := New(Config{ + gossiper := New(Config{ Notifier: ctx.gossiper.cfg.Notifier, Broadcast: ctx.gossiper.cfg.Broadcast, SendToPeer: func(target *btcec.PublicKey, @@ -1651,12 +1654,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan }, - Router: ctx.gossiper.cfg.Router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: ctx.gossiper.cfg.DB, - MessageStore: ctx.gossiper.cfg.MessageStore, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, + MessageStore: ctx.gossiper.cfg.MessageStore, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -1723,7 +1726,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1912,7 +1915,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2494,7 +2497,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2523,7 +2526,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil diff --git a/server.go b/server.go index 13210c6e234..89b8d39caee 100644 --- a/server.go +++ b/server.go @@ -587,8 +587,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, if err != nil { return nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB) + if err != nil { + return nil, err + } - s.authGossiper, err = 
discovery.New(discovery.Config{ + s.authGossiper = discovery.New(discovery.Config{ Router: s.chanRouter, Notifier: s.cc.chainNotifier, ChainHash: *activeNetParams.GenesisHash, @@ -598,19 +602,16 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { return s.FindPeer(pub) }, - NotifyWhenOnline: s.NotifyWhenOnline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - DB: chanDB, - MessageStore: gossipMessageStore, - AnnSigner: s.nodeSigner, + NotifyWhenOnline: s.NotifyWhenOnline, + ProofMatureDelta: 0, + TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), + RetransmitDelay: time.Minute * 30, + WaitingProofStore: waitingProofStore, + MessageStore: gossipMessageStore, + AnnSigner: s.nodeSigner, }, s.identityPriv.PubKey(), ) - if err != nil { - return nil, err - } utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB) if err != nil { From 6e556aa897ce7077f3191577c81ab88cfbc09e17 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:41 -0800 Subject: [PATCH 5/8] discovery/gossiper_test: prevent race conditions within mockGraphSource --- discovery/gossiper_test.go | 62 ++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 0e2e51f4f48..1decc1e3af6 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -108,28 +108,36 @@ func (n *mockSigner) SignMessage(pubKey *btcec.PublicKey, } type mockGraphSource struct { - nodes []*channeldb.LightningNode - infos map[uint64]*channeldb.ChannelEdgeInfo - edges map[uint64][]*channeldb.ChannelEdgePolicy bestHeight uint32 + + mu sync.Mutex + nodes []channeldb.LightningNode + infos map[uint64]channeldb.ChannelEdgeInfo + edges map[uint64][]channeldb.ChannelEdgePolicy } func newMockRouter(height uint32) *mockGraphSource { return &mockGraphSource{ bestHeight: height, - infos: make(map[uint64]*channeldb.ChannelEdgeInfo), - edges: make(map[uint64][]*channeldb.ChannelEdgePolicy), + infos: make(map[uint64]channeldb.ChannelEdgeInfo), + edges: make(map[uint64][]channeldb.ChannelEdgePolicy), } } var _ routing.ChannelGraphSource = (*mockGraphSource)(nil) func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { - r.nodes = append(r.nodes, node) + r.mu.Lock() + defer r.mu.Unlock() + + r.nodes = append(r.nodes, *node) return nil } func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.infos[info.ChannelID]; ok { return errors.New("info already exist") } @@ -137,15 +145,15 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { // Usually, the capacity is fetched in the router from the funding txout. // Since the mockGraphSource can't access the txout, assign a default value. 
info.Capacity = maxBtcFundingAmount - r.infos[info.ChannelID] = info + r.infos[info.ChannelID] = *info return nil } func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { - r.edges[edge.ChannelID] = append( - r.edges[edge.ChannelID], - edge, - ) + r.mu.Lock() + defer r.mu.Unlock() + + r.edges[edge.ChannelID] = append(r.edges[edge.ChannelID], *edge) return nil } @@ -159,11 +167,19 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error { - info, ok := r.infos[chanID.ToUint64()] + + r.mu.Lock() + defer r.mu.Unlock() + + chanIDInt := chanID.ToUint64() + info, ok := r.infos[chanIDInt] if !ok { return errors.New("channel does not exist") } + info.AuthProof = proof + r.infos[chanIDInt] = info + return nil } @@ -186,6 +202,9 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( *channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error) { + r.mu.Lock() + defer r.mu.Unlock() + chanInfo, ok := r.infos[chanID.ToUint64()] if !ok { return nil, nil, nil, channeldb.ErrEdgeNotFound @@ -193,14 +212,16 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( edges := r.edges[chanID.ToUint64()] if len(edges) == 0 { - return chanInfo, nil, nil, nil + return &chanInfo, nil, nil, nil } if len(edges) == 1 { - return chanInfo, edges[0], nil, nil + edge1 := edges[0] + return &chanInfo, &edge1, nil, nil } - return chanInfo, edges[0], edges[1], nil + edge1, edge2 := edges[0], edges[1] + return &chanInfo, &edge1, &edge2, nil } func (r *mockGraphSource) FetchLightningNode( @@ -208,7 +229,7 @@ func (r *mockGraphSource) FetchLightningNode( for _, node := range r.nodes { if bytes.Equal(nodePub[:], node.PubKeyBytes[:]) { - return node, nil + return &node, nil } } @@ -218,6 +239,9 @@ func (r *mockGraphSource) FetchLightningNode( // IsStaleNode returns true if the graph source has a node announcement for the // target node with a more recent timestamp. func (r *mockGraphSource) IsStaleNode(nodePub routing.Vertex, timestamp time.Time) bool { + r.mu.Lock() + defer r.mu.Unlock() + for _, node := range r.nodes { if node.PubKeyBytes == nodePub { return node.LastUpdate.After(timestamp) || @@ -258,6 +282,9 @@ func (r *mockGraphSource) IsPublicNode(node routing.Vertex) (bool, error) { // IsKnownEdge returns true if the graph source already knows of the passed // channel ID. func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { + r.mu.Lock() + defer r.mu.Unlock() + _, ok := r.infos[chanID.ToUint64()] return ok } @@ -267,6 +294,9 @@ func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { + r.mu.Lock() + defer r.mu.Unlock() + edges, ok := r.edges[chanID.ToUint64()] if !ok { return false From 2f679f60152a2f85c0c2b32d42d4aeefb273dea2 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:49 -0800 Subject: [PATCH 6/8] discovery/reliable_sender: implement message-agnostic reliable sender In this commit, we implement a new subsystem for the gossiper that uses some of the existing logic for resending channel announcement signatures and implements it in a way to make it message-agnostic, meaning that any type of message can be resent. Along the way we also modify the way this works to prevent multiple goroutines per peer _and_ message. 
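An aside from the editor, not part of the patch: the "single goroutine per peer" guarantee mentioned above comes down to a mutex-guarded map of active handlers that is populated at most once per peer, with every send request funneled through that handler's channel. A minimal sketch under those assumptions, using illustrative names rather than the patch's identifiers:

    package sketch

    import (
            "sync"

            "github.com/lightningnetwork/lnd/lnwire"
    )

    // sender is a stripped-down stand-in for the reliable sender.
    type sender struct {
            mu       sync.Mutex
            handlers map[[33]byte]chan lnwire.Message
            wg       sync.WaitGroup
    }

    // handlerFor returns the message channel of the peer's handler,
    // spawning the handler goroutine only if none is active yet. This is
    // what bounds the subsystem to one goroutine per peer, no matter how
    // many messages are queued for it.
    func (s *sender) handlerFor(peer [33]byte) chan<- lnwire.Message {
            s.mu.Lock()
            defer s.mu.Unlock()

            if msgs, ok := s.handlers[peer]; ok {
                    return msgs
            }

            msgs := make(chan lnwire.Message)
            s.handlers[peer] = msgs

            s.wg.Add(1)
            go func() {
                    defer s.wg.Done()
                    for range msgs {
                            // Wait for the peer to come online and deliver
                            // the queued messages; elided in this sketch.
                    }
            }()

            return msgs
    }

Compared with the previous per-message goroutines, tearing the handler down once the store holds no more pending messages for the peer is then a single delete from this map.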
A peerHandler will be spawned for each peer for which we attempt to send a message reliably to. This handler is responsible for managing requests to reliably send messages to a peer while also taking the peer's connection lifecycle into account by requesting notifications for when the peer connects/disconnects. A peer connection notification is first requested to determine when we should attempt to send any pending messages. After the messages are sent, a peer disconnection notification is requested to ensure we don't continue to request connection notifications while the peer remains connected. Once there are no more pending messages left to be sent for a given peer, the peerHandler can be torn down. --- discovery/reliable_sender.go | 316 ++++++++++++++++++++++++++++++ discovery/reliable_sender_test.go | 312 +++++++++++++++++++++++++++++ 2 files changed, 628 insertions(+) create mode 100644 discovery/reliable_sender.go create mode 100644 discovery/reliable_sender_test.go diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go new file mode 100644 index 00000000000..c336b01de48 --- /dev/null +++ b/discovery/reliable_sender.go @@ -0,0 +1,316 @@ +package discovery + +import ( + "sync" + + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// reliableSenderCfg contains all of necessary items for the reliableSender to +// carry out its duties. +type reliableSenderCfg struct { + // NotifyWhenOnline is a function that allows the gossiper to be + // notified when a certain peer comes online, allowing it to + // retry sending a peer message. + // + // NOTE: The peerChan channel must be buffered. + // + // TODO(wilmer): use [33]byte to avoid unnecessary serializations. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + + // NotifyWhenOffline is a function that allows the gossiper to be + // notified when a certain peer disconnects, allowing it to request a + // notification for when it reconnects. + NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + + // MessageStore is a persistent storage of gossip messages which we will + // use to determine which messages need to be resent for a given peer. + MessageStore GossipMessageStore + + // IsMsgStale determines whether a message retrieved from the backing + // MessageStore is seen as stale by the current graph. + IsMsgStale func(lnwire.Message) bool +} + +// peerManager contains the set of channels required for the peerHandler to +// properly carry out its duties. +type peerManager struct { + // msgs is the channel through which messages will be streamed to the + // handler in order to send the message to the peer while they're + // online. + msgs chan lnwire.Message + + // done is a channel that will be closed to signal that the handler for + // the given peer has been torn down for whatever reason. + done chan struct{} +} + +// reliableSender is a small subsystem of the gossiper used to reliably send +// gossip messages to peers. +type reliableSender struct { + start sync.Once + stop sync.Once + + cfg reliableSenderCfg + + // activePeers keeps track of whether a peerHandler exists for a given + // peer. A peerHandler is tasked with handling requests for messages + // that should be reliably sent to peers while also taking into account + // the peer's connection lifecycle. 
+ activePeers map[[33]byte]peerManager + activePeersMtx sync.Mutex + + wg sync.WaitGroup + quit chan struct{} +} + +// newReliableSender returns a new reliableSender backed by the given config. +func newReliableSender(cfg *reliableSenderCfg) *reliableSender { + return &reliableSender{ + cfg: *cfg, + activePeers: make(map[[33]byte]peerManager), + quit: make(chan struct{}), + } +} + +// Start spawns message handlers for any peers with pending messages. +func (s *reliableSender) Start() error { + var err error + s.start.Do(func() { + err = s.resendPendingMsgs() + }) + return err +} + +// Stop halts the reliable sender from sending messages to peers. +func (s *reliableSender) Stop() { + s.stop.Do(func() { + close(s.quit) + s.wg.Wait() + }) +} + +// sendMessage constructs a request to send a message reliably to a peer. In the +// event that the peer is currently offline, this will only write the message to +// disk. Once the peer reconnects, this message, along with any others pending, +// will be sent to the peer. +func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error { + // We'll start by persisting the message to disk. This allows us to + // resend the message upon restarts and peer reconnections. + if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil { + return err + } + + // Then, we'll spawn a peerHandler for this peer to handle resending its + // pending messages while taking into account its connection lifecycle. +spawnHandler: + msgHandler, ok := s.spawnPeerHandler(peerPubKey) + + // If the handler wasn't previously active, we can exit now as we know + // that the message will be sent once the peer online notification is + // received. This prevents us from potentially sending the message + // twice. + if !ok { + return nil + } + + // Otherwise, we'll attempt to stream the message to the handler. + // There's a subtle race condition where the handler can be torn down + // due to all of the messages sent being stale, so we'll handle this + // gracefully by spawning another one to prevent blocking. + select { + case msgHandler.msgs <- msg: + case <-msgHandler.done: + goto spawnHandler + case <-s.quit: + return ErrGossiperShuttingDown + } + + return nil +} + +// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't +// one already active. The boolean returned signals whether there was already +// one active or not. +func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) { + s.activePeersMtx.Lock() + defer s.activePeersMtx.Unlock() + + msgHandler, ok := s.activePeers[peerPubKey] + if !ok { + msgHandler = peerManager{ + msgs: make(chan lnwire.Message), + done: make(chan struct{}), + } + s.activePeers[peerPubKey] = msgHandler + + s.wg.Add(1) + go s.peerHandler(msgHandler, peerPubKey) + } + + return msgHandler, ok +} + +// peerHandler is responsible for handling our reliable message send requests +// for a given peer while also taking into account the peer's connection +// lifecycle. Any messages that are attempted to be sent while the peer is +// offline will be queued and sent once the peer reconnects. +// +// NOTE: This must be run as a goroutine. +func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) { + defer s.wg.Done() + + // We'll start by requesting a notification for when the peer + // reconnects. 
+ pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256()) + peerChan := make(chan lnpeer.Peer, 1) + +waitUntilOnline: + log.Debugf("Requesting online notification for peer=%x", peerPubKey) + + s.cfg.NotifyWhenOnline(pubKey, peerChan) + + var peer lnpeer.Peer +out: + for { + select { + // While we're waiting, we'll also consume any messages that + // must be sent to prevent blocking the caller. These can be + // ignored for now since the peer is currently offline. Once + // they reconnect, the messages will be sent since they should + // have been persisted to disk. + case <-peerMgr.msgs: + case peer = <-peerChan: + break out + case <-s.quit: + return + } + } + + log.Debugf("Peer=%x is now online, proceeding to send pending messages", + peerPubKey) + + // Once we detect the peer has reconnected, we'll also request a + // notification for when they disconnect. We'll use this to make sure + // they haven't disconnected (in the case of a flappy peer, etc.) by the + // time we attempt to send them the pending messages. + log.Debugf("Requesting offline notification for peer=%x", peerPubKey) + + offlineChan := s.cfg.NotifyWhenOffline(peerPubKey) + + pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + // With the peer online, we can now proceed to send our pending messages + // for them. + for _, msg := range pendingMsgs { + // Retrieve the short channel ID for which this message applies + // for logging purposes. The error can be ignored as the store + // can only contain messages which have a ShortChannelID field. + shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for channel=%v "+ + "to %x: %v", msg.MsgType(), shortChanID, + peerPubKey, err) + goto waitUntilOnline + } + + log.Debugf("Successfully sent %v message for channel=%v with "+ + "peer=%x upon reconnection", msg.MsgType(), shortChanID, + peerPubKey) + + // Now that the message has at least been sent once, we can + // check whether it's stale. This guarantees that + // AnnounceSignatures are sent at least once if we happen to + // already have signatures for both parties. + if s.cfg.IsMsgStale(msg) { + err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey) + if err != nil { + log.Errorf("Unable to remove stale %v message "+ + "for channel=%v with peer %x: %v", + msg.MsgType(), shortChanID, peerPubKey, + err) + continue + } + + log.Debugf("Removed stale %v message for channel=%v "+ + "with peer=%x", msg.MsgType(), shortChanID, + peerPubKey) + } + } + + // If all of our messages were stale, then there's no need for this + // handler to continue running, so we can exit now. + pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + if len(pendingMsgs) == 0 { + log.Debugf("No pending messages left for peer=%x", peerPubKey) + + s.activePeersMtx.Lock() + delete(s.activePeers, peerPubKey) + s.activePeersMtx.Unlock() + + close(peerMgr.done) + + return + } + + // Once the pending messages are sent, we can continue to send any + // future messages while the peer remains connected. + for { + select { + case msg := <-peerMgr.msgs: + // Retrieve the short channel ID for which this message + // applies for logging purposes. 
The error can be + // ignored as the store can only contain messages which + // have a ShortChannelID field. + shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for "+ + "channel=%v to %x: %v", msg.MsgType(), + shortChanID, peerPubKey, err) + } + + log.Debugf("Successfully sent %v message for "+ + "channel=%v with peer=%x", msg.MsgType(), + shortChanID, peerPubKey) + + case <-offlineChan: + goto waitUntilOnline + + case <-s.quit: + return + } + } +} + +// resendPendingMsgs retrieves and sends all of the messages within the message +// store that should be reliably sent to their respective peers. +func (s *reliableSender) resendPendingMsgs() error { + // Fetch all of the peers for which we have pending messages for and + // spawn a peerMsgHandler for each. Once the peer is seen as online, all + // of the pending messages will be sent. + peers, err := s.cfg.MessageStore.Peers() + if err != nil { + return err + } + + for peer := range peers { + s.spawnPeerHandler(peer) + } + + return nil +} diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go new file mode 100644 index 00000000000..8633b7d401e --- /dev/null +++ b/discovery/reliable_sender_test.go @@ -0,0 +1,312 @@ +package discovery + +import ( + "fmt" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// newTestReliableSender creates a new reliable sender instance used for +// testing. +func newTestReliableSender(t *testing.T) *reliableSender { + t.Helper() + + cfg := &reliableSenderCfg{ + NotifyWhenOnline: func(pubKey *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + peerChan <- &mockPeer{pk: pubKey} + }, + NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}, 1) + return c + }, + MessageStore: newMockMessageStore(), + IsMsgStale: func(lnwire.Message) bool { + return false + }, + } + + return newReliableSender(cfg) +} + +// assertMsgsSent ensures that the given messages can be read from a mock peer's +// msgChan. +func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message, + msgs ...lnwire.Message) { + + t.Helper() + + m := make(map[lnwire.Message]struct{}, len(msgs)) + for _, msg := range msgs { + m[msg] = struct{}{} + } + + for i := 0; i < len(msgs); i++ { + select { + case msg := <-msgChan: + if _, ok := m[msg]; !ok { + t.Fatalf("found unexpected message sent: %v", + spew.Sdump(msg)) + } + case <-time.After(time.Second): + t.Fatal("reliable sender did not send message to peer") + } + } +} + +// waitPredicate is a helper test function that will wait for a timeout period +// of time until the passed predicate returns true. +func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) { + t.Helper() + + const pollInterval = 20 * time.Millisecond + exitTimer := time.After(timeout) + + for { + <-time.After(pollInterval) + + select { + case <-exitTimer: + t.Fatalf("predicate not satisfied after timeout") + default: + } + + if pred() { + return + } + } +} + +// TestReliableSenderFlow ensures that the flow for sending messages reliably to +// a peer while taking into account its connection lifecycle works as expected. +func TestReliableSenderFlow(t *testing.T) { + t.Parallel() + + reliableSender := newTestReliableSender(t) + + // Create a mock peer to send the messages to. 
+ pubKey := randPubKey(t) + msgsSent := make(chan lnwire.Message) + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + + // Override NotifyWhenOnline and NotifyWhenOffline to provide the + // notification channels so that we can control when notifications get + // dispatched. + notifyOnline := make(chan chan<- lnpeer.Peer, 2) + notifyOffline := make(chan chan struct{}, 1) + + reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + notifyOnline <- peerChan + } + reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // We'll start by creating our first message which we should reliably + // send to our peer. + msg1 := randChannelUpdate() + var peerPubKey [33]byte + copy(peerPubKey[:], pubKey.SerializeCompressed()) + if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Since there isn't a peerHandler for this peer currently active due to + // this being the first message being sent reliably, we should expect to + // see a notification request for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // We'll then attempt to send another additional message reliably. + msg2 := randAnnounceSignatures() + if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // This should not however request another peer online notification as + // the peerHandler has already been started and is waiting for the + // notification to be dispatched. + select { + case <-notifyOnline: + t.Fatal("reliable sender should not request online notification") + case <-time.After(time.Second): + } + + // We'll go ahead and notify the peer. + peerChan <- peer + + // By doing so, we should expect to see a notification request for when + // the peer is offline. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request offline notification") + } + + // We should also see the messages arrive at the peer since they are now + // seen as online. + assertMsgsSent(t, peer.sentMsgs, msg1, msg2) + + // Then, we'll send one more message reliably. + msg3 := randChannelUpdate() + if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Again, this should not request another peer online notification + // request since we are currently waiting for the peer to be offline. + select { + case <-notifyOnline: + t.Fatal("reliable sender should not request online notification") + case <-time.After(time.Second): + } + + // The expected message should be sent to the peer. + assertMsgsSent(t, peer.sentMsgs, msg3) + + // We'll then notify that the peer is offline. + close(offlineChan) + + // This should cause an online notification to be requested. + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // Once we dispatch it, we should expect to see the messages be resent + // to the peer as they are not stale. 
+ peerChan <- peer + + select { + case <-notifyOffline: + case <-time.After(5 * time.Second): + t.Fatal("reliable sender did not request offline notification") + } + + assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3) +} + +// TestReliableSenderStaleMessages ensures that the reliable sender is no longer +// active for a peer which has successfully sent all of its messages and deemed +// them as stale. +func TestReliableSenderStaleMessages(t *testing.T) { + t.Parallel() + + reliableSender := newTestReliableSender(t) + + // Create a mock peer to send the messages to. + pubKey := randPubKey(t) + msgsSent := make(chan lnwire.Message) + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + + // Override NotifyWhenOnline to provide the notification channel so that + // we can control when notifications get dispatched. + notifyOnline := make(chan chan<- lnpeer.Peer, 1) + reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + notifyOnline <- peerChan + } + + // We'll also override IsMsgStale to mark all messages as stale as we're + // interested in testing the stale message behavior. + reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { + return true + } + + // We'll start by creating our first message which we should reliably + // send to our peer, but will be seen as stale. + msg1 := randAnnounceSignatures() + var peerPubKey [33]byte + copy(peerPubKey[:], pubKey.SerializeCompressed()) + if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // Since there isn't a peerHandler for this peer currently active due to + // this being the first message being sent reliably, we should expect to + // see a notification request for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // We'll go ahead and notify the peer. + peerChan <- peer + + // This should cause the message to be sent to the peer since they are + // now seen as online. The message will be sent at least once to ensure + // they can propagate before deciding whether they are stale or not. + assertMsgsSent(t, peer.sentMsgs, msg1) + + // We'll create another message which we'll send reliably. This one + // won't be seen as stale. + msg2 := randChannelUpdate() + + // We'll then wait for the message to be removed from the backing + // message store since it is seen as stale and has been sent at least + // once. Once the message is removed, the peerHandler should be torn + // down as there are no longer any pending messages within the store. + var predErr error + waitPredicate(t, time.Second, func() bool { + msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer( + peerPubKey, + ) + if err != nil { + predErr = fmt.Errorf("unable to retrieve messages for "+ + "peer: %v", err) + return false + } + if len(msgs) != 0 { + predErr = fmt.Errorf("expected to not find any "+ + "messages for peer, found %d", len(msgs)) + return false + } + + predErr = nil + return true + }) + if predErr != nil { + t.Fatal(predErr) + } + + // Override IsMsgStale to no longer mark messages as stale. + reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool { + return false + } + + // We'll request the message to be sent reliably. 
+ if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil { + t.Fatalf("unable to reliably send message: %v", err) + } + + // We should see an online notification request indicating that a new + // peerHandler has been spawned since it was previously torn down. + select { + case peerChan = <-notifyOnline: + case <-time.After(time.Second): + t.Fatal("reliable sender did not request online notification") + } + + // Finally, notifying the peer is online should prompt the message to be + // sent. Only the ChannelUpdate will be sent in this case since the + // AnnounceSignatures message above was seen as stale. + peerChan <- peer + + assertMsgsSent(t, peer.sentMsgs, msg2) +} From 4996d49118fa5f6a4e99313fb1d32180be54807c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:56 -0800 Subject: [PATCH 7/8] server+discovery: use reliableSender to replace existing resend logic --- discovery/gossiper.go | 227 +++++++------------ discovery/gossiper_test.go | 432 ++++++++----------------------------- server.go | 2 +- 3 files changed, 171 insertions(+), 490 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index d11f5299098..90c2cb0e7a4 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -95,10 +95,6 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // SendToPeer is a function which allows the service to send a set of - // messages to a particular peer identified by the target public key. - SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error - // FindPeer returns the actively registered peer for a given remote // public key. An error is returned if the peer was not found or a // shutdown has been requested. @@ -109,8 +105,15 @@ type Config struct { // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. + // + // TODO(wilmer): use [33]byte to avoid unnecessary serializations. NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + // NotifyWhenOffline is a function that allows the gossiper to be + // notified when a certain peer disconnects, allowing it to request a + // notification for when it reconnects. + NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 @@ -222,13 +225,17 @@ type AuthenticatedGossiper struct { syncerMtx sync.RWMutex peerSyncers map[routing.Vertex]*gossipSyncer + // reliableSender is a subsystem responsible for handling reliable + // message send requests to peers. + reliableSender *reliableSender + sync.Mutex } // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. 
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { - return &AuthenticatedGossiper{ + gossiper := &AuthenticatedGossiper{ selfKey: selfKey, cfg: &cfg, networkMsgs: make(chan *networkMsg), @@ -240,6 +247,15 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { recentRejects: make(map[uint64]struct{}), peerSyncers: make(map[routing.Vertex]*gossipSyncer), } + + gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ + NotifyWhenOnline: cfg.NotifyWhenOnline, + NotifyWhenOffline: cfg.NotifyWhenOffline, + MessageStore: cfg.MessageStore, + IsMsgStale: gossiper.isMsgStale, + }) + + return gossiper } // SynchronizeNode sends a message to the service indicating it should @@ -398,11 +414,10 @@ func (d *AuthenticatedGossiper) Start() error { } d.bestHeight = height - // In case we had an AnnounceSignatures ready to be sent when the - // gossiper was last shut down, we must continue on our quest to - // deliver this message to our peer such that they can craft the - // full channel proof. - if err := d.resendAnnounceSignatures(); err != nil { + // Start the reliable sender. In case we had any pending messages ready + // to be sent when the gossiper was last shut down, we must continue on + // our quest to deliver them to their respective peers. + if err := d.reliableSender.Start(); err != nil { return err } @@ -430,6 +445,10 @@ func (d *AuthenticatedGossiper) Stop() { close(d.quit) d.wg.Wait() + + // We'll stop our reliable sender after all of the gossiper's goroutines + // have exited to ensure nothing can cause it to continue executing. + d.reliableSender.Stop() } // TODO(roasbeef): need method to get current gossip timestamp? @@ -795,81 +814,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// resendAnnounceSignatures will inspect the messageStore database bucket for -// AnnounceSignatures messages that we recently tried to send to a peer. If the -// associated channels still not have the full channel proofs assembled, we -// will try to resend them. If we have the full proof, we can safely delete the -// message from the messageStore. -func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { - peerMsgsToResend, err := d.cfg.MessageStore.Messages() - if err != nil { - return err - } - - // We now iterate over these messages, resending those that we don't - // have the full proof for, deleting the rest. - for peer, msgsToResend := range peerMsgsToResend { - pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256()) - if err != nil { - return err - } - - for _, msg := range msgsToResend { - msg := msg.(*lnwire.AnnounceSignatures) - - // Check if the full channel proof exists in our graph. - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - msg.ShortChannelID) - if err != nil { - // If the channel cannot be found, it is most likely a - // leftover message for a channel that was closed. In - // this case we delete it from the message store. - log.Warnf("unable to fetch channel info for "+ - "chanID=%v from graph: %v. Will delete local"+ - "proof from database", - msg.ChannelID, err) - err = d.cfg.MessageStore.DeleteMessage(msg, peer) - if err != nil { - return err - } - continue - } - - // 1. If the full proof does not exist in the graph, it means - // that we haven't received the remote proof yet (or that we - // crashed before able to assemble the full proof). 
Since the - // remote node might think they have delivered their proof to - // us, we will resend _our_ proof to trigger a resend on their - // part: they will then be able to assemble and send us the - // full proof. - if chanInfo.AuthProof == nil { - err := d.sendAnnSigReliably(msg, pubKey) - if err != nil { - return err - } - continue - } - - // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled the - // full proof. In this case we can safely delete the local - // proof from the database. In case the remote hasn't been able - // to assemble the full proof yet (maybe because of a crash), - // we will send them the full proof if we notice that they - // retry sending their half proof. - if chanInfo.AuthProof != nil { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", msg.ChannelID) - err := d.cfg.MessageStore.DeleteMessage(msg, peer) - if err != nil { - return err - } - } - } - } - return nil -} - // findGossipSyncer is a utility method used by the gossiper to locate the // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created @@ -2113,21 +2057,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so they can also reconstruct the full channel // announcement. if !nMsg.isRemote { - var remotePeer *btcec.PublicKey + var remotePubKey [33]byte if isFirstNode { - remotePeer, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes } else { - remotePeer, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. - err := d.sendAnnSigReliably(msg, remotePeer) + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - err := fmt.Errorf("unable to send reliably "+ - "to remote for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) nMsg.err <- err return nil } @@ -2359,70 +2303,49 @@ func (d *AuthenticatedGossiper) fetchNodeAnn( return node.NodeAnnouncement(true) } -// sendAnnSigReliably will try to send the provided local AnnounceSignatures -// to the remote peer, waiting for it to come online if necessary. This -// method returns after adding the message to persistent storage, such -// that the caller knows that the message will be delivered at one point. -func (d *AuthenticatedGossiper) sendAnnSigReliably( - msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { - - // We first add this message to the database, such that in case - // we do not succeed in sending it to the peer, we'll fetch it - // from the DB next time we start, and retry. - var remotePubKey [33]byte - copy(remotePubKey[:], remotePeer.SerializeCompressed()) - if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil { - return err - } - - // We have succeeded adding the message to the database. We now launch - // a goroutine that will keep on trying sending the message to the - // remote peer until it succeeds, or the gossiper shuts down. In case - // of success, the message will be removed from the database. 
- d.wg.Add(1) - go func() { - defer d.wg.Done() - for { - log.Debugf("Sending AnnounceSignatures for channel "+ - "%v to remote peer %x", msg.ChannelID, - remotePeer.SerializeCompressed()) - err := d.cfg.SendToPeer(remotePeer, msg) - if err == nil { - // Sending succeeded, we can - // continue the flow. - break - } - - log.Errorf("unable to send AnnounceSignatures message "+ - "to peer(%x): %v. Will retry when online.", - remotePeer.SerializeCompressed(), err) +// isMsgStale determines whether a message retrieved from the backing +// MessageStore is seen as stale by the current graph. +func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID, + ) - peerChan := make(chan lnpeer.Peer, 1) - d.cfg.NotifyWhenOnline(remotePeer, peerChan) + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true + } + if err != nil { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) + return false + } - select { - case <-peerChan: - // Retry sending. - log.Infof("Peer %x reconnected. Retry sending"+ - " AnnounceSignatures.", - remotePeer.SerializeCompressed()) + // If the proof exists in the graph, then we have successfully + // received the remote proof and assembled the full proof, so we + // can safely delete the local proof from the database. + return chanInfo.AuthProof != nil - case <-d.quit: - log.Infof("Gossiper shutting down, did not " + - "send AnnounceSignatures.") - return - } + case *lnwire.ChannelUpdate: + // The MessageStore will always store the latest ChannelUpdate + // as it is not aware of its timestamp (by design), so it will + // never be stale. We should still however check if the channel + // is part of our graph. If it's not, we can mark it as stale. + _, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + if err != nil && err != channeldb.ErrEdgeNotFound { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) } + return err == channeldb.ErrEdgeNotFound - log.Infof("Sent channel announcement proof to remote peer: %x", - remotePeer.SerializeCompressed()) - }() - - // This method returns after the message has been added to the database, - // such that the caller don't have to wait until the message is actually - // delivered, but can be assured that it will be delivered eventually - // when this method returns. - return nil + default: + // We'll make sure to not mark any unsupported messages as stale + // to ensure they are not removed. 
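An aside from the editor, not part of the patch: the staleness rules above boil down to two facts looked up in the graph, whether the channel's edge still exists and whether its full proof has been assembled. A sketch that restates that decision table (the lnwire message-type constants are real; the function name is illustrative):

    package sketch

    import "github.com/lightningnetwork/lnd/lnwire"

    // staleFor mirrors isMsgStale's outcomes: an AnnounceSignatures message
    // is stale once the channel is gone from the graph or its full proof
    // has been assembled, a ChannelUpdate is stale only once the channel is
    // gone, and unsupported messages are never treated as stale.
    func staleFor(msgType lnwire.MessageType, edgeExists, haveAuthProof bool) bool {
            switch msgType {
            case lnwire.MsgAnnounceSignatures:
                    return !edgeExists || haveAuthProof
            case lnwire.MsgChannelUpdate:
                    return !edgeExists
            default:
                    return false
            }
    }

Graph lookups that fail for any reason other than ErrEdgeNotFound fall back to "not stale" in the patch, so the message is retried rather than dropped.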
+ return false + } } // updateChannel creates a new fully signed update for the channel, and updates diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 1decc1e3af6..043007bbf8a 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -644,12 +644,17 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error { - return nil - }, FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, nil, nil}, nil }, + NotifyWhenOnline: func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + peerChan <- &mockPeer{target, nil, nil} + }, + NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}) + return c + }, Router: router, TrickleDelay: trickleDelay, RetransmitDelay: retransmitDelay, @@ -880,21 +885,19 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -1084,19 +1087,19 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly from the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -1251,9 +1254,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // The local proof should be sent to the remote peer. select { case msg := <-sentMsgs: - if msg != batch.localProofAnn { - t.Fatalf("expected local proof to be sent, got %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatalf("local proof was not sent to peer") } @@ -1283,9 +1284,11 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } } -// Test that sending AnnounceSignatures to remote peer will continue -// to be tried until the peer comes online. 
-func TestSignatureAnnouncementRetry(t *testing.T) { +// TestSignatureAnnouncementRetryAtStartup tests that if we restart the +// gossiper, it will retry sending the AnnounceSignatures to the peer if it did +// not succeed before shutting down, and the full channel proof is not yet +// assembled. +func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Parallel() ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) @@ -1307,233 +1310,10 @@ func TestSignatureAnnouncementRetry(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} - // Recreate lightning network topology. Initialize router with channel - // between two nodes. - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localChanAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.chanUpdAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.nodeAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.chanUpdAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.nodeAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - // Make the SendToPeer fail, simulating the peer being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } - - // We expect the gossiper to register for a notification when the peer - // comes back online, so keep track of the channel it wants to get - // notified on. - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - - // Pretending that we receive local channel announcement from funding - // manager, thereby kick off the announcement exchange process. 
- select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localProofAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process local proof: %v", err) - } - - // Since sending this local announcement proof to the remote will fail, - // the gossiper should register for a notification when the remote is - // online again. - var conChan chan<- lnpeer.Peer - select { - case conChan = <-notifyPeers: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not ask to get notified when " + - "peer is online") - } - - // Since both proofs are not yet exchanged, no message should be - // broadcasted yet. - select { - case <-ctx.broadcastedMessage: - t.Fatal("announcements were broadcast") - case <-time.After(2 * trickleDelay): - } - - number := 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 1 { - t.Fatal("wrong number of objects in storage") - } - - // When the peer comes online, the gossiper gets notified, and should - // retry sending the AnnounceSignatures. We make the SendToPeer - // method work again. + // Set up a channel to intercept the messages sent to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - sentToPeer <- msg[0] - return nil - } - - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) - - select { - case <-sentToPeer: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") - } - - // Now give the gossiper the remote proof. This should trigger a - // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.remoteProofAnn, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process remote proof: %v", err) - } - - for i := 0; i < 5; i++ { - select { - case <-ctx.broadcastedMessage: - case <-time.After(time.Second): - t.Fatal("announcement wasn't broadcast") - } - } - - number = 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil && err != channeldb.ErrWaitingProofNotFound { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 0 { - t.Fatal("waiting proof should be removed from storage") - } -} - -// Test that if we restart the gossiper, it will retry sending the -// AnnounceSignatures to the peer if it did not succeed before -// shutting down, and the full channel proof is not yet assembled. 
-func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { - t.Parallel() - - ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) - if err != nil { - t.Fatalf("can't create context: %v", err) - } - defer cleanup() - - batch, err := createAnnouncements(0) - if err != nil { - t.Fatalf("can't generate announcements: %v", err) - } - - localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1617,13 +1397,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Make the SendToPeerFail, simulating the peer being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } + // Since the reliable send to the remote peer of the local channel proof + // requires a notification when the peer comes online, we'll capture the + // channel through which it gets sent to control exactly when to + // dispatch it. notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1640,11 +1419,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatalf("unable to process :%v", err) } - // Since sending to the remote peer will fail, the gossiper should - // register for a notification when it comes back online. - var conChan chan<- lnpeer.Peer + // The gossiper should register for a notification for when the peer is + // online. select { - case conChan = <-notifyPeers: + case <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") @@ -1674,16 +1452,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // to send the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ - Notifier: ctx.gossiper.cfg.Notifier, - Broadcast: ctx.gossiper.cfg.Broadcast, - SendToPeer: func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - }, - NotifyWhenOnline: func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - }, + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, + NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, Router: ctx.gossiper.cfg.Router, TrickleDelay: trickleDelay, RetransmitDelay: retransmitDelay, @@ -1700,36 +1472,26 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { defer gossiper.Stop() ctx.gossiper = gossiper + remotePeer.quit = ctx.gossiper.quit - // After starting up, the gossiper will see that it has a waitingproof - // in the database, and will retry sending its part to the remote. Since - // SendToPeer will fail again, it should register for a notification - // when the peer comes online. 
+ // After starting up, the gossiper will see that it has a proof in the + // WaitingProofStore, and will retry sending its part to the remote. + // It should register for a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer select { - case conChan = <-notifyPeers: + case peerChan = <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") } - // Fix the SendToPeer method. - sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - select { - case sentToPeer <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") - } - - return nil - } - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) + // Notify that peer is now online. This should allow the proof to be + // sent. + peerChan <- remotePeer select { - case <-sentToPeer: + case msg := <-sentToPeer: + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatalf("gossiper did not send message when peer came online") } @@ -1770,10 +1532,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } } -// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a -// remote proof is received when we already have the full proof, -// the gossiper will send the full proof (ChannelAnnouncement) to -// the remote peer. +// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a remote +// proof is received when we already have the full proof, the gossiper will send +// the full proof (ChannelAnnouncement) to the remote peer. func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Parallel() @@ -1796,7 +1557,19 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1880,27 +1653,6 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Set up a channel we can use to inspect messages sent by the - // gossiper to the remote peer. - sentToPeer := make(chan lnwire.Message, 1) - remotePeer.sentMsgs = sentToPeer - remotePeer.quit = ctx.gossiper.quit - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - select { - case <-ctx.gossiper.quit: - return fmt.Errorf("gossiper shutting down") - case sentToPeer <- msg[0]: - } - return nil - } - - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. 
select { @@ -1928,9 +1680,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // We expect the gossiper to send this message to the remote peer. select { case msg := <-sentToPeer: - if msg != batch.localProofAnn { - t.Fatalf("wrong message sent to peer: %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatal("did not send local proof to peer") } @@ -2356,10 +2106,9 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } -// TestReceiveRemoteChannelUpdateFirst tests that if we receive a -// ChannelUpdate from the remote before we have processed our -// own ChannelAnnouncement, it will be reprocessed later, after -// our ChannelAnnouncement. +// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate +// from the remote before we have processed our own ChannelAnnouncement, it will +// be reprocessed later, after our ChannelAnnouncement. func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Parallel() @@ -2369,21 +2118,6 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. - sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { - select { - case sentMsgs <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") - } - return nil - } - batch, err := createAnnouncements(0) if err != nil { t.Fatalf("can't generate announcements: %v", err) @@ -2397,7 +2131,22 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. + sentMsgs := make(chan lnwire.Message, 10) + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + + // Override NotifyWhenOnline and FindPeer to return the remote peer + // which we expect meesages to be sent to. 
+ ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return remotePeer, nil + } + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and @@ -2896,3 +2645,12 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { t.Fatalf("unable to process announcement: %v", err) } } + +func assertMessage(t *testing.T, expected, got lnwire.Message) { + t.Helper() + + if !reflect.DeepEqual(expected, got) { + t.Fatalf("expected: %v\ngot: %v", spew.Sdump(expected), + spew.Sdump(got)) + } +} diff --git a/server.go b/server.go index 89b8d39caee..32cf6fe13e8 100644 --- a/server.go +++ b/server.go @@ -598,11 +598,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ChainHash: *activeNetParams.GenesisHash, Broadcast: s.BroadcastMessage, ChanSeries: chanSeries, - SendToPeer: s.SendToPeer, FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { return s.FindPeer(pub) }, NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30, From 12168f022e61d73d82c3ca02ffb6fa5952622cf5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:19:04 -0800 Subject: [PATCH 8/8] server+discovery: send channel updates to remote peers reliably In this commit, we also allow channel updates for our channels to be sent reliably to our channel counterparty. This is especially crucial for private channels, since they're not announced, in order to ensure each party can receive funds from the other side. --- discovery/gossiper.go | 37 ++--- discovery/gossiper_test.go | 291 +++++++++++++++++++++++++++++++++---- server.go | 13 +- 3 files changed, 280 insertions(+), 61 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 90c2cb0e7a4..195773edad1 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -95,11 +95,6 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // FindPeer returns the actively registered peer for a given remote - // public key. An error is returned if the peer was not found or a - // shutdown has been requested. - FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) - // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. @@ -1927,30 +1922,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePub *btcec.PublicKey + var remotePubKey [33]byte switch { case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - remotePub, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - remotePub, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } - sPeer, err := d.cfg.FindPeer(remotePub) + // Now, we'll attempt to send the channel update message + // reliably to the remote peer in the background, so + // that we don't block if the peer happens to be offline + // at the moment. 
+ err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - log.Errorf("unable to send channel update -- "+ - "could not find peer %x: %v", - remotePub.SerializeCompressed(), - err) - } else { - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - err = sPeer.SendMessage(false, msg) - if err != nil { - log.Errorf("unable to send channel "+ - "update message to peer %x: %v", - remotePub.SerializeCompressed(), - err) - } + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) + nMsg.err <- err + return nil } } diff --git a/discovery/gossiper_test.go index 043007bbf8a..560fdbb1ab7 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -644,9 +644,6 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, nil, nil}, nil - }, NotifyWhenOnline: func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { peerChan <- &mockPeer{target, nil, nil} @@ -888,9 +885,6 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -970,9 +964,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1090,9 +1082,6 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -1201,9 +1190,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -1315,6 +1302,27 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { sentToPeer := make(chan lnwire.Message, 1) remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + // Override NotifyWhenOnline to return the remote peer which we expect + // messages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } + + // Override NotifyWhenOffline to return the channel which will notify + // the gossiper that the peer is offline.
We'll use this to signal that + // the peer is offline so that the gossiper requests a notification when + // it comes back online. + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}) + notifyOffline <- c + return c + } + // Recreate lightning network topology. Initialize router with channel // between two nodes. select { @@ -1348,6 +1356,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(1 * time.Second): + t.Fatal("gossiper did not send channel update to peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -1407,6 +1421,17 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { notifyPeers <- connectedChan } + // Before sending the local channel proof, we'll notify that the peer is + // offline, so that it's not sent to the peer. + var peerOffline chan struct{} + select { + case peerOffline = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not request notification for when " + + "peer disconnects") + } + close(peerOffline) + // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1428,12 +1453,21 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { "peer is online") } + // The proof should not be broadcast yet since we're still missing the + // remote party's. select { case <-ctx.broadcastedMessage: t.Fatal("announcements were broadcast") case <-time.After(2 * trickleDelay): } + // And it shouldn't be sent to the peer either as they are offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + number := 0 if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { @@ -1448,8 +1482,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatal("wrong number of objects in storage") } - // Shut down gossiper, and restart. This should trigger a new attempt - // to send the message to the peer. + // Restart the gossiper and restore its original NotifyWhenOnline and + // NotifyWhenOffline methods. This should trigger a new attempt to send + // the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ Notifier: ctx.gossiper.cfg.Notifier, @@ -1489,11 +1524,22 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // sent. peerChan <- remotePeer - select { - case msg := <-sentToPeer: - assertMessage(t, batch.localProofAnn, msg) - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") +out: + for { + select { + case msg := <-sentToPeer: + // Since the ChannelUpdate will also be resent as it is + // sent reliably, we'll need to filter it out. 
+ if _, ok := msg.(*lnwire.AnnounceSignatures); !ok { + continue + } + + assertMessage(t, batch.localProofAnn, msg) + break out + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer " + + "came online") + } + } // Now exchanging the remote channel proof, the channel announcement @@ -1604,6 +1650,12 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Fatal("channel update announcement was broadcast") case <-time.After(2 * trickleDelay): } + select { + case msg := <-sentToPeer: + assertMessage(t, batch.chanUpdAnn1, msg) + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not send channel update to remote peer") + } select { case err = <-ctx.gossiper.ProcessLocalAnnouncement( @@ -2137,11 +2189,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { sentMsgs := make(chan lnwire.Message, 10) remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} - // Override NotifyWhenOnline and FindPeer to return the remote peer - // which we expect messages to be sent to. - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return remotePeer, nil - } + // Override NotifyWhenOnline to return the remote peer which we expect + // messages to be sent to. ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { @@ -2225,9 +2274,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // is announced or not (private channel). select { case msg := <-sentMsgs: - if msg != batch.chanUpdAnn1 { - t.Fatalf("expected local channel update, instead got %v", msg) - } + assertMessage(t, batch.chanUpdAnn1, msg) case <-time.After(1 * time.Second): t.Fatal("gossiper did not send channel update to peer") } @@ -2646,6 +2693,190 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { } } +// TestSendChannelUpdateReliably ensures that the latest channel update for a +// channel is always sent upon the remote party reconnecting. +func TestSendChannelUpdateReliably(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context and a batch of + // announcements. + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + // We'll also create two keys, one for ourselves and another for the + // remote party. + localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", err) + } + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Since we first wait to be notified of the peer coming online before + // attempting to send the message, we'll overwrite NotifyWhenOnline and + // NotifyWhenOffline to instead give us access to the channel that will + // receive the notification.
+ notifyOnline := make(chan chan<- lnpeer.Peer, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + notifyOnline <- peerChan + } + notifyOffline := make(chan chan struct{}, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func( + _ [33]byte) <-chan struct{} { + + c := make(chan struct{}, 1) + notifyOffline <- c + return c + } + + // assertReceivedChannelUpdate is a helper closure we'll use to + // determine if the correct channel update was received. + assertReceivedChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) { + t.Helper() + + select { + case msg := <-sentToPeer: + assertMessage(t, channelUpdate, msg) + case <-time.After(2 * time.Second): + t.Fatal("did not send local channel update to peer") + } + } + + // Process the channel announcement for which we'll send a channel + // update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.localChanAnn, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel announcement") + } + if err != nil { + t.Fatalf("unable to process local channel announcement: %v", err) + } + + // It should not be broadcast due to not having an announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Now, we'll process the channel update. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + batch.chanUpdAnn1, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // It should however send it to the peer directly. In order to do so, + // it'll request a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // We can go ahead and notify the peer, which should trigger the message + // to be sent. + peerChan <- remotePeer + assertReceivedChannelUpdate(batch.chanUpdAnn1) + + // The gossiper should now request a notification for when the peer + // disconnects. We'll also trigger this now. + var offlineChan chan struct{} + select { + case offlineChan = <-notifyOffline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "disconnection") + } + + close(offlineChan) + + // Since it's offline, the gossiper should request another notification + // for when it comes back online. + select { + case peerChan = <-notifyOnline: + case <-time.After(2 * time.Second): + t.Fatal("gossiper did not request notification upon peer " + + "connection") + } + + // Now that the remote peer is offline, we'll send a new channel update. + prevTimestamp := batch.chanUpdAnn1.Timestamp + newChanUpdate, err := createUpdateAnnouncement( + 0, 0, nodeKeyPriv1, prevTimestamp+1, + ) + if err != nil { + t.Fatalf("unable to create new channel update: %v", err) + } + + // With the new update created, we'll go ahead and process it.
+ select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement( + newChanUpdate, localKey, + ): + case <-time.After(2 * time.Second): + t.Fatal("did not process local channel update") + } + if err != nil { + t.Fatalf("unable to process local channel update: %v", err) + } + + // It should also not be broadcast due to the announcement not having an + // announcement proof. + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // The message should not be sent since the peer remains offline. + select { + case msg := <-sentToPeer: + t.Fatalf("received unexpected message: %v", spew.Sdump(msg)) + case <-time.After(time.Second): + } + + // Finally, we'll notify that the peer is online and ensure the new channel + // update is received. + peerChan <- remotePeer + assertReceivedChannelUpdate(newChanUpdate) +} + func assertMessage(t *testing.T, expected, got lnwire.Message) { t.Helper() diff --git a/server.go b/server.go index 32cf6fe13e8..4f49f27404e 100644 --- a/server.go +++ b/server.go @@ -593,14 +593,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { - return s.FindPeer(pub) - }, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0,
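Editor's note: as a rough, self-contained illustration of the resend-on-reconnect flow the tests above exercise (and not the actual reliableSender implementation from this series), the sketch below shows the wait-online, send, wait-offline loop driven by NotifyWhenOnline/NotifyWhenOffline-style hooks. All names in it (sketchPeer, printPeer, reliableSenderSketch) are illustrative stand-ins, and the on-disk persistence of pending messages is omitted; running it prints each update once per (re)connection, mirroring the behaviour TestSendChannelUpdateReliably asserts.

package main

import (
	"fmt"
	"time"
)

// sketchPeer is a stand-in for a connected peer we can send a message to.
type sketchPeer interface {
	SendMessage(msg string) error
}

// printPeer is a trivial sketchPeer that just prints what it receives.
type printPeer struct{ name string }

func (p *printPeer) SendMessage(msg string) error {
	fmt.Printf("%s received: %q\n", p.name, msg)
	return nil
}

// reliableSenderSketch resends the latest message every time the peer
// reconnects: wait until the peer comes online, deliver the message, then
// wait until it goes offline again before re-registering for the next
// online notification.
type reliableSenderSketch struct {
	notifyWhenOnline  func(peerChan chan<- sketchPeer)
	notifyWhenOffline func() <-chan struct{}
	latestMsg         string
	quit              chan struct{}
}

func (s *reliableSenderSketch) run() {
	for {
		// Ask to be told when the peer comes online and wait for it.
		onlineChan := make(chan sketchPeer, 1)
		s.notifyWhenOnline(onlineChan)

		var p sketchPeer
		select {
		case p = <-onlineChan:
		case <-s.quit:
			return
		}

		// Deliver the latest message. A failure is only logged here,
		// since the next reconnection triggers another attempt.
		if err := p.SendMessage(s.latestMsg); err != nil {
			fmt.Println("send failed:", err)
		}

		// Wait for the peer to disconnect before looping around.
		select {
		case <-s.notifyWhenOffline():
		case <-s.quit:
			return
		}
	}
}

func main() {
	online := make(chan chan<- sketchPeer, 1)
	offline := make(chan chan struct{}, 1)

	s := &reliableSenderSketch{
		notifyWhenOnline: func(c chan<- sketchPeer) { online <- c },
		notifyWhenOffline: func() <-chan struct{} {
			c := make(chan struct{})
			offline <- c
			return c
		},
		latestMsg: "channel update #1",
		quit:      make(chan struct{}),
	}
	go s.run()

	peer := &printPeer{name: "remote"}

	// Peer comes online: the current update is delivered.
	(<-online) <- peer

	// Peer goes offline; a newer update replaces the pending one.
	close(<-offline)
	s.latestMsg = "channel update #2"

	// Peer reconnects: only the latest update is delivered.
	(<-online) <- peer

	time.Sleep(100 * time.Millisecond)
	close(s.quit)
}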