Skip to content

Commit

Permalink
[CORE] Shrink compressed buffers in arrow and track LargeMemoryPool (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 13, 2023
1 parent d14c7e7 commit 0fd70ff
Show file tree
Hide file tree
Showing 15 changed files with 359 additions and 282 deletions.
4 changes: 2 additions & 2 deletions cpp/core/memory/HbwAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bool HbwMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t size, void**
return true;
}

bool HbwMemoryAllocator::allocateAligned(uint16_t alignment, int64_t size, void** out) {
bool HbwMemoryAllocator::allocateAligned(uint64_t alignment, int64_t size, void** out) {
if (hbw_posix_memalign(out, alignment, size) != 0) {
return false;
}
Expand All @@ -60,7 +60,7 @@ bool HbwMemoryAllocator::reallocate(void* p, int64_t size, int64_t newSize, void
return true;
}

bool HbwMemoryAllocator::reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) {
bool HbwMemoryAllocator::reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) {
if (newSize <= 0) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/memory/HbwAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class HbwMemoryAllocator final : public MemoryAllocator {

bool allocateZeroFilled(int64_t nmemb, int64_t size, void** out) override;

bool allocateAligned(uint16_t alignment, int64_t size, void** out) override;
bool allocateAligned(uint64_t alignment, int64_t size, void** out) override;

bool reallocate(void* p, int64_t size, int64_t newSize, void** out) override;

bool reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) override;
bool reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) override;

bool free(void* p, int64_t size) override;

Expand Down
8 changes: 4 additions & 4 deletions cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bool ListenableMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t size,
return succeed;
}

bool ListenableMemoryAllocator::allocateAligned(uint16_t alignment, int64_t size, void** out) {
bool ListenableMemoryAllocator::allocateAligned(uint64_t alignment, int64_t size, void** out) {
listener_->allocationChanged(size);
bool succeed = delegated_->allocateAligned(alignment, size, out);
if (!succeed) {
Expand All @@ -72,7 +72,7 @@ bool ListenableMemoryAllocator::reallocate(void* p, int64_t size, int64_t newSiz

bool ListenableMemoryAllocator::reallocateAligned(
void* p,
uint16_t alignment,
uint64_t alignment,
int64_t size,
int64_t newSize,
void** out) {
Expand Down Expand Up @@ -126,7 +126,7 @@ bool StdMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t size, void**
return true;
}

bool StdMemoryAllocator::allocateAligned(uint16_t alignment, int64_t size, void** out) {
bool StdMemoryAllocator::allocateAligned(uint64_t alignment, int64_t size, void** out) {
*out = aligned_alloc(alignment, size);
bytes_ += size;
return true;
Expand All @@ -138,7 +138,7 @@ bool StdMemoryAllocator::reallocate(void* p, int64_t size, int64_t newSize, void
return true;
}

bool StdMemoryAllocator::reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) {
bool StdMemoryAllocator::reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) {
if (newSize <= 0) {
return false;
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/core/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class MemoryAllocator {

virtual bool allocate(int64_t size, void** out) = 0;
virtual bool allocateZeroFilled(int64_t nmemb, int64_t size, void** out) = 0;
virtual bool allocateAligned(uint16_t alignment, int64_t size, void** out) = 0;
virtual bool allocateAligned(uint64_t alignment, int64_t size, void** out) = 0;

virtual bool reallocate(void* p, int64_t size, int64_t newSize, void** out) = 0;
virtual bool reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) = 0;
virtual bool reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) = 0;

virtual bool free(void* p, int64_t size) = 0;

Expand Down Expand Up @@ -67,11 +67,11 @@ class ListenableMemoryAllocator final : public MemoryAllocator {

bool allocateZeroFilled(int64_t nmemb, int64_t size, void** out) override;

bool allocateAligned(uint16_t alignment, int64_t size, void** out) override;
bool allocateAligned(uint64_t alignment, int64_t size, void** out) override;

bool reallocate(void* p, int64_t size, int64_t newSize, void** out) override;

bool reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) override;
bool reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) override;

bool free(void* p, int64_t size) override;

Expand All @@ -93,11 +93,11 @@ class StdMemoryAllocator final : public MemoryAllocator {

bool allocateZeroFilled(int64_t nmemb, int64_t size, void** out) override;

bool allocateAligned(uint16_t alignment, int64_t size, void** out) override;
bool allocateAligned(uint64_t alignment, int64_t size, void** out) override;

bool reallocate(void* p, int64_t size, int64_t newSize, void** out) override;

bool reallocateAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize, void** out) override;
bool reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) override;

bool free(void* p, int64_t size) override;

Expand Down
13 changes: 7 additions & 6 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,12 @@ arrow::Status PreferCachePartitionWriter::evictPartition(int32_t partitionId /*
return arrow::Status::OutOfMemory("Cannot evict partition ", partitionId, " because writer is stopped.");
}

if (shuffleWriter_->totalCachedPayloadSize() <= 0) {
return arrow::Status::OutOfMemory("No partition to evict.");
}

int64_t evictTime = 0;
TIME_NANO_START(evictTime)

ARROW_ASSIGN_OR_RAISE(auto spilledFile, createTempShuffleFile(nextSpilledFileDir()));
SpillInfo spillInfo = {spilledFile};
SpillInfo spillInfo{spilledFile};

// Spill all cached batches into one file, record their start and length.
ARROW_ASSIGN_OR_RAISE(auto spilledFileOs, arrow::io::FileOutputStream::Open(spilledFile, true));
for (auto pid = 0; pid < shuffleWriter_->numPartitions(); ++pid) {
Expand All @@ -283,11 +281,14 @@ arrow::Status PreferCachePartitionWriter::evictPartition(int32_t partitionId /*
}
}
RETURN_NOT_OK(spilledFileOs->Close());
spills_.push_back(std::move(spillInfo));

TIME_NANO_END(evictTime)
shuffleWriter_->setTotalEvictTime(shuffleWriter_->totalEvictTime() + evictTime);

if (!spillInfo.partitionSpillInfos.empty()) {
spills_.push_back(std::move(spillInfo));
}

return arrow::Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ set(VELOX_SRCS
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxParquetDatasource.cc
memory/LargeMemoryPool.cc
memory/VeloxMemoryPool.cc
memory/VeloxColumnarBatch.cc
utils/VeloxArrowUtils.cc
Expand Down
5 changes: 1 addition & 4 deletions cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public BenchmarkShuffleSplit {
std::cout << schema_->ToString() << std::endl;

auto* pool = options.memory_pool.get();
auto ipcMemoryPool = std::make_shared<MMapMemoryPool>();
auto* shuffleWriterPtr = shuffleWriter.get();
ipcMemoryPool->SetSpillFunc(
[shuffleWriterPtr](int64_t size, int64_t* actual) { return shuffleWriterPtr->evictFixedSize(size, actual); });
auto ipcMemoryPool = std::make_shared<LargeMemoryPool>(pool);
options.ipc_write_options.memory_pool = ipcMemoryPool.get();
GLUTEN_ASSIGN_OR_THROW(
shuffleWriter,
Expand Down
149 changes: 149 additions & 0 deletions cpp/velox/memory/LargeMemoryPool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "LargeMemoryPool.h"
#include <arrow/util/logging.h>
#include <sys/mman.h>
#include "utils/macros.h"

#include <numeric>

namespace gluten {

// Hands out `size` bytes from the newest slab in buffers_, obtaining a new slab through doAlloc()
// when there is no slab yet or the newest one does not have enough room left.
//
// size       Requested byte count. 0 is forwarded to the delegated pool untouched; a negative value
//            is rejected. Any other value is rounded with ROUND_TO_LINE(size, alignment) before use.
// alignment  Rounding granularity applied to `size` (also forwarded for zero-sized requests).
// out        Receives the address of the allocation on success.
//
// Returns OK on success, Invalid for a negative size or when doAlloc() reports success but yields a
// null slab, and otherwise whatever error doAlloc() returned.
//
// NOTE(review): the returned address is `startAddr + allocated`, so it is only as aligned as the
// rounded sizes previously carved from the same slab - confirm that callers always pass the same
// alignment.
arrow::Status LargeMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) {
  if (size == 0) {
    return delegated_->Allocate(0, alignment, out);
  }
  if (size < 0) {
    // A negative size would be converted to a huge unsigned value in the comparisons below.
    return arrow::Status::Invalid("negative malloc size");
  }
  // make sure the size is cache line size aligned
  size = ROUND_TO_LINE(size, alignment);
  if (buffers_.empty() || static_cast<uint64_t>(size) > buffers_.back().size - buffers_.back().allocated) {
    // Allocate new page. Align to kHugePageSize.
    // Requests larger than the default slab get a dedicated slab rounded to kHugePageSize.
    const uint64_t allocSize =
        static_cast<uint64_t>(size) > kLargeBufferSize ? ROUND_TO_LINE(size, kHugePageSize) : kLargeBufferSize;

    // Start from nullptr so the check below is well defined even if doAlloc() returns OK without
    // writing the output pointer.
    uint8_t* allocAddr = nullptr;
    RETURN_NOT_OK(doAlloc(allocSize, kHugePageSize, &allocAddr));
    if (!allocAddr) {
      return arrow::Status::Invalid("doAlloc failed.");
    }
    // Best-effort hint covering the bytes about to be handed out; the result is intentionally ignored.
    madvise(allocAddr, size, MADV_WILLNEED);
    buffers_.push_back({allocAddr, allocAddr, allocSize, 0, 0});
  }
  auto& last = buffers_.back();
  *out = last.startAddr + last.allocated;
  // Remember the most recent allocation: Reallocate() only accepts pointers at or after it.
  last.lastAllocAddr = *out;
  last.allocated += size;
  return arrow::Status::OK();
}

// Records that `size` bytes (rounded the same way Allocate() rounds them) of the slab containing
// `buffer` are no longer in use. Once the freed byte count of a slab matches its allocated byte
// count, the slab is released through doFree() and dropped from buffers_.
//
// Zero-sized frees are ignored. A `buffer` that does not fall inside any tracked slab trips the
// ARROW_CHECK_NE below.
void LargeMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
  if (size == 0) {
    return;
  }
  // make sure the size is cache line size aligned
  const int64_t roundedSize = ROUND_TO_LINE(size, alignment);

  // Linear search for the slab whose address range [startAddr, startAddr + size) contains `buffer`.
  auto its = buffers_.begin();
  while (its != buffers_.end()) {
    const bool insideSlab = buffer >= its->startAddr && buffer < its->startAddr + its->size;
    if (insideSlab) {
      break;
    }
    ++its;
  }
  ARROW_CHECK_NE(its, buffers_.end());

  its->freed += roundedSize;
  const bool slabDrained = its->freed != 0 && its->freed == its->allocated;
  if (!slabDrained) {
    return;
  }
  doFree(its->startAddr, its->size);
  buffers_.erase(its);
}

// Resizes the most recent allocation of the newest slab.
//
// oldSize    Current size of the allocation; must not be 0.
// newSize    Requested size; must not be 0.
// alignment  Rounding granularity, applied with ROUND_TO_LINE exactly as in Allocate() and Free().
// ptr        In: address of the allocation. Out: unchanged for a shrink or an in-place grow, or the
//            address of a fresh allocation when the slab had no room to grow.
//
// Returns Invalid when either size is 0, when no slab exists, or when *ptr is not at or after the
// last allocation of the newest slab; otherwise OK or the error from Allocate().
//
// All bookkeeping uses the rounded sizes. Allocate() and Free() account in rounded sizes, so
// adjusting `allocated` by the raw difference would leave it out of step with `freed` and the slab
// would never be released.
arrow::Status LargeMemoryPool::Reallocate(int64_t oldSize, int64_t newSize, int64_t alignment, uint8_t** ptr) {
  if (oldSize == 0) {
    return arrow::Status::Invalid("Cannot call reallocated on oldSize == 0");
  }
  if (newSize == 0) {
    return arrow::Status::Invalid("Cannot call reallocated on newSize == 0");
  }
  if (buffers_.empty()) {
    // Nothing has been allocated from this pool, so *ptr cannot belong to the last buffer.
    return arrow::Status::Invalid("Reallocate can only be called for the last buffer");
  }
  auto* oldPtr = *ptr;
  auto& lastBuffer = buffers_.back();
  if (!(oldPtr >= lastBuffer.lastAllocAddr && oldPtr < lastBuffer.startAddr + lastBuffer.size)) {
    return arrow::Status::Invalid("Reallocate can only be called for the last buffer");
  }

  const int64_t oldRoundSize = ROUND_TO_LINE(oldSize, alignment);
  const int64_t newRoundSize = ROUND_TO_LINE(newSize, alignment);

  // shrink-to-fit
  if (newSize <= oldSize) {
    lastBuffer.allocated -= static_cast<uint64_t>(oldRoundSize - newRoundSize);
    return arrow::Status::OK();
  }

  const uint64_t growth = static_cast<uint64_t>(newRoundSize - oldRoundSize);
  if (growth > lastBuffer.size - lastBuffer.allocated) {
    // Not enough room left in the slab: move the data to a fresh allocation.
    // Allocate() may append to buffers_, so `lastBuffer` must not be used past this point.
    RETURN_NOT_OK(Allocate(newSize, alignment, ptr));
    memcpy(*ptr, oldPtr, std::min(oldSize, newSize));
    Free(oldPtr, oldSize, alignment);
  } else {
    // Grow in place: the allocation is the last one in the slab, so extending it only moves the
    // slab's high-water mark.
    lastBuffer.allocated += growth;
  }
  return arrow::Status::OK();
}

int64_t LargeMemoryPool::bytes_allocated() const {
return std::accumulate(
buffers_.begin(), buffers_.end(), 0LL, [](uint64_t size, const BufferAllocated& buf) { return size + buf.size; });
}

int64_t LargeMemoryPool::max_memory() const {
return delegated_->max_memory();
}

// Identifies this pool implementation by a fixed name.
std::string LargeMemoryPool::backend_name() const {
  std::string name = "LargeMemoryPool";
  return name;
}

int64_t LargeMemoryPool::total_bytes_allocated() const {
return delegated_->total_bytes_allocated();
}

int64_t LargeMemoryPool::num_allocations() const {
return delegated_->num_allocations();
}

// Obtains a slab of `size` bytes with the given alignment from the delegated pool.
// Subclasses override this to source slabs from somewhere else.
arrow::Status LargeMemoryPool::doAlloc(int64_t size, int64_t alignment, uint8_t** out) {
  arrow::Status status = delegated_->Allocate(size, alignment, out);
  return status;
}

void LargeMemoryPool::doFree(uint8_t* buffer, int64_t size) {
delegated_->Free(buffer, size);
}

// Checks on destruction that buffers_ is empty, i.e. that every slab has already been released
// (Free() is the only place in this file that removes entries from buffers_).
// NOTE(review): ARROW_CHECK is presumably fatal when the condition is false - confirm against
// arrow/util/logging.h.
LargeMemoryPool::~LargeMemoryPool() {
ARROW_CHECK(buffers_.size() == 0);
}

// Checks on destruction that buffers_ is empty, i.e. that no mmap-backed slab is still tracked.
// The base-class destructor runs afterwards and repeats the same check.
// NOTE(review): ARROW_CHECK is presumably fatal when the condition is false - confirm against
// arrow/util/logging.h.
MMapMemoryPool::~MMapMemoryPool() {
ARROW_CHECK(buffers_.size() == 0);
}

// Obtains a slab of `size` bytes from an anonymous, private, read/write mmap() mapping.
// The `alignment` argument is not used here. On failure *out holds MAP_FAILED and an OutOfMemory
// status is returned; on success the mapping is additionally hinted with MADV_WILLNEED.
arrow::Status MMapMemoryPool::doAlloc(int64_t size, int64_t alignment, uint8_t** out) {
  void* mapping = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  *out = static_cast<uint8_t*>(mapping);
  if (mapping != MAP_FAILED) {
    // Best-effort hint; the result is ignored.
    madvise(*out, size, MADV_WILLNEED);
    return arrow::Status::OK();
  }
  return arrow::Status::OutOfMemory(" mmap error ", size);
}

// Unmaps a slab previously produced by doAlloc(). The result of munmap() is not checked.
void MMapMemoryPool::doFree(uint8_t* buffer, int64_t size) {
  void* mapping = buffer;
  munmap(mapping, size);
}
} // namespace gluten
80 changes: 80 additions & 0 deletions cpp/velox/memory/LargeMemoryPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <arrow/memory_pool.h>

namespace gluten {

// arrow::MemoryPool implementation that serves allocations out of large slabs. Each slab is
// obtained through doAlloc() (by default from the delegated pool), allocations are placed one after
// another inside the newest slab, and a slab is handed back through doFree() once all bytes carved
// from it have been freed.
class LargeMemoryPool : public arrow::MemoryPool {
public:
// 1 << 21 bytes (2 MiB): alignment requested for every slab and rounding unit for oversized slabs.
constexpr static uint64_t kHugePageSize = 1 << 21;
// 4 << 21 bytes (8 MiB): size of a slab unless a single request is larger than this.
constexpr static uint64_t kLargeBufferSize = 4 << 21;

// `pool` is stored as a raw pointer and used for slab allocation and for the forwarded statistics.
// Nothing in this class deletes it.
explicit LargeMemoryPool(MemoryPool* pool) : delegated_(pool) {}

// Checks that no slab is still tracked when the pool is destroyed.
~LargeMemoryPool();

// Returns memory from the newest slab, creating a new slab when needed. Zero-sized requests are
// forwarded to the delegated pool.
arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override;

// Marks bytes of the slab containing `buffer` as freed; releases the slab once it is fully freed.
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;

// Only supported for the most recent allocation of the newest slab; rejects oldSize == 0 and
// newSize == 0.
arrow::Status Reallocate(int64_t oldSize, int64_t newSize, int64_t alignment, uint8_t** ptr) override;

// Sum of the sizes of all slabs currently held.
int64_t bytes_allocated() const override;

// Forwarded to the delegated pool.
int64_t max_memory() const override;

// Returns "LargeMemoryPool".
std::string backend_name() const override;

// Forwarded to the delegated pool.
int64_t total_bytes_allocated() const override;

// Forwarded to the delegated pool.
int64_t num_allocations() const override;

protected:
// Hook that provides a new slab; the default implementation allocates from the delegated pool.
virtual arrow::Status doAlloc(int64_t size, int64_t alignment, uint8_t** out);

// Hook that releases a slab previously returned by doAlloc().
virtual void doFree(uint8_t* buffer, int64_t size);

// Bookkeeping for one slab.
struct BufferAllocated {
// First byte of the slab.
uint8_t* startAddr;
// Address returned by the most recent Allocate() served from this slab.
uint8_t* lastAllocAddr;
// Total capacity of the slab in bytes.
uint64_t size;
// Bytes handed out so far; the next allocation starts at startAddr + allocated.
uint64_t allocated;
// Bytes returned through Free() so far.
uint64_t freed;
};

// Slabs currently held; the last element is the one new allocations are served from.
std::vector<BufferAllocated> buffers_;

// Pool that backs the default doAlloc()/doFree() and the forwarded statistics.
MemoryPool* delegated_;
};

// LargeMemoryPool variant whose slabs come from mmap()/munmap() instead of the delegated pool.
// Memory obtained this way can't be tracked by Spark, so this pool is currently only used for
// testing purposes.
class MMapMemoryPool : public LargeMemoryPool {
public:
// The base class is constructed with arrow::default_memory_pool(), which it still uses for
// zero-sized allocations and for the forwarded statistics.
explicit MMapMemoryPool() : LargeMemoryPool(arrow::default_memory_pool()) {}

// Checks that no slab is still tracked when the pool is destroyed.
~MMapMemoryPool() override;

protected:
// Maps a new anonymous, private slab of `size` bytes.
arrow::Status doAlloc(int64_t size, int64_t alignment, uint8_t** out) override;

// Unmaps a slab previously returned by doAlloc().
void doFree(uint8_t* buffer, int64_t size) override;
};

} // namespace gluten

0 comments on commit 0fd70ff

Please sign in to comment.