
Batched graph updates #3367

Open · cfromknecht wants to merge 9 commits into master from cfromknecht:batched-graph-updates

Conversation

cfromknecht (Collaborator) commented Aug 2, 2019

This PR introduces batched operations for channel graph ingestion in order to speed up IGD (initial graph download). Currently a single db transaction is used to process each channel_announcement, channel_update, and node_announcement, which, with the current graph size, results in around 160k-200k transactions.

With this PR we bring this down to a number linear in the time it takes to sync the graph, since we'll now make at most 2 db transactions/sec. On my machine I was able to get a 4 minute graph sync using this, implying only about 500 db transactions were made in total.

So far I'm seeing a 2x speedup in IGD. CPU and memory usage should also be much lower, but I'm still in the process of tuning and gathering exact performance metrics.

batch package

To accomplish this, we introduce a new batch package, which exposes a simple API for constructing batched operations on a bbolt database. The primary reason we require this, and can't use bbolt's native Batch operation, is that we maintain two caches external to bbolt, the channel cache and the reject cache.

Since both are write-through caches, it is critical that we maintain external consistency whenever a write succeeds: a cache update should apply the whole batch of updates atomically, or none of them. To achieve this, we need a mechanism for doing locking at a level above bbolt, since we don't have the luxury of reentrant locks.

Most of the batching logic is borrowed from bbolt itself, but we also add some slight modifications to handle any updates to the external cache under a single mutex.

There are two primary components:

// Request defines an operation that can be batched into a single bbolt
// transaction.
type Request struct {
	// Update is applied alongside other operations in the batch.
	//
	// NOTE: This method MUST NOT acquire any mutexes.
	Update func(tx *bbolt.Tx) error

 	// OnSuccess is applied if the batch or a subset of the batch including this
	// request all succeeded without failure.
	//
	// NOTE: This field is optional.
	OnSuccess func(err error) error

 	// Solo is applied if this request's Update failed, and was asked to
	// retry outside of the batch.
	Solo func() error
}

 // Scheduler abstracts a generic batching engine that accumulates an incoming
// set of Requests, executes them, and returns the error from the operation.
type Scheduler interface {
	// Execute schedules a Request for execution with the next available
	// batch. The resulting error is returned to the caller.
	Execute(req *Request) error
}

Different Schedulers may implement different policies for when a batch gets executed. The current one implemented, TimeScheduler, uses a fixed horizon after which it executes any concurrent requests added. Future schedulers may take into account metrics such as number of pending items in the batch or overall request throughput before deciding when to trigger, but that is left as an area for future research.
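
For illustration, here is a minimal, self-contained sketch of a time-horizon scheduler in this spirit. It is not the code in this PR: the internal request wrapper, the field names, and the exact retry behaviour are assumptions pieced together from the snippets quoted later in this thread, and the bbolt import path is simply the current upstream one.

package batch

import (
	"errors"
	"sync"
	"time"

	"go.etcd.io/bbolt"
)

// errSolo signals that a request failed inside a batch and should be retried
// on its own via its Solo closure.
var errSolo = errors.New(
	"batch function returned an error and should be re-run solo",
)

// request wraps a caller's Request with the channel used to hand the batch
// result back to the blocked Execute call.
type request struct {
	*Request
	errChan chan error
}

// TimeScheduler accumulates requests for a fixed duration after the first one
// arrives, then commits them all in a single bbolt transaction.
type TimeScheduler struct {
	db       *bbolt.DB
	locker   sync.Locker // optional lock guarding external caches
	duration time.Duration

	mu   sync.Mutex
	reqs []*request
}

// NewTimeScheduler creates a scheduler that flushes at most once per d.
func NewTimeScheduler(db *bbolt.DB, locker sync.Locker,
	d time.Duration) *TimeScheduler {

	return &TimeScheduler{db: db, locker: locker, duration: d}
}

// Execute adds r to the pending batch, arming the flush timer if this is the
// batch's first request, and blocks until the batch has run.
func (s *TimeScheduler) Execute(r *Request) error {
	req := &request{Request: r, errChan: make(chan error, 1)}

	s.mu.Lock()
	if len(s.reqs) == 0 {
		time.AfterFunc(s.duration, s.flush)
	}
	s.reqs = append(s.reqs, req)
	s.mu.Unlock()

	// errSolo means the batch failed and this request was asked to retry
	// outside of the batch.
	err := <-req.errChan
	if err == errSolo && req.Solo != nil {
		return req.Solo()
	}
	return err
}

// flush commits all pending requests in one transaction, holding the external
// cache lock (if any) so that OnSuccess can update the write-through caches
// atomically with the db commit.
func (s *TimeScheduler) flush() {
	s.mu.Lock()
	reqs := s.reqs
	s.reqs = nil
	s.mu.Unlock()

	if s.locker != nil {
		s.locker.Lock()
		defer s.locker.Unlock()
	}

	err := s.db.Update(func(tx *bbolt.Tx) error {
		for _, req := range reqs {
			if err := req.Update(tx); err != nil {
				return err
			}
		}
		return nil
	})

	for _, req := range reqs {
		switch {
		case err != nil:
			// Ask each caller to retry its request solo.
			req.errChan <- errSolo
		case req.OnSuccess != nil:
			req.errChan <- req.OnSuccess(nil)
		default:
			req.errChan <- nil
		}
	}
}

The important property is that the external cache lock is held across both the transaction and the OnSuccess callbacks, which is what keeps the write-through caches externally consistent with the committed batch.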

Using this package, it becomes simple to express our batched operations, here's an example of BatchedAddChannelEdge:

func (c *ChannelGraph) BatchedAddChannelEdge(edge *ChannelEdgeInfo) error {
	var alreadyExists bool
	return c.chanScheduler.Execute(&batch.Request{
		Update: func(tx *bbolt.Tx) error {
			err := addChannelEdge(tx, edge)

 			// Silence ErrEdgeAlreadyExist so that the batch can
			// succeed, but propagate the error via local state.
			if err == ErrEdgeAlreadyExist {
				alreadyExists = true
				return nil
			}

 			alreadyExists = false
			return err
		},
		OnSuccess: func(err error) error {
			switch {
			case err != nil:
				return err
			case alreadyExists:
				return ErrEdgeAlreadyExist
			default:
				c.rejectCache.remove(edge.ChannelID)
				c.chanCache.remove(edge.ChannelID)
				return nil
			}
		},
		Solo: func() error {
			return c.AddChannelEdge(edge)
		},
	})
}

whose logic largely coincides with what we'd expect, given the unbatched variant:

func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
	c.cacheMu.Lock()
	defer c.cacheMu.Unlock()	

 	err := c.db.Update(func(tx *bbolt.Tx) error {	
		return c.addChannelEdge(tx, edge)	
	})	
	if err != nil {	
		return err	
	}	

 	c.rejectCache.remove(edge.ChannelID)	
	c.chanCache.remove(edge.ChannelID)	

 	return nil
}

Note that in the batched variant, c.cacheMu is passed into the creation of the chanScheduler and is automatically acquired by the scheduler, so it is safe to modify the cache directly within the OnSuccess closure.

There are a few other areas in the code base where we may wish to adopt this, e.g. the CircuitMap, which should be a straightforward change using this primitive. I suspect this may be of interest to other projects using bbolt, so we may want to consider making the batch package its own versioned Go module.

testing

Before writing more tests, I'm putting this up on Travis with a temporary commit that causes the unbatched APIs to use the batched ones under the hood. This should give us good confidence that the semantics are unchanged before proceeding to write more extensive tests that handle edge cases in the batched API.

Closes #3288

cfromknecht added 2 commits Jul 31, 2019
wpaulino added this to the 0.8.0 milestone Aug 2, 2019
cfromknecht force-pushed the cfromknecht:batched-graph-updates branch from d55de95 to 88adc5e Aug 2, 2019
cfromknecht added 7 commits Jul 31, 2019
cfromknecht force-pushed the cfromknecht:batched-graph-updates branch from 88adc5e to d34b61a Aug 2, 2019
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
}
g.chanScheduler = batch.NewTimeScheduler(db.DB, &g.cacheMu, 10*time.Millisecond)
g.nodeScheduler = batch.NewTimeScheduler(db.DB, nil, 10*time.Millisecond)

Crypt-iQ (Collaborator) commented Aug 4, 2019

I was able to set this slightly higher and still achieve the same sync time of about 2 minutes (granted, the peers I synced with on different runs were very fast). I went as high as 8 seconds, which means fewer db txns. What do you think? Perhaps the two schedulers could also be staggered at different intervals so they don't contend on the db, since we receive their messages at around the same time anyway (see the sketch after the list below).

With 10ms, I see writes taking a minimum of 20ms with very small batches (1-6 requests with the occasional large batch). Since there can only be one db writer at a time:

  • If there is another batch during this time, it will block until the first one completes. This can cause several batches to be stalled (not really a problem in terms of time in this scenario but not ideal).
  • More db txn's means more heap usage.

So far this doesn't affect the overall sync time in any way, but may cause more write contention with the database.
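
For concreteness, the staggering suggested above could be as simple as giving the two schedulers different horizons when they are constructed. This reuses the constructor shown in the diff above; the durations are illustrative, not recommendations:

// Offset the two schedulers so their batched commits don't hit bbolt's single
// writer at the same instant.
g.chanScheduler = batch.NewTimeScheduler(db.DB, &g.cacheMu, 4*time.Second)
g.nodeScheduler = batch.NewTimeScheduler(db.DB, nil, 6*time.Second)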

Crypt-iQ (Collaborator) commented Aug 4, 2019

I do see some memory usage from dereference calls when growing the mmap, since these become large writes (~10MB) with the 8 second batch time, so a case could be made for InitialMmapSize as discussed in #3278. But this is nothing compared to the GetBlock calls (6 GB+) made in the router.

Crypt-iQ (Collaborator) commented Aug 4, 2019

I also see a reduction in allocations here, from 845MB with the 10ms scheduler:

ROUTINE ======================== github.com/lightningnetwork/lnd/batch.(*batch).trigger in /Users/nsa/go/src/github.com/lightningnetwork/lnd/batch/batch.go
         0   845.71MB (flat, cum)  7.09% of Total
         .          .     27:}
         .          .     28:
         .          .     29:// trigger is the entry point for the batch and ensures that run is started at
         .          .     30:// most once.
         .          .     31:func (b *batch) trigger() {
         .   845.71MB     32:	b.start.Do(b.run)
         .          .     33:}
         .          .     34:
         .          .     35:// run executes the current batch of requests. If any individual requests fail
         .          .     36:// alongside others they will be retried by the caller.
         .          .     37:func (b *batch) run() {

to 269MB with the 8s scheduler:

ROUTINE ======================== github.com/lightningnetwork/lnd/batch.(*batch).trigger in /Users/nsa/go/src/github.com/lightningnetwork/lnd/batch/batch.go
         0   269.59MB (flat, cum)  1.80% of Total
         .          .     27:}
         .          .     28:
         .          .     29:// trigger is the entry point for the batch and ensures that run is started at
         .          .     30:// most once.
         .          .     31:func (b *batch) trigger() {
         .   269.59MB     32:	b.start.Do(b.run)
         .          .     33:}
         .          .     34:
         .          .     35:// run executes the current batch of requests. If any individual requests fail
         .          .     36:// alongside others they will be retried by the caller.
         .          .     37:func (b *batch) run() {

cfromknecht (Author, Collaborator) commented Aug 5, 2019

@Crypt-iQ yes, the 10ms time is just from a temporary commit so that I could drop the batched version into the regular API and verify it has the same semantics w/o having to modify any timeouts.

In practice we should be using a timeout on the order of seconds. I did my testing with a duration of 1 second and it performed quite well. We could probably do 8 seconds as well, though this also delays the time till first write pretty significantly. Most of the writes were taking 10-20ms, so only about 2-4% of one core would be dedicated to the db txn itself if we go with 1 second, and even lower with 8.

The main thing left to do is benchmark this on mobile and get a sense for how these parameters behave on different architectures; I just haven't gotten around to doing so :)

Roasbeef removed request for Roasbeef and wpaulino Aug 6, 2019
joostjager (Collaborator) left a comment

It will indeed be interesting to see how fast the IGD is on low-powered devices. I saw my rpi3 take 5 hours to sync on a 100 mbit fiber connection.

My main question related to this PR is whether this is the right optimization path to take.

How would this PR compare to a different, quite extreme, approach: The graph only exists in memory. It is updated in memory and queried from memory (which we want at some point anyway to speed up path finding). Existing caches can be removed / merged with the in-memory graph.

Then there is a background process that periodically (maybe once a minute) serializes all of it into a byte array and writes it to a single bbolt key (or even a file). On startup, that value is deserialized into memory again. Because the graph is non-critical data, it is not a problem if we don't have the very latest data on disk.
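
A minimal sketch of that background process, assuming a hypothetical inMemoryGraph type with a Serialize method and a dedicated snapshot bucket (none of these names exist in lnd; this is only to make the idea concrete):

var graphSnapshotBucket = []byte("graph-snapshot")

// snapshotLoop periodically serializes the entire in-memory graph and writes
// it under a single key. If a write fails, the previous snapshot is kept.
func snapshotLoop(g *inMemoryGraph, db *bbolt.DB, quit <-chan struct{}) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Hypothetical: the whole graph as a single byte slice.
			buf := g.Serialize()
			_ = db.Update(func(tx *bbolt.Tx) error {
				b, err := tx.CreateBucketIfNotExists(graphSnapshotBucket)
				if err != nil {
					return err
				}
				return b.Put([]byte("latest"), buf)
			})
		case <-quit:
			return
		}
	}
}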

// operation in isolation.
var errSolo = errors.New(
"batch function returned an error and should be re-run solo",
)

joostjager (Collaborator) commented Aug 6, 2019

Not sure about the name errSolo. Solo refers to the action that would resolve the error rather than the origin of the error? An alternative could be something like errBatchedFailed. But I also see this is what bbolt calls it.

// horizon. When the first request is received, a TimeScheduler waits a
// configurable duration for other concurrent requests to join the batch. Once
// this time has elapsed, the batch is closed and executed. Subsequent requests
// are then added to a new batch which undergoes the same process.

joostjager (Collaborator) commented Aug 6, 2019

This reminds me of what @halseth is doing in lightninglabs/loop#75, although the action after batching is different and so is the scheduling itself (drive-by comment).

// trigger is the entry point for the batch and ensures that run is started at
// most once.
func (b *batch) trigger() {
b.start.Do(b.run)

joostjager (Collaborator) commented Aug 6, 2019

Can it happen that trigger is called more than once? If not, does this sync.Once camouflage potential bugs?

valentinewallace (Contributor) commented Aug 14, 2019

Could add some warning logs if it's run multiple times?
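
A sketch of that suggestion, assuming a hypothetical triggered counter next to start sync.Once, a sync/atomic import, and the package's log instance:

// trigger is the entry point for the batch and ensures that run is started at
// most once, warning if it is ever invoked again.
func (b *batch) trigger() {
	if atomic.AddUint32(&b.triggered, 1) > 1 {
		log.Warnf("batch triggered more than once")
	}
	b.start.Do(b.run)
}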

db *bbolt.DB
start sync.Once
reqs []*request
clear func(b *batch)

joostjager (Collaborator) commented Aug 6, 2019

Would it be better to make this clear func()? The scheduler has the instance already and could use it in a lambda.

clear: s.clear,
locker: s.locker,
}
time.AfterFunc(s.duration, s.b.trigger)

joostjager (Collaborator) commented Aug 6, 2019

bbolt also has a max batch size. Is that needed here too?

Crypt-iQ (Collaborator) commented Aug 6, 2019

This implementation uses bbolt's Update function to emulate batching, so the actual Batch call isn't called here.

db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
}
g.chanScheduler = batch.NewTimeScheduler(db.DB, &g.cacheMu, time.Second)
g.nodeScheduler = batch.NewTimeScheduler(db.DB, nil, time.Second)

joostjager (Collaborator) commented Aug 6, 2019

If there is no node cache, couldn't this just use the normal bbolt batching?

func (c *ChannelGraph) BatchedAddLightningNode(node *LightningNode) error {
return c.nodeScheduler.Execute(&batch.Request{
Update: func(tx *bbolt.Tx) error {
return addLightningNode(tx, node)

joostjager (Collaborator) commented Aug 6, 2019

Should some errors be suppressed to let the batch go through?

edgeNotFound bool
)
return c.chanScheduler.Execute(&batch.Request{
Update: func(tx *bbolt.Tx) error {

joostjager (Collaborator) commented Aug 6, 2019

Is it an option to add two error return parameters, to prevent this pattern with the local variable?
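
Sketched against the BatchedAddChannelEdge example from the PR description, that alternative would have Update return a request-level error (surfaced only to this caller) alongside the batch-level error, removing the local variable. Purely illustrative, not part of this PR:

Update: func(tx *bbolt.Tx) (reqErr, batchErr error) {
	err := addChannelEdge(tx, edge)
	if err == ErrEdgeAlreadyExist {
		// Returned only to this caller; the batch still commits.
		return err, nil
	}
	// Any other error fails the whole batch.
	return nil, err
},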

validationBarrier := routing.NewValidationBarrier(
runtime.NumCPU()*4, d.quit,
)
validationBarrier := routing.NewValidationBarrier(1000, d.quit)

joostjager (Collaborator) commented Aug 6, 2019

Could use some explanation in the commit message?

Roasbeef requested review from valentinewallace and removed request for halseth Aug 13, 2019
valentinewallace (Contributor) left a comment

Great feature, so excited to get this in for mobile users!! 🙌

Mostly for feedback I just have some questions. Looking forward to reading what the testing looks like, but time-based batching scheduler seems like the perfect starting point for batching db txs like these 👍


// return the error directly.
for _, req := range b.reqs {
if req.OnSuccess != nil {
req.errChan <- req.OnSuccess(err)

valentinewallace (Contributor) commented Aug 14, 2019

If we've reached this point, won't err always be nil? (Afaict, err only gets assigned earlier, and the error handling there prevents us from ever reaching this line with a non-nil error.)

If this is the case, seems like passing in nil might be a bit more clear (maybe that's just not conventional?).
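
A sketch of that simplification, assuming err is indeed always nil once this loop is reached (the else branch stands in for whatever the surrounding loop does when OnSuccess is unset):

for _, req := range b.reqs {
	if req.OnSuccess != nil {
		req.errChan <- req.OnSuccess(nil)
	} else {
		req.errChan <- nil
	}
}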


// Execute schedules the provided request for batch execution along with other
// concurrent requests. The request will be executed within a fixed horizon,
// parameterizeed by the duration of the scheduler. The error from the

valentinewallace (Contributor) commented Aug 14, 2019

nit: s/parameterizeed/parameterized/

// in a channel update.
// information. Note that this method is expected to only be called to update an
// already present node from a node announcement, or to insert a node found in a
// channel update.
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {

valentinewallace (Contributor) commented Aug 14, 2019

Curious if the plan is to get rid of this function since it just purely wraps BatchedAddLightningNode now? And same w/ AddChannelEdge.

// Scheduler abstracts a generic batching engine that accumulates an incoming
// set of Requests, executes them, and returns the error from the operation.
type Scheduler interface {
// Execute schedules a Request for execution with the next available

valentinewallace (Contributor) commented Aug 14, 2019

It seems like Execute not only schedules a Request, but also blocks until the batched request is executed?

valentinewallace (Contributor) commented Aug 14, 2019

Wondering because, if the time-based scheduler's duration were increased, callers of this function may not want to block until it completes 🤔


// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
// which to schedule batches. If the operation needs to modify a higher-level
// cache, the cache's lock should be provided to so that external consistency

valentinewallace (Contributor) commented Aug 14, 2019

nit: s/to so/so

cfromknecht modified the milestones: 0.8.0, 0.8.1 Sep 16, 2019
cfromknecht added v0.8.1 and removed v0.8.0 labels Sep 16, 2019
wpaulino removed the v0.8.1 label Oct 8, 2019
joostjager added the v0.9.0 label Oct 8, 2019
joostjager modified the milestones: 0.8.1, 0.9 Oct 8, 2019
joostjager added this to TODO in v0.9.0-beta Oct 23, 2019
joostjager moved this from TODO to WIP in v0.9.0-beta Oct 23, 2019
joostjager moved this from WIP to TODO in v0.9.0-beta Nov 5, 2019
halseth requested a review from bhandras Nov 7, 2019
Roasbeef removed the request for review from bhandras Nov 14, 2019
Roasbeef modified the milestones: 0.9.0, 0.10.0 Jan 7, 2020
Roasbeef added v0.10 and removed v0.9.0 labels Jan 17, 2020
Roasbeef added this to In progress in v0.10.0-beta via automation Jan 17, 2020
Roasbeef moved this from In Progress to Blocked in v0.10.0-beta Jan 17, 2020
nicolasburtey commented Jan 24, 2020

(what does IGD stand for?)

Roasbeef (Member) commented Jan 24, 2020

(what does IGD stand for?)

Initial graph download. This PR is intended to speed up syncing the graph from the p2p network.

Roasbeef removed the v0.10 label Feb 3, 2020
Roasbeef removed this from the 0.10.0 milestone Feb 17, 2020
Roasbeef removed this from Blocked in v0.10.0-beta Feb 17, 2020