Skip to content
This repository has been archived by the owner on May 2, 2024. It is now read-only.

Commit

Permalink
Introduce cassandra.skip-parition-check
Browse files Browse the repository at this point in the history
NativeCassandraSession#getPartitions is very slow
if there are many partitions used in WHERE clause.
In our use, the partitions usually exist, so
introduce cassandra.skip-parition-check property
to reduce planning time.
  • Loading branch information
abicky committed Oct 23, 2018
1 parent 71dcdd7 commit d7ce5e8
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class CassandraClientConfig
private Duration noHostAvailableRetryTimeout = new Duration(1, MINUTES);
private int speculativeExecutionLimit = 1;
private Duration speculativeExecutionDelay = new Duration(500, MILLISECONDS);
private boolean skipPartitionCheck;

@NotNull
@Size(min = 1)
Expand Down Expand Up @@ -379,4 +380,16 @@ public CassandraClientConfig setSpeculativeExecutionDelay(Duration speculativeEx
this.speculativeExecutionDelay = speculativeExecutionDelay;
return this;
}

public boolean isSkipPartitionCheck()
{
return skipPartitionCheck;
}

@Config("cassandra.skip-partition-check")
public CassandraClientConfig setSkipPartitionCheck(boolean skipPartitionCheck)
{
this.skipPartitionCheck = skipPartitionCheck;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public static CassandraSession createCassandraSession(
contactPoints.forEach(clusterBuilder::addContactPoint);
return clusterBuilder.build();
}),
config.getNoHostAvailableRetryTimeout());
config.getNoHostAvailableRetryTimeout(),
config.isSkipPartitionCheck());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,22 @@ public static String getColumnValueForCql(Row row, int i, CassandraType cassandr
}
}

public static String getColumnValueForCql(Object object, CassandraType cassandraType)
{
switch (cassandraType) {
case ASCII:
case TEXT:
case VARCHAR:
return CassandraCqlUtils.quoteStringLiteral(((Slice) object).toStringUtf8());
case INT:
case BIGINT:
return object.toString();
default:
throw new IllegalStateException("Handling of type " + cassandraType
+ " is not implemented");
}
}

private static String objectToString(Object object, CassandraType elemType)
{
switch (elemType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.common.collect.Sets;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;

import java.nio.ByteBuffer;
Expand All @@ -76,6 +77,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Locale.ENGLISH;
Expand All @@ -98,13 +100,15 @@ public class NativeCassandraSession
private final Cluster cluster;
private final Supplier<Session> session;
private final Duration noHostAvailableRetryTimeout;
private final boolean skipPartitionCheck;

public NativeCassandraSession(String connectorId, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec, Cluster cluster, Duration noHostAvailableRetryTimeout)
public NativeCassandraSession(String connectorId, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec, Cluster cluster, Duration noHostAvailableRetryTimeout, boolean skipPartitionCheck)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
this.cluster = requireNonNull(cluster, "cluster is null");
this.noHostAvailableRetryTimeout = requireNonNull(noHostAvailableRetryTimeout, "noHostAvailableRetryTimeout is null");
this.skipPartitionCheck = skipPartitionCheck;
this.session = memoize(cluster::connect);
}

Expand Down Expand Up @@ -353,6 +357,10 @@ private CassandraColumnHandle buildColumnHandle(AbstractTableMetadata tableMetad
@Override
public List<CassandraPartition> getPartitions(CassandraTable table, List<Set<Object>> filterPrefixes)
{
if (skipPartitionCheck) {
return buildPartitionsFromFilterPrefixes(table, filterPrefixes);
}

List<CassandraColumnHandle> partitionKeyColumns = table.getPartitionKeyColumns();

if (filterPrefixes.size() != partitionKeyColumns.size()) {
Expand Down Expand Up @@ -420,6 +428,88 @@ public List<CassandraPartition> getPartitions(CassandraTable table, List<Set<Obj
return partitions.build();
}

private List<CassandraPartition> buildPartitionsFromFilterPrefixes(CassandraTable table, List<Set<Object>> filterPrefixes)
{
List<CassandraColumnHandle> partitionKeyColumns = table.getPartitionKeyColumns();

if (filterPrefixes.size() != partitionKeyColumns.size()) {
return ImmutableList.of(CassandraPartition.UNPARTITIONED);
}

ByteBuffer buffer = ByteBuffer.allocate(1000);
HashMap<ColumnHandle, NullableValue> map = new HashMap<>();
Set<String> uniquePartitionIds = new HashSet<>();
StringBuilder stringBuilder = new StringBuilder();

boolean isComposite = partitionKeyColumns.size() > 1;

ImmutableList.Builder<CassandraPartition> partitions = ImmutableList.builder();
for (List<Object> values : Sets.cartesianProduct(filterPrefixes)) {
buffer.clear();
map.clear();
stringBuilder.setLength(0);
for (int i = 0; i < partitionKeyColumns.size(); i++) {
Object value = values.get(i);
CassandraColumnHandle columnHandle = partitionKeyColumns.get(i);
CassandraType cassandraType = columnHandle.getCassandraType();

switch (cassandraType) {
case TEXT:
Slice slice = (Slice) value;
if (isComposite) {
buffer.putShort((short) slice.length());
buffer.put(slice.getBytes());
buffer.put((byte) 0);
}
else {
buffer.put(slice.getBytes());
}
break;
case INT:
int intValue = toIntExact((long) value);
if (isComposite) {
buffer.putShort((short) Integer.BYTES);
buffer.putInt(intValue);
buffer.put((byte) 0);
}
else {
buffer.putInt(intValue);
}
break;
case BIGINT:
if (isComposite) {
buffer.putShort((short) Long.BYTES);
buffer.putLong((long) value);
buffer.put((byte) 0);
}
else {
buffer.putLong((long) value);
}
break;
default:
throw new IllegalStateException("Handling of type " + cassandraType + " is not implemented");
}

map.put(columnHandle, NullableValue.of(cassandraType.getNativeType(), value));
if (i > 0) {
stringBuilder.append(" AND ");
}
stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.getName()));
stringBuilder.append(" = ");
stringBuilder.append(CassandraType.getColumnValueForCql(value, cassandraType));
}
buffer.flip();
byte[] key = new byte[buffer.limit()];
buffer.get(key);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.fromFixedValues(map);
String partitionId = stringBuilder.toString();
if (uniquePartitionIds.add(partitionId)) {
partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false));
}
}
return partitions.build();
}

@Override
public ResultSet execute(String cql, Object... values)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class EmbeddedCassandra
private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES);

private static CassandraSession session;
private static ReopeningCluster cluster;
private static boolean initialized;

private EmbeddedCassandra() {}
Expand Down Expand Up @@ -89,7 +90,8 @@ public static synchronized void start()
"EmbeddedCassandra",
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
cluster,
new Duration(1, MINUTES));
new Duration(1, MINUTES),
false);

try {
checkConnectivity(session);
Expand All @@ -101,6 +103,7 @@ public static synchronized void start()
}

EmbeddedCassandra.session = session;
EmbeddedCassandra.cluster = cluster;
initialized = true;
}

Expand Down Expand Up @@ -141,6 +144,12 @@ public static synchronized int getPort()
return PORT;
}

public static synchronized ReopeningCluster getCluster()
{
checkIsInitialized();
return cluster;
}

private static void checkIsInitialized()
{
checkState(initialized, "EmbeddedCassandra must be started with #start() method before retrieving the cluster retrieval");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void testDefaults()
.setWhiteListAddresses("")
.setNoHostAvailableRetryTimeout(new Duration(1, MINUTES))
.setSpeculativeExecutionLimit(1)
.setSpeculativeExecutionDelay(new Duration(500, MILLISECONDS)));
.setSpeculativeExecutionDelay(new Duration(500, MILLISECONDS))
.setSkipPartitionCheck(false));
}

@Test
Expand Down Expand Up @@ -86,6 +87,7 @@ public void testExplicitPropertyMappings()
.put("cassandra.no-host-available-retry-timeout", "3m")
.put("cassandra.speculative-execution.limit", "10")
.put("cassandra.speculative-execution.delay", "101s")
.put("cassandra.skip-partition-check", "true")
.build();

CassandraClientConfig expected = new CassandraClientConfig()
Expand All @@ -112,7 +114,8 @@ public void testExplicitPropertyMappings()
.setWhiteListAddresses("host1")
.setNoHostAvailableRetryTimeout(new Duration(3, MINUTES))
.setSpeculativeExecutionLimit(10)
.setSpeculativeExecutionDelay(new Duration(101, SECONDS));
.setSpeculativeExecutionDelay(new Duration(101, SECONDS))
.setSkipPartitionCheck(true);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit d7ce5e8

Please sign in to comment.