Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
Expand All @@ -19,8 +20,11 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -63,30 +67,35 @@ public CrudHandler(
this.mutationConditionsValidator = mutationConditionsValidator;
}

// NOTE(review): this span looks like a diff rendering without +/- markers; two variants of the
// signature and of several statements appear back to back. Confirm which variant is current.
/**
 * Returns the record targeted by the given Get.
 *
 * <p>The caller's projections are copied before the Get is passed to prepareStorageSelection.
 * When the snapshot already holds an entry for the request, the result is built from the
 * snapshot without reading storage. Otherwise the record is read through getFromStorage; a
 * missing or committed record is stored in the snapshot and returned via createGetResult.
 *
 * @return the value produced by createGetResult
 * @throws CrudException declared by this method; the body throws UncommittedRecordException when
 *     the record read from storage is present but not committed
 */
public Optional<Result> get(Get get) throws CrudException {
List<String> originalProjections = new ArrayList<>(get.getProjections());
public Optional<Result> get(Get originalGet) throws CrudException {
// Copy the caller's projections now; they are applied to the returned result later.
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
Get get = (Get) prepareStorageSelection(originalGet);

Optional<TransactionResult> result;
Snapshot.Key key = new Snapshot.Key(get);

// Already in the snapshot: answer from the snapshot instead of reading storage again.
if (snapshot.containsKeyInReadSet(key)) {
return createGetResult(key, originalProjections);
if (snapshot.containsKeyInGetSet(get)) {
return createGetResult(key, get, originalProjections);
}

result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
// Keep the read set up to date by storing the latest record (result), so that the before
// image is created from it; another conflicting transaction might have updated the record
// after this transaction first read it.
snapshot.put(key, result);
return createGetResult(key, originalProjections);
snapshot.put(get, result); // for re-read and validation
return createGetResult(key, get, originalProjections);
}
// Reached only when the record is present and not committed.
throw new UncommittedRecordException(
result.get(), "this record needs recovery", snapshot.getId());
}

// NOTE(review): this span looks like a diff rendering without +/- markers; two signature lines
// and two alternative snapshot calls (.get(key) / .mergeResult(...)) appear back to back.
/**
 * Builds the result returned to the caller of a get operation.
 *
 * <p>Looks up the table metadata for the key's namespace and table, obtains the record from the
 * snapshot, and wraps a present record in a FilteredResult constructed with the given
 * projections, the metadata, and the isIncludeMetadataEnabled flag.
 *
 * @param key the snapshot key of the record
 * @param projections the projections passed to FilteredResult
 * @return the wrapped record, or empty when the snapshot call yields no record
 * @throws CrudException declared by this method
 */
private Optional<Result> createGetResult(Snapshot.Key key, List<String> projections)
private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String> projections)
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.get(key)
.mergeResult(key, snapshot.get(get))
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -104,20 +113,22 @@ public List<Result> scan(Scan scan) throws CrudException {
return results;
}

private List<Result> scanInternal(Scan scan) throws CrudException {
List<String> originalProjections = new ArrayList<>(scan.getProjections());
private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

List<Result> results = new ArrayList<>();
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<List<Snapshot.Key>> keysInSnapshot = snapshot.get(scan);
if (keysInSnapshot.isPresent()) {
for (Snapshot.Key key : keysInSnapshot.get()) {
snapshot.get(key).ifPresent(results::add);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
}

List<Snapshot.Key> keys = new ArrayList<>();
Scanner scanner = null;
try {
scanner = getFromStorage(scan);
Expand All @@ -130,12 +141,12 @@ private List<Result> scanInternal(Scan scan) throws CrudException {

Snapshot.Key key = new Snapshot.Key(scan, r);

if (!snapshot.containsKeyInReadSet(key)) {
snapshot.put(key, Optional.of(result));
}
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, Optional.of(result));

keys.add(key);
snapshot.get(key).ifPresent(results::add);
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -146,15 +157,16 @@ private List<Result> scanInternal(Scan scan) throws CrudException {
}
}
}
snapshot.put(scan, keys);
snapshot.put(scan, results);

return createScanResults(scan, originalProjections, results);
}

// NOTE(review): this span looks like a diff rendering without +/- markers; two variants of the
// signature (List-based and Map-based results) and of the stream source appear back to back.
/**
 * Converts the collected scan results into the list returned to the caller.
 *
 * <p>Each result is wrapped in a FilteredResult constructed with the given projections, the
 * table metadata of the scan's namespace and table, and the isIncludeMetadataEnabled flag.
 *
 * @param scan the scan whose namespace and table select the table metadata
 * @param projections the projections passed to FilteredResult
 * @param results the results to wrap
 * @return the wrapped results, collected into a list
 * @throws CrudException declared by this method
 */
private List<Result> createScanResults(Scan scan, List<String> projections, List<Result> results)
private List<Result> createScanResults(
Scan scan, List<String> projections, Map<Snapshot.Key, TransactionResult> results)
throws CrudException {
// forNamespace()/forTable() are unwrapped with get() without a presence check;
// assumes both are set on the scan at this point (TODO confirm).
TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get());
return results.stream()
return results.values().stream()
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled))
.collect(Collectors.toList());
}
Expand All @@ -171,37 +183,38 @@ public void delete(Delete delete) throws UnsatisfiedConditionException {
snapshot.put(new Snapshot.Key(delete), delete);
}

// NOTE(review): this span looks like a diff rendering without +/- markers; a private and a
// @VisibleForTesting variant of the signature appear back to back, followed by statements that
// adjust the Get's projections and consistency before the storage call.
/**
 * Reads a single record from the storage and wraps a present record in a TransactionResult.
 *
 * @param get the Get passed to storage.get
 * @return the record wrapped in a TransactionResult, or empty when storage returns none
 * @throws CrudException with the message "get failed" when an ExecutionException is caught
 */
private Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
@VisibleForTesting
Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
try {
get.clearProjections();
// Retrieve only the after-image columns when including the metadata is disabled; otherwise
// retrieve all the columns.
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames();
get.withProjections(afterImageColumnNames);
}
get.withConsistency(Consistency.LINEARIZABLE);
return storage.get(get).map(TransactionResult::new);
} catch (ExecutionException e) {
// Re-throw with the original exception as the cause and the snapshot's ID attached.
throw new CrudException("get failed", e, snapshot.getId());
}
}

// NOTE(review): this span looks like a diff rendering without +/- markers; a private and a
// @VisibleForTesting variant of the signature appear back to back.
/**
 * Opens a scanner on the storage for the given scan.
 *
 * @param scan the scan passed to storage.scan
 * @return the scanner returned by the storage
 * @throws CrudException with the message "scan failed" when an ExecutionException is caught
 */
private Scanner getFromStorage(Scan scan) throws CrudException {
@VisibleForTesting
Scanner getFromStorage(Scan scan) throws CrudException {
try {
return storage.scan(scan);
} catch (ExecutionException e) {
// Re-throw with the original exception as the cause and the snapshot's ID attached.
throw new CrudException("scan failed", e, snapshot.getId());
}
}

// NOTE(review): this span looks like a diff rendering without +/- markers; statements operating
// on `scan` and on `selection` are interleaved, and two throw statements appear back to back.
/**
 * Adjusts a selection before it is sent to the storage.
 *
 * <p>Clears the selection's projections, projects the after-image columns when
 * isIncludeMetadataEnabled is false, and sets the consistency to LINEARIZABLE. The same object
 * that was passed in is modified and returned.
 *
 * @param selection the selection to adjust
 * @return the given selection after adjustment
 * @throws CrudException when an ExecutionException is caught
 */
private Selection prepareStorageSelection(Selection selection) throws CrudException {
try {
scan.clearProjections();
selection.clearProjections();
// Retrieve only the after-image columns when including the metadata is disabled; otherwise
// retrieve all the columns.
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames();
scan.withProjections(afterImageColumnNames);
tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames();
selection.withProjections(afterImageColumnNames);
}
scan.withConsistency(Consistency.LINEARIZABLE);
return storage.scan(scan);
selection.withConsistency(Consistency.LINEARIZABLE);
return selection;
} catch (ExecutionException e) {
// Re-throw with the original exception as the cause and the snapshot's ID attached.
throw new CrudException("scan failed", e, snapshot.getId());
throw new CrudException("getting a table metadata failed", e, snapshot.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class Snapshot {
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;
private final Map<Key, Optional<TransactionResult>> readSet;
private final Map<Scan, List<Key>> scanSet;
private final Map<Get, Optional<TransactionResult>> getSet;
private final Map<Scan, Map<Key, TransactionResult>> scanSet;
private final Map<Key, Put> writeSet;
private final Map<Key, Delete> deleteSet;

Expand All @@ -65,6 +66,7 @@ public Snapshot(
this.tableMetadataManager = tableMetadataManager;
this.parallelExecutor = parallelExecutor;
readSet = new HashMap<>();
getSet = new HashMap<>();
scanSet = new HashMap<>();
writeSet = new HashMap<>();
deleteSet = new HashMap<>();
Expand All @@ -78,7 +80,8 @@ public Snapshot(
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor,
Map<Key, Optional<TransactionResult>> readSet,
Map<Scan, List<Key>> scanSet,
Map<Get, Optional<TransactionResult>> getSet,
Map<Scan, Map<Key, TransactionResult>> scanSet,
Map<Key, Put> writeSet,
Map<Key, Delete> deleteSet) {
this.id = id;
Expand All @@ -87,6 +90,7 @@ public Snapshot(
this.tableMetadataManager = tableMetadataManager;
this.parallelExecutor = parallelExecutor;
this.readSet = readSet;
this.getSet = getSet;
this.scanSet = scanSet;
this.writeSet = writeSet;
this.deleteSet = deleteSet;
Expand All @@ -107,8 +111,12 @@ public void put(Key key, Optional<TransactionResult> result) {
readSet.put(key, result);
}

// NOTE(review): this span looks like a diff rendering without +/- markers; the first two lines
// belong to a put(Scan, List<Key>) variant that has no closing brace of its own here.
public void put(Scan scan, List<Key> keys) {
scanSet.put(scan, keys);
/**
 * Stores the result of a get operation in the get set, keyed by the Get itself.
 *
 * @param get the Get used as the map key
 * @param result the result to associate with the Get
 */
public void put(Get get, Optional<TransactionResult> result) {
getSet.put(get, result);
}

/**
 * Stores the results of a scan operation in the scan set, keyed by the Scan itself.
 *
 * @param scan the Scan used as the map key
 * @param results the results to associate with the Scan
 */
public void put(Scan scan, Map<Key, TransactionResult> results) {
  this.scanSet.put(scan, results);
}

public void put(Key key, Put put) {
Expand Down Expand Up @@ -137,21 +145,18 @@ public Optional<TransactionResult> getFromReadSet(Key key) {
return readSet.containsKey(key) ? readSet.get(key) : Optional.empty();
}

// NOTE(review): this span looks like a diff rendering without +/- markers; a get(Key) variant
// that reads from the read set and a mergeResult(Key, Optional) variant that uses the given
// result are interleaved, including a trailing throw that belongs to only one of them.
/**
 * Combines a record with this snapshot's pending writes for the same key.
 *
 * <p>A key in the delete set yields empty. A key in the write set yields a TransactionResult
 * wrapping a MergedResult built from the record, the put in the write set, and the table
 * metadata of the key. Otherwise the record is returned as is.
 *
 * @param key the key of the record
 * @param result the record to merge
 * @return the merged view of the record as described above
 * @throws CrudException declared by this method
 */
public Optional<TransactionResult> get(Key key) throws CrudException {
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
throws CrudException {
if (deleteSet.containsKey(key)) {
return Optional.empty();
} else if (readSet.containsKey(key)) {
if (writeSet.containsKey(key)) {
// merge the result in the read set and the put in the write set
return Optional.of(
new TransactionResult(
new MergedResult(readSet.get(key), writeSet.get(key), getTableMetadata(key))));
} else {
return readSet.get(key);
}
} else if (writeSet.containsKey(key)) {
// merge the given result and the put in the write set
return Optional.of(
new TransactionResult(
new MergedResult(result, writeSet.get(key), getTableMetadata(key))));
} else {
return result;
}
throw new IllegalArgumentException(
"getting data neither in the read set nor the delete set is not allowed");
}

private TableMetadata getTableMetadata(Key key) throws CrudException {
Expand Down Expand Up @@ -185,7 +190,17 @@ private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
}
}

public Optional<List<Key>> get(Scan scan) {
/**
 * Tells whether a result has been recorded in the get set for the given Get.
 *
 * @param get the Get to look up
 * @return true if the get set has an entry for the given Get
 */
public boolean containsKeyInGetSet(Get get) {
  final boolean recorded = this.getSet.containsKey(get);
  return recorded;
}

/**
 * Returns the result recorded in the get set for the given Get.
 *
 * <p>This method is expected to be called only after the result of the get operation has been
 * put in the get set.
 *
 * @param get the Get whose recorded result is requested
 * @return the result recorded for the given Get
 * @throws IllegalArgumentException if the get set has no entry for the given Get
 */
public Optional<TransactionResult> get(Get get) {
  // Check explicitly instead of using an assert: assertions are skipped unless the JVM runs
  // with -ea, and a missing entry would then make this Optional-returning method return null.
  if (!getSet.containsKey(get)) {
    throw new IllegalArgumentException("getting data not in the get set is not allowed");
  }
  return getSet.get(get);
}

public Optional<Map<Key, TransactionResult>> get(Scan scan) {
if (scanSet.containsKey(scan)) {
return Optional.ofNullable(scanSet.get(scan));
}
Expand Down Expand Up @@ -222,6 +237,10 @@ private boolean isWriteSetOverlappedWith(Scan scan) {
}

for (Map.Entry<Key, Put> entry : writeSet.entrySet()) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Put put = entry.getValue();

if (!put.forNamespace().equals(scan.forNamespace())
Expand Down Expand Up @@ -278,7 +297,7 @@ private boolean isWriteSetOverlappedWith(Scan scan) {

private boolean isWriteSetOverlappedWith(ScanWithIndex scan) {
for (Map.Entry<Key, Put> entry : writeSet.entrySet()) {
if (scanSet.get(scan).contains(entry.getKey())) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Expand Down Expand Up @@ -315,7 +334,7 @@ private boolean isWriteSetOverlappedWith(ScanAll scan) {
// yet. Thus, we need to evaluate if the scan condition potentially matches put operations.

// Check for cases 1 and 2
if (scanSet.get(scan).contains(entry.getKey())) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Expand Down Expand Up @@ -433,14 +452,14 @@ void toSerializableWithExtraRead(DistributedStorage storage)
List<ParallelExecutorTask> tasks = new ArrayList<>();

// Read set by scan is re-validated to check if there is no anti-dependency
for (Map.Entry<Scan, List<Key>> entry : scanSet.entrySet()) {
for (Map.Entry<Scan, Map<Key, TransactionResult>> entry : scanSet.entrySet()) {
tasks.add(
() -> {
Map<Key, TransactionResult> currentReadMap = new HashMap<>();
Set<Key> validatedReadSet = new HashSet<>();
Scanner scanner = null;
Scan scan = entry.getKey();
try {
Scan scan = entry.getKey();
// only get tx_id and tx_version columns because we use only them to compare
scan.clearProjections();
scan.withProjection(Attribute.ID).withProjection(Attribute.VERSION);
Expand All @@ -464,13 +483,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
}
}

for (Key key : entry.getValue()) {
for (Map.Entry<Key, TransactionResult> e : entry.getValue().entrySet()) {
Key key = e.getKey();
TransactionResult result = e.getValue();
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
continue;
}
// Check if read records are not changed
TransactionResult latestResult = currentReadMap.get(key);
if (isChanged(Optional.ofNullable(latestResult), readSet.get(key))) {
if (isChanged(Optional.ofNullable(latestResult), Optional.of(result))) {
throwExceptionDueToAntiDependency();
}
validatedReadSet.add(key);
Expand All @@ -483,35 +504,23 @@ void toSerializableWithExtraRead(DistributedStorage storage)
});
}

// Calculate read set validated by scan
Set<Key> validatedReadSetByScan = new HashSet<>();
for (List<Key> values : scanSet.values()) {
validatedReadSetByScan.addAll(values);
}

// Read set by get is re-validated to check if there is no anti-dependency
for (Map.Entry<Key, Optional<TransactionResult>> entry : readSet.entrySet()) {
Key key = entry.getKey();
if (writeSet.containsKey(key)
|| deleteSet.containsKey(key)
|| validatedReadSetByScan.contains(key)) {
for (Map.Entry<Get, Optional<TransactionResult>> entry : getSet.entrySet()) {
Get get = entry.getKey();
Key key = new Key(get);
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
continue;
}

tasks.add(
() -> {
Optional<TransactionResult> originalResult = getSet.get(get);
// only get tx_id and tx_version columns because we use only them to compare
Get get =
new Get(key.getPartitionKey(), key.getClusteringKey().orElse(null))
.withProjection(Attribute.ID)
.withProjection(Attribute.VERSION)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(key.getNamespace())
.forTable(key.getTable());

get.clearProjections();
get.withProjection(Attribute.ID).withProjection(Attribute.VERSION);
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
// Check if a read record is not changed
if (isChanged(latestResult, entry.getValue())) {
if (isChanged(latestResult, originalResult)) {
throwExceptionDueToAntiDependency();
}
});
Expand Down
Loading