Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 102 additions & 88 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ type Config struct {
// the last trickle tick.
TrickleDelay time.Duration

// RetransmitDelay is the period of a timer which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie" channels and
// channel advertisements that have been dropped, or not properly
// propagated through the network.
RetransmitDelay time.Duration

// DB is a global boltdb instance which is needed to pass it in waiting
// proof storage to make waiting proofs persistent.
DB *channeldb.DB
Expand Down Expand Up @@ -336,11 +343,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// * can use mostly empty struct in db as place holder
var announcementBatch []lnwire.Message

// TODO(roasbeef): parametrize the above
retransmitTimer := time.NewTicker(time.Minute * 30)
retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay)
defer retransmitTimer.Stop()

// TODO(roasbeef): parametrize the above
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop()

Expand Down Expand Up @@ -449,36 +454,50 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcementBatch = nil

// The retransmission timer has ticked which indicates that we
// should broadcast our personal channels to the network. This
// addresses the case of channel advertisements whether being
// dropped, or not properly propagated through the network.
// should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie" channels and
// channel advertisements that have been dropped, or not properly
// propagated through the network.
case <-retransmitTimer.C:
var selfChans []lnwire.Message

// Iterate over our channels and construct the
// announcements array.
err := d.cfg.Router.ForAllOutgoingChannels(func(ei *channeldb.ChannelEdgeInfo,
p *channeldb.ChannelEdgePolicy) error {

c := &lnwire.ChannelUpdate{
Signature: p.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
ChainHash: ei.ChainHash,
Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags,
TimeLockDelta: p.TimeLockDelta,
HtlcMinimumMsat: p.MinHTLC,
BaseFee: uint32(p.FeeBaseMSat),
FeeRate: uint32(p.FeeProportionalMillionths),
// Iterate over all of our channels and check if any of them fall within
// the prune interval or re-broadcast interval.
err := d.cfg.Router.ForAllOutgoingChannels(func(info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

const pruneInterval = time.Hour * 24 * 14
const broadcastInterval = time.Hour * 24 * 13

timeElapsed := time.Since(edge.LastUpdate)

// Prune the edge if it is has not been updated for the past 2 weeks.
// Rebroadcast edge if its last update is close to the 2-week interval.
if timeElapsed >= pruneInterval {
err := d.cfg.Router.DeleteEdge(info)
if err != nil {
log.Errorf("unable to prune stale edge: %v", err)
return err
}
} else if timeElapsed >= broadcastInterval {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(info, edge)
if err != nil {
log.Errorf("unable to update channel: %v", err)
return err
}
selfChans = append(selfChans, chanUpdate)
}
selfChans = append(selfChans, c)

return nil
})
if err != nil {
log.Errorf("unable to retrieve outgoing channels: %v", err)
log.Errorf("error while retrieving outgoing channels: %v", err)
continue
}

// If we don't have any channels to re-broadcast, then continue.
if len(selfChans) == 0 {
continue
}
Expand All @@ -487,7 +506,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
len(selfChans))

// With all the wire announcements properly crafted,
// we'll broadcast our known outgoing channel to all
// we'll broadcast our known outgoing channels to all
// our immediate peers.
if err := d.cfg.Broadcast(nil, selfChans...); err != nil {
log.Errorf("unable to re-broadcast "+
Expand Down Expand Up @@ -532,9 +551,7 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest

haveChanFilter := len(chansToUpdate) != 0

var chanUpdates []*lnwire.ChannelUpdate
chanEdges := make(map[lnwire.ShortChannelID]*channeldb.ChannelEdgePolicy)

var signedAnns []lnwire.Message
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
Expand All @@ -547,76 +564,25 @@ func (d *AuthenticatedGossiper) processFeeChanUpdate(feeUpdate *feeUpdateRequest
return nil
}

// Otherwise, add the channel update to our batch to be
// updated, as we'll be re-signing it shortly.
c := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(edge.LastUpdate.Unix()),
Flags: edge.Flags,
TimeLockDelta: edge.TimeLockDelta,
HtlcMinimumMsat: edge.MinHTLC,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}
chanUpdates = append(chanUpdates, c)

// We'll also add it to our edge map so we can find it easily
// later to update the state within the database.
chanEdges[c.ShortChannelID] = edge
return nil
})
if err != nil {
return nil, err
}

// With the set of channel updates we need to sign obtained, we'll not
// generate new signatures for each of them using applying the new fee
// schema before signing.
signedAnns := make([]lnwire.Message, len(chanUpdates))
for i, chanUpdate := range chanUpdates {
edge := chanEdges[chanUpdate.ShortChannelID]
now := time.Now()

// First, we'll apply the new few schema update to the channel
// update and also the backing database struct.
chanUpdate.BaseFee = uint32(feeUpdate.newSchema.BaseFee)
chanUpdate.FeeRate = feeUpdate.newSchema.FeeRate
chanUpdate.Timestamp = uint32(now.Unix())
// Apply the new fee schema to the edge.
edge.FeeBaseMSat = feeUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
feeUpdate.newSchema.FeeRate,
)
edge.LastUpdate = now

// With the update applied, we'll generate a new signature over
// a digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey,
chanUpdate)
// Re-sign and update the backing ChannelGraphSource, and retrieve our
// ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(info, edge)
if err != nil {
return nil, err
return err
}

// Next, we'll set the new signature in place, and update the
// reference in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig
signedAnns[i] = chanUpdate
signedAnns = append(signedAnns, chanUpdate)

// To ensure that our signature is valid, we'll verify it
// ourself before committing it to the slice returned.
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate)
if err != nil {
return nil, fmt.Errorf("generated invalid channel update "+
"sig: %v", err)
}

// Finally, we'll update the fee schema for this edge on disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, err
}
return nil
})
if err != nil {
return nil, err
}

return signedAnns, nil
Expand Down Expand Up @@ -1202,3 +1168,51 @@ func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error
// single batch to the target peer.
return d.cfg.SendToPeer(targetNode, announceMessages...)
}

// updateChannel creates a new fully signed update for the channel,
// and updates the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {

edge.LastUpdate = time.Now()
chanUpdate := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(edge.LastUpdate.Unix()),
Flags: edge.Flags,
TimeLockDelta: edge.TimeLockDelta,
HtlcMinimumMsat: edge.MinHTLC,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}

// With the update applied, we'll generate a new signature over
// a digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate)
if err != nil {
return nil, err
}

// Next, we'll set the new signature in place, and update the
// reference in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig

// To ensure that our signature is valid, we'll verify it
// ourself before committing it to the slice returned.
err = d.validateChannelUpdateAnn(d.selfKey, chanUpdate)
if err != nil {
return nil, fmt.Errorf("generated invalid channel update "+
"sig: %v", err)
}

// Finally, we'll write the new edge policy to disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, err
}

return chanUpdate, err
}
7 changes: 7 additions & 0 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
nodeKeyPub2 = nodeKeyPriv2.PubKey()

trickleDelay = time.Millisecond * 100
retransmitDelay = time.Hour * 1
proofMatureDelta uint32
)

Expand Down Expand Up @@ -134,6 +135,11 @@ func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
return nil
}

func (r *mockGraphSource) DeleteEdge(info *channeldb.ChannelEdgeInfo) error {
delete(r.infos, info.ChannelID)
return nil
}

func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
r.edges[edge.ChannelID] = append(
r.edges[edge.ChannelID],
Expand Down Expand Up @@ -437,6 +443,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
},
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
DB: db,
}, nodeKeyPub1)
Expand Down
10 changes: 10 additions & 0 deletions routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type ChannelGraphSource interface {
// edge/channel might be used in construction of payment path.
AddEdge(edge *channeldb.ChannelEdgeInfo) error

// DeleteEdge is used to delete an edge from the router database.
DeleteEdge(edge *channeldb.ChannelEdgeInfo) error

// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error
Expand Down Expand Up @@ -1105,6 +1108,13 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
}
}

// DeleteEdge is used to delete an edge from the router database.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) DeleteEdge(edge *channeldb.ChannelEdgeInfo) error {
return r.cfg.Graph.DeleteChannelEdge(&edge.ChannelPoint)
}

// UpdateEdge is used to update edge information, without this message edge
// considered as not fully constructed.
//
Expand Down
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
SendToPeer: s.SendToPeer,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * 300,
RetransmitDelay: time.Minute * 30,
DB: chanDB,
AnnSigner: s.nodeSigner,
},
Expand Down