forked from AmbaPant/mantid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
MatchPeaks.py
294 lines (237 loc) · 13.2 KB
/
MatchPeaks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
# pylint: disable=too-many-branches
from mantid.api import PythonAlgorithm, MatrixWorkspaceProperty, ITableWorkspaceProperty, PropertyMode, MatrixWorkspace
from mantid.simpleapi import *
from mantid.kernel import Direction
import numpy as np
def mask_ws(ws_to_mask, xstart, xend):
"""
Calls MaskBins twice, for masking the first and last bins of a workspace
Args:
red: reduced workspace
xstart: MaskBins between x[0] and x[xstart]
xend: MaskBins between x[xend] and x[-1]
"""
x_values = ws_to_mask.readX(0)
if xstart > 0:
logger.debug('Mask bins smaller than {0}'.format(xstart))
MaskBins(InputWorkspace=ws_to_mask, OutputWorkspace=ws_to_mask, XMin=x_values[0], XMax=x_values[int(xstart)])
else:
logger.debug('No masking due to x bin <= 0!: {0}'.format(xstart))
if xend < len(x_values) - 1:
logger.debug('Mask bins larger than {0}'.format(xend))
MaskBins(InputWorkspace=ws_to_mask, OutputWorkspace=ws_to_mask, XMin=x_values[int(xend) + 1], XMax=x_values[-1])
else:
logger.debug('No masking due to x bin >= len(x_values) - 1!: {0}'.format(xend))
if xstart > 0 and xend < len(x_values) - 1:
logger.information('Bins out of range {0} {1} [Unit of X-axis] are masked'.format
(x_values[int(xstart)],x_values[int(xend) + 1]))
class MatchPeaks(PythonAlgorithm):
# Mandatory workspaces
_input_ws = None
_output_ws = None
# Optional workspace
_input_2_ws = None
_input_3_ws = None
_output_bin_range = None
# Bool flags
_masking = False
_match_option = False
def category(self):
return "Transforms"
def summary(self):
return 'Circular shifts (numpy.roll) the data of the input workspace to align peaks without modifying the x-axis.'
def PyInit(self):
self.declareProperty(MatrixWorkspaceProperty('InputWorkspace',
defaultValue='',
direction=Direction.Input),
doc='Input workspace')
self.declareProperty(MatrixWorkspaceProperty('InputWorkspace2',
defaultValue='',
direction=Direction.Input,
optional=PropertyMode.Optional),
doc='Input workspace to align peaks with')
self.declareProperty(MatrixWorkspaceProperty('InputWorkspace3',
defaultValue='',
direction=Direction.Input,
optional=PropertyMode.Optional),
doc='Input workspace to align peaks with')
self.declareProperty('MaskBins',
defaultValue=False,
doc='Whether to mask shifted bins')
self.declareProperty('MatchInput2ToCenter',
defaultValue=False,
doc='Match peaks such that InputWorkspace2 would be centered')
self.declareProperty(MatrixWorkspaceProperty('OutputWorkspace',
defaultValue='',
direction=Direction.Output),
doc='Shifted output workspace')
self.declareProperty(ITableWorkspaceProperty('BinRangeTable',
defaultValue='',
direction=Direction.Output,
optional=PropertyMode.Optional),
doc='Table workspace that contains two values defining a range. '
'Bins outside this range are overflown out of range due to circular shift'
'and can be masked.')
def setUp(self):
self._input_ws = self.getPropertyValue('InputWorkspace')
self._input_2_ws = self.getPropertyValue('InputWorkspace2')
self._input_3_ws = self.getPropertyValue('InputWorkspace3')
self._output_ws = self.getPropertyValue('OutputWorkspace')
self._output_bin_range = self.getPropertyValue('BinRangeTable')
self._masking = self.getProperty('MaskBins').value
self._match_option = self.getProperty('MatchInput2ToCenter').value
if self._input_ws:
ReplaceSpecialValues(InputWorkspace = self._input_ws, OutputWorkspace = self._input_ws,
NaNValue = 0, InfinityValue = 0)
if self._input_2_ws:
ReplaceSpecialValues(InputWorkspace = self._input_2_ws, OutputWorkspace = self._input_2_ws,
NaNValue = 0, InfinityValue = 0)
if self._input_3_ws:
ReplaceSpecialValues(InputWorkspace = self._input_3_ws, OutputWorkspace = self._input_3_ws,
NaNValue = 0, InfinityValue = 0)
def validateInputs(self):
issues = dict()
input1 = self.getPropertyValue('InputWorkspace')
input2 = self.getPropertyValue('InputWorkspace2')
input3 = self.getPropertyValue('InputWorkspace3')
if input3:
if not input2:
issues['InputWorkspace2'] = 'When InputWorkspace3 is given, InputWorkspace2 is also required.'
else:
if mtd[input3].isDistribution() and not mtd[input2].isDistribution():
issues['InputWorkspace3'] = 'InputWorkspace2 and InputWorkspace3 must be either point data or ' \
'histogram data'
elif mtd[input3].blocksize() != mtd[input2].blocksize():
issues['InputWorkspace3'] = 'Incompatible number of bins'
elif mtd[input3].getNumberHistograms() != mtd[input2].getNumberHistograms():
issues['InputWorkspace3'] = 'Incompatible number of spectra'
elif np.any(mtd[input3].extractX() - mtd[input2].extractX()):
issues['InputWorkspace3'] = 'Incompatible x-values'
if input2:
if mtd[input1].isDistribution() and not mtd[input2].isDistribution():
issues['InputWorkspace2'] = 'InputWorkspace2 and InputWorkspace3 must be either point data or ' \
'histogram data'
elif mtd[input1].blocksize() != mtd[input2].blocksize():
issues['InputWorkspace2'] = 'Incompatible number of bins'
elif mtd[input1].getNumberHistograms() != mtd[input2].getNumberHistograms():
issues['InputWorkspace2'] = 'Incompatible number of spectra'
elif np.any(mtd[input1].extractX() - mtd[input2].extractX()):
issues['InputWorkspace2'] = 'Incompatible x-values'
return issues
def PyExec(self):
self.setUp()
output_ws = CloneWorkspace(InputWorkspace=mtd[self._input_ws], OutputWorkspace=self._output_ws)
size = mtd[self._input_ws].blocksize()
mid_bin = int(size / 2)
# Find peak positions in input workspace
peak_bins1 = self._get_peak_position(output_ws)
self.log().information('Peak bins {0}: {1}'.format(self._input_ws, peak_bins1))
# Find peak positions in second input workspace
if self._input_2_ws:
peak_bins2 = self._get_peak_position(mtd[self._input_2_ws])
self.log().information('Peak bins {0}: {1}'.format(self._input_2_ws, peak_bins2))
# Find peak positions in third input workspace
if self._input_3_ws:
peak_bins3 = self._get_peak_position(mtd[self._input_3_ws])
self.log().information('Peak bins {0}: {1}'.format(self._input_3_ws, peak_bins3))
# All bins must be positive and larger than zero
if not self._input_2_ws:
# Input workspace will match center
to_shift = peak_bins1 - mid_bin * np.ones(mtd[self._input_ws].getNumberHistograms())
else:
if not self._input_3_ws:
if self._match_option:
# Input workspace will be shifted according to centered peak of second input workspace
to_shift = peak_bins2 - mid_bin * np.ones(mtd[self._input_ws].getNumberHistograms())
else:
# Input workspace will be shifted according to peak position of second input workspace
to_shift = peak_bins1 - peak_bins2
else:
# Input workspace will be shifted according to the difference of peak positions between second and third
to_shift = peak_bins3 - peak_bins2
# perform actual shift
for i in range(output_ws.getNumberHistograms()):
# Shift Y and E values of spectrum i
output_ws.setY(i, np.roll(output_ws.readY(i), int(-to_shift[i])))
output_ws.setE(i, np.roll(output_ws.readE(i), int(-to_shift[i])))
self.log().debug('Shift array: {0}'.format(-to_shift))
# Final treatment of bins (masking, produce output)
min_bin = 0
max_bin = size
# This is a left shift, bins at the end of the workspace may be masked
if np.min(-to_shift) < 0:
max_bin = size - 1 + np.min(-to_shift)
# This is a right shift, bins at the beginning of the workspace may be masked
if np.max(-to_shift) > 0:
min_bin = np.max(-to_shift)
if self._masking:
mask_ws(output_ws, min_bin, max_bin)
if self._output_bin_range:
# Create table with its columns containing bin range
bin_range = CreateEmptyTableWorkspace(OutputWorkspace=self._output_bin_range)
bin_range.addColumn(type="double", name='MinBin')
bin_range.addColumn(type="double", name='MaxBin')
bin_range.addRow({'MinBin': min_bin, 'MaxBin': max_bin})
self.setProperty('BinRangeTable', bin_range)
# Set output properties
self.setProperty('OutputWorkspace', output_ws)
return
@staticmethod
def _get_peak_position(input_ws):
"""
Gives bin of the peak of each spectrum in the input_ws
@param input_ws :: input workspace
@return :: bin numbers of the peak positions
"""
fit_table_name = input_ws.name() + '_epp'
if isinstance(input_ws, MatrixWorkspace):
fit_table = FindEPP(InputWorkspace=input_ws, OutputWorkspace=fit_table_name)
elif isinstance(input_ws, ITableWorkspace):
fit_table = input_ws
else:
logger.error('Workspace not defined')
# Mid bin number
mid_bin = int(input_ws.blocksize() / 2)
# Initialisation
peak_bin = np.ones(input_ws.getNumberHistograms()) * mid_bin
# Bin range: difference between mid bin and peak bin should be in this range
tolerance = int(mid_bin / 2)
x_values = input_ws.readX(0)
for i in range(input_ws.getNumberHistograms()):
fit = fit_table.row(i)
# Bin number, where Y has its maximum
y_values = input_ws.readY(i)
max_pos = np.argmax(y_values)
peak_plus_error = abs(fit["PeakCentreError"]) + abs(fit["PeakCentre"])
if peak_plus_error > x_values[0] and peak_plus_error < x_values[-1]:
peak_plus_error_bin = input_ws.yIndexOfX(peak_plus_error)
if abs(peak_plus_error_bin - mid_bin) < tolerance and fit["FitStatus"] == 'success':
# fit succeeded, and fitted peak is within acceptance range, take it
peak_bin[i] = input_ws.yIndexOfX(fit["PeakCentre"])
logger.debug('Fit successfull, peak inside tolerance')
elif abs(max_pos - mid_bin) < tolerance:
# fit not reliable, take the maximum if within acceptance
peak_bin[i] = max_pos
logger.debug('Fit outside the trusted range, take the maximum position')
else:
# do not shift
logger.debug('Both the fit and the max are outside the trusted range. '
'Do not shift the spectrum.')
elif abs(max_pos - mid_bin) < tolerance:
# fit not reliable, take the maximum if within acceptance
peak_bin[i] = max_pos
logger.debug('Fit outside the x-range, take the maximum position')
else:
# do not shift
logger.debug('Both the fit and the max are outside the trusted range. '
'Do not shift the spectrum.')
logger.debug('Spectrum {0} will be shifted to bin {1}'.format(i,peak_bin[i]))
DeleteWorkspace(fit_table)
return peak_bin
AlgorithmFactory.subscribe(MatchPeaks)