forked from AmbaPant/mantid
-
Notifications
You must be signed in to change notification settings - Fork 1
/
DeltaPDF3D.py
397 lines (331 loc) · 18.7 KB
/
DeltaPDF3D.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
from mantid.api import PythonAlgorithm, AlgorithmFactory, IMDHistoWorkspaceProperty, PropertyMode, WorkspaceProperty, Progress
from mantid.kernel import (Direction, EnabledWhenProperty, PropertyCriterion, Property, StringListValidator, FloatArrayBoundedValidator,
FloatArrayProperty, FloatBoundedValidator)
from mantid.geometry import SpaceGroupFactory
from mantid import logger
import numpy as np
from scipy import ndimage
class DeltaPDF3D(PythonAlgorithm):
def category(self):
return 'Diffraction\\Utility'
def name(self):
return 'DeltaPDF3D'
def summary(self):
return 'Calculates the 3D-deltaPDF from a HKL workspace'
def PyInit(self):
self.declareProperty(IMDHistoWorkspaceProperty("InputWorkspace", "",
optional=PropertyMode.Mandatory,
direction=Direction.Input),
"Input Workspace with HKL dimensions centered on zero.")
self.declareProperty(WorkspaceProperty("IntermediateWorkspace", "",
optional=PropertyMode.Optional,
direction=Direction.Output),
"The resulting workspace after reflection removal and filters applied. What is the input of the FFT.")
self.declareProperty(WorkspaceProperty("OutputWorkspace", "",
optional=PropertyMode.Mandatory,
direction=Direction.Output),
"Output Workspace")
self.declareProperty("Method", 'KAREN', StringListValidator(['None', 'Punch and fill', 'KAREN']), "Bragg peak removal method")
self.declareProperty("WindowFunction", 'Blackman', StringListValidator(['None', 'Gaussian', 'Blackman', 'Tukey', 'Kaiser']),
"Apply a window function to the data")
self.declareProperty("WindowParameter", defaultValue=0.5, validator=FloatBoundedValidator(0.),
doc="Parameter for window function, depends on window type, see algorithm docs")
# Punch and fill
condition = EnabledWhenProperty("Method", PropertyCriterion.IsEqualTo, 'Punch and fill')
self.declareProperty("Shape", "sphere", doc="Shape to punch out reflections",
validator=StringListValidator(['sphere', 'cube']))
self.setPropertySettings("Shape", condition)
val_min_zero = FloatArrayBoundedValidator(lower=0.)
self.declareProperty(FloatArrayProperty("Size", [0.2], validator=val_min_zero),
"Width of cube/diameter of sphere used to remove reflections, in (HKL) (one or three values)")
self.setPropertySettings("Size", condition)
self.declareProperty("SpaceGroup", "",
doc="Space group for reflection removal, either full name or number. If empty all HKL's will be removed.")
self.setPropertySettings("SpaceGroup", condition)
self.declareProperty("Convolution", True, "Apply convolution to fill in removed reflections")
self.setPropertySettings("Convolution", condition)
self.declareProperty("ConvolutionWidth", 2.0, validator=FloatBoundedValidator(0.),
doc="Width of gaussian convolution in pixels")
self.setPropertySettings("ConvolutionWidth", condition)
self.declareProperty("CropSphere", False, "Limit min/max q values. Can help with edge effects.")
condition = EnabledWhenProperty("CropSphere", PropertyCriterion.IsNotDefault)
self.declareProperty(FloatArrayProperty("SphereMin", [Property.EMPTY_DBL], validator=val_min_zero),
"HKL values below which will be removed (one or three values)")
self.setPropertySettings("SphereMin", condition)
self.declareProperty(FloatArrayProperty("SphereMax", [Property.EMPTY_DBL], validator=val_min_zero),
"HKL values above which will be removed (one or three values)")
self.setPropertySettings("SphereMax", condition)
self.declareProperty("FillValue", Property.EMPTY_DBL, "Value to replace with outside sphere")
self.setPropertySettings("FillValue", condition)
# KAREN
self.declareProperty("KARENWidth", 7, "Size of filter window")
# Reflections
self.setPropertyGroup("Shape","Punch and fill")
self.setPropertyGroup("Size","Punch and fill")
self.setPropertyGroup("SpaceGroup","Punch and fill")
# Sphere
self.setPropertyGroup("CropSphere","Cropping to a sphere")
self.setPropertyGroup("SphereMin","Cropping to a sphere")
self.setPropertyGroup("SphereMax","Cropping to a sphere")
self.setPropertyGroup("FillValue","Cropping to a sphere")
# Convolution
self.setPropertyGroup("Convolution","Convolution")
self.setPropertyGroup("ConvolutionWidth","Convolution")
def validateInputs(self):
issues = dict()
inWS = self.getProperty("InputWorkspace").value
dimX=inWS.getXDimension()
dimY=inWS.getYDimension()
dimZ=inWS.getZDimension()
if dimX.name != '[H,0,0]' or dimY.name != '[0,K,0]' or dimZ.name != '[0,0,L]':
issues['InputWorkspace'] = 'dimensions must be [H,0,0], [0,K,0] and [0,0,L]'
for d in range(inWS.getNumDims()):
dim = inWS.getDimension(d)
if not np.isclose(dim.getMaximum(), -dim.getMinimum(), atol=1e-5):
issues['InputWorkspace'] = 'dimensions must be centered on zero'
if self.getProperty("Convolution").value and self.getProperty("Method").value == 'Punch and fill':
try:
import astropy # noqa
except ImportError:
issues["Convolution"] = 'python-astropy required to do convolution'
size = self.getProperty("Size").value
if len(size) != 1 and len(size) != 3:
issues["Size"] = 'Must provide 1 or 3 sizes'
if self.getProperty("SpaceGroup").value:
space_group=self.getProperty("SpaceGroup").value
try:
if not SpaceGroupFactory.isSubscribedNumber(int(space_group)):
issues["SpaceGroup"] = 'Space group number is not valid'
except ValueError:
if not SpaceGroupFactory.isSubscribedSymbol(space_group):
issues["SpaceGroup"] = 'Space group name is not valid'
sphereMin = self.getProperty("SphereMin").value
if len(sphereMin) != 1 and len(sphereMin) != 3:
issues["SphereMin"] = 'Must provide 1 or 3 diameters'
sphereMax = self.getProperty("SphereMax").value
if len(sphereMax) != 1 and len(sphereMax) != 3:
issues["SphereMax"] = 'Must provide 1 or 3 diameters'
if self.getProperty("WindowFunction").value == 'Tukey':
import scipy.signal
if not hasattr(scipy.signal, 'tukey'):
issues["WindowFunction"] = 'Tukey window requires scipy >= 0.16.0'
return issues
def PyExec(self):
progress = Progress(self, 0.0, 1.0, 5)
inWS = self.getProperty("InputWorkspace").value
signal = inWS.getSignalArray().copy()
if self.getProperty("CropSphere").value:
signal = self._crop_sphere(signal, inWS.getXDimension(), inWS.getYDimension(), inWS.getZDimension())
window_function = self.getProperty("WindowFunction").value
if window_function != 'None':
paramater = self.getProperty("WindowParameter").value
_, _, Xbins, _ = self._get_dim_params(inWS.getXDimension())
_, _, Ybins, _ = self._get_dim_params(inWS.getYDimension())
_, _, Zbins, _ = self._get_dim_params(inWS.getZDimension())
if window_function == 'Gaussian':
progress.report("Applying Gaussian window")
window = self._gaussian_window((Xbins, Ybins, Zbins), paramater)
elif window_function == 'Blackman':
progress.report("Applying Blackman window")
window = self._blackman_window((Xbins, Ybins, Zbins))
elif window_function == 'Tukey':
progress.report("Applying Tukey window")
window = self._tukey_window((Xbins, Ybins, Zbins), paramater)
elif window_function == 'Kaiser':
progress.report("Applying Kaiser window")
window = self._kaiser_window((Xbins, Ybins, Zbins), paramater)
signal = np.multiply(signal, window)
if self.getProperty("Method").value == 'Punch and fill':
progress.report("Removing Reflections")
signal = self._punch_and_fill(signal, inWS.getXDimension(), inWS.getYDimension(), inWS.getZDimension())
if self.getProperty("Convolution").value:
progress.report("Convoluting signal")
signal = self._convolution(signal)
elif self.getProperty("Method").value == 'KAREN':
progress.report("Running KAREN")
signal = self._karen(signal, self.getProperty("KARENWidth").value)
if self.getPropertyValue("IntermediateWorkspace"):
cloneWS_alg = self.createChildAlgorithm("CloneMDWorkspace", enableLogging=False)
cloneWS_alg.setProperty("InputWorkspace",inWS)
cloneWS_alg.execute()
signalOutWS = cloneWS_alg.getProperty("OutputWorkspace").value
signalOutWS.setSignalArray(signal)
self.setProperty("IntermediateWorkspace", signalOutWS)
# Do FFT
progress.report("Running FFT")
# Replace any remaining nan's or inf's with 0
# Otherwise you end up with a lot of nan's
signal[np.isnan(signal)]=0
signal[np.isinf(signal)]=0
signal=np.fft.fftshift(np.fft.fftn(np.fft.ifftshift(signal)))
number_of_bins = signal.shape
# CreateMDHistoWorkspace expects Fortan `column-major` ordering
signal = signal.real.flatten('F')
createWS_alg = self.createChildAlgorithm("CreateMDHistoWorkspace", enableLogging=False)
createWS_alg.setProperty("SignalInput", signal)
createWS_alg.setProperty("ErrorInput", signal**2)
createWS_alg.setProperty("Dimensionality", 3)
createWS_alg.setProperty("Extents", self._calc_new_extents(inWS))
createWS_alg.setProperty("NumberOfBins", number_of_bins)
createWS_alg.setProperty("Names", 'x,y,z')
createWS_alg.setProperty("Units", 'a,b,c')
createWS_alg.execute()
outWS = createWS_alg.getProperty("OutputWorkspace").value
# Copy first experiment info
if inWS.getNumExperimentInfo() > 0:
outWS.copyExperimentInfos(inWS)
progress.report()
self.setProperty("OutputWorkspace", outWS)
def _punch_and_fill(self, signal, dimX, dimY, dimZ): # noqa
Xmin, Xmax, _, Xwidth = self._get_dim_params(dimX)
Ymin, Ymax, _, Ywidth = self._get_dim_params(dimY)
Zmin, Zmax, _, Zwidth = self._get_dim_params(dimZ)
X, Y, Z = self._get_XYZ_ogrid(dimX, dimY, dimZ)
size = self.getProperty("Size").value
if len(size)==1:
size = np.repeat(size, 3)
size/=2.0 # We want radii or half box width
cut_shape = self.getProperty("Shape").value
space_group = self.getProperty("SpaceGroup").value
if space_group:
check_space_group = True
try:
space_group=SpaceGroupFactory.subscribedSpaceGroupSymbols(int(space_group))[0]
except ValueError:
pass
logger.information('Using space group: '+space_group)
sg=SpaceGroupFactory.createSpaceGroup(space_group)
else:
check_space_group = False
if cut_shape == 'cube':
for h in range(int(np.ceil(Xmin)), int(Xmax)+1):
for k in range(int(np.ceil(Ymin)), int(Ymax)+1):
for l in range(int(np.ceil(Zmin)), int(Zmax)+1):
if not check_space_group or sg.isAllowedReflection([h,k,l]):
signal[int((h-size[0]-Xmin)/Xwidth+1):int((h+size[0]-Xmin)/Xwidth),
int((k-size[1]-Ymin)/Ywidth+1):int((k+size[1]-Ymin)/Ywidth),
int((l-size[2]-Zmin)/Zwidth+1):int((l+size[2]-Zmin)/Zwidth)]=np.nan
else: # sphere
mask=((X-np.round(X))**2/size[0]**2 + (Y-np.round(Y))**2/size[1]**2 + (Z-np.round(Z))**2/size[2]**2 < 1)
# Unmask invalid reflections
if check_space_group:
for h in range(int(np.ceil(Xmin)), int(Xmax)+1):
for k in range(int(np.ceil(Ymin)), int(Ymax)+1):
for l in range(int(np.ceil(Zmin)), int(Zmax)+1):
if not sg.isAllowedReflection([h,k,l]):
mask[int((h-0.5-Xmin)/Xwidth+1):int((h+0.5-Xmin)/Xwidth),
int((k-0.5-Ymin)/Ywidth+1):int((k+0.5-Ymin)/Ywidth),
int((l-0.5-Zmin)/Zwidth+1):int((l+0.5-Zmin)/Zwidth)]=False
signal[mask]=np.nan
return signal
def _crop_sphere(self, signal, dimX, dimY, dimZ):
X, Y, Z = self._get_XYZ_ogrid(dimX, dimY, dimZ)
sphereMin = self.getProperty("SphereMin").value
if sphereMin[0] < Property.EMPTY_DBL:
if len(sphereMin)==1:
sphereMin = np.repeat(sphereMin, 3)
signal[X**2/sphereMin[0]**2 + Y**2/sphereMin[1]**2 + Z**2/sphereMin[2]**2 < 1]=np.nan
sphereMax = self.getProperty("SphereMax").value
if sphereMax[0] < Property.EMPTY_DBL:
if len(sphereMax)==1:
sphereMax = np.repeat(sphereMax, 3)
if self.getProperty("FillValue").value == Property.EMPTY_DBL:
fill_value = np.nan
else:
fill_value = self.getProperty("FillValue").value
signal[X**2/sphereMax[0]**2 + Y**2/sphereMax[1]**2 + Z**2/sphereMax[2]**2 > 1]=fill_value
return signal
def _get_XYZ_ogrid(self, dimX, dimY, dimZ):
"""
Returns X, Y and Z as ogrid
"""
Xmin, Xmax, Xbins, _ = self._get_dim_params(dimX)
Ymin, Ymax, Ybins, _ = self._get_dim_params(dimY)
Zmin, Zmax, Zbins, _ = self._get_dim_params(dimZ)
return np.ogrid[(dimX.getX(0)+dimX.getX(1))/2:(dimX.getX(Xbins)+dimX.getX(Xbins-1))/2:Xbins*1j,
(dimY.getX(0)+dimY.getX(1))/2:(dimY.getX(Ybins)+dimY.getX(Ybins-1))/2:Ybins*1j,
(dimZ.getX(0)+dimZ.getX(1))/2:(dimZ.getX(Zbins)+dimZ.getX(Zbins-1))/2:Zbins*1j]
def _get_dim_params(self, dim):
"""
Return the min, max, number_of_bins and bin_width of dim
"""
return dim.getMinimum(), dim.getMaximum(), dim.getNBins(), dim.getBinWidth()
def _convolution(self, signal):
from astropy.convolution import convolve, convolve_fft, Gaussian1DKernel
G1D = Gaussian1DKernel(self.getProperty("ConvolutionWidth").value).array
G3D = G1D * G1D.reshape((-1,1)) * G1D.reshape((-1,1,1))
try:
logger.debug('Trying astropy.convolution.convolve_fft for convolution')
return convolve_fft(signal, G3D) # Faster but will fail with large signal and kernel arrays
except ValueError:
logger.debug('Using astropy.convolution.convolve for convolution')
return convolve(signal, G3D)
def _calc_new_extents(self, inWS):
# Calculate new extents for fft space
extents=''
for d in range(inWS.getNumDims()):
dim = inWS.getDimension(d)
if dim.getNBins() == 1:
fft_dim = 1./(dim.getMaximum()-dim.getMinimum())
extents+=str(-fft_dim/2.)+','+str(fft_dim/2.)+','
else:
fft_dim=np.fft.fftshift(np.fft.fftfreq(dim.getNBins(), (dim.getMaximum()-dim.getMinimum())/dim.getNBins()))
extents+=str(fft_dim[0])+','+str(fft_dim[-1])+','
return extents[:-1]
def _karen(self, signal, width):
"""
Bragg peaks are located as outliers in some moving window
Outliers are defined as values more than 3sigma away from the median
Sigma is estimated using 1.4826*MAD
Returns median+2.2*MAD of window for values detected to be outliers
Input dataset (dset) and window width (x)
Input an odd window or the window will be asymmetric and stuff breaks
"""
med = ndimage.filters.median_filter(signal, size=width, mode='nearest') # Get median of input data set
mad = ndimage.filters.median_filter(np.abs(signal-med), size=width, mode='nearest') # Get median absolute deviation (MAD)
asigma = np.abs(mad*3*1.4826) # Absolute value of approximate sigma
mask = np.logical_or(signal < (med-asigma), signal > (med+asigma)) # Check if value is outlier based on MAD
signal[mask] = (med+2.2*mad)[mask] # Return median+2.2*MAD if value is outlier
return signal
def _gaussian_window(self, width, sigma):
"""
Generates a gaussian window
sigma is based on the dat being in a range 0 to 1
"""
from scipy.signal import gaussian
return (gaussian(width[0], sigma*width[0]).reshape((-1,1,1))
* gaussian(width[1], sigma*width[1]).reshape((-1,1))
* gaussian(width[2], sigma*width[2]))
def _blackman_window(self, width):
"""
Generates a blackman window
"""
return np.blackman(width[0]).reshape((-1,1,1)) * np.blackman(width[1]).reshape((-1,1)) * np.blackman(width[2])
def _tukey_window(self, width, alpha):
"""
Generates a tukey window
0 <= alpha <=1
alpha = 0 becomes rectangular
alpha = 1 becomes a Hann window
"""
from scipy.signal import tukey
return (tukey(width[0], alpha).reshape((-1,1,1))
* tukey(width[1], alpha).reshape((-1,1))
* tukey(width[2], alpha))
def _kaiser_window(self, width, beta):
"""
Generates a kaiser window
beta Window shape
0 Rectangular
5 Similar to a Hamming
6 Similar to a Hann
8.6 Similar to a Blackman
"""
return np.kaiser(width[0], beta).reshape((-1,1,1)) * np.kaiser(width[1], beta).reshape((-1,1)) * np.kaiser(width[2], beta)
AlgorithmFactory.subscribe(DeltaPDF3D)