Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/com/scalar/db/common/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ public enum CoreError implements ScalarDbError {
"A transaction conflict occurred in the Insert operation",
"",
""),
CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS(
Category.CONCURRENCY_ERROR, "0026", "A conflict occurred when committing records", "", ""),

//
// Errors for the internal error category
Expand Down Expand Up @@ -935,6 +937,8 @@ public enum CoreError implements ScalarDbError {
""),
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),
CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED(
Category.INTERNAL_ERROR, "0058", "Committing records failed", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.LazyInit;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.TransactionState;
Expand All @@ -24,6 +26,8 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
Expand All @@ -38,6 +42,7 @@ public class CommitHandler {
private final ParallelExecutor parallelExecutor;
private final MutationsGrouper mutationsGrouper;
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
private final boolean onePhaseCommitEnabled;

@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;

Expand All @@ -48,13 +53,15 @@ public CommitHandler(
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor,
MutationsGrouper mutationsGrouper,
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
boolean onePhaseCommitEnabled) {
this.storage = checkNotNull(storage);
this.coordinator = checkNotNull(coordinator);
this.tableMetadataManager = checkNotNull(tableMetadataManager);
this.parallelExecutor = checkNotNull(parallelExecutor);
this.mutationsGrouper = checkNotNull(mutationsGrouper);
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
this.onePhaseCommitEnabled = onePhaseCommitEnabled;
}

/**
Expand Down Expand Up @@ -118,6 +125,16 @@ public void commit(Snapshot snapshot, boolean readOnly)

Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);

if (canOnePhaseCommit(snapshot)) {
try {
onePhaseCommitRecords(snapshot);
return;
Copy link
Contributor

@komamitsu komamitsu Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the commit state, I think SSR will treat the write operations as an aborted transaction's ones and ignore them...

Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if SSR is enabled, one-phase commit must not be enabled. We should consider adding validation for this on the cluster side later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}
}

if (hasWritesOrDeletesInSnapshot) {
try {
prepareRecords(snapshot);
Expand Down Expand Up @@ -170,6 +187,52 @@ public void commit(Snapshot snapshot, boolean readOnly)
}
}

@VisibleForTesting
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
if (!onePhaseCommitEnabled) {
return false;
}

// If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the
// transaction
if (snapshot.isValidationRequired()) {
return false;
}

// If the snapshot has no write and deletes, we do not one-phase commit the transaction
if (!snapshot.hasWritesOrDeletes()) {
return false;
}

List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet();

// If a record corresponding to a delete in the delete set does not exist in the storage, we
// cannot one-phase commit the transaction. This is because the storage does not support
// delete-if-not-exists semantics, so we cannot detect conflicts with other transactions.
for (Delete delete : deletesInDeleteSet) {
Copy link
Contributor

@komamitsu komamitsu Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to know what would happen if I removed this block, so I removed the block and executed ConsensusCommitNullMetadataIntegrationTestBase. But all the tests passed unexpectedly. Is it difficult to write a test case where this block is necessary? (or I'm missing something....)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, suppose a user tries to get record A but it doesn’t exist, and then the user attempts to delete that non-existing record.

In this case, under one-phase commit, we would need to perform a delete-if-not-exists operation for that record to detect conflicts—since another transaction might insert the record. However, because we don’t have delete-if-not-exists semantics in ScalarDB, we can’t do that. As a result, we cannot apply one-phase commit in this case.

Copy link
Collaborator Author

@brfrn169 brfrn169 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, one possible solution is to introduce a logical-delete state. In that case, we would perform a put-if-not-exists operation for the record representing that logical-delete state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks for the explanation! It would be great if we add a test case for the situation later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but unfortunately I wanted to find a proper integration test case to know actual use case.

Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete));

// For deletes, we always perform implicit pre-reads if the result does not exit in the read
// set. So the result should always exist in the read set.
assert result != null;

if (!result.isPresent()) {
return false;
}
}

try {
// If the mutations can be grouped altogether, the mutations can be done in a single mutate
// API call, so we can one-phase commit the transaction
return mutationsGrouper.canBeGroupedAltogether(
Stream.concat(snapshot.getPutsInWriteSet().stream(), deletesInDeleteSet.stream())
.collect(Collectors.toList()));
} catch (ExecutionException e) {
throw new CommitException(
CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
}
}

protected void handleCommitConflict(Snapshot snapshot, Exception cause)
throws CommitConflictException, UnknownTransactionStatusException {
try {
Expand Down Expand Up @@ -197,6 +260,30 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
}
}

/**
 * Commits the records of the given snapshot with one-phase commit, i.e., applies all mutations
 * in a single storage mutate API call without a prepare phase or a coordinator commit state.
 *
 * @param snapshot the snapshot whose write and delete sets are applied to the storage
 * @throws CommitException if committing the records fails; a {@code CommitConflictException} is
 *     thrown when the failure indicates a conflict with another transaction
 */
@VisibleForTesting
void onePhaseCommitRecords(Snapshot snapshot) throws CommitException {
  try {
    OnePhaseCommitMutationComposer composer =
        new OnePhaseCommitMutationComposer(snapshot.getId(), tableMetadataManager);
    snapshot.to(composer);

    // One-phase commit does not require grouping mutations and using the parallel executor since
    // it is always executed in a single mutate API call.
    storage.mutate(composer.get());
  } catch (NoMutationException e) {
    // A mutation condition was not met — presumably another transaction changed the record
    // concurrently — so surface it as a commit conflict.
    throw new CommitConflictException(
        CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId());
  } catch (RetriableExecutionException e) {
    // A retriable storage error is reported as a conflict so the caller can retry the
    // transaction.
    throw new CommitConflictException(
        CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage(),
        e,
        snapshot.getId());
  } catch (ExecutionException e) {
    // Any other storage failure is a non-conflict commit failure.
    throw new CommitException(
        CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
  }
}

public void prepareRecords(Snapshot snapshot) throws PreparationException {
try {
PrepareMutationComposer composer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public CommitHandlerWithGroupCommit(
ParallelExecutor parallelExecutor,
MutationsGrouper mutationsGrouper,
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
boolean onePhaseCommitEnabled,
CoordinatorGroupCommitter groupCommitter) {
super(
storage,
coordinator,
tableMetadataManager,
parallelExecutor,
mutationsGrouper,
coordinatorWriteOmissionOnReadOnlyEnabled);
coordinatorWriteOmissionOnReadOnlyEnabled,
onePhaseCommitEnabled);
checkNotNull(groupCommitter);
// The methods of this emitter will be called via GroupCommitter.ready().
groupCommitter.setEmitter(new Emitter(coordinator));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConsensusCommitConfig {

public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
PREFIX + "coordinator.write_omission_on_read_only.enabled";
public static final String ONE_PHASE_COMMIT_ENABLED = PREFIX + "one_phase_commit.enabled";
public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";
public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
Expand Down Expand Up @@ -75,8 +76,9 @@ public class ConsensusCommitConfig {
private final boolean asyncRollbackEnabled;

private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
private final boolean onePhaseCommitEnabled;
private final boolean parallelImplicitPreReadEnabled;
private final boolean isIncludeMetadataEnabled;
private final boolean includeMetadataEnabled;

private final boolean coordinatorGroupCommitEnabled;
private final int coordinatorGroupCommitSlotCapacity;
Expand Down Expand Up @@ -145,10 +147,12 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
coordinatorWriteOmissionOnReadOnlyEnabled =
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);

isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
onePhaseCommitEnabled = getBoolean(properties, ONE_PHASE_COMMIT_ENABLED, false);

parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);

includeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);

coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
coordinatorGroupCommitSlotCapacity =
getInt(
Expand Down Expand Up @@ -219,12 +223,16 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
return coordinatorWriteOmissionOnReadOnlyEnabled;
}

/** Returns whether the one-phase commit optimization is enabled ({@code false} by default). */
public boolean isOnePhaseCommitEnabled() {
  return onePhaseCommitEnabled;
}

/** Returns whether implicit pre-reads are executed in parallel ({@code true} by default). */
public boolean isParallelImplicitPreReadEnabled() {
  return parallelImplicitPreReadEnabled;
}

/** Returns whether the include-metadata option is enabled ({@code false} by default). */
public boolean isIncludeMetadataEnabled() {
  return includeMetadataEnabled;
}

public boolean isCoordinatorGroupCommitEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
parallelExecutor,
mutationsGrouper,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
config.isOnePhaseCommitEnabled(),
groupCommitter);
} else {
return new CommitHandler(
Expand All @@ -151,7 +152,8 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
tableMetadataManager,
parallelExecutor,
mutationsGrouper,
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
config.isOnePhaseCommitEnabled());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.scalar.db.io.Key;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,6 +51,41 @@ public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
}

/**
 * Checks whether all the given mutations belong to a single group (same storage, and same
 * namespace/table/partition as required by that storage's atomicity unit) and fit within that
 * storage's maximum atomic mutation count, i.e., whether they could be applied in one atomic
 * mutate call.
 *
 * @param mutations the mutations to check
 * @return {@code true} if all mutations can be grouped together
 * @throws ExecutionException if the storage information cannot be retrieved
 */
public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException {
  // Zero or one mutation is trivially a single group.
  if (mutations.size() <= 1) {
    return true;
  }

  MutationGroup referenceGroup = null;
  int atomicLimit = 0;
  int seen = 0;

  for (Mutation mutation : mutations) {
    assert mutation.forNamespace().isPresent();
    StorageInfo info = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());
    MutationGroup group = new MutationGroup(mutation, info);

    if (referenceGroup == null) {
      // The first mutation defines the group that every other mutation must match, and its
      // storage determines how many mutations can be applied atomically.
      referenceGroup = group;
      atomicLimit = referenceGroup.storageInfo.getMaxAtomicMutationsCount();
      seen = 1;
    } else {
      if (!referenceGroup.equals(group)) {
        // Found a mutation that does not belong to the first mutation's group.
        return false;
      }
      seen++;
      if (seen > atomicLimit) {
        // Exceeds the maximum number of mutations the storage can apply atomically.
        return false;
      }
    }
  }

  // All mutations belong to the same group and are within the count limit.
  return true;
}

private static class MutationGroup {
public final StorageInfo storageInfo;
@Nullable public final String namespace;
Expand Down
Loading