-
Notifications
You must be signed in to change notification settings - Fork 122
/
FitIncidentSpectrum.py
190 lines (167 loc) · 8.34 KB
/
FitIncidentSpectrum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from copy import copy
import numpy as np
from scipy import ndimage, interpolate
from mantid.api import AlgorithmFactory, MatrixWorkspaceProperty, PythonAlgorithm
from mantid.kernel import Direction, StringListValidator, FloatArrayProperty, RebinParamsValidator
from mantid.simpleapi import CreateWorkspace, Rebin, SplineSmoothing
class FitIncidentSpectrum(PythonAlgorithm):
_input_ws = None
_output_ws = None
_scipy_not_old = hasattr(interpolate.UnivariateSpline, "derivative")
# check if scipy version is greater than 0.12.1 i.e it has the derivative function
def category(self):
return 'Diffraction\\Fitting'
def name(self):
return 'FitIncidentSpectrum'
def summary(self):
return 'Calculate a fit for an incident spectrum using different methods. ' \
'Outputs a workspace containing the functionalized fit and its first ' \
'derivative.'
def version(self):
return 1
def PyInit(self):
self.declareProperty(
MatrixWorkspaceProperty('InputWorkspace', '',
direction=Direction.Input,),
doc='Incident spectrum to be fit.')
self.declareProperty(
MatrixWorkspaceProperty('OutputWorkspace', '',
direction=Direction.Output),
doc='Output workspace containing the fit and it\'s first derivative.')
self.declareProperty(
name='WorkspaceIndex',
defaultValue=0,
doc='Workspace index of the spectra to be fitted (Defaults to the first index.)')
self.declareProperty(FloatArrayProperty(name="BinningForCalc",
validator=RebinParamsValidator(AllowEmpty=True),
direction=Direction.Input),
doc='Bin range for calculation given as an array of floats in the same format as `Rebin`: '
'[Start],[Increment],[End]. If empty use default binning. The calculated '
'spectrum will use this binning')
self.declareProperty(FloatArrayProperty(name="BinningForFit",
validator=RebinParamsValidator(AllowEmpty=True),
direction=Direction.Input),
doc='Bin range for fitting given as an array of floats in the same format as `Rebin`: '
'[Start],[Increment],[End]. If empty use BinningForCalc. The '
'incident spectrum will be rebined to this range before being fit.')
self.declareProperty(
name='FitSpectrumWith',
defaultValue='GaussConvCubicSpline',
validator=StringListValidator(['GaussConvCubicSpline', 'CubicSpline', 'CubicSplineViaMantid']),
doc='The method for fitting the incident spectrum.')
def _setup(self):
self._input_ws = self.getProperty('InputWorkspace').value
self._output_ws = self.getProperty('OutputWorkspace').valueAsStr
self._incident_index = self.getProperty('WorkspaceIndex').value
self._binning_for_calc = self.getProperty('BinningForCalc').value
self._binning_for_fit = self.getProperty('BinningForFit').value
self._fit_spectrum_with = self.getProperty('FitSpectrumWith').value
def PyExec(self):
self._setup()
if self._binning_for_calc.size == 0:
x = np.array(self._input_ws.readX(self._incident_index))
self._binning_for_calc = [i for i in [min(x), x[1] - x[0], max(x) + x[1] - x[0]]]
else:
x = np.arange(self._binning_for_calc[0], self._binning_for_calc[2], self._binning_for_calc[1])
if self._binning_for_fit.size == 0:
x_fit = np.array(self._input_ws.readX(self._incident_index))
y_fit = np.array(self._input_ws.readY(self._incident_index))
else:
rebinned = Rebin(
InputWorkspace=self._input_ws,
Params=self._binning_for_fit,
PreserveEvents=True,
StoreInADS=False)
x_fit = np.array(rebinned.readX(self._incident_index))
y_fit = np.array(rebinned.readY(self._incident_index))
rebin_norm = x.size/x_fit.size
x_bin_centers = 0.5 * (x[:-1] + x[1:])
if len(x_fit) != len(y_fit):
x_fit = 0.5*(x_fit[:-1] + x_fit[1:])
if self._fit_spectrum_with == 'CubicSpline':
# Fit using cubic spline
fit, fit_prime = self.fit_cubic_spline(x_fit, y_fit, x_bin_centers, s=1e7)
elif self._fit_spectrum_with == 'CubicSplineViaMantid':
# Fit using cubic spline via Mantid
fit, fit_prime = self.fit_cubic_spline_via_mantid_spline_smoothing(
self._input_ws,
params_input=self._binning_for_fit,
params_output=self._binning_for_calc,
Error=0.0001,
MaxNumberOfBreaks=0)
elif self._fit_spectrum_with == 'GaussConvCubicSpline':
# Fit using Gauss conv cubic spline
fit, fit_prime = self.fit_cubic_spline_with_gauss_conv(x_fit, y_fit, x_bin_centers, sigma=0.5)
# Create output workspace
unit = self._input_ws.getAxis(0).getUnit().unitID()
output_workspace = CreateWorkspace(
DataX=x,
DataY=np.append(fit, fit_prime)/rebin_norm,
UnitX=unit,
NSpec=2,
Distribution=False,
ParentWorkspace=self._input_ws,
StoreInADS=False)
self.setProperty("OutputWorkspace", output_workspace)
def fit_cubic_spline_with_gauss_conv(self, x_fit, y_fit, x, n_gouss=39, sigma=3.0):
# Fit with Cubic Spline using a Gaussian Convolution to get weights
def moving_average(y, n=n_gouss, sig=sigma):
from scipy.signal import gaussian
b = gaussian(n, sig)
average = ndimage.filters.convolve1d(y, b / b.sum())
var = ndimage.filters.convolve1d(np.power(y - average, 2), b / b.sum())
return average, var
avg, var = moving_average(y_fit)
spline_fit = interpolate.UnivariateSpline(x_fit, y_fit, w=1. / np.sqrt(var))
fit = spline_fit(x)
if self._scipy_not_old:
spline_fit_prime = spline_fit.derivative()
fit_prime = spline_fit_prime(x)
else:
index = np.arange(len(x))
fit_prime = np.empty(len(x))
for pos in index:
dx = (x[1] - x[0])/1000
y1 = spline_fit(x[pos] - dx)
y2 = spline_fit(x[pos] + dx)
fit_prime[pos] = (y2-y1)/dx
return fit, fit_prime
def fit_cubic_spline(self, x_fit, y_fit, x, s=1e15):
# Fit with Cubic Spline
tck = interpolate.splrep(x_fit, y_fit, s=s)
fit = interpolate.splev(x, tck, der=0)
fit_prime = interpolate.splev(x, tck, der=1)
return fit, fit_prime
def fit_cubic_spline_via_mantid_spline_smoothing(self, InputWorkspace, params_input, params_output, **kwargs):
# Fit with Cubic Spline using the mantid SplineSmoothing algorithm
rebinned = Rebin(
InputWorkspace=InputWorkspace,
Params=params_input,
PreserveEvents=True,
StoreInADS=False)
fit_tuple = SplineSmoothing(
InputWorkspace=rebinned,
OutputWorkspaceDeriv='fit_prime',
DerivOrder=1,
StoreInADS=False,
**kwargs)
fit = Rebin(
InputWorkspace=fit_tuple.OutputWorkspace,
Params=params_output,
PreserveEvents=True,
StoreInADS=False)
fit_prime = Rebin(
InputWorkspace=fit_tuple.OutputWorkspaceDeriv[0],
Params=params_output,
PreserveEvents=True,
StoreInADS=False)
fit_array = copy(fit.readY(0))
fit_prime_array = copy(fit_prime.readY(0))
return fit_array, fit_prime_array
AlgorithmFactory.subscribe(FitIncidentSpectrum)