From cd11bdd6b22f08ae2a5c8343104f3b5cea94daf0 Mon Sep 17 00:00:00 2001 From: Nick Woolmer <29717167+nwoolmer@users.noreply.github.com> Date: Mon, 22 Jan 2024 10:48:31 +0000 Subject: [PATCH] perf(sql): support uuid and long256 in parallel GROUP BY (#4140) --- .../GroupByLong128HashSetBenchmark.java | 110 ++++++++ .../GroupByLong256HashSetBenchmark.java | 112 ++++++++ .../questdb/GroupByLongHashSetBenchmark.java | 43 +-- .../java/io/questdb/cairo/RecordChain.java | 5 + .../java/io/questdb/cairo/RecordSinkSPI.java | 2 + .../java/io/questdb/cairo/map/MapKey.java | 1 + .../java/io/questdb/cairo/map/OrderedMap.java | 21 ++ .../io/questdb/cairo/map/Unordered16Map.java | 5 + .../io/questdb/cairo/map/Unordered8Map.java | 5 + .../engine/functions/QuaternaryFunction.java | 113 ++++++++ .../CountDistinctLong256GroupByFunction.java | 119 ++++++--- .../CountDistinctLongGroupByFunction.java | 2 +- .../CountDistinctUuidGroupByFunction.java | 110 ++++++-- .../LongsToLong256FunctionFactory.java | 139 ++++++++++ .../engine/groupby/GroupByLong128HashSet.java | 238 +++++++++++++++++ .../engine/groupby/GroupByLong256HashSet.java | 250 ++++++++++++++++++ core/src/main/java/io/questdb/std/Hash.java | 8 + .../java/io/questdb/std/Long256HashSet.java | 193 -------------- .../main/java/io/questdb/std/Long256Impl.java | 13 + core/src/main/java/module-info.java | 1 + .../io.questdb.griffin.FunctionFactory | 1 + .../test/cairo/RecordSinkFactoryTest.java | 5 + ...inctLong256GroupByFunctionFactoryTest.java | 236 +++++++++++++++++ ...istinctUuidGroupByFunctionFactoryTest.java | 235 ++++++++++++++++ .../groupby/GroupByLong128HashSetTest.java | 146 ++++++++++ .../groupby/GroupByLong256HashSetTest.java | 182 +++++++++++++ .../questdb/test/std/Long256HashSetTest.java | 82 ------ 27 files changed, 2021 insertions(+), 356 deletions(-) create mode 100644 benchmarks/src/main/java/org/questdb/GroupByLong128HashSetBenchmark.java create mode 100644 
benchmarks/src/main/java/org/questdb/GroupByLong256HashSetBenchmark.java create mode 100644 core/src/main/java/io/questdb/griffin/engine/functions/QuaternaryFunction.java create mode 100644 core/src/main/java/io/questdb/griffin/engine/functions/long256/LongsToLong256FunctionFactory.java create mode 100644 core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong128HashSet.java create mode 100644 core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong256HashSet.java delete mode 100644 core/src/main/java/io/questdb/std/Long256HashSet.java create mode 100644 core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactoryTest.java create mode 100644 core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactoryTest.java create mode 100644 core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong128HashSetTest.java create mode 100644 core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong256HashSetTest.java delete mode 100644 core/src/test/java/io/questdb/test/std/Long256HashSetTest.java diff --git a/benchmarks/src/main/java/org/questdb/GroupByLong128HashSetBenchmark.java b/benchmarks/src/main/java/org/questdb/GroupByLong128HashSetBenchmark.java new file mode 100644 index 000000000000..95dca3ade15d --- /dev/null +++ b/benchmarks/src/main/java/org/questdb/GroupByLong128HashSetBenchmark.java @@ -0,0 +1,110 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.questdb; + +import io.questdb.cairo.ColumnType; +import io.questdb.cairo.DefaultCairoConfiguration; +import io.questdb.cairo.SingleColumnType; +import io.questdb.cairo.map.MapKey; +import io.questdb.cairo.map.OrderedMap; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong128HashSet; +import io.questdb.std.Numbers; +import io.questdb.std.Rnd; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.concurrent.TimeUnit; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class GroupByLong128HashSetBenchmark { + private static final double loadFactor = 0.7; + private static final int orderedMapPageSize = 1024 * 1024; + private static final long groupByAllocatorDefaultChunkSize = 128 * 1024; + private static final long groupByAllocatorMaxChunkSize = Numbers.SIZE_1GB * 4; + + private static final Rnd rnd = new Rnd(); + + @Param({"2500", "25000", "250000", "2500000"}) + public int size; + + private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) { + @Override + public long getGroupByAllocatorDefaultChunkSize() { + return groupByAllocatorDefaultChunkSize; + } + + @Override + public long 
getGroupByAllocatorMaxChunkSize() { + return groupByAllocatorMaxChunkSize; + } + }); + private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.UUID), null, 64, loadFactor, Integer.MAX_VALUE); + private static final GroupByLong128HashSet groupByLong128HashSet = new GroupByLong128HashSet(64, loadFactor, 0); + private static long ptr = 0; + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(GroupByLong128HashSetBenchmark.class.getSimpleName()) + .warmupIterations(3) + .measurementIterations(3) + .forks(1) + .build(); + + new Runner(opt).run(); + } + + @Setup(Level.Iteration) + public void reset() { + orderedMap.clear(); + allocator.close(); + groupByLong128HashSet.setAllocator(allocator); + ptr = 0; + rnd.reset(); + } + + @Benchmark + public void testOrderedMap() { + MapKey key = orderedMap.withKey(); + key.putLong128(rnd.nextLong(size), rnd.nextLong(size)); + key.createValue(); + } + + @Benchmark + public void testGroupByLong128HashSet() { + long lo = rnd.nextLong(size); + long hi = rnd.nextLong(size); + int index = groupByLong128HashSet.of(ptr).keyIndex(lo, hi); + if (index >= 0) { + groupByLong128HashSet.addAt(index, lo, hi); + ptr = groupByLong128HashSet.ptr(); + } + } +} diff --git a/benchmarks/src/main/java/org/questdb/GroupByLong256HashSetBenchmark.java b/benchmarks/src/main/java/org/questdb/GroupByLong256HashSetBenchmark.java new file mode 100644 index 000000000000..43d560eb4c97 --- /dev/null +++ b/benchmarks/src/main/java/org/questdb/GroupByLong256HashSetBenchmark.java @@ -0,0 +1,112 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.questdb; + +import io.questdb.cairo.ColumnType; +import io.questdb.cairo.DefaultCairoConfiguration; +import io.questdb.cairo.SingleColumnType; +import io.questdb.cairo.map.MapKey; +import io.questdb.cairo.map.OrderedMap; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong256HashSet; +import io.questdb.std.Numbers; +import io.questdb.std.Rnd; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.concurrent.TimeUnit; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class GroupByLong256HashSetBenchmark { + private static final double loadFactor = 0.7; + private static final int orderedMapPageSize = 1024 * 1024; + private static final long groupByAllocatorDefaultChunkSize = 128 * 1024; + private static final long groupByAllocatorMaxChunkSize = Numbers.SIZE_1GB * 8; + + private static final Rnd rnd = new Rnd(); + + @Param({"1250", "12500", "125000", "1250000"}) + public int size; + + private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) { + @Override + public long 
getGroupByAllocatorDefaultChunkSize() { + return groupByAllocatorDefaultChunkSize; + } + + @Override + public long getGroupByAllocatorMaxChunkSize() { + return groupByAllocatorMaxChunkSize; + } + }); + private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.LONG256), null, 64, loadFactor, Integer.MAX_VALUE); + private static final GroupByLong256HashSet groupByLong256HashSet = new GroupByLong256HashSet(64, loadFactor, 0); + private static long ptr = 0; + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(GroupByLong256HashSetBenchmark.class.getSimpleName()) + .warmupIterations(3) + .measurementIterations(3) + .forks(1) + .build(); + + new Runner(opt).run(); + } + + @Setup(Level.Iteration) + public void reset() { + orderedMap.clear(); + allocator.close(); + groupByLong256HashSet.setAllocator(allocator); + ptr = 0; + rnd.reset(); + } + + @Benchmark + public void testOrderedMap() { + MapKey key = orderedMap.withKey(); + key.putLong256(rnd.nextLong(size), rnd.nextLong(size), rnd.nextLong(size), rnd.nextLong(size)); + key.createValue(); + } + + @Benchmark + public void testGroupByLong256HashSet() { + long l0 = rnd.nextLong(size); + long l1 = rnd.nextLong(size); + long l2 = rnd.nextLong(size); + long l3 = rnd.nextLong(size); + int index = groupByLong256HashSet.of(ptr).keyIndex(l0, l1, l2, l3); + if (index >= 0) { + groupByLong256HashSet.addAt(index, l0, l1, l2, l3); + ptr = groupByLong256HashSet.ptr(); + } + } +} diff --git a/benchmarks/src/main/java/org/questdb/GroupByLongHashSetBenchmark.java b/benchmarks/src/main/java/org/questdb/GroupByLongHashSetBenchmark.java index c8b6778cbc76..fb7f5747740c 100644 --- a/benchmarks/src/main/java/org/questdb/GroupByLongHashSetBenchmark.java +++ b/benchmarks/src/main/java/org/questdb/GroupByLongHashSetBenchmark.java @@ -42,20 +42,27 @@ @State(Scope.Thread) @BenchmarkMode(Mode.AverageTime) 
-@OutputTimeUnit(TimeUnit.MICROSECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) public class GroupByLongHashSetBenchmark { - private static final long N = 1_000_000; + private static final double loadFactor = 0.7; + private static final int orderedMapPageSize = 1024 * 1024; + private static final long groupByAllocatorDefaultChunkSize = 128 * 1024; + private static final Rnd rnd = new Rnd(); + + @Param({"5000", "50000", "500000", "5000000"}) + public int size; + private static final GroupByAllocator allocator = new GroupByAllocator(new DefaultCairoConfiguration(null) { @Override public long getGroupByAllocatorDefaultChunkSize() { - return 128 * 1024; + return groupByAllocatorDefaultChunkSize; } }); - private static final OrderedMap fmap = new OrderedMap(1024 * 1024, new SingleColumnType(ColumnType.LONG), null, 16, 0.7f, Integer.MAX_VALUE); - private static final GroupByLongHashSet gbset = new GroupByLongHashSet(16, 0.7, 0); + + private static final OrderedMap orderedMap = new OrderedMap(orderedMapPageSize, new SingleColumnType(ColumnType.LONG), null, 64, loadFactor, Integer.MAX_VALUE); + private static final GroupByLongHashSet groupByLongHashSet = new GroupByLongHashSet(64, loadFactor, 0); private static long ptr = 0; - private final Rnd rnd = new Rnd(); public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() @@ -68,35 +75,29 @@ public static void main(String[] args) throws RunnerException { new Runner(opt).run(); } - @Benchmark - public long baseline() { - return rnd.nextLong(N); - } - @Setup(Level.Iteration) public void reset() { - fmap.close(); - fmap.reopen(); + orderedMap.clear(); allocator.close(); - gbset.setAllocator(allocator); + groupByLongHashSet.setAllocator(allocator); ptr = 0; rnd.reset(); } @Benchmark - public void testFastMap() { - MapKey key = fmap.withKey(); - key.putLong(rnd.nextLong(N)); + public void testOrderedMap() { + MapKey key = orderedMap.withKey(); + key.putLong(rnd.nextLong(size)); 
key.createValue(); } @Benchmark public void testGroupByLongHashSet() { - long value = rnd.nextLong(N); - int index = gbset.of(ptr).keyIndex(value); + long value = rnd.nextLong(size); + int index = groupByLongHashSet.of(ptr).keyIndex(value); if (index >= 0) { - gbset.addAt(index, value); - ptr = gbset.ptr(); + groupByLongHashSet.addAt(index, value); + ptr = groupByLongHashSet.ptr(); } } } diff --git a/core/src/main/java/io/questdb/cairo/RecordChain.java b/core/src/main/java/io/questdb/cairo/RecordChain.java index 34fa13de21ab..049d57c85950 100644 --- a/core/src/main/java/io/questdb/cairo/RecordChain.java +++ b/core/src/main/java/io/questdb/cairo/RecordChain.java @@ -230,6 +230,11 @@ public void putLong256(Long256 value) { mem.putLong256(value); } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + mem.putLong256(l0, l1, l2, l3); + } + @Override public void putRecord(Record value) { // noop diff --git a/core/src/main/java/io/questdb/cairo/RecordSinkSPI.java b/core/src/main/java/io/questdb/cairo/RecordSinkSPI.java index 41037c77309b..c3727350c7b4 100644 --- a/core/src/main/java/io/questdb/cairo/RecordSinkSPI.java +++ b/core/src/main/java/io/questdb/cairo/RecordSinkSPI.java @@ -51,6 +51,8 @@ public interface RecordSinkSPI { void putLong256(Long256 value); + void putLong256(long l0, long l1, long l2, long l3); + void putRecord(Record value); void putShort(short value); diff --git a/core/src/main/java/io/questdb/cairo/map/MapKey.java b/core/src/main/java/io/questdb/cairo/map/MapKey.java index 1491fb32d5ab..547f3e2f93f0 100644 --- a/core/src/main/java/io/questdb/cairo/map/MapKey.java +++ b/core/src/main/java/io/questdb/cairo/map/MapKey.java @@ -62,4 +62,5 @@ default boolean notFound() { } void put(Record record, RecordSink sink); + } diff --git a/core/src/main/java/io/questdb/cairo/map/OrderedMap.java b/core/src/main/java/io/questdb/cairo/map/OrderedMap.java index fa53cfc423c0..eec8c5b767cd 100644 --- 
a/core/src/main/java/io/questdb/cairo/map/OrderedMap.java +++ b/core/src/main/java/io/questdb/cairo/map/OrderedMap.java @@ -668,6 +668,15 @@ public void putLong256(Long256 value) { appendAddress += 32L; } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + Unsafe.getUnsafe().putLong(appendAddress, l0); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES, l1); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 2, l2); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 3, l3); + appendAddress += 32L; + } + @Override public void putShort(short value) { Unsafe.getUnsafe().putShort(appendAddress, value); @@ -795,6 +804,8 @@ protected void checkCapacity(long requiredKeySize) { abstract void copyFromRawKey(long srcPtr, long srcSize); + public abstract void putLong256(long l0, long l1, long l2, long l3); + protected abstract boolean eq(long offset); } @@ -916,6 +927,16 @@ public void putLong256(Long256 value) { appendAddress += 32L; } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + checkCapacity(32L); + Unsafe.getUnsafe().putLong(appendAddress, l0); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES, l1); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 2, l2); + Unsafe.getUnsafe().putLong(appendAddress + Long.BYTES * 3, l3); + appendAddress += 32L; + } + @Override public void putShort(short value) { checkCapacity(2L); diff --git a/core/src/main/java/io/questdb/cairo/map/Unordered16Map.java b/core/src/main/java/io/questdb/cairo/map/Unordered16Map.java index fbfda4891c6f..c6208d8cf91a 100644 --- a/core/src/main/java/io/questdb/cairo/map/Unordered16Map.java +++ b/core/src/main/java/io/questdb/cairo/map/Unordered16Map.java @@ -583,6 +583,11 @@ public void putLong256(Long256 value) { throw new UnsupportedOperationException(); } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + throw new UnsupportedOperationException(); + } + @Override public void putRecord(Record 
value) { // no-op diff --git a/core/src/main/java/io/questdb/cairo/map/Unordered8Map.java b/core/src/main/java/io/questdb/cairo/map/Unordered8Map.java index a31d05f457a4..f3717cf20ac1 100644 --- a/core/src/main/java/io/questdb/cairo/map/Unordered8Map.java +++ b/core/src/main/java/io/questdb/cairo/map/Unordered8Map.java @@ -567,6 +567,11 @@ public void putLong256(Long256 value) { throw new UnsupportedOperationException(); } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + throw new UnsupportedOperationException(); + } + @Override public void putRecord(Record value) { // no-op diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/QuaternaryFunction.java b/core/src/main/java/io/questdb/griffin/engine/functions/QuaternaryFunction.java new file mode 100644 index 000000000000..647d45b3e64a --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/functions/QuaternaryFunction.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions; + +import io.questdb.cairo.sql.Function; +import io.questdb.cairo.sql.SymbolTableSource; +import io.questdb.griffin.PlanSink; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.SqlExecutionContext; + +public interface QuaternaryFunction extends Function { + + @Override + default void close() { + getFunc0().close(); + getFunc1().close(); + getFunc2().close(); + getFunc3().close(); + } + + Function getFunc0(); + + Function getFunc1(); + + Function getFunc2(); + + Function getFunc3(); + + @Override + default void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) throws SqlException { + getFunc0().init(symbolTableSource, executionContext); + getFunc1().init(symbolTableSource, executionContext); + getFunc2().init(symbolTableSource, executionContext); + getFunc3().init(symbolTableSource, executionContext); + } + + @Override + default void initCursor() { + getFunc0().initCursor(); + getFunc1().initCursor(); + getFunc2().initCursor(); + getFunc3().initCursor(); + } + + @Override + default boolean isConstant() { + return + getFunc0().isConstant() && + getFunc1().isConstant() && + getFunc2().isConstant() && + getFunc3().isConstant(); + } + + @Override + default boolean isParallelismSupported() { + return getFunc0().isParallelismSupported() && getFunc1().isParallelismSupported() && getFunc2().isParallelismSupported() && getFunc3().isParallelismSupported(); + } + + @Override + default boolean isReadThreadSafe() { + return getFunc0().isReadThreadSafe() && getFunc1().isReadThreadSafe() && getFunc2().isReadThreadSafe() && getFunc3().isReadThreadSafe(); + } + + @Override + default boolean isRuntimeConstant() { + boolean arc = getFunc0().isRuntimeConstant(); + boolean brc = getFunc1().isRuntimeConstant(); + boolean crc = getFunc2().isRuntimeConstant(); + boolean drc = getFunc3().isRuntimeConstant(); + + 
boolean ac = getFunc0().isConstant(); + boolean bc = getFunc1().isConstant(); + boolean cc = getFunc2().isConstant(); + boolean dc = getFunc3().isConstant(); + + return (ac || arc) && (bc || brc) && (cc || crc) && (dc || drc) && (arc || brc || crc || drc); + } + + @Override + default void toPlan(PlanSink sink) { + sink.val(getName()).val('(').val(getFunc0()).val(',').val(getFunc1()).val(',').val(getFunc2()).val(',').val(getFunc3()).val(')'); + } + + @Override + default void toTop() { + getFunc0().toTop(); + getFunc1().toTop(); + getFunc2().toTop(); + getFunc3().toTop(); + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java index 516a4f3cda25..4de20033b0df 100644 --- a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java +++ b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunction.java @@ -32,59 +32,77 @@ import io.questdb.griffin.engine.functions.GroupByFunction; import io.questdb.griffin.engine.functions.LongFunction; import io.questdb.griffin.engine.functions.UnaryFunction; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong256HashSet; import io.questdb.std.*; public class CountDistinctLong256GroupByFunction extends LongFunction implements UnaryFunction, GroupByFunction { private final Function arg; - private final int setInitialCapacity; - private final double setLoadFactor; - private final ObjList<Long256HashSet> sets = new ObjList<>(); - private int setIndex; + private final GroupByLong256HashSet setA; + private final GroupByLong256HashSet setB; private int valueIndex; public CountDistinctLong256GroupByFunction(Function arg, int setInitialCapacity, double setLoadFactor) { this.arg = arg; - this.setInitialCapacity = setInitialCapacity; -
this.setLoadFactor = setLoadFactor; + // We use zero as the default value to speed up zeroing on rehash. + setA = new GroupByLong256HashSet(setInitialCapacity, setLoadFactor, 0); + setB = new GroupByLong256HashSet(setInitialCapacity, setLoadFactor, 0); } @Override public void clear() { - sets.clear(); - setIndex = 0; + setA.resetPtr(); + setB.resetPtr(); } @Override public void computeFirst(MapValue mapValue, Record record) { - final Long256HashSet set; - if (sets.size() <= setIndex) { - sets.extendAndSet(setIndex, set = new Long256HashSet(setInitialCapacity, setLoadFactor)); - } else { - set = sets.getQuick(setIndex); - set.clear(); - } + final Long256 l256 = arg.getLong256A(record); - Long256 val = arg.getLong256A(record); - if (isNotNull(val)) { - set.add(val.getLong0(), val.getLong1(), val.getLong2(), val.getLong3()); + if (isNotNull(l256)) { mapValue.putLong(valueIndex, 1L); + long l0 = l256.getLong0(); + long l1 = l256.getLong1(); + long l2 = l256.getLong2(); + long l3 = l256.getLong3(); + // Remap zero since it's used as the no entry key. 
+ if (l0 == 0 && l1 == 0 && l2 == 0 && l3 == 0) { + l0 = Numbers.LONG_NaN; + l1 = Numbers.LONG_NaN; + l2 = Numbers.LONG_NaN; + l3 = Numbers.LONG_NaN; + } + setA.of(0).add(l0, l1, l2, l3); + mapValue.putLong(valueIndex + 1, setA.ptr()); } else { - mapValue.putLong(valueIndex, 0L); + mapValue.putLong(valueIndex, 0); + mapValue.putLong(valueIndex + 1, 0); } - mapValue.putInt(valueIndex + 1, setIndex++); } @Override public void computeNext(MapValue mapValue, Record record) { - final Long256HashSet set = sets.getQuick(mapValue.getInt(valueIndex + 1)); - final Long256 val = arg.getLong256A(record); - if (isNotNull(val)) { - final int index = set.keyIndex(val.getLong0(), val.getLong1(), val.getLong2(), val.getLong3()); - if (index < 0) { - return; + final Long256 l256 = arg.getLong256A(record); + + if (isNotNull(l256)) { + long l0 = l256.getLong0(); + long l1 = l256.getLong1(); + long l2 = l256.getLong2(); + long l3 = l256.getLong3(); + long ptr = mapValue.getLong(valueIndex + 1); + // Remap zero since it's used as the no entry key.
+ if (l0 == 0 && l1 == 0 && l2 == 0 && l3 == 0) { + l0 = Numbers.LONG_NaN; + l1 = Numbers.LONG_NaN; + l2 = Numbers.LONG_NaN; + l3 = Numbers.LONG_NaN; + } + final int index = setA.of(ptr).keyIndex(l0, l1, l2, l3); + if (index >= 0) { + setA.addAt(index, l0, l1, l2, l3); + mapValue.addLong(valueIndex, 1); + mapValue.putLong(valueIndex + 1, setA.ptr()); } - set.addAt(index, val.getLong0(), val.getLong1(), val.getLong2(), val.getLong3()); - mapValue.addLong(valueIndex, 1); } } @@ -115,7 +133,7 @@ public boolean isConstant() { @Override public boolean isParallelismSupported() { - return false; + return UnaryFunction.super.isParallelismSupported(); } @Override @@ -123,26 +141,66 @@ public boolean isReadThreadSafe() { return false; } + @Override + public void merge(MapValue destValue, MapValue srcValue) { + long srcCount = srcValue.getLong(valueIndex); + if (srcCount == 0 || srcCount == Numbers.LONG_NaN) { + return; + } + long srcPtr = srcValue.getLong(valueIndex + 1); + + long destCount = destValue.getLong(valueIndex); + if (destCount == 0 || destCount == Numbers.LONG_NaN) { + destValue.putLong(valueIndex, srcCount); + destValue.putLong(valueIndex + 1, srcPtr); + return; + } + long destPtr = destValue.getLong(valueIndex + 1); + + setA.of(destPtr); + setB.of(srcPtr); + + if (setA.size() > (setB.size() >> 1)) { + setA.merge(setB); + destValue.putLong(valueIndex, setA.size()); + destValue.putLong(valueIndex + 1, setA.ptr()); + } else { + // Set A is significantly smaller than set B, so we merge it into set B. 
+ setB.merge(setA); + destValue.putLong(valueIndex, setB.size()); + destValue.putLong(valueIndex + 1, setB.ptr()); + } + } + @Override public void pushValueTypes(ArrayColumnTypes columnTypes) { this.valueIndex = columnTypes.getColumnCount(); columnTypes.add(ColumnType.LONG); - columnTypes.add(ColumnType.INT); + columnTypes.add(ColumnType.LONG); + } + + @Override + public void setAllocator(GroupByAllocator allocator) { + setA.setAllocator(allocator); + setB.setAllocator(allocator); } @Override public void setEmpty(MapValue mapValue) { mapValue.putLong(valueIndex, 0L); + mapValue.putLong(valueIndex + 1, 0); } @Override public void setLong(MapValue mapValue, long value) { mapValue.putLong(valueIndex, value); + mapValue.putLong(valueIndex + 1, 0); } @Override public void setNull(MapValue mapValue) { mapValue.putLong(valueIndex, Numbers.LONG_NaN); + mapValue.putLong(valueIndex + 1, 0); } @Override @@ -153,7 +211,6 @@ public void setValueIndex(int valueIndex) { @Override public void toTop() { UnaryFunction.super.toTop(); - setIndex = 0; } private static boolean isNotNull(Long256 value) { diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java index 0912421d76f6..2b2a11cf293e 100644 --- a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java +++ b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctLongGroupByFunction.java @@ -113,7 +113,7 @@ public boolean isConstant() { @Override public boolean isParallelismSupported() { - return true; + return UnaryFunction.super.isParallelismSupported(); } @Override diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java index 
4fd6ff141dea..7ce07238c7fd 100644 --- a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java +++ b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunction.java @@ -32,6 +32,9 @@ import io.questdb.griffin.engine.functions.GroupByFunction; import io.questdb.griffin.engine.functions.LongFunction; import io.questdb.griffin.engine.functions.UnaryFunction; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong128HashSet; +import io.questdb.griffin.engine.groupby.GroupByLongHashSet; import io.questdb.std.LongLongHashSet; import io.questdb.std.Numbers; import io.questdb.std.ObjList; @@ -39,57 +42,59 @@ public final class CountDistinctUuidGroupByFunction extends LongFunction implements UnaryFunction, GroupByFunction { private final Function arg; - private final int setInitialCapacity; - private final double setLoadFactor; - private final ObjList<LongLongHashSet> sets = new ObjList<>(); - private int setIndex; + private final GroupByLong128HashSet setA; + private final GroupByLong128HashSet setB; private int valueIndex; public CountDistinctUuidGroupByFunction(Function arg, int setInitialCapacity, double setLoadFactor) { this.arg = arg; - this.setInitialCapacity = setInitialCapacity; - this.setLoadFactor = setLoadFactor; + // We use zero as the default value to speed up zeroing on rehash.
+ setA = new GroupByLong128HashSet(setInitialCapacity, setLoadFactor, 0); + setB = new GroupByLong128HashSet(setInitialCapacity, setLoadFactor, 0); } @Override public void clear() { - sets.clear(); - setIndex = 0; + setA.resetPtr(); + setB.resetPtr(); } @Override public void computeFirst(MapValue mapValue, Record record) { - LongLongHashSet set; - if (sets.size() <= setIndex) { - sets.extendAndSet(setIndex, set = new LongLongHashSet(setInitialCapacity, setLoadFactor, Numbers.LONG_NaN, LongLongHashSet.UUID_STRATEGY)); - } else { - set = sets.getQuick(setIndex); - set.clear(); - } - long lo = arg.getLong128Lo(record); long hi = arg.getLong128Hi(record); if (!Uuid.isNull(lo, hi)) { - set.add(lo, hi); mapValue.putLong(valueIndex, 1L); + // Remap zero since it's used as the no entry key. + if (lo == 0 && hi == 0) { + lo = Numbers.LONG_NaN; + hi = Numbers.LONG_NaN; + } + setA.of(0).add(lo, hi); + mapValue.putLong(valueIndex + 1, setA.ptr()); } else { - mapValue.putLong(valueIndex, 0L); + mapValue.putLong(valueIndex, 0); + mapValue.putLong(valueIndex + 1, 0); } - mapValue.putInt(valueIndex + 1, setIndex++); } @Override public void computeNext(MapValue mapValue, Record record) { - LongLongHashSet set = sets.getQuick(mapValue.getInt(valueIndex + 1)); long lo = arg.getLong128Lo(record); long hi = arg.getLong128Hi(record); if (!Uuid.isNull(lo, hi)) { - final int index = set.keySlot(lo, hi); - if (index < 0) { - return; + long ptr = mapValue.getLong(valueIndex + 1); + // Remap zero since it's used as the no entry key.
+ if (lo == 0 && hi == 0) { + lo = Numbers.LONG_NaN; + hi = Numbers.LONG_NaN; + } + final int index = setA.of(ptr).keyIndex(lo, hi); + if (index >= 0) { + setA.addAt(index, lo, hi); + mapValue.addLong(valueIndex, 1); + mapValue.putLong(valueIndex + 1, setA.ptr()); } - set.addAt(index, lo, hi); - mapValue.addLong(valueIndex, 1); } } @@ -120,7 +125,7 @@ public boolean isConstant() { @Override public boolean isParallelismSupported() { - return false; + return UnaryFunction.super.isParallelismSupported(); } @Override @@ -128,16 +133,66 @@ public boolean isReadThreadSafe() { return false; } + @Override + public void merge(MapValue destValue, MapValue srcValue) { + long srcCount = srcValue.getLong(valueIndex); + if (srcCount == 0 || srcCount == Numbers.LONG_NaN) { + return; + } + long srcPtr = srcValue.getLong(valueIndex + 1); + + long destCount = destValue.getLong(valueIndex); + if (destCount == 0 || destCount == Numbers.LONG_NaN) { + destValue.putLong(valueIndex, srcCount); + destValue.putLong(valueIndex + 1, srcPtr); + return; + } + long destPtr = destValue.getLong(valueIndex + 1); + + setA.of(destPtr); + setB.of(srcPtr); + + if (setA.size() > (setB.size() >> 1)) { + setA.merge(setB); + destValue.putLong(valueIndex, setA.size()); + destValue.putLong(valueIndex + 1, setA.ptr()); + } else { + // Set A is significantly smaller than set B, so we merge it into set B. 
+ setB.merge(setA); + destValue.putLong(valueIndex, setB.size()); + destValue.putLong(valueIndex + 1, setB.ptr()); + } + } + @Override public void pushValueTypes(ArrayColumnTypes columnTypes) { this.valueIndex = columnTypes.getColumnCount(); columnTypes.add(ColumnType.LONG); - columnTypes.add(ColumnType.INT); + columnTypes.add(ColumnType.LONG); + } + + @Override + public void setAllocator(GroupByAllocator allocator) { + setA.setAllocator(allocator); + setB.setAllocator(allocator); + } + + @Override + public void setEmpty(MapValue mapValue) { + mapValue.putLong(valueIndex, 0L); + mapValue.putLong(valueIndex + 1, 0); + } + + @Override + public void setLong(MapValue mapValue, long value) { + mapValue.putLong(valueIndex, value); + mapValue.putLong(valueIndex + 1, 0); } @Override public void setNull(MapValue mapValue) { mapValue.putLong(valueIndex, Numbers.LONG_NaN); + mapValue.putLong(valueIndex + 1, 0); } @Override @@ -148,6 +203,5 @@ public void setValueIndex(int valueIndex) { @Override public void toTop() { UnaryFunction.super.toTop(); - setIndex = 0; } } diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/long256/LongsToLong256FunctionFactory.java b/core/src/main/java/io/questdb/griffin/engine/functions/long256/LongsToLong256FunctionFactory.java new file mode 100644 index 000000000000..4b5beae549de --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/functions/long256/LongsToLong256FunctionFactory.java @@ -0,0 +1,139 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.long256; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.sql.Function; +import io.questdb.cairo.sql.Record; +import io.questdb.griffin.FunctionFactory; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.griffin.engine.functions.*; +import io.questdb.griffin.engine.functions.constants.Long256Constant; +import io.questdb.std.*; +import io.questdb.std.str.CharSink; + +public final class LongsToLong256FunctionFactory implements FunctionFactory { + + @Override + public String getSignature() { + return "to_long256(LLLL)"; + } + + @Override + public Function newInstance(int position, ObjList args, IntList argPositions, CairoConfiguration configuration, SqlExecutionContext sqlExecutionContext) { + final Function l0 = args.getQuick(0); + final Function l1 = args.getQuick(1); + final Function l2 = args.getQuick(2); + final Function l3 = args.getQuick(3); + + if (l0.isConstant() && l1.isConstant() && l2.isConstant() && l3.isConstant()) { + return new Long256Constant(l0.getLong(null), l1.getLong(null), l2.getLong(null), l3.getLong(null)); + } + return new LongsToLong256Function(l0, l1, l2, l3); + } + + private static class LongsToLong256Function extends Long256Function implements QuaternaryFunction { + + private final Function l0; + private final Function l1; + private final Function l2; + private final Function l3; + + private final Long256Impl long256a = new Long256Impl(); + private final 
Long256Impl long256b = new Long256Impl(); + + public LongsToLong256Function(Function l0, Function l1, Function l2, Function l3) { + this.l0 = l0; + this.l1 = l1; + this.l2 = l2; + this.l3 = l3; + } + + @Override + public void close() { + Misc.free(l0); + Misc.free(l1); + Misc.free(l2); + Misc.free(l3); + } + + @Override + public void getLong256(Record rec, CharSink sink) { + Numbers.appendLong256(l0.getLong(rec), l1.getLong(rec), l2.getLong(rec), l3.getLong(rec), sink); + } + + @Override + public Function getFunc0() { + return l0; + } + + @Override + public Function getFunc1() { + return l1; + } + + @Override + public Function getFunc2() { + return l2; + } + + @Override + public Function getFunc3() { + return l3; + } + + @Override + public Long256 getLong256A(Record rec) { + long l0 = this.l0.getLong(rec); + long l1 = this.l1.getLong(rec); + long l2 = this.l2.getLong(rec); + long l3 = this.l3.getLong(rec); + if (Long256Impl.isNull(l0, l1, l2, l3)) { + return Long256Impl.NULL_LONG256; + } + long256a.setAll(l0, l1, l2, l3); + return long256a; + } + + @Override + public Long256 getLong256B(Record rec) { + long l0 = this.l0.getLong(rec); + long l1 = this.l1.getLong(rec); + long l2 = this.l2.getLong(rec); + long l3 = this.l3.getLong(rec); + if (Long256Impl.isNull(l0, l1, l2, l3)) { + return Long256Impl.NULL_LONG256; + } + long256b.setAll(l0, l1, l2, l3); + return long256b; + } + + @Override + public String getName() { + return "to_long256"; + } + + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong128HashSet.java b/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong128HashSet.java new file mode 100644 index 000000000000..46ad17299b7e --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong128HashSet.java @@ -0,0 +1,238 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ 
\ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.groupby; + +import io.questdb.cairo.CairoException; +import io.questdb.std.Hash; +import io.questdb.std.Numbers; +import io.questdb.std.Unsafe; +import io.questdb.std.Vect; + +/** + * Specialized flyweight hash set used in {@link io.questdb.griffin.engine.functions.GroupByFunction}s. + *

<p>
+ * Uses provided {@link GroupByAllocator} to allocate the underlying buffer. Grows the buffer when needed. + *
<p>
+ * Buffer layout is the following: + *
<pre>
+ * | capacity (in long128s) | size (in long128s) | size limit (in long128s) | padding | long128 array |
+ * +------------------------+--------------------+--------------------------+---------+---------------+
+ * |        4 bytes         |       4 bytes      |         4 bytes          | 4 bytes |       -       |
+ * +------------------------+--------------------+--------------------------+---------+---------------+
+ * </pre>
+ */ +public class GroupByLong128HashSet { + private static final long HEADER_SIZE = 4 * Integer.BYTES; + private static final int MIN_INITIAL_CAPACITY = 16; + private static final long SIZE_LIMIT_OFFSET = 2 * Integer.BYTES; + private static final long SIZE_OFFSET = Integer.BYTES; + private final int initialCapacity; + private final double loadFactor; + private final long noKeyValue; + private GroupByAllocator allocator; + private int mask; + private long ptr; + + public GroupByLong128HashSet(int initialCapacity, double loadFactor, long noKeyValue) { + if (loadFactor <= 0d || loadFactor >= 1d) { + throw new IllegalArgumentException("0 < loadFactor < 1"); + } + this.initialCapacity = Numbers.ceilPow2((int) (Math.max(initialCapacity, MIN_INITIAL_CAPACITY) / loadFactor)); + this.loadFactor = loadFactor; + this.noKeyValue = noKeyValue; + } + + /** + * Adds key to hash set preserving key uniqueness. + * + * @param lo low bits of key to be added. + * @param hi high bits of key to be added. + * @return false if key is already in the set and true otherwise. + */ + public boolean add(long lo, long hi) { + int index = keyIndex(lo, hi); + if (index < 0) { + return false; + } + addAt(index, lo, hi); + return true; + } + + public void addAt(int index, long lo, long hi) { + setKeyAt(index, lo, hi); + int size = size(); + int sizeLimit = sizeLimit(); + Unsafe.getUnsafe().putInt(ptr + SIZE_OFFSET, ++size); + if (size >= sizeLimit) { + rehash(capacity() << 1, sizeLimit << 1); + } + } + + public int capacity() { + return ptr != 0 ? 
Unsafe.getUnsafe().getInt(ptr) : 0; + } + + public long keyAddrAt(int index) { + return ptr + HEADER_SIZE + 16L * index; + } + + public int keyIndex(long lo, long hi) { + int hashCode = Hash.hashLong128(lo, hi); + int index = hashCode & mask; + long keyAddr = keyAddrAt(index); + long loKey = Unsafe.getUnsafe().getLong(keyAddr); + long hiKey = Unsafe.getUnsafe().getLong(keyAddr + 8L); + if (loKey == noKeyValue && hiKey == noKeyValue) { + return index; + } + if (loKey == lo && hiKey == hi) { + return -index - 1; + } + return probe(lo, hi, index); + } + + public void merge(GroupByLong128HashSet srcSet) { + final int size = size(); + // Math.max is here for overflow protection. + final int newSize = Math.max(size + srcSet.size(), size); + final int sizeLimit = sizeLimit(); + if (sizeLimit < newSize) { + int newSizeLimit = sizeLimit; + int newCapacity = capacity(); + while (newSizeLimit < newSize) { + newSizeLimit *= 2; + newCapacity *= 2; + } + rehash(newCapacity, newSizeLimit); + } + + for (long p = srcSet.ptr + HEADER_SIZE, lim = srcSet.ptr + HEADER_SIZE + 16L * srcSet.capacity(); p < lim; p += 16L) { + long lo = Unsafe.getUnsafe().getLong(p); + long hi = Unsafe.getUnsafe().getLong(p + 8L); + if (lo != noKeyValue || hi != noKeyValue) { + final int index = keyIndex(lo, hi); + if (index >= 0) { + addAt(index, lo, hi); + } + } + } + } + + public GroupByLong128HashSet of(long ptr) { + if (ptr == 0) { + this.ptr = allocator.malloc(HEADER_SIZE + 16L * initialCapacity); + zero(this.ptr, initialCapacity); + Unsafe.getUnsafe().putInt(this.ptr, initialCapacity); + Unsafe.getUnsafe().putInt(this.ptr + SIZE_OFFSET, 0); + Unsafe.getUnsafe().putInt(this.ptr + SIZE_LIMIT_OFFSET, (int) (initialCapacity * loadFactor)); + mask = initialCapacity - 1; + } else { + this.ptr = ptr; + mask = capacity() - 1; + } + return this; + } + + public long ptr() { + return ptr; + } + + public void resetPtr() { + ptr = 0; + } + + public void setAllocator(GroupByAllocator allocator) { + this.allocator 
= allocator; + } + + public int size() { + return ptr != 0 ? Unsafe.getUnsafe().getInt(ptr + SIZE_OFFSET) : 0; + } + + public int sizeLimit() { + return ptr != 0 ? Unsafe.getUnsafe().getInt(ptr + SIZE_LIMIT_OFFSET) : 0; + } + + private int probe(long lo, long hi, int index) { + do { + index = (index + 1) & mask; + long p = keyAddrAt(index); + long loKey = Unsafe.getUnsafe().getLong(p); + long hiKey = Unsafe.getUnsafe().getLong(p + 8L); + if (loKey == noKeyValue && hiKey == noKeyValue) { + return index; + } + if (loKey == lo && hiKey == hi) { + return -index - 1; + } + } while (true); + } + + private void rehash(int newCapacity, int newSizeLimit) { + if (newCapacity < 0) { + throw CairoException.nonCritical().put("set capacity overflow"); + } + + final int oldSize = size(); + final int oldCapacity = capacity(); + + long oldPtr = ptr; + ptr = allocator.malloc(16L * newCapacity + HEADER_SIZE); + zero(ptr, newCapacity); + Unsafe.getUnsafe().putInt(ptr, newCapacity); + Unsafe.getUnsafe().putInt(ptr + SIZE_OFFSET, oldSize); + Unsafe.getUnsafe().putInt(ptr + SIZE_LIMIT_OFFSET, newSizeLimit); + mask = newCapacity - 1; + + for (long p = oldPtr + HEADER_SIZE, lim = oldPtr + HEADER_SIZE + 16L * oldCapacity; p < lim; p += 16L) { + long lo = Unsafe.getUnsafe().getLong(p); + long hi = Unsafe.getUnsafe().getLong(p + 8L); + if (lo != noKeyValue || hi != noKeyValue) { + int index = keyIndex(lo, hi); + setKeyAt(index, lo, hi); + } + } + + allocator.free(oldPtr, HEADER_SIZE + 16L * oldCapacity); + } + + private void setKeyAt(int index, long lo, long hi) { + long p = keyAddrAt(index); + Unsafe.getUnsafe().putLong(p, lo); + Unsafe.getUnsafe().putLong(p + 8L, hi); + } + + private void zero(long ptr, int cap) { + if (noKeyValue == 0) { + // Vectorized fast path for zero default value. 
+ Vect.memset(ptr + HEADER_SIZE, 16L * cap, 0); + } else { + for (long p = ptr + HEADER_SIZE, lim = ptr + HEADER_SIZE + 16L * cap; p < lim; p += 16L) { + Unsafe.getUnsafe().putLong(p, noKeyValue); + Unsafe.getUnsafe().putLong(p + 8L, noKeyValue); + } + } + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong256HashSet.java b/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong256HashSet.java new file mode 100644 index 000000000000..6c83fdb9c095 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/GroupByLong256HashSet.java @@ -0,0 +1,250 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.groupby; + +import io.questdb.cairo.CairoException; +import io.questdb.std.Hash; +import io.questdb.std.Numbers; +import io.questdb.std.Unsafe; +import io.questdb.std.Vect; + +/** + * Specialized flyweight hash set used in {@link io.questdb.griffin.engine.functions.GroupByFunction}s. + *

<p>
+ * Uses provided {@link GroupByAllocator} to allocate the underlying buffer. Grows the buffer when needed. + *
<p>
+ * Buffer layout is the following: + *
<pre>
+ * | capacity (in long256s) | size (in long256s) | size limit (in long256s) | padding | long256 array |
+ * +------------------------+--------------------+--------------------------+---------+---------------+
+ * |        4 bytes         |       4 bytes      |         4 bytes          | 4 bytes |       -       |
+ * +------------------------+--------------------+--------------------------+---------+---------------+
+ * </pre>
+ */ +public class GroupByLong256HashSet { + private static final long HEADER_SIZE = 4 * Integer.BYTES; + private static final int MIN_INITIAL_CAPACITY = 16; + private static final long SIZE_LIMIT_OFFSET = 2 * Integer.BYTES; + private static final long SIZE_OFFSET = Integer.BYTES; + private final int initialCapacity; + private final double loadFactor; + private final long noKeyValue; + private GroupByAllocator allocator; + private int mask; + private long ptr; + + public GroupByLong256HashSet(int initialCapacity, double loadFactor, long noKeyValue) { + if (loadFactor <= 0d || loadFactor >= 1d) { + throw new IllegalArgumentException("0 < loadFactor < 1"); + } + this.initialCapacity = Numbers.ceilPow2((int) (Math.max(initialCapacity, MIN_INITIAL_CAPACITY) / loadFactor)); + this.loadFactor = loadFactor; + this.noKeyValue = noKeyValue; + } + + /** + * Adds key to hash set preserving key uniqueness. + * + * @param k0 lowest 64 bits of key to be added. + * @param k1 second lowest 64 bits of key to be added. + * @param k2 second highest 64 bits of key to be added. + * @param k3 highest 64 bits of key to be added. + * @return false if key is already in the set and true otherwise. + */ + public boolean add(long k0, long k1, long k2, long k3) { + int index = keyIndex(k0, k1, k2, k3); + if (index < 0) { + return false; + } + addAt(index, k0, k1, k2, k3); + return true; + } + + public void addAt(int index, long k0, long k1, long k2, long k3) { + setKeyAt(index, k0, k1, k2, k3); + int size = size(); + int sizeLimit = sizeLimit(); + Unsafe.getUnsafe().putInt(ptr + SIZE_OFFSET, ++size); + if (size >= sizeLimit) { + rehash(capacity() << 1, sizeLimit << 1); + } + } + + public int capacity() { + return ptr != 0 ? 
Unsafe.getUnsafe().getInt(ptr) : 0; + } + + public long keyAddrAt(int index) { + return ptr + HEADER_SIZE + 32L * index; + } + + public int keyIndex(long k0, long k1, long k2, long k3) { + int hashCode = Hash.hashLong256(k0, k1, k2, k3); + int index = hashCode & mask; + long p = keyAddrAt(index); + long k0Key = Unsafe.getUnsafe().getLong(p); + long k1Key = Unsafe.getUnsafe().getLong(p + 8L); + long k2Key = Unsafe.getUnsafe().getLong(p + 16L); + long k3Key = Unsafe.getUnsafe().getLong(p + 24L); + if (k0Key == noKeyValue && k1Key == noKeyValue && k2Key == noKeyValue && k3Key == noKeyValue) { + return index; + } + if (k0Key == k0 && k1Key == k1 && k2Key == k2 && k3Key == k3) { + return -index - 1; + } + return probe(k0, k1, k2, k3, index); + } + + public void merge(GroupByLong256HashSet srcSet) { + final int size = size(); + // Math.max is here for overflow protection. + final int newSize = Math.max(size + srcSet.size(), size); + final int sizeLimit = sizeLimit(); + if (sizeLimit < newSize) { + int newSizeLimit = sizeLimit; + int newCapacity = capacity(); + while (newSizeLimit < newSize) { + newSizeLimit *= 2; + newCapacity *= 2; + } + rehash(newCapacity, newSizeLimit); + } + + for (long p = srcSet.ptr + HEADER_SIZE, lim = srcSet.ptr + HEADER_SIZE + 32L * srcSet.capacity(); p < lim; p += 32L) { + long k0 = Unsafe.getUnsafe().getLong(p); + long k1 = Unsafe.getUnsafe().getLong(p + 8L); + long k2 = Unsafe.getUnsafe().getLong(p + 16L); + long k3 = Unsafe.getUnsafe().getLong(p + 24L); + if (k0 != noKeyValue || k1 != noKeyValue || k2 != noKeyValue || k3 != noKeyValue) { + final int index = keyIndex(k0, k1, k2, k3); + if (index >= 0) { + addAt(index, k0, k1, k2, k3); + } + } + } + } + public GroupByLong256HashSet of(long ptr) { + if (ptr == 0) { + this.ptr = allocator.malloc(HEADER_SIZE + 32L * initialCapacity); + zero(this.ptr, initialCapacity); + Unsafe.getUnsafe().putInt(this.ptr, initialCapacity); + Unsafe.getUnsafe().putInt(this.ptr + SIZE_OFFSET, 0); + 
Unsafe.getUnsafe().putInt(this.ptr + SIZE_LIMIT_OFFSET, (int) (initialCapacity * loadFactor)); + mask = initialCapacity - 1; + } else { + this.ptr = ptr; + mask = capacity() - 1; + } + return this; + } + + public long ptr() { + return ptr; + } + public void resetPtr() { + ptr = 0; + } + + public void setAllocator(GroupByAllocator allocator) { + this.allocator = allocator; + } + + public int size() { + return ptr != 0 ? Unsafe.getUnsafe().getInt(ptr + SIZE_OFFSET) : 0; + } + + public int sizeLimit() { + return ptr != 0 ? Unsafe.getUnsafe().getInt(ptr + SIZE_LIMIT_OFFSET) : 0; + } + + private int probe(long k0, long k1, long k2, long k3, int index) { + do { + index = (index + 1) & mask; + long p = keyAddrAt(index); + long k0Key = Unsafe.getUnsafe().getLong(p); + long k1Key = Unsafe.getUnsafe().getLong(p + 8L); + long k2Key = Unsafe.getUnsafe().getLong(p + 16L); + long k3Key = Unsafe.getUnsafe().getLong(p + 24L); + if (k0Key == noKeyValue && k1Key == noKeyValue && k2Key == noKeyValue && k3Key == noKeyValue) { + return index; + } + if (k0Key == k0 && k1Key == k1 && k2Key == k2 && k3Key == k3) { + return -index - 1; + } + } while (true); + } + + private void rehash(int newCapacity, int newSizeLimit) { + if (newCapacity < 0) { + throw CairoException.nonCritical().put("set capacity overflow"); + } + + final int oldSize = size(); + final int oldCapacity = capacity(); + + long oldPtr = ptr; + ptr = allocator.malloc(32L * newCapacity + HEADER_SIZE); + zero(ptr, newCapacity); + Unsafe.getUnsafe().putInt(ptr, newCapacity); + Unsafe.getUnsafe().putInt(ptr + SIZE_OFFSET, oldSize); + Unsafe.getUnsafe().putInt(ptr + SIZE_LIMIT_OFFSET, newSizeLimit); + mask = newCapacity - 1; + + for (long p = oldPtr + HEADER_SIZE, lim = oldPtr + HEADER_SIZE + 32L * oldCapacity; p < lim; p += 32L) { + long k0 = Unsafe.getUnsafe().getLong(p); + long k1 = Unsafe.getUnsafe().getLong(p + 8L); + long k2 = Unsafe.getUnsafe().getLong(p + 16L); + long k3 = Unsafe.getUnsafe().getLong(p + 24L); + if (k0 != 
noKeyValue || k1 != noKeyValue || k2 != noKeyValue || k3 != noKeyValue) { + int index = keyIndex(k0, k1, k2, k3); + setKeyAt(index, k0, k1, k2, k3); + } + } + + allocator.free(oldPtr, HEADER_SIZE + 32L * oldCapacity); + } + + private void setKeyAt(int index, long k0, long k1, long k2, long k3) { + long p = keyAddrAt(index); + Unsafe.getUnsafe().putLong(p, k0); + Unsafe.getUnsafe().putLong(p + 8L, k1); + Unsafe.getUnsafe().putLong(p + 16L, k2); + Unsafe.getUnsafe().putLong(p + 24L, k3); + } + + private void zero(long ptr, int cap) { + if (noKeyValue == 0) { + // Vectorized fast path for zero default value. + Vect.memset(ptr + HEADER_SIZE, 32L * cap, 0); + } else { + for (long p = ptr + HEADER_SIZE, lim = ptr + HEADER_SIZE + 32L * cap; p < lim; p += 32L) { + Unsafe.getUnsafe().putLong(p, noKeyValue); + Unsafe.getUnsafe().putLong(p + 8L, noKeyValue); + Unsafe.getUnsafe().putLong(p + 16L, noKeyValue); + Unsafe.getUnsafe().putLong(p + 24L, noKeyValue); + } + } + } +} diff --git a/core/src/main/java/io/questdb/std/Hash.java b/core/src/main/java/io/questdb/std/Hash.java index 1783d6be6aa4..573a6e6c5c92 100644 --- a/core/src/main/java/io/questdb/std/Hash.java +++ b/core/src/main/java/io/questdb/std/Hash.java @@ -57,6 +57,14 @@ public static int hashLong128(long key1, long key2) { return (int) (h ^ h >>> 32); } + public static int hashLong256(long key1, long key2, long key3, long key4) { + long h = key1 * M2 + key2; + h = (h * M2) + key3; + h = (h * M2) + key4; + h *= M2; + return (int) (h ^ h >>> 32); + } + /** * Same as {@link #hashMem32(long, long)}, but with on-heap char sequence * instead of direct unsafe access. 
diff --git a/core/src/main/java/io/questdb/std/Long256HashSet.java b/core/src/main/java/io/questdb/std/Long256HashSet.java deleted file mode 100644 index 4a0927581b86..000000000000 --- a/core/src/main/java/io/questdb/std/Long256HashSet.java +++ /dev/null @@ -1,193 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (c) 2014-2019 Appsicle - * Copyright (c) 2019-2023 QuestDB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - ******************************************************************************/ - -package io.questdb.std; - -import java.util.Arrays; - - -public class Long256HashSet implements Mutable { - - protected static final long noEntryKey = -1; - private static final int MIN_INITIAL_CAPACITY = 16; - protected final double loadFactor; - protected int capacity; - protected int free; - protected int mask; - private long[] keys; - - public Long256HashSet() { - this(MIN_INITIAL_CAPACITY); - } - - @SuppressWarnings("CopyConstructorMissesField") - public Long256HashSet(Long256HashSet that) { - this(that.capacity, that.loadFactor); - for (int i = 0, n = that.keys.length; i < n; i++) { - this.keys[i] = that.keys[i]; - } - } - - public Long256HashSet(int initialCapacity) { - this(initialCapacity, 0.4); - } - - public Long256HashSet(int initialCapacity, double loadFactor) { - if (loadFactor <= 0d || loadFactor >= 1d) { - throw new IllegalArgumentException("0 < loadFactor < 1"); - } - this.capacity = Math.max(initialCapacity, MIN_INITIAL_CAPACITY); - int len = Numbers.ceilPow2((int) (this.capacity / loadFactor)); - this.loadFactor = loadFactor; - this.keys = alloc(len); - this.mask = len - 1; - clear(); - } - - /** - * Adds key to hash set preserving key uniqueness. 256 bit long encoded in 4 64-bit values - * - * @param k0 0-63 bit - * @param k1 64-127 bit - * @param k2 128-191 bit - * @param k3 192-256 bit - * @return false if key is already in the set and true otherwise. 
- */ - public boolean add(long k0, long k1, long k2, long k3) { - int index = keyIndex(k0, k1, k2, k3); - if (index < 0) { - return false; - } - - addAt(index, k0, k1, k2, k3); - return true; - } - - public void addAt(int index, long k0, long k1, long k2, long k3) { - setAt(index, k0, k1, k2, k3); - if (--free < 1) { - rehash(); - } - } - - @Override - public final void clear() { - free = capacity; - Arrays.fill(keys, noEntryKey); - } - - public long k0At(int index) { - return keys[(-index - 1) * 4]; - } - - public long k1At(int index) { - return keys[(-index - 1) * 4 + 1]; - } - - public long k2At(int index) { - return keys[(-index - 1) * 4 + 2]; - } - - public long k3At(int index) { - return keys[(-index - 1) * 4 + 3]; - } - - public int keyIndex(long k0, long k1, long k2, long k3) { - int index = hashCode0(k0, k1, k2, k3) & mask; - - if (isSlotFree(index)) { - return index; - } - - if (isSlotMatches(index, k0, k1, k2, k3)) { - return -index - 1; - } - - return probe(k0, k1, k2, k3, index); - } - - public int size() { - return capacity - free; - } - - private static int hashCode0(long k0, long k1, long k2, long k3) { - int h; - h = (int) (k0); - h = (int) (31 * h + k1); - h = (int) (31 * h + k2); - h = (int) (31 * h + k3); - return Hash.spread(h); - } - - private long[] alloc(int capacity) { - return new long[capacity * 4]; - } - - private boolean isSlotFree(int index) { - return keys[index * 4] == noEntryKey && keys[index * 4 + 1] == noEntryKey && keys[index * 4 + 2] == noEntryKey && keys[index * 4 + 3] == noEntryKey; - } - - private boolean isSlotMatches(int index, long k0, long k1, long k2, long k3) { - return keys[index * 4] == k0 && keys[index * 4 + 1] == k1 && keys[index * 4 + 2] == k2 && keys[index * 4 + 3] == k3; - } - - private int probe(long k0, long k1, long k2, long k3, int index) { - do { - index = (index + 1) & mask; - if (isSlotFree(index)) { - return index; - } - if (isSlotMatches(index, k0, k1, k2, k3)) { - return -index - 1; - } - } while 
(true); - } - - private void rehash() { - long[] old = this.keys; - int oldSize = size(); - int newCapacity = capacity * 2; - free = capacity = newCapacity; - int len = Numbers.ceilPow2((int) (newCapacity / loadFactor)); - this.keys = alloc(len); - this.mask = len - 1; - Arrays.fill(keys, noEntryKey); - free -= oldSize; - - for (int i = 0, n = old.length / 4; i < n; i++) { - if (old[i * 4] == noEntryKey && old[i * 4 + 1] == noEntryKey && old[i * 4 + 2] == noEntryKey && old[i * 4 + 3] == noEntryKey) { - continue; - } - int index = keyIndex(old[i * 4], old[i * 4 + 1], old[i * 4 + 2], old[i * 4 + 3]); - setAt(index, old[i * 4], old[i * 4 + 1], old[i * 4 + 2], old[i * 4 + 3]); - } - } - - private void setAt(int index, long k0, long k1, long k2, long k3) { - keys[index * 4] = k0; - keys[index * 4 + 1] = k1; - keys[index * 4 + 2] = k2; - keys[index * 4 + 3] = k3; - } -} diff --git a/core/src/main/java/io/questdb/std/Long256Impl.java b/core/src/main/java/io/questdb/std/Long256Impl.java index 3c6537507440..319528d72329 100644 --- a/core/src/main/java/io/questdb/std/Long256Impl.java +++ b/core/src/main/java/io/questdb/std/Long256Impl.java @@ -52,6 +52,10 @@ public static boolean isNull(Long256 value) { return Long256Impl.NULL_LONG256.equals(value); } + public static boolean isNull(long l0, long l1, long l2, long l3) { + return l0 == Numbers.LONG_NaN && l1 == Numbers.LONG_NaN && l2 == Numbers.LONG_NaN && l3 == Numbers.LONG_NaN; + } + public static void putNull(long appendPointer) { Unsafe.getUnsafe().putLong(appendPointer, NULL_LONG256.getLong0()); Unsafe.getUnsafe().putLong(appendPointer + Long.BYTES, NULL_LONG256.getLong1()); @@ -82,6 +86,15 @@ public void fromRnd(Rnd rnd) { ); } + public void fromRnd(Rnd rnd, long N) { + setAll( + rnd.nextLong(N), + rnd.nextLong(N), + rnd.nextLong(N), + rnd.nextLong(N) + ); + } + @Override public long getLong0() { return l0; diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index 
b3ddd06f8db0..b94ac12abce9 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -283,6 +283,7 @@ io.questdb.griffin.engine.functions.rnd.RndUuidFunctionFactory, io.questdb.griffin.engine.functions.date.TimestampSequenceFunctionFactory, io.questdb.griffin.engine.functions.long128.LongsToLong128FunctionFactory, + io.questdb.griffin.engine.functions.long256.LongsToLong256FunctionFactory, io.questdb.griffin.engine.functions.uuid.LongsToUuidFunctionFactory, io.questdb.griffin.engine.functions.date.TimestampShuffleFunctionFactory, io.questdb.griffin.engine.functions.date.TimestampFloorFunctionFactory, diff --git a/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory b/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory index 733240cbf17c..65acb3f94b53 100644 --- a/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory +++ b/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory @@ -209,6 +209,7 @@ io.questdb.griffin.engine.functions.rnd.RndLongFunctionFactory io.questdb.griffin.engine.functions.rnd.RndUuidFunctionFactory io.questdb.griffin.engine.functions.date.TimestampSequenceFunctionFactory io.questdb.griffin.engine.functions.long128.LongsToLong128FunctionFactory +io.questdb.griffin.engine.functions.long256.LongsToLong256FunctionFactory io.questdb.griffin.engine.functions.uuid.LongsToUuidFunctionFactory io.questdb.griffin.engine.functions.rnd.RndByteCCFunctionFactory io.questdb.griffin.engine.functions.rnd.RndBinCCCFunctionFactory diff --git a/core/src/test/java/io/questdb/test/cairo/RecordSinkFactoryTest.java b/core/src/test/java/io/questdb/test/cairo/RecordSinkFactoryTest.java index dde0bd7070f8..f6c12df4ba25 100644 --- a/core/src/test/java/io/questdb/test/cairo/RecordSinkFactoryTest.java +++ b/core/src/test/java/io/questdb/test/cairo/RecordSinkFactoryTest.java @@ -720,6 +720,11 @@ public void putLong256(Long256 value) { 
recordedTypes.add(ColumnType.LONG256); } + @Override + public void putLong256(long l0, long l1, long l2, long l3) { + recordedTypes.add(ColumnType.LONG256); + } + @Override public void putRecord(Record value) { throw new UnsupportedOperationException(); diff --git a/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactoryTest.java b/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactoryTest.java new file mode 100644 index 000000000000..9c927f9db560 --- /dev/null +++ b/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctLong256GroupByFunctionFactoryTest.java @@ -0,0 +1,236 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.test.griffin.engine.functions.groupby; + +import io.questdb.test.AbstractCairoTest; +import org.junit.Test; + +public class CountDistinctLong256GroupByFunctionFactoryTest extends AbstractCairoTest { + + @Test + public void testConstant() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t1\n" + + "b\t1\n" + + "c\t1\n", + "select a, count_distinct(to_long256(42L, 42L, 42L, 42L)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a from long_sequence(20)))", + null, + true, + true + ); + } + + @Test + public void testConstantDefaultHashSetNoEntryValue() throws Exception { + assertQuery( + "count_distinct\n" + + "1\n", + "select count_distinct(to_long256(l, l, l, l)) from x", + "create table x as (select -1::long as l from long_sequence(10))", + null, + false, + true + ); + } + + @Test + public void testExpression() throws Exception { + final String expected = "a\tcount_distinct\n" + + "a\t4\n" + + "b\t4\n" + + "c\t4\n"; + assertQuery( + expected, + "select a, count_distinct(to_long256(s*42, s*42, s*42, s*42)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a, rnd_long(1,8,0) s from long_sequence(20)))", + null, + true, + true + ); + // multiplication shouldn't affect the number of distinct values, + // so the result should stay the same + assertSql(expected, "select a, count_distinct(s) from x order by a"); + } + + @Test + public void testGroupKeyed() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t2\n" + + "b\t1\n" + + "c\t1\n" + + "d\t4\n" + + "e\t4\n" + + "f\t3\n", + "select a, count_distinct(s) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c','d','e','f') a, to_long256(rnd_long(0, 16, 0), 0, 0, 0) s, timestamp_sequence(0, 100000) ts from long_sequence(20)) timestamp(ts))", + null, + true, + true + ); + } + + 
@Test + public void testGroupNotKeyed() throws Exception { + assertQuery( + "count_distinct\n" + + "6\n", + "select count_distinct(s) from x", + "create table x as (select * from (select to_long256(rnd_long(1, 6, 0), 0, 0, 0) s, timestamp_sequence(0, 1000) ts from long_sequence(1000)) timestamp(ts))", + null, + false, + true + ); + } + + @Test + public void testGroupNotKeyedWithNulls() throws Exception { + String expected = "count_distinct\n" + + "6\n"; + assertQuery( + expected, + "select count_distinct(s) from x", + "create table x as (select * from (select to_long256(rnd_long(1, 6, 0), 0, 0 ,0) s, timestamp_sequence(10, 100000) ts from long_sequence(1000)) timestamp(ts)) timestamp(ts) PARTITION BY YEAR", + null, + false, + true + ); + + insert("insert into x values(cast(null as long256), '2021-05-21')"); + insert("insert into x values(cast(null as long256), '1970-01-01')"); + assertSql(expected, "select count_distinct(s) from x"); + } + + @Test + public void testNullConstant() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t0\n" + + "b\t0\n" + + "c\t0\n", + "select a, count_distinct(to_long256(null, null, null, null)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a from long_sequence(20)))", + null, + true, + true + ); + } + + @Test + public void testSampleFillLinear() throws Exception { + assertQuery( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.000000Z\t9\n" + + "1970-01-01T00:00:01.000000Z\t7\n" + + "1970-01-01T00:00:02.000000Z\t7\n" + + "1970-01-01T00:00:03.000000Z\t8\n" + + "1970-01-01T00:00:04.000000Z\t8\n" + + "1970-01-01T00:00:05.000000Z\t8\n" + + "1970-01-01T00:00:06.000000Z\t7\n" + + "1970-01-01T00:00:07.000000Z\t8\n" + + "1970-01-01T00:00:08.000000Z\t7\n" + + "1970-01-01T00:00:09.000000Z\t9\n", + "select ts, count_distinct(s) from x sample by 1s fill(linear)", + "create table x as (select * from (select to_long256(rnd_long(0, 16, 0), 0, 0, 0) s, timestamp_sequence(0, 100000) ts from 
long_sequence(100)) timestamp(ts))", + "ts", + true, + true + ); + } +// + @Test + public void testSampleFillNone() throws Exception { + assertMemoryLeak(() -> assertSql( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.050000Z\t8\n" + + "1970-01-01T00:00:02.050000Z\t8\n", "with x as (select * from (select to_long256(rnd_long(1, 8, 0), 0, 0, 0) s, timestamp_sequence(50000, 100000L/4) ts from long_sequence(100)) timestamp(ts))\n" + + "select ts, count_distinct(s) from x sample by 2s" + )); + } + + @Test + public void testSampleFillValue() throws Exception { + assertQuery( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.000000Z\t5\n" + + "1970-01-01T00:00:01.000000Z\t8\n" + + "1970-01-01T00:00:02.000000Z\t6\n" + + "1970-01-01T00:00:03.000000Z\t7\n" + + "1970-01-01T00:00:04.000000Z\t6\n" + + "1970-01-01T00:00:05.000000Z\t5\n" + + "1970-01-01T00:00:06.000000Z\t6\n" + + "1970-01-01T00:00:07.000000Z\t6\n" + + "1970-01-01T00:00:08.000000Z\t6\n" + + "1970-01-01T00:00:09.000000Z\t7\n", + "select ts, count_distinct(s) from x sample by 1s fill(99)", + "create table x as (select * from (select to_long256(rnd_long(0, 8, 0), 0, 0, 0) s, timestamp_sequence(0, 100000) ts from long_sequence(100)) timestamp(ts))", + "ts", + false + ); + } + + @Test + public void testSampleKeyed() throws Exception { + assertQuery( + "a\tcount_distinct\tts\n" + + "a\t4\t1970-01-01T00:00:00.000000Z\n" + + "f\t9\t1970-01-01T00:00:00.000000Z\n" + + "c\t8\t1970-01-01T00:00:00.000000Z\n" + + "e\t4\t1970-01-01T00:00:00.000000Z\n" + + "d\t6\t1970-01-01T00:00:00.000000Z\n" + + "b\t6\t1970-01-01T00:00:00.000000Z\n" + + "b\t5\t1970-01-01T00:00:05.000000Z\n" + + "c\t4\t1970-01-01T00:00:05.000000Z\n" + + "f\t7\t1970-01-01T00:00:05.000000Z\n" + + "e\t6\t1970-01-01T00:00:05.000000Z\n" + + "d\t8\t1970-01-01T00:00:05.000000Z\n" + + "a\t5\t1970-01-01T00:00:05.000000Z\n", + "select a, count_distinct(s), ts from x sample by 5s", + "create table x as (select * from (select rnd_symbol('a','b','c','d','e','f') a, 
to_long256(rnd_long(0, 12, 0), 0, 0, 0) s, timestamp_sequence(0, 100000) ts from long_sequence(100)) timestamp(ts))", + "ts", + false + ); + } + + @Test + public void testMappingZeroToNulls() throws Exception { + // this is to ensure that long256s with nulls and zeros don't map to the same values + assertQuery( + "a\ts\tts\n", + "select * from x", + "create table x ( a SYMBOL, s long256, ts TIMESTAMP ) timestamp(ts)", + "ts", + true + ); + + insert("insert into x values ('a', to_long256(5, 0, 5, 5), '2021-05-21'), ('a', to_long256(5, 0, 5, 5), '2021-05-21'), ('a', to_long256(5, null, 5, 5), '2021-05-21'), ('a', to_long256(0, 5, 5, 5), '2021-05-21'), ('a', to_long256(null, 5, 5, 5), '2021-05-21')" + + ", ('a', to_long256(5, 5, 0, 5), '2021-05-21'), ('a', to_long256(5, 5, null, 5), '2021-05-21'), ('a', to_long256(5, 5, 5, 0), '2021-05-21'), ('a', to_long256(5, 5, 5, null), '2021-05-21')" + + ", ('a', to_long256(0, 0, 0, 0), '2021-05-21'), ('a', to_long256(null, null, null, null), '2021-05-21')"); + assertSql("a\ts\n" + + "a\t9\n", "select a, count_distinct(s) as s from x order by a"); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactoryTest.java b/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactoryTest.java new file mode 100644 index 000000000000..d0494696ad75 --- /dev/null +++ b/core/src/test/java/io/questdb/test/griffin/engine/functions/groupby/CountDistinctUuidGroupByFunctionFactoryTest.java @@ -0,0 +1,235 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the 
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.test.griffin.engine.functions.groupby; + +import io.questdb.test.AbstractCairoTest; +import org.junit.Test; + +public class CountDistinctUuidGroupByFunctionFactoryTest extends AbstractCairoTest { + + @Test + public void testConstant() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t1\n" + + "b\t1\n" + + "c\t1\n", + "select a, count_distinct(to_uuid(42L, 42L)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a from long_sequence(20)))", + null, + true, + true + ); + } + + @Test + public void testConstantDefaultHashSetNoEntryValue() throws Exception { + assertQuery( + "count_distinct\n" + + "1\n", + "select count_distinct(to_uuid(l, l)) from x", + "create table x as (select -1::long as l from long_sequence(10))", + null, + false, + true + ); + } + + @Test + public void testExpression() throws Exception { + final String expected = "a\tcount_distinct\n" + + "a\t4\n" + + "b\t4\n" + + "c\t4\n"; + assertQuery( + expected, + "select a, count_distinct(to_uuid(s * 42, s * 42)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a, rnd_long(1, 8, 0) s from long_sequence(20)))", + null, + true, + true + ); + // multiplication shouldn't affect the number of distinct values, + // so the result should stay the same + assertSql(expected, "select a, count_distinct(s) from x order by a"); 
+ } + + @Test + public void testGroupKeyed() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t2\n" + + "b\t1\n" + + "c\t1\n" + + "d\t4\n" + + "e\t4\n" + + "f\t3\n", + "select a, count_distinct(s) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c','d','e','f') a, to_uuid(rnd_long(0, 16, 0), 0) s, timestamp_sequence(0, 100000) ts from long_sequence(20)) timestamp(ts))", + null, + true, + true + ); + } + + @Test + public void testGroupNotKeyed() throws Exception { + assertQuery( + "count_distinct\n" + + "6\n", + "select count_distinct(s) from x", + "create table x as (select * from (select to_uuid(rnd_long(1, 6, 0), 0) s, timestamp_sequence(0, 1000) ts from long_sequence(1000)) timestamp(ts))", + null, + false, + true + ); + } + + @Test + public void testGroupNotKeyedWithNulls() throws Exception { + String expected = "count_distinct\n" + + "6\n"; + assertQuery( + expected, + "select count_distinct(s) from x", + "create table x as (select * from (select to_uuid(rnd_long(1, 6, 0), 0) s, timestamp_sequence(10, 100000) ts from long_sequence(1000)) timestamp(ts)) timestamp(ts) PARTITION BY YEAR", + null, + false, + true + ); + + insert("insert into x values(cast(null as UUID), '2021-05-21')"); + insert("insert into x values(cast(null as UUID), '1970-01-01')"); + assertSql(expected, "select count_distinct(s) from x"); + } + + @Test + public void testNullConstant() throws Exception { + assertQuery( + "a\tcount_distinct\n" + + "a\t0\n" + + "b\t0\n" + + "c\t0\n", + "select a, count_distinct(to_uuid(null, null)) from x order by a", + "create table x as (select * from (select rnd_symbol('a','b','c') a from long_sequence(20)))", + null, + true, + true + ); + } + + @Test + public void testSampleFillLinear() throws Exception { + assertQuery( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.000000Z\t9\n" + + "1970-01-01T00:00:01.000000Z\t7\n" + + "1970-01-01T00:00:02.000000Z\t7\n" + + "1970-01-01T00:00:03.000000Z\t8\n" + + 
"1970-01-01T00:00:04.000000Z\t8\n" + + "1970-01-01T00:00:05.000000Z\t8\n" + + "1970-01-01T00:00:06.000000Z\t7\n" + + "1970-01-01T00:00:07.000000Z\t8\n" + + "1970-01-01T00:00:08.000000Z\t7\n" + + "1970-01-01T00:00:09.000000Z\t9\n", + "select ts, count_distinct(s) from x sample by 1s fill(linear)", + "create table x as (select * from (select to_uuid(rnd_long(0, 16, 0), 0) s, timestamp_sequence(0, 100000) ts from long_sequence(100)) timestamp(ts))", + "ts", + true, + true + ); + } +// + @Test + public void testSampleFillNone() throws Exception { + assertMemoryLeak(() -> assertSql( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.050000Z\t8\n" + + "1970-01-01T00:00:02.050000Z\t8\n", "with x as (select * from (select to_uuid(rnd_long(1, 8, 0), 0) s, timestamp_sequence(50000, 100000L/4) ts from long_sequence(100)) timestamp(ts))\n" + + "select ts, count_distinct(s) from x sample by 2s" + )); + } + + @Test + public void testSampleFillValue() throws Exception { + assertQuery( + "ts\tcount_distinct\n" + + "1970-01-01T00:00:00.000000Z\t5\n" + + "1970-01-01T00:00:01.000000Z\t8\n" + + "1970-01-01T00:00:02.000000Z\t6\n" + + "1970-01-01T00:00:03.000000Z\t7\n" + + "1970-01-01T00:00:04.000000Z\t6\n" + + "1970-01-01T00:00:05.000000Z\t5\n" + + "1970-01-01T00:00:06.000000Z\t6\n" + + "1970-01-01T00:00:07.000000Z\t6\n" + + "1970-01-01T00:00:08.000000Z\t6\n" + + "1970-01-01T00:00:09.000000Z\t7\n", + "select ts, count_distinct(s) from x sample by 1s fill(99)", + "create table x as (select * from (select to_uuid(rnd_long(0, 8, 0), 0) s, timestamp_sequence(0, 100000) ts from long_sequence(100)) timestamp(ts))", + "ts", + false + ); + } + + @Test + public void testSampleKeyed() throws Exception { + assertQuery( + "a\tcount_distinct\tts\n" + + "a\t4\t1970-01-01T00:00:00.000000Z\n" + + "f\t9\t1970-01-01T00:00:00.000000Z\n" + + "c\t8\t1970-01-01T00:00:00.000000Z\n" + + "e\t4\t1970-01-01T00:00:00.000000Z\n" + + "d\t6\t1970-01-01T00:00:00.000000Z\n" + + "b\t6\t1970-01-01T00:00:00.000000Z\n" + + 
"b\t5\t1970-01-01T00:00:05.000000Z\n" + + "c\t4\t1970-01-01T00:00:05.000000Z\n" + + "f\t7\t1970-01-01T00:00:05.000000Z\n" + + "e\t6\t1970-01-01T00:00:05.000000Z\n" + + "d\t8\t1970-01-01T00:00:05.000000Z\n" + + "a\t5\t1970-01-01T00:00:05.000000Z\n", + "select a, count_distinct(s), ts from x sample by 5s", + "create table x as (select * from (select rnd_symbol('a','b','c','d','e','f') a, to_uuid(rnd_long(0, 12, 0), 0) s, timestamp_sequence(0, 100000) ts from long_sequence(100)) timestamp(ts))", + "ts", + false + ); + } + + @Test + public void testMappingZeroToNulls() throws Exception { + // this is to ensure that uuids with nulls and zeros don't map to the same values + assertQuery( + "a\ts\tts\n", + "select * from x", + "create table x ( a SYMBOL, s UUID, ts TIMESTAMP ) timestamp(ts)", + "ts", + true + ); + + insert("insert into x values ('a', to_uuid(5, 0), '2021-05-21'), ('a', to_uuid(5, 0), '2021-05-21'), ('a', to_uuid(5, null), '2021-05-21'), ('a', to_uuid(10, 0), '2021-05-21'), ('a', to_uuid(10, null), '2021-05-21')" + + ", ('a', to_uuid(0, 5), '2021-05-21'), ('a', to_uuid(0, 5), '2021-05-21'), ('a', to_uuid(null, 5), '2021-05-21'), ('a', to_uuid(0, 10), '2021-05-21'), ('a', to_uuid(null, 10), '2021-05-21'), ('a', to_uuid(0, 0), '2021-05-21'), ('a', to_uuid(null, null), '2021-05-21')" ); + assertSql("a\ts\n" + + "a\t9\n", "select a, count_distinct(s) as s from x order by a"); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong128HashSetTest.java b/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong128HashSetTest.java new file mode 100644 index 000000000000..2de52bce13f3 --- /dev/null +++ b/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong128HashSetTest.java @@ -0,0 +1,146 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * |
|_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.test.griffin.engine.groupby; + +import io.questdb.cairo.*; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong128HashSet; +import io.questdb.std.Numbers; +import io.questdb.std.Rnd; +import io.questdb.std.Uuid; +import io.questdb.test.AbstractCairoTest; +import io.questdb.test.tools.TestUtils; +import java.util.HashSet; +import org.junit.Assert; +import org.junit.Test; + +public class GroupByLong128HashSetTest extends AbstractCairoTest { + + @Test + public void testMerge() throws Exception { + final CairoConfiguration config = new DefaultCairoConfiguration(root) { + @Override + public long getGroupByAllocatorDefaultChunkSize() { + return 64; + } + }; + TestUtils.assertMemoryLeak(() -> { + try (GroupByAllocator allocator = new GroupByAllocator(config)) { + GroupByLong128HashSet setA = new GroupByLong128HashSet(16, 0.5, -1); + setA.setAllocator(allocator); + setA.of(0); + GroupByLong128HashSet setB = new GroupByLong128HashSet(16, 0.9, -1); + setB.setAllocator(allocator); + setB.of(0); + + final int N = 1000; + + for (int i = 0; i < N; i++) { + setA.add(i, i); + } + Assert.assertEquals(N, 
setA.size()); + Assert.assertTrue(setA.capacity() >= N); + + for (int i = N; i < 2 * N; i++) { + setB.add(i, i); + } + Assert.assertEquals(N, setB.size()); + Assert.assertTrue(setB.capacity() >= N); + + setA.merge(setB); + Assert.assertEquals(2 * N, setA.size()); + for (int i = 0; i < 2 * N; i++) { + Assert.assertTrue(setA.keyIndex(i, i) < 0); + } + } + }); + } + + @Test + public void testFuzzWithLongNullAsNoKeyValue() throws Exception { + testFuzz(Numbers.LONG_NaN); + } + + @Test + public void testFuzzWithZeroAsNoKeyValue() throws Exception { + testFuzz(0); + } + + private void testFuzz(long noKeyValue) throws Exception { + final CairoConfiguration config = new DefaultCairoConfiguration(root) { + @Override + public long getGroupByAllocatorDefaultChunkSize() { + return 64; + } + }; + TestUtils.assertMemoryLeak(() -> { + Rnd rnd = new Rnd(); + + HashSet<Uuid> oracle = new HashSet<>(); + try (GroupByAllocator allocator = new GroupByAllocator(config)) { + GroupByLong128HashSet set = new GroupByLong128HashSet(64, 0.7, noKeyValue); + set.setAllocator(allocator); + set.of(0); + + final int N = 1000; + + for (int i = 0; i < N; i++) { + long l0 = rnd.nextPositiveLong() + 1; + long l1 = rnd.nextPositiveLong() + 1; + set.add(l0, l1); + oracle.add(new Uuid(l0, l1)); + } + + Assert.assertEquals(N, set.size()); + Assert.assertTrue(set.capacity() >= N); + + // check size vs oracle + Assert.assertEquals(set.size(), oracle.size()); + + // check contents + for (Uuid u : oracle) { + Assert.assertTrue(set.keyIndex(u.getLo(), u.getHi()) < 0); + } + + rnd.reset(); + + for (int i = 0; i < N; i++) { + Assert.assertTrue(set.keyIndex(rnd.nextPositiveLong() + 1, rnd.nextPositiveLong() + 1) < 0); + } + + set.of(0); + rnd.reset(); + + for (int i = 0; i < N; i++) { + long lo = rnd.nextPositiveLong() + 1; + long hi = rnd.nextPositiveLong() + 1; + int index = set.keyIndex(lo, hi); + Assert.assertTrue(index >= 0); + set.addAt(index, lo, hi); + } + } + }); + } +} \ No newline at end of file diff --git
a/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong256HashSetTest.java b/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong256HashSetTest.java new file mode 100644 index 000000000000..becb128940a7 --- /dev/null +++ b/core/src/test/java/io/questdb/test/griffin/engine/groupby/GroupByLong256HashSetTest.java @@ -0,0 +1,182 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.test.griffin.engine.groupby; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.DefaultCairoConfiguration; +import io.questdb.griffin.engine.groupby.GroupByAllocator; +import io.questdb.griffin.engine.groupby.GroupByLong256HashSet; +import io.questdb.std.Numbers; +import io.questdb.std.Rnd; +import io.questdb.test.AbstractCairoTest; +import io.questdb.test.tools.TestUtils; +import java.util.HashSet; +import java.util.Objects; +import org.junit.Assert; +import org.junit.Test; + +public class GroupByLong256HashSetTest extends AbstractCairoTest { + + @Test + public void testMerge() throws Exception { + final CairoConfiguration config = new DefaultCairoConfiguration(root) { + @Override + public long getGroupByAllocatorDefaultChunkSize() { + return 64; + } + }; + TestUtils.assertMemoryLeak(() -> { + try (GroupByAllocator allocator = new GroupByAllocator(config)) { + GroupByLong256HashSet setA = new GroupByLong256HashSet(16, 0.5, -1); + setA.setAllocator(allocator); + setA.of(0); + GroupByLong256HashSet setB = new GroupByLong256HashSet(16, 0.9, -1); + setB.setAllocator(allocator); + setB.of(0); + + final int N = 1000; + + for (int i = 0; i < N; i++) { + setA.add(i, i, i, i); + } + Assert.assertEquals(N, setA.size()); + Assert.assertTrue(setA.capacity() >= N); + + for (int i = N; i < 2 * N; i++) { + setB.add(i, i, i, i); + } + Assert.assertEquals(N, setB.size()); + Assert.assertTrue(setB.capacity() >= N); + + setA.merge(setB); + Assert.assertEquals(2 * N, setA.size()); + for (int i = 0; i < 2 * N; i++) { + Assert.assertTrue(setA.keyIndex(i, i, i, i) < 0); + } + } + }); + } + + @Test + public void testFuzzWithLongNullAsNoKeyValue() throws Exception { + testFuzz(Numbers.LONG_NaN); + } + + @Test + public void testFuzzWithZeroAsNoKeyValue() throws Exception { + testFuzz(0); + } + + private void testFuzz(long noKeyValue) throws Exception { + final 
CairoConfiguration config = new DefaultCairoConfiguration(root) { + @Override + public long getGroupByAllocatorDefaultChunkSize() { + return 64; + } + }; + TestUtils.assertMemoryLeak(() -> { + Rnd rnd = new Rnd(); + + HashSet<Long256Tuple> oracle = new HashSet<>(); + try (GroupByAllocator allocator = new GroupByAllocator(config)) { + GroupByLong256HashSet set = new GroupByLong256HashSet(64, 0.7, noKeyValue); + set.setAllocator(allocator); + set.of(0); + + final int N = 1000; + + for (int i = 0; i < N; i++) { + long l0 = rnd.nextPositiveLong() + 1; + long l1 = rnd.nextPositiveLong() + 1; + long l2 = rnd.nextPositiveLong() + 1; + long l3 = rnd.nextPositiveLong() + 1; + set.add(l0, l1, l2, l3); + oracle.add(new Long256Tuple(l0, l1, l2, l3)); + } + + Assert.assertEquals(N, set.size()); + Assert.assertTrue(set.capacity() >= N); + + // check size vs oracle + Assert.assertEquals(set.size(), oracle.size()); + + // check contents + for (Long256Tuple lc : oracle) { + Assert.assertTrue(set.keyIndex(lc.l0, lc.l1, lc.l2, lc.l3) < 0); + } + + rnd.reset(); + + for (int i = 0; i < N; i++) { + Assert.assertTrue(set.keyIndex(rnd.nextPositiveLong() + 1, rnd.nextPositiveLong() + 1, rnd.nextPositiveLong() + 1, rnd.nextPositiveLong() + 1) < 0); + } + + set.of(0); + rnd.reset(); + + for (int i = 0; i < N; i++) { + long l0 = rnd.nextPositiveLong() + 1; + long l1 = rnd.nextPositiveLong() + 1; + long l2 = rnd.nextPositiveLong() + 1; + long l3 = rnd.nextPositiveLong() + 1; + int index = set.keyIndex(l0, l1, l2, l3); + Assert.assertTrue(index >= 0); + set.addAt(index, l0, l1, l2, l3); + } + } + }); + } + + private static class Long256Tuple { + final long l0; + final long l1; + final long l2; + final long l3; + + private Long256Tuple(long l0, long l1, long l2, long l3) { + this.l0 = l0; + this.l1 = l1; + this.l2 = l2; + this.l3 = l3; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Long256Tuple
long256Tuple = (Long256Tuple) o; + return l0 == long256Tuple.l0 && l1 == long256Tuple.l1 && l2 == long256Tuple.l2 && l3 == long256Tuple.l3; + } + + @Override + public int hashCode() { + return Objects.hash(l0, l1, l2, l3); + } + } +} diff --git a/core/src/test/java/io/questdb/test/std/Long256HashSetTest.java b/core/src/test/java/io/questdb/test/std/Long256HashSetTest.java deleted file mode 100644 index 276a0db389a6..000000000000 --- a/core/src/test/java/io/questdb/test/std/Long256HashSetTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - * - * Copyright (c) 2014-2019 Appsicle - * Copyright (c) 2019-2023 QuestDB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - ******************************************************************************/ - -package io.questdb.test.std; - -import io.questdb.std.Long256HashSet; -import io.questdb.std.Rnd; -import org.junit.Assert; -import org.junit.Test; - -public class Long256HashSetTest { - - @Test - public void testFill() { - Rnd rnd = new Rnd(); - - Long256HashSet set = new Long256HashSet(); - - for (int i = 0; i < 1000; i++) { - Assert.assertTrue(set.add(rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong())); - } - - // check that all of that is still in the set - rnd.reset(); - - for (int i = 0; i < 1000; i++) { - long k0 = rnd.nextLong(); - long k1 = rnd.nextLong(); - long k2 = rnd.nextLong(); - long k3 = rnd.nextLong(); - int index = set.keyIndex(k0, k1, k2, k3); - Assert.assertTrue(index < 0); - Assert.assertEquals(k0, set.k0At(index)); - Assert.assertEquals(k1, set.k1At(index)); - Assert.assertEquals(k2, set.k2At(index)); - Assert.assertEquals(k3, set.k3At(index)); - } - - // add same values should fail - rnd.reset(); - for (int i = 0; i < 1000; i++) { - Assert.assertFalse(set.add(rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong())); - } - - Long256HashSet set2 = new Long256HashSet(set); - - rnd.reset(); - for (int i = 0; i < 1000; i++) { - long k0 = rnd.nextLong(); - long k1 = rnd.nextLong(); - long k2 = rnd.nextLong(); - long k3 = rnd.nextLong(); - int index = set2.keyIndex(k0, k1, k2, k3); - Assert.assertTrue(index < 0); - Assert.assertEquals(k0, set.k0At(index)); - Assert.assertEquals(k1, set.k1At(index)); - Assert.assertEquals(k2, set.k2At(index)); - Assert.assertEquals(k3, set.k3At(index)); - } - } -}