Skip to content

Commit

Permalink
Refs #5118: MergeMDFiles loads several boxes at once
Browse files Browse the repository at this point in the history
speed up in my test was 330 sec -> 265 sec for 263 million events
  • Loading branch information
Janik Zikovsky committed Apr 18, 2012
1 parent 6034ccd commit a542254
Showing 1 changed file with 140 additions and 92 deletions.
232 changes: 140 additions & 92 deletions Code/Mantid/Framework/MDAlgorithms/src/MergeMDFiles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ namespace MDAlgorithms
/// MergeMDFiles Algorithm - used to pass parameters etc. around
MergeMDFiles * m_alg;
/// Which block to load?
size_t m_blockNum;
size_t m_blockNumStart;
size_t m_blockNumEnd;
/// Output workspace
typename MDEventWorkspace<MDE, nd>::sptr outWS;
/// List of boxes where index = box ID, value = the box pointer.
Expand All @@ -419,120 +420,158 @@ namespace MDAlgorithms
/** Constructor
*
* @param alg :: MergeMDFiles Algorithm - used to pass parameters etc. around
* @param blockNum :: Which block to load?
* @param blockNumStart :: Start block ID to load.
* @param blockNumStart :: End block ID to load (non-inclusive)
* @param outWS :: Output workspace
* @param boxesById :: list of boxes with IDs
* @param parallelSplit :: if true, split the boxes via parallel mechanism
*/
MergeMDLoadToBoxTask(MergeMDFiles * alg, size_t blockNum, typename MDEventWorkspace<MDE, nd>::sptr outWS,
MergeMDLoadToBoxTask(MergeMDFiles * alg, size_t blockNumStart, size_t blockNumEnd,
typename MDEventWorkspace<MDE, nd>::sptr outWS,
typename std::vector<MDBoxBase<MDE,nd> *> & boxesById, bool parallelSplit)
: m_alg(alg), m_blockNum(blockNum), outWS(outWS),
: m_alg(alg), m_blockNumStart(blockNumStart), m_blockNumEnd(blockNumEnd),
outWS(outWS),
m_boxesById(boxesById), m_parallelSplit(parallelSplit)
{
this->m_cost = double(this->m_alg->eventsPerBox[this->m_blockNum]);
this->m_cost = double(this->m_alg->eventsPerBox[this->m_blockNumStart]);
}

//---------------------------------------------------------------------------------------------
/** Main method that performs the work for the task. */
void run()
{
uint64_t numEvents = this->m_alg->eventsPerBox[this->m_blockNum];
if (numEvents == 0) return;

// Find the box in the output.
MDBoxBase<MDE,nd> * outBox = this->m_boxesById[this->m_blockNum];
if (!outBox)
throw std::runtime_error("Could not find box at ID " + Strings::toString(this->m_blockNum) );
BoxController_sptr bc = outBox->getBoxController();

// Should we pre-emptively split the box
MDBox<MDE,nd> * outMDBox = dynamic_cast<MDBox<MDE,nd> *>(outBox);
if (outMDBox && (numEvents > bc->getSplitThreshold()))
// Vector of each vector of events in each box
std::vector<std::vector<MDE> > eventsPerBox(m_blockNumEnd-m_blockNumStart);
// Each of the outputted boxes
std::vector<MDBoxBase<MDE,nd> *> outputBoxes(m_blockNumEnd-m_blockNumStart);
// Running total of processed events
size_t totalEvents = 0;
// Box controller of the workspace
BoxController_sptr bc = outWS->getBoxController();

// ----------------- Prepare the boxes for each box -----------------------------------------
for (size_t blockNum = this->m_blockNumStart; blockNum < this->m_blockNumEnd; blockNum++)
{
// Yes, let's split it
MDGridBox<MDE,nd> * parent = dynamic_cast<MDGridBox<MDE,nd> *>(outMDBox->getParent());
if (parent)
uint64_t numEvents = this->m_alg->eventsPerBox[blockNum];
if (numEvents == 0) continue;

// Find the box in the output.
MDBoxBase<MDE,nd> * outBox = this->m_boxesById[blockNum];
if (!outBox)
throw std::runtime_error("Could not find box at ID " + Strings::toString(blockNum) );

// Should we pre-emptively split the box
MDBox<MDE,nd> * outMDBox = dynamic_cast<MDBox<MDE,nd> *>(outBox);
if (outMDBox && (numEvents > bc->getSplitThreshold()))
{
size_t index = parent->getChildIndexFromID( outBox->getId() );
if (index < parent->getNumChildren())
// Yes, let's split it
MDGridBox<MDE,nd> * parent = dynamic_cast<MDGridBox<MDE,nd> *>(outMDBox->getParent());
if (parent)
{
parent->splitContents(index);
// Have to update our pointer - old one was deleted!
outBox = parent->getChild(index);
size_t index = parent->getChildIndexFromID( outBox->getId() );
if (index < parent->getNumChildren())
{
parent->splitContents(index);
// Have to update our pointer - old one was deleted!
outBox = parent->getChild(index);
}
}
}
}
// Is the output a grid box?
MDGridBox<MDE,nd> * outGridBox = dynamic_cast<MDGridBox<MDE,nd> *>(outBox);

// Save the output box for later
outputBoxes[blockNum-this->m_blockNumStart] = outBox;

// Vector of events accumulated from ALL files to merge.
std::vector<MDE> events;
// Vector of events accumulated from ALL files to merge.
std::vector<MDE> & events = eventsPerBox[blockNum-this->m_blockNumStart];

// Occasionally release free memory (has an effect on Linux only).
if (numEvents > 1000000)
MemoryManager::Instance().releaseFreeMemory();
// Reserve ALL the space you will need for this vector. Should speed up a lot.
events.reserve(numEvents);
} // (for each blockNum)

// Reserve ALL the space you will need for this vector. Should speed up a lot.
events.reserve(numEvents);

// Go through each file
// --------------------- Go through each file -------------------------------------------
this->m_alg->fileMutex.lock();
//bc->fileMutex.lock();
for (size_t iw=0; iw<this->m_alg->files.size(); iw++)
{
// The file and the indexes into that file
::NeXus::File * file = this->m_alg->files[iw];
std::vector<uint64_t> & box_event_index = this->m_alg->box_indexes[iw];

uint64_t indexStart = box_event_index[this->m_blockNum*2+0];
uint64_t numEvents = box_event_index[this->m_blockNum*2+1];
// This will APPEND the events to the one vector
MDE::loadVectorFromNexusSlab(events, file, indexStart, numEvents);
for (size_t blockNum = this->m_blockNumStart; blockNum < this->m_blockNumEnd; blockNum++)
{
// This is the events vector for this particular block we are adding to
std::vector<MDE> & events = eventsPerBox[blockNum-this->m_blockNumStart];

// The file and the indexes into that file
::NeXus::File * file = this->m_alg->files[iw];
std::vector<uint64_t> & box_event_index = this->m_alg->box_indexes[iw];

uint64_t indexStart = box_event_index[blockNum*2+0];
uint64_t numEvents = box_event_index[blockNum*2+1];
if (numEvents == 0) continue;
totalEvents += numEvents;

// Occasionally release free memory (has an effect on Linux only).
if (numEvents > 1000000)
MemoryManager::Instance().releaseFreeMemory();

// This will APPEND the events to the one vector
MDE::loadVectorFromNexusSlab(events, file, indexStart, numEvents);
} // For each block
} // For each file
//bc->fileMutex.unlock();
this->m_alg->fileMutex.unlock();

if (!events.empty())

// -------------- Now we actually do the adding for each block --------------------------------------
for (size_t blockNum = this->m_blockNumStart; blockNum < this->m_blockNumEnd; blockNum++)
{
// Add all the events from the same box
outBox->addEventsUnsafe( events );
events.clear();
std::vector<MDE>().swap(events); // really free the data
// This is the events vector for this particular block we are adding to
std::vector<MDE> & events = eventsPerBox[blockNum-this->m_blockNumStart];

if (outGridBox)
if (!events.empty())
{
// Occasionally release free memory (has an effect on Linux only).
MemoryManager::Instance().releaseFreeMemory();
// Box we are adding to.
MDBoxBase<MDE,nd> * outBox = outputBoxes[blockNum-this->m_blockNumStart];

// Is the output a grid box?
MDGridBox<MDE,nd> * outGridBox = dynamic_cast<MDGridBox<MDE,nd> *>(outBox);

// Now do a split on only this box.
// Add all the events from the same box
outBox->addEventsUnsafe( events );
events.clear();
std::vector<MDE>().swap(events); // really free the data

// On option, do the split in parallel
if (m_parallelSplit)
if (outGridBox)
{
ThreadSchedulerFIFO * ts = new ThreadSchedulerFIFO();
ThreadPool tp(ts);
outGridBox->splitAllIfNeeded(ts);
tp.joinAll();
// Occasionally release free memory (has an effect on Linux only).
MemoryManager::Instance().releaseFreeMemory();

// Now do a split on only this box.

// On option, do the split in parallel
if (m_parallelSplit)
{
ThreadSchedulerFIFO * ts = new ThreadSchedulerFIFO();
ThreadPool tp(ts);
outGridBox->splitAllIfNeeded(ts);
tp.joinAll();
}
else
outGridBox->splitAllIfNeeded(NULL);

}
else
outGridBox->splitAllIfNeeded(NULL);

// HDF5 is NOT thread safe (by default) even for accessing DIFFERENT files from different threads.
// Hence we need this mutex here :(
this->m_alg->fileMutex.lock();
// Flush out any items to write.
bc->getDiskBuffer().flushCache();
this->m_alg->fileMutex.unlock();
}
} // there was something loaded
} // there was something loaded
} // (for each block)

// HDF5 is NOT thread safe (by default) even for accessing DIFFERENT files from different threads.
// Hence we need this mutex here :(
this->m_alg->fileMutex.lock();
// Flush out any items to write.
bc->getDiskBuffer().flushCache();
this->m_alg->fileMutex.unlock();

// Track the total number of added events
this->m_alg->statsMutex.lock();
this->m_alg->totalLoaded += numEvents;
this->m_alg->getLogger().debug() << "Box " << this->m_blockNum << ". Total events " << this->m_alg->totalLoaded << ". This one added " << numEvents << ". "<< std::endl;
this->m_alg->totalLoaded += totalEvents;
this->m_alg->getLogger().debug() << "Boxes " << this->m_blockNumStart << " to " << this->m_blockNumEnd << ". Total events " << this->m_alg->totalLoaded << ". These added " << totalEvents << ". "<< std::endl;
// Report the progress
this->m_alg->prog->reportIncrement(int(numEvents), "Loading Box");
this->m_alg->prog->reportIncrement(int(totalEvents), "Loading Box");
this->m_alg->statsMutex.unlock();

} // (end run)
Expand Down Expand Up @@ -583,27 +622,36 @@ namespace MDAlgorithms
ThreadSchedulerFIFO * ts = new ThreadSchedulerFIFO();
ThreadPool tp(ts);

for (size_t ib=0; ib<numBoxes; ib++)
size_t last_ib = 0;
size_t ib = 0;
while (ib < numBoxes)
{
// Add a task for each box that actually has some events
if (this->eventsPerBox[ib] > 0)
// Increase ib until you collect enough events
size_t accumulatedEvents = 0;
while (accumulatedEvents < 10000000 && ib < numBoxes)
{
totalEventsInTasks += eventsPerBox[ib];
MergeMDLoadToBoxTask<MDE,nd> * task = new MergeMDLoadToBoxTask<MDE,nd>(this, ib, outWS, boxesById, !Parallel);
accumulatedEvents += eventsPerBox[ib];
ib++;
}

if (!Parallel)
{
// Run the task serially only
task->run();
delete task;
}
else
{
// Enqueue to run in parallel (at the joinAll() call below).
ts->push(task);
}
totalEventsInTasks += accumulatedEvents;
MergeMDLoadToBoxTask<MDE,nd> * task = new MergeMDLoadToBoxTask<MDE,nd>(this, last_ib, ib, outWS, boxesById, !Parallel);

if (!Parallel)
{
// Run the task serially only
task->run();
delete task;
}
} // for each box
else
{
// Enqueue to run in parallel (at the joinAll() call below).
ts->push(task);
}

// Prepare the next loop
last_ib = ib;
} // (while ib in range)

// Run any final tasks
tp.joinAll();
Expand Down

0 comments on commit a542254

Please sign in to comment.