Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comparable bytes to row decorator #1750

Merged
merged 8 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ protected AbstractRowDecorator(TableName table, List<String> partitionKeyColumnN

protected abstract ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues);

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {

protected Object[] primaryKeyValues(Row row) {
Object[] pkValues = new Object[partitionKeyColumnNames.size()];

int idx = 0;
for (String columnName : partitionKeyColumnNames) {
ByteBuffer value = row.getBytesUnsafe(columnName);
Expand All @@ -51,6 +48,13 @@ public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row)
pkValues[idx++] = value;
}

return pkValues;
}

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {
Object[] pkValues = primaryKeyValues(row);

//noinspection unchecked
return (ComparableKey<T>) decoratePrimaryKey(pkValues);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.stargate.db.datastore.ResultSet;
import io.stargate.db.datastore.Row;
import java.nio.ByteBuffer;

/**
* A table-specific interface for extracting key column values from a {@link ResultSet} {@link Row}
Expand All @@ -29,4 +30,10 @@ public interface RowDecorator {
* same order that queries iterate / paginate over the Cassandra data ring.
*/
<T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row);

/**
* Generates the comparable byte value of the {@link Row}, using the "comparable bytes" API of the
* underlying datastore. If no such API exists, an empty ByteBuffer is returned.
*/
ByteBuffer getComparableBytes(Row row);
EricBorczuk marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,4 +50,11 @@ protected ComparableKey<?> decoratePrimaryKey(Object... pkValues) {
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(serializedKey);
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
// TODO replace this with the relevant row's byte-comparable value when
// https://github.com/apache/cassandra/pull/1294 is ready
return ByteBuffer.wrap(new byte[] {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -47,4 +49,11 @@ protected ComparableKey<?> decoratePrimaryKey(Object... pkValues) {
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key.serializeAsPartitionKey());
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
// TODO replace this with the relevant row's byte-comparable value when
// https://github.com/apache/cassandra/pull/1294 is ready
return ByteBuffer.wrap(new byte[] {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package io.stargate.db.dse.impl;

import com.datastax.bdp.db.tries.util.ByteSourceUtil;
import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteComparable;
import org.apache.cassandra.utils.ByteSource;

public class RowDecoratorImpl extends AbstractRowDecorator {

Expand All @@ -37,10 +42,23 @@ public RowDecoratorImpl(TableName tableName, TableMetadata tableMetadata) {
this.metadata = tableMetadata;
}

@Override
protected ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues) {
private DecoratedKey decoratedKeyFromRawKeyValues(Object... rawKeyValues) {
Clustering key = metadata.partitionKeyAsClusteringComparator().make(rawKeyValues);
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key.serializeAsPartitionKey());
return decoratedKey;
}

@Override
protected ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues) {
DecoratedKey decoratedKey = decoratedKeyFromRawKeyValues(rawKeyValues);
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
DecoratedKey decoratedKey = decoratedKeyFromRawKeyValues(primaryKeyValues(row));
ByteSource src = decoratedKey.asComparableBytes(ByteComparable.Version.DSE68);
int initialBufSize = 256;
return ByteBuffer.wrap(ByteSourceUtil.readBytes(src, initialBufSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jcip.annotations.NotThreadSafe;
import org.apache.cassandra.stargate.utils.FastByteOperations.PureJavaOperations;
import org.assertj.core.api.Assertions;
import org.javatuples.Pair;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1964,6 +1965,54 @@ public void testRowDecorator() throws ExecutionException, InterruptedException {
assertGt(last, first, dec1, dec2);
}

@Test
public void testRowDecoratorComparableBytes() throws ExecutionException, InterruptedException {
setupCustomPagingData();

// Obtain partition keys in "ring" order
AbstractBound<?> selectAll =
dataStore
.queryBuilder()
.select()
.column("pk")
.column("val")
.from(keyspace, table)
.build()
.bind();

ResultSet rs1 = dataStore.execute(selectAll).get();
Iterator<Row> it1 = rs1.iterator();
RowDecorator dec1 = rs1.makeRowDecorator();

ResultSet rs2 = dataStore.execute(selectAll).get();
RowDecorator dec2 = rs2.makeRowDecorator();
Iterator<Row> it2 = rs2.iterator();

if (backend.isDse()) {
Row first = null;
Row last = null;
Row p1 = null;
PureJavaOperations ops = new PureJavaOperations();
while (it1.hasNext()) {
assertThat(it2.hasNext()).isTrue();
Row r1 = it1.next();
Row r2 = it2.next();
first = first == null ? r1 : first;
last = r1;
assertThat(ops.compare(dec1.getComparableBytes(r1), dec2.getComparableBytes(r2)))
.isEqualTo(0);
if (p1 == null) {
p1 = r1;
}
assertThat(ops.compare(dec1.getComparableBytes(r1), dec2.getComparableBytes(p1)))
.isGreaterThanOrEqualTo(0);
}
assertThat(it2.hasNext()).isFalse();
assertThat(ops.compare(dec1.getComparableBytes(last), dec2.getComparableBytes(first)))
.isGreaterThan(0);
}
}

private boolean isCassandra4() {
return !backend.isDse()
&& Version.parse(backend.clusterVersion()).nextStable().compareTo(Version.V4_0_0) >= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.stargate.db.ComparableKey;
import io.stargate.db.RowDecorator;
import io.stargate.db.schema.Column;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -34,13 +36,22 @@ public AlphabeticalOrderPartitionKeyDecorator(List<Column> allColumns) {
allColumns.stream().filter(Column::isPartitionKey).collect(Collectors.toList());
}

private String decoratedKeyString(Row row) {
return partitionKeyColumns.stream()
.map(column -> row.getString(column.name()))
.collect(Collectors.joining("|"));
}

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {
String decorated =
partitionKeyColumns.stream()
.map(column -> row.getString(column.name()))
.collect(Collectors.joining("|"));
String decorated = decoratedKeyString(row);
//noinspection unchecked
return (ComparableKey<T>) new ComparableKey<>(String.class, decorated);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
String decorated = decoratedKeyString(row);
return ByteBuffer.wrap(decorated.getBytes(StandardCharsets.UTF_8));
}
}