Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,10 @@ Optional<TransactionResult> read(@Nullable Snapshot.Key key, Get get) throws Cru
}

if (result.isPresent() || get.getConjunctions().isEmpty()) {
// Keep the read set latest to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first. However, we update it only if a get operation has no
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
// We put the result into the read set only if a get operation has no conjunction or the
// result exists. This is because we don’t know whether the record actually exists or not
// due to the conjunction.

if (key != null) {
putIntoReadSetInSnapshot(key, result);
} else {
Expand Down Expand Up @@ -277,9 +276,6 @@ private Optional<TransactionResult> processScanResult(
}

if (ret.isPresent()) {
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
putIntoReadSetInSnapshot(key, ret);
}

Expand Down Expand Up @@ -319,7 +315,7 @@ public void closeScanners() throws CrudException {

private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional<TransactionResult> result) {
// In read-only mode, we don't need to put the result into the read set
if (!readOnly) {
if (!readOnly && !snapshot.containsKeyInReadSet(key)) {
snapshot.putIntoReadSet(key, result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,45 @@ public void get_ForNonExistingTable_ShouldThrowIllegalArgumentException()
assertThatThrownBy(() -> handler.get(get)).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void get_DifferentGetButSameRecordReturned_ShouldNotOverwriteReadSet()
throws ExecutionException, CrudException {
// Arrange
Get get1 = prepareGet();
Get get2 = Get.newBuilder(get1).where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3)).build();
Get getForStorage1 = toGetForStorageFrom(get1);
Get getForStorage2 =
Get.newBuilder(get2)
.clearProjections()
.projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
.clearConditions()
.where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
.or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
.consistency(Consistency.LINEARIZABLE)
.build();
Result result = prepareResult(TransactionState.COMMITTED);
Optional<TransactionResult> expected = Optional.of(new TransactionResult(result));
Snapshot.Key key = new Snapshot.Key(getForStorage1);
when(snapshot.getResult(any(), any())).thenReturn(expected).thenReturn(expected);
when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true);
when(storage.get(any())).thenReturn(Optional.of(result));

// Act
Optional<Result> results1 = handler.get(get1);
Optional<Result> results2 = handler.get(get2);

// Assert
assertThat(results1)
.isEqualTo(
Optional.of(
new FilteredResult(
expected.get(), Collections.emptyList(), TABLE_METADATA, false)));
assertThat(results2).isEqualTo(results1);
verify(storage).get(getForStorage1);
verify(storage).get(getForStorage2);
verify(snapshot).putIntoReadSet(key, expected);
}

@ParameterizedTest
@EnumSource(ScanType.class)
void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanType scanType)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.transaction.consensuscommit;

import static com.scalar.db.api.ConditionBuilder.column;
import static com.scalar.db.api.ConditionBuilder.updateIf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -45,6 +46,7 @@
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.PreparationConflictException;
import com.scalar.db.exception.transaction.RollbackException;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.DataType;
Expand Down Expand Up @@ -6073,6 +6075,91 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio
assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
}

@Test
public void
commit_ConflictingExternalUpdate_DifferentGetButSameRecordReturned_ShouldThrowCommitConflictExceptionAndPreserveExternalChanges()
throws UnknownTransactionStatusException, CrudException, RollbackException {
// Arrange
manager.insert(
Insert.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, INITIAL_BALANCE)
.build());

// Act Assert
DistributedTransaction transaction = manager.begin();

// Retrieve the record
Optional<Result> result =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());

assertThat(result).isPresent();
assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(result.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);

// Update the balance of the record
transaction.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.condition(updateIf(column(BALANCE).isEqualToInt(INITIAL_BALANCE)).build())
.intValue(BALANCE, 100)
.build());

// Update the balance of the record by another transaction
manager.update(
Update.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.intValue(BALANCE, 200)
.build());

// Retrieve the record again, but use a different Get object (with a where clause)
result =
transaction.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.where(column(BALANCE).isEqualToInt(200))
.build());

assertThat(result).isNotPresent();

assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
transaction.rollback();

// Assert
result =
manager.get(
Get.newBuilder()
.namespace(namespace1)
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
.build());

assertThat(result).isPresent();
assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0);
assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
assertThat(result.get().getInt(BALANCE)).isEqualTo(200);
}

@Test
public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord()
throws TransactionException {
Expand Down