Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -68,35 +72,43 @@ public CrudHandler(
this.parallelExecutor = parallelExecutor;
}

public Optional<Result> get(Get get) throws CrudException {
List<String> originalProjections = new ArrayList<>(get.getProjections());
public Optional<Result> get(Get originalGet) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
Get get = (Get) prepareStorageSelection(originalGet);
Snapshot.Key key = new Snapshot.Key(get);
readUnread(key, get);
return createGetResult(key, originalProjections);
return createGetResult(key, get, originalProjections);
}

@VisibleForTesting
void readUnread(Snapshot.Key key, Get get) throws CrudException {
if (!snapshot.containsKeyInReadSet(key)) {
if (!snapshot.containsKeyInGetSet(get)) {
read(key, get);
}
}

private void read(Snapshot.Key key, Get get) throws CrudException {
// Although this class is not thread-safe, this method is actually thread-safe, so we call it
// concurrently in the implicit pre-read
@VisibleForTesting
void read(Snapshot.Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
// Keep the read set latest to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, result);
snapshot.put(get, result); // for re-read and validation
return;
}
throw new UncommittedRecordException(
get, result.get(), "This record needs recovery", snapshot.getId());
}

private Optional<Result> createGetResult(Snapshot.Key key, List<String> projections)
private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String> projections)
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.get(key)
.mergeResult(key, snapshot.get(get))
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -115,20 +127,22 @@ public List<Result> scan(Scan scan) throws CrudException {
return results;
}

private List<Result> scanInternal(Scan scan) throws CrudException {
List<String> originalProjections = new ArrayList<>(scan.getProjections());
private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

List<Result> results = new ArrayList<>();
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<List<Snapshot.Key>> keysInSnapshot = snapshot.get(scan);
if (keysInSnapshot.isPresent()) {
for (Snapshot.Key key : keysInSnapshot.get()) {
snapshot.get(key).ifPresent(results::add);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
}

List<Snapshot.Key> keys = new ArrayList<>();
Scanner scanner = null;
try {
scanner = getFromStorage(scan);
Expand All @@ -141,12 +155,12 @@ private List<Result> scanInternal(Scan scan) throws CrudException {

Snapshot.Key key = new Snapshot.Key(scan, r);

if (!snapshot.containsKeyInReadSet(key)) {
snapshot.put(key, Optional.of(result));
}
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, Optional.of(result));

keys.add(key);
snapshot.get(key).ifPresent(results::add);
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -157,15 +171,16 @@ private List<Result> scanInternal(Scan scan) throws CrudException {
}
}
}
snapshot.put(scan, keys);
snapshot.put(scan, results);

return createScanResults(scan, originalProjections, results);
}

private List<Result> createScanResults(Scan scan, List<String> projections, List<Result> results)
private List<Result> createScanResults(
Scan scan, List<String> projections, Map<Snapshot.Key, TransactionResult> results)
throws CrudException {
TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get());
return results.stream()
return results.values().stream()
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled))
.collect(Collectors.toList());
}
Expand All @@ -182,8 +197,8 @@ public void put(Put put) throws CrudException {
}

if (put.getCondition().isPresent()) {
if (put.isImplicitPreReadEnabled()) {
readUnread(key, createGet(key));
if (put.isImplicitPreReadEnabled() && !snapshot.containsKeyInReadSet(key)) {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
Expand All @@ -196,7 +211,9 @@ public void delete(Delete delete) throws CrudException {
Snapshot.Key key = new Snapshot.Key(delete);

if (delete.getCondition().isPresent()) {
readUnread(key, createGet(key));
if (!snapshot.containsKeyInReadSet(key)) {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
}
Expand Down Expand Up @@ -226,30 +243,21 @@ public void readIfImplicitPreReadEnabled() throws CrudException {
}
}

private Get createGet(Snapshot.Key key) {
private Get createGet(Snapshot.Key key) throws CrudException {
GetBuilder.BuildableGet buildableGet =
Get.newBuilder()
.namespace(key.getNamespace())
.table(key.getTable())
.partitionKey(key.getPartitionKey());
key.getClusteringKey().ifPresent(buildableGet::clusteringKey);
return buildableGet.build();
return (Get) prepareStorageSelection(buildableGet.build());
}

// Although this class is not thread-safe, this method is actually thread-safe because the storage
// is thread-safe
@VisibleForTesting
Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
try {
get.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames();
get.withProjections(afterImageColumnNames);
}
get.withConsistency(Consistency.LINEARIZABLE);
return storage.get(get).map(TransactionResult::new);
} catch (ExecutionException e) {
throw new CrudException("Get failed", e, snapshot.getId());
Expand All @@ -258,18 +266,26 @@ Optional<TransactionResult> getFromStorage(Get get) throws CrudException {

private Scanner getFromStorage(Scan scan) throws CrudException {
try {
scan.clearProjections();
return storage.scan(scan);
} catch (ExecutionException e) {
throw new CrudException("Scan failed", e, snapshot.getId());
}
}

private Selection prepareStorageSelection(Selection selection) throws CrudException {
try {
selection.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames();
scan.withProjections(afterImageColumnNames);
tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames();
selection.withProjections(afterImageColumnNames);
}
scan.withConsistency(Consistency.LINEARIZABLE);
return storage.scan(scan);
selection.withConsistency(Consistency.LINEARIZABLE);
return selection;
} catch (ExecutionException e) {
throw new CrudException("Scan failed", e, snapshot.getId());
throw new CrudException("Getting a table metadata failed", e, snapshot.getId());
}
}

Expand Down
Loading