From 429d06017882e89a1aa143cf5f91d25ac2b141ef Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Wed, 20 Mar 2024 18:50:46 +0100
Subject: [PATCH] WIP

---
Review notes (ignored when the patch is applied):
- LoggingMemoryPool::ReallocateNoCopy forwards `alignment` to the wrapped pool
  instead of silently falling back to the default alignment.
- BaseMemoryPoolImpl::ReallocateNoCopy allocates the new area before releasing
  the old one, so that *ptr stays valid if the allocation fails.
- JemallocAllocator::ResizeInPlace accepts any resulting size >= new_size,
  since xallocx() returns the real size of the allocation.
- The reallocation benchmarks no longer pass a mismatched old_size on the
  first iteration of their inner loop.

 cpp/src/arrow/io/file_test.cc          |  5 ++
 cpp/src/arrow/memory_pool.cc           | 95 ++++++++++++++++++++++++++
 cpp/src/arrow/memory_pool.h            | 19 ++++++
 cpp/src/arrow/memory_pool_benchmark.cc | 95 ++++++++++++++++++++++++++
 cpp/src/arrow/memory_pool_internal.h   |  1 +
 cpp/src/arrow/memory_pool_jemalloc.cc  | 12 ++++
 cpp/src/arrow/memory_pool_test.cc      | 35 +++++-----
 cpp/src/arrow/memory_pool_test.h       | 26 +++++++
 cpp/src/arrow/stl_allocator.h          | 29 +++++---
 9 files changed, 290 insertions(+), 27 deletions(-)

diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc
index e7e7ba949c9fd..c099500061cf8 100644
--- a/cpp/src/arrow/io/file_test.cc
+++ b/cpp/src/arrow/io/file_test.cc
@@ -461,6 +461,11 @@ class MyMemoryPool : public MemoryPool {
     return Status::OK();
   }
 
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                          uint8_t** ptr) override {
+    return Reallocate(old_size, new_size, alignment, ptr);
+  }
+
   int64_t bytes_allocated() const override { return -1; }
 
   int64_t total_bytes_allocated() const override { return -1; }
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index d58c203d2ae27..2f416602237d2 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -261,6 +261,27 @@ class DebugAllocator {
     return Status::OK();
   }
 
+  static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
+    CheckAllocatedArea(ptr, old_size, "in-place expanding");
+    if (old_size == 0 || new_size == 0) {
+      // Cannot expand
+      return false;
+    }
+    auto maybe_raw_new_size = RawSize(new_size);
+    if (!maybe_raw_new_size.ok()) {
+      return false;
+    }
+    int64_t raw_new_size = *maybe_raw_new_size;
+    DCHECK(raw_new_size > new_size)
+        << "bug in raw size computation: " << raw_new_size << " for size " << new_size;
+    bool success =
+        WrappedAllocator::ResizeInPlace(old_size + kOverhead, raw_new_size, ptr);
+    if (success) {
+      InitAllocatedArea(ptr, new_size);
+    }
+    return success;
+  }
+
   static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) {
     CheckAllocatedArea(ptr, size, "deallocation");
     if (ptr != memory_pool::internal::kZeroSizeArea) {
@@ -363,6 +384,11 @@ class SystemAllocator {
     return Status::OK();
   }
 
+  static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
+    // No standard C API for this
+    return false;
+  }
+
   static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
     if (ptr == memory_pool::internal::kZeroSizeArea) {
       DCHECK_EQ(size, 0);
@@ -425,6 +451,14 @@ class MimallocAllocator {
     return Status::OK();
   }
 
+  static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
+    if (old_size == 0 || new_size == 0) {
+      // Cannot resize
+      return false;
+    }
+    return mi_expand(ptr, static_cast<size_t>(new_size)) != nullptr;
+  }
+
   static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
     if (ptr == memory_pool::internal::kZeroSizeArea) {
       DCHECK_EQ(size, 0);
@@ -498,6 +532,47 @@ class BaseMemoryPoolImpl : public MemoryPool {
     return Status::OK();
   }
 
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                          uint8_t** ptr) override {
+    if (new_size == old_size) {
+      return Status::OK();
+    }
+    if (new_size < 0) {
+      return Status::Invalid("negative realloc size");
+    }
+    if (static_cast<uint64_t>(new_size) >= std::numeric_limits<size_t>::max()) {
+      return Status::OutOfMemory("realloc overflows size_t");
+    }
+    // First try resizing in place
+    if (!Allocator::ResizeInPlace(old_size, new_size, *ptr)) {
+      // Large areas get a fresh allocation so that no data is copied; smaller
+      // ones go through regular reallocation.
+      // NOTE(review): the 32 kiB threshold is unexplained -- confirm by benchmark.
+      if (std::max(old_size, new_size) >= 32 * 1024) {
+        // Allocate before deallocating, so that *ptr stays valid on failure
+        uint8_t* new_ptr = nullptr;
+        RETURN_NOT_OK(Allocator::AllocateAligned(new_size, alignment, &new_ptr));
+        Allocator::DeallocateAligned(*ptr, old_size, alignment);
+        *ptr = new_ptr;
+      } else {
+        RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, alignment, ptr));
+      }
+    }
+#ifndef NDEBUG
+    // Poison data
+    if (new_size > 0) {
+      DCHECK_NE(*ptr, nullptr);
+      if (new_size > old_size) {
+        (*ptr)[old_size] = kReallocPoison;
+      }
+      (*ptr)[new_size - 1] = kReallocPoison;
+    }
+#endif
+
+    stats_.UpdateAllocatedBytes(new_size - old_size);
+    return Status::OK();
+  }
+
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
 #ifndef NDEBUG
     // Poison data
@@ -721,6 +796,14 @@ Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size,
   return s;
 }
 
+Status LoggingMemoryPool::ReallocateNoCopy(int64_t old_size, int64_t new_size,
+                                           int64_t alignment, uint8_t** ptr) {
+  Status s = pool_->ReallocateNoCopy(old_size, new_size, alignment, ptr);
+  std::cout << "ReallocateNoCopy: old_size = " << old_size << ", new_size = " << new_size
+            << ", alignment = " << alignment << std::endl;
+  return s;
+}
+
 void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
   pool_->Free(buffer, size, alignment);
   std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl;
@@ -772,6 +855,13 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
     return Status::OK();
   }
 
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                          uint8_t** ptr) {
+    RETURN_NOT_OK(pool_->ReallocateNoCopy(old_size, new_size, alignment, ptr));
+    stats_.UpdateAllocatedBytes(new_size - old_size);
+    return Status::OK();
+  }
+
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
     pool_->Free(buffer, size, alignment);
     stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
@@ -807,6 +897,11 @@ Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t a
   return impl_->Reallocate(old_size, new_size, alignment, ptr);
 }
 
+Status ProxyMemoryPool::ReallocateNoCopy(int64_t old_size, int64_t new_size,
+                                         int64_t alignment, uint8_t** ptr) {
+  return impl_->ReallocateNoCopy(old_size, new_size, alignment, ptr);
+}
+
 void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
   return impl_->Free(buffer, size, alignment);
 }
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 712a828041c76..847a828f197a6 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -108,6 +108,21 @@ class ARROW_EXPORT MemoryPool {
     return Reallocate(old_size, new_size, kDefaultBufferAlignment, ptr);
   }
 
+  /// Resize an already allocated memory section without preserving its contents.
+  ///
+  /// Unlike Reallocate(), the contents of the section are unspecified after this
+  /// call, so implementations need not copy data when the section has to move.
+  ///
+  /// @param old_size Current size in bytes of the section pointed to by *ptr
+  /// @param new_size Requested size in bytes
+  /// @param alignment Required alignment in bytes
+  /// @param ptr Pointer to the section's address, updated if the section moves
+  virtual Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                                  uint8_t** ptr) = 0;
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+    return ReallocateNoCopy(old_size, new_size, kDefaultBufferAlignment, ptr);
+  }
+
   /// Free an allocated region.
   ///
   /// @param buffer Pointer to the start of the allocated memory region
@@ -162,6 +177,8 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
   Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                     uint8_t** ptr) override;
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                          uint8_t** ptr) override;
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
 
   int64_t bytes_allocated() const override;
@@ -194,6 +211,8 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
   Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                     uint8_t** ptr) override;
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
+                          uint8_t** ptr) override;
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
 
   int64_t bytes_allocated() const override;
diff --git a/cpp/src/arrow/memory_pool_benchmark.cc b/cpp/src/arrow/memory_pool_benchmark.cc
index fe7a3dd2f8ee0..c260bb7af5ffd 100644
--- a/cpp/src/arrow/memory_pool_benchmark.cc
+++ b/cpp/src/arrow/memory_pool_benchmark.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/config.h"  // for ARROW_JEMALLOC, ARROW_MIMALLOC
 #include "arrow/memory_pool.h"
 #include "arrow/result.h"
 #include "arrow/util/logging.h"
@@ -114,6 +115,78 @@ static void AllocateTouchDeallocate(
   state.SetBytesProcessed(state.iterations() * nbytes);
 }
 
+template <typename Alloc, bool Copy>
+static void BenchmarkReallocateGrowing(benchmark::State& state) {
+  // 256 kiB: typical max size for a scratch space (L2-sized)
+  const int64_t max_size = 256 << 10;
+  // 4 kiB: typical increment when resizing a scratch space
+  const int64_t increment = 4096;
+  MemoryPool* pool = *Alloc::GetAllocator();
+  int64_t nb_reallocs = 0;
+
+  for (auto _ : state) {
+    uint8_t* data;
+    int64_t size = 0;
+    ARROW_CHECK_OK(pool->Allocate(size, &data));
+    // The buffer starts empty: the first resize grows it from 0 to `increment`
+    for (size = increment; size < max_size; size += increment) {
+      if constexpr (Copy) {
+        ARROW_CHECK_OK(pool->Reallocate(size - increment, size, &data));
+      } else {
+        ARROW_CHECK_OK(pool->ReallocateNoCopy(size - increment, size, &data));
+      }
+      ++nb_reallocs;
+    }
+    pool->Free(data, size - increment);
+  }
+  state.SetItemsProcessed(nb_reallocs);
+}
+
+template <typename Alloc>
+static void ReallocateGrowing(benchmark::State& state) {
+  BenchmarkReallocateGrowing<Alloc, /*Copy=*/true>(state);
+}
+
+template <typename Alloc>
+static void ReallocateGrowingNoCopy(benchmark::State& state) {
+  BenchmarkReallocateGrowing<Alloc, /*Copy=*/false>(state);
+}
+
+template <typename Alloc, bool Copy>
+static void BenchmarkReallocateShrinking(benchmark::State& state) {
+  const int64_t max_size = 256 << 10;  // 256 kiB
+  const int64_t increment = 4096;
+  MemoryPool* pool = *Alloc::GetAllocator();
+  int64_t nb_reallocs = 0;
+
+  for (auto _ : state) {
+    uint8_t* data;
+    int64_t size = max_size;
+    ARROW_CHECK_OK(pool->Allocate(size, &data));
+    // The buffer starts at `max_size`: the first resize shrinks it by `increment`
+    for (size -= increment; size >= 0; size -= increment) {
+      if constexpr (Copy) {
+        ARROW_CHECK_OK(pool->Reallocate(size + increment, size, &data));
+      } else {
+        ARROW_CHECK_OK(pool->ReallocateNoCopy(size + increment, size, &data));
+      }
+      ++nb_reallocs;
+    }
+    pool->Free(data, size + increment);
+  }
+  state.SetItemsProcessed(nb_reallocs);
+}
+
+template <typename Alloc>
+static void ReallocateShrinking(benchmark::State& state) {
+  BenchmarkReallocateShrinking<Alloc, /*Copy=*/true>(state);
+}
+
+template <typename Alloc>
+static void ReallocateShrinkingNoCopy(benchmark::State& state) {
+  BenchmarkReallocateShrinking<Alloc, /*Copy=*/false>(state);
+}
+
 #define BENCHMARK_ALLOCATE_ARGS \
   ->RangeMultiplier(16)->Range(4096, 16 * 1024 * 1024)->ArgName("size")->UseRealTime()
 
@@ -135,4 +208,26 @@ BENCHMARK_ALLOCATE(AllocateDeallocate, Mimalloc);
 BENCHMARK_ALLOCATE(AllocateTouchDeallocate, Mimalloc);
 #endif
 
+BENCHMARK_TEMPLATE(ReallocateGrowing, SystemAlloc);
+BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, SystemAlloc);
+#ifdef ARROW_JEMALLOC
+BENCHMARK_TEMPLATE(ReallocateGrowing, Jemalloc);
+BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, Jemalloc);
+#endif
+#ifdef ARROW_MIMALLOC
+BENCHMARK_TEMPLATE(ReallocateGrowing, Mimalloc);
+BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, Mimalloc);
+#endif
+
+BENCHMARK_TEMPLATE(ReallocateShrinking, SystemAlloc);
+BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, SystemAlloc);
+#ifdef ARROW_JEMALLOC
+BENCHMARK_TEMPLATE(ReallocateShrinking, Jemalloc);
+BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, Jemalloc);
+#endif
+#ifdef ARROW_MIMALLOC
+BENCHMARK_TEMPLATE(ReallocateShrinking, Mimalloc);
+BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, Mimalloc);
+#endif
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/memory_pool_internal.h b/cpp/src/arrow/memory_pool_internal.h
index 01500b3c1eae1..e7e522ce937c5 100644
--- a/cpp/src/arrow/memory_pool_internal.h
+++ b/cpp/src/arrow/memory_pool_internal.h
@@ -42,6 +42,7 @@ class JemallocAllocator {
   static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out);
   static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment,
                                   uint8_t** ptr);
+  static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr);
   static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment);
   static void ReleaseUnused();
 };
diff --git a/cpp/src/arrow/memory_pool_jemalloc.cc b/cpp/src/arrow/memory_pool_jemalloc.cc
index 24bc0f27f0912..a211879b1bfc9 100644
--- a/cpp/src/arrow/memory_pool_jemalloc.cc
+++ b/cpp/src/arrow/memory_pool_jemalloc.cc
@@ -118,6 +118,18 @@ Status JemallocAllocator::ReallocateAligned(int64_t old_size, int64_t new_size,
   return Status::OK();
 }
 
+bool JemallocAllocator::ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
+  if (old_size == 0 || new_size == 0) {
+    // Cannot resize
+    return false;
+  }
+  // No need to pass any alignment since this doesn't move the base pointer
+  int64_t got_size = static_cast<int64_t>(xallocx(ptr, static_cast<size_t>(new_size),
+                                                  /*extra=*/0, /*flags=*/0));
+  // xallocx() returns the real size of the allocation, which may exceed new_size
+  return got_size >= new_size;
+}
+
 void JemallocAllocator::DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) {
   if (ptr == kZeroSizeArea) {
     DCHECK_EQ(size, 0);
diff --git a/cpp/src/arrow/memory_pool_test.cc b/cpp/src/arrow/memory_pool_test.cc
index 81d9d69ba346d..3f596c8defc70 100644
--- a/cpp/src/arrow/memory_pool_test.cc
+++ b/cpp/src/arrow/memory_pool_test.cc
@@ -63,32 +63,33 @@ class TestMemoryPool : public ::arrow::TestMemoryPoolBase {
   MemoryPool* memory_pool() override { return Factory::memory_pool(); }
 };
 
-TYPED_TEST_SUITE_P(TestMemoryPool);
+using MemoryPoolFactories =
+    ::testing::Types<DefaultMemoryPoolFactory, SystemMemoryPoolFactory
+#ifdef ARROW_JEMALLOC
+                     ,
+                     JemallocMemoryPoolFactory
+#endif
+#ifdef ARROW_MIMALLOC
+                     ,
+                     MimallocMemoryPoolFactory
+#endif
+                     >;
+
+TYPED_TEST_SUITE(TestMemoryPool, MemoryPoolFactories);
 
-TYPED_TEST_P(TestMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }
+TYPED_TEST(TestMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }
 
-TYPED_TEST_P(TestMemoryPool, OOM) {
+TYPED_TEST(TestMemoryPool, OOM) {
 #ifndef ADDRESS_SANITIZER
   this->TestOOM();
 #endif
 }
 
-TYPED_TEST_P(TestMemoryPool, Reallocate) { this->TestReallocate(); }
-
-TYPED_TEST_P(TestMemoryPool, Alignment) { this->TestAlignment(); }
+TYPED_TEST(TestMemoryPool, Reallocate) { this->TestReallocate(); }
 
-REGISTER_TYPED_TEST_SUITE_P(TestMemoryPool, MemoryTracking, OOM, Reallocate, Alignment);
+TYPED_TEST(TestMemoryPool, ReallocateNoCopy) { this->TestReallocateNoCopy(); }
 
-INSTANTIATE_TYPED_TEST_SUITE_P(Default, TestMemoryPool, DefaultMemoryPoolFactory);
-INSTANTIATE_TYPED_TEST_SUITE_P(System, TestMemoryPool, SystemMemoryPoolFactory);
-
-#ifdef ARROW_JEMALLOC
-INSTANTIATE_TYPED_TEST_SUITE_P(Jemalloc, TestMemoryPool, JemallocMemoryPoolFactory);
-#endif
-
-#ifdef ARROW_MIMALLOC
-INSTANTIATE_TYPED_TEST_SUITE_P(Mimalloc, TestMemoryPool, MimallocMemoryPoolFactory);
-#endif
+TYPED_TEST(TestMemoryPool, Alignment) { this->TestAlignment(); }
 
 TEST(DefaultMemoryPool, Identity) {
   // The default memory pool is pointer-identical to one of the backend-specific pools.
diff --git a/cpp/src/arrow/memory_pool_test.h b/cpp/src/arrow/memory_pool_test.h
index e4a07099f830f..4118aff0e9628 100644
--- a/cpp/src/arrow/memory_pool_test.h
+++ b/cpp/src/arrow/memory_pool_test.h
@@ -89,6 +89,32 @@ class TestMemoryPoolBase : public ::testing::Test {
     ASSERT_EQ(0, pool->bytes_allocated());
   }
 
+  void TestReallocateNoCopy() {
+    auto pool = memory_pool();
+
+    uint8_t* data;
+    ASSERT_OK(pool->Allocate(10, &data));
+    ASSERT_EQ(10, pool->bytes_allocated());
+    data[0] = 35;
+    data[9] = 12;
+
+    // Small expansion, likely to be in place
+    ASSERT_OK(pool->ReallocateNoCopy(10, 11, &data));
+    ASSERT_EQ(11, pool->bytes_allocated());
+    // Larger expansion, unlikely to be in place
+    ASSERT_OK(pool->ReallocateNoCopy(11, 200, &data));
+    ASSERT_EQ(200, pool->bytes_allocated());
+    // Small shrink, likely to be in place
+    ASSERT_OK(pool->ReallocateNoCopy(200, 196, &data));
+    ASSERT_EQ(196, pool->bytes_allocated());
+    // Larger shrink, unlikely to be in place
+    ASSERT_OK(pool->ReallocateNoCopy(196, 13, &data));
+    ASSERT_EQ(13, pool->bytes_allocated());
+    // Free
+    pool->Free(data, 13);
+    ASSERT_EQ(0, pool->bytes_allocated());
+  }
+
   void TestAlignment() {
     auto pool = memory_pool();
     {
diff --git a/cpp/src/arrow/stl_allocator.h b/cpp/src/arrow/stl_allocator.h
index a1f4ae9feb82b..3143ec042d01b 100644
--- a/cpp/src/arrow/stl_allocator.h
+++ b/cpp/src/arrow/stl_allocator.h
@@ -116,16 +116,12 @@ class STLMemoryPool : public MemoryPool {
 
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t /*alignment*/,
                     uint8_t** ptr) override {
-    uint8_t* old_ptr = *ptr;
-    try {
-      *ptr = alloc_.allocate(new_size);
-    } catch (std::bad_alloc& e) {
-      return Status::OutOfMemory(e.what());
-    }
-    memcpy(*ptr, old_ptr, std::min(old_size, new_size));
-    alloc_.deallocate(old_ptr, old_size);
-    stats_.UpdateAllocatedBytes(new_size - old_size);
-    return Status::OK();
+    return DoReallocate(old_size, new_size, ptr);
+  }
+
+  Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t /*alignment*/,
+                          uint8_t** ptr) override {
+    return DoReallocate(old_size, new_size, ptr);
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t /*alignment*/) override {
@@ -146,6 +142,19 @@ class STLMemoryPool : public MemoryPool {
   std::string backend_name() const override { return "stl"; }
 
  private:
+  Status DoReallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+    uint8_t* old_ptr = *ptr;
+    try {
+      *ptr = alloc_.allocate(new_size);
+    } catch (std::bad_alloc& e) {
+      return Status::OutOfMemory(e.what());
+    }
+    memcpy(*ptr, old_ptr, std::min(old_size, new_size));
+    alloc_.deallocate(old_ptr, old_size);
+    stats_.UpdateAllocatedBytes(new_size - old_size);
+    return Status::OK();
+  }
+
   Allocator alloc_;
   arrow::internal::MemoryPoolStats stats_;
 };