diff --git a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
index 19a5d47b3528c..b9465ff252b36 100644
--- a/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
+++ b/tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
@@ -19,6 +19,8 @@
 #include
 #include
 
+#include <atomic>
+#include <cstddef>
 #include
 #include
 #include
@@ -109,6 +111,8 @@ private:
    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
    std::unique_ptr<RNTupleModel> fInnerModel;
+   /// The sum of uncompressed bytes in buffered pages. Used to heuristically limit the memory usage.
+   std::atomic<std::size_t> fBufferedUncompressed = 0;
    /// Vector of buffered column pages. Indexed by column id.
    std::vector<RColumnBuf> fBufferedColumns;
    /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
@@ -123,8 +127,8 @@ public:
    explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
    RPageSinkBuf(const RPageSinkBuf&) = delete;
    RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
-   RPageSinkBuf(RPageSinkBuf&&) = default;
-   RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
+   RPageSinkBuf(RPageSinkBuf &&) = delete;
+   RPageSinkBuf &operator=(RPageSinkBuf &&) = delete;
    ~RPageSinkBuf() override;
 
    ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final;
diff --git a/tree/ntuple/src/RPageSinkBuf.cxx b/tree/ntuple/src/RPageSinkBuf.cxx
index 9253dac701acb..13caad87dd2e6 100644
--- a/tree/ntuple/src/RPageSinkBuf.cxx
+++ b/tree/ntuple/src/RPageSinkBuf.cxx
@@ -175,7 +175,13 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       }
    };
 
-   if (!fTaskScheduler) {
+   // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
+   // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
+   // fast enough, and heuristically reduces the memory usage, especially for large compression factors.
+   std::size_t bufferedUncompressed = fBufferedUncompressed.load();
+   bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
+
+   if (!fTaskScheduler || enoughWork) {
       allocateBuf();
       // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
       RSealPageConfig config;
@@ -194,16 +200,25 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       }
       return;
   }
+   // We will buffer the uncompressed page. Unless the buffered work is consumed fast enough, the next page might be
+   // compressed directly.
+   fBufferedUncompressed += page.GetNBytes();
+
    // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
    zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
    // make sure the page is aware of how many elements it will have
    zipItem.fPage.GrowUnchecked(page.GetNElements());
+   assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
    memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
 
    fCounters->fParallelZip.SetValue(1);
    // Thread safety: Each thread works on a distinct zipItem which owns its
    // compression buffer.
    fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
+      // The task will consume the uncompressed page. Decrease the atomic counter early so that more work can be
+      // buffered by the time we are done.
+      fBufferedUncompressed -= zipItem.fPage.GetNBytes();
+
       allocateBuf();
       RSealPageConfig config;
       config.fPage = &zipItem.fPage;
@@ -241,6 +256,7 @@ void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
 void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
 {
    WaitForAllTasks();
+   assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
 
    std::vector<RSealedPageGroup> toCommit;
    toCommit.reserve(fBufferedColumns.size());
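
For context, the back-pressure scheme added by this patch boils down to three pieces: an atomic counter tracks the uncompressed bytes currently buffered for asynchronous sealing, every new page first compares that counter against the approximate zipped cluster size, and each compression task decrements the counter as soon as it starts rather than when it finishes. The sketch below is a minimal, self-contained illustration of that idea only; the names (BufferedSink, CommitPage, Compress) and the use of plain std::thread are invented for the example and are not the ROOT API, which instead relies on RPageSinkBuf, its task scheduler, and GetWriteOptions().GetApproxZippedClusterSize().

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <utility>
#include <vector>

// Illustrative stand-in for a page of uncompressed column data.
struct Page {
   std::vector<unsigned char> fBytes;
   std::size_t Size() const { return fBytes.size(); }
};

class BufferedSink {
public:
   explicit BufferedSink(std::size_t approxZippedClusterSize) : fThreshold(approxZippedClusterSize) {}

   void CommitPage(Page page)
   {
      // Same heuristic as in RPageSinkBuf::CommitPage: if the not-yet-compressed bytes already
      // exceed the approximate zipped cluster size, compress synchronously in the calling thread
      // instead of queuing yet another task. This bounds the memory held in buffered page copies.
      if (fBufferedUncompressed.load() > fThreshold) {
         Compress(page);
         return;
      }
      fBufferedUncompressed += page.Size();
      fWorkers.emplace_back([this, page = std::move(page)] {
         // Decrement early: while this task compresses, new pages can be buffered (and compressed
         // in parallel) rather than falling back to the synchronous path above.
         fBufferedUncompressed -= page.Size();
         Compress(page);
      });
   }

   void Flush()
   {
      for (auto &worker : fWorkers)
         worker.join();
      fWorkers.clear();
      // All buffered pages must have been consumed by now, mirroring the assert in FlushClusterImpl.
      std::printf("buffered uncompressed bytes after flush: %zu\n", fBufferedUncompressed.load());
   }

private:
   static void Compress(const Page &page)
   {
      // Placeholder for the sealing / compression step.
      std::size_t checksum = 0;
      for (auto byte : page.fBytes)
         checksum += byte;
      (void)checksum;
   }

   std::size_t fThreshold;
   std::atomic<std::size_t> fBufferedUncompressed{0};
   std::vector<std::thread> fWorkers;
};

int main()
{
   BufferedSink sink(1 << 20); // pretend the target cluster compresses to about 1 MiB
   for (int i = 0; i < 64; ++i)
      sink.CommitPage(Page{std::vector<unsigned char>(64 * 1024, 0xab)});
   sink.Flush();
}

Decrementing at the start of the task, rather than at the end, is deliberate: it makes room for the next pages to be buffered while the current one is still being compressed, so the synchronous fallback only kicks in when the workers genuinely cannot keep up.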