-
Notifications
You must be signed in to change notification settings - Fork 122
/
MergeMDFiles.cpp
393 lines (330 loc) · 14.3 KB
/
MergeMDFiles.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source,
// Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
// SPDX - License - Identifier: GPL - 3.0 +
#include "MantidMDAlgorithms/MergeMDFiles.h"
#include "MantidAPI/FileProperty.h"
#include "MantidAPI/MultipleFileProperty.h"
#include "MantidDataObjects/BoxControllerNeXusIO.h"
#include "MantidDataObjects/MDBoxBase.h"
#include "MantidDataObjects/MDEventFactory.h"
#include "MantidKernel/CPUTimer.h"
#include "MantidKernel/Strings.h"
#include "MantidKernel/System.h"
#include "MantidKernel/VectorHelper.h"
#include <Poco/File.h>
#include <boost/scoped_ptr.hpp>
using namespace Mantid::Kernel;
using namespace Mantid::API;
using namespace Mantid::DataObjects;
namespace Mantid {
namespace MDAlgorithms {
// Register the algorithm into the AlgorithmFactory.
// NOTE(review): this is a namespace-scope macro invocation; presumably it must
// appear exactly once for MergeMDFiles across all translation units.
DECLARE_ALGORITHM(MergeMDFiles)
//----------------------------------------------------------------------------------------------
/** Constructor
*/
MergeMDFiles::MergeMDFiles()
: m_nDims(0), m_MDEventType(), m_fileBasedTargetWS(false), m_Filenames(), m_EventLoader(), m_OutIWS(),
m_totalEvents(0), m_totalLoaded(0), m_fileMutex(), m_statsMutex() {}
//----------------------------------------------------------------------------------------------
/** Destructor.
 *
 * Releases any event loaders (and the input files they keep open) that were
 * not already released while the algorithm ran.
 */
MergeMDFiles::~MergeMDFiles() {
  // Safe even if exec() already cleaned up: every slot is reset to nullptr.
  this->clearEventLoaders();
}
//----------------------------------------------------------------------------------------------
//----------------------------------------------------------------------------------------------
/** Initialize the algorithm's properties.
*/
void MergeMDFiles::init() {
std::vector<std::string> exts(1, ".nxs");
declareProperty(std::make_unique<MultipleFileProperty>("Filenames", exts),
"Select several MDEventWorkspace NXS files to merge "
"together. Files must have common box structure.");
declareProperty(std::make_unique<FileProperty>("OutputFilename", "", FileProperty::OptionalSave, exts),
"Choose a file to which to save the output workspace. \n"
"Optional: if specified, the workspace created will be file-backed. \n"
"If not, it will be created in memory.");
declareProperty("Parallel", false,
"Run the loading tasks in parallel.\n"
"This can be faster but might use more memory.");
declareProperty(std::make_unique<WorkspaceProperty<IMDEventWorkspace>>("OutputWorkspace", "", Direction::Output),
"An output MDEventWorkspace.");
}
//----------------------------------------------------------------------------------------------
/** Loads all of the box data required (no events) for later use.
* Calculates total number events in each box
* Also opens the files and leaves them open */
void MergeMDFiles::loadBoxData() {
this->progress(0.05, "Loading File Info");
// Get plain box structure and box tree
std::vector<API::IMDNode *> &Boxes = m_BoxStruct.getBoxes();
std::vector<uint64_t> &targetEventIndexes = m_BoxStruct.getEventIndex();
// clear the averages for target event indexes;
targetEventIndexes.assign(targetEventIndexes.size(), 0);
// Total number of events in ALL files.
m_totalEvents = 0;
m_fileComponentsStructure.resize(m_Filenames.size());
m_EventLoader.assign(m_Filenames.size(), nullptr);
try {
for (size_t i = 0; i < m_Filenames.size(); i++) {
// load box structure and the experimental info from each target
// workspace.
m_fileComponentsStructure[i].loadBoxStructure(m_Filenames[i], m_nDims, m_MDEventType, true, true);
// export just loaded experiment info to the target workspace
m_fileComponentsStructure[i].exportExperiment(m_OutIWS);
// Check for consistency
if (i > 0) {
if (m_fileComponentsStructure[i].getEventIndex().size() != targetEventIndexes.size())
throw std::runtime_error("Inconsistent number of boxes found in file " + m_Filenames[i] +
". Cannot merge these files. Did you generate them all with "
"exactly the same box structure?");
}
// calculate total number of events per target cell, which will be
size_t nBoxes = Boxes.size();
for (size_t j = 0; j < nBoxes; j++) {
size_t ID = Boxes[j]->getID();
targetEventIndexes[2 * ID + 1] += m_fileComponentsStructure[i].getEventIndex()[2 * ID + 1];
m_totalEvents += m_fileComponentsStructure[i].getEventIndex()[2 * ID + 1];
}
// Open the event data, track the total number of events
auto bc = std::shared_ptr<API::BoxController>(new API::BoxController(static_cast<size_t>(m_nDims)));
bc->fromXMLString(m_fileComponentsStructure[i].getBCXMLdescr());
m_EventLoader[i] = new BoxControllerNeXusIO(bc.get());
m_EventLoader[i]->setDataType(sizeof(coord_t), m_MDEventType);
m_EventLoader[i]->openFile(m_Filenames[i], "r");
}
} catch (...) {
// Close all open files in case of error
clearEventLoaders();
throw;
}
const std::vector<int> &boxType = m_BoxStruct.getBoxType();
// calculate event positions in the target file.
uint64_t eventsStart = 0;
for (auto mdBox : Boxes) {
mdBox->clear();
size_t ID = mdBox->getID();
// avoid grid boxes;
if (boxType[ID] == 2)
continue;
uint64_t nEvents = targetEventIndexes[2 * ID + 1];
targetEventIndexes[ID * 2] = eventsStart;
if (m_fileBasedTargetWS)
mdBox->setFileBacked(eventsStart, nEvents, false);
eventsStart += nEvents;
}
g_log.notice() << m_totalEvents << " events in " << m_Filenames.size() << " files.\n";
}
/** Fill one box of the output workspace with the events of the box that has
 * the same ID in every input file.
 *
 * The events and averages the box already holds (e.g. left over from cloning)
 * are discarded first. Memory for all incoming events is reserved in one go
 * before anything is read.
 *
 * @param TargetBox :: the output-workspace box to fill.
 * @return the total number of events loaded into TargetBox.
 */
uint64_t MergeMDFiles::loadEventsFromSubBoxes(API::IMDNode *TargetBox) {
  TargetBox->clear();

  const size_t fileCount = m_EventLoader.size();
  std::vector<size_t> eventsPerFile(fileCount);
  uint64_t totalEvents = 0;
  if (fileCount > 0) {
    const size_t boxID = TargetBox->getID();

    // First pass: how many events does each file contribute to this box?
    for (size_t fileIdx = 0; fileIdx < fileCount; ++fileIdx) {
      const auto &fileIndex = m_fileComponentsStructure[fileIdx].getEventIndex();
      eventsPerFile[fileIdx] = static_cast<size_t>(fileIndex[2 * boxID + 1]);
      totalEvents += eventsPerFile[fileIdx];
    }
  }

  TargetBox->reserveMemoryForLoad(totalEvents);

  // Second pass: append each non-empty contribution from its file position.
  for (size_t fileIdx = 0; fileIdx < fileCount; ++fileIdx) {
    if (eventsPerFile[fileIdx] == 0)
      continue;
    const size_t boxID = TargetBox->getID();
    const uint64_t fileOffset = m_fileComponentsStructure[fileIdx].getEventIndex()[2 * boxID];
    TargetBox->loadAndAddFrom(m_EventLoader[fileIdx], fileOffset, eventsPerFile[fileIdx]);
  }
  return totalEvents;
}
//----------------------------------------------------------------------------------------------
/** Perform the merging, but clone the initial workspace and use the same
 * splitting, as its structure is equivalent to the partial box structures.
 *
 * Every leaf box (isBox()) of the target receives the events of the matching
 * box of each input file. For a file-backed target, a box that holds data in
 * memory is saved and its in-memory data cleared right after loading.
 *
 * @param ws :: first MDEventWorkspace in the list to merge to.
 * @param outputFile :: the name of the output file where the file-based
 *                      workspace should be saved (empty for an in-memory result)
 */
void MergeMDFiles::doExecByCloning(const Mantid::API::IMDEventWorkspace_sptr &ws, const std::string &outputFile) {
  m_OutIWS = ws;
  m_MDEventType = ws->getEventTypeName();

  // TODO: the "Parallel" property is declared but not used yet; all boxes are
  // loaded serially below.

  // Fix the box controller settings in the output workspace so that it splits
  // normally
  BoxController_sptr bc = ws->getBoxController();
  // set up internal variables characterizing the workspace.
  m_nDims = static_cast<int>(bc->getNDims());
  // Fix the max depth to something bigger.
  bc->setMaxDepth(20);
  bc->setSplitThreshold(5000);
  auto saver = std::shared_ptr<API::IBoxControllerIO>(new DataObjects::BoxControllerNeXusIO(bc.get()));
  saver->setDataType(sizeof(coord_t), m_MDEventType);
  if (m_fileBasedTargetWS) {
    bc->setFileBacked(saver, outputFile);
    // Complete the file-back-end creation.
    g_log.notice() << "Setting cache to 400 MB write.\n";
    bc->getFileIO()->setWriteBufferSize(400000000 / m_OutIWS->sizeofEvent());
  }

  // Init box structure used for memory/file space calculations
  m_BoxStruct.initFlatStructure(ws, outputFile);
  // First, load all the box data and experiment info and calculate file
  // positions of the target workspace
  this->loadBoxData();

  const size_t numBoxes = m_BoxStruct.getNBoxes();
  // Progress covers 0.1 - 0.9 with exactly one step per box (see loop below).
  m_progress = std::make_unique<Progress>(this, 0.1, 0.9, numBoxes);
  m_progress->setNotifyStep(0.1);

  CPUTimer overallTime;
  // Only a file-backed target has a disk buffer to write boxes through.
  Kernel::DiskBuffer *DiskBuf(nullptr);
  if (m_fileBasedTargetWS) {
    DiskBuf = bc->getFileIO();
  }

  this->m_totalLoaded = 0;
  std::vector<API::IMDNode *> &boxes = m_BoxStruct.getBoxes();
  for (size_t ib = 0; ib < numBoxes; ib++) {
    API::IMDNode *box = boxes[ib];
    if (box->isBox()) {
      // load all contributed events into current box;
      this->loadEventsFromSubBoxes(box);
      // data position has been already pre-calculated in loadBoxData()
      if (DiskBuf && box->getDataInMemorySize() > 0) {
        box->getISaveable()->save();
        box->clearDataFromMemory();
      }
    }
    // Advance by exactly one step for every box (grid boxes included) so the
    // progress bar ends at the top of its range.
    m_progress->report("Loading and merging box data");
  }

  if (DiskBuf) {
    DiskBuf->flushCache();
    bc->getFileIO()->flushData();
  }
  g_log.information() << overallTime << " to do all the adding.\n";

  // Close any open file handle
  clearEventLoaders();
  // Finish things up
  this->finalizeOutput(outputFile);
}
//----------------------------------------------------------------------------------------------
/** Refresh the output workspace's cache and, when an output file is given,
 * re-save the MDEventWorkspace to update the file back end: generic WS info
 * and dimensions, experiment infos, and finally the flat box structure.
 *
 * @param outputFile :: the target file; nothing is written when it is empty.
 */
void MergeMDFiles::finalizeOutput(const std::string &outputFile) {
  CPUTimer overallTime;

  this->progress(0.90, "Refreshing Cache");
  m_OutIWS->refreshCache();
  g_log.information() << overallTime << " to run refreshCache().\n";

  if (outputFile.empty())
    return;

  g_log.notice() << "Starting SaveMD to update the file back-end.\n";
  // create or open WS group and put there additional information about WS and
  // its dimensions
  bool old_data_there = false; // out-parameter of createOrOpenMDWSgroup; unused here
  // clang-format off
  boost::scoped_ptr< ::NeXus::File> file(MDBoxFlatTree::createOrOpenMDWSgroup(
      outputFile, m_nDims, m_MDEventType, false, old_data_there));
  // clang-format on

  // Progress values increase monotonically through the remaining save steps.
  this->progress(0.92, "Saving ws history and dimensions");
  MDBoxFlatTree::saveWSGenericInfo(file.get(), m_OutIWS);
  // Save each ExperimentInfo to a spot in the file
  this->progress(0.94, "Saving experiment infos");
  MDBoxFlatTree::saveExperimentInfos(file.get(), m_OutIWS);
  file->closeGroup();
  file->close();

  // -------------- Save Box Structure -------------------------------------
  // OK, we've filled these big arrays of data representing flat box
  // structure. Save them.
  this->progress(0.96, "Writing Box Data");
  m_BoxStruct.saveBoxStructure(outputFile);
  g_log.information() << overallTime << " to run SaveMD structure\n";
}
//----------------------------------------------------------------------------------------------
/** Execute the algorithm.
*/
void MergeMDFiles::exec() {
// clear disk buffer which can remain from previous runs
// the existence/ usage of the buffer indicates if the algorithm works with
// file based or memory based target workspaces;
// pDiskBuffer = NULL;
MultipleFileProperty *multiFileProp = dynamic_cast<MultipleFileProperty *>(getPointerToProperty("Filenames"));
if (!multiFileProp) {
throw std::logic_error("Filenames property must have MultipleFileProperty type.");
}
m_Filenames = VectorHelper::flattenVector(multiFileProp->operator()());
if (m_Filenames.empty())
throw std::invalid_argument("Must specify at least one filename.");
std::string firstFile = m_Filenames[0];
std::string outputFile = getProperty("OutputFilename");
m_fileBasedTargetWS = false;
if (!outputFile.empty()) {
m_fileBasedTargetWS = true;
if (Poco::File(outputFile).exists())
throw std::invalid_argument(" File " + outputFile +
" already exists. Can not use existing file "
"as the target to MergeMD files.\n" +
" Use it as one of source files if you want to add MD data to it");
}
// Start by loading the first file but just the box structure, no events, and
// not file-backed
// m_BoxStruct.loadBoxStructure(firstFile,
IAlgorithm_sptr loader = createChildAlgorithm("LoadMD", 0.0, 0.05, false);
loader->setPropertyValue("Filename", firstFile);
loader->setProperty("MetadataOnly", false);
loader->setProperty("BoxStructureOnly", true);
loader->setProperty("FileBackEnd", false);
loader->executeAsChildAlg();
IMDWorkspace_sptr result = (loader->getProperty("OutputWorkspace"));
auto firstWS = std::dynamic_pointer_cast<API::IMDEventWorkspace>(result);
if (!firstWS)
throw std::runtime_error("Can not load MDEventWorkspace from initial file " + firstFile);
// do the job
this->doExecByCloning(firstWS, outputFile);
m_OutIWS->setFileNeedsUpdating(false);
setProperty("OutputWorkspace", m_OutIWS);
}
/**Delete all event loaders */
void MergeMDFiles::clearEventLoaders() {
for (auto &loader : m_EventLoader) {
delete loader;
loader = nullptr;
}
}
} // namespace MDAlgorithms
} // namespace Mantid