# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2021 ISIS Rutherford Appleton Laboratory UKRI,
#   NScD Oak Ridge National Laboratory, European Spallation Source,
#   Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from mantid import config
from mantid.api import AlgorithmFactory, FileAction, FileProperty, \
    ITableWorkspaceProperty, Progress, PythonAlgorithm
from mantid.kernel import Direction, IntArrayBoundedValidator, \
    StringListValidator, StringMandatoryValidator
from mantid.simpleapi import *
import fnmatch
import h5py
import numpy as np
import os
import re


class GenerateLogbook(PythonAlgorithm):

    _data_directory = None
    _facility = None
    _instrument = None
    _numor_range = None
    _metadata_headers = None
    _metadata_entries = None

    def category(self):
        return 'Utility'

    def summary(self):
        return 'Generates a logbook containing metadata specific to the instrument ' \
               'and technique used to obtain the raw data.'

    def name(self):
        return 'GenerateLogbook'

    def validateInputs(self):
        issues = dict()
        instrument = self.getPropertyValue('Instrument')
        ws_tmp = CreateSingleValuedWorkspace()
        try:
            LoadParameterFile(Workspace=ws_tmp, Filename=instrument + '_Parameters.xml')
        except Exception as e:
            self.log().error(str(e))
            issues['Instrument'] = 'There is no parameter file for the {} instrument.'.format(instrument)
        DeleteWorkspace(Workspace=ws_tmp)

        if not self.getProperty('NumorRange').isDefault:
            numor_range = self.getProperty('NumorRange').value
            if len(numor_range) < 2:
                issues['NumorRange'] = 'Please provide both the lower and upper numor limits.'
            if numor_range[0] > numor_range[-1]:
                issues['NumorRange'] = 'The upper numor limit must be larger than the lower one.'

        if not self.getProperty('CustomEntries').isDefault:
            custom_entries = self.getPropertyValue('CustomEntries')
            custom_entries = custom_entries.split(',')
            if not self.getProperty('CustomHeaders').isDefault:
                custom_headers = self.getPropertyValue('CustomHeaders')
                custom_headers = custom_headers.split(',')
                if len(custom_entries) != len(custom_headers):
                    issues['CustomHeaders'] = 'Provide none or as many headers as custom entries.'

        return issues

    def PyInit(self):
        self.declareProperty(FileProperty('Directory', '',
                                          action=FileAction.Directory),
                             doc='Path to directory containing data files for logging.')

        self.declareProperty(ITableWorkspaceProperty('OutputWorkspace', '',
                                                     direction=Direction.Output),
                             doc='The output table workspace.')

        self.declareProperty("NumorRange", [0, 0],
                             direction=Direction.Input,
                             validator=IntArrayBoundedValidator(lower=0),
                             doc='Numor range or a list of numors to be analysed in the directory.')

        facilities = StringListValidator(list(config.getFacilityNames()))
        self.declareProperty(name='Facility', defaultValue='ILL',
                             validator=facilities,
                             direction=Direction.Input,
                             doc='Facility the data belongs to.')

        self.declareProperty('Instrument', '',
                             validator=StringMandatoryValidator(),
                             direction=Direction.Input,
                             doc='Instrument the data has been collected with.')

        self.declareProperty(FileProperty('OutputFile', '',
                                          extensions=".csv",
                                          action=FileAction.OptionalSave),
                             doc='Comma-separated output file.')

        self.declareProperty('OptionalHeaders', '',
                             doc='Names of optional metadata to be included in the logbook. Entries need to be specified '
                                 'in the instrument IPF.')

        self.declareProperty('CustomEntries', '',
                             doc='Custom NeXus paths for additional metadata to be included in the logbook.')

        self.declareProperty('CustomHeaders', '',
                             doc='Names of those additional custom entries.')

    def _prepare_file_array(self):
        """Prepares a list containing the NeXus files in the specified directory."""
        instrument_name_len = 0
        if self._facility != 'ILL':
            instrument_name_len = len(self._instrument)

        file_list = []
        for file in sorted(fnmatch.filter(os.listdir(self._data_directory), '*.nxs')):
            try:
                numor = int(os.path.splitext(file[instrument_name_len:])[0])
                if self._numor_range is None or numor in self._numor_range:
                    file_list.append(os.path.splitext(file)[0])
            except (ValueError, OverflowError):
                self.log().debug("File {} cannot be cast into an integer numor".format(file))
                continue
        if not file_list:
            raise RuntimeError("There are no files in {} with the specified numors.".format(self._data_directory))
        return file_list
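
    # Illustrative note (an assumption about file naming, not stated in this file):
    # at the ILL, raw files are named by numor alone, e.g. '012345.nxs', so the whole
    # stem casts to an integer; at other facilities the instrument name is assumed to
    # prefix the numor, which is why the first len(self._instrument) characters are
    # sliced off first. For a hypothetical file 'MYINSTR000123.nxs':
    #   int(os.path.splitext('MYINSTR000123.nxs'[7:])[0])  ->  123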

    def _get_optional_entries(self, parameters):
        try:
            logbook_optional_parameters = parameters.getStringParameter('logbook_optional_parameters')[0]
        except IndexError:
            raise RuntimeError("Optional headers are requested but are not defined for {}.".format(self._instrument))
        else:
            logbook_optional_parameters = logbook_optional_parameters.split(',')
            # create a tmp dictionary mapping headers to paths read from the IPF,
            # with whitespace stripped from the headers
            optional_entries = dict()
            for entry in logbook_optional_parameters:
                optional_entry = entry.split(':')
                if len(optional_entry) < 3:
                    optional_entry.append('s')
                optional_entries[(optional_entry[2], str(optional_entry[0]).strip())] = optional_entry[1]
            requested_headers = self.getPropertyValue('OptionalHeaders')
            if str(requested_headers).casefold() == 'all':
                for type, header in optional_entries:
                    self._metadata_headers.append((type, header))
                    self._metadata_entries.append(optional_entries[(type, header)])
            else:
                for header in requested_headers.split(','):
                    for type in ['s', 'd', 'f']:
                        if (type, header) in optional_entries:
                            self._metadata_headers.append((type, header))
                            self._metadata_entries.append(optional_entries[(type, header)])
                            break
                    if (('s', header) not in optional_entries and ('d', header) not in optional_entries
                            and ('f', header) not in optional_entries):
                        raise RuntimeError("Header {} requested, but not defined for {}.".format(header, self._instrument))

    def _get_custom_entries(self):
        logbook_custom_entries = self.getPropertyValue('CustomEntries')
        logbook_custom_entries = logbook_custom_entries.split(',')
        for entry in logbook_custom_entries:
            self._metadata_entries.append(entry.split(':')[0])
        logbook_custom_headers = [""] * len(logbook_custom_entries)
        operators = ["+", "-", "*", "//"]
        columnType = 's'
        if self.getProperty('CustomHeaders').isDefault:
            # derive headers from the custom entries:
            for entry_no, entry in enumerate(logbook_custom_entries):
                entry_content = entry.split(':')
                if len(entry_content) > 1:
                    columnType = entry_content[1]
                if any(op in entry_content[0] for op in operators):
                    list_entries, binary_operations = self._process_regex(entry_content[0])
                    header = ""
                    for split_entry_no, split_entry in enumerate(list_entries):
                        # always use the two path segments around the final '/' for a more informative header
                        partial_header = split_entry[split_entry.rfind('/', 0,
                                                                       split_entry.rfind('/') - 1) + 1:]
                        header += partial_header
                        header += binary_operations[split_entry_no] \
                            if split_entry_no < len(binary_operations) else ""
                    logbook_custom_headers[entry_no] = (columnType, header)
                else:
                    # always use the two path segments around the final '/' for a more informative header
                    logbook_custom_headers[entry_no] = \
                        (columnType, (entry_content[0])[entry_content[0].rfind('/', 0, entry_content[0].rfind('/') - 1) + 1:])
        else:
            logbook_custom_headers = self.getPropertyValue('CustomHeaders')
            logbook_custom_headers = [(columnType, header) for header in logbook_custom_headers.split(',')]
        return logbook_custom_headers
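
    # Worked example of the header derivation above (hypothetical NeXus paths):
    # the custom entry '/entry0/monitor/monsum:d' keeps the two path segments around
    # the final '/', giving the header ('d', 'monitor/monsum'); an entry with an
    # operator, '/entry0/duration//entry0/monitor/monsum', yields
    # ('s', 'entry0/duration//monitor/monsum'), with '//' marking a division.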

    def _get_entries(self):
        """Gets default and optional metadata entries using the specified instrument IPF."""
        self._metadata_entries = []
        self._metadata_headers = [('d', 'run_number')]
        tmp_instr = self._instrument + '_tmp'
        # Load an empty instrument to access the parameters defining the metadata entries to be searched
        LoadEmptyInstrument(Filename=self._instrument + "_Definition.xml", OutputWorkspace=tmp_instr)
        parameters = mtd[tmp_instr].getInstrument()
        try:
            logbook_default_parameters = (parameters.getStringParameter('logbook_default_parameters')[0]).split(',')
            for parameter in logbook_default_parameters:
                parameter = parameter.split(':')
                if len(parameter) < 3:
                    parameter.append('s')
                # (type, header); strip() removes whitespace around the header
                self._metadata_headers.append((parameter[2], str(parameter[0]).strip()))
                self._metadata_entries.append(parameter[1])
        except IndexError:
            raise RuntimeError("The default logbook entries and headers are not defined for {}.".format(self._instrument))
        default_entries = list(self._metadata_entries)

        if not self.getProperty('OptionalHeaders').isDefault:
            self._get_optional_entries(parameters)

        if not self.getProperty('CustomEntries').isDefault:
            logbook_custom_headers = self._get_custom_entries()
            self._metadata_headers += logbook_custom_headers

        DeleteWorkspace(Workspace=tmp_instr)
        return default_entries
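
    # For reference, a hypothetical IPF <parameter> block that would feed the parsing
    # above (the names and paths are illustrative, not from a real instrument file):
    #   <parameter name="logbook_default_parameters" type="string">
    #     <value val="wavelength:/entry0/wavelength:f, start_time:/entry0/start_time:s"/>
    #   </parameter>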

    def _verify_contains_metadata(self, data_array):
        """Verifies that the raw data indeed contains the desired metadata to be logged."""
        default_entries = self._get_entries()
        data_path = os.path.join(self._data_directory, data_array[0] + '.nxs')
        # check the default entries only in the first file in the directory
        with h5py.File(data_path, 'r') as f:
            for entry in default_entries:
                try:
                    f.get(entry)[0]
                except TypeError:
                    self.log().warning("The requested entry {} is not present in the raw data.".format(entry))

    def _prepare_logbook_ws(self):
        """Prepares the TableWorkspace logbook for filling with entries and sets up the headers."""
        logbook_ws = self.getPropertyValue('OutputWorkspace')
        CreateEmptyTableWorkspace(OutputWorkspace=logbook_ws)
        type_dict = {'s': 'str', 'd': 'int', 'f': 'float'}
        for type, headline in self._metadata_headers:
            mtd[logbook_ws].addColumn(type_dict[type], headline)
        return logbook_ws

    def _perform_binary_operations(self, values, binary_operations, operations):
        """Performs binary arithmetic operations on the list of values, following
        the list of operations to perform."""
        while True:
            operation = [(ind, ind + 1, op) for ind, op in enumerate(binary_operations)
                         if op in operations]
            if not operation:
                break
            ind1, ind2, op = operation[0]
            if op == "+":
                new_val = values[ind1] + values[ind2]
            elif op == "-":
                new_val = values[ind1] - values[ind2]
            elif op == "*":
                new_val = values[ind1] * values[ind2]
            elif op == "//":
                if values[ind2] == 0:
                    self.log().warning("Divisor is equal to 0.")
                    new_val = 'N/A'
                else:
                    new_val = values[ind1] / values[ind2]
            else:
                raise RuntimeError("Unknown operation: {}".format(op))
            values[ind1] = new_val
            values.pop(ind2)
            binary_operations.pop(ind1)
        return values, binary_operations
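
    # Illustrative trace (not from the source): with values = [2, 3, 4] and
    # binary_operations = ['+', '*'], the caller first reduces '*' and '//',
    #   values -> [2, 12], binary_operations -> ['+']
    # and then reduces '+' and '-',
    #   values -> [14], binary_operations -> []
    # so the usual arithmetic precedence is preserved across the two passes.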

    @staticmethod
    def _get_index(entry_name):
        try:
            index = int(entry_name[entry_name.rfind('/') + 1:])
        except ValueError:
            index = 0
            new_name = entry_name
        else:
            new_name = entry_name[:entry_name.rfind('/')]
        return new_name, index
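
    # Sketch of the path/index split performed above (hypothetical paths):
    #   _get_index('entry0/data/5')   -> ('entry0/data', 5)      # trailing integer is an index
    #   _get_index('entry0/duration') -> ('entry0/duration', 0)  # no index given, default to 0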

    @staticmethod
    def _process_regex(entry):
        regex_all = r'(\*)|(//)|(\+)|(\-)'
        p = re.compile(regex_all)
        list_entries = []
        binary_operations = []
        prev_pos = 0
        for obj in p.finditer(entry):
            list_entries.append(entry[prev_pos:obj.span()[0]])
            prev_pos = obj.span()[1]
            binary_operations.append(obj.group())
        list_entries.append(entry[prev_pos:])  # add the last remaining entry
        return list_entries, binary_operations
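
    # Example of the splitting done above (hypothetical entry string):
    #   _process_regex('/entry0/a+/entry0/b*/entry0/c')
    # returns
    #   (['/entry0/a', '/entry0/b', '/entry0/c'], ['+', '*'])
    # i.e. the NeXus paths and, separately, the operators found between them.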

    @staticmethod
    def _perform_cast(data, type):
        if type == 'f':
            try:
                data = float(data)
            except ValueError:
                data = np.nan  # sentinel for a float that could not be parsed
        elif type == 'd':
            try:
                data = int(data)
            except ValueError:
                data = -99999  # sentinel for an int that could not be parsed
        elif type == 's':
            data = str(data)
        return data
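
    # Casting sketch: values that cannot be parsed fall back to sentinels rather
    # than raising, so one malformed file does not abort the whole logbook:
    #   _perform_cast('4.5', 'f')  -> 4.5
    #   _perform_cast('N/A', 'd')  -> -99999
    #   _perform_cast(12, 's')     -> '12'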

    def _fill_logbook(self, logbook_ws, data_array, progress):
        """Fills out the logbook with the requested metadata."""
        n_entries = len(self._metadata_headers)
        entry_not_found_msg = "The requested entry {} is not present in the raw data"
        operators = ["+", "-", "*", "//"]
        cache_entries_ops = {}
        # report progress at roughly each 10% of the data; integer division avoids
        # the float modulo of the original check
        report_every = max(1, len(data_array) // 10)

        for file_no, file_name in enumerate(data_array):
            if file_no % report_every == 0:
                progress.report("Filling logbook table...")
            file_path = os.path.join(self._data_directory, file_name + '.nxs')
            with h5py.File(file_path, 'r') as f:
                rowData = np.empty(n_entries, dtype=object)
                rowData[0] = int(file_name)
                for entry_no, entry in enumerate(self._metadata_entries, 1):
                    if any(op in entry for op in operators):
                        if entry in cache_entries_ops:
                            list_entries, binary_operations = cache_entries_ops[entry]
                            binary_operations = binary_operations.copy()
                        else:
                            list_entries, binary_operations = self._process_regex(entry)
                            cache_entries_ops[entry] = (list_entries, list(binary_operations))
                        # load all entries from the file
                        values = [0] * len(list_entries)
                        for split_entry_no, split_entry in enumerate(list_entries):
                            try:
                                split_entry, index = self._get_index(split_entry)
                                data = f.get(split_entry)[index]
                            except TypeError:
                                values[0] = "Not found"
                                binary_operations = []
                                self.log().warning(entry_not_found_msg.format(entry))
                                break
                            else:
                                if isinstance(data, np.bytes_):
                                    if any(op in operators[1:] for op in binary_operations):
                                        self.log().warning("Only the '+' operation is supported for string entries.")
                                        values[0] = "N/A"
                                        binary_operations = []
                                        break
                                    else:
                                        data = data.decode('utf-8')
                                        data = data.replace(',', ';')  # needed for CSV output
                                values[split_entry_no] = data
                        values, binary_operations = self._perform_binary_operations(values, binary_operations,
                                                                                    operations=['*', '//'])
                        values, _ = self._perform_binary_operations(values, binary_operations,
                                                                    operations=['+', '-'])
                        # the reduced result is in values[0]; join array results into a string
                        if isinstance(values[0], np.ndarray):
                            tmp_data = ""
                            for value in values[0]:
                                tmp_data += str(value) + ','
                            rowData[entry_no] = tmp_data[:-1]
                        else:
                            data = self._perform_cast(values[0], self._metadata_headers[entry_no][0])
                            rowData[entry_no] = data
                    else:
                        try:
                            entry, index = self._get_index(entry)
                            data = f.get(entry)[index]
                        except TypeError:
                            data = "Not found"
                            self.log().warning(entry_not_found_msg.format(entry))
                        if isinstance(data, np.ndarray):
                            tmp_data = ""
                            for array in data:
                                tmp_data += ",".join(array)
                            data = tmp_data
                        elif isinstance(data, np.bytes_):
                            data = data.decode('utf-8')
                            data = str(data.replace(',', ';')).strip()  # needed for CSV output
                        data = self._perform_cast(data, self._metadata_headers[entry_no][0])
                        rowData[entry_no] = data
                mtd[logbook_ws].addRow(rowData)

    def _store_logbook_as_csv(self, logbook_ws):
        """Calls the algorithm that will store the logbook TableWorkspace in the specified location."""
        SaveAscii(InputWorkspace=logbook_ws, Filename=self.getPropertyValue('OutputFile'),
                  Separator='CSV')

    def PyExec(self):
        self._data_directory = self.getPropertyValue('Directory')
        self._facility = self.getPropertyValue('Facility')
        self._instrument = self.getPropertyValue('Instrument')
        if not self.getProperty('NumorRange').isDefault:
            self._numor_range = self.getProperty('NumorRange').value
        progress = Progress(self, start=0.0, end=1.0, nreports=15)
        progress.report("Preparing file list")
        data_array = self._prepare_file_array()
        progress.report("Verifying conformity")
        self._verify_contains_metadata(data_array)
        progress.report("Preparing logbook table")
        logbook_ws = self._prepare_logbook_ws()
        self._fill_logbook(logbook_ws, data_array, progress)
        if not self.getProperty('OutputFile').isDefault:
            progress.report("Saving logbook as CSV")
            self._store_logbook_as_csv(logbook_ws)
        progress.report("Done")
        self.setProperty('OutputWorkspace', mtd[logbook_ws])

AlgorithmFactory.subscribe(GenerateLogbook)
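
# A minimal usage sketch, assuming a running Mantid session with access to ILL
# data; the directory, instrument, and numors below are hypothetical:
#   GenerateLogbook(Directory='/data/D11/cycle_213',
#                   OutputWorkspace='logbook',
#                   Facility='ILL',
#                   Instrument='D11',
#                   NumorRange=[100, 101, 102],
#                   OptionalHeaders='all',
#                   OutputFile='/tmp/d11_logbook.csv')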