Skip to content

Commit

Permalink
[C++][Parquet] Faster scalar BYTE_STREAM_SPLIT
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Oct 31, 2023
1 parent 2628d49 commit c2f44f5
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 22 deletions.
142 changes: 122 additions & 20 deletions cpp/src/arrow/util/byte_stream_split.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#pragma once

#include "arrow/util/endian.h"
#include "arrow/util/simd.h"
#include "arrow/util/ubsan.h"

#include <stdint.h>
#include <algorithm>
#include <cassert>
#include <cstdint>

#ifdef ARROW_HAVE_SSE4_2
// Enable the SIMD for ByteStreamSplit Encoder/Decoder
Expand All @@ -32,6 +34,10 @@ namespace arrow {
namespace util {
namespace internal {

//
// SIMD implementations
//

#if defined(ARROW_HAVE_SSE4_2)
template <typename T>
void ByteStreamSplitDecodeSse2(const uint8_t* data, int64_t num_values, int64_t stride,
Expand Down Expand Up @@ -568,45 +574,141 @@ template <typename T>
void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const size_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_AVX512)
return ByteStreamSplitEncodeAvx512<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeAvx512<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#elif defined(ARROW_HAVE_AVX2)
return ByteStreamSplitEncodeAvx2<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeAvx2<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#elif defined(ARROW_HAVE_SSE4_2)
return ByteStreamSplitEncodeSse2<T>(raw_values, num_values, output_buffer_raw);
return ByteStreamSplitEncodeSse2<T>(raw_values, static_cast<size_t>(num_values),
output_buffer_raw);
#else
#error "ByteStreamSplitEncodeSimd not implemented"
#endif
}
#endif

//
// Scalar implementations
//

// CAUTION when editing this: on x86 platforms, the test suite only exercises
// the SIMD versions by default. Either check on non-x86, or manually disable
// SIMD in the ByteStreamSplitEncode/ByteStreamSplitDecode functions below.

inline void DoSplitStreams(const uint8_t* src, int width, int64_t nvalues,
uint8_t** dest_streams) {
// Value empirically chosen to provide the best performance on the author's machine
constexpr int kBlockSize = 32;

while (nvalues >= kBlockSize) {
for (int stream = 0; stream < width; ++stream) {
uint8_t* dest = dest_streams[stream];
for (int i = 0; i < kBlockSize; i += 8) {
uint64_t a = src[stream + i * width];
uint64_t b = src[stream + (i + 1) * width];
uint64_t c = src[stream + (i + 2) * width];
uint64_t d = src[stream + (i + 3) * width];
uint64_t e = src[stream + (i + 4) * width];
uint64_t f = src[stream + (i + 5) * width];
uint64_t g = src[stream + (i + 6) * width];
uint64_t h = src[stream + (i + 7) * width];
#if ARROW_LITTLE_ENDIAN
uint64_t r = a | (b << 8) | (c << 16) | (d << 24) | (e << 32) | (f << 40) |
(g << 48) | (h << 56);
#else
uint64_t r = (a << 56) | (b << 48) | (c << 40) | (d << 32) | (e << 24) |
(f << 16) | (g << 8) | h;
#endif
*reinterpret_cast<uint64_t*>(&dest[i]) = r;
}
dest_streams[stream] += kBlockSize;
}
src += width * kBlockSize;
nvalues -= kBlockSize;
}

// Epilog
for (int stream = 0; stream < width; ++stream) {
uint8_t* dest = dest_streams[stream];
for (int64_t i = 0; i < nvalues; ++i) {
dest[i] = src[stream + i * width];
}
}
}

inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalues,
uint8_t* dest) {
// Value empirically chosen to provide the best performance on the author's machine
constexpr int kBlockSize = 128;

while (nvalues >= kBlockSize) {
for (int stream = 0; stream < width; ++stream) {
// Take kBlockSize bytes from the given stream and spread them
// to their logical places in destination.
const uint8_t* src = src_streams[stream];
for (int i = 0; i < kBlockSize; i += 8) {
uint64_t v = *reinterpret_cast<const uint64_t*>(&src[i]);
#if ARROW_LITTLE_ENDIAN
dest[stream + i * width] = static_cast<uint8_t>(v);
dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 8);
dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 16);
dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 24);
dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 32);
dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 40);
dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 48);
dest[stream + (i + 7) * width] = static_cast<uint8_t>(v >> 56);
#else
dest[stream + i * width] = static_cast<uint8_t>(v >> 56);
dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 48);
dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 40);
dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 32);
dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 24);
dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 16);
dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 8);
dest[stream + (i + 7) * width] = static_cast<uint8_t>(v);
#endif
}
src_streams[stream] += kBlockSize;
}
dest += width * kBlockSize;
nvalues -= kBlockSize;
}

// Epilog
for (int stream = 0; stream < width; ++stream) {
const uint8_t* src = src_streams[stream];
for (int64_t i = 0; i < nvalues; ++i) {
dest[stream + i * width] = src[i];
}
}
}

template <typename T>
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
constexpr size_t kNumStreams = sizeof(T);
for (size_t i = 0U; i < num_values; ++i) {
for (size_t j = 0U; j < kNumStreams; ++j) {
const uint8_t byte_in_value = raw_values[i * kNumStreams + j];
output_buffer_raw[j * num_values + i] = byte_in_value;
}
constexpr int kNumStreams = static_cast<int>(sizeof(T));
std::array<uint8_t*, kNumStreams> dest_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
dest_streams[stream] = &output_buffer_raw[stream * num_values];
}
DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}

template <typename T>
void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
T* out) {
constexpr size_t kNumStreams = sizeof(T);
auto output_buffer_raw = reinterpret_cast<uint8_t*>(out);

for (int64_t i = 0; i < num_values; ++i) {
for (size_t b = 0; b < kNumStreams; ++b) {
const size_t byte_index = b * stride + i;
output_buffer_raw[i * kNumStreams + b] = data[byte_index];
}
constexpr int kNumStreams = static_cast<int>(sizeof(T));
std::array<const uint8_t*, kNumStreams> src_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
src_streams[stream] = &data[stream * stride];
}
DoMergeStreams(src_streams.data(), kNumStreams, num_values,
reinterpret_cast<uint8_t*>(out));
}

template <typename T>
void inline ByteStreamSplitEncode(const uint8_t* raw_values, const size_t num_values,
void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
return ByteStreamSplitEncodeSimd<T>(raw_values, num_values, output_buffer_raw);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,8 @@ std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const uint8_t* raw_values = sink_.data();
::arrow::util::internal::ByteStreamSplitEncode<T>(
raw_values, static_cast<size_t>(num_values_in_buffer_), output_buffer_raw);
::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
output_buffer_raw);
sink_.Reset();
num_values_in_buffer_ = 0;
return std::move(output_buffer);
Expand Down

0 comments on commit c2f44f5

Please sign in to comment.