Skip to content

Commit

Permalink
refs #7228 different multithreading options exploration
Browse files Browse the repository at this point in the history
  • Loading branch information
abuts committed Sep 12, 2013
1 parent 95328a4 commit 7a00629
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 26 deletions.
12 changes: 9 additions & 3 deletions Code/Mantid/Framework/API/inc/MantidAPI/BoxController.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ namespace API
// TODO: Smarter ways to determine all of these values
m_maxDepth = 5;
m_addingEvents_eventsPerTask = 1000;
m_SignifEventsNumber = 10000000;
m_addingEvents_numTasksPerBlock = Kernel::ThreadPool::getNumPhysicalCores() * 5;
m_splitInto.resize(this->nd, 1);
resetNumBoxes();
}

virtual ~BoxController();

/** Get the number of events treated as significant in box splitting
 *  (the box splitting begins after that many events).
 *  @return the current value of m_SignifEventsNumber
 */
size_t getSignifEventsNumber() const
{
  return m_SignifEventsNumber;
}
// create new box controller from the existing one
virtual BoxController *clone()const;
/// Serialize
Expand Down Expand Up @@ -260,8 +263,8 @@ namespace API
// (because when the workspace gets very large you should split less often)
// But no more often than every 10 million events.
size_t comparisonPoint = nEventsInOutput/16;
if (comparisonPoint < 10000000)
comparisonPoint = 10000000;
if (comparisonPoint < m_SignifEventsNumber)
comparisonPoint = m_SignifEventsNumber;
if (eventsAdded > (comparisonPoint))return true;

// Return true if the average # of events per box is big enough to split.
Expand Down Expand Up @@ -413,6 +416,9 @@ namespace API
/// Splitting threshold
size_t m_SplitThreshold;

/// Number of events that takes a noticeable time to process. Identified experimentally and used in box-splitting timings
size_t m_SignifEventsNumber;

/** Maximum splitting depth: don't go further than this many levels of recursion.
* This avoids infinite recursion and should be set to a value that gives a smallest
* box size that is a little smaller than the finest desired binning upon viewing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "MantidMDEvents/ConvToMDBase.h"
// coordinate transformation
#include "MantidMDEvents/MDTransfInterface.h"
#include "MantidKernel/Multithreaded.h"
#include "MantidKernel/Task.h"
#include "Poco/ScopedLock.h"

namespace Mantid
{
Expand Down Expand Up @@ -48,6 +51,9 @@ namespace MDEvents
*/

//-----------------------------------------------
// forward declaration of the class which does a chunk of the conversion in case of multithreaded execution (Linux compilers do not accept the friend declaration without it)
class ChunkOfWork;

class ConvToMDHistoWS: public ConvToMDBase
{

Expand All @@ -61,12 +67,52 @@ class ConvToMDHistoWS: public ConvToMDBase
// the size of temporary buffer, each thread stores data in before adding these data to target MD workspace;
size_t m_bufferSize;
// internal function used to identify m_spectraChunk and m_bufferSize
void estimateThreadWork(size_t nThreads,size_t specSize);
void estimateThreadWork(size_t nThreads,size_t specSize,size_t nPointsToProcess);
// the function does a chunk of work. Expected to run on a thread.
size_t conversionChunk(size_t job_ID);

friend class ChunkOfWork;

// counter for number of workspace points used for multithreaded calculations of number of actual points;
size_t m_numAddedPoints;
Kernel::Mutex m_npLock;

size_t addNPoits(size_t numPoints)
{
Poco::ScopedLock<Kernel::Mutex> lock(m_npLock);
m_numAddedPoints+=numPoints;
return 0;
}

};


/** Helper class for multithreaded adding -- poor replacement for bind, but we are thinking of adding more options here in a future*/
class ChunkOfWork : public Kernel::Task
{
ConvToMDHistoWS *classHolder;
// function pointer to the conversion chunk above
typedef size_t (ConvToMDHistoWS::*fpRunMethod)(size_t) ;
fpRunMethod runMethod;

fpRunMethod countPoints;

// the Id for the conversion job
size_t job_ID;
public:
/**Constructor */
ChunkOfWork(ConvToMDHistoWS *Converter,fpRunMethod conversionChunk ,fpRunMethod addPoints,size_t theJI)
:classHolder(Converter),runMethod(conversionChunk),countPoints(addPoints),job_ID(theJI)
{ };
/** Overloaded POCO run method used to run the job*/
void run()
{
size_t nAddedPoints = (classHolder->*runMethod)(job_ID);
(classHolder->*countPoints)(nAddedPoints);
}

};


} // endNamespace MDAlgorithms
} // endNamespace Mantid

Expand Down
57 changes: 36 additions & 21 deletions Code/Mantid/Framework/MDEvents/src/ConvToMDHistoWS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Mantid
return (val!=buf);
}


/** method sets up all internal variables necessary to convert from Matrix2D workspace to MDEvent workspace
@param WSD -- the class describing the target MD workspace, the source matrix workspace and the transformations necessary to perform on these workspaces
@param inWSWrapper -- the class wrapping the target MD workspace
Expand Down Expand Up @@ -157,18 +158,36 @@ namespace Mantid
Kernel::ThreadPool tp(ts,nThreads, new API::Progress(*pProgress));
//<<<-- Thread control stuff

if (runMultithreaded )
nThreads = static_cast<int>(tp.getNumPhysicalCores());
else
nThreads =1;

// estimate the size of data conversion a single thread should perform
//TODO: this piece of code should be carefully rethought
//size_t nThreads = tp.getNumPhysicalCores();
size_t nThr = 1;
this->estimateThreadWork(nThr,specSize);
size_t eventsChunkNum = bc->getSignifEventsNumber();
this->estimateThreadWork(nThreads,specSize,eventsChunkNum);

//External loop over the spectra:
for (size_t i = 0; i < nValidSpectra; i+=m_spectraChunk)
for (size_t i = 0; i < nValidSpectra; i+=m_spectraChunk*nThreads)
{
size_t nThreadEv = this->conversionChunk(i);
nAddedEvents+=nThreadEv;
nEventsInWS +=nThreadEv;
if (runMultithreaded)
{
m_numAddedPoints = 0;
for(size_t j=0;j<nThreads;j++)
ts->push(new ChunkOfWork(this,&ConvToMDHistoWS::conversionChunk,&ConvToMDHistoWS::addNPoits,i+j));

//tp.start();
nAddedEvents+=eventsChunkNum;
nEventsInWS +=eventsChunkNum;

}
else
{
size_t nThreadEv = this->conversionChunk(i);
nAddedEvents+=nThreadEv;
nEventsInWS +=nThreadEv;
}

if (bc->shouldSplitBoxes(nEventsInWS,nAddedEvents,lastNumBoxes))
{
Expand Down Expand Up @@ -221,28 +240,24 @@ namespace Mantid
* @param nThreads -- number of threads used to process data
* @param specSize -- the size of single spectra in matrix workspace;
*/
void ConvToMDHistoWS::estimateThreadWork(size_t nThreads,size_t specSize)
void ConvToMDHistoWS::estimateThreadWork(size_t nThreads,size_t specSize,size_t nPointsToProcess)
{
if (nThreads==0)nThreads=1;

// buffer size is at least a spectra size or more
m_bufferSize = ((specSize>DATA_BUFFER_SIZE)?specSize:DATA_BUFFER_SIZE);
if(m_bufferSize%specSize!=0)
{
m_bufferSize = ((m_bufferSize/specSize)+1)*specSize;
}
size_t nSpectras = this->m_InWS2D->getNPoints()/(specSize);

//
size_t nSpectras = nPointsToProcess/specSize+1;

m_spectraChunk = nSpectras/nThreads;
// estimate number of points, produced by single thread;
size_t nPoints = m_spectraChunk*nThreads;
// experimental parameter, which defines the number of points, which can be added to ws efficiently;
if(nPoints > 10000000)
{
nPoints = 10000000;
m_spectraChunk = nPoints/nThreads+1;
}
// the usefulness of this criterion is questionable;
//if(m_spectraChunk*specSize>10*m_bufferSize)m_spectraChunk = 10*m_bufferSize;
//if(nSpectras/m_spectraChunk<nThreads)m_spectraChunk=nSpectras/nThreads;
if(m_spectraChunk<1)m_spectraChunk =1;

if(m_spectraChunk<1)m_spectraChunk=1;
//if(m_spectraChunk<1)m_spectraChunk=1;
// TMP
//m_spectraChunk = 10;

Expand Down

0 comments on commit 7a00629

Please sign in to comment.