91 changes: 68 additions & 23 deletions tree/tree/src/TBranch.cxx
@@ -1325,6 +1325,10 @@ const char* TBranch::GetIconName() const
/// the user_buffer will back the memory of the newly-constructed basket.
///
/// Assumes that this branch is enabled.
///
/// Returns -1 if the entry does not exist
/// Returns -2 in case of error
/// Returns the index of the basket in case of success.
Int_t TBranch::GetBasketAndFirst(TBasket *&basket, Long64_t &first,
TBuffer *user_buffer)
{
@@ -1335,9 +1339,10 @@ Int_t TBranch::GetBasketAndFirst(TBasket *&basket, Long64_t &first,
// make sure basket buffers are in memory.
basket = fCurrentBasket;
first = fFirstBasketEntry;
return fReadBasket;
} else {
if ((entry < fFirstEntry) || (entry >= fEntryNumber)) {
return 0;
return -1;
}
first = fFirstBasketEntry;
Long64_t last = fNextBasketEntry - 1;
@@ -1347,7 +1352,7 @@ Int_t TBranch::GetBasketAndFirst(TBasket *&basket, Long64_t &first,
if (fReadBasket < 0) {
fNextBasketEntry = -1;
Error("GetBasketAndFirst", "In the branch %s, no basket contains the entry %lld\n", GetName(), entry);
return -1;
return -2;
}
if (fReadBasket == fWriteBasket) {
fNextBasketEntry = fEntryNumber;
@@ -1366,7 +1371,7 @@ Int_t TBranch::GetBasketAndFirst(TBasket *&basket, Long64_t &first,
fCurrentBasket = 0;
fFirstBasketEntry = -1;
fNextBasketEntry = -1;
return -1;
return -2;
}
if (fTree->GetClusterPrefetch()) {
TTree::TClusterIterator clusterIterator = fTree->GetClusterIterator(entry);
@@ -1380,18 +1385,20 @@ Int_t TBranch::GetBasketAndFirst(TBasket *&basket, Long64_t &first,
// cause a reset of the first / next basket entries back to -1.
fFirstBasketEntry = first;
fNextBasketEntry = updatedNext;
}
if (user_buffer) {
// Disassociate basket from memory buffer for bulk IO
// When the user provides a memory buffer (i.e., for bulk IO), we should
// make sure to drop all references to that buffer in the TTree afterward.
fCurrentBasket = nullptr;
fBaskets[fReadBasket] = nullptr;
if (user_buffer) {
// Disassociate basket from memory buffer for bulk IO
// When the user provides a memory buffer (i.e., for bulk IO), we should
// make sure to drop all references to that buffer in the TTree afterward.
fCurrentBasket = nullptr;
fBaskets[fReadBasket] = nullptr;
} else {
fCurrentBasket = basket;
}
} else {
fCurrentBasket = basket;
}
return fReadBasket;
}
return 1;
}
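For reference, a minimal caller sketch (hypothetical, not part of the patch) showing how the return convention documented in the doxygen comment above is meant to be consumed:

    // Hypothetical in-class caller of GetBasketAndFirst, following the documented
    // contract: -1 = entry does not exist, -2 = error, >= 0 = index of the basket.
    TBasket *basket = nullptr;
    Long64_t first = 0;
    Int_t result = GetBasketAndFirst(basket, first, /*user_buffer=*/nullptr);
    if (result == -1) {
       // The requested entry is not in this branch; nothing to read.
    } else if (result == -2) {
       // Hard error; an Error(...) message has already been emitted.
    } else {
       // Success: 'result' is the index of the basket holding the entry
       // (e.g. usable as fBaskets[result]) and 'first' is its first entry.
    }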

////////////////////////////////////////////////////////////////////////////////
@@ -1440,7 +1447,7 @@ Int_t TBranch::GetBulkEntries(Long64_t entry, TBuffer &user_buf)
TBasket *basket = nullptr;
Long64_t first;
Int_t result = GetBasketAndFirst(basket, first, &user_buf);
if (R__unlikely(result <= 0)) return -1;
if (R__unlikely(result < 0)) return -1;
// Only support reading from full clusters.
if (R__unlikely(entry != first)) {
//printf("Failed to read from full cluster; first entry is %ld; requested entry is %ld.\n", first, entry);
@@ -1461,22 +1468,41 @@ Int_t TBranch::GetBulkEntries(Long64_t entry, TBuffer &user_buf)
return -1;
}

if (&user_buf != buf) {
// The basket was already in memory and may or may not be backed by persistent
// storage.
R__ASSERT(result == fReadBasket);
if (fBasketSeek[fReadBasket]) {
// It is backed, so we can be destructive
user_buf.SetBuffer(buf->Buffer(), buf->BufferSize());
buf->ResetBit(TBufferIO::kIsOwner);
fCurrentBasket = nullptr;
fBaskets[fReadBasket] = nullptr;
} else {
// This is the only copy; we can't hand it to the user as is, so make a copy.
if (user_buf.BufferSize() < buf->BufferSize()) {
user_buf.AutoExpand(buf->BufferSize());
}
memcpy(user_buf.Buffer(), buf->Buffer(), buf->BufferSize());
}
}

Int_t bufbegin = basket->GetKeylen();
buf->SetBufferOffset(bufbegin);
user_buf.SetBufferOffset(bufbegin);

Int_t N = ((fNextBasketEntry < 0) ? fEntryNumber : fNextBasketEntry) - first;
//printf("Requesting %d events; fNextBasketEntry=%lld; first=%lld.\n", N, fNextBasketEntry, first);
if (R__unlikely(!leaf->ReadBasketFast(*buf, N))) {
if (R__unlikely(!leaf->ReadBasketFast(user_buf, N))) {
Error("GetBulkEntries", "Leaf failed to read.\n");
return -1;
}
user_buf.SetBufferOffset(bufbegin);

fCurrentBasket = nullptr;
fBaskets[fReadBasket] = nullptr;
R__ASSERT(fExtraBasket == nullptr && "fExtraBasket should have been set to nullptr by GetFreshBasket");
fExtraBasket = basket;
basket->DisownBuffer();
if (fCurrentBasket == nullptr) {
R__ASSERT(fExtraBasket == nullptr && "fExtraBasket should have been set to nullptr by GetFreshBasket");
fExtraBasket = basket;
basket->DisownBuffer();
}

return N;
}
@@ -1501,7 +1527,7 @@ Int_t TBranch::GetEntriesSerialized(Long64_t entry, TBuffer &user_buf, TBuffer *
TBasket *basket = nullptr;
Long64_t first;
Int_t result = GetBasketAndFirst(basket, first, &user_buf);
if (R__unlikely(result <= 0)) { return -1; }
if (R__unlikely(result < 0)) { return -1; }
// Only support reading from full clusters.
if (R__unlikely(entry != first)) {
Error("GetEntriesSerialized", "Failed to read from full cluster; first entry is %lld; requested entry is %lld.\n", first, entry);
@@ -1522,13 +1548,32 @@ Int_t TBranch::GetEntriesSerialized(Long64_t entry, TBuffer &user_buf, TBuffer *
return -1;
}

if (&user_buf != buf) {
// The basket was already in memory and may or may not be backed by persistent
// storage.
R__ASSERT(result == fReadBasket);
if (fBasketSeek[fReadBasket]) {
// It is backed, so we can be destructive
user_buf.SetBuffer(buf->Buffer(), buf->BufferSize());
buf->ResetBit(TBufferIO::kIsOwner);
fCurrentBasket = nullptr;
fBaskets[fReadBasket] = nullptr;
} else {
// This is the only copy; we can't hand it to the user as is, so make a copy.
if (user_buf.BufferSize() < buf->BufferSize()) {
user_buf.AutoExpand(buf->BufferSize());
}
memcpy(user_buf.Buffer(), buf->Buffer(), buf->BufferSize());
}
}

Int_t bufbegin = basket->GetKeylen();
buf->SetBufferOffset(bufbegin);
user_buf.SetBufferOffset(bufbegin);

Int_t N = ((fNextBasketEntry < 0) ? fEntryNumber : fNextBasketEntry) - first;
//Info("GetEntriesSerialized", "Requesting %d events; fNextBasketEntry=%lld; first=%lld.\n", N, fNextBasketEntry, first);

if (R__unlikely(!leaf->ReadBasketSerialized(*buf, N))) {
if (R__unlikely(!leaf->ReadBasketSerialized(user_buf, N))) {
Error("GetEntriesSerialized", "Leaf failed to read.\n");
return -1;
}
@@ -1590,7 +1635,7 @@ Int_t TBranch::GetEntry(Long64_t entry, Int_t getall)
Long64_t first;

Int_t result = GetBasketAndFirst(basket, first, nullptr);
if (R__unlikely(result <= 0)) { return result; }
if (R__unlikely(result < 0)) { return result + 1; }

basket->PrepareBasket(entry);
TBuffer* buf = basket->GetBufferRef();
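A note on the `return result + 1` in GetEntry above: GetEntry conventionally returns 0 when the requested entry does not exist and -1 on an I/O error, so the shift simply maps the new GetBasketAndFirst codes back onto that convention (a reading inferred from the documented return codes, not stated explicitly in the patch):

    // Sketch of the mapping performed by "return result + 1" in GetEntry:
    //   -1 (entry does not exist)  ->   0
    //   -2 (error)                 ->  -1
    static_assert(-1 + 1 == 0 && -2 + 1 == -1,
                  "GetBasketAndFirst codes map onto GetEntry's 0/-1 convention");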
129 changes: 116 additions & 13 deletions tree/tree/test/BulkApi.cxx
@@ -16,7 +16,7 @@

class BulkApiTest : public ::testing::Test {
public:
static constexpr Int_t fEventCount = 1e7;
static constexpr Long64_t fEventCount = (Long64_t)1e7;
const std::string fFileName = "BulkApiTest.root";

protected:
@@ -37,6 +37,14 @@ class BulkApiTest : public ::testing::Test {
hfile->Write();
tree->Print();
printf("Successful write of all events.\n");

// We also want a copy of the TTree with a basket stored alongside the TTree
// rather than on its own.
for (Long64_t ev = 0; ev < 10; ev++) {
tree->Fill();
++f;
}
hfile->WriteTObject(tree, "TwithBasket");
hfile->Close();

delete hfile;
@@ -55,7 +63,7 @@ TEST_F(BulkApiTest, stdRead)
TTreeReaderValue<float> myF(myReader, "myFloat");
Long64_t idx = 0;
float idx_f = 1;
Int_t events = fEventCount;
auto events = fEventCount;
sw.Start();
while (myReader.Next()) {
if (R__unlikely(idx == events)) {
@@ -74,48 +82,108 @@ TEST_F(BulkApiTest, fastRead)
delete hfile;
}

TEST_F(BulkApiTest, simpleRead)
void SimpleReadFunc(const char *filename, const char *treename)
{
auto hfile = TFile::Open(fFileName.c_str());
printf("Starting read of file %s.\n", fFileName.c_str());
auto hfile = TFile::Open(filename);
printf("Starting read of file %s.\n", filename);
Contributor:
Should this and the following printfs become a R__INFO so that the verbosity of the test can be controlled?

Member Author:
Probably, I just "continued" the style of the test ... i.e. let's do that in another PR.

TStopwatch sw;

printf("Using inline bulk read APIs.\n");
TBufferFile branchbuf(TBuffer::kWrite, 32 * 1024);
TTree *tree = dynamic_cast<TTree *>(hfile->Get("T"));
auto tree = hfile->Get<TTree>(treename);
ASSERT_TRUE(tree);

TBranch *branchF = tree->GetBranch("myFloat");
ASSERT_TRUE(branchF);
branchF->GetListOfBaskets()->ls();

Int_t events = fEventCount;
float idx_f = 1;
Long64_t evt_idx = 0;
Long64_t events = tree->GetEntries();
while (events) {
auto count = branchF->GetBulkRead().GetEntriesSerialized(evt_idx, branchbuf);
ASSERT_GE(count, 0);
events = events > count ? (events - count) : 0;

float *entry = reinterpret_cast<float *>(branchbuf.GetCurrent());
auto entry = reinterpret_cast<float *>(branchbuf.GetCurrent());
for (Int_t idx = 0; idx < count; idx++) {
idx_f++;
Int_t tmp = *reinterpret_cast<Int_t *>(&entry[idx]);
char *tmp_ptr = reinterpret_cast<char *>(&tmp);
auto tmp = *reinterpret_cast<Int_t *>(&entry[idx]);
auto tmp_ptr = reinterpret_cast<char *>(&tmp);
frombuf(tmp_ptr, entry + idx);

if (R__unlikely((evt_idx < 16000000) && (entry[idx] != idx_f))) {
printf("Incorrect value on myFloat branch: %f (event %lld)\n", entry[idx], evt_idx + idx);
printf("in %s Incorrect value on myFloat branch: %f (event %lld)\n", treename, entry[idx], evt_idx + idx);
ASSERT_TRUE(false);
}
}
evt_idx += count;
}
sw.Stop();
printf("GetEntriesSerialized: Successful read of all events.\n");
printf("GetEntriesSerialized: Successful read of all events in %s.\n", treename);
printf("GetEntriesSerialized: Total elapsed time (seconds) for bulk APIs: %.2f\n", sw.RealTime());
delete hfile;
}
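The frombuf() dance in the loop above is what separates the two bulk helpers: GetEntriesSerialized hands back the values still in their on-disk (big-endian) serialized form, so the test byte-swaps each float in place, while GetBulkEntries (exercised by SimpleBulkReadFunc below) returns host-order values and needs no swap. A standalone sketch of the swap, assuming a little-endian host and a plain 4-byte float payload:

    #include <cstring>

    // Hypothetical equivalent of the frombuf() call above: reverse the
    // serialized big-endian 4 bytes and return a host-order float.
    inline float SwapSerializedFloat(const char *src)
    {
       unsigned char b[4] = {static_cast<unsigned char>(src[3]), static_cast<unsigned char>(src[2]),
                             static_cast<unsigned char>(src[1]), static_cast<unsigned char>(src[0])};
       float out;
       std::memcpy(&out, b, sizeof(out));
       return out;
    }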

TEST_F(BulkApiTest, simpleRead)
{
SimpleReadFunc(fFileName.c_str(), "T");
}

TEST_F(BulkApiTest, simpleReadTrailingBasket)
{
SimpleReadFunc(fFileName.c_str(), "TwithBasket");
}

void SimpleBulkReadFunc(const char *filename, const char *treename)
{
auto hfile = TFile::Open(filename);
printf("Starting read of file %s.\n", filename);
TStopwatch sw;

printf("Using outlined bulk read APIs.\n");
TBufferFile branchbuf(TBuffer::kWrite, 32 * 1024);
auto tree = hfile->Get<TTree>(treename);
ASSERT_TRUE(tree);

TBranch *branchF = tree->GetBranch("myFloat");
ASSERT_TRUE(branchF);
branchF->GetListOfBaskets()->ls();

float idx_f = 1;
Long64_t evt_idx = 0;
Long64_t events = tree->GetEntries();
while (events) {
auto count = branchF->GetBulkRead().GetBulkEntries(evt_idx, branchbuf);
ASSERT_GE(count, 0);
events = events > count ? (events - count) : 0;

auto entry = reinterpret_cast<float *>(branchbuf.GetCurrent());
for (Int_t idx = 0; idx < count; idx++) {
idx_f++;
if (R__unlikely((evt_idx < 16000000) && (entry[idx] != idx_f))) {
printf("in %s Incorrect value on myFloat branch: %f (event %lld)\n", treename, entry[idx], evt_idx + idx);
ASSERT_TRUE(false);
}
}
evt_idx += count;
}
sw.Stop();
printf("GetBulkEntries: Successful read of all events in %s.\n", treename);
printf("GetBulkEntries: Total elapsed time (seconds) for bulk APIs: %.2f\n", sw.RealTime());
delete hfile;
}

TEST_F(BulkApiTest, simpleBulkRead)
{
SimpleBulkReadFunc(fFileName.c_str(), "T");
}

TEST_F(BulkApiTest, simpleBulkReadTrailingBasket)
{
SimpleBulkReadFunc(fFileName.c_str(), "TwithBasket");
}

TEST_F(BulkApiTest, fastRead)
{
auto hfile = TFile::Open(fFileName.c_str());
@@ -134,7 +202,7 @@ TEST_F(BulkApiTest, fastRead)
printf("TTreeReaderFast failed to initialize. Entry status: %d\n", myReader.GetEntryStatus());
ASSERT_TRUE(false);
}
Int_t events = fEventCount;
auto events = fEventCount;
Long64_t idx = 0;
float idx_f = 1;
for (auto reader_idx : myReader) {
Expand All @@ -152,3 +220,38 @@ TEST_F(BulkApiTest, fastRead)
printf("TTreeReaderFast: Total elapsed time (seconds) for bulk APIs: %.2f\n", sw.RealTime());
delete hfile;
}

TEST_F(BulkApiTest, BulkInMem)
{
TTree t("t","t");
int i = 3;
t.Branch("fAlpha",&i);
for(i = 0; i < 100000; ++i)
t.Fill();

TBufferFile buf(TBuffer::EMode::kWrite, 32*1024);
auto &r = t.GetBranch("fAlpha")->GetBulkRead();

{
auto s = r.GetBulkEntries(0, buf);
ASSERT_EQ(7982, s) << "Did not read the expected number of entries.";
s = r.GetBulkEntries(0, buf);
ASSERT_EQ(7982, s) << "Did not read the expected number of entries.";
}

int iteration = 0;
Long64_t nentries = t.GetEntries();
Long64_t event_idx = 0;
while (nentries) {
auto s = r.GetBulkEntries(event_idx, buf);
if (iteration < 12)
ASSERT_EQ(7982, s) << "Did not read the expected number of entries.";
else
ASSERT_EQ(4216, s) << "Did not read the expected number of entries.";
nentries -= s;
event_idx += s;
++iteration;
if (s < 0)
break;
}
}
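As a sanity check on the magic numbers in the assertions above (assuming the default 32000-byte basket size and roughly 72 bytes of basket key overhead, so (32000 - 72) / 4 = 7982 four-byte ints per full basket): the 100000 in-memory entries split into 12 full baskets of 7982 entries plus one trailing basket of 4216.

    // Back-of-the-envelope check of the expected per-call entry counts above.
    static_assert(12 * 7982 + 4216 == 100000,
                  "12 full baskets of 7982 ints plus a 4216-entry trailing basket");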