/
SANSStitch.py
444 lines (357 loc) · 19 KB
/
SANSStitch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# pylint: disable=no-init,invalid-name,too-many-arguments,too-few-public-methods
from mantid.simpleapi import *
from mantid.api import DataProcessorAlgorithm, MatrixWorkspaceProperty, PropertyMode, AnalysisDataService
from mantid.kernel import Direction, Property, StringListValidator, UnitFactory, \
EnabledWhenProperty, PropertyCriterion
import numpy as np
class Mode(object):
class ShiftOnly(object):
pass
class ScaleOnly(object):
pass
class BothFit(object):
pass
class NoneFit(object):
pass
class SANSStitch(DataProcessorAlgorithm):
def _make_mode_map(self):
return {'ShiftOnly': Mode.ShiftOnly, 'ScaleOnly': Mode.ScaleOnly,
'Both': Mode.BothFit, 'None': Mode.NoneFit}
def category(self):
return 'SANS'
def summary(self):
return 'Stitch the high angle and low angle banks of a workspace together'
def PyInit(self):
self.declareProperty(
MatrixWorkspaceProperty('HABCountsSample', '', optional=PropertyMode.Mandatory, direction=Direction.Input),
doc='High angle bank sample workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('HABNormSample', '', optional=PropertyMode.Mandatory, direction=Direction.Input),
doc='High angle bank normalization workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('LABCountsSample', '', optional=PropertyMode.Mandatory, direction=Direction.Input),
doc='Low angle bank sample workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('LABNormSample', '', optional=PropertyMode.Mandatory, direction=Direction.Input),
doc='Low angle bank normalization workspace in Q')
self.declareProperty('ProcessCan', defaultValue=False, direction=Direction.Input, doc='Process the can')
self.declareProperty(
MatrixWorkspaceProperty('HABCountsCan', '', optional=PropertyMode.Optional, direction=Direction.Input),
doc='High angle bank sample workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('HABNormCan', '', optional=PropertyMode.Optional, direction=Direction.Input),
doc='High angle bank normalization workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('LABCountsCan', '', optional=PropertyMode.Optional, direction=Direction.Input),
doc='Low angle bank sample workspace in Q')
self.declareProperty(
MatrixWorkspaceProperty('LABNormCan', '', optional=PropertyMode.Optional, direction=Direction.Input),
doc='Low angle bank normalization workspace in Q')
allowedModes = StringListValidator(self._make_mode_map().keys())
self.declareProperty('Mode', 'None', validator=allowedModes, direction=Direction.Input,
doc='What to fit. Free parameter(s).')
self.declareProperty('ScaleFactor', defaultValue=Property.EMPTY_DBL, direction=Direction.Input,
doc='Optional scaling factor')
self.declareProperty('ShiftFactor', defaultValue=Property.EMPTY_DBL, direction=Direction.Input,
doc='Optional shift factor')
self.declareProperty(MatrixWorkspaceProperty('OutputWorkspace', '', direction=Direction.Output),
doc='Stitched high and low Q 1-D data')
self.declareProperty('OutScaleFactor', defaultValue=Property.EMPTY_DBL, direction=Direction.Output,
doc='Applied scale factor')
self.declareProperty('OutShiftFactor', defaultValue=Property.EMPTY_DBL, direction=Direction.Output,
doc='Applied shift factor')
self.setPropertyGroup("Mode", 'Fitting')
self.setPropertyGroup("ScaleFactor", 'Fitting')
self.setPropertyGroup("ShiftFactor", 'Fitting')
self.setPropertyGroup("HABCountsSample", 'Sample')
self.setPropertyGroup("HABNormSample", 'Sample')
self.setPropertyGroup("LABCountsSample", 'Sample')
self.setPropertyGroup("LABNormSample", 'Sample')
self.setPropertyGroup("OutputWorkspace", 'Output')
self.setPropertyGroup("OutScaleFactor", 'Output')
self.setPropertyGroup("OutShiftFactor", 'Output')
can_settings = EnabledWhenProperty('ProcessCan', PropertyCriterion.IsNotDefault)
self.setPropertyGroup("HABCountsCan", 'Can')
self.setPropertyGroup("HABNormCan", 'Can')
self.setPropertyGroup("LABCountsCan", 'Can')
self.setPropertyGroup("LABNormCan", 'Can')
self.setPropertySettings("HABCountsCan", can_settings)
self.setPropertySettings("HABNormCan", can_settings)
self.setPropertySettings("LABCountsCan", can_settings)
self.setPropertySettings("LABNormCan", can_settings)
def _divide(self, left, right):
divide = self.createChildAlgorithm('Divide')
divide.setProperty('LHSWorkspace', left)
divide.setProperty('RHSWorkspace', right)
divide.execute()
return divide.getProperty('OutputWorkspace').value
def _multiply(self, left, right):
multiply = self.createChildAlgorithm('Multiply')
multiply.setProperty('LHSWorkspace', left)
multiply.setProperty('RHSWorkspace', right)
multiply.execute()
return multiply.getProperty('OutputWorkspace').value
def _add(self, left, right):
plus = self.createChildAlgorithm('Plus')
plus.setProperty('LHSWorkspace', left)
plus.setProperty('RHSWorkspace', right)
plus.execute()
return plus.getProperty('OutputWorkspace').value
def _subract(self, left, right):
minus = self.createChildAlgorithm('Minus')
minus.setProperty('LHSWorkspace', left)
minus.setProperty('RHSWorkspace', right)
minus.execute()
return minus.getProperty('OutputWorkspace').value
def _crop_to_x_range(self, ws, x_min, x_max):
crop = self.createChildAlgorithm("CropWorkspace")
crop.setProperty("InputWorkspace", ws)
crop.setProperty("XMin", x_min)
crop.setProperty("XMax", x_max)
crop.execute()
return crop.getProperty("OutputWorkspace").value
def _scale(self, ws, factor, operation='Multiply'):
scale = self.createChildAlgorithm('Scale')
scale.setProperty('InputWorkspace', ws)
scale.setProperty('Operation', operation)
scale.setProperty('Factor', factor)
scale.execute()
scaled = scale.getProperty('OutputWorkspace').value
return scaled
def _calculate_merged_q(self, cF, nF, cR, nR, shift_factor, scale_factor):
# We want: (Cf+shift*Nf+Cr)/(Nf/scale + Nr)
shifted_norm_front = self._scale(nF, shift_factor)
scaled_norm_front = self._scale(nF, 1.0 / scale_factor)
numerator = self._add(self._add(cF, shifted_norm_front), cR)
denominator = self._add(scaled_norm_front, nR)
merged_q = self._divide(numerator, denominator)
return merged_q
def _calculate_merged_q_can(self, cF, nF, cR, nR, scale_factor):
# We want: (Cf_can+Cr_can)/(Nf_can/scale + Nr_can)
scaled_norm_front = self._scale(nF, 1.0 / scale_factor)
numerator = self._add(cF, cR)
denominator = self._add(scaled_norm_front, nR)
merged_q = self._divide(numerator, denominator)
return merged_q
def _crop_out_special_values(self, ws):
if ws.getNumberHistograms() != 1:
# Strip zeros is only possible on 1D workspaces
return
y_vals = ws.readY(0)
length = len(y_vals)
# Find the first non-zero value
start = 0
for i in range(0, length):
if not np.isnan(y_vals[i]) and not np.isinf(y_vals[i]):
start = i
break
# Now find the last non-zero value
stop = 0
length -= 1
for j in range(length, 0, -1):
if not np.isnan(y_vals[j]) and not np.isinf(y_vals[j]):
stop = j
break
# Find the appropriate X values and call CropWorkspace
x_vals = ws.readX(0)
start_x = x_vals[start]
# Make sure we're inside the bin that we want to crop
end_x = x_vals[stop + 1]
return self._crop_to_x_range(ws=ws,x_min=start_x, x_max=end_x)
def _run_fit(self, q_high_angle, q_low_angle, scale_factor, shift_factor):
fit_alg = self.createChildAlgorithm("SANSFitShiftScale")
fit_alg.setProperty("HABWorkspace", q_high_angle)
fit_alg.setProperty("LABWorkspace", q_low_angle)
fit_alg.setProperty("Mode", self.getProperty('Mode').value)
fit_alg.setProperty("ScaleFactor", scale_factor)
fit_alg.setProperty("ShiftFactor", shift_factor)
fit_alg.execute()
scale_factor_fit = fit_alg.getProperty("OutScaleFactor").value
shift_factor_fit = fit_alg.getProperty("OutShiftFactor").value
return scale_factor_fit, shift_factor_fit
def PyExec(self):
enum_map = self._make_mode_map()
mode = enum_map[self.getProperty('Mode').value]
cF = self.getProperty('HABCountsSample').value
cR = self.getProperty('LABCountsSample').value
nF = self.getProperty('HABNormSample').value
nR = self.getProperty('LABNormSample').value
q_high_angle = self._divide(cF, nF)
q_low_angle = self._divide(cR, nR)
if self.getProperty('ProcessCan').value:
cF_can = self.getProperty('HABCountsCan').value
cR_can = self.getProperty('LABCountsCan').value
nF_can = self.getProperty('HABNormCan').value
nR_can = self.getProperty('LABNormCan').value
q_high_angle_can = self._divide(cF_can, nF_can)
q_low_angle_can = self._divide(cR_can, nR_can)
# Now we can do the can subraction.
q_high_angle = self._subract(q_high_angle, q_high_angle_can)
q_low_angle = self._subract(q_low_angle, q_low_angle_can)
q_high_angle = self._crop_out_special_values(q_high_angle)
q_low_angle = self._crop_out_special_values(q_low_angle)
shift_factor = self.getProperty('ShiftFactor').value
scale_factor = self.getProperty('ScaleFactor').value
if not mode == Mode.NoneFit:
scale_factor, shift_factor = self._run_fit(q_high_angle, q_low_angle,
scale_factor, shift_factor)
min_q = min(min(q_high_angle.dataX(0)), min(q_low_angle.dataX(0)))
max_q = max(max(q_high_angle.dataX(0)), max(q_low_angle.dataX(0)))
# Crop our input workspaces
cF = self._crop_to_x_range(cF, min_q, max_q)
cR = self._crop_to_x_range(cR, min_q, max_q)
nF = self._crop_to_x_range(nF, min_q, max_q)
nR = self._crop_to_x_range(nR, min_q, max_q)
# We want: (Cf+shift*Nf+Cr)/(Nf/scale + Nr)
merged_q = self._calculate_merged_q(cF=cF, nF=nF, cR=cR, nR=nR, scale_factor=scale_factor,
shift_factor=shift_factor)
if self.getProperty('ProcessCan').value:
cF_can = self._crop_to_x_range(cF_can, min_q, max_q)
cR_can = self._crop_to_x_range(cR_can, min_q, max_q)
nF_can = self._crop_to_x_range(nF_can, min_q, max_q)
nR_can = self._crop_to_x_range(nR_can, min_q, max_q)
# Calculate merged q for the can
merged_q_can = self._calculate_merged_q_can(cF=cF_can, nF=nF_can, cR=cR_can, nR=nR_can,
scale_factor=scale_factor)
# Subtract it from the sample
merged_q = self._subract(merged_q, merged_q_can)
q_error_correction = QErrorCorrectionForMergedWorkspaces()
q_error_correction.correct_q_resolution_for_merged(count_ws_front=cF,
count_ws_rear=cR,
output_ws=merged_q,
scale=scale_factor)
self.setProperty('OutputWorkspace', merged_q)
self.setProperty('OutScaleFactor', scale_factor)
self.setProperty('OutShiftFactor', shift_factor)
def _validateIs1DFromPropertyName(self, property_name):
ws = self.getProperty(property_name).value
if not ws:
return True # Mandatory validators to take care of this. Early exit.
return ws.getNumberHistograms() == 1
def _validateIsInQ(self, property_name):
ws = self.getProperty(property_name).value
if not ws:
return True # Mandatory validators to take care of this. Early exit.
targetUnit = UnitFactory.create('MomentumTransfer')
return targetUnit.caption() == ws.getAxis(0).getUnit().caption()
def validateInputs(self):
errors = dict()
# Mode compatibility checks
scale_factor_property = self.getProperty('ScaleFactor')
shift_factor_property = self.getProperty('ShiftFactor')
mode_property = self.getProperty('Mode')
enum_map = self._make_mode_map()
mode = enum_map[mode_property.value]
if mode == Mode.NoneFit:
if scale_factor_property.isDefault:
errors[scale_factor_property.name] = 'ScaleFactor required'
if shift_factor_property.isDefault:
errors[shift_factor_property.name] = 'ShiftFactor required'
elif mode == Mode.ScaleOnly:
if shift_factor_property.isDefault:
errors[shift_factor_property.name] = 'ShiftFactor required'
elif mode == Mode.ShiftOnly:
if scale_factor_property.isDefault:
errors[scale_factor_property.name] = 'ScaleFactor required'
workspace_property_names = ['HABCountsSample', 'LABCountsSample', 'HABNormSample', 'LABNormSample']
# 1d data check
self._validate_1D(workspace_property_names, errors, mode)
# Units check
self._validate_units(workspace_property_names, errors)
process_can = self.getProperty('ProcessCan')
if bool(process_can.value):
workspace_property_names = ['HABCountsCan', 'HABNormCan', 'LABCountsCan', 'LABNormCan']
# Check existance
self._validate_provided(workspace_property_names, errors)
# Check Q units
self._validate_units(workspace_property_names, errors)
# Check 1D
self._validate_1D(workspace_property_names, errors, mode)
return errors
def _validate_units(self, workspace_property_names, errors):
for property_name in workspace_property_names:
if not self._validateIsInQ(property_name):
errors[property_name] = 'Workspace must have units of momentum transfer'
def _validate_1D(self, workspace_property_names, errors, mode):
if mode != Mode.NoneFit:
for property_name in workspace_property_names:
if not self._validateIs1DFromPropertyName(property_name):
errors[property_name] = 'Wrong number of spectra. Must be 1D input'
def _validate_provided(self, workspace_property_names, errors):
for property_name in workspace_property_names:
if not self.getProperty(property_name).value:
errors[property_name] = 'This workspace is required in order to process the can'
class QErrorCorrectionForMergedWorkspaces(object):
def __init__(self):
super(QErrorCorrectionForMergedWorkspaces, self).__init__()
def _divide_q_resolution_by_counts(self, q_res, counts):
# We are dividing DX by Y. Note that len(DX) = len(Y) + 1
# Unfortunately, we need some knowlege about the Q1D algorithm here.
# The two last entries of DX are duplicate in Q1D and this is how we
# treat it here.
q_res_buffer = np.divide(q_res[0:-1], counts)
q_res_buffer = np.append(q_res_buffer, q_res_buffer[-1])
return q_res_buffer
def _multiply_q_resolution_by_counts(self, q_res, counts):
# We are dividing DX by Y. Note that len(DX) = len(Y) + 1
# Unfortunately, we need some knowlege about the Q1D algorithm here.
# The two last entries of DX are duplicate in Q1D and this is how we
# treat it here.
q_res_buffer = np.multiply(q_res[0:-1], counts)
q_res_buffer = np.append(q_res_buffer, q_res_buffer[-1])
return q_res_buffer
def correct_q_resolution_for_merged(self, count_ws_front, count_ws_rear,
output_ws, scale):
"""
We need to transfer the DX error values from the original workspaces to the merged worksapce.
We have:
C(Q) = Sum_all_lambda_for_particular_Q(Counts(lambda))
weightedQRes(Q) = Sum_all_lambda_for_particular_Q(Counts(lambda)* qRes(lambda))
ResQ(Q) = weightedQRes(Q)/C(Q)
Richard suggested:
ResQMerged(Q) = (weightedQRes_FRONT(Q)*scale + weightedQRes_REAR(Q))/
(C_FRONT(Q)*scale + C_REAR(Q))
Note that we drop the shift here.
The Q Resolution functionality only exists currently
for 1D, ie when only one spectrum is present.
@param count_ws_front: the front counts
@param count_ws_rear: the rear counts
@param output_ws: the output workspace
"""
self._comment(output_ws, 'Internal Step: q-resolution transferred from input workspaces')
if count_ws_rear.getNumberHistograms() != 1:
return
if count_ws_front.getNumberHistograms() != 1:
return
# We require both count workspaces to contain the DX value
if not count_ws_rear.hasDx(0) or not count_ws_front.hasDx(0):
return
q_resolution_front = count_ws_front.readDx(0)
q_resolution_rear = count_ws_rear.readDx(0)
counts_front = count_ws_front.readY(0)
counts_rear = count_ws_rear.readY(0)
# We need to make sure that the workspaces match in length
if ((len(q_resolution_front) != len(q_resolution_rear)) or
(len(counts_front) != len(counts_rear))):
return
# Get everything for the FRONT detector
q_res_front_norm_free = self._multiply_q_resolution_by_counts(q_resolution_front,counts_front)
q_res_front_norm_free = q_res_front_norm_free * scale
counts_front = counts_front * scale
# Get everything for the REAR detector
q_res_rear_norm_free = self._multiply_q_resolution_by_counts(q_resolution_rear,counts_rear)
# Now add and divide
new_q_res = np.add(q_res_front_norm_free, q_res_rear_norm_free)
new_counts = np.add(counts_front, counts_rear)
q_resolution = self._divide_q_resolution_by_counts(new_q_res, new_counts)
# Set the dx error
output_ws.setDx(0, q_resolution)
def _comment(self, ws, message):
comment = AlgorithmManager.createUnmanaged('Comment')
comment.initialize()
comment.setChild(True)
comment.setProperty('Workspace', ws)
comment.setProperty('Text', message)
comment.execute()
# Register algorithm with Mantid
AlgorithmFactory.subscribe(SANSStitch)