forked from sherpa-deproject/deproject
-
Notifications
You must be signed in to change notification settings - Fork 0
/
specstack.py
270 lines (225 loc) · 9.88 KB
/
specstack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
Manipulate a stack of spectra in Sherpa.
The methods in the SpecStack class provide a way to automatically apply
familiar Sherpa commands such as `set_par`_ or `freeze`_ or `plot_fit`_
to a stack of PHA spectra. This simplifies simultaneous fitting of
multiple spectra.
Note that the :mod:`specstack` module is currently distributed within with the
:mod:`deproject` package. `Specstack` is not yet fully documented or tested
outside the context of `deproject`.
:Copyright: Smithsonian Astrophysical Observatory (2009)
:Author: Tom Aldcroft (aldcroft@head.cfa.harvard.edu)
"""
import re
import numpy
import sherpa.astro.ui as SherpaUI
try:
import pycrates
import pychips
except ImportError:
# Allow doc generation to work without pychips and pycrates
print "ERROR: could not import pycrates and pychips"
def _sherpa_plot_func(func):
def _sherpa_plot_func_(self, *args, **kwargs):
self._sherpa_plot(func, *args, **kwargs)
return _sherpa_plot_func_
class SpecStack(object):
"""
Manipulate a stack of spectra in Sherpa.
"""
def __init__(self):
self.datasets = []
self.model_comps = [] # Model components
self.obsids = set()
self.srcmodel_comps = [] # Generic model components in source model expression
self.model_comps = [] # All instantiated model components for shells
def load_pha(self, specfile, annulus):
"""
Load a pha file and add to the datasets for stacked analysis.
:param specfile: extracted source PHA/PI spectrum file
:param annulus: annulus for spectrum file
"""
dataid = len(self.datasets)
print 'Loading spectrum file %s as dataset id %d' % (specfile, dataid)
SherpaUI.load_pha(dataid, specfile)
try:
obsid = int(pycrates.read_file(specfile).get_key_value('OBS_ID'))
except (TypeError, ValueError):
obsid = 0
dataset = dict(file=specfile,
obsid=obsid,
id=dataid,
annulus=annulus
)
self.datasets.append(dataset)
self.obsids.add(obsid)
def find_parval(self, parname):
"""
Find the value of the first parameter with the given ``parname``. Ignore
case when matching.
:param parname: parameter name
:rtype: parameter value
"""
RE_parname = re.compile(parname + '$', re.IGNORECASE)
for model_comp in self.model_comps:
mc = model_comp['object'] # Get the model component object
for par in mc.pars:
if RE_parname.match(par.name):
return par.val
raise ValueError('Parameter %s not found in any model component' % shell)
def find_norm(self, shell):
"""
Find the normalization value for the ``shell`` model. Check for multiple
or missing norms.
:param shell: shell index
:rtype: norm value
"""
norms = []
for model_comp in self.model_comps:
if model_comp['shell'] == shell:
mc = model_comp['object'] # Get the model component object
if hasattr(mc, 'norm'): # Special attr of model component
norms.append(mc.norm.val)
if len(norms) == 0:
raise ValueError('Model for shell %d has no norm' % shell)
elif len(norms) > 1:
raise ValueError('Model for shell %d has multiple norms' % shell)
return norms[0]
def _get_n_datasets(self):
"""
Return the number of datasets
:rtype: int"""
return len(self.datasets)
n_datasets = property(_get_n_datasets)
def _sherpa_cmd(self, func, *args):
"""
Apply an arbitrary Sherpa function to each of the datasets.
:rtype: None
"""
for dataset in self.datasets:
func(dataset['id'], *args)
def subtract(self, *args):
"""Subtract background from each dataset"""
self._sherpa_cmd(SherpaUI.subtract, *args)
def notice(self, *args):
"""Apply Sherpa notice command to each dataset."""
self._sherpa_cmd(SherpaUI.notice_id, *args)
def ignore(self, *args):
"""Apply Sherpa ignore command to each dataset."""
self._sherpa_cmd(SherpaUI.ignore_id, *args)
def _sherpa_par(self, func, par, msg, *args):
"""Apply ``func(*args)`` to all shell model component parameters named ``par``.
See thaw(), freeze(), set_par() and get_par() for examples.
:param func: Sherpa function that takes a full parameter name specification and
optional args, e.g. set_par() used as set_par('xsmekal_7.kt', 2.0)
:param par: Model type and param name as in source model expression e.g. 'xsmekal.kt'
:param msg: Format string to indicate action.
:param *args: Optional function arguments
:rtype: numpy array of function return values ordered by shell
"""
model_type, parname = par.split('.')
vals = [] # return values
for model_comp in self.model_comps:
if model_comp['type'] == model_type:
fullparname = '%s.%s' % (model_comp['name'], parname)
if msg is not None:
print msg % fullparname
vals.append(func(fullparname, *args))
return vals
def thaw(self, par):
"""Apply thaw command to specified parameter for each dataset.
:param par: parameter specifier in format <model_type>.<par_name>
:rtype: None
"""
self._sherpa_par(SherpaUI.thaw, par, 'Thawing %s')
def freeze(self, par):
"""Apply freeze command to specified parameter for each dataset.
:param par: parameter specifier in format <model_type>.<par_name>
:rtype: None
"""
self._sherpa_par(SherpaUI.freeze, par, 'Freezing %s')
def set_par(self, par, val):
"""Set parameter value for each dataset.
:param par: parameter specifier in format <model_type>.<par_name>
:param val: parameter value
:rtype: None
"""
self._sherpa_par(SherpaUI.set_par, par, 'Setting %%s=%s' % str(val), val)
def get_par(self, par):
"""Get array of parameter values for datasets.
:param par: parameter specifier in format <model_type>.<par_name>
:param val: parameter value
:rtype: numpy array of parameter value ordered by dataset
"""
pars = self._sherpa_par(SherpaUI.get_par, par, 'Getting %s')
return numpy.array([x.val for x in pars])
def _sherpa_plot(self, func, *args, **kwargs):
"""Call Sherpa plot ``func`` for each dataset.
:param func: Sherpa plot function
:param args: plot function list arguments
:param kwargs: plot function named (keyword) arguments
:rtype: None
"""
for shell in range(self.nshell):
window_id = 'Shell%d' % shell
try:
pychips.add_window(['id', window_id])
except RuntimeError:
pass # already exists
new_args = args
if len(args) > 0:
# Try to format first arg
try:
new_args = tuple([args[0] % shell]) + args[1:]
except TypeError:
pass
pychips.set_current_window(window_id)
func(*new_args, **kwargs)
def print_window(self, *args, **kwargs):
"""Print window for each dataset.
:param args: list arguments to pass to print_window
:param kwargs: named (keyword) arguments to pass to print_window
:rtype: None
"""
if len(args) > 0:
args = tuple([args[0] + '%d']) + args[1:]
self._sherpa_plot(pychips.plot_window, *args, **kwargs)
def dummyfunc(self, *args, **kwargs):
pass
try:
log_scale = _sherpa_plot_func(pychips.log_scale)
linear_scale = _sherpa_plot_func(pychips.linear_scale)
except NameError:
# Allow doc generation to work without pychips
log_scale = dummyfunc
linear_scale = dummyfunc
plot_fit = _sherpa_plot_func(SherpaUI.plot_fit)
plot_arf = _sherpa_plot_func(SherpaUI.plot_arf)
plot_bkg_fit = _sherpa_plot_func(SherpaUI.plot_bkg_fit)
plot_bkg_ratio = _sherpa_plot_func(SherpaUI.plot_bkg_ratio)
plot_chisqr = _sherpa_plot_func(SherpaUI.plot_chisqr)
plot_fit_delchi = _sherpa_plot_func(SherpaUI.plot_fit_delchi)
plot_psf = _sherpa_plot_func(SherpaUI.plot_psf)
plot_bkg = _sherpa_plot_func(SherpaUI.plot_bkg)
plot_bkg_fit_delchi = _sherpa_plot_func(SherpaUI.plot_bkg_fit_delchi)
plot_bkg_resid = _sherpa_plot_func(SherpaUI.plot_bkg_resid)
plot_data = _sherpa_plot_func(SherpaUI.plot_data)
plot_fit_resid = _sherpa_plot_func(SherpaUI.plot_fit_resid)
plot_ratio = _sherpa_plot_func(SherpaUI.plot_ratio)
plot_bkg_chisqr = _sherpa_plot_func(SherpaUI.plot_bkg_chisqr)
plot_bkg_fit_resid = _sherpa_plot_func(SherpaUI.plot_bkg_fit_resid)
plot_bkg_source = _sherpa_plot_func(SherpaUI.plot_bkg_source)
plot_delchi = _sherpa_plot_func(SherpaUI.plot_delchi)
plot_model = _sherpa_plot_func(SherpaUI.plot_model)
plot_resid = _sherpa_plot_func(SherpaUI.plot_resid)
plot_bkg_delchi = _sherpa_plot_func(SherpaUI.plot_bkg_delchi)
plot_bkg_model = _sherpa_plot_func(SherpaUI.plot_bkg_model)
# CIAO 4.3 re-named plot_bkg_unconvolved to plot_bkg_source
if hasattr(SherpaUI, "plot_bkg_unconvolved"):
plot_bkg_unconvolved = _sherpa_plot_func(SherpaUI.plot_bkg_unconvolved)
else:
plot_bkg_unconvolved = _sherpa_plot_func(SherpaUI.plot_bkg_source)
plot_bkg_source = _sherpa_plot_func(SherpaUI.plot_bkg_source)
plot_fit = _sherpa_plot_func(SherpaUI.plot_fit)
plot_order = _sherpa_plot_func(SherpaUI.plot_order)
plot_source = _sherpa_plot_func(SherpaUI.plot_source)