Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(sql): parallel GROUP BY support for first/last functions #4267

Merged
merged 11 commits on Mar 6, 2024
Expand Up @@ -44,7 +44,6 @@
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class CairoKeywordBenchmark {


private final long memSize = "abcde.detached".length() + 1;
private final StringSink sink = new StringSink();
private final StringSink sink2 = new StringSink();
Expand Down
Expand Up @@ -37,9 +37,35 @@ public interface GroupByFunction extends Function, Mutable {
default void clear() {
}

void computeFirst(MapValue mapValue, Record record);
/**
* Performs the first aggregation within a group.
* <p>
* Row id is provided for aggregation functions that consider row order, such as first/last.
* The value is guaranteed to be growing between subsequent calls. In case of parallel GROUP BY,
* this means that all row ids of a later page frame are guaranteed to be greater than row ids
* of all previous page frames. {@link Record#getRowId()} shouldn't be used for this purpose
* since not all records implement it, and it's not guaranteed to be growing.
*
* @param mapValue map value holding the group
* @param record record holding the aggregated row
* @param rowId row id; the value may be different from record.getRowId()
*/
void computeFirst(MapValue mapValue, Record record, long rowId);

void computeNext(MapValue mapValue, Record record);
/**
* Performs a subsequent aggregation within a group.
* <p>
* Row id is provided for aggregation functions that consider row order, such as first/last.
* The value is guaranteed to be growing between subsequent calls. In case of parallel GROUP BY,
* this means that all row ids of a later page frame are guaranteed to be greater than row ids
* of all previous page frames. {@link Record#getRowId()} shouldn't be used for this purpose
* since not all records implement it, and it's not guaranteed to be growing.
*
* @param mapValue map value holding the group
* @param record record holding the aggregated row
* @param rowId row id; the value may be different from record.getRowId()
*/
void computeNext(MapValue mapValue, Record record, long rowId);

// only makes sense for non-keyed group by
default boolean earlyExit(MapValue mapValue) {
Expand Down Expand Up @@ -97,10 +123,12 @@ default void merge(MapValue destValue, MapValue srcValue) {
default void setAllocator(GroupByAllocator allocator) {
}

// used when doing interpolation
default void setByte(MapValue mapValue, byte value) {
throw new UnsupportedOperationException();
}

// used when doing interpolation
default void setDouble(MapValue mapValue, double value) {
throw new UnsupportedOperationException();
}
Expand All @@ -109,20 +137,24 @@ default void setEmpty(MapValue value) {
setNull(value);
}

// used when doing interpolation
default void setFloat(MapValue mapValue, float value) {
throw new UnsupportedOperationException();
}

// used when doing interpolation
default void setInt(MapValue mapValue, int value) {
throw new UnsupportedOperationException();
}

// used when doing interpolation
default void setLong(MapValue mapValue, long value) {
throw new UnsupportedOperationException();
}

void setNull(MapValue mapValue);

// used when doing interpolation
default void setShort(MapValue mapValue, short value) {
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ protected AbstractCovarGroupByFunction(@NotNull Function arg0, @NotNull Function
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final double x = xFunction.getDouble(record);
final double y = yFunction.getDouble(record);
mapValue.putDouble(valueIndex, 0);
Expand All @@ -68,7 +68,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final double x = xFunction.getDouble(record);
final double y = yFunction.getDouble(record);
if (Numbers.isFinite(x) && Numbers.isFinite(y)) {
Expand Down
Expand Up @@ -51,7 +51,7 @@ protected AbstractStdDevGroupByFunction(@NotNull Function arg) {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final double d = arg.getDouble(record);
mapValue.putDouble(valueIndex, 0);
mapValue.putDouble(valueIndex + 1, 0);
Expand All @@ -62,7 +62,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final double d = arg.getDouble(record);
if (Numbers.isFinite(d)) {
aggregate(mapValue, d);
Expand Down
Expand Up @@ -64,7 +64,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final int val = arg.getIPv4(record);
if (val != Numbers.IPv4_NULL) {
final long hash = Hash.murmur3ToLong(val);
Expand All @@ -79,7 +79,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final int val = arg.getIPv4(record);
if (val != Numbers.IPv4_NULL) {
final long hash = Hash.murmur3ToLong(val);
Expand Down
Expand Up @@ -64,7 +64,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final int val = arg.getInt(record);
if (val != Numbers.INT_NaN) {
final long hash = Hash.murmur3ToLong(val);
Expand All @@ -79,7 +79,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final int val = arg.getInt(record);
if (val != Numbers.INT_NaN) {
final long hash = Hash.murmur3ToLong(val);
Expand Down
Expand Up @@ -64,7 +64,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final long val = arg.getLong(record);
if (val != Numbers.LONG_NaN) {
final long hash = Hash.murmur3ToLong(val);
Expand All @@ -79,7 +79,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final long val = arg.getLong(record);
if (val != Numbers.LONG_NaN) {
final long hash = Hash.murmur3ToLong(val);
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final DoubleHistogram histogram;
if (histograms.size() <= histogramIndex) {
// We pre-size the histogram for 1000x ratio to avoid resizes in some basic use cases
Expand All @@ -83,7 +83,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final DoubleHistogram histogram = histograms.getQuick(mapValue.getInt(valueIndex));
final double val = exprFunc.getDouble(record);
if (Numbers.isFinite(val)) {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final PackedDoubleHistogram histogram;
if (histograms.size() <= histogramIndex) {
// We pre-size the histogram for 1000x ratio to avoid resizes in some basic use cases
Expand All @@ -83,7 +83,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final PackedDoubleHistogram histogram = histograms.getQuick(mapValue.getInt(valueIndex));
final double val = exprFunc.getDouble(record);
if (Numbers.isFinite(val)) {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final Histogram histogram;
if (histograms.size() <= histogramIndex) {
// We pre-size the histogram for [1, 1000] range to avoid resizes in some basic use cases
Expand All @@ -83,7 +83,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final Histogram histogram = histograms.getQuick(mapValue.getInt(valueIndex));
final long val = exprFunc.getLong(record);
if (val != Numbers.LONG_NaN) {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final PackedHistogram histogram;
if (histograms.size() <= histogramIndex) {
// We pre-size the histogram for [1, 1000] range to avoid resizes in some basic use cases
Expand All @@ -83,7 +83,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final PackedHistogram histogram = histograms.getQuick(mapValue.getInt(valueIndex));
final long val = exprFunc.getLong(record);
if (val != Numbers.LONG_NaN) {
Expand Down
Expand Up @@ -44,7 +44,7 @@ public AvgDoubleGroupByFunction(@NotNull Function arg) {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final double d = arg.getDouble(record);
if (Numbers.isFinite(d)) {
mapValue.putDouble(valueIndex, d);
Expand All @@ -56,7 +56,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final double d = arg.getDouble(record);
if (Numbers.isFinite(d)) {
mapValue.addDouble(valueIndex, d);
Expand Down
Expand Up @@ -53,7 +53,7 @@ protected CorrGroupByFunction(@NotNull Function arg0, @NotNull Function arg1) {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final double x = xFunction.getDouble(record);
final double y = yFunction.getDouble(record);
mapValue.putDouble(valueIndex, 0);
Expand All @@ -69,7 +69,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final double x = xFunction.getDouble(record);
final double y = yFunction.getDouble(record);
if (Numbers.isFinite(x) && Numbers.isFinite(y)) {
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final int val = arg.getIPv4(record);
if (val != Numbers.IPv4_NULL) {
mapValue.putLong(valueIndex, 1);
Expand All @@ -69,7 +69,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final int val = arg.getIPv4(record);
if (val != Numbers.IPv4_NULL) {
long ptr = mapValue.getLong(valueIndex + 1);
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
int val = arg.getInt(record);
if (val != Numbers.INT_NaN) {
mapValue.putLong(valueIndex, 1);
Expand All @@ -71,7 +71,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
int val = arg.getInt(record);
if (val != Numbers.INT_NaN) {
long ptr = mapValue.getLong(valueIndex + 1);
Expand Down
Expand Up @@ -34,7 +34,9 @@
import io.questdb.griffin.engine.functions.UnaryFunction;
import io.questdb.griffin.engine.groupby.GroupByAllocator;
import io.questdb.griffin.engine.groupby.GroupByLong256HashSet;
import io.questdb.std.*;
import io.questdb.std.Long256;
import io.questdb.std.Long256Impl;
import io.questdb.std.Numbers;

public class CountDistinctLong256GroupByFunction extends LongFunction implements UnaryFunction, GroupByFunction {
private final Function arg;
Expand All @@ -56,7 +58,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final Long256 l256 = arg.getLong256A(record);

if (isNotNull(l256)) {
Expand All @@ -76,12 +78,13 @@ public void computeFirst(MapValue mapValue, Record record) {
mapValue.putLong(valueIndex + 1, setA.ptr());
} else {
mapValue.putLong(valueIndex, 0);
mapValue.putLong(valueIndex + 1, 0);;
mapValue.putLong(valueIndex + 1, 0);
;
}
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final Long256 l256 = arg.getLong256A(record);

if (isNotNull(l256)) {
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
long val = arg.getLong(record);
if (val != Numbers.LONG_NaN) {
mapValue.putLong(valueIndex, 1);
Expand All @@ -71,7 +71,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
long val = arg.getLong(record);
if (val != Numbers.LONG_NaN) {
long ptr = mapValue.getLong(valueIndex + 1);
Expand Down
Expand Up @@ -58,7 +58,7 @@ public void clear() {
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
public void computeFirst(MapValue mapValue, Record record, long rowId) {
final CompactCharSequenceHashSet set;
if (sets.size() <= setIndex) {
sets.extendAndSet(setIndex, set = new CompactCharSequenceHashSet(setInitialCapacity, setLoadFactor));
Expand All @@ -78,7 +78,7 @@ public void computeFirst(MapValue mapValue, Record record) {
}

@Override
public void computeNext(MapValue mapValue, Record record) {
public void computeNext(MapValue mapValue, Record record, long rowId) {
final CompactCharSequenceHashSet set = sets.getQuick(mapValue.getInt(valueIndex + 1));
final CharSequence val = arg.getStr(record);
if (val != null) {
Expand Down