forked from npshub/mantid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reducer.py
596 lines (493 loc) · 23 KB
/
reducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
# pylint: disable=protected-access, unused-variable, W0121
# W0121 = Use raise ErrorClass(args) instead of raise ErrorClass, args.
"""
Base reduction class. Hold a list of data and a list of reduction steps to apply to them.
Pseudo code example:
r = Reducer()
r.set_instrument( SANSInstrument() )
r.append_step( ReductionStep() )
r.reduce()
The ReductionStep object is initialized before being passed to the Reducer.
The Reducer will call ReductionStep.execute() when Reducer.reduce() is called.
The ReductionStep.execute() method takes two arguments, a reference to the
Reducer itself and the name of the workspace to apply the step to (usually a data set).
The Reducer object reference is passed so that the reduction step can access
instrument settings.
"""
import os
import sys
import time
import types
import uuid
from reduction.instrument import Instrument
import mantid.simpleapi as mantid
import warnings
import inspect
from reduction.find_data import find_data
## Version number
__version__ = '1.0'
def validate_loader(func): # noqa
def validated_f(reducer, algorithm, *args, **kwargs):
if issubclass(algorithm.__class__, ReductionStep) or algorithm is None:
# If we have a ReductionStep object, just use it.
# "None" is allowed as an algorithm (usually tells the reducer to skip a step)
return func(reducer, algorithm)
if isinstance(algorithm, types.FunctionType):
# If we get a function, assume its name is an algorithm name
algorithm = algorithm.func_name
if isinstance(algorithm, types.StringType):
# If we have a string, assume it's an algorithm name
class _AlgorithmStep(ReductionStep):
def __init__(self):
self.algorithm = None
self._data_file = None
def get_algorithm(self):
return self.algorithm
def setProperty(self, key, value):
kwargs[key] = value
def execute(self, reducer, inputworkspace=None, outputworkspace=None):
"""
Create a new instance of the requested algorithm object,
set the algorithm properties replacing the input and output
workspaces.
The execution will work for any combination of mandatory/optional
properties.
@param reducer: Reducer object managing the reduction
@param inputworkspace: input workspace name [optional]
@param outputworkspace: output workspace name [optional]
"""
# If we don't have a data file, look up the workspace handle
if self._data_file is None:
if inputworkspace in reducer._data_files:
data_file = reducer._data_files[inputworkspace]
if data_file is None:
return
else:
raise RuntimeError("SANSReductionSteps.LoadRun doesn't recognize workspace handle %s" % inputworkspace)
else:
data_file = self._data_file
alg = mantid.api.AlgorithmManager.create(algorithm)
if not isinstance(alg, mantid.api.IAlgorithm):
raise RuntimeError("Reducer expects an Algorithm object from FrameworkManager, found '%s'" % str(
type(alg)))
propertyOrder = alg.orderedProperties()
# add the args to the kw list so everything can be set in a single way
for (key, arg) in zip(propertyOrder[:len(args)], args):
kwargs[key] = arg
# Override input and output workspaces
if "Workspace" in kwargs:
kwargs["Workspace"] = inputworkspace
if "OutputWorkspace" in kwargs:
kwargs["OutputWorkspace"] = inputworkspace
if "Filename" in kwargs:
kwargs["Filename"] = data_file
if "AlternateName" in kwargs and \
kwargs["AlternateName"] in propertyOrder:
kwargs[kwargs["AlternateName"]] = data_file
self.algorithm = alg
mantid.set_properties(alg, *(), **kwargs)
alg.execute()
if "OutputMessage" in propertyOrder:
return alg.getPropertyValue("OutputMessage")
return "%s applied" % alg.name()
return func(reducer, _AlgorithmStep())
elif isinstance(algorithm, mantid.api.IAlgorithm) \
or type(algorithm).__name__ == "IAlgorithm":
class _AlgorithmStep(ReductionStep):
def __init__(self):
self.algorithm = algorithm
self._data_file = None
def get_algorithm(self):
return self.algorithm
def setProperty(self, key, value):
kwargs[key] = value
def execute(self, reducer, inputworkspace=None, outputworkspace=None):
"""
Create a new instance of the requested algorithm object,
set the algorithm properties replacing the input and output
workspaces.
The execution will work for any combination of mandatory/optional
properties.
@param reducer: Reducer object managing the reduction
@param inputworkspace: input workspace name [optional]
@param outputworkspace: output workspace name [optional]
"""
# If we don't have a data file, look up the workspace handle
if self._data_file is None:
if inputworkspace in reducer._data_files:
data_file = reducer._data_files[inputworkspace]
if data_file is None:
return
else:
raise RuntimeError("SANSReductionSteps.LoadRun doesn't recognize workspace handle %s" % inputworkspace)
else:
data_file = self._data_file
propertyOrder = algorithm.orderedProperties()
# Override input and output workspaces
if "Workspace" in propertyOrder:
algorithm.setPropertyValue("Workspace", inputworkspace)
if "OutputWorkspace" in propertyOrder:
algorithm.setPropertyValue("OutputWorkspace", inputworkspace)
if "Filename" in propertyOrder:
algorithm.setPropertyValue("Filename", data_file)
if "AlternateName" in kwargs and \
kwargs["AlternateName"] in propertyOrder:
algorithm.setPropertyValue(kwargs["AlternateName"], data_file)
algorithm.execute()
return "%s applied" % algorithm.name()
return func(reducer, _AlgorithmStep())
else:
raise RuntimeError("%s expects a ReductionStep object, found %s" % (func.__name__, algorithm.__class__))
return validated_f
def validate_step(func): # noqa
"""
Decorator for Reducer methods that need a ReductionStep
object as its first argument.
Example:
@validate_step
def some_func(self, reduction_step):
[...]
Arguments to a Mantid algorithm function should be passed as arguments.
Example:
#Load("my_file.txt", "my_wksp") will become:
reducer.some_func(Load, "my_file.txt", "my_wksp")
InputWorkspace and OutputWorkspace arguments can be left as None
if they are to be overwritten by the Reducer.
"""
def validated_f(reducer, algorithm, *args, **kwargs):
"""
Wrapper function around the function func.
The function ensures that the algorithm parameter
is a sub-class of ReductionStep
@param algorithm: algorithm name, ReductionStep object, or Mantid algorithm function
"""
if issubclass(algorithm.__class__, ReductionStep) or algorithm is None:
# If we have a ReductionStep object, just use it.
# "None" is allowed as an algorithm (usually tells the reducer to skip a step)
return func(reducer, algorithm)
if isinstance(algorithm, types.FunctionType):
# If we get a function, assume its name is an algorithm name
algorithm = algorithm.func_name
if isinstance(algorithm, types.StringType):
# If we have a string, assume it's an algorithm name
class _AlgorithmStep(ReductionStep):
def __init__(self):
self.algorithm = None
def get_algorithm(self):
return self.algorithm
def setProperty(self, key, value):
kwargs[key] = value
def execute(self, reducer, inputworkspace=None, outputworkspace=None):
"""
Create a new instance of the requested algorithm object,
set the algorithm properties replacing the input and output
workspaces.
The execution will work for any combination of mandatory/optional
properties.
@param reducer: Reducer object managing the reduction
@param inputworkspace: input workspace name [optional]
@param outputworkspace: output workspace name [optional]
"""
if outputworkspace is None:
outputworkspace = inputworkspace
alg = mantid.AlgorithmManager.create(algorithm)
if not isinstance(alg, mantid.api.IAlgorithm):
raise RuntimeError("Reducer expects an Algorithm object from FrameworkManager, found '%s'" % str(
type(alg)))
propertyOrder = alg.orderedProperties()
# add the args to the kw list so everything can be set in a single way
for (key, arg) in zip(propertyOrder[:len(args)], args):
kwargs[key] = arg
# Override input and output workspaces
if "Workspace" in kwargs:
kwargs["Workspace"] = inputworkspace
if "InputWorkspace" in kwargs:
kwargs["InputWorkspace"] = inputworkspace
if "OutputWorkspace" in kwargs:
kwargs["OutputWorkspace"] = outputworkspace
self.algorithm = alg
mantid.set_properties(alg, *(), **kwargs)
alg.execute()
if "OutputMessage" in propertyOrder:
return alg.getPropertyValue("OutputMessage")
return "%s applied" % alg.name()
return func(reducer, _AlgorithmStep())
elif isinstance(algorithm, mantid.api.IAlgorithm) \
or type(algorithm).__name__ == "IAlgorithm":
class _AlgorithmStep(ReductionStep):
def __init__(self):
self.algorithm = algorithm
def get_algorithm(self):
return self.algorithm
def setProperty(self, key, value):
kwargs[key] = value
def execute(self, reducer, inputworkspace=None, outputworkspace=None):
"""
Create a new instance of the requested algorithm object,
set the algorithm properties replacing the input and output
workspaces.
The execution will work for any combination of mandatory/optional
properties.
@param reducer: Reducer object managing the reduction
@param inputworkspace: input workspace name [optional]
@param outputworkspace: output workspace name [optional]
"""
if outputworkspace is None:
outputworkspace = inputworkspace
propertyOrder = algorithm.orderedProperties()
# Override input and output workspaces
if "Workspace" in propertyOrder:
algorithm.setPropertyValue("Workspace", inputworkspace)
if "InputWorkspace" in propertyOrder:
algorithm.setPropertyValue("InputWorkspace", inputworkspace)
if "OutputWorkspace" in propertyOrder:
algorithm.setPropertyValue("OutputWorkspace", outputworkspace)
algorithm.execute()
if "OutputMessage" in propertyOrder:
return algorithm.getPropertyValue("OutputMessage")
return "%s applied" % algorithm.name()
return func(reducer, _AlgorithmStep())
else:
raise RuntimeError("%s expects a ReductionStep object, found %s" % (func.__name__, algorithm.__class__))
return validated_f
class Reducer(object):
"""
Base reducer class. Instrument-specific reduction processes should be
implemented in a child of this class.
"""
## Instrument configuration object
instrument = None
## Path for data files
_data_path = '.'
## Path for output files
_output_path = None
## List of data files to process
_data_files = {}
## List of workspaces that were modified
_dirty = []
## List of reduction steps
_reduction_steps = []
## Log
log_text = ''
## Output workspaces
output_workspaces = []
def __init__(self):
# Generate UUID and trim to 5 chars
self.UID = str(uuid.uuid1())[:5]
self.property_manager = "__reduction_parameters_" + self.UID
self._data_files = {}
self._reduction_steps = []
def get_reduction_table_name(self):
return self.property_manager
def set_reduction_table_name(self, name):
self.property_manager = str(name)
def set_instrument(self, configuration):
if issubclass(configuration.__class__, Instrument):
self.instrument = configuration
else:
raise RuntimeError("Reducer.set_instrument expects an %s object, found %s" % (
Instrument, configuration.__class__))
def dirty(self, workspace):
"""
Flag a workspace as dirty when the data has been modified
"""
if workspace not in self._dirty:
self._dirty.append(workspace)
def clean_up(self):
"""
Removes all workspace flagged as dirty, use when a reduction aborts with errors
"""
for bad_data in self._dirty:
if bad_data in mantid.mtd:
mantid.DeleteWorkspace(Workspace=bad_data)
else:
mantid.logger.notice('reducer: Could not access tainted workspace ' + bad_data)
def clean(self, workspace):
"""
Remove the dirty flag on a workspace
"""
if workspace in self._dirty:
self._dirty.remove(workspace)
def is_clean(self, workspace):
"""
Returns True if the workspace is clean
"""
if workspace in self._dirty:
return False
return True
def set_data_path(self, path):
"""
Set the path for data files
@param path: data file path
"""
path = os.path.normcase(path)
if os.path.isdir(path):
self._data_path = path
mantid.config.appendDataSearchDir(path)
else:
raise RuntimeError("Reducer.set_data_path: provided path is not a directory (%s)" % path)
def set_output_path(self, path):
"""
Set the path for output files
@param path: output file path
"""
path = os.path.normcase(path)
if os.path.isdir(path):
self._output_path = path
else:
raise RuntimeError("Reducer.set_output_path: provided path is not a directory (%s)" % path)
def _full_file_path(self, filename):
"""
Prepends the data folder path and returns a full path to the given file.
Raises an exception if the file doesn't exist.
@param filename: name of the file to create the full path for
"""
lineno = inspect.currentframe().f_code.co_firstlineno
warnings.warn_explicit("Reducer._full_file_path is deprecated: use find_data instead", DeprecationWarning,
__file__, lineno)
instrument_name = ''
if self.instrument is not None:
instrument_name = self.instrument.name()
return find_data(filename, instrument=instrument_name)
@validate_step
def append_step(self, reduction_step):
"""
Append a reduction step
@param reduction_step: ReductionStep object
"""
if reduction_step is None:
return None
self._reduction_steps.append(reduction_step)
return reduction_step
def clear_data_files(self):
"""
Empty the list of files to reduce while keeping all the
other options the same.
"""
self._data_files = {}
def append_data_file(self, data_file, workspace=None):
"""
Append a file to be processed.
@param data_file: name of the file to be processed
@param workspace: optional name of the workspace for this data,
default will be the name of the file
TODO: this needs to be an ordered list
"""
if data_file is None:
if workspace in mantid.mtd:
self._data_files[workspace] = None
return
else:
raise RuntimeError("Trying to append a data set without a file name or an existing workspace.")
if isinstance(data_file, list):
if workspace is None:
# Use the first file to determine the workspace name
workspace = extract_workspace_name(data_file[0])
else:
if workspace is None:
workspace = extract_workspace_name(data_file)
self._data_files[workspace] = data_file
def pre_process(self):
"""
Reduction steps that are meant to be executed only once per set
of data files. After this is executed, all files will go through
the list of reduction steps.
"""
pass
def post_process(self):
"""
Reduction steps to be executed after all data files have been
processed.
"""
pass
def reduce(self):
"""
Go through the list of reduction steps
"""
t_0 = time.time()
instrument_name = ''
self.output_workspaces = []
# Check that an instrument was specified
if self.instrument is not None:
instrument_name = self.instrument.name()
# Log text
self.log_text = "%s reduction - %s\n" % (instrument_name, time.ctime())
# Go through the list of steps that are common to all data files
self.pre_process()
# Go through the list of files to be reduced
for file_ws in self._data_files:
for item in self._reduction_steps:
try:
result = item.execute(self, file_ws)
if result is not None and len(str(result)) > 0:
self.log_text += "%s\n" % str(result)
except:
self.log_text += "\n%s\n" % sys.exc_info()[1]
raise
# any clean up, possibly removing workspaces
self.post_process()
# Determine which directory to use
output_dir = self._data_path
if self._output_path is not None:
if os.path.isdir(self._output_path):
output_dir = self._output_path
else:
output_dir = os.path.expanduser('~')
self.log_text += "Reduction completed in %g sec\n" % (time.time() - t_0)
log_path = os.path.join(output_dir, "%s_reduction.log" % instrument_name)
self.log_text += "Log saved to %s" % log_path
# Write the log to file
file_dat = open(log_path, 'a')
file_dat.write("\n-------------------------------------------\n")
file_dat.write(self.log_text)
file_dat.close()
return self.log_text
class ReductionStep(object):
"""
Base class for reduction steps
"""
@classmethod
def delete_workspaces(cls, workspace):
"""
Delete all workspace created by this reduction step related
to the given workspace
@param workspace: workspace to delete
"""
return
@classmethod
def _create_unique_name(cls, filepath, descriptor):
"""
Generate a unique name for an internal workspace
"""
random_str = str(uuid.uuid1())[:5]
return "__" + descriptor + "_" + extract_workspace_name(filepath) + "_" + random_str
def execute(self, reducer, inputworkspace=None, outputworkspace=None):
"""
Implemented the reduction step.
@param reducer: Reducer object for which the step is executed
@param inputworkspace: Name of the workspace to apply this step to
@param outputworkspace: Name of the workspace to have as an output. If this is None it will be set to inputworkspace
"""
raise NotImplementedError
def extract_workspace_name(filepath, suffix=''):
"""
Returns a default workspace name for a given data file path.
@param filepath: path of the file to generate a workspace name for
@param suffix: string to append to name
"""
filepath_tmp = filepath
if isinstance(filepath, list):
filepath_tmp = filepath[0]
(head, tail) = os.path.split(filepath_tmp)
basename, extension = os.path.splitext(tail)
if isinstance(filepath, list):
basename += "_combined"
# TODO: check whether the workspace name is already in use
# and modify it if it is.
return basename + suffix