Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,17 +77,27 @@ public CrudHandler(
public Optional<Result> get(Get originalGet) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
Get get = (Get) prepareStorageSelection(originalGet);
Snapshot.Key key = new Snapshot.Key(get);
readUnread(key, get);

TableMetadata metadata = getTableMetadata(get);

Snapshot.Key key;
if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
// In case of a Get with index, we don't know the key until we read the record
key = null;
} else {
key = new Snapshot.Key(get);
}

readUnread(key, get);

return snapshot
.getResult(key, get)
.map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled));
}

// Only for a Get with index, the argument `key` is null
@VisibleForTesting
void readUnread(Snapshot.Key key, Get get) throws CrudException {
void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
if (!snapshot.containsKeyInGetSet(get)) {
read(key, get);
}
Expand All @@ -95,7 +106,7 @@ void readUnread(Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(Snapshot.Key key, Get get) throws CrudException {
void read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
if (result.isPresent() || get.getConjunctions().isEmpty()) {
Expand All @@ -104,7 +115,18 @@ void read(Snapshot.Key key, Get get) throws CrudException {
// transaction read it first. However, we update it only if a get operation has no
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
snapshot.putIntoReadSet(key, result);
if (key != null) {
snapshot.putIntoReadSet(key, result);
} else {
// Only for a Get with index, the argument `key` is null

if (result.isPresent()) {
// Only when we can get the record with the Get with index, we can put it into the read
// set
key = new Snapshot.Key(get, result.get());
snapshot.putIntoReadSet(key, result);
}
}
}
snapshot.putIntoGetSet(get, result); // for re-read and validation
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,13 @@ public Key(Get get) {
this((Operation) get);
}

public Key(Get get, Result result) {
this.namespace = get.forNamespace().get();
this.table = get.forTable().get();
this.partitionKey = result.getPartitionKey().get();
this.clusteringKey = result.getClusteringKey();
}

public Key(Put put) {
this((Operation) put);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CrudHandlerTest {
private static final String ANY_ID_2 = "id2";
private static final String ANY_NAME_1 = "name1";
private static final String ANY_NAME_2 = "name2";
private static final String ANY_NAME_3 = "name3";
private static final String ANY_TEXT_1 = "text1";
private static final String ANY_TEXT_2 = "text2";
private static final String ANY_TEXT_3 = "text3";
Expand All @@ -66,8 +67,10 @@ public class CrudHandlerTest {
TableMetadata.newBuilder()
.addColumn(ANY_NAME_1, DataType.TEXT)
.addColumn(ANY_NAME_2, DataType.TEXT)
.addColumn(ANY_NAME_3, DataType.TEXT)
.addPartitionKey(ANY_NAME_1)
.addClusteringKey(ANY_NAME_2)
.addSecondaryIndex(ANY_NAME_3)
.build());
private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA =
new TransactionTableMetadata(TABLE_METADATA);
Expand Down Expand Up @@ -928,6 +931,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()

// Assert
verify(storage, never()).get(any());
verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), any(Optional.class));
verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class));
}

Expand Down Expand Up @@ -1014,6 +1018,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
// Assert
verify(storage).get(any());
verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result)));
verify(snapshot).putIntoGetSet(getForKey, Optional.of(new TransactionResult(result)));
}

@Test
Expand Down Expand Up @@ -1050,6 +1055,88 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
});
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods()
throws CrudException, ExecutionException {
// Arrange
Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);
when(storage.get(any())).thenReturn(Optional.empty());

// Act
handler.readUnread(null, getWithIndex);

// Assert
verify(storage).get(any());
verify(snapshot, never()).putIntoReadSet(any(), any());
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods()
throws CrudException, ExecutionException {
// Arrange
Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
when(storage.get(any())).thenReturn(Optional.of(result));

Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);

// Act
handler.readUnread(null, getWithIndex);

// Assert
verify(storage).get(any());
verify(snapshot)
.putIntoReadSet(
new Snapshot.Key(getWithIndex, result), Optional.of(new TransactionResult(result)));
verify(snapshot).putIntoGetSet(getWithIndex, Optional.of(new TransactionResult(result)));
}

@Test
public void
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException()
throws ExecutionException {
// Arrange
Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
when(storage.get(any())).thenReturn(Optional.of(result));

Get getWithIndex =
Get.newBuilder()
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.indexKey(Key.ofText(ANY_NAME_3, ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getWithIndex)).thenReturn(false);

// Act Assert
assertThatThrownBy(() -> handler.readUnread(null, getWithIndex))
.isInstanceOf(UncommittedRecordException.class)
.satisfies(
e -> {
UncommittedRecordException exception = (UncommittedRecordException) e;
assertThat(exception.getSelection()).isEqualTo(getWithIndex);
assertThat(exception.getResults().size()).isEqualTo(1);
assertThat(exception.getResults().get(0)).isEqualTo(result);
});
}

@Test
public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException {
// Arrange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4531,6 +4531,149 @@ public void get_GetWithIndexGiven_WithSerializable_ShouldNotThrowAnyException()
assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
}

@Test
public void getAndUpdate_GetWithIndexGiven_ShouldUpdate() throws TransactionException {
// Arrange
manager.insert(
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, INITIAL_BALANCE)
.build());

// Act Assert
DistributedTransaction transaction = manager.begin();
Optional<Result> actual =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
.build());

assertThat(actual).isPresent();
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);

transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, 1)
.build());

transaction.commit();

transaction = manager.begin();
actual =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());
transaction.commit();

assertThat(actual).isPresent();
assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual.get().getInt(BALANCE)).isEqualTo(1);
}

@Test
public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionException {
// Arrange
manager.mutate(
Arrays.asList(
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, INITIAL_BALANCE)
.build(),
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.intValue(BALANCE, INITIAL_BALANCE)
.build()));

// Act Assert
DistributedTransaction transaction = manager.begin();
List<Result> actualResults =
transaction.scan(
Scan.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.indexKey(Key.ofInt(BALANCE, INITIAL_BALANCE))
.build());

assertThat(actualResults).hasSize(2);
Set<Integer> expectedTypes = Sets.newHashSet(0, 1);
for (Result result : actualResults) {
assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0);
expectedTypes.remove(result.getInt(ACCOUNT_TYPE));
assertThat(result.getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
}
assertThat(expectedTypes).isEmpty();

transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, 1)
.build());
transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.intValue(BALANCE, 2)
.build());

transaction.commit();

transaction = manager.begin();
Optional<Result> actual1 =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());
Optional<Result> actual2 =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
.build());
transaction.commit();

assertThat(actual1).isPresent();
assertThat(actual1.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(actual1.get().getInt(BALANCE)).isEqualTo(1);

assertThat(actual2).isPresent();
assertThat(actual2.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(actual2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1);
assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2);
}

private DistributedTransaction prepareTransfer(
int fromId,
String fromNamespace,
Expand Down