Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to branch(3) : Add cross-partition scan options #1324

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public static Properties getProperties(@SuppressWarnings("unused") String testNa
props.setProperty(DatabaseConfig.CONTACT_POINTS, contactPoints);
props.setProperty(DatabaseConfig.USERNAME, username);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to true in v3.x.

props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.CONTACT_POINTS, contactPoint);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.STORAGE, "cosmos");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

if (databasePrefix.isPresent()) {
// Add the prefix and testName as a metadata database suffix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.USERNAME, accessKeyId);
props.setProperty(DatabaseConfig.PASSWORD, secretAccessKey);
props.setProperty(DatabaseConfig.STORAGE, "dynamo");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");

// Add testName as a metadata namespace suffix
props.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public static Properties getProperties(String testName) {
props.setProperty(DatabaseConfig.USERNAME, username);
props.setProperty(DatabaseConfig.PASSWORD, password);
props.setProperty(DatabaseConfig.STORAGE, "jdbc");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
props.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "true");

// Add testName as a metadata schema suffix
props.setProperty(JdbcConfig.TABLE_METADATA_SCHEMA, JdbcAdmin.METADATA_SCHEMA + "_" + testName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public static Properties getPropertiesForCassandra(@SuppressWarnings("unused") S
properties.setProperty(DatabaseConfig.USERNAME, username);
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.STORAGE, "cassandra");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "false");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
return properties;
}

Expand All @@ -54,6 +57,9 @@ public static Properties getPropertiesForJdbc(String testName) {
properties.setProperty(DatabaseConfig.USERNAME, username);
properties.setProperty(DatabaseConfig.PASSWORD, password);
properties.setProperty(DatabaseConfig.STORAGE, "jdbc");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "true");

// Add testName as a metadata schema suffix
properties.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
Expand All @@ -27,9 +28,11 @@
@ThreadSafe
public class OperationChecker {

private final DatabaseConfig config;
private final TableMetadataManager tableMetadataManager;

public OperationChecker(TableMetadataManager tableMetadataManager) {
public OperationChecker(DatabaseConfig config, TableMetadataManager tableMetadataManager) {
this.config = config;
this.tableMetadataManager = tableMetadataManager;
}

Expand Down Expand Up @@ -122,6 +125,11 @@ public void check(Scan scan) throws ExecutionException {
}

private void check(ScanAll scanAll) throws ExecutionException {
if (!config.isCrossPartitionScanEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan is not enabled. Operation: " + scanAll);
}

TableMetadata metadata = getTableMetadata(scanAll);

checkProjections(scanAll, metadata);
Expand All @@ -130,8 +138,16 @@ private void check(ScanAll scanAll) throws ExecutionException {
throw new IllegalArgumentException("The limit cannot be negative. Operation: " + scanAll);
}

if (!config.isCrossPartitionScanOrderingEnabled() && !scanAll.getOrderings().isEmpty()) {
throw new IllegalArgumentException(
"Cross-partition scan ordering is not enabled. Operation: " + scanAll);
}
checkOrderings(scanAll, metadata);

if (!config.isCrossPartitionScanFilteringEnabled() && !scanAll.getConjunctions().isEmpty()) {
throw new IllegalArgumentException(
"Cross-partition scan filtering is not enabled. Operation: " + scanAll);
}
checkConjunctions(scanAll, metadata);
}

Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/com/scalar/db/config/DatabaseConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.config;

import static com.google.common.base.Preconditions.checkArgument;
import static com.scalar.db.config.ConfigUtils.getBoolean;
import static com.scalar.db.config.ConfigUtils.getInt;
import static com.scalar.db.config.ConfigUtils.getLong;
import static com.scalar.db.config.ConfigUtils.getString;
Expand Down Expand Up @@ -32,6 +33,9 @@ public class DatabaseConfig {
private long metadataCacheExpirationTimeSecs;
private long activeTransactionManagementExpirationTimeMillis;
@Nullable private String defaultNamespaceName;
private boolean crossPartitionScanEnabled;
private boolean crossPartitionScanFilteringEnabled;
private boolean crossPartitionScanOrderingEnabled;

public static final String PREFIX = "scalar.db.";
public static final String CONTACT_POINTS = PREFIX + "contact_points";
Expand All @@ -45,6 +49,10 @@ public class DatabaseConfig {
public static final String ACTIVE_TRANSACTION_MANAGEMENT_EXPIRATION_TIME_MILLIS =
PREFIX + "active_transaction_management.expiration_time_millis";
public static final String DEFAULT_NAMESPACE_NAME = PREFIX + "default_namespace_name";
public static final String SCAN_PREFIX = PREFIX + "cross_partition_scan.";
public static final String CROSS_PARTITION_SCAN = SCAN_PREFIX + "enabled";
public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";

public DatabaseConfig(File propertiesFile) throws IOException {
try (FileInputStream stream = new FileInputStream(propertiesFile)) {
Expand Down Expand Up @@ -94,6 +102,18 @@ protected void load() {
activeTransactionManagementExpirationTimeMillis =
getLong(getProperties(), ACTIVE_TRANSACTION_MANAGEMENT_EXPIRATION_TIME_MILLIS, -1);
defaultNamespaceName = getString(getProperties(), DEFAULT_NAMESPACE_NAME, null);
crossPartitionScanEnabled = getBoolean(getProperties(), CROSS_PARTITION_SCAN, true);
crossPartitionScanFilteringEnabled =
getBoolean(getProperties(), CROSS_PARTITION_SCAN_FILTERING, false);
crossPartitionScanOrderingEnabled =
getBoolean(getProperties(), CROSS_PARTITION_SCAN_ORDERING, false);

if (!crossPartitionScanEnabled
&& (crossPartitionScanFilteringEnabled || crossPartitionScanOrderingEnabled)) {
throw new IllegalArgumentException(
CROSS_PARTITION_SCAN
+ " must be enabled to use cross-partition scan with filtering or ordering");
}
}

public List<String> getContactPoints() {
Expand Down Expand Up @@ -131,4 +151,16 @@ public long getActiveTransactionManagementExpirationTimeMillis() {
public Optional<String> getDefaultNamespaceName() {
return Optional.ofNullable(defaultNamespaceName);
}

public boolean isCrossPartitionScanEnabled() {
return crossPartitionScanEnabled;
}

public boolean isCrossPartitionScanFilteringEnabled() {
return crossPartitionScanFilteringEnabled;
}

public boolean isCrossPartitionScanOrderingEnabled() {
return crossPartitionScanOrderingEnabled;
}
}
19 changes: 12 additions & 7 deletions core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand All @@ -45,6 +44,13 @@ public class Cassandra extends AbstractDistributedStorage {
@Inject
public Cassandra(DatabaseConfig config) {
super(config);

if (config.isCrossPartitionScanFilteringEnabled()
|| config.isCrossPartitionScanOrderingEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan with filtering or ordering is not supported in Cassandra");
}

clusterManager = new ClusterManager(config);
Session session = clusterManager.getSession();

Expand All @@ -62,7 +68,7 @@ public Cassandra(DatabaseConfig config) {
metadataManager =
new TableMetadataManager(
new CassandraAdmin(clusterManager), config.getMetadataCacheExpirationTimeSecs());
operationChecker = new OperationChecker(metadataManager);
operationChecker = new OperationChecker(config, metadataManager);
}

@VisibleForTesting
Expand All @@ -81,6 +87,10 @@ public Cassandra(DatabaseConfig config) {
this.operationChecker = operationChecker;
}

// For the SpotBugs warning CT_CONSTRUCTOR_THROW
@Override
protected final void finalize() {}

@Override
@Nonnull
public Optional<Result> get(Get get) throws ExecutionException {
Expand All @@ -107,11 +117,6 @@ public Scanner scan(Scan scan) throws ExecutionException {
scan = copyAndSetTargetToIfNot(scan);
operationChecker.check(scan);

if (ScalarDbUtils.isRelational(scan)) {
throw new UnsupportedOperationException(
"Scanning all records with orderings or conditions is not supported in Cassandra");
}

ResultSet results = handlers.select().handle(scan);

return new ScannerImpl(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.scalar.db.storage.cassandra;

public class CassandraConfig {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In v3.x, there isn't CassandraConfig, but I added it to provide STORAGE_NAME, as the same as other storages.

// just hold the storage name for consistency with other storage
public static final String STORAGE_NAME = "cassandra";
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class CassandraProvider implements DistributedStorageProvider {
@Override
public String getName() {
return "cassandra";
return CassandraConfig.STORAGE_NAME;
}

@Override
Expand Down
19 changes: 12 additions & 7 deletions core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand All @@ -47,6 +46,13 @@ public class Cosmos extends AbstractDistributedStorage {
@Inject
public Cosmos(DatabaseConfig databaseConfig) {
super(databaseConfig);

if (databaseConfig.isCrossPartitionScanFilteringEnabled()
|| databaseConfig.isCrossPartitionScanOrderingEnabled()) {
throw new IllegalArgumentException(
"Cross-partition scan with filtering or ordering is not supported in Cosmos DB");
}

CosmosConfig config = new CosmosConfig(databaseConfig);

client =
Expand All @@ -60,7 +66,7 @@ public Cosmos(DatabaseConfig databaseConfig) {
TableMetadataManager metadataManager =
new TableMetadataManager(
new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs());
operationChecker = new CosmosOperationChecker(metadataManager);
operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager);

selectStatementHandler = new SelectStatementHandler(client, metadataManager);
putStatementHandler = new PutStatementHandler(client, metadataManager);
Expand Down Expand Up @@ -88,6 +94,10 @@ public Cosmos(DatabaseConfig databaseConfig) {
this.operationChecker = operationChecker;
}

// For the SpotBugs warning CT_CONSTRUCTOR_THROW
@Override
protected final void finalize() {}

@Override
@Nonnull
public Optional<Result> get(Get get) throws ExecutionException {
Expand All @@ -108,11 +118,6 @@ public Scanner scan(Scan scan) throws ExecutionException {
scan = copyAndSetTargetToIfNot(scan);
operationChecker.check(scan);

if (ScalarDbUtils.isRelational(scan)) {
throw new UnsupportedOperationException(
"Scanning all records with orderings or conditions is not supported in Cosmos DB");
}

return selectStatementHandler.handle(scan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

@Immutable
public class CosmosConfig {

public static final String PREFIX = DatabaseConfig.PREFIX + "cosmos.";
public static final String STORAGE_NAME = "cosmos";
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
public static final String TABLE_METADATA_DATABASE = PREFIX + "table_metadata.database";

private final String endpoint;
Expand All @@ -19,8 +19,9 @@ public class CosmosConfig {

public CosmosConfig(DatabaseConfig databaseConfig) {
String storage = databaseConfig.getStorage();
if (!"cosmos".equals(storage)) {
throw new IllegalArgumentException(DatabaseConfig.STORAGE + " should be 'cosmos'");
if (!storage.equals(STORAGE_NAME)) {
throw new IllegalArgumentException(
DatabaseConfig.STORAGE + " should be '" + STORAGE_NAME + "'");
}

if (databaseConfig.getContactPoints().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.DataType;

public class CosmosOperationChecker extends OperationChecker {
public CosmosOperationChecker(TableMetadataManager metadataManager) {
super(metadataManager);
public CosmosOperationChecker(
DatabaseConfig databaseConfig, TableMetadataManager metadataManager) {
super(databaseConfig, metadataManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class CosmosProvider implements DistributedStorageProvider {
@Override
public String getName() {
return "cosmos";
return CosmosConfig.STORAGE_NAME;
}

@Override
Expand Down