-
Notifications
You must be signed in to change notification settings - Fork 122
/
IndirectReplaceFitResult.py
219 lines (160 loc) · 10 KB
/
IndirectReplaceFitResult.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2019 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
# pylint: disable=no-init,too-many-instance-attributes
from mantid.api import (AlgorithmFactory, AnalysisDataService, MatrixWorkspace, MatrixWorkspaceProperty,
PythonAlgorithm, PropertyMode, WorkspaceGroup)
from mantid.kernel import Direction
def string_ends_with(string, delimiter):
    """Report whether ``string`` finishes with ``delimiter`` and is longer than it.

    :param string: The text to inspect.
    :param delimiter: The suffix to look for.
    :return: True only when ``string`` is strictly longer than ``delimiter`` and its
             trailing ``len(delimiter)`` characters equal ``delimiter``. An empty
             ``delimiter`` never matches, because ``string[-0:]`` is the whole string.
    """
    suffix_length = len(delimiter)
    # A string that is no longer than the suffix is never accepted.
    if len(string) <= suffix_length:
        return False
    return string[-suffix_length:] == delimiter
def exists_in_ads(workspace_name):
    """Check whether the AnalysisDataService has an entry called ``workspace_name``.

    :param workspace_name: The name to look up.
    :return: Whatever ``AnalysisDataService.doesExist`` reports for that name.
    """
    is_present = AnalysisDataService.doesExist(workspace_name)
    return is_present
def get_ads_workspace(workspace_name):
    """Fetch a workspace from the AnalysisDataService by name.

    :param workspace_name: The name of the workspace to retrieve.
    :return: The retrieved workspace, or None when ``exists_in_ads`` reports that the
             name is not present.
    """
    if not exists_in_ads(workspace_name):
        return None
    return AnalysisDataService.retrieve(workspace_name)
def contains_workspace(group, workspace):
    """Tell whether ``group`` is a WorkspaceGroup that contains ``workspace``.

    :param group: The object to inspect; anything that is not a WorkspaceGroup yields False.
    :param workspace: The value handed to ``group.contains``.
    :return: The result of ``group.contains(workspace)``, or False for a non-group.
    """
    if not isinstance(group, WorkspaceGroup):
        return False
    return group.contains(workspace)
def filter_by_contents(workspace_names, workspace):
    """Keep the names whose workspace in the ADS is a group containing ``workspace``.

    :param workspace_names: Names to look up in the AnalysisDataService.
    :param workspace: The value each retrieved group is asked whether it contains.
    :return: A list of the qualifying names, in their original order.
    """
    matching_names = []
    for candidate in workspace_names:
        if contains_workspace(get_ads_workspace(candidate), workspace):
            matching_names.append(candidate)
    return matching_names
def filter_by_name_end(workspace_names, delimiter):
    """Keep the names for which ``string_ends_with(name, delimiter)`` is true.

    :param workspace_names: The names to filter.
    :param delimiter: The required name ending.
    :return: A list of the qualifying names, in their original order.
    """
    return list(filter(lambda name: string_ends_with(name, delimiter), workspace_names))
def find_result_group_containing(workspace_name, group_extension):
    """Find the name of a group in the ADS that contains ``workspace_name``.

    Only ADS entries whose name ends with ``group_extension`` are considered.

    :param workspace_name: The workspace that the group must contain.
    :param group_extension: The required ending of the group's name.
    :return: The first qualifying group name, or None when there is none.
    """
    all_names = AnalysisDataService.Instance().getObjectNames()
    candidate_groups = filter_by_name_end(all_names, group_extension)
    containing_groups = filter_by_contents(candidate_groups, workspace_name)
    if not containing_groups:
        return None
    return containing_groups[0]
def is_equal(a, b, tolerance):
    """Compare two numbers using a tolerance relative to the larger magnitude.

    :param a: First value.
    :param b: Second value.
    :param tolerance: Allowed difference as a fraction of ``max(abs(a), abs(b))``.
    :return: True when ``abs(a - b)`` does not exceed the scaled tolerance.
    """
    difference = abs(a - b)
    largest_magnitude = max(abs(a), abs(b))
    return difference <= tolerance * largest_magnitude
def get_bin_index_of_value(workspace, value):
    """Locate the first position on axis 0 of ``workspace`` whose value matches ``value``.

    Values are compared with ``is_equal`` using a relative tolerance of 0.000001.

    :param workspace: Object providing ``getAxis(0)`` with ``length()`` and ``getValue(i)``.
    :param value: The axis value to search for.
    :return: The index of the first matching axis value.
    :raises ValueError: If no axis value matches.
    """
    axis = workspace.getAxis(0)
    # Lazy generator: evaluation stops at the first match, like an early return in a loop.
    matching_indices = (index for index in range(axis.length())
                        if is_equal(axis.getValue(index), value, 0.000001))
    first_match = next(matching_indices, None)
    if first_match is None:
        raise ValueError('The corresponding bin in the input workspace could not be found.')
    return first_match
def get_x_insertion_index(input_workspace, single_fit_workspace):
    """Find where the single fit's first axis-0 label sits on the input workspace's axis 0.

    :param input_workspace: The workspace whose axis 0 is searched.
    :param single_fit_workspace: The workspace whose first axis-0 label gives the value.
    :return: The index returned by ``get_bin_index_of_value`` for that value.
    :raises ValueError: Propagated from ``get_bin_index_of_value`` when nothing matches,
                        or from ``float`` when the label is not numeric text.
    """
    first_label = single_fit_workspace.getAxis(0).label(0)
    return get_bin_index_of_value(input_workspace, float(first_label))
def get_indices_of_equivalent_labels(input_workspace, destination_workspace):
    """List the axis-1 positions in ``destination_workspace`` whose label is also in the input.

    :param input_workspace: Workspace supplying the set of labels to look for.
    :param destination_workspace: Workspace whose axis-1 labels are scanned.
    :return: Indices into the destination's axis-1 values, in ascending order.
    """
    wanted_labels = input_workspace.getAxis(1).extractValues()
    shared_indices = []
    for position, candidate in enumerate(destination_workspace.getAxis(1).extractValues()):
        if candidate in wanted_labels:
            shared_indices.append(position)
    return shared_indices
def fit_parameter_missing(single_fit_workspace, destination_workspace, exclude_parameters):
    """Check whether the single fit has a parameter that the destination workspace lacks.

    :param single_fit_workspace: Workspace whose axis-1 values are the parameters to check.
    :param destination_workspace: Workspace whose axis-1 values are the available parameters.
    :param exclude_parameters: Parameter names that are allowed to be absent.
    :return: True if any single fit parameter is in neither the destination's parameters
             nor ``exclude_parameters``; False otherwise (including when there are none).
    """
    single_parameters = single_fit_workspace.getAxis(1).extractValues()
    destination_parameters = destination_workspace.getAxis(1).extractValues()
    # A generator lets any() stop at the first missing parameter instead of building a list.
    return any(parameter not in destination_parameters and parameter not in exclude_parameters
               for parameter in single_parameters)
class IndirectReplaceFitResult(PythonAlgorithm):
    """Replace a fit result in a results workspace with the value from a single fit workspace.

    The values are copied with the CopyDataRange child algorithm. Afterwards the output
    workspace is added to the group that contains the input workspace (a group whose
    name ends in ``_Results``), when such a group exists.
    """

    # Workspace names (strings) read from the algorithm properties in _setup().
    _input_workspace = None
    _single_fit_workspace = None
    _output_workspace = None

    # First x value of the first spectrum of the single fit workspace.
    _bin_value = None
    # Index on the input workspace's axis 0 at which the single fit value is inserted.
    _insertion_x_index = None
    # Indices in the single fit workspace whose axis-1 labels also occur in the input workspace.
    _row_indices = None
    # Indices in the input workspace whose axis-1 labels also occur in the single fit workspace.
    _insertion_y_indices = None
    # Name of the group containing the input workspace, or None when no such group is found.
    _result_group = None

    # Required ending of the input and single fit workspace names.
    _allowed_extension = '_Result'

    def category(self):
        """Return the algorithm categories as a semicolon-separated string."""
        return "Workflow\\DataHandling;Inelastic\\Indirect"

    def summary(self):
        """Return a one-sentence description of the algorithm."""
        return 'Replaces a fit result within the Input Workspace with the corresponding fit result found in the ' \
               'Single Fit Workspace.'

    def PyInit(self):
        """Declare the InputWorkspace, SingleFitWorkspace and OutputWorkspace properties."""
        self.declareProperty(MatrixWorkspaceProperty('InputWorkspace', '',
                                                     optional=PropertyMode.Mandatory,
                                                     direction=Direction.Input),
                             doc='The result workspace containing the poor fit value which needs replacing. It\'s name '
                                 'must end with _Result.')

        self.declareProperty(MatrixWorkspaceProperty('SingleFitWorkspace', '',
                                                     optional=PropertyMode.Mandatory,
                                                     direction=Direction.Input),
                             doc='The result workspace containing the result data from a single fit. It\'s name must '
                                 'end with _Result.')

        self.declareProperty(MatrixWorkspaceProperty('OutputWorkspace', '',
                                                     direction=Direction.Output,
                                                     optional=PropertyMode.Optional),
                             doc='The name of the output workspace.')

    def validateInputs(self):
        """Check the property values and collect any problems found.

        :return: A dict mapping a property name to an error message. For each property
                 only the last failing check is reported, because later messages
                 overwrite earlier ones under the same key.
        """
        issues = {}

        input_name = self.getPropertyValue('InputWorkspace')
        single_fit_name = self.getPropertyValue('SingleFitWorkspace')
        output_name = self.getPropertyValue('OutputWorkspace')

        input_workspace = self.getProperty('InputWorkspace').value
        single_fit_workspace = self.getProperty('SingleFitWorkspace').value

        # Name checks.
        if not string_ends_with(input_name, self._allowed_extension):
            issues['InputWorkspace'] = 'The input workspace must have a name ending in ' \
                                       '{0}'.format(self._allowed_extension)

        if not string_ends_with(single_fit_name, self._allowed_extension):
            issues['SingleFitWorkspace'] = 'This workspace must have a name ending in ' \
                                           '{0}'.format(self._allowed_extension)

        if not output_name:
            issues['OutputWorkspace'] = 'No OutputWorkspace name was provided.'

        # Structure checks on the input workspace.
        if not isinstance(input_workspace, MatrixWorkspace):
            issues['InputWorkspace'] = 'The input workspace must be a matrix workspace.'
        else:
            input_x_axis = input_workspace.getAxis(0)
            input_y_axis = input_workspace.getAxis(1)
            if not input_x_axis.isNumeric():
                issues['InputWorkspace'] = 'The input workspace must have a numeric x axis.'
            if len(input_workspace.readY(0)) < 2:
                issues['InputWorkspace'] = 'The input workspace must contain result data from a fit involving 2 or ' \
                                           'more spectra.'
            if not input_y_axis.isText():
                issues['InputWorkspace'] = 'The input workspace must have a text y axis.'

        # Structure checks on the single fit workspace.
        if not isinstance(single_fit_workspace, MatrixWorkspace):
            issues['SingleFitWorkspace'] = 'The single fit workspace must be a matrix workspace.'
        else:
            single_fit_x_axis = single_fit_workspace.getAxis(0)
            single_fit_y_axis = single_fit_workspace.getAxis(1)
            if not single_fit_x_axis.isNumeric():
                issues['SingleFitWorkspace'] = 'The single fit workspace must have a numeric x axis.'
            if len(single_fit_workspace.readY(0)) > 1:
                issues['SingleFitWorkspace'] = 'The single fit workspace must contain data from a single fit.'
            if not single_fit_y_axis.isText():
                issues['SingleFitWorkspace'] = 'The single fit workspace must have a text y axis.'

        # Every single fit parameter except Chi_squared must also exist in the input workspace.
        if isinstance(input_workspace, MatrixWorkspace) and isinstance(single_fit_workspace, MatrixWorkspace):
            if fit_parameter_missing(single_fit_workspace, input_workspace, ['Chi_squared']):
                issues['InputWorkspace'] = 'The fit parameters in the input workspace and single fit workspace do ' \
                                           'not match.'

        return issues

    def _setup(self):
        """Read the property values and work out where the single fit data must be inserted."""
        self._input_workspace = self.getPropertyValue('InputWorkspace')
        self._single_fit_workspace = self.getPropertyValue('SingleFitWorkspace')
        self._output_workspace = self.getPropertyValue('OutputWorkspace')

        input_workspace = get_ads_workspace(self._input_workspace)
        single_fit_workspace = get_ads_workspace(self._single_fit_workspace)

        self._bin_value = single_fit_workspace.readX(0)[0]
        self._insertion_x_index = get_x_insertion_index(input_workspace, single_fit_workspace)
        self._row_indices = get_indices_of_equivalent_labels(input_workspace, single_fit_workspace)
        self._insertion_y_indices = get_indices_of_equivalent_labels(single_fit_workspace, input_workspace)

        # The containing group is searched for under the name ending '_Results'.
        self._result_group = find_result_group_containing(self._input_workspace, self._allowed_extension + 's')

    def PyExec(self):
        """Run the replacement: set up, copy the data, set the output and update the group."""
        self._setup()
        self._copy_data()

        self.setProperty('OutputWorkspace', self._output_workspace)
        self._add_workspace_to_group()

    def _copy_data(self):
        """Copy each matching single fit row into its position in the output workspace.

        The first copy uses the input workspace as the destination and writes to the
        output workspace name; the remaining copies use the output workspace as the
        destination so that they build on the first.
        NOTE(review): rows are paired by position via zip, which presumably relies on the
        shared labels appearing in the same order in both workspaces - confirm.
        """
        self._copy_value(self._row_indices[0], self._insertion_y_indices[0], self._input_workspace)

        for from_index, to_index in zip(self._row_indices[1:], self._insertion_y_indices[1:]):
            self._copy_value(from_index, to_index, self._output_workspace)

    def _copy_value(self, row_index, insertion_index, destination_workspace):
        """Run CopyDataRange for a single row of the single fit workspace.

        :param row_index: Workspace index in the single fit workspace to copy from.
        :param insertion_index: Value passed as InsertionYIndex.
        :param destination_workspace: Name of the workspace passed as DestWorkspace.
        """
        copy_algorithm = self.createChildAlgorithm(name='CopyDataRange', startProgress=0.1,
                                                   endProgress=1.0, enableLogging=True)
        copy_algorithm.setAlwaysStoreInADS(True)

        # XMin == XMax and Start == End workspace index restrict the copy to a single value.
        args = {"InputWorkspace": self._single_fit_workspace, "DestWorkspace": destination_workspace,
                "StartWorkspaceIndex": row_index, "EndWorkspaceIndex": row_index, "XMin": self._bin_value,
                "XMax": self._bin_value, "InsertionYIndex": insertion_index, "InsertionXIndex": self._insertion_x_index,
                "OutputWorkspace": self._output_workspace}

        for key, value in args.items():
            copy_algorithm.setProperty(key, value)

        copy_algorithm.execute()

    def _add_workspace_to_group(self):
        """Add the output workspace to the group that contains the input workspace.

        Does nothing except log a warning when the input workspace is not in a group,
        because there is then no group to update.
        """
        if self._result_group is None:
            # Without this guard the lookup below would be handed None and fail after the
            # output workspace had already been produced.
            self.log().warning('The input workspace is not contained in a results group, so the output '
                               'workspace was not added to a group.')
            return

        group_workspace = get_ads_workspace(self._result_group)
        if self._input_workspace == self._output_workspace:
            group_workspace.remove(self._input_workspace)
        group_workspace.addWorkspace(get_ads_workspace(self._output_workspace))
# Register IndirectReplaceFitResult with Mantid's AlgorithmFactory when this module is imported
AlgorithmFactory.subscribe(IndirectReplaceFitResult)