Skip to content

Commit

Permalink
Add comparable bytes to row decorator (#1750)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricBorczuk committed Apr 4, 2022
1 parent 4b58d95 commit d3867fc
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ protected AbstractRowDecorator(TableName table, List<String> partitionKeyColumnN

protected abstract ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues);

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {

protected Object[] primaryKeyValues(Row row) {
Object[] pkValues = new Object[partitionKeyColumnNames.size()];

int idx = 0;
for (String columnName : partitionKeyColumnNames) {
ByteBuffer value = row.getBytesUnsafe(columnName);
Expand All @@ -51,6 +48,13 @@ public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row)
pkValues[idx++] = value;
}

return pkValues;
}

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {
Object[] pkValues = primaryKeyValues(row);

//noinspection unchecked
return (ComparableKey<T>) decoratePrimaryKey(pkValues);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.stargate.db.datastore.ResultSet;
import io.stargate.db.datastore.Row;
import java.nio.ByteBuffer;

/**
* A table-specific interface for extracting key column values from a {@link ResultSet} {@link Row}
Expand All @@ -29,4 +30,10 @@ public interface RowDecorator {
* same order that queries iterate / paginate over the Cassandra data ring.
*/
<T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row);

/**
* Generates the comparable byte value of the {@link Row}, using the "comparable bytes" API of the
* underlying datastore. If no such API exists, an empty ByteBuffer is returned.
*/
ByteBuffer getComparableBytes(Row row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,4 +50,11 @@ protected ComparableKey<?> decoratePrimaryKey(Object... pkValues) {
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(serializedKey);
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
// TODO replace this with the relevant row's byte-comparable value when
// https://github.com/apache/cassandra/pull/1294 is ready
return ByteBuffer.wrap(new byte[] {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -47,4 +49,11 @@ protected ComparableKey<?> decoratePrimaryKey(Object... pkValues) {
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key.serializeAsPartitionKey());
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
// TODO replace this with the relevant row's byte-comparable value when
// https://github.com/apache/cassandra/pull/1294 is ready
return ByteBuffer.wrap(new byte[] {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package io.stargate.db.dse.impl;

import com.datastax.bdp.db.tries.util.ByteSourceUtil;
import io.stargate.db.AbstractRowDecorator;
import io.stargate.db.ComparableKey;
import io.stargate.db.datastore.Row;
import io.stargate.db.schema.TableName;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteComparable;
import org.apache.cassandra.utils.ByteSource;

public class RowDecoratorImpl extends AbstractRowDecorator {

Expand All @@ -37,10 +42,23 @@ public RowDecoratorImpl(TableName tableName, TableMetadata tableMetadata) {
this.metadata = tableMetadata;
}

@Override
protected ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues) {
private DecoratedKey decoratedKeyFromRawKeyValues(Object... rawKeyValues) {
Clustering key = metadata.partitionKeyAsClusteringComparator().make(rawKeyValues);
DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key.serializeAsPartitionKey());
return decoratedKey;
}

@Override
protected ComparableKey<?> decoratePrimaryKey(Object... rawKeyValues) {
DecoratedKey decoratedKey = decoratedKeyFromRawKeyValues(rawKeyValues);
return new ComparableKey<>(PartitionPosition.class, decoratedKey);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
DecoratedKey decoratedKey = decoratedKeyFromRawKeyValues(primaryKeyValues(row));
ByteSource src = decoratedKey.asComparableBytes(ByteComparable.Version.DSE68);
int initialBufSize = 256;
return ByteBuffer.wrap(ByteSourceUtil.readBytes(src, initialBufSize));
}
}
49 changes: 49 additions & 0 deletions persistence-test/src/main/java/io/stargate/it/PersistenceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jcip.annotations.NotThreadSafe;
import org.apache.cassandra.stargate.utils.FastByteOperations.PureJavaOperations;
import org.assertj.core.api.Assertions;
import org.javatuples.Pair;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1964,6 +1965,54 @@ public void testRowDecorator() throws ExecutionException, InterruptedException {
assertGt(last, first, dec1, dec2);
}

@Test
public void testRowDecoratorComparableBytes() throws ExecutionException, InterruptedException {
setupCustomPagingData();

// Obtain partition keys in "ring" order
AbstractBound<?> selectAll =
dataStore
.queryBuilder()
.select()
.column("pk")
.column("val")
.from(keyspace, table)
.build()
.bind();

ResultSet rs1 = dataStore.execute(selectAll).get();
Iterator<Row> it1 = rs1.iterator();
RowDecorator dec1 = rs1.makeRowDecorator();

ResultSet rs2 = dataStore.execute(selectAll).get();
RowDecorator dec2 = rs2.makeRowDecorator();
Iterator<Row> it2 = rs2.iterator();

if (backend.isDse()) {
Row first = null;
Row last = null;
Row p1 = null;
PureJavaOperations ops = new PureJavaOperations();
while (it1.hasNext()) {
assertThat(it2.hasNext()).isTrue();
Row r1 = it1.next();
Row r2 = it2.next();
first = first == null ? r1 : first;
last = r1;
assertThat(ops.compare(dec1.getComparableBytes(r1), dec2.getComparableBytes(r2)))
.isEqualTo(0);
if (p1 == null) {
p1 = r1;
}
assertThat(ops.compare(dec1.getComparableBytes(r1), dec2.getComparableBytes(p1)))
.isGreaterThanOrEqualTo(0);
}
assertThat(it2.hasNext()).isFalse();
assertThat(ops.compare(dec1.getComparableBytes(last), dec2.getComparableBytes(first)))
.isGreaterThan(0);
}
}

private boolean isCassandra4() {
return !backend.isDse()
&& Version.parse(backend.clusterVersion()).nextStable().compareTo(Version.V4_0_0) >= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.stargate.db.ComparableKey;
import io.stargate.db.RowDecorator;
import io.stargate.db.schema.Column;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -34,13 +36,22 @@ public AlphabeticalOrderPartitionKeyDecorator(List<Column> allColumns) {
allColumns.stream().filter(Column::isPartitionKey).collect(Collectors.toList());
}

private String decoratedKeyString(Row row) {
return partitionKeyColumns.stream()
.map(column -> row.getString(column.name()))
.collect(Collectors.joining("|"));
}

@Override
public <T extends Comparable<T>> ComparableKey<T> decoratePartitionKey(Row row) {
String decorated =
partitionKeyColumns.stream()
.map(column -> row.getString(column.name()))
.collect(Collectors.joining("|"));
String decorated = decoratedKeyString(row);
//noinspection unchecked
return (ComparableKey<T>) new ComparableKey<>(String.class, decorated);
}

@Override
public ByteBuffer getComparableBytes(Row row) {
String decorated = decoratedKeyString(row);
return ByteBuffer.wrap(decorated.getBytes(StandardCharsets.UTF_8));
}
}

0 comments on commit d3867fc

Please sign in to comment.