Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery/gossiper: reliably send channel update msg to remote peer #1595

Merged
7 changes: 7 additions & 0 deletions channeldb/db.go
Expand Up @@ -90,6 +90,13 @@ var (
number: 7,
migration: migrateOptionalChannelCloseSummaryFields,
},
{
// The DB version that changes the gossiper's message
// store keys to account for the message's type and
// ShortChannelID.
number: 8,
migration: migrateGossipMessageStoreKeys,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down
2 changes: 1 addition & 1 deletion channeldb/meta_test.go
Expand Up @@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
if err == nil && shouldFail {
t.Fatal("error wasn't received on migration stage")
} else if err != nil && !shouldFail {
t.Fatal("error was received on migration stage")
t.Fatalf("error was received on migration stage: %v", err)
}

// afterMigration usually used for checking the database state and
Expand Down
73 changes: 73 additions & 0 deletions channeldb/migrations.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
)

// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
Expand Down Expand Up @@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {

return nil
}

var messageStoreBucket = []byte("message-store")

// migrateGossipMessageStoreKeys migrates the key format for gossip messages
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
// found in the message store to a new one that takes into consideration the of
// the message being stored.
func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
// We'll start by retrieving the bucket in which these messages are
// stored within. If there isn't one, there's nothing left for us to do
// so we can avoid the migration.
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return nil
}

log.Info("Migrating to the gossip message store new key format")

// Otherwise we'll proceed with the migration. We'll start by coalescing
// all the current messages within the store, which are indexed by the
// public key of the peer which they should be sent to, followed by the
// short channel ID of the channel for which the message belongs to. We
// should only expect to find channel announcement signatures as that
// was the only support message type previously.
msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
err := messageStore.ForEach(func(k, v []byte) error {
var msgKey [33 + 8]byte
copy(msgKey[:], k)

msg := &lnwire.AnnounceSignatures{}
if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
return err
}

msgs[msgKey] = msg

return nil

})
if err != nil {
return err
}

// Then, we'll go over all of our messages, remove their previous entry,
// and add another with the new key format. Once we've done this for
// every message, we can consider the migration complete.
for oldMsgKey, msg := range msgs {
if err := messageStore.Delete(oldMsgKey[:]); err != nil {
return err
}

// Construct the new key for which we'll find this message with
// in the store. It'll be the same as the old, but we'll also
// include the message type.
var msgType [2]byte
binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
newMsgKey := append(oldMsgKey[:], msgType[:]...)

// Serialize the message with its wire encoding.
var b bytes.Buffer
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
return err
}

if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
return err
}
}

log.Info("Migration to the gossip message store new key format complete!")

return nil
}
96 changes: 96 additions & 0 deletions channeldb/migrations_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnwire"
)

// TestPaymentStatusesMigration checks that already completed payments will have
Expand Down Expand Up @@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
false)
}
}

// TestMigrateGossipMessageStoreKeys ensures that the migration to the new
// gossip message store key format is successful/unsuccessful under various
// scenarios.
func TestMigrateGossipMessageStoreKeys(t *testing.T) {
t.Parallel()

// Construct the message which we'll use to test the migration, along
// with its old and new key formats.
shortChanID := lnwire.ShortChannelID{BlockHeight: 10}
msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID}

var oldMsgKey [33 + 8]byte
copy(oldMsgKey[:33], pubKey.SerializeCompressed())
binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64())

var newMsgKey [33 + 8 + 2]byte
copy(newMsgKey[:41], oldMsgKey[:])
binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType()))

// Before the migration, we'll create the bucket where the messages
// should live and insert them.
beforeMigration := func(db *DB) {
var b bytes.Buffer
if err := msg.Encode(&b, 0); err != nil {
t.Fatalf("unable to serialize message: %v", err)
}

err := db.Update(func(tx *bbolt.Tx) error {
messageStore, err := tx.CreateBucketIfNotExists(
messageStoreBucket,
)
if err != nil {
return err
}

return messageStore.Put(oldMsgKey[:], b.Bytes())
})
if err != nil {
t.Fatal(err)
}
}

// After the migration, we'll make sure that:
// 1. We cannot find the message under its old key.
// 2. We can find the message under its new key.
// 3. The message matches the original.
afterMigration := func(db *DB) {
meta, err := db.FetchMeta(nil)
if err != nil {
t.Fatalf("unable to fetch db version: %v", err)
}
if meta.DbVersionNumber != 1 {
t.Fatalf("migration should have succeeded but didn't")
}

var rawMsg []byte
err = db.View(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return errors.New("message store bucket not " +
"found")
}
rawMsg = messageStore.Get(oldMsgKey[:])
if rawMsg != nil {
t.Fatal("expected to not find message under " +
"old key, but did")
}
rawMsg = messageStore.Get(newMsgKey[:])
if rawMsg == nil {
return fmt.Errorf("expected to find message " +
"under new key, but didn't")
}

return nil
})
if err != nil {
t.Fatal(err)
}

gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
if err != nil {
t.Fatalf("unable to deserialize raw message: %v", err)
}
if !reflect.DeepEqual(msg, gotMsg) {
t.Fatalf("expected message: %v\ngot message: %v",
spew.Sdump(msg), spew.Sdump(gotMsg))
}
}

applyMigration(
t, beforeMigration, afterMigration,
migrateGossipMessageStoreKeys, false,
)
}