Skip to content

Commit

Permalink
Refs #4980 Add and append for histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
VickieLynch committed Mar 22, 2012
1 parent cf9ecbb commit 1b66fe2
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 8 deletions.
78 changes: 71 additions & 7 deletions Code/Mantid/Framework/MPIAlgorithms/src/GatherWorkspaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "MantidKernel/ArrayProperty.h"
#include "MantidKernel/ArrayBoundedValidator.h"
#include "MantidDataObjects/EventWorkspace.h"
#include "MantidKernel/ListValidator.h"

namespace mpi = boost::mpi;

Expand All @@ -34,6 +35,16 @@ void GatherWorkspaces::init()
declareProperty(new WorkspaceProperty<>("OutputWorkspace","",Direction::Output,PropertyMode::Optional));
declareProperty("PreserveEvents", false, "Keep the output workspace as an EventWorkspace, if the input has events (default).\n"
"If false, then the workspace gets converted to a Workspace2D histogram.");
std::vector<std::string> propOptions;
propOptions.push_back("Add");
//propOptions.push_back("Replace");
propOptions.push_back("Append");
declareProperty("AccumulationMethod", "Append", boost::make_shared<StringListValidator>(propOptions),
"Method to use for accumulating each chunk of live data.\n"
" - Add: the processed chunk will be summed to the previous outpu (default).\n"
//" - Replace: the processed chunk will replace the previous output.\n"
" - Append: the spectra of the chunk will be appended to the output workspace, increasing its size.");

}

void GatherWorkspaces::exec()
Expand Down Expand Up @@ -74,8 +85,15 @@ void GatherWorkspaces::exec()
throw Exception::MisMatch<int>(hist, 0, "The input workspaces must be all histogram or all point data");
}

// How do we accumulate the data?
std::string accum = this->getPropertyValue("AccumulationMethod");
// Get the total number of spectra in the combined inputs
totalSpec = inputWorkspace->getNumberHistograms();
size_t sumSpec = totalSpec;
if (accum == "Append")
{
reduce(included, totalSpec, sumSpec, std::plus<std::size_t>(), 0);
}

eventW = boost::dynamic_pointer_cast<const EventWorkspace>( inputWorkspace);
if (eventW != NULL)
Expand All @@ -92,28 +110,74 @@ void GatherWorkspaces::exec()
MatrixWorkspace_sptr outputWorkspace;
if ( included.rank() == 0 )
{
g_log.debug() << "Total number of spectra is " << totalSpec << "\n";
g_log.debug() << "Total number of spectra is " << sumSpec << "\n";
// Create the workspace for the output
outputWorkspace = WorkspaceFactory::Instance().create(inputWorkspace,totalSpec,numBins+hist,numBins);
outputWorkspace = WorkspaceFactory::Instance().create(inputWorkspace,sumSpec,numBins+hist,numBins);
setProperty("OutputWorkspace",outputWorkspace);
}

for (size_t wi = 0; wi < totalSpec; wi++)
{
if ( included.rank() == 0 )
{
outputWorkspace->dataX(wi) = inputWorkspace->readX(wi);
reduce(included, inputWorkspace->readY(wi), outputWorkspace->dataY(wi), vplus(), 0);
reduce(included, inputWorkspace->readE(wi), outputWorkspace->dataE(wi), eplus(), 0);
const ISpectrum * inSpec = inputWorkspace->getSpectrum(wi);
if (accum == "Add")
{
outputWorkspace->dataX(wi) = inputWorkspace->readX(wi);
reduce(included, inputWorkspace->readY(wi), outputWorkspace->dataY(wi), vplus(), 0);
reduce(included, inputWorkspace->readE(wi), outputWorkspace->dataE(wi), eplus(), 0);
}
else if (accum == "Append")
{
// Copy over data from own input workspace
outputWorkspace->dataX(wi) = inputWorkspace->readX(wi);
outputWorkspace->dataY(wi) = inputWorkspace->readY(wi);
outputWorkspace->dataE(wi) = inputWorkspace->readE(wi);

const int numReqs(3*(included.size()-1));
mpi::request reqs[numReqs];
int j(0);

// Receive data from all the other processes
// This works because the process ranks are ordered the same in 'included' as
// they are in 'world', but in general this is not guaranteed. TODO: robustify
for ( int i = 1; i < included.size(); ++i )
{
size_t index = wi + i * totalSpec;
reqs[j++] = included.irecv(i,0,outputWorkspace->dataX(index));
reqs[j++] = included.irecv(i,1,outputWorkspace->dataY(index));
reqs[j++] = included.irecv(i,2,outputWorkspace->dataE(index));
ISpectrum * outSpec = outputWorkspace->getSpectrum(index);
outSpec->clearDetectorIDs();
outSpec->addDetectorIDs( inSpec->getDetectorIDs() );
}

// Make sure everything's been received before exiting the algorithm
mpi::wait_all(reqs,reqs+numReqs);
}
ISpectrum * outSpec = outputWorkspace->getSpectrum(wi);
outSpec->clearDetectorIDs();
outSpec->addDetectorIDs( inSpec->getDetectorIDs() );
}
else
{
reduce(included, inputWorkspace->readY(wi), vplus(), 0);
reduce(included, inputWorkspace->readE(wi), eplus(), 0);
if (accum == "Add")
{
reduce(included, inputWorkspace->readY(wi), vplus(), 0);
reduce(included, inputWorkspace->readE(wi), eplus(), 0);
}
else if (accum == "Append")
{
mpi::request reqs[3];

// Send the spectrum to the root process
reqs[0] = included.isend(0,0,inputWorkspace->readX(0));
reqs[1] = included.isend(0,1,inputWorkspace->readY(0));
reqs[2] = included.isend(0,2,inputWorkspace->readE(0));

// Make sure the sends have completed before exiting the algorithm
mpi::wait_all(reqs,reqs+3);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def _focusChunks(self, runnumber, extension, filterWall, calib, filterLogs=None,
temp = self._focus(temp, calib, self._info, filterLogs, preserveEvents, normByCurrent)
comm = mpi.world
if HAVE_MPI:
alg = GatherWorkspaces(InputWorkspace=temp, PreserveEvents=preserveEvents, OutputWorkspace=wksp)
alg = GatherWorkspaces(InputWorkspace=temp, PreserveEvents=preserveEvents, AccumulationMethod="Add", OutputWorkspace=wksp)
wksp = alg['OutputWorkspace']
else:
if firstChunk:
Expand Down

0 comments on commit 1b66fe2

Please sign in to comment.