Skip to content

Commit

Permalink
multi: further decouple graph, remove cache for graph
Browse files Browse the repository at this point in the history
To further separate the channel graph from the channel state, we
refactor the AddrsForNode method to use the graphs's public methods
instead of directly accessing any buckets. This makes sure that we can
have the channel state cached with just its buckets while not using a
kvdb level cache for the graph.
At the same time we refactor the graph's test to also be less dependent
upon the channel state DB.
  • Loading branch information
guggero committed Aug 26, 2021
1 parent 2dc339d commit fbf52b6
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 214 deletions.
2 changes: 1 addition & 1 deletion autopilot/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
return nil, err
}

dbNode, err := d.db.FetchLightningNode(nil, vertex)
dbNode, err := d.db.FetchLightningNode(vertex)
switch {
case err == channeldb.ErrGraphNodeNotFound:
fallthrough
Expand Down
79 changes: 19 additions & 60 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)

const (
Expand Down Expand Up @@ -275,22 +276,6 @@ func resetChanStateCache(cache *kvdb.Cache) error {
cache.SkipBucket(fwdPackagesKey)
cache.SkipBucket(revocationLogBucket)

if err := cache.AddTopLevelBucket(nodeBucket); err != nil {
return err
}

if err := cache.AddTopLevelBucket(edgeBucket); err != nil {
return err
}

if err := cache.AddTopLevelBucket(edgeIndexBucket); err != nil {
return err
}

if err := cache.AddTopLevelBucket(graphMetaBucket); err != nil {
return err
}

// Add other buckets holding channel state to the channeldb cache.
if err := cache.AddTopLevelBucket(openChannelBucket); err != nil {
return err
Expand Down Expand Up @@ -358,10 +343,14 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
// Set the parent pointer (only used in tests).
chanDB.ChannelStateDB.parent = chanDB

chanDB.graph = newChannelGraph(
var err error
chanDB.graph, err = NewChannelGraph(
cache, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
)
if err != nil {
return nil, err
}

// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
Expand All @@ -377,7 +366,7 @@ func (d *DB) Path() string {
return d.dbPath
}

var topLevelBuckets = [][]byte{
var dbTopLevelBuckets = [][]byte{
openChannelBucket,
closedChannelBucket,
forwardingLogBucket,
Expand All @@ -388,10 +377,6 @@ var topLevelBuckets = [][]byte{
paymentsIndexBucket,
peersBucket,
nodeInfoBucket,
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,
metaBucket,
closeSummaryBucket,
outpointBucket,
Expand All @@ -402,7 +387,7 @@ var topLevelBuckets = [][]byte{
// operation is fully atomic.
func (d *DB) Wipe() error {
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
for _, tlb := range topLevelBuckets {
for _, tlb := range dbTopLevelBuckets {
err := tx.DeleteTopLevelBucket(tlb)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
Expand Down Expand Up @@ -434,42 +419,12 @@ func initChannelDB(db kvdb.Backend) error {
return nil
}

for _, tlb := range topLevelBuckets {
for _, tlb := range dbTopLevelBuckets {
if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
return err
}
}

nodes := tx.ReadWriteBucket(nodeBucket)
_, err = nodes.CreateBucket(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(nodeUpdateIndexBucket)
if err != nil {
return err
}

edges := tx.ReadWriteBucket(edgeBucket)
if _, err := edges.CreateBucket(edgeIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(edgeUpdateIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(channelPointBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(zombieBucket); err != nil {
return err
}

graphMeta := tx.ReadWriteBucket(graphMetaBucket)
_, err = graphMeta.CreateBucket(pruneLogBucket)
if err != nil {
return err
}

meta.DbVersionNumber = getLatestDBVersion(dbVersions)
return putMeta(meta, tx)
}, func() {})
Expand Down Expand Up @@ -1166,7 +1121,7 @@ func (c *ChannelStateDB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,

var (
linkNode *LinkNode
graphNode LightningNode
graphNode *LightningNode
)

dbErr := kvdb.View(c.db, func(tx kvdb.RTx) error {
Expand All @@ -1180,21 +1135,25 @@ func (c *ChannelStateDB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
// We'll also query the graph for this peer to see if they have
// any addresses that we don't currently have stored within the
// link node database.
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
pubKey, err := route.NewVertexFromBytes(
nodePub.SerializeCompressed(),
)
if err != nil {
return err
}
compressedPubKey := nodePub.SerializeCompressed()
graphNode, err = fetchLightningNode(nodes, compressedPubKey)
graphNode, err = c.parent.graph.FetchLightningNode(pubKey)
if err != nil && err != ErrGraphNodeNotFound {
// If the node isn't found, then that's OK, as we still
// have the link node data.
return err
} else if err == ErrGraphNodeNotFound {
graphNode = &LightningNode{}
}

return nil
}, func() {
linkNode = nil
graphNode = nil
})
if dbErr != nil {
return nil, dbErr
Expand Down
115 changes: 94 additions & 21 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,14 @@ type ChannelGraph struct {
nodeScheduler batch.Scheduler
}

// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration) *ChannelGraph {
func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration) (*ChannelGraph, error) {

if err := initChannelGraph(db); err != nil {
return nil, err
}

g := &ChannelGraph{
db: db,
Expand All @@ -201,7 +205,85 @@ func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db, nil, batchCommitInterval,
)

return g
return g, nil
}

var graphTopLevelBuckets = [][]byte{
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,
}

// Wipe completely deletes all saved state within all used buckets within the
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
func (c *ChannelGraph) Wipe() error {
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
for _, tlb := range graphTopLevelBuckets {
err := tx.DeleteTopLevelBucket(tlb)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
}
return nil
}, func() {})
if err != nil {
return err
}

return initChannelGraph(c.db)
}

// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
func initChannelGraph(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
for _, tlb := range graphTopLevelBuckets {
if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
return err
}
}

nodes := tx.ReadWriteBucket(nodeBucket)
_, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
if err != nil {
return err
}

edges := tx.ReadWriteBucket(edgeBucket)
_, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
_, err = edges.CreateBucketIfNotExists(zombieBucket)
if err != nil {
return err
}

graphMeta := tx.ReadWriteBucket(graphMetaBucket)
_, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
return err
}, func() {})
if err != nil {
return fmt.Errorf("unable to create new channel graph: %v", err)
}

return nil
}

// Database returns a pointer to the underlying database.
Expand All @@ -218,7 +300,9 @@ func (c *ChannelGraph) Database() kvdb.Backend {
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
// for that particular channel edge routing policy will be passed into the
// callback.
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
*ChannelEdgePolicy, *ChannelEdgePolicy) error) error {

// TODO(roasbeef): ptr map to reduce # of allocs? no duplicates

return kvdb.View(c.db, func(tx kvdb.RTx) error {
Expand Down Expand Up @@ -2356,17 +2440,11 @@ func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error)
// FetchLightningNode attempts to look up a target node by its identity public
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
// returned.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
*LightningNode, error) {

var node *LightningNode

fetchNode := func(tx kvdb.RTx) error {
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes := tx.ReadBucket(nodeBucket)
Expand All @@ -2393,14 +2471,9 @@ func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (
node = &n

return nil
}

var err error
if tx == nil {
err = kvdb.View(c.db, fetchNode, func() {})
} else {
err = fetchNode(tx)
}
}, func() {
node = nil
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fbf52b6

Please sign in to comment.