diff --git a/cpp/src/arrow/util/byte_stream_split.h b/cpp/src/arrow/util/byte_stream_split.h index d428df0659b28..fd3d6ceaa7d1a 100644 --- a/cpp/src/arrow/util/byte_stream_split.h +++ b/cpp/src/arrow/util/byte_stream_split.h @@ -17,11 +17,13 @@ #pragma once +#include "arrow/util/endian.h" #include "arrow/util/simd.h" #include "arrow/util/ubsan.h" -#include #include +#include +#include +#include <cstring> #ifdef ARROW_HAVE_SSE4_2 // Enable the SIMD for ByteStreamSplit Encoder/Decoder @@ -32,6 +34,10 @@ namespace arrow { namespace util { namespace internal { +// +// SIMD implementations +// + #if defined(ARROW_HAVE_SSE4_2) template void ByteStreamSplitDecodeSse2(const uint8_t* data, int64_t num_values, int64_t stride, @@ -568,45 +574,141 @@ template void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const size_t num_values, uint8_t* output_buffer_raw) { #if defined(ARROW_HAVE_AVX512) - return ByteStreamSplitEncodeAvx512(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeAvx512(raw_values, static_cast(num_values), + output_buffer_raw); #elif defined(ARROW_HAVE_AVX2) - return ByteStreamSplitEncodeAvx2(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeAvx2(raw_values, static_cast(num_values), + output_buffer_raw); #elif defined(ARROW_HAVE_SSE4_2) - return ByteStreamSplitEncodeSse2(raw_values, num_values, output_buffer_raw); + return ByteStreamSplitEncodeSse2(raw_values, static_cast(num_values), + output_buffer_raw); #else #error "ByteStreamSplitEncodeSimd not implemented" #endif } #endif +// +// Scalar implementations +// + +// CAUTION when editing this: on x86 platforms, the test suite only exercises +// the SIMD versions by default. Either check on non-x86, or manually disable +// SIMD in the ByteStreamSplitEncode/ByteStreamSplitDecode functions below. 
// Scatters `nvalues` fixed-width values, stored back to back at `src`, into
// `width` byte streams: byte `k` of value `i` lands in `dest_streams[k][i]`.
//
// \param src           input values, `nvalues * width` bytes
// \param width         size in bytes of one value, i.e. the number of streams
// \param nvalues       number of values to split
// \param dest_streams  `width` output pointers, one per stream
//
// Side effect: each pointer in `dest_streams` is advanced past every full block
// written by the main loop. It is NOT advanced over the trailing
// `nvalues % kBlockSize` values written by the epilog.
inline void DoSplitStreams(const uint8_t* src, int width, int64_t nvalues,
                           uint8_t** dest_streams) {
  // Value empirically chosen to provide the best performance on the author's machine
  constexpr int kBlockSize = 32;

  while (nvalues >= kBlockSize) {
    for (int stream = 0; stream < width; ++stream) {
      uint8_t* dest = dest_streams[stream];
      for (int i = 0; i < kBlockSize; i += 8) {
        // Gather the bytes that 8 consecutive values contribute to this stream...
        uint64_t a = src[stream + i * width];
        uint64_t b = src[stream + (i + 1) * width];
        uint64_t c = src[stream + (i + 2) * width];
        uint64_t d = src[stream + (i + 3) * width];
        uint64_t e = src[stream + (i + 4) * width];
        uint64_t f = src[stream + (i + 5) * width];
        uint64_t g = src[stream + (i + 6) * width];
        uint64_t h = src[stream + (i + 7) * width];
        // ...and pack them so that, once stored, `a` sits at the lowest address.
#if ARROW_LITTLE_ENDIAN
        uint64_t r = a | (b << 8) | (c << 16) | (d << 24) | (e << 32) | (f << 40) |
                     (g << 48) | (h << 56);
#else
        uint64_t r = (a << 56) | (b << 48) | (c << 40) | (d << 32) | (e << 24) |
                     (f << 16) | (g << 8) | h;
#endif
        // The stream pointers are plain byte pointers with no alignment guarantee,
        // so the word is stored with memcpy instead of through a `uint64_t*`
        // (which would be a misaligned, aliasing-violating access).
        std::memcpy(&dest[i], &r, sizeof(r));
      }
      dest_streams[stream] += kBlockSize;
    }
    src += width * kBlockSize;
    nvalues -= kBlockSize;
  }

  // Epilog: fewer than kBlockSize values remain, copy them byte by byte
  for (int stream = 0; stream < width; ++stream) {
    uint8_t* dest = dest_streams[stream];
    for (int64_t i = 0; i < nvalues; ++i) {
      dest[i] = src[stream + i * width];
    }
  }
}

// Inverse of DoSplitStreams: gathers `width` byte streams back into `nvalues`
// contiguous fixed-width values, so that `dest[i * width + k]` receives
// `src_streams[k][i]`.
//
// \param src_streams  `width` input pointers, one per stream
// \param width        size in bytes of one value, i.e. the number of streams
// \param nvalues      number of values to reassemble
// \param dest         output buffer, `nvalues * width` bytes
//
// Side effect: each pointer in `src_streams` is advanced past every full block
// consumed by the main loop (but not over the values read by the epilog).
inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalues,
                           uint8_t* dest) {
  // Value empirically chosen to provide the best performance on the author's machine
  constexpr int kBlockSize = 128;

  while (nvalues >= kBlockSize) {
    for (int stream = 0; stream < width; ++stream) {
      // Take kBlockSize bytes from the given stream and spread them
      // to their logical places in destination.
      const uint8_t* src = src_streams[stream];
      for (int i = 0; i < kBlockSize; i += 8) {
        // The stream pointers carry no alignment guarantee: load the word with
        // memcpy rather than by dereferencing a `const uint64_t*`.
        uint64_t v;
        std::memcpy(&v, &src[i], sizeof(v));
#if ARROW_LITTLE_ENDIAN
        dest[stream + i * width] = static_cast<uint8_t>(v);
        dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 8);
        dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 16);
        dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 24);
        dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 32);
        dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 40);
        dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 48);
        dest[stream + (i + 7) * width] = static_cast<uint8_t>(v >> 56);
#else
        dest[stream + i * width] = static_cast<uint8_t>(v >> 56);
        dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 48);
        dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 40);
        dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 32);
        dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 24);
        dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 16);
        dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 8);
        dest[stream + (i + 7) * width] = static_cast<uint8_t>(v);
#endif
      }
      src_streams[stream] += kBlockSize;
    }
    dest += width * kBlockSize;
    nvalues -= kBlockSize;
  }

  // Epilog: fewer than kBlockSize values remain, copy them byte by byte
  for (int stream = 0; stream < width; ++stream) {
    const uint8_t* src = src_streams[stream];
    for (int64_t i = 0; i < nvalues; ++i) {
      dest[stream + i * width] = src[i];
    }
  }
}

// Scalar encoder: splits `num_values` values of type T read from `raw_values`
// into sizeof(T) byte streams laid out one after the other in
// `output_buffer_raw` (stream `k` starts at byte offset `k * num_values`).
template <typename T>
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values,
                                 uint8_t* output_buffer_raw) {
  constexpr int kNumStreams = static_cast<int>(sizeof(T));
  std::array<uint8_t*, kNumStreams> dest_streams;
  for (int stream = 0; stream < kNumStreams; ++stream) {
    dest_streams[stream] = &output_buffer_raw[stream * num_values];
  }
  DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}

// Scalar decoder: rebuilds `num_values` values of type T into `out` from
// sizeof(T) byte streams found in `data`, where stream `k` starts at byte
// offset `k * stride`.
template <typename T>
void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
                                 T* out) {
  constexpr int kNumStreams = static_cast<int>(sizeof(T));
  std::array<const uint8_t*, kNumStreams> src_streams;
  for (int stream = 0; stream < kNumStreams; ++stream) {
    src_streams[stream] = &data[stream * stride];
  }
  DoMergeStreams(src_streams.data(), kNumStreams, num_values,
                 reinterpret_cast<uint8_t*>(out));
}

// NOTE(review): the rest of the patch is truncated in this view (the body of
// ByteStreamSplitEncode and the surrounding FlushValues code are not visible),
// so it is not rewritten here. Its text is kept verbatim for reference:
//
//   template
//   -void inline ByteStreamSplitEncode(const uint8_t* raw_values, const size_t num_values,
//   +void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values,
//    uint8_t* output_buffer_raw) {
//    #if defined(ARROW_HAVE_SIMD_SPLIT)
//    return ByteStreamSplitEncodeSimd(raw_values, num_values, output_buffer_raw);
//
//   diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
//   index 5221f2588c0d3..bd3b4fe1bcb25 100644
//   --- a/cpp/src/parquet/encoding.cc
//   +++ b/cpp/src/parquet/encoding.cc
//   @@ -850,8 +850,8 @@ std::shared_ptr ByteStreamSplitEncoder::FlushValues() {
//    AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
//    uint8_t* output_buffer_raw = output_buffer->mutable_data();
//    const uint8_t* raw_values = sink_.data();
//   - ::arrow::util::internal::ByteStreamSplitEncode(
//   - raw_values, static_cast(num_values_in_buffer_), output_buffer_raw);
//   + ::arrow::util::internal::ByteStreamSplitEncode(raw_values, num_values_in_buffer_,
//   + output_buffer_raw);
//    sink_.Reset();
//    num_values_in_buffer_ = 0;
//    return std::move(output_buffer);