Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing: use correct fees by searching backwards #1321

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.node.ForEachChannel(d.tx, func(tx *bolt.Tx,
ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error {

// Skip channels for which no outgoing edge policy is available.
//
// TODO(joostjager): Ideally the case where channels have a nil
// policy should be supported, as auto pilot is not looking at
// the policies. For now, it is not easily possible to get a
// reference to the other end LightningNode object without
// retrieving the policy.
if ep == nil {
return nil
}

pubkey, _ := ep.Node.PubKey()
edge := ChannelEdge{
Channel: Channel{
Expand Down
8 changes: 8 additions & 0 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ var (
number: 3,
migration: migrateInvoiceTimeSeriesOutgoingPayments,
},
{
// The version of the database where every channel
// always has two entries in the edges bucket. If
// a policy is unknown, this will be represented
// by a special byte sequence.
number: 4,
migration: migrateEdgePolicies,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down
135 changes: 92 additions & 43 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,22 @@ var (
// edgeBucket is a bucket which houses all of the edge or channel
// information within the channel graph. This bucket essentially acts
// as an adjacency list, which in conjunction with a range scan, can be
// used to iterate over all the _outgoing_ edges for a particular node.
// Key in the bucket use a prefix scheme which leads with the node's
// public key and sends with the compact edge ID. For each edgeID,
// there will be two entries within the bucket, as the graph is
// directed: nodes may have different policies w.r.t to fees for their
// respective directions.
// used to iterate over all the incoming and outgoing edges for a
// particular node. Key in the bucket use a prefix scheme which leads
// with the node's public key and sends with the compact edge ID.
// For each chanID, there will be two entries within the bucket, as the
// graph is directed: nodes may have different policies w.r.t to fees
// for their respective directions.
//
// maps: pubKey || edgeID -> edge policy for node
// maps: pubKey || chanID -> channel edge policy for node
edgeBucket = []byte("graph-edge")

// unknownPolicy is represented as an empty slice. It is
// used as the value in edgeBucket for unknown channel edge policies.
// Unknown policies are still stored in the database to enable efficient
// lookup of incoming channel edges.
unknownPolicy = []byte{}

// chanStart is an array of all zero bytes which is used to perform
// range scans within the edgeBucket to obtain all of the outgoing
// edges for a particular node.
Expand Down Expand Up @@ -511,6 +517,18 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return err
}

// Mark edge policies for both sides as unknown. This is to
// enable efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {

err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}

// Finally we add it to the channel index which maps channel
// points (outpoints) to the shorter channel ID's.
var b bytes.Buffer
Expand Down Expand Up @@ -1759,12 +1777,14 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro
return updateTime, exists, nil
}

// ForEachChannel iterates through all the outgoing channel edges from this
// node, executing the passed callback with each edge as its sole argument. The
// first edge policy is the outgoing edge *to* the connecting node, while the
// second is the incoming edge *from* the connecting node. If the callback
// returns an error, then the iteration is halted with the error propagated
// back up to the caller.
// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
Expand Down Expand Up @@ -1805,44 +1825,32 @@ func (l *LightningNode) ForEachChannel(tx *bolt.Tx,
// as its prefix. This indicates that we've stepped over into
// another node's edges, so we can terminate our scan.
edgeCursor := edges.Cursor()
for nodeEdge, edgeInfo := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, edgeInfo = edgeCursor.Next() {
// If the prefix still matches, then the value is the
// raw edge information. So we can now serialize the
// edge info and fetch the outgoing node in order to
// retrieve the full channel edge.
edgeReader := bytes.NewReader(edgeInfo)
toEdgePolicy, err := deserializeChanEdgePolicy(edgeReader, nodes)
if err != nil {
return err
}
toEdgePolicy.db = l.db
toEdgePolicy.Node.db = l.db

for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() {
// If the prefix still matches, the channel id is
// returned in nodeEdge. Channel id is used to lookup
// the node at the other end of the channel and both
// edge policies.
chanID := nodeEdge[33:]
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
return err
}

// We'll also fetch the incoming edge so this
// information can be available to the caller.
incomingNode := toEdgePolicy.Node.PubKeyBytes[:]
fromEdgePolicy, err := fetchChanEdgePolicy(
edges, chanID, incomingNode, nodes,
outgoingPolicy, err := fetchChanEdgePolicy(
edges, chanID, nodePub, nodes,
)
if err != nil && err != ErrEdgeNotFound &&
err != ErrGraphNodeNotFound {

otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
if err != nil {
return err
}
if fromEdgePolicy != nil {
fromEdgePolicy.db = l.db
if fromEdgePolicy.Node != nil {
fromEdgePolicy.Node.db = l.db
}
}

incomingPolicy, err := fetchChanEdgePolicy(
edges, chanID, otherNode, nodes,
)

// Finally, we execute the callback.
err = cb(tx, &edgeInfo, toEdgePolicy, fromEdgePolicy)
err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
if err != nil {
return err
}
Expand Down Expand Up @@ -2016,6 +2024,21 @@ func (c *ChannelEdgeInfo) BitcoinKey2() (*btcec.PublicKey, error) {
return key, nil
}

// OtherNodeKeyBytes returns the node key bytes of the other end of
// the channel.
func (c *ChannelEdgeInfo) OtherNodeKeyBytes(thisNodeKey []byte) (
[]byte, error) {

switch {
case bytes.Equal(c.NodeKey1Bytes[:], thisNodeKey):
return c.NodeKey2Bytes[:], nil
case bytes.Equal(c.NodeKey2Bytes[:], thisNodeKey):
return c.NodeKey1Bytes[:], nil
default:
return nil, fmt.Errorf("Node not participating in this channel")
}
}

// ChannelAuthProof is the authentication proof (the signature portion) for a
// channel. Using the four signatures contained in the struct, and some
// auxiliary knowledge (the funding script, node identities, and outpoint) nodes
Expand Down Expand Up @@ -2871,7 +2894,11 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b

// If there was already an entry for this edge, then we'll need to
// delete the old one to ensure we don't leave around any after-images.
if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil {
// An unknown policy value does not have a update time recorded, so
// it also does not need to be removed.
if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
!bytes.Equal(edgeBytes[:], unknownPolicy) {

// In order to delete the old entry, we'll need to obtain the
// *prior* update time in order to delete it. To do this, we'll
// create an offset to slice in. Starting backwards, we'll
Expand Down Expand Up @@ -2899,6 +2926,23 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b
return edges.Put(edgeKey[:], b.Bytes()[:])
}

// putChanEdgePolicyUnknown marks the edge policy as unknown
// in the edges bucket.
func putChanEdgePolicyUnknown(edges *bolt.Bucket, channelID uint64,
from []byte) error {

var edgeKey [33 + 8]byte
copy(edgeKey[:], from)
byteOrder.PutUint64(edgeKey[33:], channelID)

if edges.Get(edgeKey[:]) != nil {
return fmt.Errorf("Cannot write unknown policy for channel %v "+
" when there is already a policy present", channelID)
}

return edges.Put(edgeKey[:], unknownPolicy)
}

func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte,
nodePub []byte, nodes *bolt.Bucket) (*ChannelEdgePolicy, error) {

Expand All @@ -2911,6 +2955,11 @@ func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte,
return nil, ErrEdgeNotFound
}

// No need to deserialize unknown policy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still needed (discussion above)

if bytes.Equal(edgeBytes[:], unknownPolicy) {
return nil, nil
}

edgeReader := bytes.NewReader(edgeBytes)

return deserializeChanEdgePolicy(edgeReader, nodes)
Expand All @@ -2930,7 +2979,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket,
// something other than edge non-existence.
node1Pub := edgeInfo[:33]
edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub, nodes)
if err != nil && err != ErrEdgeNotFound {
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. rationale for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For chanID we now always expect to find two policies (that may be unknown). Previously, ErrEdgeNotFound was happy flow.

return nil, nil, err
}

Expand All @@ -2945,7 +2994,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket,
// half of the edge information.
node2Pub := edgeInfo[33:67]
edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes)
if err != nil && err != ErrEdgeNotFound {
if err != nil {
return nil, nil, err
}

Expand Down
Loading