etcd: STM transaction queue to effectively reduce retries for conflicting transactions #4457
The implementation is much simpler than I thought it would be! Just completed an initial pass, and nothing glaring jumped out. Will do another pass once I run it on an actual replicated db lnd instance. It would also be interesting to create a small patch that lets us run certain itests w/ and w/o this change so we can gauge the rough impact of the change on perf.
channeldb/kvdb/etcd/commit_queue.go
Outdated
if !blocked {
	_, rsetContainsKey := rset[key]
	blocked = (c.writerMap[key] > 1 ||
		(c.readerMap[key] > 0 && !rsetContainsKey))
Why don't we need to block if there's a pending transaction in the queue that reads this key, we want to write it, but don't also read the key ourselves?
It's because if our read set contains the key then we already increased c.readerMap[key] above, so to make sure the reader lock count is non-zero we have to "uncount ourselves".
I think this would be much easier to reason about by just doing two passes through the sets:
for key := range rset {
	blocked = blocked || c.writerMap[key] > 0
}
for key := range wset {
	blocked = blocked || c.writerMap[key] > 0 || c.readerMap[key] > 0
}
for key := range rset {
	c.readerMap[key] += 1
}
for key := range wset {
	c.writerMap[key] += 1
}
Performance wise I doubt we'll see any difference.
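A minimal, self-contained sketch of the two-pass idea suggested above: check for conflicts against the counts as they stand, then register the transaction. The `conflictTracker` type, its constructor, the `add` method, and the `map[string]struct{}` set type are assumptions made for illustration; only the `readerMap`/`writerMap` names come from the reviewed code.

```go
package main

// conflictTracker is a hypothetical stand-in for the queue's bookkeeping.
// It counts pending readers and writers per key.
type conflictTracker struct {
	readerMap map[string]int
	writerMap map[string]int
}

func newConflictTracker() *conflictTracker {
	return &conflictTracker{
		readerMap: make(map[string]int),
		writerMap: make(map[string]int),
	}
}

// add reports whether a transaction with the given read and write sets
// conflicts with any pending transaction, and then registers it.
func (c *conflictTracker) add(rset, wset map[string]struct{}) bool {
	blocked := false

	// First pass: only inspect the counts. A read conflicts with a
	// pending writer; a write conflicts with any pending reader or writer.
	for key := range rset {
		blocked = blocked || c.writerMap[key] > 0
	}
	for key := range wset {
		blocked = blocked || c.writerMap[key] > 0 || c.readerMap[key] > 0
	}

	// Second pass: count this transaction. Doing this after the checks
	// means the transaction never sees its own reads or writes.
	for key := range rset {
		c.readerMap[key]++
	}
	for key := range wset {
		c.writerMap[key]++
	}

	return blocked
}
```

Because the counters are only incremented after both checks, a transaction that reads and writes the same key is not blocked by itself, and no "uncounting" is required.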
Yeah, it's a bit hard to read... Unfortunately we can't use the simplified version above, because if the same transaction also reads the key (and no other readers are present) then it will block unnecessarily. This is the reason for the rsetContainsKey variable. Added a few comments to clarify. I'm open to any suggestions you may find that simplify it though.
** and also simplified a bit to make it more? readable
very cool how small the diff is!
channeldb/kvdb/etcd/stm.go
Outdated
// Run the tx closure to construct the read and write sets.
// Also we expect that if there are no conflicting transactions
// in the queue, then we only run apply once.
if err = apply(s); err != nil {
is it intentional that this shadows the err in the outer scope? o/w i don't see where that error is read?
Yes, so it's a bit tricky to read this at first, but it's really simple actually.
1) What we do is we first run the apply closure to gather the read/write sets so we can add the tx to the contention queue.
2) The execute closure is executed there (either immediately or in the queue goroutine). The err simply holds the error through the above described execution graph.
3) We wait for the done signal and then clean the keys from the queue.
thanks for the explanation, makes sense now! i also see that it's the return value at the end of the function, so that's where it is "read"
Since we return immediately if the error is non-nil here, wouldn't if err := apply(s); err != nil be equivalent?
I agree that the shadowing is tricky to read. Could make sense to add more errors with descriptive names (e.g. executeErr) to make it easier.
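To illustrate the suggestion, here is a small sketch of distinguishing errors by scope. The `runTx` function, its two closure parameters, and the sentinel errors are hypothetical; only the names `preApplyErr` and `executeErr` echo this thread. The pre-apply error lives only inside its `if` statement, while the error from the deferred execution has its own descriptively named variable.

```go
package main

import "errors"

// Hypothetical sentinel errors, used only to make the example checkable.
var (
	errApply   = errors.New("apply failed")
	errExecute = errors.New("execute failed")
)

// runTx first runs apply (standing in for the pass that gathers the
// read/write sets) and then runs execute on another goroutine, waiting
// for it to finish before returning its error.
func runTx(apply, execute func() error) error {
	// preApplyErr is scoped to this if statement, so it cannot be
	// confused with the error produced by execute below.
	if preApplyErr := apply(); preApplyErr != nil {
		return preApplyErr
	}

	var executeErr error
	done := make(chan struct{})
	go func() {
		// Closing done after the assignment makes executeErr safely
		// visible to the goroutine waiting on the channel.
		defer close(done)
		executeErr = execute()
	}()

	<-done
	return executeErr
}
```

With separate names there is no assignment to an outer `err` whose later read has to be hunted down, which was the source of the confusion above.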
Yeah, maybe it's simpler to read if we distinguish errors by scope. PTAL
Yes, originally the queue was optional but decided to make it non-optional as really it should be on all the time.
LGTM 🌮
Fun change :)
channeldb/kvdb/etcd/commit_queue.go
Outdated
for key := range rset {
	c.readerMap[key] += 1
	if !blocked {
		blocked = c.writerMap[key] > 0
style suggestion: blocked ||= c.writerMap[key] > 0
done
channeldb/kvdb/etcd/commit_queue.go
Outdated
// Wait waits for the queue to stop (after the queue context has been canceled).
func (c *commitQueue) Wait() {
	<-c.done
use the more common waitgroup pattern instead?
done
Thanks for the review @halseth! Main change is the (hopefully) more readable rset/wset scans. PTAL
channeldb/kvdb/etcd/commit_queue.go
Outdated
// Transaction is blocked if:
// - there's any reader (which is not this tx).
// - there's any writer.
blocked = blocked || (c.readerMap[key] > 0 && !keyRead)
still not sure if this is correct. Say this tx reads and writes this key, and increases c.readerMap[key] to 2. That will leave (c.readerMap[key] > 0 && !keyRead) == false while it should be blocked.
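The scenario can be checked with a tiny worked example. Both functions below are hypothetical reconstructions for illustration: `blockedSinglePass` mirrors the expression quoted above (plus an assumed writer check), evaluated after the transaction has already counted its own read, and `blockedTwoPass` evaluates the counts as they were before the transaction registered itself.

```go
package main

// blockedSinglePass is a hypothetical reconstruction of the single-pass
// check. readers already includes this transaction's own read when
// keyRead is true.
func blockedSinglePass(readers, writers int, keyRead bool) bool {
	return writers > 0 || (readers > 0 && !keyRead)
}

// blockedTwoPass checks the counts taken before this transaction was
// registered, so the transaction never has to discount itself.
func blockedTwoPass(readersBefore, writersBefore int) bool {
	return writersBefore > 0 || readersBefore > 0
}
```

With one other pending reader, a transaction that both reads and writes the key sees a reader count of 2 after counting itself. `blockedSinglePass(2, 0, true)` evaluates to false because `keyRead` masks the other reader, whereas `blockedTwoPass(1, 0)` evaluates to true.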
That was a really nice catch!
After some back and forth, decided to go with the simplified version above just with three loops.
I found the latest iteration much easier to reason about this time around, kudos to the prior reviewers in this series!
LGTM 🚁
Should wait to merge this till we get 3/3 since the last iteration had a nice find.
@@ -214,6 +214,7 @@ jobs:
matrix:
  unit_type:
    - btcd unit-cover
    - unit tags=kvdb_etcd
👍
LGTM now, great work! 😀
// Run the tx closure to construct the read and write sets.
// Also we expect that if there are no conflicting transactions
// in the queue, then we only run apply once.
if preApplyErr := apply(s); preApplyErr != nil {
👍
channeldb/kvdb/etcd/commit_queue.go
Outdated
// the read set. Do not increment the reader counts yet as we'll need to
// use the original read counts when scanning through the write set.
for key := range rset {
	blocked = blocked || c.writerMap[key] > 0
nit: optimization here and below, can immediately break loop if already blocked.
done
This commit adds commitQueue, which is a lightweight contention manager for STM transactions. The queue attempts to queue up transactions that conflict for sequential execution, while leaving all "unblocked" transactions to run freely in parallel.
This commit integrates an externally passed commitQueue instance with the STM to reduce retries for conflicting transactions.
Thanks everyone for the reviews!
rebased on #4411 (now on master since #4411 is merged)
This PR adds (and integrates) commitQueue, whose purpose is to detect conflicts between concurrently applied transactions and effectively reduce retries by queuing up conflicting transactions for sequential execution, while leaving all non-conflicting ones to run freely (potentially in parallel).
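To make the description concrete, here is a heavily simplified sketch of such a contention manager. It is not the PR's implementation. The method names `register`, `release`, and `Run`, the `seqMx` field, and the use of string slices for the read and write sets are all assumptions. In particular, the sketch serializes conflicting transactions with a plain mutex instead of an explicit queue, so it does not promise FIFO order. It only aims to reduce contention and does not replace the conflict detection and retry logic of the STM itself.

```go
package main

import "sync"

// commitQueue is a simplified, hypothetical stand-in for the PR's type.
type commitQueue struct {
	mx        sync.Mutex     // guards readerMap and writerMap
	readerMap map[string]int // pending readers per key
	writerMap map[string]int // pending writers per key
	seqMx     sync.Mutex     // serializes transactions that were blocked
}

func newCommitQueue() *commitQueue {
	return &commitQueue{
		readerMap: make(map[string]int),
		writerMap: make(map[string]int),
	}
}

// register reports whether the transaction conflicts with a pending one
// and then counts it as a reader/writer of its keys.
func (q *commitQueue) register(rset, wset []string) bool {
	q.mx.Lock()
	defer q.mx.Unlock()

	blocked := false
	for _, key := range rset {
		if q.writerMap[key] > 0 {
			blocked = true
			break // One conflict is enough, stop scanning.
		}
	}
	if !blocked {
		for _, key := range wset {
			if q.writerMap[key] > 0 || q.readerMap[key] > 0 {
				blocked = true
				break
			}
		}
	}

	// Count the transaction only after the checks so that it never
	// conflicts with itself.
	for _, key := range rset {
		q.readerMap[key]++
	}
	for _, key := range wset {
		q.writerMap[key]++
	}
	return blocked
}

// release removes the transaction's keys from the counters.
func (q *commitQueue) release(rset, wset []string) {
	q.mx.Lock()
	defer q.mx.Unlock()

	for _, key := range rset {
		q.readerMap[key]--
		if q.readerMap[key] <= 0 {
			delete(q.readerMap, key)
		}
	}
	for _, key := range wset {
		q.writerMap[key]--
		if q.writerMap[key] <= 0 {
			delete(q.writerMap, key)
		}
	}
}

// Run executes tx on the caller's goroutine. Conflicting transactions
// take turns via seqMx; non-conflicting ones run without waiting.
func (q *commitQueue) Run(tx func(), rset, wset []string) {
	defer q.release(rset, wset)

	if q.register(rset, wset) {
		q.seqMx.Lock()
		defer q.seqMx.Unlock()
	}
	tx()
}
```

A caller would wrap each commit attempt, for example `q.Run(commit, []string{"a"}, []string{"a"})`, where `commit` is whatever closure performs the transaction. Transactions touching disjoint keys never contend on `seqMx`, which is the "run freely" half of the description.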