Add `util/groupcommit` package in `core` subproject #1607
My first review is complete, and I've left several comments. Please take a look when you have time! I will conduct a second review later on. Thank you for your great work!
// TODO: This should be replaced by other metrics mechanism.
private void startMonitorExecutorService() {
  Runnable print =
      () -> logger.info("[MONITOR] Timestamp={}, Metrics={}", Instant.now(), getMetrics());
I feel we can make it a debug log. What do you think?
Sounds good! I'll update it.
BTW, this will soon be replaced by #1614, which uses Dropwizard Metrics.
Fixed in ade2864
private void reserveSlot(Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> slot) {
  Slot<?, ?, ?, ?, ?> oldSlot = slots.put(slot.key(), slot);
  if (oldSlot != null) {
    logger.warn("An old slot exist unexpectedly. {}", oldSlot.fullKey());
This shouldn't occur, right? If so, is just logging a warning message enough?
Good catch! Right. I'll update it to throw AssertionError or IllegalStateException.
Fixed in 8d96fd4
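A minimal sketch of the fail-fast behaviour agreed on above, assuming a simplified registry rather than the PR's actual `Group` class. `SlotRegistry` and its methods are hypothetical names, and `putIfAbsent` is used here (the reviewed code uses `put`) so that the existing slot is not overwritten before the exception is thrown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a group that holds slots keyed by child key.
final class SlotRegistry<K, S> {
  private final Map<K, S> slots = new ConcurrentHashMap<>();

  // Fails fast instead of only logging when a slot is already registered for the key.
  void reserveSlot(K key, S slot) {
    S oldSlot = slots.putIfAbsent(key, slot);
    if (oldSlot != null) {
      throw new IllegalStateException("An old slot exists unexpectedly. Key: " + key);
    }
  }

  int size() {
    return slots.size();
  }
}
```

Reserving the same key twice throws on the second call and leaves the first slot in place.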
 *
 * @param fullKey A full key to specify the slot.
 */
public void remove(FULL_KEY fullKey) {
When is this `remove` method intended to be called?
Good question.
Let's say a transaction executes some CRUD operations, starts PREPARE, and fails before the group commit for the coordinator state. In this case, a group commit slot has already been allocated for the transaction, and it needs to be removed by calling this method so that no garbage slot is left behind.
Updated the comment in 4ff928a
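The cleanup path described above could look roughly like the following. This is only a sketch under assumptions: `SlotTracker`, `PrepareFailureExample`, and the `"group-1:"` full-key format are hypothetical and stand in for the real `GroupCommitter` API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal committer that only tracks which slots are reserved.
final class SlotTracker {
  private final Set<String> reserved = ConcurrentHashMap.newKeySet();

  String reserve(String childKey) {
    String fullKey = "group-1:" + childKey; // illustrative full-key format
    reserved.add(fullKey);
    return fullKey;
  }

  void remove(String fullKey) {
    reserved.remove(fullKey);
  }

  int reservedCount() {
    return reserved.size();
  }
}

final class PrepareFailureExample {
  // Runs the prepare step. If it fails before the group commit, the slot is removed
  // so that the group does not keep waiting for a value that will never arrive.
  static boolean runTransaction(SlotTracker tracker, String childKey, Runnable prepare) {
    String fullKey = tracker.reserve(childKey);
    try {
      prepare.run();
      return true; // In the real flow, ready(fullKey, value) would follow here.
    } catch (RuntimeException e) {
      tracker.remove(fullKey);
      return false;
    }
  }
}
```

On success the slot stays reserved because the transaction would go on to set its value; on failure no slot is left behind.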
 * @param value A value to be set to the slot.
 * @throws GroupCommitException when group commit fails
 */
public void ready(FULL_KEY fullKey, V value) throws GroupCommitException {
What is the purpose of this `value` argument? Maybe we use it for a transaction ID (a child key) for each transaction within a group?
In the case of the coordinator state commit, `V` is supposed to be `Snapshot`, and the passed value will be committed together with the other snapshots in the same group.
Updated the comment to make it clearer in fa5e726
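To illustrate how values passed to `ready()` end up being committed together, here is a deliberately simplified sketch. `MiniGroup` is a hypothetical class: it emits synchronously under a lock and uses `String` keys, whereas the PR delegates the emit to a waiting client thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical group that emits all values at once when every reserved slot is ready.
final class MiniGroup<V> {
  // A null value means the slot is reserved but not ready yet.
  private final Map<String, V> slots = new LinkedHashMap<>();
  private final Consumer<List<V>> emitter;
  private boolean emitted;

  MiniGroup(Consumer<List<V>> emitter) {
    this.emitter = emitter;
  }

  synchronized void reserve(String childKey) {
    slots.put(childKey, null);
  }

  synchronized void ready(String childKey, V value) {
    if (value == null) {
      throw new IllegalArgumentException("value must not be null");
    }
    if (!slots.containsKey(childKey)) {
      throw new IllegalArgumentException("No slot reserved for " + childKey);
    }
    slots.put(childKey, value);
    // Emit exactly once, and only after the last slot has received its value.
    if (!emitted && !slots.containsValue(null)) {
      emitted = true;
      emitter.accept(new ArrayList<>(slots.values()));
    }
  }
}
```

With two reserved slots, the emitter is called only after the second `ready()` and receives both values in reservation order.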
 * @param <EMIT_KEY> A key type that Emitter can interpret.
 * @param <V> A value type to be set to a slot.
 */
public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> implements Closeable {
I think we can put `@ThreadSafe` or `@NotThreadSafe` on each class.
Fixed in e581918
and simplify some code
Fix a race condition bug where a newly created DelayedGroup is already closed when reserving a slot. The scenario in which the issue occurs is as follows:
- DelayedSlotMoveWorker adds the new DelayedGroup into GroupCleanupWorker
- GroupCleanupWorker calls DelayedGroup.updateState() before DelayedSlotMoveWorker reserves a slot on the DelayedGroup
- The slots of the DelayedGroup are still empty and it's marked as DONE
- DelayedSlotMoveWorker fails to reserve a slot on the DelayedGroup since it's closed (DONE)
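The ordering problem in this commit message can be reproduced deterministically with a toy model. The commit message does not say how the fix was implemented, so the second method below only shows one possible ordering that avoids the problem (reserving the slot before the cleanup check can run). `TinyDelayedGroup` and `RaceOrderingExample` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a delayed group that is marked DONE when it has no slots.
final class TinyDelayedGroup {
  private final List<String> slots = new ArrayList<>();
  private boolean done;

  synchronized boolean reserveSlot(String key) {
    if (done) {
      return false; // The group is already closed.
    }
    slots.add(key);
    return true;
  }

  // Mimics the cleanup check: an empty group is considered finished.
  synchronized void updateState() {
    if (slots.isEmpty()) {
      done = true;
    }
  }

  synchronized boolean isDone() {
    return done;
  }
}

final class RaceOrderingExample {
  // Problematic order: the cleanup check sees the empty group before the reservation.
  static boolean cleanupBeforeReserve() {
    TinyDelayedGroup group = new TinyDelayedGroup();
    group.updateState();
    return group.reserveSlot("tx1");
  }

  // Safe order (an assumption, not necessarily the PR's fix): reserve first.
  static boolean reserveBeforeCleanup() {
    TinyDelayedGroup group = new TinyDelayedGroup();
    boolean reserved = group.reserveSlot("tx1");
    group.updateState();
    return reserved && !group.isDone();
  }
}
```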
Thank you for the great improvement and very sorry for the late review.
I took a brief look and left some comments and questions. PTAL!
(Let me check this one more time.)
Let me ask additional questions here.
- We discussed before that we might need to access the coordinator table twice in some recovery cases, but how does the current code handle the case?
- If one transaction in a group fails to commit, will the transaction be handled in a `DelayedGroup`?
// Waiting all the slots are set with values.
//
// Groups with OPEN status can move to CLOSED.
CLOSED(true, false, false),
`CLOSED` corresponds to `OPEN`, so it's good, but it sounds like the group has already finished processing.
How about `FIXED`?
Thanks for the suggestion! Actually, it was originally something like SIZE-FIXED, and I changed it to CLOSED to convey that the group rejects new requests. But as you said, it also sounds like things are done. I'll update it to use FIXED.
Fixed in 5000335. Used `SIZE_FIXED` to make it clearer.
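For orientation, the status names mentioned in this conversation (OPEN, SIZE_FIXED, READY, DONE) can be sketched as a small enum. The flag and the forward-only transition rule below are assumptions for illustration; the PR's enum takes three boolean arguments whose meaning is not shown here.

```java
// Hypothetical lifecycle enum; the real one in the PR has a different constructor.
enum GroupStatus {
  OPEN(false),
  SIZE_FIXED(true),
  READY(true),
  DONE(true);

  private final boolean sizeFixed;

  GroupStatus(boolean sizeFixed) {
    this.sizeFixed = sizeFixed;
  }

  // Only an OPEN group accepts new slot reservations.
  boolean acceptsNewSlots() {
    return !sizeFixed;
  }

  // Assumes a group only ever moves forward in its lifecycle.
  boolean canMoveTo(GroupStatus next) {
    return next.ordinal() > ordinal();
  }
}
```

The name `SIZE_FIXED` makes the intent explicit: the number of slots can no longer grow, but the group has not finished processing.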
// If it returns null, the Group is already closed and a retry is needed.
@Nullable
FULL_KEY reserveNewSlot(CHILD_KEY childKey) {
  long stamp = lock.writeLock();
I might be missing something, but since we have only one active group at a time, I'm wondering how badly it affects performance when there are many concurrent transactions.
We could use multiple active groups to mitigate the potential issue, but it might complicate the mechanism.
Any thoughts on this?
Good point. I also considered and tried multiple current (== active) groups before, but the performance with multiple active groups wasn't good. I think how quickly a group fills up with slots (and values) is a very important factor in moving it to the next state (i.e., ready to commit) in the group commit, and using multiple active groups doesn't fit that.
I benchmarked this `util/groupcommit` package itself, and the average throughput was about 65K TPS without any wait, while the throughput with some delays added to emulate the actual latencies of an underlying database was far lower. I think this means the current single active group won't be a bottleneck in usual usages of ScalarDB.
Thank you for the detailed explanation!
It makes sense. Let's leave it as is.
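The single-active-group pattern discussed in this thread, where `reserveNewSlot()` returns null once the group is closed and the caller retries on a fresh group, might be sketched as follows. `ActiveGroup`, `SingleActiveGroupManager`, the capacity-based closing rule, and the key format are all assumptions for illustration, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.StampedLock;

// Hypothetical group that stops accepting slots once it reaches its capacity.
final class ActiveGroup {
  private final StampedLock lock = new StampedLock();
  private final List<String> slots = new ArrayList<>();
  private final int id;
  private final int capacity;

  ActiveGroup(int id, int capacity) {
    this.id = id;
    this.capacity = capacity;
  }

  // Returns a full key, or null when the group is full and a retry is needed.
  String reserveNewSlot(String childKey) {
    long stamp = lock.writeLock();
    try {
      if (slots.size() >= capacity) {
        return null;
      }
      slots.add(childKey);
      return "g" + id + ":" + childKey;
    } finally {
      lock.unlockWrite(stamp);
    }
  }
}

// Hypothetical manager that keeps exactly one active group at a time.
final class SingleActiveGroupManager {
  private final int capacity;
  private ActiveGroup current;
  private int nextId = 1;

  SingleActiveGroupManager(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
    this.current = new ActiveGroup(0, capacity);
  }

  synchronized String reserve(String childKey) {
    while (true) {
      String fullKey = current.reserveNewSlot(childKey);
      if (fullKey != null) {
        return fullKey;
      }
      // The current group is closed: switch to a new one and retry.
      current = new ActiveGroup(nextId++, capacity);
    }
  }
}
```

Because every reservation goes to the same group, that group fills up as quickly as possible, which is the property the author argues matters most for throughput.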
@feeblefakie Thanks for the questions!
- This PR doesn't contain any integration with the coordinator state commit, so let me refer to another PR's change (https://github.com/scalar-labs/scalardb/pull/1523/files#diff-8049e66d11b81cdd5a1dc82c4977b5336741bcee24dd7ddd54c6a6940f09497bR83-R89). I previously tried the idea of putting child-tx-id in a clustering key to make it possible to fetch both group-committed transactions' IDs and separately committed transactions' IDs. However, there was no significant performance improvement with the YCSB-F benchmark, even though it would require cumbersome table schema migrations. So, the current implementation accesses the coordinator table twice.
- No. Slots are moved to a DelayedGroup only when they are delayed. If a NormalGroup fails to commit due to network or underlying database issues, all the slots in the group fail. BTW, this is about the Commit-State phase. If a transaction fails in Prepare or Validate, it'll just be removed from the NormalGroup without being moved to a DelayedGroup.
in GroupCommitterConcurrentTest to see if the timeout mechanisms work
Thank you for the explanation.
Thank you!
private final GroupCommitConfig config;

GroupManager(
    String label,
Nit: It looks like this `label` is never used.
Fixed it in 0007ba3
FULL_KEY fullKey(PARENT_KEY parentKey, CHILD_KEY childKey);

boolean isFullKey(Object obj);
Nit: It looks like it's never used.
Ah, right. Indeed, it's not used in this PR. But, it'll be used in the subsequent PR https://github.com/scalar-labs/scalardb/pull/1523/files#diff-8049e66d11b81cdd5a1dc82c4977b5336741bcee24dd7ddd54c6a6940f09497bR75. Let me keep it if possible.
package com.scalar.db.util.groupcommit;

@FunctionalInterface
interface ThrowableRunnable {
Fixed it in e8faee2
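Later suggestions in this review use `ThrowableRunnable<Exception>` and `ThrowableRunnable<GroupCommitException>`, which implies the interface is generic in its exception type. A minimal sketch of such an interface follows; the `E extends Exception` bound and the `ThrowableRunnableExample` helper are assumptions, since the final declaration is not shown in this conversation.

```java
// A runnable whose run() may throw a checked exception of type E.
@FunctionalInterface
interface ThrowableRunnable<E extends Exception> {
  void run() throws E;
}

final class ThrowableRunnableExample {
  // Runs the task and turns the outcome into a short description.
  static String runAndDescribe(ThrowableRunnable<Exception> task) {
    try {
      task.run();
      return "ok";
    } catch (Exception e) {
      return "failed: " + e.getMessage();
    }
  }
}
```

Unlike `java.lang.Runnable`, a lambda assigned to this interface can throw checked exceptions without wrapping them.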
Overall, looking good!
Left one question. PTAL!
T removed = queue.poll();
// Check if the removed group is expected just in case.
if (removed == null || !removed.equals(item)) {
  logger.error(
I don't fully understand it, but I'm wondering in what cases this happens.
If it happens even in normal cases, should it be `warning` instead of `error`?
Good point. It shouldn't happen in normal cases. Throwing AssertionError or IllegalStateException is an option, but throwing either of them from a background worker doesn't make much sense, so I think just outputting an error log is enough.
That said, thanks to your comment, I noticed the error message was probably unclear. I updated it in bdf344f
Looking good!
Yeah, it cannot stop processing even in such a case, so having the updated comment makes sense.
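The defensive check discussed in this thread can be sketched as a small helper that reports, rather than throws, when the polled head is not the expected item. `HeadRemover` is a hypothetical name, and `System.err` stands in for the logger used in the PR.

```java
import java.util.Queue;

final class HeadRemover {
  // Removes the head of the queue and reports whether it was the expected item.
  // A mismatch is only logged so that a background worker can keep running.
  static <T> boolean removeExpectedHead(Queue<T> queue, T expected) {
    T removed = queue.poll();
    if (removed == null || !removed.equals(expected)) {
      System.err.printf(
          "The removed item is not the expected one. Expected: %s, Actual: %s%n",
          expected, removed);
      return false;
    }
    return true;
  }
}
```

Note that a mismatching head has still been removed from the queue by the time the error is logged, which is why a clear message matters for diagnosing the situation afterwards.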
@feeblefakie Yeah, good point. I'll try TPC-C with the clustering key version group commit before submitting a PR for the integration of the group commit with the coordinator state commit.
and add some comments
@feeblefakie @brfrn169 Thanks for reviewing this PR! @brfrn169 The performance improvement task of DelayedSlotMoveWorker will be handled in another PR 545ad20#diff-21491e996128b6ee7175154186a58be20ca714274559ba304eb39c59be8db4faR110-R113
Left very minor suggestions. It looks good to me, but let me review this once again later.
long stamp = lock.writeLock();
try {
  // TODO: NormalGroup.removeNotReadySlots() calls updateStatus() potentially resulting in
  // asyncEmit(). Maybe it should be called outside the lock.
Nit:
- // asyncEmit(). Maybe it should be called outside the lock.
+ // delegateEmitTaskToWaiter(). Maybe it should be called outside the lock.
Thanks! Fixed it in 8658300
private final CHILD_KEY key;
// If a result value is null, the value is already emitted.
// Otherwise, the result lambda must be emitted by the receiver's thread.
private final CompletableFuture<ThrowableRunnable> completableFuture = new CompletableFuture<>();
- private final CompletableFuture<ThrowableRunnable> completableFuture = new CompletableFuture<>();
+ private final CompletableFuture<ThrowableRunnable<Exception>> completableFuture = new CompletableFuture<>();
Thanks! Fixed it in 8658300
try {
  // If a result value is null, the value is already emitted.
  // Otherwise, the result lambda must be emitted by the receiver's thread.
  ThrowableRunnable taskToEmit = completableFuture.get();
- ThrowableRunnable taskToEmit = completableFuture.get();
+ ThrowableRunnable<Exception> taskToEmit = completableFuture.get();
}
throw new GroupCommitException(
    String.format("Group commit failed. Group: %s", parentGroup.get()), cause);
} catch (Throwable e) {
- } catch (Throwable e) {
+ } catch (Exception e) {
// Delegates the emit task to the client. The client receiving this task needs to handle the emit
// task.
void delegateTaskToWaiter(ThrowableRunnable task) {
- void delegateTaskToWaiter(ThrowableRunnable task) {
+ void delegateTaskToWaiter(ThrowableRunnable<Exception> task) {
Thanks! Fixed it in 8658300
// This task is passed to only the first slot, so the slot will be resumed.
// Other slots will be blocked until `markAsXxxx()` is called.
ThrowableRunnable<GroupCommitException> taskForEmitterSlot =
- ThrowableRunnable<GroupCommitException> taskForEmitterSlot =
+ ThrowableRunnable<Exception> taskForEmitterSlot =
Thanks! Fixed it in 8658300
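The hand-off described in the code comments above (a null result means the value was already emitted by another slot, while a non-null task must be run by the waiting thread) can be sketched with a plain `CompletableFuture`. `WaitingSlot`, `EmitTask`, `markAsSuccess()`, and `markAsFailed()` are hypothetical names; the source only refers to `markAsXxxx()` without listing the concrete methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Stand-in for ThrowableRunnable<Exception> to keep this sketch self-contained.
@FunctionalInterface
interface EmitTask {
  void run() throws Exception;
}

final class WaitingSlot {
  private final CompletableFuture<EmitTask> future = new CompletableFuture<>();

  // Called for the slot chosen to emit: the waiting client thread will run the task.
  void delegateTaskToWaiter(EmitTask task) {
    future.complete(task);
  }

  // Called for the other slots once the emit has succeeded: nothing is left to run.
  void markAsSuccess() {
    future.complete(null);
  }

  // Called for the other slots when the emit has failed.
  void markAsFailed(Exception cause) {
    future.completeExceptionally(cause);
  }

  // Blocks until the slot is resumed, then runs the delegated task if there is one.
  void waitUntilEmit() throws Exception {
    EmitTask task;
    try {
      task = future.get();
    } catch (ExecutionException e) {
      throw new Exception("Group commit failed", e.getCause());
    }
    if (task != null) {
      task.run();
    }
  }
}
```

Running the emit on a client thread that is already waiting avoids needing a dedicated thread for the commit itself, which appears to be the motivation for delegating the task.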
LGTM! Thank you!
LGTM! Thank you for your great work!
Description
ScalarDB may sometimes need to work with high-latency storage (e.g., a multi-region replicated database). In such a case, the throughput of an application using ScalarDB would be affected by the high-latency transactions. With a group commit mechanism, the high-latency issue can be hidden with concurrent transactions.
In this PR, we introduce the `util/groupcommit` package in the `core` subproject, which provides a mechanism to support the ScalarDB protocol. The package alone doesn't affect the existing implementations. A following PR will integrate the new package with the existing implementations.
Related issues and/or PRs
N/A
Changes made
This PR basically adds the following classes.
![image](https://private-user-images.githubusercontent.com/59043/312776661-460ea419-17fd-41a2-94a7-f7df37c4efd9.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjI4MTg4MjAsIm5iZiI6MTcyMjgxODUyMCwicGF0aCI6Ii81OTA0My8zMTI3NzY2NjEtNDYwZWE0MTktMTdmZC00MWEyLTk0YTctZjdkZjM3YzRlZmQ5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA4MDUlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwODA1VDAwNDIwMFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTVkNGI2OTFiZjFmZGMwMmFiZDhlZjBkZGJkNzIwMjQ4NTE4OGE2OGJhNjVhN2Y0NTdjYmQ3MzQ0NzEyMjRlZDUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.m-H9S95WRIr8cZYMHTEAY9EksWkCP8FOLoqLXqkuaYM)
The `util/groupcommit` package has a concept about keys and they should be described before looking into the classes.
The main components introduced by this PR are as follows:
`Emittable.emit()` which is injected by the client with either of the following keys once the group gets READY
Very rough overall architecture of the group commit
![image](https://private-user-images.githubusercontent.com/59043/312801309-7e2c38ba-8b36-429f-83e4-4581396e1367.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjI4MTg4MjAsIm5iZiI6MTcyMjgxODUyMCwicGF0aCI6Ii81OTA0My8zMTI4MDEzMDktN2UyYzM4YmEtOGIzNi00MjlmLTgzZTQtNDU4MTM5NmUxMzY3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA4MDUlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwODA1VDAwNDIwMFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWYyYWE0MzZiNDE5OGEzNTc1YmUwOTRlMjZmNDc5OWIzMzY1YWU2YWNhMTQ5ODU0NjkzZDQzNDUwYmEzODIyZmUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.HECheO1H48sGBn9wu1tuHHhpns7ticLqDiJsZ60pBVs)
Replace `txid` with `Key` when reading the description.
Lifecycle of Group
![image](https://private-user-images.githubusercontent.com/59043/312801500-22ce107e-e6a6-4663-921b-99b06987c96a.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjI4MTg4MjAsIm5iZiI6MTcyMjgxODUyMCwicGF0aCI6Ii81OTA0My8zMTI4MDE1MDAtMjJjZTEwN2UtZTZhNi00NjYzLTkyMWItOTliMDY5ODdjOTZhLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA4MDUlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwODA1VDAwNDIwMFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTZmYzEzODFjZjc3MjViNDk3Y2ZkN2QwNDA2NGIzMmY0Yzk3OTRlZGViOGVmZmQ4ZDVkNGU3MzU3MjE3NDdjMjImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.01GKzX_HBEyQUb9uznf9x-BfcfcG5BHhzQOOEiWBIBs)
Checklist
Additional notes (optional)
The current implementation contains the following 3 TODOs. They'll be handled in other PRs.
Release notes
N/A (This PR itself isn't a user facing change).