Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tree/ntuple/inc/ROOT/RNTupleTypes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private:
static constexpr std::uint64_t kMaskReservedBit = 1ull << 60;

/// To save memory, we use the most significant bits to store the locator type (file, DAOS, zero page,
/// unknown, kTestLocatorType) as well as the one "reserved bit" that we currently process, the DAOS cage bit.
/// unknown, kTestLocatorType) as well as one "reserved bit" that can be used in future locators.
/// Consequently, we can only store sizes up to 60 bits (1 EB), which in practice won't be an issue.
std::uint64_t fFlagsAndNBytes = 0;
/// Simple on-disk locators consisting of a 64-bit offset use variant type `uint64_t`;
Expand All @@ -272,7 +272,7 @@ public:
/// For non-disk locators, the value for the _Type_ field. This makes it possible to have different type values even
/// if the payload structure is identical.
ELocatorType GetType() const;
/// The only currently supported reserved bit is the DAOS cage bit.
/// We currently only support one of the 8 available reserved bits (none are used so far).
std::uint8_t GetReserved() const { return (fFlagsAndNBytes & kMaskReservedBit) > 0; }

void SetNBytesOnStorage(std::uint64_t nBytesOnStorage);
Expand Down
9 changes: 0 additions & 9 deletions tree/ntuple/inc/ROOT/RNTupleWriteOptionsDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ namespace Experimental {
// clang-format on
class RNTupleWriteOptionsDaos : public ROOT::RNTupleWriteOptions {
std::string fObjectClass{"SX"};
/// The maximum cage size is set to the equivalent of 16 uncompressed pages - 16MiB by default.
/// A `fMaxCageSize` of 0 disables the caging mechanism.
uint32_t fMaxCageSize = 16 * RNTupleWriteOptions::fMaxUnzippedPageSize;

public:
~RNTupleWriteOptionsDaos() override = default;
Expand All @@ -51,12 +48,6 @@ public:
/// `OC_xxx` constant defined in `daos_obj_class.h` may be used here without
/// the OC_ prefix.
void SetObjectClass(const std::string &val) { fObjectClass = val; }

uint32_t GetMaxCageSize() const { return fMaxCageSize; }
/// Set the upper bound for page concatenation into cages, in bytes. It is assumed
/// that cage size will be no smaller than the approximate uncompressed page size.
/// To disable page concatenation, set this value to 0.
void SetMaxCageSize(uint32_t cageSz) { fMaxCageSize = cageSz; }
};

} // namespace Experimental
Expand Down
5 changes: 0 additions & 5 deletions tree/ntuple/inc/ROOT/RPageStorageDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ using ntuple_index_t = std::uint32_t;
class RDaosPool;
class RDaosContainer;
class RPageAllocatorHeap;
enum EDaosLocatorFlags {
// Indicates that the referenced page is "caged", i.e. it is stored in a larger blob that contains multiple pages.
kCagedPage = 0x01,
};

// clang-format off
/**
Expand Down Expand Up @@ -119,7 +115,6 @@ private:

RDaosNTupleAnchor fNTupleAnchor;
ntuple_index_t fNTupleIndex{0};
uint32_t fCageSizeLimit{};

protected:
using RPagePersistentSink::InitImpl;
Expand Down
117 changes: 24 additions & 93 deletions tree/ntuple/src/RPageStorageDaos.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,6 @@ RDaosURI ParseDaosURI(std::string_view uri)
return {m[1], m[2]};
}

/// \brief Splits a 64-bit object-store page locator address into its two 32-bit components.
/// The lower half holds the attribute key under which the cage is stored; the upper half holds
/// the byte offset within that cage at which the page begins.
/// \param address The packed 64-bit locator value read from the on-disk page locator.
/// \return A pair `{attribute key, offset within the cage}`.
std::pair<uint32_t, uint32_t> DecodeDaosPagePosition(const ROOT::RNTupleLocatorObject64 &address)
{
   const uint64_t packed = address.GetLocation();
   // Lower 32 bits: attribute key (cage index); upper 32 bits: offset inside the cage.
   const auto attrKey = static_cast<uint32_t>(packed & 0xFFFFFFFF);
   const auto cageOffset = static_cast<uint32_t>(packed >> 32);
   return {attrKey, cageOffset};
}

/// \brief Combines an attribute key and an offset within its contents into one 64-bit address.
/// The attribute key occupies the lower 32 bits; the offset occupies the upper 32 bits and
/// defaults to zero, which is the encoding used when caging is disabled.
/// \param position The attribute key (cage index); only its lower 32 bits are kept.
/// \param offset Byte offset of the page inside the cage (default 0).
/// \return The packed value wrapped in an RNTupleLocatorObject64.
ROOT::RNTupleLocatorObject64 EncodeDaosPagePosition(uint64_t position, uint64_t offset = 0)
{
   const uint64_t packed = (offset << 32) | (position & 0xFFFFFFFF);
   return ROOT::RNTupleLocatorObject64{packed};
}

/// \brief Helper structure concentrating the functionality required to locate an ntuple within a DAOS container.
/// It includes a hashing function that converts the RNTuple's name into a 32-bit identifier; this value is used to
/// index the subspace for the ntuple among all objects in the container. A zero-value hash value is reserved for
Expand Down Expand Up @@ -293,10 +276,6 @@ void ROOT::Experimental::Internal::RPageSinkDaos::InitImpl(unsigned char *serial
if (oclass.IsUnknown())
throw ROOT::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));

size_t cageSz = opts ? opts->GetMaxCageSize() : RNTupleWriteOptionsDaos().GetMaxCageSize();
size_t pageSz = opts ? opts->GetMaxUnzippedPageSize() : RNTupleWriteOptionsDaos().GetMaxUnzippedPageSize();
fCageSizeLimit = std::max(cageSz, pageSz);

auto args = ParseDaosURI(fURI);
auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);

Expand Down Expand Up @@ -330,20 +309,20 @@ ROOT::RNTupleLocator
ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId,
const RPageStorage::RSealedPage &sealedPage)
{
auto offsetData = fPageId.fetch_add(1);
auto pageId = fPageId.fetch_add(1);
ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();

{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, offsetData);
RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, pageId);
fDaosContainer->WriteSingleAkey(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
daosKey.fAkey);
}

RNTupleLocator result;
result.SetType(RNTupleLocator::kTypeDAOS);
result.SetNBytesOnStorage(sealedPage.GetDataSize());
result.SetPosition(EncodeDaosPagePosition(offsetData));
result.SetPosition(ROOT::RNTupleLocatorObject64{pageId});
fCounters->fNPageCommitted.Inc();
fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
fNBytesCurrentCluster += sealedPage.GetBufferSize();
Expand All @@ -359,48 +338,31 @@ ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPa
auto nPages = mask.size();
locators.reserve(nPages);

const uint32_t maxCageSz = fCageSizeLimit;
const bool useCaging = fCageSizeLimit > 0;
const std::uint8_t locatorFlags = useCaging ? EDaosLocatorFlags::kCagedPage : 0;

ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
int64_t payloadSz = 0;
std::size_t positionOffset;
uint32_t positionIndex;

/// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
for (auto &range : ranges) {
positionOffset = 0;
/// Under caging, the atomic page counter is fetch-incremented for every column range to get the position of its
/// first cage and indicate the next one, also ensuring subsequent pages of different columns do not end up caged
/// together. This increment is not necessary in the absence of caging, as each page is trivially caged.
positionIndex = useCaging ? fPageId.fetch_add(1) : fPageId.load();

for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
const RPageStorage::RSealedPage &s = *sealedPageIt;

if (positionOffset + s.GetBufferSize() > maxCageSz) {
positionOffset = 0;
positionIndex = fPageId.fetch_add(1);
}
const auto pageId = fPageId.fetch_add(1);

d_iov_t pageIov;
d_iov_set(&pageIov, const_cast<void *>(s.GetBuffer()), s.GetBufferSize());

RDaosKey daosKey =
GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, positionIndex);
GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, pageId);
auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
auto [it, ret] = writeRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
it->second.Insert(daosKey.fAkey, pageIov);

RNTupleLocator locator;
locator.SetType(RNTupleLocator::kTypeDAOS);
locator.SetNBytesOnStorage(s.GetDataSize());
locator.SetPosition(EncodeDaosPagePosition(positionIndex, positionOffset));
locator.SetReserved(locatorFlags);
locator.SetPosition(ROOT::RNTupleLocatorObject64{pageId});
locators.push_back(locator);

positionOffset += s.GetBufferSize();
payloadSz += s.GetBufferSize();
}
}
Expand Down Expand Up @@ -574,24 +536,11 @@ void ROOT::Experimental::Internal::RPageSourceDaos::LoadSealedPage(ROOT::Descrip
return;
}

if (pageInfo.GetLocator().GetReserved() & EDaosLocatorFlags::kCagedPage) {
// Suboptimal but hard to do differently: we load the full cage up to and including the requested page.
// In practice, individual LoadSealedPage calls are rare and usually full clusters are buffered.
// The support for extracting individual pages from a cage makes testing easier, however.
const auto [position, offset] =
DecodeDaosPagePosition(pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>());
RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, position);
const auto bufSize = offset + sealedPage.GetBufferSize();
auto cageHeadBuffer = MakeUninitArray<unsigned char>(bufSize);
fDaosContainer->ReadSingleAkey(cageHeadBuffer.get(), bufSize, daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
memcpy(const_cast<void *>(sealedPage.GetBuffer()), cageHeadBuffer.get() + offset, sealedPage.GetBufferSize());
} else {
RDaosKey daosKey =
GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId,
pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
}
RDaosKey daosKey =
GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId,
pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(), daosKey.fOid,
daosKey.fDkey, daosKey.fAkey);

sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
}
Expand Down Expand Up @@ -625,10 +574,6 @@ ROOT::Internal::RPageRef ROOT::Experimental::Internal::RPageSourceDaos::LoadPage
std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off

if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
if (pageInfo.GetLocator().GetReserved() & EDaosLocatorFlags::kCagedPage) {
throw ROOT::RException(R__FAIL("accessing caged pages is only supported in conjunction with cluster cache"));
}

directReadBuffer = MakeUninitArray<unsigned char>(sealedPage.GetBufferSize());
RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
fNTupleIndex, clusterId, columnId, pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
Expand Down Expand Up @@ -680,8 +625,7 @@ ROOT::Experimental::Internal::RPageSourceDaos::LoadClusters(std::span<RCluster::
ROOT::DescriptorId_t fClusterId = 0;
ROOT::DescriptorId_t fColumnId = 0;
ROOT::NTupleSize_t fPageNo = 0;
std::uint64_t fPosition = 0;
std::uint64_t fCageOffset = 0;
std::uint64_t fPageId = 0;
std::uint64_t fDataSize = 0; // page payload
std::uint64_t fBufferSize = 0; // page payload + checksum (if available)
};
Expand All @@ -693,9 +637,7 @@ ROOT::Experimental::Internal::RPageSourceDaos::LoadClusters(std::span<RCluster::
auto fnPrepareSingleCluster = [&](const RCluster::RKey &clusterKey,
RDaosContainer::MultiObjectRWOperation_t &readRequests) {
auto clusterId = clusterKey.fClusterId;
// Group page locators by their position in the object store; with caging enabled, this facilitates the
// processing of cages' requests together into a single IOV to be loaded.
std::unordered_map<std::uint32_t, std::vector<RDaosSealedPageLocator>> onDiskPages;
std::vector<RDaosSealedPageLocator> onDiskPages;

unsigned clusterBufSz = 0, nPages = 0;
auto pageZeroMap = std::make_unique<ROOT::Internal::ROnDiskPageMap>();
Expand All @@ -704,13 +646,11 @@ ROOT::Experimental::Internal::RPageSourceDaos::LoadClusters(std::span<RCluster::
[&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
const auto &pageLocator = pageInfo.GetLocator();
uint32_t position, offset;
std::tie(position, offset) = DecodeDaosPagePosition(pageLocator.GetPosition<RNTupleLocatorObject64>());
auto [itLoc, _] = onDiskPages.emplace(position, std::vector<RDaosSealedPageLocator>());
auto pageBufferSize = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
const auto pageId = pageLocator.GetPosition<RNTupleLocatorObject64>().GetLocation();
const auto pageBufferSize = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
onDiskPages.emplace_back(RDaosSealedPageLocator{clusterId, physicalColumnId, pageNo, pageId,
pageLocator.GetNBytesOnStorage(), pageBufferSize});

itLoc->second.push_back({clusterId, physicalColumnId, pageNo, position, offset,
pageLocator.GetNBytesOnStorage(), pageBufferSize});
++nPages;
clusterBufSz += pageBufferSize;
});
Expand All @@ -719,31 +659,22 @@ ROOT::Experimental::Internal::RPageSourceDaos::LoadClusters(std::span<RCluster::
auto pageMap =
std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffer));

auto cageBuffer = clusterBuffer;
// Fill the cluster page map and the read requests for the RDaosContainer::ReadV() call
for (auto &[cageIndex, pageVec] : onDiskPages) {
auto columnId = pageVec[0].fColumnId; // All pages in a cage belong to the same column
std::size_t cageSz = 0;

for (auto &s : pageVec) {
assert(columnId == s.fColumnId);
assert(cageIndex == s.fPosition);
// Register the on disk pages in a page map
ROOT::Internal::ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
pageMap->Register(key, ROOT::Internal::ROnDiskPage(cageBuffer + s.fCageOffset, s.fBufferSize));
cageSz += s.fBufferSize;
}
for (const auto &sealedLoc : onDiskPages) {
ROOT::Internal::ROnDiskPage::Key key(sealedLoc.fColumnId, sealedLoc.fPageNo);
pageMap->Register(key, ROOT::Internal::ROnDiskPage(clusterBuffer, sealedLoc.fBufferSize));

// Prepare new read request batched up by object ID and distribution key
d_iov_t iov;
d_iov_set(&iov, cageBuffer, cageSz);
d_iov_set(&iov, clusterBuffer, sealedLoc.fBufferSize);

RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, columnId, cageIndex);
RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, sealedLoc.fClusterId, sealedLoc.fColumnId,
sealedLoc.fPageId);
auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
auto [itReq, ret] = readRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
itReq->second.Insert(daosKey.fAkey, iov);

cageBuffer += cageSz;
clusterBuffer += sealedLoc.fBufferSize;
}
fCounters->fNPageRead.Add(nPages);
fCounters->fSzReadPayload.Add(clusterBufSz);
Expand Down
Loading
Loading