forked from AmbaPant/mantid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
BroadcastWorkspace.cpp
114 lines (96 loc) · 4.3 KB
/
BroadcastWorkspace.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source,
// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
// SPDX - License - Identifier: GPL - 3.0 +
//----------------------------------------------------------------------
// Includes
//----------------------------------------------------------------------
#include "MantidMPIAlgorithms/BroadcastWorkspace.h"
#include "MantidAPI/Axis.h"
#include "MantidAPI/WorkspaceFactory.h"
#include "MantidDataObjects/Workspace2D.h"
#include "MantidKernel/BoundedValidator.h"
#include "MantidKernel/UnitFactory.h"
#include <boost/mpi.hpp>
namespace mpi = boost::mpi;
namespace Mantid {
namespace MPIAlgorithms {
using namespace Kernel;
using namespace API;
// Register the algorithm into the AlgorithmFactory
DECLARE_ALGORITHM(BroadcastWorkspace)
void BroadcastWorkspace::init() {
// Input is optional - only the 'BroadcasterRank' process should provide an
// input workspace
declareProperty(
std::make_unique<WorkspaceProperty<>>("InputWorkspace", "", Direction::Input, PropertyMode::Optional));
declareProperty(std::make_unique<WorkspaceProperty<>>("OutputWorkspace", "", Direction::Output));
declareProperty("BroadcasterRank", 0, std::make_shared<BoundedValidator<int>>(0, mpi::communicator().size() - 1));
}
void BroadcastWorkspace::exec() {
// Every process in an MPI job must hit this next line or everything hangs!
mpi::communicator world; // The communicator containing all processes
// Get the rank of the process that's doing the broadcasting
const int root = getProperty("BroadcasterRank");
MatrixWorkspace_const_sptr inputWorkspace;
std::size_t numSpec, numBins;
bool hist;
std::string xUnit, yUnit, yUnitLabel;
bool distribution;
if (world.rank() == root) {
inputWorkspace = getProperty("InputWorkspace");
if (!inputWorkspace) {
g_log.fatal("InputWorkspace '" + getPropertyValue("InputWorkspace") + "' not found in root process");
// We need to stop all the other processes. Not very graceful, but there's
// not much point in trying to be cleverer
mpi::environment::abort(-1);
}
numSpec = inputWorkspace->getNumberHistograms();
numBins = inputWorkspace->blocksize();
hist = inputWorkspace->isHistogramData();
xUnit = inputWorkspace->getAxis(0)->unit()->unitID();
yUnit = inputWorkspace->YUnit();
yUnitLabel = inputWorkspace->YUnitLabel();
distribution = inputWorkspace->isDistribution();
}
// Broadcast the size of the workspace
broadcast(world, numSpec, root);
broadcast(world, numBins, root);
broadcast(world, hist, root);
// Create an output workspace in each process. Assume Workspace2D for now
MatrixWorkspace_sptr outputWorkspace =
WorkspaceFactory::Instance().create("Workspace2D", numSpec, numBins + hist, numBins);
setProperty("OutputWorkspace", outputWorkspace);
// Broadcast the units
broadcast(world, xUnit, root);
broadcast(world, yUnit, root);
broadcast(world, yUnit, root);
broadcast(world, distribution, root);
outputWorkspace->getAxis(0)->unit() = UnitFactory::Instance().create(xUnit);
outputWorkspace->setYUnit(yUnit);
outputWorkspace->setYUnitLabel(yUnitLabel);
outputWorkspace->setDistribution(distribution);
// TODO: broadcast any other pertinent details. Want to keep this to a minimum
// though.
for (std::size_t i = 0; i < numSpec; ++i) {
if (world.rank() == root) {
// For local output, just copy over
outputWorkspace->dataX(i) = inputWorkspace->readX(i);
outputWorkspace->dataY(i) = inputWorkspace->readY(i);
outputWorkspace->dataE(i) = inputWorkspace->readE(i);
// Send out the current spectrum
broadcast(world, const_cast<MantidVec &>(inputWorkspace->readX(i)), root);
broadcast(world, const_cast<MantidVec &>(inputWorkspace->readY(i)), root);
broadcast(world, const_cast<MantidVec &>(inputWorkspace->readE(i)), root);
} else {
// Receive the broadcast spectrum from the broadcasting process
broadcast(world, outputWorkspace->dataX(i), root);
broadcast(world, outputWorkspace->dataY(i), root);
broadcast(world, outputWorkspace->dataE(i), root);
}
}
}
} // namespace MPIAlgorithms
} // namespace Mantid