Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt to apache-arrow 15 #1729

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions modules/basic/ds/arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ class ArrowArrayBuilderVisitor {
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// No array builder is provided for StringViewType: always reports
// NotImplemented, naming the type id and its string form in the message.
Status Visit(const arrow::StringViewType* type) {
  const std::string type_id = std::to_string(type->id());
  const std::string message =
      "Type not implemented: " + type_id + ", " + type->ToString();
  return Status::NotImplemented(message);
}
#endif

Status Visit(const arrow::LargeStringType*) {
builder_ = std::make_shared<LargeStringArrayBuilder>(client_, array_);
return Status::OK();
Expand All @@ -149,6 +157,14 @@ class ArrowArrayBuilderVisitor {
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// No array builder is provided for BinaryViewType: always reports
// NotImplemented, naming the type id and its string form in the message.
Status Visit(const arrow::BinaryViewType* type) {
  const std::string type_id = std::to_string(type->id());
  const std::string message =
      "Type not implemented: " + type_id + ", " + type->ToString();
  return Status::NotImplemented(message);
}
#endif

Status Visit(const arrow::LargeBinaryType*) {
builder_ = std::make_shared<LargeBinaryArrayBuilder>(client_, array_);
return Status::OK();
Expand All @@ -163,10 +179,28 @@ class ArrowArrayBuilderVisitor {
builder_ = std::make_shared<ListArrayBuilder>(client_, array_);
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// No array builder is provided for ListViewType: always reports
// NotImplemented, naming the type id and its string form in the message.
Status Visit(const arrow::ListViewType* type) {
  const std::string type_id = std::to_string(type->id());
  const std::string message =
      "Type not implemented: " + type_id + ", " + type->ToString();
  return Status::NotImplemented(message);
}
#endif

// Selects LargeListArrayBuilder, constructed from client_ and array_, as the
// builder for LargeListType arrays and stores it in builder_.
Status Visit(const arrow::LargeListType*) {
builder_ = std::make_shared<LargeListArrayBuilder>(client_, array_);
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// No array builder is provided for LargeListViewType: always reports
// NotImplemented, naming the type id and its string form in the message.
Status Visit(const arrow::LargeListViewType* type) {
  const std::string type_id = std::to_string(type->id());
  const std::string message =
      "Type not implemented: " + type_id + ", " + type->ToString();
  return Status::NotImplemented(message);
}
#endif

Status Visit(const arrow::FixedSizeListType*) {
builder_ = std::make_shared<FixedSizeListArrayBuilder>(client_, array_);
return Status::OK();
Expand Down
252 changes: 252 additions & 0 deletions modules/basic/ds/arrow_shim/concatenate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/util/bit_block_counter.h"
#endif
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
Expand All @@ -59,10 +62,16 @@
#if defined(ARROW_VERSION) && ARROW_VERSION >= 9000000
#include "arrow/util/int_util_overflow.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/util/list_util.h"
#endif
#include "arrow/util/logging.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/util/ree_util.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/visit_data_inline.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 7000000
#include "arrow/visit_type_inline.h"
#else
Expand Down Expand Up @@ -225,6 +234,163 @@ Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// for "arrow/util/slice_util_internal.h"
// Validates that a slice described by (slice_offset, slice_length) fits in an
// object holding object_length elements.
//
// Returns Status::IndexError when the offset is negative, when the length is
// negative, when AddWithOverflow reports that offset + length overflows, or
// when offset + length is greater than object_length. object_name is only
// used to compose the error message. Returns Status::OK() otherwise.
static inline Status CheckSliceParams(int64_t object_length,
                                      int64_t slice_offset,
                                      int64_t slice_length,
                                      const char* object_name) {
  const bool negative_offset = slice_offset < 0;
  if (ARROW_PREDICT_FALSE(negative_offset)) {
    return Status::IndexError("Negative ", object_name, " slice offset");
  }

  const bool negative_length = slice_length < 0;
  if (ARROW_PREDICT_FALSE(negative_length)) {
    return Status::IndexError("Negative ", object_name, " slice length");
  }

  // Compute the exclusive end of the slice with an overflow-checked addition.
  int64_t slice_end;
  const bool overflowed =
      internal::AddWithOverflow(slice_offset, slice_length, &slice_end);
  if (ARROW_PREDICT_FALSE(overflowed)) {
    return Status::IndexError(object_name, " slice would overflow");
  }

  const bool past_the_end = slice_end > object_length;
  if (ARROW_PREDICT_FALSE(past_the_end)) {
    return Status::IndexError(object_name, " slice would exceed ", object_name,
                              " length");
  }
  return Status::OK();
}

int64_t SumBufferSizesInBytes(const BufferVector& buffers) {
int64_t size = 0;
for (const auto& buffer : buffers) {
size += buffer->size();
}
return size;
}

// Forward declaration: ConcatenateListViewOffsets calls PutListViewOffsets
// before the definition of the latter appears in this file.
template <typename offset_type>
Status PutListViewOffsets(const ArrayData& input, offset_type* sizes,
const Buffer& src, offset_type displacement,
offset_type* dst);

// Builds one offsets buffer out of the offsets buffers of several list-view
// arrays.
//
// value_ranges[i] is the range of child values that input i actually
// references. The offsets copied from input i are shifted by
// (number of child values placed before input i) - value_ranges[i].offset,
// so that they address the concatenated child array, which only contains the
// referenced ranges.
//
// The output buffer is zero-initialized; PutListViewOffsets fills in the
// entries and also zeroes the sizes of null list-view entries.
//
// \param[in] in The input arrays; PutListViewOffsets reads their validity
//               bitmap, offset and length
// \param[in,out] sizes The concatenated sizes buffer
// \param[in] offset_buffers The offsets buffer of each input
// \param[in] value_ranges The range of child values used by each input
// \param[in] pool Memory pool used to allocate the output
// \param[out] out Receives the concatenated offsets buffer
// \return Status::Invalid if the running count of child values exceeds the
//         maximum of offset_type; errors from allocation or from
//         PutListViewOffsets are propagated
template <typename offset_type>
Status ConcatenateListViewOffsets(const ArrayDataVector& in, offset_type* sizes,
                                  const BufferVector& offset_buffers,
                                  const std::vector<Range>& value_ranges,
                                  MemoryPool* pool,
                                  std::shared_ptr<Buffer>* out) {
  DCHECK_EQ(offset_buffers.size(), value_ranges.size());

  // The result is as large as all the source offsets buffers together and
  // starts out filled with zeros.
  const int64_t total_bytes = SumBufferSizesInBytes(offset_buffers);
  ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(total_bytes, pool));
  memset((*out)->mutable_data(), 0, static_cast<size_t>((*out)->size()));

  offset_type* const dst_offsets = (*out)->mutable_data_as<offset_type>();

  int64_t values_so_far = 0;    // child values contributed by earlier inputs
  int64_t entries_written = 0;  // offsets already placed in the output
  for (size_t idx = 0; idx < offset_buffers.size(); ++idx) {
    const Range& used = value_ranges[idx];
    const Buffer& src_offsets = *offset_buffers[idx];

    const auto shift = static_cast<offset_type>(values_so_far - used.offset);
    RETURN_NOT_OK(PutListViewOffsets(*in[idx],
                                     /*sizes=*/sizes + entries_written,
                                     /*src=*/src_offsets, shift,
                                     /*dst=*/dst_offsets + entries_written));

    entries_written += src_offsets.size() / sizeof(offset_type);
    values_so_far += used.length;
    if (values_so_far > std::numeric_limits<offset_type>::max()) {
      return Status::Invalid("offset overflow while concatenating arrays");
    }
  }
  DCHECK_EQ(entries_written,
            static_cast<int64_t>(total_bytes / sizeof(offset_type)));

  return Status::OK();
}

// Copies the list-view offsets of one input into `dst`, adding `displacement`
// to each of them, and zeroes the sizes of null entries.
//
// `sizes` and `dst` are indexed by position within this input (0 to
// input.length - 1). `dst` is expected to be zero-filled by the caller:
// entries that are null or have a size that is not positive are left
// untouched.
//
// Returns an error only if the validity bitmap of `input` is too small for
// input.offset + input.length bits.
template <typename offset_type>
Status PutListViewOffsets(const ArrayData& input, offset_type* sizes,
                          const Buffer& src, offset_type displacement,
                          offset_type* dst) {
  // An empty offsets buffer has nothing to copy.
  if (src.size() == 0) {
    return Status::OK();
  }

  const auto& null_bitmap = input.buffers[0];
  if (null_bitmap) {
    // Make sure every bit that is read below lies inside the bitmap.
    RETURN_NOT_OK(
        CheckSliceParams(/*object_length=*/8 * null_bitmap->size(),
                         input.offset, input.length, "buffer"));
  }

  const auto src_offsets = src.data_as<offset_type>();
  DCHECK_EQ(static_cast<int64_t>(src.size() / sizeof(offset_type)),
            input.length);

  // Handles one non-null entry: entries whose size is not positive keep the
  // zero already stored in dst.
  auto displace_entry = [&](int64_t pos) {
    if (sizes[pos] <= 0) {
      return;
    }
    // NOTE: Concatenate can be called during IPC reads to append delta
    // dictionaries, so the input may not have been validated. SafeSignedAdd
    // avoids UB on such input; the result can later be validated using
    // Array::ValidateFull.
    const auto shifted = SafeSignedAdd(src_offsets[pos], displacement);
    // The original code notes that shifted >= 0 is guaranteed by
    // RangeOfValuesUsed returning the smallest offset of valid and non-empty
    // list-views.
    DCHECK_GE(shifted, 0);
    dst[pos] = shifted;
  };

  const uint8_t* bitmap_bits =
      null_bitmap ? null_bitmap->data_as<uint8_t>() : nullptr;
  internal::OptionalBitBlockCounter block_counter(bitmap_bits, input.offset,
                                                  input.length);
  for (int64_t pos = 0; pos < input.length;) {
    const internal::BitBlockCount block = block_counter.NextBlock();
    const int64_t block_end = pos + block.length;
    if (block.AllSet()) {
      // Every entry of the block is valid.
      while (pos < block_end) {
        displace_entry(pos++);
      }
    } else if (block.NoneSet()) {
      // Every entry of the block is null. The offsets stay at 0; the sizes
      // are zeroed so they cannot exceed the length of the concatenated
      // values.
      memset(sizes + pos, 0, block.length * sizeof(offset_type));
      pos = block_end;
    } else {
      // Mixed block: consult the validity bit of each entry.
      while (pos < block_end) {
        if (bit_util::GetBit(bitmap_bits, input.offset + pos)) {
          displace_entry(pos);
        } else {
          sizes[pos] = 0;
        }
        ++pos;
      }
    }
  }
  return Status::OK();
}
#endif

class ConcatenateImpl {
public:
ConcatenateImpl(ArrayDataVector&& in, MemoryPool* pool)
Expand Down Expand Up @@ -280,6 +446,52 @@ class ConcatenateImpl {
.Value(&out_->buffers[2]);
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// Concatenates binary-view arrays.
//
// The output keeps slots 0 and 1 for the validity and views buffers and then
// lists every buffer from index 2 onwards of every input, in input order. The
// views buffers are concatenated, and each non-inline view coming from the
// second input onwards has its buffer index increased by the number of such
// extra buffers contributed by the inputs before it. Views under null bits
// (past the first input) are replaced by an empty view.
Status Visit(const BinaryViewType&) {
  out_->buffers.resize(2);

  // Gather the buffers found after the first two of each input.
  for (const auto& input : in_) {
    for (const auto& data_buffer : util::span(input->buffers).subspan(2)) {
      out_->buffers.push_back(data_buffer);
    }
  }

  ARROW_ASSIGN_OR_RAISE(auto view_buffers, Buffers(1, BinaryViewType::kSize));
  ARROW_ASSIGN_OR_RAISE(auto view_buffer,
                        ConcatenateBuffers(view_buffers, pool_));

  auto* views = view_buffer->mutable_data_as<BinaryViewType::c_type>();

  // Views of the first input need no adjustment, so start right after them.
  size_t buffers_before = 0;
  int64_t view_pos = in_[0]->length;
  for (size_t k = 1; k < in_.size(); ++k) {
    buffers_before += in_[k - 1]->buffers.size() - 2;
    const auto index_shift = static_cast<int32_t>(buffers_before);

    const int64_t stop = view_pos + in_[k]->length;
    while (view_pos < stop) {
      auto& view = views[view_pos++];
      if (!view.is_inline()) {
        view.ref.buffer_index =
            SafeSignedAdd(view.ref.buffer_index, index_shift);
      }
    }
  }

  if (out_->buffers[0] != nullptr) {
    view_pos = in_[0]->length;
    VisitNullBitmapInline(
        out_->buffers[0]->data(), view_pos, out_->length - view_pos,
        out_->null_count,
        /*valid entry: just advance*/ [&] { ++view_pos; },
        /*null entry: store an empty view, then advance*/
        [&] { views[view_pos++] = {}; });
  }

  out_->buffers[1] = std::move(view_buffer);
  return Status::OK();
}
#endif

Status Visit(const LargeBinaryType&) {
std::vector<Range> value_ranges;
ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, sizeof(int64_t)));
Expand Down Expand Up @@ -317,6 +529,46 @@ class ConcatenateImpl {
.Concatenate(&out_->child_data[0]);
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// Concatenates list-view arrays (enabled for types matching
// enable_if_list_view).
//
// Steps, in order: determine the range of child values each input uses,
// concatenate those child ranges into child_data[0], concatenate the sizes
// buffers into buffers[2], then build the offsets buffer in buffers[1].
// ConcatenateListViewOffsets receives the concatenated sizes as a mutable
// pointer, so the sizes have to exist before the offsets are built.
template <typename T>
enable_if_list_view<T, Status> Visit(const T& type) {
  using offset_type = typename T::offset_type;
  out_->buffers.resize(3);
  out_->child_data.resize(1);

  // Range of child values referenced by each input.
  std::vector<Range> value_ranges;
  value_ranges.reserve(in_.size());
  for (const auto& list_view : in_) {
    ArraySpan span(*list_view);
    Range used;
    ARROW_ASSIGN_OR_RAISE(std::tie(used.offset, used.length),
                          list_util::internal::RangeOfValuesUsed(span));
    value_ranges.push_back(used);
  }

  // Child values: only the used ranges are concatenated.
  ARROW_ASSIGN_OR_RAISE(ArrayDataVector value_data,
                        ChildData(0, value_ranges));
  ConcatenateImpl values_impl(std::move(value_data), pool_);
  RETURN_NOT_OK(values_impl.Concatenate(&out_->child_data[0]));
  out_->child_data[0]->type = type.value_type();

  // Sizes.
  ARROW_ASSIGN_OR_RAISE(auto size_buffers, Buffers(2, sizeof(offset_type)));
  RETURN_NOT_OK(
      ConcatenateBuffers(size_buffers, pool_).Value(&out_->buffers[2]));

  // Offsets, shifted to address the concatenated child values.
  ARROW_ASSIGN_OR_RAISE(auto offset_buffers, Buffers(1, sizeof(offset_type)));
  auto* concatenated_sizes = out_->buffers[2]->mutable_data_as<offset_type>();
  RETURN_NOT_OK(ConcatenateListViewOffsets<offset_type>(
      in_, /*sizes=*/concatenated_sizes, offset_buffers, value_ranges, pool_,
      &out_->buffers[1]));

  return Status::OK();
}
#endif

Status Visit(const StructType& s) {
for (int i = 0; i < s.num_fields(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto child_data, ChildData(i));
Expand Down