diff --git a/core/src/main/java/com/scalar/db/api/ConditionBuilder.java b/core/src/main/java/com/scalar/db/api/ConditionBuilder.java index 6a8d0c4933..b809d6be4b 100644 --- a/core/src/main/java/com/scalar/db/api/ConditionBuilder.java +++ b/core/src/main/java/com/scalar/db/api/ConditionBuilder.java @@ -135,6 +135,23 @@ public static ConditionalExpression buildConditionalExpression( return new ConditionalExpression(column, operator); } + /** + * Builds a like expression with the specified column, operator, and escape character. + * + *

This method is primarily for internal use. Breaking changes can and will be introduced to + * this method. Users should not depend on it. + * + * @param column a target text column used to compare + * @param operator an operator used to compare the target column. The operator must be either LIKE + * or NOT_LIKE. + * @param escape an escape character for the like operator + * @return a conditional expression + */ + public static ConditionalExpression buildLikeExpression( + TextColumn column, Operator operator, String escape) { + return new LikeExpression(column, operator, escape); + } + /** * Returns a builder object for a condition expression for PutIf/DeleteIf * @@ -352,6 +369,7 @@ public ConditionalExpression isNotEqualToBlob(byte[] value) { public ConditionalExpression isNotEqualToBlob(ByteBuffer value) { return new ConditionalExpression(columnName, value, Operator.NE); } + /** * Creates a 'not equal' conditional expression for a DATE value. * @@ -391,6 +409,7 @@ public ConditionalExpression isNotEqualToTimestamp(LocalDateTime value) { public ConditionalExpression isNotEqualToTimestampTZ(Instant value) { return new ConditionalExpression(TimestampTZColumn.of(columnName, value), Operator.NE); } + /** * Creates a 'greater than' conditional expression for a BOOLEAN value. * @@ -590,6 +609,7 @@ public ConditionalExpression isGreaterThanOrEqualToBlob(byte[] value) { public ConditionalExpression isGreaterThanOrEqualToBlob(ByteBuffer value) { return new ConditionalExpression(columnName, value, Operator.GTE); } + /** * Creates a 'greater than or equal' conditional expression for a DATE value. * @@ -709,6 +729,7 @@ public ConditionalExpression isLessThanBlob(byte[] value) { public ConditionalExpression isLessThanBlob(ByteBuffer value) { return new ConditionalExpression(columnName, value, Operator.LT); } + /** * Creates a 'less than' conditional expression for a DATE value. * @@ -748,6 +769,7 @@ public ConditionalExpression isLessThanTimestamp(LocalDateTime value) { public ConditionalExpression isLessThanTimestampTZ(Instant value) { return new ConditionalExpression(TimestampTZColumn.of(columnName, value), Operator.LT); } + /** * Creates a 'less than or equal' conditional expression for a BOOLEAN value. * @@ -1029,6 +1051,7 @@ public ConditionalExpression isNotNullText() { public ConditionalExpression isNotNullBlob() { return new ConditionalExpression(BlobColumn.ofNull(columnName), Operator.IS_NOT_NULL); } + /** * Creates an 'is not null' conditional expression for a DATE value. * diff --git a/core/src/main/java/com/scalar/db/api/LikeExpression.java b/core/src/main/java/com/scalar/db/api/LikeExpression.java index f3df7d2c5e..084fc20687 100644 --- a/core/src/main/java/com/scalar/db/api/LikeExpression.java +++ b/core/src/main/java/com/scalar/db/api/LikeExpression.java @@ -16,8 +16,9 @@ public class LikeExpression extends ConditionalExpression { * Constructs a {@code LikeExpression} with the specified column and operator. For the escape * character, the default one ("\", i.e., backslash) is used. * - * @param column a target column used to compare - * @param operator an operator used to compare the target column + * @param column a target text column used to compare + * @param operator an operator used to compare the target text column. The operator must be either + * LIKE or NOT_LIKE. */ LikeExpression(TextColumn column, Operator operator) { this(column, operator, DEFAULT_ESCAPE_CHAR); @@ -28,8 +29,9 @@ public class LikeExpression extends ConditionalExpression { * The escape character must be a string of a single character or an empty string. If an empty * string is specified, the escape character is disabled. * - * @param column a target column used to compare - * @param operator an operator used to compare the target column + * @param column a target text column used to compare + * @param operator an operator used to compare the target text column. The operator must be either + * LIKE or NOT_LIKE. * @param escape an escape character for the like operator */ LikeExpression(TextColumn column, Operator operator, String escape) { @@ -75,6 +77,11 @@ private void check(String pattern, Operator operator, String escape) { } } + @Override + public TextColumn getColumn() { + return (TextColumn) super.getColumn(); + } + /** * Returns the escape character for LIKE operator. * diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 66d213c3bc..9ce5ec289c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -4,11 +4,16 @@ import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isImplicitPreReadEnabled; import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.AndConditionSet; +import com.scalar.db.api.ConditionBuilder; +import com.scalar.db.api.ConditionSetBuilder; +import com.scalar.db.api.ConditionalExpression; import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Get; import com.scalar.db.api.GetBuilder; +import com.scalar.db.api.LikeExpression; import com.scalar.db.api.Operation; import com.scalar.db.api.Put; import com.scalar.db.api.Result; @@ -25,13 +30,16 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -137,34 +145,45 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException { @VisibleForTesting Optional read(@Nullable Snapshot.Key key, Get get) throws CrudException { Optional result = getFromStorage(get); - if (!result.isPresent() || result.get().isCommitted()) { - if (result.isPresent() || get.getConjunctions().isEmpty()) { - // Keep the read set latest to create before image by using the latest record (result) - // because another conflicting transaction might have updated the record after this - // transaction read it first. However, we update it only if a get operation has no - // conjunction or the result exists. This is because we don’t know whether the record - // actually exists or not due to the conjunction. - if (key != null) { - putIntoReadSetInSnapshot(key, result); - } else { - // Only for a Get with index, the argument `key` is null + if (result.isPresent() && !result.get().isCommitted()) { + throw new UncommittedRecordException( + get, + result.get(), + CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(), + snapshot.getId()); + } - if (result.isPresent()) { - // Only when we can get the record with the Get with index, we can put it into the read - // set - key = new Snapshot.Key(get, result.get()); - putIntoReadSetInSnapshot(key, result); - } + if (!get.getConjunctions().isEmpty()) { + // Because we also get records whose before images match the conjunctions, we need to check if + // the current status of the records actually match the conjunctions. + result = + result.filter( + r -> + ScalarDbUtils.columnsMatchAnyOfConjunctions( + r.getColumns(), get.getConjunctions())); + } + + if (result.isPresent() || get.getConjunctions().isEmpty()) { + // Keep the read set latest to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. However, we update it only if a get operation has no + // conjunction or the result exists. This is because we don’t know whether the record + // actually exists or not due to the conjunction. + if (key != null) { + putIntoReadSetInSnapshot(key, result); + } else { + // Only for a Get with index, the argument `key` is null + + if (result.isPresent()) { + // Only when we can get the record with the Get with index, we can put it into the read + // set + key = new Snapshot.Key(get, result.get()); + putIntoReadSetInSnapshot(key, result); } } - putIntoGetSetInSnapshot(get, result); - return result; } - throw new UncommittedRecordException( - get, - result.get(), - CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(), - snapshot.getId()); + putIntoGetSetInSnapshot(get, result); + return result; } public List scan(Scan originalScan) throws CrudException { @@ -191,12 +210,24 @@ private LinkedHashMap scanInternal(Scan scan) Scanner scanner = null; try { - scanner = scanFromStorage(scan); + if (scan.getLimit() > 0) { + // Since the conjunctions may delete some records from the scan result, it is necessary to + // perform the scan without a limit. + scanner = scanFromStorage(Scan.newBuilder(scan).limit(0).build()); + } else { + scanner = scanFromStorage(scan); + } + for (Result r : scanner) { TransactionResult result = new TransactionResult(r); Snapshot.Key key = new Snapshot.Key(scan, r); - processScanResult(key, scan, result); - results.put(key, result); + Optional processedScanResult = processScanResult(key, scan, result); + processedScanResult.ifPresent(res -> results.put(key, res)); + + if (scan.getLimit() > 0 && results.size() >= scan.getLimit()) { + // If the scan has a limit, we stop scanning when we reach the limit. + break; + } } } catch (RuntimeException e) { Exception exception; @@ -224,8 +255,8 @@ private LinkedHashMap scanInternal(Scan scan) return results; } - private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult result) - throws CrudException { + private Optional processScanResult( + Snapshot.Key key, Scan scan, TransactionResult result) throws CrudException { if (!result.isCommitted()) { throw new UncommittedRecordException( scan, @@ -234,10 +265,25 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re snapshot.getId()); } - // We always update the read set to create before image by using the latest record (result) - // because another conflicting transaction might have updated the record after this - // transaction read it first. - putIntoReadSetInSnapshot(key, Optional.of(result)); + Optional ret = Optional.of(result); + if (!scan.getConjunctions().isEmpty()) { + // Because we also get records whose before images match the conjunctions, we need to check if + // the current status of the records actually match the conjunctions. + ret = + ret.filter( + r -> + ScalarDbUtils.columnsMatchAnyOfConjunctions( + r.getColumns(), scan.getConjunctions())); + } + + if (ret.isPresent()) { + // We always update the read set to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. + putIntoReadSetInSnapshot(key, ret); + } + + return ret; } public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException { @@ -397,7 +443,16 @@ private Get createGet(Snapshot.Key key) throws CrudException { @VisibleForTesting Optional getFromStorage(Get get) throws CrudException { try { - return storage.get(get).map(TransactionResult::new); + if (get.getConjunctions().isEmpty()) { + // If there are no conjunctions, we can read the record directly + return storage.get(get).map(TransactionResult::new); + } else { + // If there are conjunctions, we need to convert them to include conditions on the before + // image + Set converted = convertConjunctions(get, get.getConjunctions()); + Get convertedGet = Get.newBuilder(get).clearConditions().whereOr(converted).build(); + return storage.get(convertedGet).map(TransactionResult::new); + } } catch (ExecutionException e) { throw new CrudException( CoreError.CONSENSUS_COMMIT_READING_RECORD_FROM_STORAGE_FAILED.buildMessage(), @@ -408,7 +463,16 @@ Optional getFromStorage(Get get) throws CrudException { private Scanner scanFromStorage(Scan scan) throws CrudException { try { - return storage.scan(scan); + if (scan.getConjunctions().isEmpty()) { + // If there are no conjunctions, we can read the record directly + return storage.scan(scan); + } else { + // If there are conjunctions, we need to convert them to include conditions on the before + // image + Set converted = convertConjunctions(scan, scan.getConjunctions()); + Scan convertedScan = Scan.newBuilder(scan).clearConditions().whereOr(converted).build(); + return storage.scan(convertedScan); + } } catch (ExecutionException e) { throw new CrudException( CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(), @@ -417,6 +481,119 @@ private Scanner scanFromStorage(Scan scan) throws CrudException { } } + /** + * Converts the given conjunctions to include conditions on before images. + * + *

This is necessary because we might miss prepared records whose before images match the + * original conditions when reading from storage. For example, suppose we have the following + * records in storage: + * + *

+   *   | partition_key | clustering_key | column | status    | before_column | before_status  |
+   *   |---------------|----------------|--------|-----------|---------------|----------------|
+   *   | 0             | 0              | 1000   | COMMITTED |               |                |
+   *   | 0             | 1              | 200    | PREPARED  | 1000          | COMMITTED      |
+   * 
+ * + * If we scan records with the condition "column = 1000" without converting the condition + * (conjunction), we only get the first record, not the second one, because the condition does not + * match. However, the second record has not been committed yet, so we should still retrieve it, + * considering the possibility that the record will be rolled back. + * + *

To handle such cases, we convert the conjunctions to include conditions on the before image. + * For example, if the original condition is: + * + *

+   *   column = 1000
+   * 
+ * + * We convert it to: + * + *
+   *   column = 1000 OR before_column = 1000
+   * 
+ * + *

Here are more examples: + * + *

Example 1: + * + *

+   *   {@code column >= 500 AND column < 1000}
+   * 
+ * + * becomes: + * + *
+   *   {@code (column >= 500 AND column < 1000) OR (before_column >= 500 AND before_column < 1000)}
+   * 
+ * + *

Example 2: + * + *

+   *   {@code column1 = 500 OR column2 != 1000}
+   * 
+ * + * becomes: + * + *
+   *   {@code column1 = 500 OR column2 != 1000 OR before_column1 = 500 OR before_column2 != 1000}
+   * 
+ * + * This way, we can ensure that prepared records whose before images satisfy the original scan + * conditions are not missed during the scan. + * + * @param selection the selection to convert + * @param conjunctions the conjunctions to convert + * @return the converted conjunctions + */ + private Set convertConjunctions( + Selection selection, Set conjunctions) throws CrudException { + TableMetadata metadata = getTableMetadata(selection); + + Set converted = new HashSet<>(conjunctions.size() * 2); + + // Keep the original conjunctions + conjunctions.forEach( + c -> converted.add(ConditionSetBuilder.andConditionSet(c.getConditions()).build())); + + // Add conditions on the before image + for (Selection.Conjunction conjunction : conjunctions) { + Set conditions = new HashSet<>(conjunction.getConditions().size()); + for (ConditionalExpression condition : conjunction.getConditions()) { + String columnName = condition.getColumn().getName(); + + if (metadata.getPartitionKeyNames().contains(columnName) + || metadata.getClusteringKeyNames().contains(columnName)) { + // If the condition is on the primary key, we don't need to convert it + conditions.add(condition); + continue; + } + + // Convert the condition to use the before image column + ConditionalExpression convertedCondition; + if (condition instanceof LikeExpression) { + LikeExpression likeExpression = (LikeExpression) condition; + convertedCondition = + ConditionBuilder.buildLikeExpression( + likeExpression.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName), + likeExpression.getOperator(), + likeExpression.getEscape()); + } else { + convertedCondition = + ConditionBuilder.buildConditionalExpression( + condition.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName), + condition.getOperator()); + } + + conditions.add(convertedCondition); + } + + converted.add(ConditionSetBuilder.andConditionSet(conditions).build()); + } + + return converted; + } + private Selection prepareStorageSelection(Selection selection) throws CrudException { selection.clearProjections(); // Retrieve only the after images columns when including the metadata is disabled, otherwise @@ -486,6 +663,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera private final Scanner scanner; @Nullable private final LinkedHashMap results; + private final AtomicInteger scanCount = new AtomicInteger(); private final AtomicBoolean fullyScanned = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -493,7 +671,14 @@ public ConsensusCommitStorageScanner(Scan scan, List originalProjections throws CrudException { this.scan = scan; this.originalProjections = originalProjections; - scanner = scanFromStorage(scan); + + if (scan.getLimit() > 0) { + // Since the conjunctions may delete some records from the scan result, it is necessary to + // perform the scan without a limit. + scanner = scanFromStorage(Scan.newBuilder(scan).limit(0).build()); + } else { + scanner = scanFromStorage(scan); + } if (isValidationOrSnapshotReadRequired()) { results = new LinkedHashMap<>(); @@ -506,25 +691,45 @@ public ConsensusCommitStorageScanner(Scan scan, List originalProjections @Override public Optional one() throws CrudException { + if (fullyScanned.get()) { + return Optional.empty(); + } + try { - Optional r = scanner.one(); + while (true) { + Optional r = scanner.one(); - if (!r.isPresent()) { - fullyScanned.set(true); - return Optional.empty(); - } + if (!r.isPresent()) { + fullyScanned.set(true); + return Optional.empty(); + } - Snapshot.Key key = new Snapshot.Key(scan, r.get()); - TransactionResult result = new TransactionResult(r.get()); - processScanResult(key, scan, result); + Snapshot.Key key = new Snapshot.Key(scan, r.get()); + TransactionResult result = new TransactionResult(r.get()); - if (results != null) { - results.put(key, result); - } + Optional processedScanResult = processScanResult(key, scan, result); + if (!processedScanResult.isPresent()) { + continue; + } + + if (results != null) { + results.put(key, processedScanResult.get()); + } + scanCount.incrementAndGet(); - TableMetadata metadata = getTableMetadata(scan); - return Optional.of( - new FilteredResult(result, originalProjections, metadata, isIncludeMetadataEnabled)); + if (scan.getLimit() > 0 && scanCount.get() >= scan.getLimit()) { + // If the scan has a limit, we stop scanning when we reach the limit. + fullyScanned.set(true); + } + + TableMetadata metadata = getTableMetadata(scan); + return Optional.of( + new FilteredResult( + processedScanResult.get(), + originalProjections, + metadata, + isIncludeMetadataEnabled)); + } } catch (ExecutionException e) { closeScanner(); throw new CrudException( diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index b9c1ca8eba..5fbf1bf9a5 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -1,8 +1,13 @@ package com.scalar.db.transaction.consensuscommit; +import static com.scalar.db.api.ConditionBuilder.column; +import static com.scalar.db.api.ConditionBuilder.deleteIfExists; +import static com.scalar.db.api.ConditionBuilder.putIfExists; +import static com.scalar.db.api.ConditionSetBuilder.condition; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -12,7 +17,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.scalar.db.api.ConditionBuilder; import com.scalar.db.api.ConditionalExpression; import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; @@ -42,6 +46,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -63,6 +68,7 @@ public class CrudHandlerTest { private static final String ANY_NAME_1 = "name1"; private static final String ANY_NAME_2 = "name2"; private static final String ANY_NAME_3 = "name3"; + private static final String ANY_NAME_4 = "name4"; private static final String ANY_TEXT_1 = "text1"; private static final String ANY_TEXT_2 = "text2"; private static final String ANY_TEXT_3 = "text3"; @@ -76,6 +82,7 @@ public class CrudHandlerTest { .addColumn(ANY_NAME_1, DataType.TEXT) .addColumn(ANY_NAME_2, DataType.TEXT) .addColumn(ANY_NAME_3, DataType.TEXT) + .addColumn(ANY_NAME_4, DataType.INT) .addPartitionKey(ANY_NAME_1) .addClusteringKey(ANY_NAME_2) .addSecondaryIndex(ANY_NAME_3) @@ -139,7 +146,7 @@ private Scan prepareCrossPartitionScan() { .namespace(ANY_NAMESPACE_NAME) .table(ANY_TABLE_NAME) .all() - .where(ConditionBuilder.column("column").isEqualToInt(10)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) .build(); } @@ -152,10 +159,16 @@ private Scan toScanForStorageFrom(Scan scan) { } private TransactionResult prepareResult(TransactionState state) { + return prepareResult(ANY_TEXT_1, ANY_TEXT_2, state); + } + + private TransactionResult prepareResult( + String partitionKeyColumnValue, String clusteringKeyColumnValue, TransactionState state) { ImmutableMap> columns = ImmutableMap.>builder() - .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) - .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) + .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, partitionKeyColumnValue)) + .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, clusteringKeyColumnValue)) + .put(ANY_NAME_3, TextColumn.of(ANY_NAME_3, ANY_TEXT_3)) .put(Attribute.ID, ScalarDbUtils.toColumn(Attribute.toIdValue(ANY_ID_2))) .put(Attribute.STATE, ScalarDbUtils.toColumn(Attribute.toStateValue(state))) .put(Attribute.VERSION, ScalarDbUtils.toColumn(Attribute.toVersionValue(2))) @@ -356,14 +369,15 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept true); when(snapshot.isValidationRequired()).thenReturn(true); - ConditionalExpression condition = mock(ConditionalExpression.class); + ConditionalExpression condition = column(ANY_NAME_3).isEqualToText(ANY_TEXT_3); Get get = Get.newBuilder(prepareGet()).where(condition).build(); Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); Optional transactionResult = expected.map(e -> (TransactionResult) e); Snapshot.Key key = new Snapshot.Key(getForStorage); when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); - when(storage.get(getForStorage)).thenReturn(expected); + when(storage.get(any())).thenReturn(expected); when(snapshot.mergeResult( key, transactionResult, Collections.singleton(Selection.Conjunction.of(condition)))) .thenReturn(transactionResult); @@ -377,7 +391,13 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept Optional.of( new FilteredResult( expected.get(), Collections.emptyList(), TABLE_METADATA, false))); - verify(storage).get(getForStorage); + verify(storage) + .get( + Get.newBuilder(getForStorage) + .clearConditions() + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .build()); verify(snapshot, never()).putIntoReadSet(any(), any()); verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); } @@ -537,7 +557,6 @@ void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanT when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); } when(storage.scan(scanForStorage)).thenReturn(scanner); - when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act List results = scanOrGetScanner(scan, scanType); @@ -579,7 +598,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); } when(storage.scan(scanForStorage)).thenReturn(scanner); - when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act List results = scanOrGetScanner(scan, scanType); @@ -621,7 +639,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); } when(storage.scan(scanForStorage)).thenReturn(scanner); - when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act List results = scanOrGetScanner(scan, scanType); @@ -664,7 +681,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); } when(storage.scan(scanForStorage)).thenReturn(scanner); - when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act List results = scanOrGetScanner(scan, scanType); @@ -731,7 +747,6 @@ void scanOrGetScanner_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot(Scan when(snapshot.getResults(scanForStorage)) .thenReturn(Optional.empty()) .thenReturn(Optional.of(Maps.newLinkedHashMap(ImmutableMap.of(key, expected)))); - when(snapshot.getResult(key)).thenReturn(Optional.of(expected)); // Act List results1 = scanOrGetScanner(scan1, scanType); @@ -945,7 +960,6 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum } when(storage.scan(any(ScanAll.class))).thenReturn(scanner); TransactionResult transactionResult = new TransactionResult(result); - when(snapshot.getResult(key)).thenReturn(Optional.of(transactionResult)); // Act List results = scanOrGetScanner(scan, scanType); @@ -994,6 +1008,120 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum verify(snapshot, never()).verifyNoOverlap(any(), any()); } + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_WithLimit_ShouldReturnLimitedResults(ScanType scanType) + throws CrudException, ExecutionException, IOException { + // Arrange + Scan scanWithoutLimit = prepareScan(); + Scan scanWithLimit = Scan.newBuilder(scanWithoutLimit).limit(2).build(); + Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit); + + Result result1 = prepareResult(ANY_TEXT_1, ANY_TEXT_2, TransactionState.COMMITTED); + Result result2 = prepareResult(ANY_TEXT_1, ANY_TEXT_3, TransactionState.COMMITTED); + + Snapshot.Key key1 = new Snapshot.Key(scanWithLimit, result1); + Snapshot.Key key2 = new Snapshot.Key(scanWithLimit, result2); + + TransactionResult transactionResult1 = new TransactionResult(result1); + TransactionResult transactionResult2 = new TransactionResult(result2); + + // Set up mock scanner to return two results + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Arrays.asList(result1, result2).iterator()); + } else { + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act + List results = scanOrGetScanner(scanWithLimit, scanType); + + // Assert + assertThat(results).hasSize(2); + assertThat(results.get(0)) + .isEqualTo( + new FilteredResult(transactionResult1, Collections.emptyList(), TABLE_METADATA, false)); + assertThat(results.get(1)) + .isEqualTo( + new FilteredResult(transactionResult2, Collections.emptyList(), TABLE_METADATA, false)); + + verify(scanner).close(); + verify(snapshot).putIntoReadSet(key1, Optional.of(transactionResult1)); + verify(snapshot).putIntoReadSet(key2, Optional.of(transactionResult2)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> resultsCaptor = + ArgumentCaptor.forClass(LinkedHashMap.class); + verify(snapshot).putIntoScanSet(eq(scanWithLimit), resultsCaptor.capture()); + + LinkedHashMap capturedResults = resultsCaptor.getValue(); + assertThat(capturedResults).hasSize(2); + assertThat(capturedResults).containsKeys(key1, key2); + } + + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_WithLimitExceedingAvailableResults_ShouldReturnAllAvailableResults( + ScanType scanType) throws CrudException, ExecutionException, IOException { + // Arrange + Scan scanWithoutLimit = prepareScan(); + Scan scanWithLimit = + Scan.newBuilder(scanWithoutLimit).limit(5).build(); // Limit higher than available results + Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit); + + Result result = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key1 = new Snapshot.Key(scanWithLimit, result); + TransactionResult transactionResult1 = new TransactionResult(result); + + // Set up mock scanner to return one result (less than limit) + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act + List results = scanOrGetScanner(scanWithLimit, scanType); + + // Assert + assertThat(results).hasSize(1); + verify(scanner).close(); + verify(snapshot).putIntoReadSet(key1, Optional.of(transactionResult1)); + } + + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_WithLimit_UncommittedResult_ShouldThrowUncommittedRecordException( + ScanType scanType) throws ExecutionException, IOException { + // Arrange + Scan scanWithoutLimit = prepareScan(); + Scan scanWithLimit = Scan.newBuilder(scanWithoutLimit).limit(3).build(); + Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit); + + Result uncommittedResult = prepareResult(ANY_TEXT_1, ANY_TEXT_3, TransactionState.PREPARED); + + // Set up mock scanner to return one committed and one uncommitted result + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(uncommittedResult).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(uncommittedResult)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act & Assert + assertThatThrownBy(() -> scanOrGetScanner(scanWithLimit, scanType)) + .isInstanceOf(UncommittedRecordException.class); + + verify(scanner).close(); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoScanSet(any(), any()); + } + @Test public void scan_RuntimeExceptionCausedByExecutionExceptionThrownByIteratorHasNext_ShouldThrowCrudException() @@ -1207,7 +1335,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.putIfExists()) + .condition(putIfExists()) .enableImplicitPreRead() .build(); Snapshot.Key key = new Snapshot.Key(put); @@ -1245,7 +1373,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.putIfExists()) + .condition(putIfExists()) .enableImplicitPreRead() .build(); Snapshot.Key key = new Snapshot.Key(put); @@ -1285,7 +1413,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.putIfExists()) + .condition(putIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(put); when(snapshot.containsKeyInReadSet(key)).thenReturn(true); @@ -1321,7 +1449,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.putIfExists()) + .condition(putIfExists()) .build(); // Act Assert @@ -1361,7 +1489,7 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.deleteIfExists()) + .condition(deleteIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(delete); when(snapshot.containsKeyInReadSet(key)).thenReturn(true); @@ -1397,7 +1525,7 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp .namespace("ns") .table("tbl") .partitionKey(Key.ofText("c1", "foo")) - .condition(ConditionBuilder.deleteIfExists()) + .condition(deleteIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(delete); when(snapshot.containsKeyInReadSet(key)).thenReturn(false); @@ -1491,7 +1619,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() .namespace(key.getNamespace()) .table(key.getTable()) .partitionKey(key.getPartitionKey()) - .where(mock(ConditionalExpression.class)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_1)) .build(); when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); when(storage.get(any())).thenReturn(Optional.empty()); @@ -1500,7 +1628,15 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() handler.readUnread(key, getForKey); // Assert - verify(storage).get(any()); + verify(storage) + .get( + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_1)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_1)) + .build()); verify(snapshot, never()).putIntoReadSet(key, Optional.empty()); verify(snapshot).putIntoGetSet(getForKey, Optional.empty()); } @@ -1515,7 +1651,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); when(storage.get(any())).thenReturn(Optional.of(result)); @@ -1546,7 +1681,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() when(key.getTable()).thenReturn(ANY_TABLE_NAME); when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get()); when(storage.get(any())).thenReturn(Optional.of(result)); @@ -1598,7 +1732,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods() throws CrudException, ExecutionException { // Arrange - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1))); when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2))); @@ -1628,7 +1761,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException() throws ExecutionException { // Arrange - Result result = mock(Result.class); when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get()); when(storage.get(any())).thenReturn(Optional.of(result)); @@ -1794,6 +1926,471 @@ public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() assertThat(transactionIdCaptor.getValue()).isEqualTo(ANY_TX_ID); } + @Test + public void get_WithConjunctions_ShouldConvertConjunctions() + throws CrudException, ExecutionException { + // Arrange + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(storage.get(any())).thenReturn(Optional.of(result)); + + // Act + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .build()); + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isEqualToInt(10)) + .build()); + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_4).isEqualToInt(20)) + .build()); + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where( + condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(ANY_NAME_4).isGreaterThanInt(30)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .build()); + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .build()) + .and( + condition(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .or(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .build()); + handler.get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .build()); + + // Assert + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isEqualToInt(10)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(10)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_4).isEqualToInt(20)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(20)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where( + condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(ANY_NAME_4).isGreaterThanInt(30)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3) + .isNotEqualToText(ANY_TEXT_3)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3) + .isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(30)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .get( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + } + + @Test + public void scan_WithConjunctions_ShouldConvertConjunctions() + throws CrudException, ExecutionException { + // Arrange + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1))); + when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2))); + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + when(storage.scan(any())).thenReturn(scanner); + + // Act + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isEqualToInt(10)) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_4).isEqualToInt(20)) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where( + condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(ANY_NAME_4).isGreaterThanInt(30)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .build()) + .and( + condition(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .or(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .all() + .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3)) + .and(column(ANY_NAME_2).isLessThanOrEqualToText(ANY_TEXT_4)) + .build()); + handler.scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .all() + .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .build()); + + // Assert + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isEqualToInt(10)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(10)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(ANY_NAME_4).isEqualToInt(20)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(20)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where( + condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(ANY_NAME_4).isGreaterThanInt(30)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3) + .isNotEqualToText(ANY_TEXT_3)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3) + .isNotEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(30)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(40)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and( + column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50)) + .build()) + .or( + condition( + column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_3)) + .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_4)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .all() + .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3)) + .and(column(ANY_NAME_2).isLessThanOrEqualToText(ANY_TEXT_4)) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + verify(storage) + .scan( + Scan.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .all() + .where( + condition(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3)) + .and(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .build()) + .or( + condition(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3)) + .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4)) + .build()) + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build()); + } + private List scanOrGetScanner(Scan scan, ScanType scanType) throws CrudException { if (scanType == ScanType.SCAN) { return handler.scan(scan); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 3566204ef7..13c2073c26 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -1,5 +1,6 @@ package com.scalar.db.transaction.consensuscommit; +import static com.scalar.db.api.ConditionBuilder.column; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -18,7 +19,6 @@ import static org.mockito.Mockito.verify; import com.google.common.collect.Sets; -import com.scalar.db.api.ConditionBuilder; import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; @@ -43,8 +43,10 @@ import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.io.DataType; import com.scalar.db.io.IntValue; import com.scalar.db.io.Key; @@ -62,6 +64,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.stream.IntStream; import javax.annotation.Nullable; import org.junit.jupiter.api.AfterAll; @@ -2353,7 +2356,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1)); Get get = Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(1)) + .where(column(BALANCE).isEqualToInt(1)) .build(); Optional result = transaction.get(get); assertThatCode(transaction::commit).doesNotThrowAnyException(); @@ -2374,7 +2377,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1)); Get get = Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(0)) + .where(column(BALANCE).isEqualToInt(0)) .build(); Optional result = transaction.get(get); assertThatCode(transaction::commit).doesNotThrowAnyException(); @@ -2393,7 +2396,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR Put put = preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1); Get get = Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE)) + .where(column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE)) .build(); // Act @@ -2416,7 +2419,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR Put put = preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1); Get get = Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(0)) + .where(column(BALANCE).isEqualToInt(0)) .build(); // Act @@ -2538,7 +2541,7 @@ public void scan_NonOverlappingPutGivenBefore_ShouldScan() throws TransactionExc .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .start(Key.ofInt(ACCOUNT_TYPE, 0)) - .where(ConditionBuilder.column(BALANCE).isNotEqualToInt(1)) + .where(column(BALANCE).isNotEqualToInt(1)) .build(); // Act @@ -2559,7 +2562,7 @@ public void scan_NonOverlappingPutGivenBefore_ShouldScan() throws TransactionExc transaction.put(preparePut(0, 1, namespace1, TABLE_1).withValue(BALANCE, 9999)); Scan scan = Scan.newBuilder(prepareScan(0, 1, 1, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE)) + .where(column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE)) .build(); // Act @@ -2581,8 +2584,8 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg .withValue(SOME_COLUMN, "aaa")); Scan scan = Scan.newBuilder(prepareScan(0, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isLessThanInt(1000)) - .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa")) + .where(column(BALANCE).isLessThanInt(1000)) + .and(column(SOME_COLUMN).isEqualToText("aaa")) .build(); // Act @@ -2606,7 +2609,7 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg .build()); Scan scan = Scan.newBuilder(prepareScanWithIndex(namespace1, TABLE_1, 1)) - .where(ConditionBuilder.column(SOME_COLUMN).isGreaterThanText("aaa")) + .where(column(SOME_COLUMN).isGreaterThanText("aaa")) .build(); // Act @@ -2688,8 +2691,8 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg .withValue(SOME_COLUMN, "aaa")); Scan scan = Scan.newBuilder(prepareScanWithIndex(namespace1, TABLE_1, 999)) - .where(ConditionBuilder.column(BALANCE).isLessThanInt(1000)) - .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa")) + .where(column(BALANCE).isLessThanInt(1000)) + .and(column(SOME_COLUMN).isEqualToText("aaa")) .build(); // Act @@ -2959,8 +2962,8 @@ public void get_GetWithMatchedConjunctionsGivenForCommittedRecord_ShouldReturnRe DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); Get get = Get.newBuilder(prepareGet(1, 1, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(INITIAL_BALANCE)) - .and(ConditionBuilder.column(SOME_COLUMN).isNullText()) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .and(column(SOME_COLUMN).isNullText()) .build(); // Act @@ -2983,8 +2986,8 @@ public void get_GetWithUnmatchedConjunctionsGivenForCommittedRecord_ShouldReturn DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE); Get get = Get.newBuilder(prepareGet(1, 1, namespace1, TABLE_1)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(INITIAL_BALANCE)) - .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa")) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .and(column(SOME_COLUMN).isEqualToText("aaa")) .build(); // Act @@ -3028,7 +3031,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .start(Key.ofInt(ACCOUNT_TYPE, 0)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(1)) + .where(column(BALANCE).isEqualToInt(1)) .build(); List result1 = transaction1.scan(scan); @@ -3063,7 +3066,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .start(Key.ofInt(ACCOUNT_TYPE, 0)) - .where(ConditionBuilder.column(BALANCE).isEqualToInt(1)) + .where(column(BALANCE).isEqualToInt(1)) .build(); Scan scan2 = Scan.newBuilder() @@ -3071,7 +3074,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou .table(TABLE_1) .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) .start(Key.ofInt(ACCOUNT_TYPE, 0)) - .where(ConditionBuilder.column(BALANCE).isGreaterThanInt(1)) + .where(column(BALANCE).isGreaterThanInt(1)) .build(); List result1 = transaction1.scan(scan1); @@ -5273,6 +5276,623 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } + @Test + public void + get_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Create a prepared record without before image + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + + // Act Assert + Optional actual; + while (true) { + try { + actual = + manager.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build()); + break; + } catch (CrudConflictException e) { + // Retry on conflict + } + } + + assertThat(actual).isPresent(); + assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + get_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + // Create a committed record with before image to simulate an old committed record that has both + // after and before images + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + originalStorage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(Attribute.STATE, TransactionState.COMMITTED.get()) + .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis()) + .build()); + + // Act Assert + Optional actual = + manager.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build()); + + assertThat(actual).isNotPresent(); + } + + @Test + public void + scan_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a prepared record without before image + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + + // Act Assert + List results; + while (true) { + try { + results = + manager.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build()); + break; + } catch (CrudConflictException e) { + // Retry on conflict + } + } + + assertThat(results).hasSize(2); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + scan_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a committed record with before image to simulate an old committed record that has both + // after and before images + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + originalStorage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(Attribute.STATE, TransactionState.COMMITTED.get()) + .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis()) + .build()); + + // Act Assert + List results = + manager.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build()); + + assertThat(results).hasSize(1); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + scan_WithConjunctionAndLimit_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 3)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a committed record with before image to simulate an old committed record that has both + // after and before images + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + originalStorage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(Attribute.STATE, TransactionState.COMMITTED.get()) + .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis()) + .build()); + + // Act Assert + List results = + manager.scan( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .limit(2) + .build()); + + assertThat(results).hasSize(2); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + getScanner_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a prepared record without before image + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + + // Act Assert + List results; + while (true) { + try (TransactionManagerCrudOperable.Scanner scanner = + manager.getScanner( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build())) { + results = scanner.all(); + break; + } catch (CrudConflictException e) { + // Retry on conflict + } + } + + assertThat(results).hasSize(2); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + getScanner_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a committed record with before image to simulate an old committed record that has both + // after and before images + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + originalStorage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(Attribute.STATE, TransactionState.COMMITTED.get()) + .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis()) + .build()); + + // Act Assert + List results; + try (TransactionManagerCrudOperable.Scanner scanner = + manager.getScanner( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .build())) { + results = scanner.all(); + } + + assertThat(results).hasSize(1); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + getScanner_WithConjunctionAndLimit_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord() + throws UnknownTransactionStatusException, CrudException, ExecutionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 3)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + // Create a committed record with before image to simulate an old committed record that has both + // after and before images + Optional result = + originalStorage.get( + Get.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build()); + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 100) + .build(); + String transactionId = UUID.randomUUID().toString(); + PrepareMutationComposer prepareMutationComposer = + new PrepareMutationComposer( + transactionId, + System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1), + new TransactionTableMetadataManager(admin, 0)); + prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null)); + originalStorage.mutate(prepareMutationComposer.get()); + originalStorage.put( + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(Attribute.STATE, TransactionState.COMMITTED.get()) + .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis()) + .build()); + + // Act Assert + List results; + try (TransactionManagerCrudOperable.Scanner scanner = + manager.getScanner( + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE)) + .limit(2) + .build())) { + results = scanner.all(); + } + + assertThat(results).hasSize(2); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + } + @Test public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord() throws TransactionException {