Skip to content

Commit

Permalink
apacheGH-37055: [C++] Optimize hash kernels for Dictionary ChunkedArr…
Browse files Browse the repository at this point in the history
…ays (apache#38394)

### Rationale for this change

When merging dictionaries across chunks, the hash kernels unnecessarily unify the existing dictionary, dragging down the performance.

### What changes are included in this PR?

Reuse the dictionary unifier across chunks.

### Are these changes tested?

Yes, with a new benchmark for dictionary chunked arrays.

### Are there any user-facing changes?

No. 

* Closes: apache#37055

Lead-authored-by: Jin Shang <shangjin1997@gmail.com>
Co-authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
Signed-off-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
  • Loading branch information
js8544 and felipecrv committed Dec 23, 2023
1 parent d519544 commit ec41209
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 17 deletions.
55 changes: 38 additions & 17 deletions cpp/src/arrow/compute/kernels/vector_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@
#include "arrow/array/concatenate.h"
#include "arrow/array/dict_internal.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/result.h"
#include "arrow/util/hashing.h"
#include "arrow/util/int_util.h"
#include "arrow/util/unreachable.h"

namespace arrow {

using internal::DictionaryTraits;
using internal::HashTraits;
using internal::TransposeInts;

namespace compute {
namespace internal {
Expand Down Expand Up @@ -448,31 +451,33 @@ class DictionaryHashKernel : public HashKernel {

Status Append(const ArraySpan& arr) override {
auto arr_dict = arr.dictionary().ToArray();
if (!dictionary_) {
dictionary_ = arr_dict;
} else if (!dictionary_->Equals(*arr_dict)) {
if (!first_dictionary_) {
first_dictionary_ = arr_dict;
} else if (!first_dictionary_->Equals(*arr_dict)) {
// NOTE: This approach computes a new dictionary unification per chunk.
// This is in effect O(n*k) where n is the total chunked array length and
// k is the number of chunks (therefore O(n**2) if chunks have a fixed size).
//
// A better approach may be to run the kernel over each individual chunk,
// and then hash-aggregate all results (for example sum-group-by for
// the "value_counts" kernel).
auto out_dict_type = dictionary_->type();
if (dictionary_unifier_ == nullptr) {
ARROW_ASSIGN_OR_RAISE(dictionary_unifier_,
DictionaryUnifier::Make(first_dictionary_->type()));
RETURN_NOT_OK(dictionary_unifier_->Unify(*first_dictionary_));
}
auto out_dict_type = first_dictionary_->type();
std::shared_ptr<Buffer> transpose_map;
std::shared_ptr<Array> out_dict;
ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(out_dict_type));

ARROW_CHECK_OK(unifier->Unify(*dictionary_));
ARROW_CHECK_OK(unifier->Unify(*arr_dict, &transpose_map));
ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict));
RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map));

dictionary_ = out_dict;
auto transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
auto in_dict_array = arr.ToArray();
auto in_array = arr.ToArray();
const auto& in_dict_array =
arrow::internal::checked_cast<const DictionaryArray&>(*in_array);
ARROW_ASSIGN_OR_RAISE(
auto tmp, arrow::internal::checked_cast<const DictionaryArray&>(*in_dict_array)
.Transpose(arr.type->GetSharedPtr(), out_dict, transpose));
auto tmp, in_dict_array.Transpose(arr.type->GetSharedPtr(),
in_dict_array.dictionary(), transpose));
return indices_kernel_->Append(*tmp->data());
}

Expand All @@ -495,12 +500,27 @@ class DictionaryHashKernel : public HashKernel {
return dictionary_value_type_;
}

std::shared_ptr<Array> dictionary() const { return dictionary_; }
/// This can't be called more than once because DictionaryUnifier::GetResult()
/// can't be called more than once and produce the same output.
Result<std::shared_ptr<Array>> dictionary() const {
if (!first_dictionary_) { // Append was never called
return nullptr;
}
if (!dictionary_unifier_) { // Append was called only once
return first_dictionary_;
}

auto out_dict_type = first_dictionary_->type();
std::shared_ptr<Array> out_dict;
RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict));
return out_dict;
}

private:
std::unique_ptr<HashKernel> indices_kernel_;
std::shared_ptr<Array> dictionary_;
std::shared_ptr<Array> first_dictionary_;
std::shared_ptr<DataType> dictionary_value_type_;
std::unique_ptr<DictionaryUnifier> dictionary_unifier_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -630,8 +650,9 @@ Status ValueCountsFinalize(KernelContext* ctx, std::vector<Datum>* out) {
// hence have no dictionary.
Result<std::shared_ptr<ArrayData>> EnsureHashDictionary(KernelContext* ctx,
DictionaryHashKernel* hash) {
if (hash->dictionary()) {
return hash->dictionary()->data();
ARROW_ASSIGN_OR_RAISE(auto dict, hash->dictionary());
if (dict) {
return dict->data();
}
ARROW_ASSIGN_OR_RAISE(auto null, MakeArrayOfNull(hash->dictionary_value_type(),
/*length=*/0, ctx->memory_pool()));
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
#include "arrow/util/logging.h"

#include "arrow/compute/api.h"

Expand Down Expand Up @@ -226,6 +227,33 @@ static void UniqueString100bytes(benchmark::State& state) {
BenchUnique(state, HashParams<StringType>{general_bench_cases[state.range(0)], 100});
}

template <typename ParamType>
void BenchValueCountsDictionaryChunks(benchmark::State& state, const ParamType& params) {
std::shared_ptr<Array> arr;
params.GenerateTestData(&arr);
// chunk arr to 100 slices
std::vector<std::shared_ptr<Array>> chunks;
const int64_t chunk_size = arr->length() / 100;
for (int64_t i = 0; i < 100; ++i) {
auto slice = arr->Slice(i * chunk_size, chunk_size);
auto datum = DictionaryEncode(slice).ValueOrDie();
ARROW_CHECK(datum.is_array());
chunks.push_back(datum.make_array());
}
auto chunked_array = std::make_shared<ChunkedArray>(chunks);

while (state.KeepRunning()) {
ABORT_NOT_OK(ValueCounts(chunked_array).status());
}
params.SetMetadata(state);
}

static void ValueCountsDictionaryChunks(benchmark::State& state) {
// Dictionary of byte strings with 10 bytes each
BenchValueCountsDictionaryChunks(
state, HashParams<StringType>{general_bench_cases[state.range(0)], 10});
}

void HashSetArgs(benchmark::internal::Benchmark* bench) {
for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
bench->Arg(i);
Expand All @@ -239,6 +267,14 @@ BENCHMARK(UniqueInt64)->Apply(HashSetArgs);
BENCHMARK(UniqueString10bytes)->Apply(HashSetArgs);
BENCHMARK(UniqueString100bytes)->Apply(HashSetArgs);

void DictionaryChunksHashSetArgs(benchmark::internal::Benchmark* bench) {
for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
bench->Arg(i);
}
}

BENCHMARK(ValueCountsDictionaryChunks)->Apply(DictionaryChunksHashSetArgs);

void UInt8SetArgs(benchmark::internal::Benchmark* bench) {
for (int i = 0; i < static_cast<int>(uint8_bench_cases.size()); ++i) {
bench->Arg(i);
Expand Down

0 comments on commit ec41209

Please sign in to comment.