Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Mar 20, 2024
1 parent b6c4fbc commit 429d060
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 27 deletions.
5 changes: 5 additions & 0 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ class MyMemoryPool : public MemoryPool {
return Status::OK();
}

// This pool has no dedicated no-copy path: the request is forwarded
// unchanged to Reallocate().
Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override {
return Reallocate(old_size, new_size, alignment, ptr);
}

int64_t bytes_allocated() const override { return -1; }

int64_t total_bytes_allocated() const override { return -1; }
Expand Down
91 changes: 91 additions & 0 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,27 @@ class DebugAllocator {
return Status::OK();
}

static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
  // Run the allocated-area check on the current block before anything else.
  CheckAllocatedArea(ptr, old_size, "in-place expanding");
  // A zero old or new size is never resized in place.
  const bool has_zero_size = (old_size == 0) || (new_size == 0);
  if (has_zero_size) {
    return false;
  }
  // The wrapped allocator works with "raw" sizes; give up if the raw size
  // for the new request cannot be computed.
  auto raw_size_result = RawSize(new_size);
  if (!raw_size_result.ok()) {
    return false;
  }
  const int64_t wrapped_new_size = *raw_size_result;
  DCHECK(wrapped_new_size > new_size)
      << "bug in raw size computation: " << wrapped_new_size << " for size " << new_size;
  // The existing block is known to the wrapped allocator with kOverhead added.
  const int64_t wrapped_old_size = old_size + kOverhead;
  if (!WrappedAllocator::ResizeInPlace(wrapped_old_size, wrapped_new_size, ptr)) {
    return false;
  }
  // Only on success: set up the area again for its new size.
  InitAllocatedArea(ptr, new_size);
  return true;
}

static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) {
CheckAllocatedArea(ptr, size, "deallocation");
if (ptr != memory_pool::internal::kZeroSizeArea) {
Expand Down Expand Up @@ -363,6 +384,11 @@ class SystemAllocator {
return Status::OK();
}

static bool ResizeInPlace(int64_t /*old_size*/, int64_t /*new_size*/,
                          uint8_t* /*ptr*/) {
  // The standard C allocation API has no in-place resize primitive, so this
  // allocator always reports failure and leaves the block untouched.
  return false;
}

static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
if (ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(size, 0);
Expand Down Expand Up @@ -425,6 +451,14 @@ class MimallocAllocator {
return Status::OK();
}

static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
  // A zero old or new size is never resized in place.
  const bool has_zero_size = (old_size == 0) || (new_size == 0);
  if (has_zero_size) {
    return false;
  }
  // A null result from mi_expand() is reported as failure.
  void* expanded = mi_expand(ptr, static_cast<size_t>(new_size));
  return expanded != nullptr;
}

static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
if (ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(size, 0);
Expand Down Expand Up @@ -498,6 +532,43 @@ class BaseMemoryPoolImpl : public MemoryPool {
return Status::OK();
}

Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
                        uint8_t** ptr) override {
  // Resize the region at `*ptr` from `old_size` to `new_size` bytes.  Not every
  // path below carries the old contents over to the resized region.
  if (old_size == new_size) {
    // Same size: nothing to do.
    return Status::OK();
  }
  if (new_size < 0) {
    return Status::Invalid("negative realloc size");
  }
  const auto requested = static_cast<uint64_t>(new_size);
  if (requested >= std::numeric_limits<size_t>::max()) {
    return Status::OutOfMemory("realloc overflows size_t");
  }
  // Preferred path: ask the allocator to resize the block where it is.
  const bool resized_in_place = Allocator::ResizeInPlace(old_size, new_size, *ptr);
  if (!resized_in_place) {
    // Fallback.  When either size reaches the threshold, free the old block
    // and allocate a new one; otherwise use the allocator's regular
    // reallocation.
    // NOTE(review): the 32 KiB threshold is undocumented here — presumably a
    // heuristic for when copying becomes more expensive; confirm with benchmarks.
    constexpr int64_t kDeallocateFirstThreshold = 32 * 1024;
    const bool is_large =
        old_size >= kDeallocateFirstThreshold || new_size >= kDeallocateFirstThreshold;
    if (is_large) {
      // NOTE(review): if AllocateAligned fails, the old block has already been
      // released — confirm that callers can cope with this.
      Allocator::DeallocateAligned(*ptr, old_size, alignment);
      RETURN_NOT_OK(Allocator::AllocateAligned(new_size, alignment, ptr));
    } else {
      RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, alignment, ptr));
    }
  }
#ifndef NDEBUG
  // Debug builds only: write kReallocPoison at offset `old_size` when growing,
  // and always into the last byte of the resized block.
  if (new_size > 0) {
    DCHECK_NE(*ptr, nullptr);
    uint8_t* data = *ptr;
    if (new_size > old_size) {
      data[old_size] = kReallocPoison;
    }
    data[new_size - 1] = kReallocPoison;
  }
#endif

  // Account for the change in allocated size.
  const int64_t size_delta = new_size - old_size;
  stats_.UpdateAllocatedBytes(size_delta);
  return Status::OK();
}

void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
#ifndef NDEBUG
// Poison data
Expand Down Expand Up @@ -721,6 +792,14 @@ Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size,
return s;
}

// Forward a no-copy reallocation to the wrapped pool, then log the request
// parameters to standard output.  The wrapped pool's status is returned
// unchanged, whether or not the reallocation succeeded.
Status LoggingMemoryPool::ReallocateNoCopy(int64_t old_size, int64_t new_size,
                                           int64_t alignment, uint8_t** ptr) {
  // Pass `alignment` through explicitly: the three-argument overload would
  // substitute kDefaultBufferAlignment for the caller's requested value.
  Status s = pool_->ReallocateNoCopy(old_size, new_size, alignment, ptr);
  std::cout << "ReallocateNoCopy: old_size = " << old_size << ", new_size = " << new_size
            << ", alignment = " << alignment << std::endl;
  return s;
}

void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
pool_->Free(buffer, size, alignment);
std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl;
Expand Down Expand Up @@ -772,6 +851,13 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
return Status::OK();
}

Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
                        uint8_t** ptr) {
  // Delegate to the wrapped pool; on failure return early without touching
  // this proxy's statistics.
  RETURN_NOT_OK(pool_->ReallocateNoCopy(old_size, new_size, alignment, ptr));
  // Record the size change in the proxy's own statistics.
  const int64_t size_delta = new_size - old_size;
  stats_.UpdateAllocatedBytes(size_delta);
  return Status::OK();
}

void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
pool_->Free(buffer, size, alignment);
stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
Expand Down Expand Up @@ -807,6 +893,11 @@ Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t a
return impl_->Reallocate(old_size, new_size, alignment, ptr);
}

// Forwards the request, with all arguments unchanged, to the implementation
// object.
Status ProxyMemoryPool::ReallocateNoCopy(int64_t old_size, int64_t new_size,
int64_t alignment, uint8_t** ptr) {
return impl_->ReallocateNoCopy(old_size, new_size, alignment, ptr);
}

void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
return impl_->Free(buffer, size, alignment);
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class ARROW_EXPORT MemoryPool {
return Reallocate(old_size, new_size, kDefaultBufferAlignment, ptr);
}

/// Resize an already allocated memory region, without a guarantee that its
/// previous contents are carried over.
///
/// NOTE(review): judging from the implementations in this change, the region
/// may be resized in place, reallocated, or freed and allocated anew — confirm
/// the intended contract before relying on the contents after the call.
///
/// @param old_size Current size of the region, in bytes
/// @param new_size Requested size of the region, in bytes
/// @param alignment Requested alignment of the region
/// @param ptr In/out pointer to the start of the region
virtual Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) = 0;
/// Convenience overload of ReallocateNoCopy using kDefaultBufferAlignment.
Status ReallocateNoCopy(int64_t old_size, int64_t new_size, uint8_t** ptr) {
return ReallocateNoCopy(old_size, new_size, kDefaultBufferAlignment, ptr);
}

/// Free an allocated region.
///
/// @param buffer Pointer to the start of the allocated memory region
Expand Down Expand Up @@ -162,6 +168,8 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
// No-copy reallocation through the wrapped pool; the request parameters are
// also printed to standard output.
Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;

int64_t bytes_allocated() const override;
Expand Down Expand Up @@ -194,6 +202,8 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
// No-copy reallocation, delegated to the pool's implementation object.
Status ReallocateNoCopy(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override;
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;

int64_t bytes_allocated() const override;
Expand Down
93 changes: 93 additions & 0 deletions cpp/src/arrow/memory_pool_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/config.h" // for ARROW_JEMALLOC, ARROW_MIMALLOC
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -114,6 +115,76 @@ static void AllocateTouchDeallocate(
state.SetBytesProcessed(state.iterations() * nbytes);
}

// Benchmark repeatedly growing a single allocation.
//
// Each benchmark iteration starts from an empty allocation, enlarges it in
// `increment`-sized steps while staying below `max_size`, then frees it.
// `Copy` selects MemoryPool::Reallocate (true) or
// MemoryPool::ReallocateNoCopy (false).  The total number of reallocation
// calls is reported as the number of items processed.
template <typename Alloc, bool Copy>
static void BenchmarkReallocateGrowing(benchmark::State& state) {
  // 256 kiB: typical max size for a scratch space (L2-sized)
  const int64_t max_size = 256 << 10;
  // 4 kiB: typical increment when resizing a scratch space
  const int64_t increment = 4096;
  MemoryPool* pool = *Alloc::GetAllocator();
  int64_t nb_reallocs = 0;

  for (auto _ : state) {
    uint8_t* data = nullptr;
    // `size` always holds the current allocation size, so that every
    // reallocation call receives a valid (non-negative) old size.
    int64_t size = 0;
    ARROW_CHECK_OK(pool->Allocate(size, &data));
    for (int64_t new_size = size + increment; new_size < max_size;
         new_size += increment) {
      if constexpr (Copy) {
        ARROW_CHECK_OK(pool->Reallocate(size, new_size, &data));
      } else {
        ARROW_CHECK_OK(pool->ReallocateNoCopy(size, new_size, &data));
      }
      size = new_size;
      ++nb_reallocs;
    }
    pool->Free(data, size);
  }
  state.SetItemsProcessed(nb_reallocs);
}

// Benchmark entry point: growing reallocations with Copy=true, i.e. going
// through MemoryPool::Reallocate.
template <typename Alloc>
static void ReallocateGrowing(benchmark::State& state) {
BenchmarkReallocateGrowing<Alloc, /*Copy=*/true>(state);
}

// Benchmark entry point: growing reallocations with Copy=false, i.e. going
// through MemoryPool::ReallocateNoCopy.
template <typename Alloc>
static void ReallocateGrowingNoCopy(benchmark::State& state) {
BenchmarkReallocateGrowing<Alloc, /*Copy=*/false>(state);
}

// Benchmark repeatedly shrinking a single allocation.
//
// Each benchmark iteration allocates `max_size` bytes, shrinks the region in
// `increment`-sized steps down to zero bytes, then frees it.  `Copy` selects
// MemoryPool::Reallocate (true) or MemoryPool::ReallocateNoCopy (false).
// The total number of reallocation calls is reported as the number of items
// processed.
template <typename Alloc, bool Copy>
static void BenchmarkReallocateShrinking(benchmark::State& state) {
  const int64_t max_size = 256 << 10;  // 256 kiB
  const int64_t increment = 4096;
  MemoryPool* pool = *Alloc::GetAllocator();
  int64_t nb_reallocs = 0;

  for (auto _ : state) {
    uint8_t* data = nullptr;
    // `size` always holds the current allocation size, so that every
    // reallocation call receives the size the region really has.
    int64_t size = max_size;
    ARROW_CHECK_OK(pool->Allocate(size, &data));
    for (int64_t new_size = size - increment; new_size >= 0; new_size -= increment) {
      if constexpr (Copy) {
        ARROW_CHECK_OK(pool->Reallocate(size, new_size, &data));
      } else {
        ARROW_CHECK_OK(pool->ReallocateNoCopy(size, new_size, &data));
      }
      size = new_size;
      ++nb_reallocs;
    }
    pool->Free(data, size);
  }
  state.SetItemsProcessed(nb_reallocs);
}

// Benchmark entry point: shrinking reallocations with Copy=true, i.e. going
// through MemoryPool::Reallocate.
template <typename Alloc>
static void ReallocateShrinking(benchmark::State& state) {
BenchmarkReallocateShrinking<Alloc, /*Copy=*/true>(state);
}

// Benchmark entry point: shrinking reallocations with Copy=false, i.e. going
// through MemoryPool::ReallocateNoCopy.
template <typename Alloc>
static void ReallocateShrinkingNoCopy(benchmark::State& state) {
BenchmarkReallocateShrinking<Alloc, /*Copy=*/false>(state);
}

// Shared benchmark arguments: a "size" argument ranging from 4 KiB to 16 MiB
// with a range multiplier of 16, measured using real time.
#define BENCHMARK_ALLOCATE_ARGS \
->RangeMultiplier(16)->Range(4096, 16 * 1024 * 1024)->ArgName("size")->UseRealTime()

Expand All @@ -135,4 +206,26 @@ BENCHMARK_ALLOCATE(AllocateDeallocate, Mimalloc);
BENCHMARK_ALLOCATE(AllocateTouchDeallocate, Mimalloc);
#endif

// Growing-reallocation benchmarks (with and without copy) for the system
// allocator, plus jemalloc and mimalloc when they are enabled in the build.
BENCHMARK_TEMPLATE(ReallocateGrowing, SystemAlloc);
BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, SystemAlloc);
#ifdef ARROW_JEMALLOC
BENCHMARK_TEMPLATE(ReallocateGrowing, Jemalloc);
BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, Jemalloc);
#endif
#ifdef ARROW_MIMALLOC
BENCHMARK_TEMPLATE(ReallocateGrowing, Mimalloc);
BENCHMARK_TEMPLATE(ReallocateGrowingNoCopy, Mimalloc);
#endif

// Shrinking-reallocation benchmarks (with and without copy) for the same set
// of allocators.
BENCHMARK_TEMPLATE(ReallocateShrinking, SystemAlloc);
BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, SystemAlloc);
#ifdef ARROW_JEMALLOC
BENCHMARK_TEMPLATE(ReallocateShrinking, Jemalloc);
BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, Jemalloc);
#endif
#ifdef ARROW_MIMALLOC
BENCHMARK_TEMPLATE(ReallocateShrinking, Mimalloc);
BENCHMARK_TEMPLATE(ReallocateShrinkingNoCopy, Mimalloc);
#endif

} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/memory_pool_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class JemallocAllocator {
static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out);
static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr);
// Try to resize the block at `ptr` from `old_size` to `new_size` bytes without
// moving its base pointer; returns true on success.
static bool ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr);
static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment);
static void ReleaseUnused();
};
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ Status JemallocAllocator::ReallocateAligned(int64_t old_size, int64_t new_size,
return Status::OK();
}

bool JemallocAllocator::ResizeInPlace(int64_t old_size, int64_t new_size, uint8_t* ptr) {
  // A zero old or new size is never resized in place.
  const bool has_zero_size = (old_size == 0) || (new_size == 0);
  if (has_zero_size) {
    return false;
  }
  // The base pointer does not move, so no alignment flag is passed.
  const auto requested = static_cast<size_t>(new_size);
  const size_t resulting = xallocx(ptr, requested, /*extra=*/0, /*flags=*/0);
  // NOTE(review): xallocx() returns the real size of the allocation, which may
  // be larger than the request; the strict equality below then reports
  // failure.  Confirm whether this conservative check is intended.
  return static_cast<int64_t>(resulting) == new_size;
}

void JemallocAllocator::DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) {
if (ptr == kZeroSizeArea) {
DCHECK_EQ(size, 0);
Expand Down
35 changes: 18 additions & 17 deletions cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,33 @@ class TestMemoryPool : public ::arrow::TestMemoryPoolBase {
MemoryPool* memory_pool() override { return Factory::memory_pool(); }
};

TYPED_TEST_SUITE_P(TestMemoryPool);
using MemoryPoolFactories =
::testing::Types<DefaultMemoryPoolFactory, SystemMemoryPoolFactory
#ifdef ARROW_JEMALLOC
,
JemallocMemoryPoolFactory
#endif
#ifdef ARROW_MIMALLOC
,
MimallocMemoryPoolFactory
#endif
>;

TYPED_TEST_SUITE(TestMemoryPool, MemoryPoolFactories);

TYPED_TEST_P(TestMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }
TYPED_TEST(TestMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }

TYPED_TEST_P(TestMemoryPool, OOM) {
TYPED_TEST(TestMemoryPool, OOM) {
#ifndef ADDRESS_SANITIZER
this->TestOOM();
#endif
}

TYPED_TEST_P(TestMemoryPool, Reallocate) { this->TestReallocate(); }

TYPED_TEST_P(TestMemoryPool, Alignment) { this->TestAlignment(); }
TYPED_TEST(TestMemoryPool, Reallocate) { this->TestReallocate(); }

REGISTER_TYPED_TEST_SUITE_P(TestMemoryPool, MemoryTracking, OOM, Reallocate, Alignment);
TYPED_TEST(TestMemoryPool, ReallocateNoCopy) { this->TestReallocateNoCopy(); }

INSTANTIATE_TYPED_TEST_SUITE_P(Default, TestMemoryPool, DefaultMemoryPoolFactory);
INSTANTIATE_TYPED_TEST_SUITE_P(System, TestMemoryPool, SystemMemoryPoolFactory);

#ifdef ARROW_JEMALLOC
INSTANTIATE_TYPED_TEST_SUITE_P(Jemalloc, TestMemoryPool, JemallocMemoryPoolFactory);
#endif

#ifdef ARROW_MIMALLOC
INSTANTIATE_TYPED_TEST_SUITE_P(Mimalloc, TestMemoryPool, MimallocMemoryPoolFactory);
#endif
TYPED_TEST(TestMemoryPool, Alignment) { this->TestAlignment(); }

TEST(DefaultMemoryPool, Identity) {
// The default memory pool is pointer-identical to one of the backend-specific pools.
Expand Down
Loading

0 comments on commit 429d060

Please sign in to comment.