From 880b7eae07020f8d2d2419674510522a5c79e977 Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 15 Oct 2025 11:06:52 +0200 Subject: [PATCH] [ntuple] Merger: avoid resealing if not needed Currently we either "fast merge" the page (i.e. we copy it as-is to the destination) or we reseal it if fast merging is not possible. There is a middle ground where we can avoid resealing if only the page compression differs but not the on-disk encoding. We were not currently leveraging this and doing the extra work of resealing the pages in all cases where just recompression could be done. With this PR we introduce this middle ground case ("L2 merging", aka "recompress merging") which skips the resealing (thus the re-encoding) of the pages whenever possible. --- tree/ntuple/src/RNTupleMerger.cxx | 109 +++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 16 deletions(-) diff --git a/tree/ntuple/src/RNTupleMerger.cxx b/tree/ntuple/src/RNTupleMerger.cxx index 9b5462fbf2446..6a88f2302fdf8 100644 --- a/tree/ntuple/src/RNTupleMerger.cxx +++ b/tree/ntuple/src/RNTupleMerger.cxx @@ -267,6 +267,38 @@ struct RChangeCompressionFunc { RPageStorage::RSealedPage &fSealedPage; ROOT::Internal::RPageAllocator &fPageAlloc; std::uint8_t *fBuffer; + std::size_t fBufSize; + const ROOT::RNTupleWriteOptions &fWriteOpts; + + void operator()() const + { + assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier()); + + const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements()); + // TODO: this buffer could be kept and reused across pages + auto unzipBuf = MakeUninitArray(bytesPacked); + ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked, + unzipBuf.get()); + + const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t); + assert(fBufSize >= bytesPacked + checksumSize); + auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, + fMergeOptions.fCompressionSettings.value(), fBuffer); + + fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()}; + fSealedPage.ChecksumIfEnabled(); + } +}; + +struct RResealFunc { + const RColumnElementBase &fSrcColElement; + const RColumnElementBase &fDstColElement; + const RNTupleMergeOptions &fMergeOptions; + + RPageStorage::RSealedPage &fSealedPage; + ROOT::Internal::RPageAllocator &fPageAlloc; + std::uint8_t *fBuffer; + std::size_t fBufSize; void operator()() const { @@ -277,11 +309,25 @@ struct RChangeCompressionFunc { sealConf.fBuffer = fBuffer; sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings; sealConf.fWriteChecksum = fSealedPage.GetHasChecksum(); + assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t)); auto refSealedPage = RPageSink::SealPage(sealConf); fSealedPage = refSealedPage; } }; +struct RTaskVisitor { + std::optional &fGroup; + + template + void operator()(T &f) + { + if (fGroup) + fGroup->Run(f); + else + f(); + } +}; + struct RCommonField { const ROOT::RFieldDescriptor *fSrc; const ROOT::RFieldDescriptor *fDst; @@ -778,15 +824,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // Each column range potentially has a distinct compression settings const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value(); - // If either the compression or the encoding of the source doesn't match that of the destination, we need - // to reseal the page. Otherwise, if both match, we can fast merge. + + // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case: + // L1: compression and encoding of src and dest both match: we can simply copy the page + // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid + // resealing it. + // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing + // it. + const bool compressionIsDifferent = + colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value(); const bool needsResealing = - colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() || srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType; + const bool needsRecompressing = compressionIsDifferent || needsResealing; - if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) { + if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) { R__LOG_INFO(NTupleMergeLog()) - << "Resealing column " << column.fColumnName << ": { compression: " << colRangeCompressionSettings << " => " + << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName + << ": { compression: " << colRangeCompressionSettings << " => " << mergeData.fMergeOpts.fCompressionSettings.value() << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType) << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType); @@ -795,7 +849,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); // If the column range already has the right compression we don't need to allocate any new buffer, so we don't // bother reserving memory for them. - if (needsResealing) + if (needsRecompressing) sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size()); // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry @@ -831,18 +885,41 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, sealedPage.VerifyChecksumIfEnabled().ThrowOnError(); R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize())); - if (needsResealing) { + if (needsRecompressing) { + std::optional> task; const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements(); auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx]; - buffer = MakeUninitArray(uncompressedSize + checksumSize); - RChangeCompressionFunc compressTask{ - *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(), - }; - - if (fTaskGroup) - fTaskGroup->Run(compressTask); - else - compressTask(); + const auto bufSize = uncompressedSize + checksumSize; + // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward. + // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit + // the actual data size after recompressing. + buffer = MakeUninitArray(bufSize); + + if (needsResealing) { + task.emplace(RResealFunc{ + *srcColElement, + *dstColElement, + mergeData.fMergeOpts, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + }); + } else { + task.emplace(RChangeCompressionFunc{ + *srcColElement, + *dstColElement, + mergeData.fMergeOpts, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + mergeData.fDestination.GetWriteOptions() + }); + } + + assert(task.has_value()); + std::visit(RTaskVisitor{fTaskGroup}, *task); } ++pageIdx;