Populate channels map in batch #3470
LGTM
- val storeChannels = Stream
-   .emits(actions.map(a => Stream.eval(storeChannelHash(a).map(_.asLeft[History[F]]))))
-   .parJoinProcBounded
+ val storeChannels = storeChannelHash(actions).map(_.asLeft[History[F]])
Is this .map(_.asLeft[History[F]]) a leftover?
Yes, it is, will remove.
continuationHash = hashContinuationsChannels(channels, sc)
} yield (eventKey, ContinuationHash(continuationHash))

conts.toList.traverse(convert).flatMap(store.put)
Sorted keys can also speed up insertion.
https://stackoverflow.com/a/62980373
By default, LMDB compares keys lexically. ByteString also has a lexical ordering defined. It may be worth testing.
https://stackoverflow.com/questions/39387702/sorting-an-lmdb-file-for-sequential-access-according-to-key-order
http://104.237.133.194/doc/group__mdb.html#ga68e47ffcf72eceec553c72b1784ee0fe
LGTM
@@ -65,7 +65,7 @@ object ChannelStoreImpl {
joinHash = hashJoinsChannel(channel, sc)
} yield (eventKey, DataJoinHash(dataHash, joinHash))

- channels.toList.traverse(convert).flatMap(store.put)
+ channels.toList.traverse(convert).flatMap(kvs => store.put(kvs.sortBy({ case (k, _) => k })))
@tgrospic is this what you meant by sorting?
I wonder, should we make sorting the default in the put API method of KVStore?
This will not be the same ordering that LMDB uses internally, because LMDB compares keys lexically.
// Get lexical Ordering for ByteString
implicit val bsComparator: Comparator[ByteString] = ByteString.unsignedLexicographicalComparator()
// ... then use key as ByteString for sorting
...flatMap(kvs => store.put(kvs.sortBy({ case (k, _) => k.toByteString })))
Also, I'm assuming that ByteString lexical comparison is the same as in LMDB.
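To make that assumption concrete, here is a minimal, dependency-free sketch of unsigned lexicographic ordering over raw byte keys. It assumes, as the thread does, that LMDB's default key comparison is byte-wise and unsigned, with the shorter key sorting first when one key is a prefix of the other. `UnsignedKeyOrder` and `sortByKey` are hypothetical names used only for illustration and are not part of this PR.

```scala
object UnsignedKeyOrder {
  // Unsigned lexicographic ordering: compare byte by byte as values 0..255;
  // when one key is a prefix of the other, the shorter key sorts first.
  // Assumption: this mirrors LMDB's default key comparison.
  implicit val unsignedLexicographic: Ordering[Array[Byte]] =
    new Ordering[Array[Byte]] {
      def compare(a: Array[Byte], b: Array[Byte]): Int = {
        val n = math.min(a.length, b.length)
        var i = 0
        while (i < n) {
          // Mask to 0..255 so that bytes >= 0x80 are not treated as negative.
          val c = (a(i) & 0xff) - (b(i) & 0xff)
          if (c != 0) return c
          i += 1
        }
        a.length - b.length
      }
    }

  // Hypothetical helper: sort key/value pairs by key before a batch put.
  def sortByKey[V](kvs: Seq[(Array[Byte], V)]): Seq[(Array[Byte], V)] =
    kvs.sortBy { case (k, _) => k }
}
```

The masking step is the part that matters: with a signed byte comparison, a key starting with 0x80 would sort before one starting with 0x7f, which would not match an unsigned, memcmp-style ordering.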
I see, thanks
This trick did not make any observable difference; it might require more investigation.
bors merge
Build failed:
bors merge
Build succeeded:
Overview
Populating the channels map takes a long time because each hash is committed separately.
This PR introduces batch writes. Preliminary results show that this significantly reduces replay time (15 sec -> 9 sec).
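The difference between the two write patterns can be sketched with a toy store that counts commits. This is only an illustration under stated assumptions: `CountingStore` and `BatchDemo` are hypothetical, in-memory stand-ins, not the store used in this PR, and the sketch shows commit counts only, not timing.

```scala
import scala.collection.mutable

// Hypothetical in-memory store, for illustration only.
// Each call to put is counted as one commit.
final class CountingStore[K, V] {
  private val data = mutable.Map.empty[K, V]
  var commits: Int = 0

  // Writes all given pairs in a single commit.
  def put(kvs: Seq[(K, V)]): Unit = {
    data ++= kvs
    commits += 1
  }

  def size: Int = data.size
}

object BatchDemo {
  // One commit per entry: the pattern this PR moves away from.
  def putOneByOne[K, V](store: CountingStore[K, V], kvs: Seq[(K, V)]): Unit =
    kvs.foreach(kv => store.put(Seq(kv)))

  // One commit for the whole batch.
  def putBatch[K, V](store: CountingStore[K, V], kvs: Seq[(K, V)]): Unit =
    store.put(kvs)
}
```

For N entries, the first pattern issues N commits and the second issues one, while both leave the store with the same contents.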
Notes
This is needed only for the transitional period before the channels map is removed completely.
Bors cheat-sheet:
bors r+ runs integration tests and merges the PR (if it's approved),
bors try runs integration tests for the PR,
bors delegate+ enables non-maintainer PR authors to run the above.