From d8413427d71d05a905a098fc0a1064d4c041ade9 Mon Sep 17 00:00:00 2001
From: Jonas Hahnfeld
Date: Fri, 14 Nov 2025 10:03:22 +0100
Subject: [PATCH 1/2] [ntuple] Delete move of RPageSinkBuf

Created tasks reference *this, so moving is not safe. It's also not needed
because RPageSinkBuf is always inside a std::unique_ptr.
---
 tree/ntuple/inc/ROOT/RPageSinkBuf.hxx | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
index 19a5d47b3528c..190117f58bd67 100644
--- a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
+++ b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
@@ -123,8 +123,8 @@ public:
    explicit RPageSinkBuf(std::unique_ptr inner);
    RPageSinkBuf(const RPageSinkBuf&) = delete;
    RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
-   RPageSinkBuf(RPageSinkBuf&&) = default;
-   RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
+   RPageSinkBuf(RPageSinkBuf &&) = delete;
+   RPageSinkBuf &operator=(RPageSinkBuf &&) = delete;
    ~RPageSinkBuf() override;
 
    ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final;

From e896cd5555fe80972d1214e533678a280d0b50e9 Mon Sep 17 00:00:00 2001
From: Jonas Hahnfeld
Date: Fri, 14 Nov 2025 10:04:34 +0100
Subject: [PATCH 2/2] [ntuple] Reduce memory usage of RPageSinkBuf

When IMT is turned on and RPageSinkBuf has an RTaskScheduler, we would
previously buffer all pages and create tasks to seal / compress them. While
this exposes the maximum amount of parallel work, it's a waste of memory if
other threads are not fast enough to process the tasks. Heuristically assume
that there is enough work if we already buffer more uncompressed bytes than
the approximate zipped cluster size.

In a small test, writing random data with ROOT::EnableImplicitMT(1) and
therefore no extra worker thread, the application used 500 MB before this
change for the default cluster size of 128 MiB. After this change, memory
usage is reduced to around 430 MB (compared to 360 MB without IMT).

The compression factor is around 2.1x in this case, which roughly checks
out: instead of buffering the full uncompressed cluster (which is around
compression factor * zipped cluster size = 270 MiB), we now buffer
uncompressed pages up to the approximate zipped cluster size (128 MiB) and
then start compressing pages immediately. The result of course also needs
to be buffered, but is much smaller after compression:
(1 - 1 / compression factor) * zipped cluster size = 67 MiB. Accordingly,
the gain will be higher for larger compression factors.
---
 tree/ntuple/inc/ROOT/RPageSinkBuf.hxx |  4 ++++
 tree/ntuple/src/RPageSinkBuf.cxx      | 18 +++++++++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
index 190117f58bd67..b9465ff252b36 100644
--- a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
+++ b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
@@ -19,6 +19,8 @@
 #include
 #include
 
+#include <atomic>
+#include <cstddef>
 #include
 #include
 #include
@@ -109,6 +111,8 @@ private:
    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
    std::unique_ptr fInnerModel;
+   /// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
+   std::atomic<std::size_t> fBufferedUncompressed = 0;
    /// Vector of buffered column pages. Indexed by column id.
    std::vector fBufferedColumns;
    /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
diff --git a/tree/ntuple/src/RPageSinkBuf.cxx b/tree/ntuple/src/RPageSinkBuf.cxx
index 9253dac701acb..13caad87dd2e6 100644
--- a/tree/ntuple/src/RPageSinkBuf.cxx
+++ b/tree/ntuple/src/RPageSinkBuf.cxx
@@ -175,7 +175,13 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       }
    };
 
-   if (!fTaskScheduler) {
+   // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
+   // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
+   // fast enough, and heuristically reduces the memory usage, especially for big compression factors.
+   std::size_t bufferedUncompressed = fBufferedUncompressed.load();
+   bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
+
+   if (!fTaskScheduler || enoughWork) {
       allocateBuf();
       // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
       RSealPageConfig config;
@@ -194,16 +200,25 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       return;
    }
 
+   // We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
+   // directly.
+   fBufferedUncompressed += page.GetNBytes();
+
    // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
    zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
    // make sure the page is aware of how many elements it will have
    zipItem.fPage.GrowUnchecked(page.GetNElements());
+   assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
    memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
 
    fCounters->fParallelZip.SetValue(1);
    // Thread safety: Each thread works on a distinct zipItem which owns its
    // compression buffer.
    fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
+      // The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
+      // when we are done.
+      fBufferedUncompressed -= zipItem.fPage.GetNBytes();
+
       allocateBuf();
       RSealPageConfig config;
       config.fPage = &zipItem.fPage;
@@ -241,6 +256,7 @@ void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
 void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function FlushClusterFn)
 {
    WaitForAllTasks();
+   assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
 
    std::vector toCommit;
    toCommit.reserve(fBufferedColumns.size());
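
Illustration (not part of the patches): the program below is a minimal standalone
sketch of the gating heuristic introduced in the second commit. Page,
TaskScheduler, BufferedSink, SealNow and fApproxZippedClusterSize are hypothetical
stand-ins for the real RPage, RTaskScheduler, RPageSinkBuf, the sealing code and
the RNTupleWriteOptions value; only the atomic byte counter and the synchronous
fallback mirror the patch.

// sketch.cxx -- illustration only, not part of the ROOT sources. Build with -std=c++17.
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical stand-in for a page of uncompressed column data.
struct Page {
   std::size_t fNBytes = 0;
};

// Hypothetical stand-in for RTaskScheduler: runs every task on its own thread.
class TaskScheduler {
   std::vector<std::thread> fWorkers;

public:
   void AddTask(std::function<void()> task) { fWorkers.emplace_back(std::move(task)); }
   void Wait()
   {
      for (auto &t : fWorkers)
         t.join();
      fWorkers.clear();
   }
};

// Hypothetical stand-in for RPageSinkBuf, reduced to the gating heuristic.
class BufferedSink {
   TaskScheduler *fScheduler = nullptr; // may be null, i.e. no IMT
   std::size_t fApproxZippedClusterSize = 128 * 1024 * 1024;
   std::atomic<std::size_t> fBufferedUncompressed{0};

   void SealNow(const Page &page)
   {
      // Placeholder for sealing / compressing on the calling thread.
      std::printf("sealing %zu bytes synchronously\n", page.fNBytes);
   }

public:
   explicit BufferedSink(TaskScheduler *scheduler) : fScheduler(scheduler) {}

   void CommitPage(const Page &page)
   {
      // Heuristic from the patch: if enough uncompressed bytes are already
      // queued for the workers, do not buffer more; seal right away.
      const bool enoughWork = fBufferedUncompressed.load() > fApproxZippedClusterSize;
      if (!fScheduler || enoughWork) {
         SealNow(page);
         return;
      }

      // Otherwise buffer the page and let a task compress it later.
      fBufferedUncompressed += page.fNBytes;
      fScheduler->AddTask([this, page] {
         // Decrement before compressing, mirroring the patch, so that pages
         // arriving in the meantime may again be deferred to tasks.
         fBufferedUncompressed -= page.fNBytes;
         std::printf("sealing %zu bytes in a task\n", page.fNBytes);
      });
   }

   void FlushCluster()
   {
      if (fScheduler)
         fScheduler->Wait();
      // All deferred pages have been processed; fBufferedUncompressed is back to zero.
   }
};

} // namespace sketch

int main()
{
   sketch::TaskScheduler scheduler;
   sketch::BufferedSink sink(&scheduler);
   for (int i = 0; i < 8; ++i)
      sink.CommitPage(sketch::Page{64u * 1024 * 1024}); // 64 MiB per page
   sink.FlushCluster();
   return 0;
}

Depending on how quickly the worker threads drain the counter, the first pages are
handed to tasks; once the buffered uncompressed bytes exceed the 128 MiB threshold,
further pages are sealed on the calling thread, which bounds the number of
uncompressed pages held in memory, just as the patch does for RPageSinkBuf.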