Skip to content

Commit

Permalink
perf(sql): support uuid and long256 in parallel GROUP BY (#4140)
Browse files Browse the repository at this point in the history
  • Loading branch information
nwoolmer committed Jan 22, 2024
1 parent 6119117 commit cd11bdd
Show file tree
Hide file tree
Showing 27 changed files with 2,021 additions and 356 deletions.
@@ -0,0 +1,110 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2023 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.questdb;

import io.questdb.cairo.ColumnType;
import io.questdb.cairo.DefaultCairoConfiguration;
import io.questdb.cairo.SingleColumnType;
import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.OrderedMap;
import io.questdb.griffin.engine.groupby.GroupByAllocator;
import io.questdb.griffin.engine.groupby.GroupByLong128HashSet;
import io.questdb.std.Numbers;
import io.questdb.std.Rnd;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class GroupByLong128HashSetBenchmark {
private static final double loadFactor = 0.7;
private static final int orderedMapPageSize = 1024 * 1024;
private static final long groupByAllocatorDefaultChunkSize = 128 * 1024;
private static final long groupByAllocatorMaxChunkSize = Numbers.SIZE_1GB * 4;

private static final Rnd rnd = new Rnd();

@Param({"2500", "25000", "250000", "2500000"})
public int size;

private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) {
@Override
public long getGroupByAllocatorDefaultChunkSize() {
return groupByAllocatorDefaultChunkSize;
}

@Override
public long getGroupByAllocatorMaxChunkSize() {
return groupByAllocatorMaxChunkSize;
}
});
private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.UUID), null, 64, loadFactor, Integer.MAX_VALUE);
private static final GroupByLong128HashSet groupByLong128HashSet = new GroupByLong128HashSet(64, loadFactor, 0);
private static long ptr = 0;

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(GroupByLong128HashSetBenchmark.class.getSimpleName())
.warmupIterations(3)
.measurementIterations(3)
.forks(1)
.build();

new Runner(opt).run();
}

@Setup(Level.Iteration)
public void reset() {
orderedMap.clear();
allocator.close();
groupByLong128HashSet.setAllocator(allocator);
ptr = 0;
rnd.reset();
}

@Benchmark
public void testOrderedMap() {
MapKey key = orderedMap.withKey();
key.putLong128(rnd.nextLong(size), rnd.nextLong(size));
key.createValue();
}

@Benchmark
public void testGroupByLong128HashSet() {
long lo = rnd.nextLong(size);
long hi = rnd.nextLong(size);
int index = groupByLong128HashSet.of(ptr).keyIndex(lo, hi);
if (index >= 0) {
groupByLong128HashSet.addAt(index, lo, hi);
ptr = groupByLong128HashSet.ptr();
}
}
}
@@ -0,0 +1,112 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2023 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.questdb;

import io.questdb.cairo.ColumnType;
import io.questdb.cairo.DefaultCairoConfiguration;
import io.questdb.cairo.SingleColumnType;
import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.OrderedMap;
import io.questdb.griffin.engine.groupby.GroupByAllocator;
import io.questdb.griffin.engine.groupby.GroupByLong256HashSet;
import io.questdb.std.Numbers;
import io.questdb.std.Rnd;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class GroupByLong256HashSetBenchmark {
private static final double loadFactor = 0.7;
private static final int orderedMapPageSize = 1024 * 1024;
private static final long groupByAllocatorDefaultChunkSize = 128 * 1024;
private static final long groupByAllocatorMaxChunkSize = Numbers.SIZE_1GB * 8;

private static final Rnd rnd = new Rnd();

@Param({"1250", "12500", "125000", "1250000"})
public int size;

private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) {
@Override
public long getGroupByAllocatorDefaultChunkSize() {
return groupByAllocatorDefaultChunkSize;
}

@Override
public long getGroupByAllocatorMaxChunkSize() {
return groupByAllocatorMaxChunkSize;
}
});
private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.LONG256), null, 64, loadFactor, Integer.MAX_VALUE);
private static final GroupByLong256HashSet groupByLong256HashSet = new GroupByLong256HashSet(64, loadFactor, 0);
private static long ptr = 0;

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(GroupByLong256HashSetBenchmark.class.getSimpleName())
.warmupIterations(3)
.measurementIterations(3)
.forks(1)
.build();

new Runner(opt).run();
}

@Setup(Level.Iteration)
public void reset() {
orderedMap.clear();
allocator.close();
groupByLong256HashSet.setAllocator(allocator);
ptr = 0;
rnd.reset();
}

@Benchmark
public void testOrderedMap() {
MapKey key = orderedMap.withKey();
key.putLong256(rnd.nextLong(size), rnd.nextLong(size), rnd.nextLong(size), rnd.nextLong(size));
key.createValue();
}

@Benchmark
public void testGroupByLong256HashSet() {
long l0 = rnd.nextLong(size);
long l1 = rnd.nextLong(size);
long l2 = rnd.nextLong(size);
long l3 = rnd.nextLong(size);
int index = groupByLong256HashSet.of(ptr).keyIndex(l0, l1, l2, l3);
if (index >= 0) {
groupByLong256HashSet.addAt(index, l0, l1, l2, l3);
ptr = groupByLong256HashSet.ptr();
}
}
}
Expand Up @@ -42,20 +42,27 @@

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class GroupByLongHashSetBenchmark {

private static final long N = 1_000_000;
private static final double loadFactor = 0.7;
private static final int orderedMapPageSize = 1024 * 1024;
private static final long groupByAllocatorDefaultChunkSize = 128 * 1024;
private static final Rnd rnd = new Rnd();

@Param({"5000", "50000", "500000", "5000000"})
public int size;

private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) {
@Override
public long getGroupByAllocatorDefaultChunkSize() {
return 128 * 1024;
return groupByAllocatorDefaultChunkSize;
}
});
private static final OrderedMap fmap = new OrderedMap(1024 * 1024, new SingleColumnType(ColumnType.LONG), null, 16, 0.7f, Integer.MAX_VALUE);
private static final GroupByLongHashSet gbset = new GroupByLongHashSet(16, 0.7, 0);

private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.LONG), null, 64, loadFactor, Integer.MAX_VALUE);
private static final GroupByLongHashSet groupByLongHashSet = new GroupByLongHashSet(64, loadFactor, 0);
private static long ptr = 0;
private final Rnd rnd = new Rnd();

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand All @@ -68,35 +75,29 @@ public static void main(String[] args) throws RunnerException {
new Runner(opt).run();
}

@Benchmark
public long baseline() {
return rnd.nextLong(N);
}

@Setup(Level.Iteration)
public void reset() {
fmap.close();
fmap.reopen();
orderedMap.clear();
allocator.close();
gbset.setAllocator(allocator);
groupByLongHashSet.setAllocator(allocator);
ptr = 0;
rnd.reset();
}

@Benchmark
public void testFastMap() {
MapKey key = fmap.withKey();
key.putLong(rnd.nextLong(N));
public void testOrderedMap() {
MapKey key = orderedMap.withKey();
key.putLong(rnd.nextLong(size));
key.createValue();
}

@Benchmark
public void testGroupByLongHashSet() {
long value = rnd.nextLong(N);
int index = gbset.of(ptr).keyIndex(value);
long value = rnd.nextLong(size);
int index = groupByLongHashSet.of(ptr).keyIndex(value);
if (index >= 0) {
gbset.addAt(index, value);
ptr = gbset.ptr();
groupByLongHashSet.addAt(index, value);
ptr = groupByLongHashSet.ptr();
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/io/questdb/cairo/RecordChain.java
Expand Up @@ -230,6 +230,11 @@ public void putLong256(Long256 value) {
mem.putLong256(value);
}

@Override
public void putLong256(long l0, long l1, long l2, long l3) {
mem.putLong256(l0, l1, l2, l3);
}

@Override
public void putRecord(Record value) {
// noop
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/questdb/cairo/RecordSinkSPI.java
Expand Up @@ -51,6 +51,8 @@ public interface RecordSinkSPI {

void putLong256(Long256 value);

void putLong256(long l0, long l1, long l2, long l3);

void putRecord(Record value);

void putShort(short value);
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/io/questdb/cairo/map/MapKey.java
Expand Up @@ -62,4 +62,5 @@ default boolean notFound() {
}

void put(Record record, RecordSink sink);

}
21 changes: 21 additions & 0 deletions core/src/main/java/io/questdb/cairo/map/OrderedMap.java
Expand Up @@ -668,6 +668,15 @@ public void putLong256(Long256 value) {
appendAddress += 32L;
}

@Override
public void putLong256(long l0, long l1, long l2, long l3) {
Unsafe.getUnsafe().putLong(appendAddress, l0);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES, l1);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 2, l2);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 3, l3);
appendAddress += 32L;
}

@Override
public void putShort(short value) {
Unsafe.getUnsafe().putShort(appendAddress, value);
Expand Down Expand Up @@ -795,6 +804,8 @@ protected void checkCapacity(long requiredKeySize) {

abstract void copyFromRawKey(long srcPtr, long srcSize);

public abstract void putLong256(long l0, long l1, long l2, long l3);

protected abstract boolean eq(long offset);
}

Expand Down Expand Up @@ -916,6 +927,16 @@ public void putLong256(Long256 value) {
appendAddress += 32L;
}

@Override
public void putLong256(long l0, long l1, long l2, long l3) {
checkCapacity(32L);
Unsafe.getUnsafe().putLong(appendAddress, l0);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES, l1);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 2, l2);
Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 3, l3);
appendAddress += 32L;
}

@Override
public void putShort(short value) {
checkCapacity(2L);
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/questdb/cairo/map/Unordered16Map.java
Expand Up @@ -583,6 +583,11 @@ public void putLong256(Long256 value) {
throw new UnsupportedOperationException();
}

@Override
public void putLong256(long l0, long l1, long l2, long l3) {
throw new UnsupportedOperationException();
}

@Override
public void putRecord(Record value) {
// no-op
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/questdb/cairo/map/Unordered8Map.java
Expand Up @@ -567,6 +567,11 @@ public void putLong256(Long256 value) {
throw new UnsupportedOperationException();
}

@Override
public void putLong256(long l0, long l1, long l2, long l3) {
throw new UnsupportedOperationException();
}

@Override
public void putRecord(Record value) {
// no-op
Expand Down

0 comments on commit cd11bdd

Please sign in to comment.