diff --git a/core/src/main/java/com/scalar/db/common/CoreError.java b/core/src/main/java/com/scalar/db/common/CoreError.java index d312dabe6b..fff02ce179 100644 --- a/core/src/main/java/com/scalar/db/common/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/CoreError.java @@ -783,6 +783,8 @@ public enum CoreError implements ScalarDbError { "A transaction conflict occurred in the Insert operation", "", ""), + CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS( + Category.CONCURRENCY_ERROR, "0026", "A conflict occurred when committing records", "", ""), // // Errors for the internal error category @@ -935,6 +937,8 @@ public enum CoreError implements ScalarDbError { ""), CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED( Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""), + CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED( + Category.INTERNAL_ERROR, "0058", "Committing records failed", "", ""), // // Errors for the unknown transaction status error category diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index dff010e863..e775b876f7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -2,7 +2,9 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.concurrent.LazyInit; +import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Mutation; import com.scalar.db.api.TransactionState; @@ -24,6 +26,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; @@ -38,6 +42,7 @@ public class CommitHandler { private final ParallelExecutor parallelExecutor; private final MutationsGrouper mutationsGrouper; protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; + private final boolean onePhaseCommitEnabled; @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; @@ -48,13 +53,15 @@ public CommitHandler( TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, MutationsGrouper mutationsGrouper, - boolean coordinatorWriteOmissionOnReadOnlyEnabled) { + boolean coordinatorWriteOmissionOnReadOnlyEnabled, + boolean onePhaseCommitEnabled) { this.storage = checkNotNull(storage); this.coordinator = checkNotNull(coordinator); this.tableMetadataManager = checkNotNull(tableMetadataManager); this.parallelExecutor = checkNotNull(parallelExecutor); this.mutationsGrouper = checkNotNull(mutationsGrouper); this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; + this.onePhaseCommitEnabled = onePhaseCommitEnabled; } /** @@ -118,6 +125,16 @@ public void commit(Snapshot snapshot, boolean readOnly) Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot); + if (canOnePhaseCommit(snapshot)) { + try { + onePhaseCommitRecords(snapshot); + return; + } catch (Exception e) { + safelyCallOnFailureBeforeCommit(snapshot); + throw e; + } + } + if (hasWritesOrDeletesInSnapshot) { try { prepareRecords(snapshot); @@ -170,6 +187,52 @@ public void commit(Snapshot
snapshot, boolean readOnly) } } + @VisibleForTesting + boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException { + if (!onePhaseCommitEnabled) { + return false; + } + + // If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the + // transaction + if (snapshot.isValidationRequired()) { + return false; + } + + // If the snapshot has no writes or deletes, we do not one-phase commit the transaction + if (!snapshot.hasWritesOrDeletes()) { + return false; + } + + List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet(); + + // If a record corresponding to a delete in the delete set does not exist in the storage, we + // cannot one-phase commit the transaction. This is because the storage does not support + // delete-if-not-exists semantics, so we cannot detect conflicts with other transactions. + for (Delete delete : deletesInDeleteSet) { + Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete)); + + // For deletes, we always perform implicit pre-reads if the result does not exist in the read + // set. So the result should always exist in the read set. + assert result != null; + + if (!result.isPresent()) { + return false; + } + } + + try { + // If the mutations can be grouped altogether, the mutations can be done in a single mutate + // API call, so we can one-phase commit the transaction + return mutationsGrouper.canBeGroupedAltogether( + Stream.concat(snapshot.getPutsInWriteSet().stream(), deletesInDeleteSet.stream()) + .collect(Collectors.toList())); + } catch (ExecutionException e) { + throw new CommitException( + CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); + } + } + protected void handleCommitConflict(Snapshot snapshot, Exception cause) throws CommitConflictException, UnknownTransactionStatusException { try { @@ -197,6 +260,30 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause) } } + @VisibleForTesting + void onePhaseCommitRecords(Snapshot snapshot) throws CommitException { + try { + OnePhaseCommitMutationComposer composer = + new OnePhaseCommitMutationComposer(snapshot.getId(), tableMetadataManager); + snapshot.to(composer); + + // One-phase commit does not require grouping mutations or using the parallel executor since + // it is always executed in a single mutate API call.
+ storage.mutate(composer.get()); + } catch (NoMutationException e) { + throw new CommitConflictException( + CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId()); + } catch (RetriableExecutionException e) { + throw new CommitConflictException( + CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage(), + e, + snapshot.getId()); + } catch (ExecutionException e) { + throw new CommitException( + CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId()); + } + } + public void prepareRecords(Snapshot snapshot) throws PreparationException { try { PrepareMutationComposer composer = diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 09712bf2b1..1cb7d721db 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -31,6 +31,7 @@ public CommitHandlerWithGroupCommit( ParallelExecutor parallelExecutor, MutationsGrouper mutationsGrouper, boolean coordinatorWriteOmissionOnReadOnlyEnabled, + boolean onePhaseCommitEnabled, CoordinatorGroupCommitter groupCommitter) { super( storage, @@ -38,7 +39,8 @@ public CommitHandlerWithGroupCommit( tableMetadataManager, parallelExecutor, mutationsGrouper, - coordinatorWriteOmissionOnReadOnlyEnabled); + coordinatorWriteOmissionOnReadOnlyEnabled, + onePhaseCommitEnabled); checkNotNull(groupCommitter); // The methods of this emitter will be called via GroupCommitter.ready(). groupCommitter.setEmitter(new Emitter(coordinator)); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java index 7e1b9c5c07..292cb8c10c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java @@ -35,6 +35,7 @@ public class ConsensusCommitConfig { public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED = PREFIX + "coordinator.write_omission_on_read_only.enabled"; + public static final String ONE_PHASE_COMMIT_ENABLED = PREFIX + "one_phase_commit.enabled"; public static final String PARALLEL_IMPLICIT_PRE_READ = PREFIX + "parallel_implicit_pre_read.enabled"; public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled"; @@ -75,8 +76,9 @@ public class ConsensusCommitConfig { private final boolean asyncRollbackEnabled; private final boolean coordinatorWriteOmissionOnReadOnlyEnabled; + private final boolean onePhaseCommitEnabled; private final boolean parallelImplicitPreReadEnabled; - private final boolean isIncludeMetadataEnabled; + private final boolean includeMetadataEnabled; private final boolean coordinatorGroupCommitEnabled; private final int coordinatorGroupCommitSlotCapacity; @@ -145,10 +147,12 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) { coordinatorWriteOmissionOnReadOnlyEnabled = getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true); - isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false); + onePhaseCommitEnabled = getBoolean(properties, ONE_PHASE_COMMIT_ENABLED, false); parallelImplicitPreReadEnabled = 
getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true); + includeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false); + coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false); coordinatorGroupCommitSlotCapacity = getInt( @@ -219,12 +223,16 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() { return coordinatorWriteOmissionOnReadOnlyEnabled; } + public boolean isOnePhaseCommitEnabled() { + return onePhaseCommitEnabled; + } + public boolean isParallelImplicitPreReadEnabled() { return parallelImplicitPreReadEnabled; } public boolean isIncludeMetadataEnabled() { - return isIncludeMetadataEnabled; + return includeMetadataEnabled; } public boolean isCoordinatorGroupCommitEnabled() { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index d3020dab4a..bf8c78249c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -143,6 +143,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) { parallelExecutor, mutationsGrouper, config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + config.isOnePhaseCommitEnabled(), groupCommitter); } else { return new CommitHandler( @@ -151,7 +152,8 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) { tableMetadataManager, parallelExecutor, mutationsGrouper, - config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); + config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + config.isOnePhaseCommitEnabled()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java index 1412cefbc2..787650749f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java @@ -7,6 +7,7 @@ import com.scalar.db.io.Key; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -50,6 +51,41 @@ public List<List<Mutation>> groupMutations(Collection<Mutation> mutations) return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList()); } + public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException { + if (mutations.size() <= 1) { + return true; + } + + Iterator<Mutation> iterator = mutations.iterator(); + Mutation firstMutation = iterator.next(); + assert firstMutation.forNamespace().isPresent(); + StorageInfo storageInfo = + storageInfoProvider.getStorageInfo(firstMutation.forNamespace().get()); + MutationGroup firstGroup = new MutationGroup(firstMutation, storageInfo); + + int maxCount = firstGroup.storageInfo.getMaxAtomicMutationsCount(); + int mutationCount = 1; + + while (iterator.hasNext()) { + Mutation otherMutation = iterator.next(); + assert otherMutation.forNamespace().isPresent(); + StorageInfo otherStorageInfo = + storageInfoProvider.getStorageInfo(otherMutation.forNamespace().get()); + MutationGroup otherGroup = new MutationGroup(otherMutation, otherStorageInfo); + + if (!firstGroup.equals(otherGroup)) { + return false; // Found a mutation that does not belong to the first group + } + + mutationCount++; + if
(mutationCount > maxCount) { + return false; // Exceeds the maximum allowed count for this group + } + } + + return true; // All mutations belong to the same group and are within the count limit + } + private static class MutationGroup { public final StorageInfo storageInfo; @Nullable public final String namespace; diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposer.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposer.java new file mode 100644 index 0000000000..55fc339e90 --- /dev/null +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposer.java @@ -0,0 +1,132 @@ +package com.scalar.db.transaction.consensuscommit; + +import static com.scalar.db.transaction.consensuscommit.Attribute.ID; +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isInsertModeEnabled; +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getNextTxVersion; +import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata; + +import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.ConditionBuilder; +import com.scalar.db.api.Consistency; +import com.scalar.db.api.Delete; +import com.scalar.db.api.DeleteBuilder; +import com.scalar.db.api.Operation; +import com.scalar.db.api.Put; +import com.scalar.db.api.PutBuilder; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionState; +import com.scalar.db.exception.storage.ExecutionException; +import java.util.LinkedHashSet; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +@NotThreadSafe +public class OnePhaseCommitMutationComposer extends AbstractMutationComposer { + + public OnePhaseCommitMutationComposer( + String id, TransactionTableMetadataManager tableMetadataManager) { + super(id, tableMetadataManager); + } + + @VisibleForTesting + OnePhaseCommitMutationComposer( + String id, long current, TransactionTableMetadataManager tableMetadataManager) { + super(id, current, tableMetadataManager); + } + + @Override + public void add(Operation base, @Nullable TransactionResult result) throws ExecutionException { + if (base instanceof Put) { + add((Put) base, result); + } else { + assert base instanceof Delete; + add((Delete) base, result); + } + } + + private void add(Put base, @Nullable TransactionResult result) throws ExecutionException { + mutations.add(composePut(base, result)); + } + + private void add(Delete base, @Nullable TransactionResult result) { + mutations.add(composeDelete(base, result)); + } + + private Put composePut(Put base, @Nullable TransactionResult result) throws ExecutionException { + assert base.forNamespace().isPresent() && base.forTable().isPresent(); + + PutBuilder.Buildable putBuilder = + Put.newBuilder() + .namespace(base.forNamespace().get()) + .table(base.forTable().get()) + .partitionKey(base.getPartitionKey()) + .consistency(Consistency.LINEARIZABLE); + base.getClusteringKey().ifPresent(putBuilder::clusteringKey); + base.getColumns().values().forEach(putBuilder::value); + + putBuilder.textValue(Attribute.ID, id); + putBuilder.intValue(Attribute.STATE, TransactionState.COMMITTED.get()); + putBuilder.bigIntValue(Attribute.PREPARED_AT, current); + putBuilder.bigIntValue(Attribute.COMMITTED_AT, current); + + if (!isInsertModeEnabled(base) && result != null) { // overwrite existing record + int version = result.getVersion(); +
putBuilder.intValue(Attribute.VERSION, getNextTxVersion(version)); + + // check if the record is not interrupted by other conflicting transactions + if (result.isDeemedAsCommitted()) { + // the record is in the deemed-committed state + putBuilder.condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isNullText()).build()); + } else { + putBuilder.condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(result.getId())) + .build()); + } + + // Set before image columns to null + TransactionTableMetadata transactionTableMetadata = + getTransactionTableMetadata(tableMetadataManager, base); + LinkedHashSet<String> beforeImageColumnNames = + transactionTableMetadata.getBeforeImageColumnNames(); + TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata(); + setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata); + } else { // initial record or insert mode enabled + putBuilder.intValue(Attribute.VERSION, getNextTxVersion(null)); + + // check if the record is not created by other conflicting transactions + putBuilder.condition(ConditionBuilder.putIfNotExists()); + } + + return putBuilder.build(); + } + + private Delete composeDelete(Delete base, @Nullable TransactionResult result) { + assert base.forNamespace().isPresent() && base.forTable().isPresent(); + + // If a record corresponding to a delete in the delete set does not exist in the storage, we + // cannot one-phase commit the transaction. This is because the storage does not support + // delete-if-not-exists semantics, so we cannot detect conflicts with other transactions. + assert result != null; + + DeleteBuilder.Buildable deleteBuilder = + Delete.newBuilder() + .namespace(base.forNamespace().get()) + .table(base.forTable().get()) + .partitionKey(base.getPartitionKey()) + .consistency(Consistency.LINEARIZABLE); + base.getClusteringKey().ifPresent(deleteBuilder::clusteringKey); + + // check if the record is not interrupted by other conflicting transactions + if (result.isDeemedAsCommitted()) { + deleteBuilder.condition( + ConditionBuilder.deleteIf(ConditionBuilder.column(ID).isNullText()).build()); + } else { + deleteBuilder.condition( + ConditionBuilder.deleteIf(ConditionBuilder.column(ID).isEqualToText(result.getId())) + .build()); + } + + return deleteBuilder.build(); + } +} diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 1c95ae979d..a76ad4acbc 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -25,7 +25,6 @@ import com.scalar.db.common.CoreError; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; -import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.ValidationConflictException; import com.scalar.db.io.Column; import com.scalar.db.transaction.consensuscommit.ParallelExecutor.ParallelExecutorTask; @@ -303,8 +302,7 @@ private TableMetadata getTableMetadata(Key key) throws CrudException { } } - public void to(MutationComposer composer) - throws ExecutionException, PreparationConflictException { + public void to(MutationComposer composer) throws ExecutionException { for (Entry<Key, Put> entry : writeSet.entrySet()) { TransactionResult result = readSet.containsKey(entry.getKey()) ?
readSet.get(entry.getKey()).orElse(null) : null; diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 3b779385d7..4a82b5d9ad 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -83,7 +83,8 @@ public TwoPhaseConsensusCommitManager( tableMetadataManager, parallelExecutor, new MutationsGrouper(new StorageInfoProvider(admin)), - config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); + config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + config.isOnePhaseCommitEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } @@ -108,7 +109,8 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { tableMetadataManager, parallelExecutor, new MutationsGrouper(new StorageInfoProvider(admin)), - config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); + config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), + config.isOnePhaseCommitEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index cf5e32260e..3a355ece42 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -15,6 +16,7 @@ import static org.mockito.Mockito.when; import com.google.common.util.concurrent.Uninterruptibles; +import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Get; import com.scalar.db.api.Put; @@ -67,6 +69,7 @@ public class CommitHandlerTest { private CommitHandler handler; protected ParallelExecutor parallelExecutor; + protected MutationsGrouper mutationsGrouper; protected String anyId() { return ANY_ID; @@ -83,7 +86,8 @@ protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnRe tableMetadataManager, parallelExecutor, new MutationsGrouper(storageInfoProvider), - coordinatorWriteOmissionOnReadOnlyEnabled); + coordinatorWriteOmissionOnReadOnlyEnabled, + false); } @BeforeEach @@ -92,6 +96,7 @@ void setUp() throws Exception { parallelExecutor = new ParallelExecutor(config); handler = spy(createCommitHandler(true)); + mutationsGrouper = spy(new MutationsGrouper(storageInfoProvider)); extraInitialize(); @@ -147,10 +152,17 @@ private Get prepareGet() { .build(); } + private Delete prepareDelete() { + return Delete.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .build(); + } + private Snapshot prepareSnapshotWithDifferentPartitionPut() { - Snapshot snapshot = - new 
Snapshot( - anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + Snapshot snapshot = prepareSnapshot(); // different partition Put put1 = preparePut1(); @@ -165,9 +177,7 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() { } private Snapshot prepareSnapshotWithSamePartitionPut() { - Snapshot snapshot = - new Snapshot( - anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + Snapshot snapshot = prepareSnapshot(); // same partition Put put1 = preparePut1(); @@ -182,9 +192,7 @@ private Snapshot prepareSnapshotWithSamePartitionPut() { } private Snapshot prepareSnapshotWithoutWrites() { - Snapshot snapshot = - new Snapshot( - anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + Snapshot snapshot = prepareSnapshot(); Get get = prepareGet(); snapshot.putIntoGetSet(get, Optional.empty()); @@ -193,9 +201,7 @@ private Snapshot prepareSnapshotWithoutWrites() { } private Snapshot prepareSnapshotWithoutReads() { - Snapshot snapshot = - new Snapshot( - anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + Snapshot snapshot = prepareSnapshot(); // same partition Put put1 = preparePut1(); @@ -206,6 +212,15 @@ private Snapshot prepareSnapshotWithoutReads() { return snapshot; } + private Snapshot prepareSnapshotWithIsolation(Isolation isolation) { + return new Snapshot(anyId(), isolation, tableMetadataManager, new ParallelExecutor(config)); + } + + private Snapshot prepareSnapshot() { + return new Snapshot( + anyId(), Isolation.SNAPSHOT, tableMetadataManager, new ParallelExecutor(config)); + } + private void setBeforePreparationSnapshotHookIfNeeded(boolean withSnapshotHook) { if (withSnapshotHook) { doReturn(beforePreparationSnapshotHookFuture) @@ -949,6 +964,224 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() verify(handler).onFailureBeforeCommit(snapshot); } + @Test + public void canOnePhaseCommit_WhenOnePhaseCommitDisabled_ShouldReturnFalse() throws Exception { + // Arrange + Snapshot snapshot = prepareSnapshot(); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isFalse(); + verify(mutationsGrouper, never()).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenValidationRequired_ShouldReturnFalse() throws Exception { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshotWithIsolation(Isolation.SERIALIZABLE); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isFalse(); + verify(mutationsGrouper, never()).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenNoWritesAndDeletes_ShouldReturnFalse() throws Exception { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshotWithoutWrites(); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isFalse(); + verify(mutationsGrouper, never()).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenDeleteWithoutExistingRecord_ShouldReturnFalse() + throws Exception { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshot(); + + // Setup a delete with no corresponding record in read set + Delete delete = prepareDelete(); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), 
delete); + snapshot.putIntoReadSet(new Snapshot.Key(delete), Optional.empty()); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isFalse(); + verify(mutationsGrouper, never()).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenMutationsCanBeGrouped_ShouldReturnTrue() throws Exception { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshot(); + + Delete delete = prepareDelete(); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); + TransactionResult result = mock(TransactionResult.class); + snapshot.putIntoReadSet(new Snapshot.Key(delete), Optional.of(result)); + + doReturn(true).when(mutationsGrouper).canBeGroupedAltogether(anyList()); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isTrue(); + verify(mutationsGrouper).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenMutationsCannotBeGrouped_ShouldReturnFalse() throws Exception { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshot(); + + Delete delete = prepareDelete(); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); + TransactionResult result = mock(TransactionResult.class); + snapshot.putIntoReadSet(new Snapshot.Key(delete), Optional.of(result)); + + doReturn(false).when(mutationsGrouper).canBeGroupedAltogether(anyList()); + + // Act + boolean actual = handler.canOnePhaseCommit(snapshot); + + // Assert + assertThat(actual).isFalse(); + verify(mutationsGrouper).canBeGroupedAltogether(anyList()); + } + + @Test + public void canOnePhaseCommit_WhenMutationsGrouperThrowsException_ShouldThrowCommitException() + throws ExecutionException { + // Arrange + CommitHandler handler = createCommitHandlerWithOnePhaseCommit(); + Snapshot snapshot = prepareSnapshot(); + + Delete delete = prepareDelete(); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); + TransactionResult result = mock(TransactionResult.class); + snapshot.putIntoReadSet(new Snapshot.Key(delete), Optional.of(result)); + + doThrow(ExecutionException.class).when(mutationsGrouper).canBeGroupedAltogether(anyList()); + + // Act Assert + assertThatThrownBy(() -> handler.canOnePhaseCommit(snapshot)) + .isInstanceOf(CommitException.class) + .hasCauseInstanceOf(ExecutionException.class); + } + + @Test + public void onePhaseCommitRecords_WhenSuccessful_ShouldMutateUsingComposerMutations() + throws CommitException, ExecutionException { + // Arrange + Snapshot snapshot = spy(prepareSnapshotWithSamePartitionPut()); + doNothing().when(storage).mutate(anyList()); + + // Act + handler.onePhaseCommitRecords(snapshot); + + // Assert + verify(storage).mutate(anyList()); + verify(snapshot).to(any(OnePhaseCommitMutationComposer.class)); + } + + @Test + public void + onePhaseCommitRecords_WhenNoMutationExceptionThrown_ShouldThrowCommitConflictException() + throws ExecutionException { + // Arrange + Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + doThrow(NoMutationException.class).when(storage).mutate(anyList()); + + // Act Assert + assertThatThrownBy(() -> handler.onePhaseCommitRecords(snapshot)) + .isInstanceOf(CommitConflictException.class) + .hasCauseInstanceOf(NoMutationException.class); + } + + @Test + public void + onePhaseCommitRecords_WhenRetriableExecutionExceptionThrown_ShouldThrowCommitConflictException() + throws ExecutionException { + // 
Arrange + Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + doThrow(RetriableExecutionException.class).when(storage).mutate(anyList()); + + // Act Assert + assertThatThrownBy(() -> handler.onePhaseCommitRecords(snapshot)) + .isInstanceOf(CommitConflictException.class) + .hasCauseInstanceOf(RetriableExecutionException.class); + } + + @Test + public void onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowCommitException() + throws ExecutionException { + // Arrange + Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + doThrow(ExecutionException.class).when(storage).mutate(anyList()); + + // Act Assert + assertThatThrownBy(() -> handler.onePhaseCommitRecords(snapshot)) + .isInstanceOf(CommitException.class) + .hasCauseInstanceOf(ExecutionException.class); + } + + @Test + public void commit_OnePhaseCommitted_ShouldNotThrowAnyException() + throws CommitException, UnknownTransactionStatusException { + // Arrange + CommitHandler handler = spy(createCommitHandlerWithOnePhaseCommit()); + Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + + doReturn(true).when(handler).canOnePhaseCommit(snapshot); + doNothing().when(handler).onePhaseCommitRecords(snapshot); + + // Act + handler.commit(snapshot, true); + + // Assert + verify(handler).canOnePhaseCommit(snapshot); + verify(handler).onePhaseCommitRecords(snapshot); + } + + @Test + public void commit_OnePhaseCommitted_CommitExceptionThrown_ShouldThrowCommitException() + throws CommitException { + // Arrange + CommitHandler handler = spy(createCommitHandlerWithOnePhaseCommit()); + Snapshot snapshot = prepareSnapshotWithSamePartitionPut(); + + doReturn(true).when(handler).canOnePhaseCommit(snapshot); + doThrow(CommitException.class).when(handler).onePhaseCommitRecords(snapshot); + + // Act Assert + assertThatThrownBy(() -> handler.commit(snapshot, true)).isInstanceOf(CommitException.class); + + verify(handler).onFailureBeforeCommit(snapshot); + } + + private CommitHandler createCommitHandlerWithOnePhaseCommit() { + return new CommitHandler( + storage, coordinator, tableMetadataManager, parallelExecutor, mutationsGrouper, true, true); + } + protected void doThrowExceptionWhenCoordinatorPutState( TransactionState targetState, Class<? extends Exception> exceptionClass) throws CoordinatorException { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java index 6163b0bf9a..3d05640e28 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java @@ -22,6 +22,8 @@ import com.scalar.db.util.groupcommit.GroupCommitConfig; import java.util.List; import java.util.UUID; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; @@ -69,6 +71,7 @@ protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnRe parallelExecutor, new MutationsGrouper(storageInfoProvider), coordinatorWriteOmissionOnReadOnlyEnabled, + false, groupCommitter); } @@ -195,4 +198,71 @@ public void commit_NoReadsInSnapshot_ShouldNotValidateRecords(boolean withSnapsh // Assert verify(groupCommitter, never()).remove(anyId()); } + + @Disabled("Enabling both one-phase commit and group commit is not
supported") + @Override + @Test + public void canOnePhaseCommit_WhenOnePhaseCommitDisabled_ShouldReturnFalse() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenValidationRequired_ShouldReturnFalse() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenNoWritesAndDeletes_ShouldReturnFalse() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenDeleteWithoutExistingRecord_ShouldReturnFalse() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenMutationsCanBeGrouped_ShouldReturnTrue() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenMutationsCannotBeGrouped_ShouldReturnFalse() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void canOnePhaseCommit_WhenMutationsGrouperThrowsException_ShouldThrowCommitException() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void onePhaseCommitRecords_WhenSuccessful_ShouldMutateUsingComposerMutations() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void + onePhaseCommitRecords_WhenNoMutationExceptionThrown_ShouldThrowCommitConflictException() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void + onePhaseCommitRecords_WhenRetriableExecutionExceptionThrown_ShouldThrowCommitConflictException() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void onePhaseCommitRecords_WhenExecutionExceptionThrown_ShouldThrowCommitException() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void commit_OnePhaseCommitted_ShouldNotThrowAnyException() {} + + @Disabled("Enabling both one-phase commit and group commit is not supported") + @Override + @Test + public void commit_OnePhaseCommitted_CommitExceptionThrown_ShouldThrowCommitException() {} } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java index 263724c5dd..e2d1aa8f4e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfigTest.java @@ -28,6 +28,7 @@ public void constructor_NoPropertiesGiven_ShouldLoadAsDefaultValues() { assertThat(config.isAsyncCommitEnabled()).isFalse(); assertThat(config.isAsyncRollbackEnabled()).isFalse(); assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isTrue(); + assertThat(config.isOnePhaseCommitEnabled()).isFalse(); assertThat(config.isParallelImplicitPreReadEnabled()).isTrue(); assertThat(config.isIncludeMetadataEnabled()).isFalse(); } @@ -169,6 +170,19 @@ public void constructor_AsyncExecutionRelatedPropertiesGiven_ShouldLoadProperly( assertThat(config.isCoordinatorWriteOmissionOnReadOnlyEnabled()).isFalse(); } + @Test + public void 
constructor_PropertiesWithOnePhaseCommitEnabledGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(ConsensusCommitConfig.ONE_PHASE_COMMIT_ENABLED, "true"); + + // Act + ConsensusCommitConfig config = new ConsensusCommitConfig(new DatabaseConfig(props)); + + // Assert + assertThat(config.isOnePhaseCommitEnabled()).isTrue(); + } + @Test public void constructor_PropertiesWithParallelImplicitPreReadEnabledGiven_ShouldLoadProperly() { // Arrange diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java index a71db38600..1ca89b2f05 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java @@ -336,6 +336,280 @@ public void groupMutations_WithStorageAtomicityAndBatchSizeLimit_ShouldGroupCorr assertThat(result.get(2)).containsExactly(mutation2, mutation4); } + @Test + public void canBeGroupedAltogether_WithEmptyCollection_ShouldReturnTrue() + throws ExecutionException { + // Act + boolean result = grouper.canBeGroupedAltogether(Collections.emptyList()); + + // Assert + assertThat(result).isTrue(); + } + + @Test + public void + canBeGroupedAltogether_WithAllMutationsInSameGroupForRecordAtomicity_ShouldReturnTrue() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.RECORD); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create multiple mutations with same partition key, clustering key, table and namespace + Key partitionKey1 = mock(Key.class); + Key clusteringKey1 = mock(Key.class); + Mutation mutation1 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + Mutation mutation2 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isTrue(); + } + + @Test + public void + canBeGroupedAltogether_WithAllMutationsInSameGroupForPartitionAtomicity_ShouldReturnTrue() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.PARTITION); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create mutations with same partition key but different clustering keys + Key partitionKey1 = mock(Key.class); + Key clusteringKey1 = mock(Key.class); + Key clusteringKey2 = mock(Key.class); + Mutation mutation1 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + Mutation mutation2 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey2)); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isTrue(); + } + 
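Illustrative sketch (not part of this diff): the eligibility check that the canBeGroupedAltogether tests above and below exercise, written as a standalone helper. It assumes the class sits in the com.scalar.db.transaction.consensuscommit package so that MutationsGrouper and StorageInfoProvider are visible, and all namespace/table/key/column names are placeholders.

    package com.scalar.db.transaction.consensuscommit;

    import com.scalar.db.api.Mutation;
    import com.scalar.db.api.Put;
    import com.scalar.db.exception.storage.ExecutionException;
    import com.scalar.db.io.Key;
    import java.util.Arrays;
    import java.util.List;

    // A minimal sketch: with PARTITION (or coarser) mutation atomicity, two writes
    // to the same partition of the same table fall into one group, so a single
    // atomic mutate call can cover both and one-phase commit becomes possible.
    class OnePhaseCommitEligibilityExample {
      static boolean fitsSingleAtomicMutate(StorageInfoProvider storageInfoProvider)
          throws ExecutionException {
        MutationsGrouper grouper = new MutationsGrouper(storageInfoProvider);
        Put put1 =
            Put.newBuilder()
                .namespace("ns")
                .table("tbl")
                .partitionKey(Key.ofText("pk", "p1"))
                .clusteringKey(Key.ofText("ck", "c1"))
                .intValue("col", 1)
                .build();
        Put put2 =
            Put.newBuilder()
                .namespace("ns")
                .table("tbl")
                .partitionKey(Key.ofText("pk", "p1"))
                .clusteringKey(Key.ofText("ck", "c2"))
                .intValue("col", 2)
                .build();
        List<Mutation> mutations = Arrays.asList(put1, put2);
        // True iff all mutations share one MutationGroup and their count does not
        // exceed the storage's max atomic mutations count.
        return grouper.canBeGroupedAltogether(mutations);
      }
    }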
+ @Test + public void canBeGroupedAltogether_WithAllMutationsInSameGroupForTableAtomicity_ShouldReturnTrue() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create mutations with same table but different partition keys + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace, table, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace, table, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isTrue(); + } + + @Test + public void + canBeGroupedAltogether_WithAllMutationsInSameGroupForNamespaceAtomicity_ShouldReturnTrue() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table1 = "table1"; + String table2 = "table2"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.NAMESPACE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create mutations with same namespace but different tables + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace, table1, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace, table2, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isTrue(); + } + + @Test + public void + canBeGroupedAltogether_WithAllMutationsInSameGroupForStorageAtomicity_ShouldReturnTrue() + throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String table1 = "table1"; + String table2 = "table2"; + + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.STORAGE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo); + + // Create mutations with different namespaces but same storage + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace2, table2, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isTrue(); + } + + @Test + public void canBeGroupedAltogether_WithMutationsInDifferentGroups_ShouldReturnFalse() + throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String table1 = "table1"; + 
String table2 = "table2"; + + StorageInfo storageInfo1 = mock(StorageInfo.class); + when(storageInfo1.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo1.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo1.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo1); + + StorageInfo storageInfo2 = mock(StorageInfo.class); + when(storageInfo2.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo2.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo2.getStorageName()).thenReturn("storage2"); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo2); + + // Create mutations with different storage + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace2, table2, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isFalse(); + } + + @Test + public void + canBeGroupedAltogether_WithMutationsInDifferentTables_ShouldReturnFalseForTableAtomicity() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table1 = "table1"; + String table2 = "table2"; + + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create mutations with same namespace but different tables + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace, table1, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace, table2, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isFalse(); + } + + @Test + public void + canBeGroupedAltogether_WithMutationsInDifferentPartitions_ShouldReturnFalseForPartitionAtomicity() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.PARTITION); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create mutations with same table but different partition keys + Key partitionKey1 = mock(Key.class); + Key partitionKey2 = mock(Key.class); + Mutation mutation1 = createMutation(namespace, table, partitionKey1, Optional.empty()); + Mutation mutation2 = createMutation(namespace, table, partitionKey2, Optional.empty()); + + // Act + boolean result = grouper.canBeGroupedAltogether(Arrays.asList(mutation1, mutation2)); + + // Assert + assertThat(result).isFalse(); + } + + @Test + public void canBeGroupedAltogether_WithOverMaxCount_ShouldReturnFalse() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + 
StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(3); // Max 3 mutations allowed + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create 4 mutations (one over max count) + List<Mutation> mutations = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + Key partitionKey = mock(Key.class); + mutations.add(createMutation(namespace, table, partitionKey, Optional.empty())); + } + + // Act + boolean result = grouper.canBeGroupedAltogether(mutations); + + // Assert + assertThat(result).isFalse(); + } + private Mutation createMutation( String namespace, String table, Key partitionKey, Optional<Key> clusteringKey) { Mutation mutation = mock(Put.class); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposerTest.java new file mode 100644 index 0000000000..027689d4db --- /dev/null +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposerTest.java @@ -0,0 +1,439 @@ +package com.scalar.db.transaction.consensuscommit; + +import static com.scalar.db.api.ConditionBuilder.*; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_COMMITTED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREFIX; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREPARED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_VERSION; +import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.PREPARED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.scalar.db.api.Consistency; +import com.scalar.db.api.Delete; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Operation; +import com.scalar.db.api.Put; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionState; +import com.scalar.db.common.ResultImpl; +import com.scalar.db.io.BigIntColumn; +import com.scalar.db.io.BlobColumn; +import com.scalar.db.io.BooleanColumn; +import com.scalar.db.io.Column; +import com.scalar.db.io.DataType; +import com.scalar.db.io.DateColumn; +import com.scalar.db.io.DoubleColumn; +import com.scalar.db.io.FloatColumn; +import com.scalar.db.io.IntColumn; +import com.scalar.db.io.Key; +import com.scalar.db.io.TextColumn; +import com.scalar.db.io.TimeColumn; +import com.scalar.db.io.TimestampColumn; +import com.scalar.db.io.TimestampTZColumn; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import
org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class OnePhaseCommitMutationComposerTest { + private static final String ANY_NAMESPACE_NAME = "namespace"; + private static final String ANY_TABLE_NAME = "table"; + private static final String ANY_ID_1 = "id1"; + private static final String ANY_ID_2 = "id2"; + private static final long ANY_TIME_MILLIS_1 = 100; + private static final long ANY_TIME_MILLIS_2 = 200; + private static final long ANY_TIME_MILLIS_3 = 300; + private static final String ANY_NAME_1 = "name1"; + private static final String ANY_NAME_2 = "name2"; + private static final String ANY_NAME_3 = "name3"; + private static final String ANY_NAME_4 = "name4"; + private static final String ANY_NAME_5 = "name5"; + private static final String ANY_NAME_6 = "name6"; + private static final String ANY_NAME_7 = "name7"; + private static final String ANY_NAME_8 = "name8"; + private static final String ANY_NAME_9 = "name9"; + private static final String ANY_NAME_10 = "name10"; + private static final String ANY_NAME_11 = "name11"; + private static final String ANY_NAME_12 = "name12"; + private static final String ANY_NAME_13 = "name13"; + private static final String ANY_TEXT_1 = "text1"; + private static final String ANY_TEXT_2 = "text2"; + private static final String ANY_TEXT_3 = "text3"; + private static final int ANY_INT = 100; + private static final long ANY_BIGINT = 1000L; + private static final float ANY_FLOAT = 1.23f; + private static final double ANY_DOUBLE = 7.89; + private static final byte[] ANY_BLOB = new byte[] {1, 2, 3}; + private static final LocalDate ANY_DATE = LocalDate.of(2020, 1, 1); + private static final LocalTime ANY_TIME = LocalTime.of(12, 0, 0); + private static final LocalDateTime ANY_TIMESTAMP = LocalDateTime.of(2020, 1, 1, 12, 0, 0); + private static final Instant ANY_TIMESTAMPTZ = + LocalDateTime.of(2020, 1, 1, 12, 0, 0).toInstant(ZoneOffset.UTC); + + private static final TableMetadata TABLE_METADATA = + ConsensusCommitUtils.buildTransactionTableMetadata( + TableMetadata.newBuilder() + .addColumn(ANY_NAME_1, DataType.TEXT) + .addColumn(ANY_NAME_2, DataType.TEXT) + .addColumn(ANY_NAME_3, DataType.INT) + .addColumn(ANY_NAME_4, DataType.BOOLEAN) + .addColumn(ANY_NAME_5, DataType.BIGINT) + .addColumn(ANY_NAME_6, DataType.FLOAT) + .addColumn(ANY_NAME_7, DataType.DOUBLE) + .addColumn(ANY_NAME_8, DataType.TEXT) + .addColumn(ANY_NAME_9, DataType.BLOB) + .addColumn(ANY_NAME_10, DataType.DATE) + .addColumn(ANY_NAME_11, DataType.TIME) + .addColumn(ANY_NAME_12, DataType.TIMESTAMP) + .addColumn(ANY_NAME_13, DataType.TIMESTAMPTZ) + .addPartitionKey(ANY_NAME_1) + .addClusteringKey(ANY_NAME_2) + .build()); + + @Mock private TransactionTableMetadataManager tableMetadataManager; + + private OnePhaseCommitMutationComposer composer; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + + // Arrange + composer = + new OnePhaseCommitMutationComposer(ANY_ID_2, ANY_TIME_MILLIS_3, tableMetadataManager); + + when(tableMetadataManager.getTransactionTableMetadata(any(Operation.class))) + .thenReturn(new TransactionTableMetadata(TABLE_METADATA)); + } + + private Put preparePut() { + return Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .intValue(ANY_NAME_3, ANY_INT) + .build(); + } + + private Put preparePutWithInsertMode() { + return Put.newBuilder() + 
.namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .intValue(ANY_NAME_3, ANY_INT) + .enableInsertMode() + .build(); + } + + private Delete prepareDelete() { + return Delete.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .build(); + } + + private TransactionResult prepareInitialResult(String id) { + ImmutableMap.Builder<String, Column<?>> builder = + ImmutableMap.<String, Column<?>>builder() + .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) + .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) + .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT)) + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, false)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8, ANY_TEXT_3)) + .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB)) + .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE)) + .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME)) + .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP)) + .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ)) + .put(ID, TextColumn.of(ID, id)) + .put(PREPARED_AT, BigIntColumn.of(PREPARED_AT, ANY_TIME_MILLIS_1)) + .put(COMMITTED_AT, BigIntColumn.of(COMMITTED_AT, ANY_TIME_MILLIS_2)) + .put(STATE, IntColumn.of(STATE, TransactionState.COMMITTED.get())) + .put(VERSION, IntColumn.of(VERSION, 1)) + .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_3)) + .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.ofNull(BEFORE_PREFIX + ANY_NAME_4)) + .put(BEFORE_PREFIX + ANY_NAME_5, BigIntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_5)) + .put(BEFORE_PREFIX + ANY_NAME_6, FloatColumn.ofNull(BEFORE_PREFIX + ANY_NAME_6)) + .put(BEFORE_PREFIX + ANY_NAME_7, DoubleColumn.ofNull(BEFORE_PREFIX + ANY_NAME_7)) + .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.ofNull(BEFORE_PREFIX + ANY_NAME_8)) + .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.ofNull(BEFORE_PREFIX + ANY_NAME_9)) + .put(BEFORE_PREFIX + ANY_NAME_10, DateColumn.ofNull(BEFORE_PREFIX + ANY_NAME_10)) + .put(BEFORE_PREFIX + ANY_NAME_11, TimeColumn.ofNull(BEFORE_PREFIX + ANY_NAME_11)) + .put(BEFORE_PREFIX + ANY_NAME_12, TimestampColumn.ofNull(BEFORE_PREFIX + ANY_NAME_12)) + .put(BEFORE_PREFIX + ANY_NAME_13, TimestampTZColumn.ofNull(BEFORE_PREFIX + ANY_NAME_13)) + .put(BEFORE_ID, TextColumn.ofNull(BEFORE_ID)) + .put(BEFORE_PREPARED_AT, BigIntColumn.ofNull(BEFORE_PREPARED_AT)) + .put(BEFORE_COMMITTED_AT, BigIntColumn.ofNull(BEFORE_COMMITTED_AT)) + .put(BEFORE_STATE, IntColumn.ofNull(BEFORE_STATE)) + .put(BEFORE_VERSION, IntColumn.ofNull(BEFORE_VERSION)); + return new TransactionResult(new ResultImpl(builder.build(), TABLE_METADATA)); + } + + private TransactionResult prepareInitialResultWithNullMetadata() { + ImmutableMap<String, Column<?>> columns = + ImmutableMap.<String, Column<?>>builder() + .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) + .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) + .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT)) + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, false)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8,
+            .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB))
+            .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE))
+            .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME))
+            .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP))
+            .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ))
+            .put(ID, TextColumn.ofNull(ID))
+            .put(PREPARED_AT, BigIntColumn.ofNull(PREPARED_AT))
+            .put(COMMITTED_AT, BigIntColumn.ofNull(COMMITTED_AT))
+            .put(STATE, IntColumn.ofNull(STATE))
+            .put(VERSION, IntColumn.ofNull(VERSION))
+            .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_3))
+            .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.ofNull(BEFORE_PREFIX + ANY_NAME_4))
+            .put(BEFORE_PREFIX + ANY_NAME_5, BigIntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_5))
+            .put(BEFORE_PREFIX + ANY_NAME_6, FloatColumn.ofNull(BEFORE_PREFIX + ANY_NAME_6))
+            .put(BEFORE_PREFIX + ANY_NAME_7, DoubleColumn.ofNull(BEFORE_PREFIX + ANY_NAME_7))
+            .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.ofNull(BEFORE_PREFIX + ANY_NAME_8))
+            .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.ofNull(BEFORE_PREFIX + ANY_NAME_9))
+            .put(BEFORE_PREFIX + ANY_NAME_10, DateColumn.ofNull(BEFORE_PREFIX + ANY_NAME_10))
+            .put(BEFORE_PREFIX + ANY_NAME_11, TimeColumn.ofNull(BEFORE_PREFIX + ANY_NAME_11))
+            .put(BEFORE_PREFIX + ANY_NAME_12, TimestampColumn.ofNull(BEFORE_PREFIX + ANY_NAME_12))
+            .put(BEFORE_PREFIX + ANY_NAME_13, TimestampTZColumn.ofNull(BEFORE_PREFIX + ANY_NAME_13))
+            .put(BEFORE_ID, TextColumn.ofNull(BEFORE_ID))
+            .put(BEFORE_PREPARED_AT, BigIntColumn.ofNull(BEFORE_PREPARED_AT))
+            .put(BEFORE_COMMITTED_AT, BigIntColumn.ofNull(BEFORE_COMMITTED_AT))
+            .put(BEFORE_STATE, IntColumn.ofNull(BEFORE_STATE))
+            .put(BEFORE_VERSION, IntColumn.ofNull(BEFORE_VERSION))
+            .build();
+    return new TransactionResult(new ResultImpl(columns, TABLE_METADATA));
+  }
+
+  @Test
+  public void add_PutAndNullResultGiven_ShouldComposeCorrectly() throws Exception {
+    // Arrange
+    Put put = preparePut();
+
+    // Act
+    composer.add(put, null);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Put.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .intValue(ANY_NAME_3, ANY_INT)
+                .textValue(ID, ANY_ID_2)
+                .intValue(STATE, TransactionState.COMMITTED.get())
+                .bigIntValue(PREPARED_AT, ANY_TIME_MILLIS_3)
+                .bigIntValue(COMMITTED_AT, ANY_TIME_MILLIS_3)
+                .intValue(VERSION, 1)
+                .condition(putIfNotExists())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+
+  @Test
+  public void add_PutWithInsertModeAndNullResultGiven_ShouldComposeCorrectly() throws Exception {
+    // Arrange
+    Put putWithInsertMode = preparePutWithInsertMode();
+
+    // Act
+    composer.add(putWithInsertMode, null);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Put.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .intValue(ANY_NAME_3, ANY_INT)
+                .textValue(ID, ANY_ID_2)
+                .intValue(STATE, TransactionState.COMMITTED.get())
+                .bigIntValue(PREPARED_AT, ANY_TIME_MILLIS_3)
+                .bigIntValue(COMMITTED_AT, ANY_TIME_MILLIS_3)
+                .intValue(VERSION, 1)
+                .condition(putIfNotExists())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+
+  @Test
+  public void add_PutAndExistingCommittedResult_ShouldComposeCorrectly() throws Exception {
+    // Arrange
+    Put put = preparePut();
+    TransactionResult result = prepareInitialResult(ANY_ID_1);
+
+    // Act
+    composer.add(put, result);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Put.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .intValue(ANY_NAME_3, ANY_INT)
+                .textValue(ID, ANY_ID_2)
+                .intValue(STATE, TransactionState.COMMITTED.get())
+                .bigIntValue(PREPARED_AT, ANY_TIME_MILLIS_3)
+                .bigIntValue(COMMITTED_AT, ANY_TIME_MILLIS_3)
+                .intValue(VERSION, 2) // Incremented version
+                .textValue(BEFORE_ID, null)
+                .intValue(BEFORE_STATE, null)
+                .intValue(BEFORE_VERSION, null)
+                .bigIntValue(BEFORE_PREPARED_AT, null)
+                .bigIntValue(BEFORE_COMMITTED_AT, null)
+                .intValue(BEFORE_PREFIX + ANY_NAME_3, null)
+                .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null)
+                .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null)
+                .floatValue(BEFORE_PREFIX + ANY_NAME_6, null)
+                .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null)
+                .textValue(BEFORE_PREFIX + ANY_NAME_8, null)
+                .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null)
+                .dateValue(BEFORE_PREFIX + ANY_NAME_10, null)
+                .timeValue(BEFORE_PREFIX + ANY_NAME_11, null)
+                .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null)
+                .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null)
+                .condition(putIf(column(ID).isEqualToText(ANY_ID_1)).build())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+
+  @Test
+  public void add_PutWithInsertModeAndExistingCommittedResult_ShouldComposeCorrectly()
+      throws Exception {
+    // Arrange
+    Put putWithInsertMode = preparePutWithInsertMode();
+    TransactionResult result = prepareInitialResult(ANY_ID_1);
+
+    // Act
+    composer.add(putWithInsertMode, result);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Put.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .intValue(ANY_NAME_3, ANY_INT)
+                .textValue(ID, ANY_ID_2)
+                .intValue(STATE, TransactionState.COMMITTED.get())
+                .bigIntValue(PREPARED_AT, ANY_TIME_MILLIS_3)
+                .bigIntValue(COMMITTED_AT, ANY_TIME_MILLIS_3)
+                .intValue(VERSION, 1)
+                .condition(putIfNotExists())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+
+  @Test
+  public void add_PutAndDeemedCommittedResult_ShouldComposeCorrectly() throws Exception {
+    // Arrange
+    Put put = preparePut();
+    TransactionResult resultWithNullMetadata = prepareInitialResultWithNullMetadata();
+
+    // Act
+    composer.add(put, resultWithNullMetadata);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Put.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .intValue(ANY_NAME_3, ANY_INT)
+                .textValue(ID, ANY_ID_2)
+                .intValue(STATE, TransactionState.COMMITTED.get())
+                .bigIntValue(PREPARED_AT, ANY_TIME_MILLIS_3)
+                .bigIntValue(COMMITTED_AT, ANY_TIME_MILLIS_3)
+                .intValue(VERSION, 1) // The first version
+                .textValue(BEFORE_ID, null)
+                .intValue(BEFORE_STATE, null)
+                .intValue(BEFORE_VERSION, null)
+                .bigIntValue(BEFORE_PREPARED_AT, null)
+                .bigIntValue(BEFORE_COMMITTED_AT, null)
+                .intValue(BEFORE_PREFIX + ANY_NAME_3, null)
+                .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null)
+                .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null)
+                .floatValue(BEFORE_PREFIX + ANY_NAME_6, null)
+                .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null)
+                .textValue(BEFORE_PREFIX + ANY_NAME_8, null)
+                .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null)
+                .dateValue(BEFORE_PREFIX + ANY_NAME_10, null)
+                .timeValue(BEFORE_PREFIX + ANY_NAME_11, null)
+                .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null)
+                .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null)
+                .condition(putIf(column(ID).isNullText()).build())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+
+  @Test
+  public void add_DeleteAndExistingCommittedResult_ShouldComposeCorrectly() throws Exception {
+    // Arrange
+    Delete delete = prepareDelete();
+    TransactionResult result = prepareInitialResult(ANY_ID_1);
+
+    // Act
+    composer.add(delete, result);
+
+    // Assert
+    List<Mutation> mutations = composer.get();
+    assertThat(mutations).hasSize(1);
+    assertThat(mutations.get(0))
+        .isEqualTo(
+            Delete.newBuilder()
+                .namespace(ANY_NAMESPACE_NAME)
+                .table(ANY_TABLE_NAME)
+                .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+                .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+                .condition(deleteIf(column(ID).isEqualToText(ANY_ID_1)).build())
+                .consistency(Consistency.LINEARIZABLE)
+                .build());
+  }
+}
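Taken together, these tests pin down the composer's condition selection: put-if-not-exists when there is no prior record or insert mode is enabled, a transaction-ID equality check for an existing committed record, and a null-ID check for a deemed-committed record. A condensed sketch of that decision table follows; it is a hypothetical helper, not the actual OnePhaseCommitMutationComposer code, though it reuses the same ConditionBuilder calls the tests assert.

// Sketch only: restates the condition selection asserted by the tests above.
// "tx_id" stands in for the ID metadata column referenced in the tests.
import static com.scalar.db.api.ConditionBuilder.column;
import static com.scalar.db.api.ConditionBuilder.putIf;
import static com.scalar.db.api.ConditionBuilder.putIfNotExists;

import com.scalar.db.api.MutationCondition;
import com.scalar.db.transaction.consensuscommit.TransactionResult;
import javax.annotation.Nullable;

class ExpectedConditionSketch {
  static MutationCondition forPut(@Nullable TransactionResult result, boolean insertMode) {
    if (result == null || insertMode) {
      // No prior record, or insert mode: succeed only if nothing exists yet.
      return putIfNotExists();
    }
    if (result.getId() == null) {
      // Deemed-committed record (transaction metadata is null).
      return putIf(column("tx_id").isNullText()).build();
    }
    // Committed record: detect conflicts via the previous transaction ID.
    return putIf(column("tx_id").isEqualToText(result.getId())).build();
  }
}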
diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java
index f4396d45e8..ba2f8b5638 100644
--- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java
+++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java
@@ -168,10 +168,17 @@ private CommitHandler createCommitHandler(
           parallelExecutor,
           mutationsGrouper,
           true,
+          false,
           groupCommitter);
     } else {
       return new CommitHandler(
-          storage, coordinator, tableMetadataManager, parallelExecutor, mutationsGrouper, true);
+          storage,
+          coordinator,
+          tableMetadataManager,
+          parallelExecutor,
+          mutationsGrouper,
+          true,
+          false);
     }
   }
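With the added flag, CommitHandler now takes two adjacent positional booleans, which are easy to misread at call sites like the one above. A usage sketch with the flags labeled (parameter names as in this patch's constructor):

// Usage sketch only: the seven-argument CommitHandler constructor, with
// inline comments labeling the two positional boolean flags.
CommitHandler handler =
    new CommitHandler(
        storage,
        coordinator,
        tableMetadataManager,
        parallelExecutor,
        mutationsGrouper,
        /* coordinatorWriteOmissionOnReadOnlyEnabled= */ true,
        /* onePhaseCommitEnabled= */ false);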
diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
index 8b47a16ecc..eed1fa1153 100644
--- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
+++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
@@ -7669,22 +7669,37 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
   }
 
   @ParameterizedTest
-  @EnumSource(Isolation.class)
+  @MethodSource("isolationAndOnePhaseCommitEnabled")
   public void
-      putAndCommit_SinglePartitionMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
-          Isolation isolation)
+      insertAndCommit_SinglePartitionMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
+          Isolation isolation, boolean onePhaseCommitEnabled)
           throws TransactionException, ExecutionException, CoordinatorException {
+    if (isGroupCommitEnabled() && onePhaseCommitEnabled) {
+      // Enabling both one-phase commit and group commit is not supported
+      return;
+    }
+
     // Arrange
-    ConsensusCommitManager manager = createConsensusCommitManager(isolation);
-    IntValue balance = new IntValue(BALANCE, INITIAL_BALANCE);
-    List<Put> puts = preparePuts(namespace1, TABLE_1);
-    puts.get(0).withValue(balance);
-    puts.get(1).withValue(balance);
+    ConsensusCommitManager manager =
+        createConsensusCommitManager(isolation, onePhaseCommitEnabled);
     DistributedTransaction transaction = manager.begin();
 
     // Act
-    transaction.put(puts.get(0));
-    transaction.put(puts.get(1));
+    transaction.insert(
+        Insert.newBuilder()
+            .namespace(namespace1)
+            .table(TABLE_1)
+            .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+            .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+            .intValue(BALANCE, 100)
+            .build());
+    transaction.insert(
+        Insert.newBuilder()
+            .namespace(namespace1)
+            .table(TABLE_1)
+            .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+            .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+            .intValue(BALANCE, 200)
+            .build());
     transaction.commit();
 
     // Assert
@@ -7693,43 +7708,103 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
       case RECORD:
         // twice for prepare, twice for commit
        verify(storage, times(4)).mutate(anyList());
+
+        // commit-state should occur
+        if (isGroupCommitEnabled()) {
+          verify(coordinator)
+              .putStateForGroupCommit(
+                  anyString(), anyList(), any(TransactionState.class), anyLong());
+          return;
+        }
+        verify(coordinator).putState(any(Coordinator.State.class));
         break;
       case PARTITION:
       case TABLE:
       case NAMESPACE:
       case STORAGE:
-        // one for prepare, one for commit
-        verify(storage, times(2)).mutate(anyList());
+        if (onePhaseCommitEnabled && isolation != Isolation.SERIALIZABLE) {
+          // one-phase commit, so only one mutation call
+          verify(storage).mutate(anyList());
+
+          // no commit-state should occur
+          verify(coordinator, never()).putState(any(Coordinator.State.class));
+        } else {
+          // one for prepare, one for commit
+          verify(storage, times(2)).mutate(anyList());
+
+          // commit-state should occur
+          if (isGroupCommitEnabled()) {
+            verify(coordinator)
+                .putStateForGroupCommit(
+                    anyString(), anyList(), any(TransactionState.class), anyLong());
+          } else {
+            verify(coordinator).putState(any(Coordinator.State.class));
+          }
+        }
         break;
       default:
         throw new AssertionError();
     }
 
-    if (isGroupCommitEnabled()) {
-      verify(coordinator)
-          .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong());
-      return;
-    }
-    verify(coordinator).putState(any(Coordinator.State.class));
+    Optional<Result> result1 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace1)
+                .table(TABLE_1)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+                .build());
+    assertThat(result1.isPresent()).isTrue();
+    assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+    assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+    assertThat(result1.get().getInt(BALANCE)).isEqualTo(100);
+
+    Optional<Result> result2 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace1)
+                .table(TABLE_1)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+                .build());
+    assertThat(result2.isPresent()).isTrue();
+    assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+    assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1);
+    assertThat(result2.get().getInt(BALANCE)).isEqualTo(200);
   }
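Note that the guard above makes the unsupported group-commit/one-phase combinations pass silently. A possible alternative (a sketch, not part of this patch) is a JUnit 5 assumption, which would report those parameter combinations as skipped instead:

// Sketch: replacing the early return with a JUnit 5 assumption.
import static org.junit.jupiter.api.Assumptions.assumeFalse;

assumeFalse(
    isGroupCommitEnabled() && onePhaseCommitEnabled,
    "Enabling both one-phase commit and group commit is not supported");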
 
   @ParameterizedTest
-  @EnumSource(Isolation.class)
+  @MethodSource("isolationAndOnePhaseCommitEnabled")
   public void
-      putAndCommit_TwoPartitionsMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
-          Isolation isolation)
+      insertAndCommit_TwoPartitionsMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
+          Isolation isolation, boolean onePhaseCommitEnabled)
           throws TransactionException, ExecutionException, CoordinatorException {
+    if (isGroupCommitEnabled() && onePhaseCommitEnabled) {
+      // Enabling both one-phase commit and group commit is not supported
+      return;
+    }
+
     // Arrange
-    ConsensusCommitManager manager = createConsensusCommitManager(isolation);
-    IntValue balance = new IntValue(BALANCE, INITIAL_BALANCE);
-    List<Put> puts = preparePuts(namespace1, TABLE_1);
-    puts.get(0).withValue(balance);
-    puts.get(NUM_TYPES).withValue(balance); // next account
+    ConsensusCommitManager manager =
+        createConsensusCommitManager(isolation, onePhaseCommitEnabled);
     DistributedTransaction transaction = manager.begin();
 
     // Act
-    transaction.put(puts.get(0));
-    transaction.put(puts.get(NUM_TYPES));
+    transaction.insert(
+        Insert.newBuilder()
+            .namespace(namespace1)
+            .table(TABLE_1)
+            .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+            .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+            .intValue(BALANCE, 100)
+            .build());
+    transaction.insert(
+        Insert.newBuilder()
+            .namespace(namespace1)
+            .table(TABLE_1)
+            .partitionKey(Key.ofInt(ACCOUNT_ID, 1))
+            .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+            .intValue(BALANCE, 200)
+            .build());
     transaction.commit();
 
     // Assert
@@ -7739,33 +7814,83 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
       case PARTITION:
         // twice for prepare, twice for commit
         verify(storage, times(4)).mutate(anyList());
+
+        // commit-state should occur
+        if (isGroupCommitEnabled()) {
+          verify(coordinator)
+              .putStateForGroupCommit(
+                  anyString(), anyList(), any(TransactionState.class), anyLong());
+        } else {
+          verify(coordinator).putState(any(Coordinator.State.class));
+        }
         break;
       case TABLE:
       case NAMESPACE:
       case STORAGE:
-        // one for prepare, one for commit
-        verify(storage, times(2)).mutate(anyList());
+        if (onePhaseCommitEnabled && isolation != Isolation.SERIALIZABLE) {
+          // one-phase commit, so only one mutation call
+          verify(storage).mutate(anyList());
+
+          // no commit-state should occur
+          verify(coordinator, never()).putState(any(Coordinator.State.class));
+        } else {
+          // one for prepare, one for commit
+          verify(storage, times(2)).mutate(anyList());
+
+          // commit-state should occur
+          if (isGroupCommitEnabled()) {
+            verify(coordinator)
+                .putStateForGroupCommit(
+                    anyString(), anyList(), any(TransactionState.class), anyLong());
+          } else {
+            verify(coordinator).putState(any(Coordinator.State.class));
+          }
+        }
         break;
       default:
         throw new AssertionError();
     }
 
-    if (isGroupCommitEnabled()) {
-      verify(coordinator)
-          .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong());
-      return;
-    }
-    verify(coordinator).putState(any(Coordinator.State.class));
+    Optional<Result> result1 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace1)
+                .table(TABLE_1)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+                .build());
+    assertThat(result1.isPresent()).isTrue();
+    assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+    assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+    assertThat(result1.get().getInt(BALANCE)).isEqualTo(100);
+
+    Optional<Result> result2 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace1)
+                .table(TABLE_1)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 1))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+                .build());
+    assertThat(result2.isPresent()).isTrue();
+    assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(1);
+    assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+    assertThat(result2.get().getInt(BALANCE)).isEqualTo(200);
   }
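The expected mutate counts in these assertions all follow one rule: the MutationsGrouper batches the transaction's writes by the storage's mutation atomicity unit, and each batch is mutated once per phase. A condensed sketch of that arithmetic (hypothetical helper, not test code):

// Sketch: the storage.mutate() call counts these tests assert.
class ExpectedMutateCalls {
  // groups: how many batches the MutationsGrouper forms, e.g. two records in
  // two partitions -> RECORD/PARTITION: 2 groups, TABLE/NAMESPACE/STORAGE: 1.
  static int of(int groups, boolean onePhaseCommitTaken) {
    if (onePhaseCommitTaken) {
      // The one-phase path is only taken when everything fits in one group:
      // a single combined mutate call, with no separate prepare phase.
      return 1;
    }
    // One mutate per group to prepare, one per group to commit.
    return 2 * groups;
  }
}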
 
   @ParameterizedTest
-  @EnumSource(Isolation.class)
+  @MethodSource("isolationAndOnePhaseCommitEnabled")
   public void
       insertAndCommit_TwoNamespacesMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit(
-          Isolation isolation)
+          Isolation isolation, boolean onePhaseCommitEnabled)
           throws TransactionException, ExecutionException, CoordinatorException {
+    if (isGroupCommitEnabled() && onePhaseCommitEnabled) {
+      // Enabling both one-phase commit and group commit is not supported
+      return;
+    }
+
     // Arrange
-    ConsensusCommitManager manager = createConsensusCommitManager(isolation);
+    ConsensusCommitManager manager =
+        createConsensusCommitManager(isolation, onePhaseCommitEnabled);
     DistributedTransaction transaction = manager.begin();
 
     // Act
@@ -7775,7 +7900,7 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
             .table(TABLE_1)
             .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
             .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
-            .intValue(BALANCE, INITIAL_BALANCE)
+            .intValue(BALANCE, 100)
             .build());
     transaction.insert(
         Insert.newBuilder()
@@ -7783,7 +7908,7 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
             .table(TABLE_2)
             .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
             .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
-            .intValue(BALANCE, INITIAL_BALANCE)
+            .intValue(BALANCE, 200)
             .build());
     transaction.commit();
 
@@ -7795,6 +7920,14 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
       // twice for prepare, twice for commit
       verify(storage, times(4)).mutate(anyList());
+
+      // commit-state should occur
+      if (isGroupCommitEnabled()) {
+        verify(coordinator)
+            .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong());
+      } else {
+        verify(coordinator).putState(any(Coordinator.State.class));
+      }
     } else {
       // same storage
       switch (storageInfo1.getMutationAtomicityUnit()) {
@@ -7804,22 +7937,67 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE
         case NAMESPACE:
           // twice for prepare, twice for commit
          verify(storage, times(4)).mutate(anyList());
+
+          // commit-state should occur
+          if (isGroupCommitEnabled()) {
+            verify(coordinator)
+                .putStateForGroupCommit(
+                    anyString(), anyList(), any(TransactionState.class), anyLong());
+          } else {
+            verify(coordinator).putState(any(Coordinator.State.class));
+          }
           break;
         case STORAGE:
-          // one for prepare, one for commit
-          verify(storage, times(2)).mutate(anyList());
+          if (onePhaseCommitEnabled && isolation != Isolation.SERIALIZABLE) {
+            // one-phase commit, so only one mutation call
+            verify(storage).mutate(anyList());
+
+            // no commit-state should occur
+            verify(coordinator, never()).putState(any(Coordinator.State.class));
+          } else {
+            // one for prepare, one for commit
+            verify(storage, times(2)).mutate(anyList());
+
+            // commit-state should occur
+            if (isGroupCommitEnabled()) {
+              verify(coordinator)
+                  .putStateForGroupCommit(
+                      anyString(), anyList(), any(TransactionState.class), anyLong());
+            } else {
+              verify(coordinator).putState(any(Coordinator.State.class));
+            }
+          }
          break;
         default:
          throw new AssertionError();
       }
     }
 
-    if (isGroupCommitEnabled()) {
-      verify(coordinator)
-          .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong());
-      return;
-    }
-    verify(coordinator).putState(any(Coordinator.State.class));
+    Optional<Result> result1 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace1)
+                .table(TABLE_1)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+                .build());
+    assertThat(result1.isPresent()).isTrue();
+    assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+    assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+    assertThat(result1.get().getInt(BALANCE)).isEqualTo(100);
+
+    Optional<Result> result2 =
+        manager.get(
+            Get.newBuilder()
+                .namespace(namespace2)
+                .table(TABLE_2)
+                .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+                .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+                .build());
+    assertThat(result2.isPresent()).isTrue();
+    assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+    assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+    assertThat(result2.get().getInt(BALANCE)).isEqualTo(200);
   }
 
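The fixture changes below thread the new flag through to the CommitHandler. They also show why the verifications above work: the real storage and coordinator are wrapped in Mockito spies, so the genuine commit path runs while interactions stay verifiable. The pattern in outline (a sketch mirroring the fixture code below):

// Sketch: spying on real collaborators so interactions can be verified
// while the real commit path still executes.
DistributedStorage storage = spy(originalStorage);
Coordinator coordinator = spy(new Coordinator(storage, consensusCommitConfig));
// ... run a transaction through the real code path, then verify, e.g.:
verify(storage, times(2)).mutate(anyList());
verify(coordinator).putState(any(Coordinator.State.class));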
   @Test
@@ -8216,6 +8394,11 @@ private int getBalance(Result result) {
   }
 
   private ConsensusCommitManager createConsensusCommitManager(Isolation isolation) {
+    return createConsensusCommitManager(isolation, false);
+  }
+
+  private ConsensusCommitManager createConsensusCommitManager(
+      Isolation isolation, boolean onePhaseCommitEnabled) {
     storage = spy(originalStorage);
     coordinator = spy(new Coordinator(storage, consensusCommitConfig));
     TransactionTableMetadataManager tableMetadataManager =
@@ -8223,7 +8406,7 @@ private ConsensusCommitManager createConsensusCommitManager(Isolation isolation)
     recovery = spy(new RecoveryHandler(storage, coordinator, tableMetadataManager));
     recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
     groupCommitter = CoordinatorGroupCommitter.from(consensusCommitConfig).orElse(null);
-    commit = spy(createCommitHandler(tableMetadataManager, groupCommitter));
+    commit = spy(createCommitHandler(tableMetadataManager, groupCommitter, onePhaseCommitEnabled));
     return new ConsensusCommitManager(
         storage,
         admin,
@@ -8239,7 +8422,8 @@ private ConsensusCommitManager createConsensusCommitManager(Isolation isolation)
 
   private CommitHandler createCommitHandler(
       TransactionTableMetadataManager tableMetadataManager,
-      @Nullable CoordinatorGroupCommitter groupCommitter) {
+      @Nullable CoordinatorGroupCommitter groupCommitter,
+      boolean onePhaseCommitEnabled) {
     MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin));
     if (groupCommitter != null) {
       return new CommitHandlerWithGroupCommit(
@@ -8249,10 +8433,17 @@ private CommitHandler createCommitHandler(
           parallelExecutor,
           mutationsGrouper,
           true,
+          false,
           groupCommitter);
     } else {
       return new CommitHandler(
-          storage, coordinator, tableMetadataManager, parallelExecutor, mutationsGrouper, true);
+          storage,
+          coordinator,
+          tableMetadataManager,
+          parallelExecutor,
+          mutationsGrouper,
+          true,
+          onePhaseCommitEnabled);
     }
   }
@@ -8302,6 +8493,14 @@ static Stream<Arguments> isolationAndReadOnlyModeAndCommitType() {
                         .map(commitType -> Arguments.of(isolation, readOnly, commitType))));
   }
 
+  static Stream<Arguments> isolationAndOnePhaseCommitEnabled() {
+    return Arrays.stream(Isolation.values())
+        .flatMap(
+            isolation ->
+                Stream.of(false, true)
+                    .map(onePhaseCommitEnabled -> Arguments.of(isolation, onePhaseCommitEnabled)));
+  }
+
   enum CommitType {
     NORMAL_COMMIT,
     GROUP_COMMIT,