/
image_utils.py
763 lines (643 loc) · 31.2 KB
/
image_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
"""
@brief Module to perform standard operations on sensor images such
computing median images, unbiasing using the serial overscan region,
trimming, etc..
"""
import os
import warnings
import numpy as np
import numpy.random as random
from scipy import interpolate
from astropy.io import fits
from astropy.utils.exceptions import AstropyWarning, AstropyUserWarning
from .fitsTools import fitsWriteto
import lsst.afw
import lsst.geom as lsstGeom
import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
class Metadata(object):
    """Read-only access to FITS header keywords.

    Prefers the DM stack reader; falls back to astropy when the stack
    cannot parse the header (e.g. keywords containing a ".").
    """
    def __init__(self, infile, hdu=0):
        self.header = None
        try:
            self.md = afwImage.readMetadata(infile, dm_hdu(hdu))
        except Exception:
            # Narrowed from a bare except.  The DM stack raises when it
            # encounters a "." in a FITS header keyword; fall back to
            # reading the header with astropy instead.
            with fits.open(infile) as hdulist:
                self.header = dict(hdulist[hdu].header)

    def get(self, key):
        """Return the header value for key."""
        return self(key)

    def __call__(self, key):
        if self.header is None:
            return self.md.getScalar(key)
        return self.header[key]
def allAmps(fits_file=None):
    """Return the list of amplifier numbers for a sensor.

    With no file, assume a full 16-amp sensor.  Otherwise inspect the
    FITS file: use the NAMPS primary-header keyword when present
    (ptc/detector response files), else infer the amp count from the
    number of HDUs in a raw image file.
    """
    full_sensor = list(range(1, 17))
    if fits_file is None:
        return full_sensor
    with fits.open(fits_file) as hdulist:
        if 'NAMPS' in hdulist[0].header:
            # Number of amps specified in the FITS file header.
            return list(range(1, hdulist[0].header['NAMPS'] + 1))
        # Raw image FITS file: infer from the extension count.
        if len(hdulist) <= 12:
            # Wavefront sensor.
            return list(range(1, 9))
        # Full 16 amp sensor.
        return full_sensor
# Mapping from amplifier number (1-16) to FITS extension (segment) name.
hdu_dict = {1: 'Segment10', 2: 'Segment11', 3: 'Segment12',
            4: 'Segment13', 5: 'Segment14', 6: 'Segment15',
            7: 'Segment16', 8: 'Segment17', 9: 'Segment07',
            10: 'Segment06', 11: 'Segment05', 12: 'Segment04',
            13: 'Segment03', 14: 'Segment02', 15: 'Segment01',
            16: 'Segment00'}

# Two-character channel ID (e.g. '10') for each amplifier number.
channelIds = {amp: hdu_dict[amp][-2:] for amp in allAmps()}
def mean(x):
    """Return the afwMath MEAN statistic of image x."""
    return afwMath.makeStatistics(x, afwMath.MEAN).getValue()


def median(x):
    """Return the afwMath MEDIAN statistic of image x."""
    return afwMath.makeStatistics(x, afwMath.MEDIAN).getValue()


def stdev(x):
    """Return the afwMath STDEV statistic of image x."""
    return afwMath.makeStatistics(x, afwMath.STDEV).getValue()
def dm_hdu(hdu):
    """Compute the DM HDU index from the actual FITS file HDU.

    The v12 DM stack used 1-based HDU indexing; later versions use the
    FITS file's own 0-based numbering.
    """
    offset = 1 if lsst.afw.__version__.startswith('12.0') else 0
    return hdu + offset
def bias(im, overscan, **kwargs):
    """Compute the offset as the mean of the pixels in the serial
    overscan region.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.

    Returns:
        A single float value for the mean of the overscan region.
    """
    overscan_im = im.Factory(im, overscan)
    return mean(overscan_im)
def bias_row(im, overscan, dxmin=5, dxmax=2, statistic=np.mean, **kwargs):
    """Estimate a per-row offset from the serial overscan region using
    columns dxmin through -dxmax of each row.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.
        dxmin: Columns skipped at the start of the overscan region.
        dxmax: Columns skipped at the end of the overscan region.
        statistic: Function applied to each row to compute its offset.

    Returns:
        A callable mapping a row index (or array of row indices) to
        the per-row offset value.
    """
    region = im.Factory(im, overscan)
    try:
        imarr = region.getArray()
    except AttributeError:
        # MaskedImage: the pixel array lives on the image plane.
        imarr = region.getImage().getArray()
    values = np.array([statistic(row[dxmin:-dxmax]) for row in imarr])
    return lambda x: values[x]
def bias_col(im, overscan, dymin=5, dymax=2, statistic=np.mean, **kwargs):
    """Estimate a per-column offset from the parallel overscan region
    using rows dymin through -dymax of each column.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the parallel overscan region.
        dymin: Rows skipped at the start of the overscan region.
        dymax: Rows skipped at the end of the overscan region.
        statistic: Function applied to each column to compute its offset.

    Returns:
        A callable mapping a column index (or array of column indices)
        to the per-column offset value.
    """
    region = im.Factory(im, overscan)
    try:
        imarr = region.getArray()
    except AttributeError:
        # MaskedImage: the pixel array lives on the image plane.
        imarr = region.getImage().getArray()
    ncols = imarr.shape[1]
    values = np.array([statistic(imarr[dymin:-dymax, j])
                       for j in range(ncols)])
    return lambda x: values[x]
def bias_func(im, overscan, dxmin=5, dxmax=2, statistic=np.mean, **kwargs):
    """Fit a polynomial (order 1 by default) to the per-row statistic of
    the serial overscan region and return the fitted bias as a function
    of pixel row.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.
        dxmin: Columns skipped at the start of the overscan region.
        dxmax: Columns skipped at the end of the overscan region.
        statistic: Function applied to each row to compute its offset.

    Keyword Arguments:
        fit_order: The order of the polynomial. The default is: 1.

    Returns:
        A np.poly1d object containing the polynomial fit coefficients.
    """
    region = im.Factory(im, overscan)
    try:
        imarr = region.getArray()
    except AttributeError:
        # MaskedImage: the pixel array lives on the image plane.
        imarr = region.getImage().getArray()
    rows = np.arange(imarr.shape[0])
    values = np.array([statistic(row[dxmin:-dxmax]) for row in imarr])
    fit_order = kwargs.get('fit_order', 1)
    return np.poly1d(np.polyfit(rows, values, fit_order))
def bias_col_func(im, overscan, dymin=5, dymax=2, statistic=np.mean, **kwargs):
    """Fit a polynomial (order 1 by default) to the per-column statistic
    of the parallel overscan region and return the fitted bias as a
    function of pixel column.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the parallel overscan region.
        dymin: Rows skipped at the start of the overscan region.
        dymax: Rows skipped at the end of the overscan region.
        statistic: Function applied to each column to compute its offset.

    Keyword Arguments:
        fit_order: The order of the polynomial. The default is: 1.

    Returns:
        A np.poly1d object containing the polynomial fit coefficients.
    """
    region = im.Factory(im, overscan)
    try:
        imarr = region.getArray()
    except AttributeError:
        # MaskedImage: the pixel array lives on the image plane.
        imarr = region.getImage().getArray()
    cols = np.arange(imarr.shape[1])
    values = np.array([statistic(imarr[dymin:-dymax, j]) for j in cols])
    fit_order = kwargs.get('fit_order', 1)
    return np.poly1d(np.polyfit(cols, values, fit_order))
def bias_spline(im, overscan, dxmin=5, dxmax=2, statistic=np.mean, **kwargs):
    """Compute the offset by fitting a spline to the per-row statistic of
    the serial overscan region, for columns dxmin through -dxmax.
    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.
        dxmin: The number of columns to skip at the beginning of the serial
            overscan region.
        dxmax: The number of columns to skip at the end of the serial overscan region.
        statistic: The statistic to use to calculate the offset for each row.
    Keyword Arguments:
        k: The degree of the spline fit. The default is: 3.
        s: The amount of smoothing to be applied to the fit. The default is: 18000.
        t: The knots of the spline. If None, splrep chooses knots for the
            given smoothing factor, s. The default is: None.
    Returns:
        A tuple (t,c,k) containing the vector of knots, the B-spline coefficients,
        and the degree of the spline (as returned by scipy.interpolate.splrep).
    """
    try:
        imarr = im.Factory(im, overscan).getArray()
    except AttributeError:  # Dealing with a MaskedImage
        imarr = im.Factory(im, overscan).getImage().getArray()
    ny, nx = imarr.shape
    rows = np.arange(ny)
    values = np.array([statistic(imarr[j][dxmin:-dxmax]) for j in rows])
    rms = 7  # Expected read noise per pixel — TODO confirm value/units
    # Each row value averages ~nx pixels, so its sigma scales as
    # rms/sqrt(nx); splrep is given the inverse sigmas as weights.
    weights = np.ones(ny) * (rms / np.sqrt(nx))
    return interpolate.splrep(rows, values, w=1/weights, k=kwargs.get('k', 3),
                              s=kwargs.get('s', 18000), t=kwargs.get('t', None))
def bias_col_spline(im, overscan, dymin=5, dymax=2, statistic=np.mean, **kwargs):
    """Compute the offset by fitting a spline to the per-column statistic
    of the parallel overscan region, for rows dymin through -dymax.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the parallel overscan region.
        dymin: The number of rows to skip at the beginning of the parallel
            overscan region.
        dymax: The number of rows to skip at the end of the parallel overscan region.
        statistic: The statistic to use to calculate the offset for each column.

    Keyword Arguments:
        k: The degree of the spline fit. The default is: 3.
        s: The amount of smoothing to be applied to the fit. The default is: 18000.
        t: The knots of the spline. If None, splrep chooses knots for the
            given smoothing factor, s. The default is: None.

    Returns:
        A tuple (t,c,k) containing the vector of knots, the B-spline coefficients,
        and the degree of the spline (as returned by scipy.interpolate.splrep).
    """
    try:
        imarr = im.Factory(im, overscan).getArray()
    except AttributeError:  # Dealing with a MaskedImage
        imarr = im.Factory(im, overscan).getImage().getArray()
    ny, nx = imarr.shape
    cols = np.arange(nx)
    values = np.array([statistic(imarr[dymin:-dymax, j]) for j in cols])
    rms = 7  # Expected read noise per pixel — TODO confirm value/units
    # Each column value averages ~ny rows, so its sigma scales as
    # rms/sqrt(ny).  (Fixed: previously used sqrt(nx), mixing up the
    # axes relative to bias_spline's sqrt of the averaged axis.)
    weights = np.ones(nx) * (rms / np.sqrt(ny))
    return interpolate.splrep(cols, values, w=1/weights, k=kwargs.get('k', 3),
                              s=kwargs.get('s', 18000), t=kwargs.get('t', None))
def bias_image(im, overscan, dxmin=5, dxmax=2, statistic=np.mean, bias_method='row', **kwargs):
    """Generate a bias image containing the offset values calculated from
    bias(), bias_row(), bias_func() or bias_spline().

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.
        dxmin: The number of columns to skip at the beginning of the serial
            overscan region.
        dxmax: The number of columns to skip at the end of the serial overscan region.
        statistic: The statistic to use to calculate the offset for each row.
        bias_method: Either 'mean', 'row', 'func', 'spline' or 'none'
            (case-insensitive).

    Keyword Arguments:
        fit_order: The order of the polynomial ('func' method only). Default: 1.
        k: The degree of the spline fit ('spline' method only). Default: 3.
        s: The smoothing applied to the fit ('spline' method only). Default: 18000.
        t: The knots of the spline; if None, chosen from the smoothing
            factor s ('spline' method only). Default: None.

    Returns:
        An image with size equal to the input image containing the offset level.
    """
    # Normalize once so a mixed-case method name (e.g. 'Row') selects the
    # correct fill branch below as well as the correct estimator.
    # Previously only the validity check was lowercased, so 'Row' passed
    # validation but fell through every branch, leaving values undefined.
    bias_method = bias_method.lower()
    if bias_method not in ('mean', 'row', 'func', 'spline', 'none'):
        raise RuntimeError('Bias method must be either "none", "mean", "row", "func" or "spline".')

    def dummy_none(im, overscan, dxmin, dxmax, **kwargs):
        # 'none' method: zero offset everywhere.
        return 0.0

    method = {'mean': bias, 'row': bias_row, 'func': bias_func,
              'spline': bias_spline, 'none': dummy_none}
    my_bias = method[bias_method](im, overscan, dxmin=dxmin, dxmax=dxmax, **kwargs)
    biasim = afwImage.ImageF(im.getDimensions())
    imarr = biasim.getArray()
    ny, nx = imarr.shape
    if bias_method in ('row', 'func'):
        # Callable: per-row offset as a function of row index.
        values = my_bias(np.arange(ny))
    elif bias_method == 'spline':
        values = interpolate.splev(np.arange(ny), my_bias)
    else:
        # 'mean' and 'none' both return a scalar offset.
        values = np.full(ny, my_bias)
    for row in range(ny):
        imarr[row] += values[row]
    return biasim
def bias_image_col(im, overscan, dymin=5, dymax=2, statistic=np.mean, bias_method='col', **kwargs):
    """Generate a bias image containing the offset values calculated from
    bias(), bias_col(), bias_col_func() or bias_col_spline().

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the parallel overscan region.
        dymin: The number of rows to skip at the beginning of the parallel
            overscan region.
        dymax: The number of rows to skip at the end of the parallel overscan region.
        statistic: The statistic to use to calculate the offset for each column.
        bias_method: Either 'mean', 'col', 'func' or 'spline'
            (case-insensitive).

    Keyword Arguments:
        fit_order: The order of the polynomial ('func' method only). Default: 1.
        k: The degree of the spline fit ('spline' method only). Default: 3.
        s: The smoothing applied to the fit ('spline' method only). Default: 18000.
        t: The knots of the spline; if None, chosen from the smoothing
            factor s ('spline' method only). Default: None.

    Returns:
        An image with size equal to the input image containing the offset level.
    """
    # Normalize once so mixed-case method names work, consistent with
    # bias_image (previously the comparison was case-sensitive here).
    bias_method = bias_method.lower()
    if bias_method not in ('mean', 'col', 'func', 'spline'):
        raise RuntimeError('Bias method must be either "mean", "col", "func" or "spline".')
    method = {'mean': bias, 'col': bias_col, 'func': bias_col_func,
              'spline': bias_col_spline}
    my_bias = method[bias_method](im, overscan, dymin=dymin, dymax=dymax, **kwargs)
    biasim = afwImage.ImageF(im.getDimensions())
    imarr = biasim.getArray()
    # Only the columns spanned by the overscan box receive an offset.
    nx = overscan.width
    if bias_method in ('col', 'func'):
        # Callable: per-column offset as a function of column index.
        values = my_bias(np.arange(nx))
    elif bias_method == 'spline':
        values = interpolate.splev(np.arange(nx), my_bias)
    else:
        # 'mean' returns a scalar offset.
        values = np.full(nx, my_bias)
    for col in range(nx):
        imarr[:, col] += values[col]
    return biasim
def bias_image_rowcol(im, serial_overscan, parallel_overscan, \
                      dxmin=5, dxmax=2, dymin=5, dymax=2, statistic=np.mean, **kwargs):
    """Generate a bias image based on a statistic for each row in the serial
    overscan region for columns dxmin through dxmax, and the parallel
    overscan region for rows dymin through dymax.
    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        serial_overscan: A bounding box for the serial overscan region.
        parallel_overscan: A bounding box for the parallel overscan region.
        dxmin: The number of columns to skip at the beginning of the serial
            overscan region.
        dxmax: The number of columns to skip at the end of the serial overscan region.
        dymin: The number of rows to skip at the beginning of the parallel
            overscan region.
        dymax: The number of rows to skip at the end of the parallel overscan region.
        statistic: The statistic to use to calculate the offset for each row.
    Returns:
        An image with size equal to the input image containing the offset level.
    """
    biasim = afwImage.ImageF(im.getDimensions())
    imarr = biasim.getArray()
    amp_bbox = im.getBBox()
    # The "corner" is where serial and parallel overscans overlap: from
    # (serial x-min, parallel y-min) up to the amp's max corner.
    corner_start_x = serial_overscan.getMin().x
    corner_start_y = parallel_overscan.getMin().y
    corner_bbox = lsst.geom.Box2I(lsst.geom.Point2I(corner_start_x, corner_start_y), amp_bbox.getMax())
    my_bias_row = bias_row(im, serial_overscan, dxmin=dxmin, dxmax=dxmax, statistic=statistic, **kwargs)
    my_bias_col = bias_col(im, parallel_overscan, dymin=dymin, dymax=dymax, statistic=statistic, **kwargs)
    my_bias_corner = bias(im, corner_bbox)
    nx = parallel_overscan.width
    ny = serial_overscan.height
    # Apply per-column offsets over the columns covered by the parallel
    # overscan and per-row offsets over the rows covered by the serial
    # overscan; the [:ny, :nx] region got both, so the corner mean is
    # subtracted there to avoid double counting, and added in the region
    # that got neither.  NOTE(review): presumably this assumes both
    # overscan boxes are anchored at the amp origin — confirm with callers.
    imarr[:, :nx] += my_bias_col(np.arange(nx))
    imarr[:ny] += my_bias_row(np.arange(ny))[:, None]
    imarr[:ny, :nx] -= my_bias_corner
    imarr[ny:, nx:] += my_bias_corner
    return biasim
def trim(im, imaging):
    """Trim away the prescan and overscan regions.

    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        imaging: A bounding box containing only the imaging section and
            excluding the prescan.

    Returns:
        An afw image restricted to the imaging bounding box.
    """
    trimmed = im.Factory(im, imaging)
    return trimmed
def unbias_and_trim(im, overscan, imaging=None, dxmin=5, dxmax=2, bias_method='row',
                    bias_frame=None, **kwargs):
    """Subtract the offset calculated from the serial overscan region and optionally trim
    prescan and overscan regions. Includes the option to subtract the median of a stack of
    offset-subtracted bias frames to remove the bias level.
    Args:
        im: A masked (lsst.afw.image.imageLib.MaskedImageF) or unmasked
            (lsst.afw.image.imageLib.ImageF) afw image.
        overscan: A bounding box for the serial overscan region.
        imaging: A bounding box containing only the imaging section and
            excluding the prescan.
        dxmin: The number of columns to skip at the beginning of the serial
            overscan region.
        dxmax: The number of columns to skip at the end of the serial overscan region.
        bias_method: Either 'mean', 'row', 'func', 'spline', 'none' or 'rowcol'.
        bias_frame: A single bias image containing a set of stacked overscan-corrected
            and trimmed bias frames.
    Keyword Arguments:
        fit_order: The order of the polynomial. This only needs to be specified when using
            the 'func' method. The default is: 1.
        k: The degree of the spline fit. This only needs to be specified when using the 'spline'
            method. The default is: 3.
        s: The amount of smoothing to be applied to the fit. This only needs to be specified when
            using the 'spline' method. The default is: 18000.
        t: The knots of the spline. If None, chosen for a given smoothing
            factor, s. This only needs to be specified when using the 'spline' method.
            The default is: None.
        serial_overscan: Serial overscan bounding box; required when
            bias_method is 'rowcol'.
        parallel_overscan: Parallel overscan bounding box; required when
            bias_method is 'rowcol'.
        bias_method_col : Method to subtract the parallel overscan. Typically not used.
            However can be set to 'mean', 'col', 'func', or 'spline'. Default is None, in which
            case the parallel overscan is not subtracted.
        overscan_col: Parallel overscan bounding box; used when
            bias_method_col is set.
        dymin: The number of rows to skip at the beginning of the parallel
            overscan region, if parallel overscan subtraction is requested. Defaults to 5
        dymax: The number of rows to skip at the end of the parallel
            overscan region, if parallel overscan subtraction is requested. Defaults to 2
    Returns:
        An afw image (modified in place and returned).
    """
    if bias_method == 'rowcol':
        # Combined row+column correction; requires explicit overscan
        # boxes in kwargs (KeyError if missing).
        serial_overscan = kwargs['serial_overscan']
        parallel_overscan = kwargs['parallel_overscan']
        # Extend the parallel overscan box to start at x=0 so the whole
        # row range is covered.
        parallel_bbox = lsst.geom.Box2I(
            lsst.geom.Point2I(0, parallel_overscan.getMin().y),
            parallel_overscan.getMax())
        # NOTE(review): dxmin/dxmax and the dymin/dymax kwargs are not
        # forwarded here, so bias_image_rowcol uses its own defaults —
        # confirm this is intended.
        im -= bias_image_rowcol(im, serial_overscan=serial_overscan,
                                parallel_overscan=parallel_bbox)
    else:
        im -= bias_image(im, overscan, dxmin=dxmin, dxmax=dxmax,
                         bias_method=bias_method, **kwargs)
        # Optional additional parallel-overscan subtraction.
        bias_method_col = kwargs.get('bias_method_col', None)
        overscan_col = kwargs.get('overscan_col')
        if bias_method_col not in ['None', None]:
            im -= bias_image_col(im, overscan_col, dymin=kwargs.get('dymin', 5),
                                 dymax=kwargs.get('dymax', 2),
                                 bias_method=bias_method_col, **kwargs)
    if bias_frame:
        # Remove the residual bias structure from a stacked bias frame.
        im -= bias_frame
    if imaging is not None:
        return trim(im, imaging)
    return im
def set_bitpix(hdu, bitpix):
    """Convert hdu.data in place to the numpy dtype implied by bitpix.

    Integer bitpix values (16, 32) round the data first; -32 keeps the
    values and casts to float32.  Any BSCALE/BZERO scaling keywords are
    removed so the stored values are taken as-is.
    """
    dtypes = {16: np.int16, -32: np.float32, 32: np.int32}
    for keyword in ('BSCALE', 'BZERO'):
        if keyword in hdu.header:
            del hdu.header[keyword]
    data = hdu.data
    if bitpix > 0:
        # Integer output: round rather than truncate.
        data = np.round(data)
    hdu.data = np.array(data, dtype=dtypes[bitpix])
def fits_median_file(files, outfile, bitpix=16, overwrite=True):
    """Write the per-amp median of a set of FITS files to outfile.

    Args:
        files: List of input FITS file names (equal exposure times).
        outfile: Output FITS file name.
        bitpix: Output BITPIX; negative selects uncompressed float HDUs,
            positive selects RICE-compressed integer HDUs.
        overwrite: Overwrite an existing output file.
    """
    output = fits.HDUList()
    output.append(fits.PrimaryHDU())
    all_amps = allAmps()
    for amp in all_amps:
        data = fits_median(files, hdu=dm_hdu(amp)).getArray()
        if bitpix < 0:
            output.append(fits.ImageHDU(data=data))
        else:
            # Fixed keyword spelling: "compresssion_type" raised a
            # TypeError; now matches fits_mean_file and writeFits.
            output.append(fits.CompImageHDU(data=data,
                                            compression_type='RICE_1'))
    with warnings.catch_warnings():
        # Suppress astropy header-normalization chatter while copying
        # template headers.
        warnings.filterwarnings('ignore', category=UserWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyUserWarning,
                                append=True)
        with fits.open(files[0]) as template:
            output[0].header.update(template[0].header)
            output[0].header['FILENAME'] = os.path.basename(outfile)
            for amp in all_amps:
                output[amp].header.update(template[amp].header)
                set_bitpix(output[amp], bitpix)
            fitsWriteto(output, outfile, overwrite=overwrite)
def fits_mean_file(files, outfile, overwrite=True, bitpix=32):
    """Write the per-amp mean of a set of FITS files to outfile.

    Args:
        files: List of input FITS file names.
        outfile: Output FITS file name.
        overwrite: Overwrite an existing output file.
        bitpix: Output BITPIX; negative selects uncompressed float HDUs,
            positive selects RICE-compressed integer HDUs.
    """
    output = fits.HDUList()
    output.append(fits.PrimaryHDU())
    all_amps = allAmps()
    for amp in all_amps:
        images = [afwImage.ImageF(item, dm_hdu(amp)) for item in files]
        if lsst.afw.__version__.startswith('12.0'):
            # v12 stack requires its own vector container for stacking.
            images = afwImage.vectorImageF(images)
        mean_image = afwMath.statisticsStack(images, afwMath.MEAN)
        if bitpix < 0:
            output.append(fits.ImageHDU(data=mean_image.getArray()))
        else:
            output.append(fits.CompImageHDU(data=mean_image.getArray(),
                                            compression_type='RICE_1'))
    with warnings.catch_warnings():
        # Suppress astropy header-normalization chatter while copying
        # template headers.
        warnings.filterwarnings('ignore', category=UserWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyUserWarning,
                                append=True)
        with fits.open(files[0]) as template:
            output[0].header.update(template[0].header)
            output[0].header['FILENAME'] = os.path.basename(outfile)
            for amp in all_amps:
                output[amp].header.update(template[amp].header)
                set_bitpix(output[amp], bitpix)
            # Carry over the last three template HDUs — presumably
            # non-amp extensions; confirm against the file layout.
            for i in (-3, -2, -1):
                output.append(template[i])
            fitsWriteto(output, outfile, overwrite=overwrite)
def fits_median(files, hdu=2, fix=True):
    """Compute the median image from a set of image FITS files.

    All files must share the same EXPTIME.  When fix is True, each
    image's median level is equalized to the ensemble average before
    stacking.
    """
    images = [afwImage.ImageF(f, hdu) for f in files]
    exptimes = [Metadata(f).get('EXPTIME') for f in files]
    if min(exptimes) != max(exptimes):
        raise RuntimeError("Error: unequal exposure times")
    if fix:
        # Remove per-image level differences before taking the median.
        medians = np.array([median(image) for image in images])
        offsets = medians - medians.sum()/len(medians)
        for image, offset in zip(images, offsets):
            image -= offset
    if lsst.afw.__version__.startswith('12.0'):
        images = afwImage.vectorImageF(images)
    return afwMath.statisticsStack(images, afwMath.MEDIAN)
def stack(ims, statistic=afwMath.MEDIAN, stat_ctrl=afwMath.StatisticsControl()):
    """Stack a list of images based on a statistic."""
    images = list(ims)
    if lsst.afw.__version__.startswith('12.0'):
        # v12 stack requires its own vector container for stacking.
        images = afwImage.vectorImageF(images)
    return afwMath.statisticsStack(images, statistic, stat_ctrl)
def superbias(files, overscan, imaging=None, dxmin=5, dxmax=2,
              bias_method='row', hdu=2, statistic=afwMath.MEDIAN, **kwargs):
    """Generates a single stacked 'super' bias frame based on
    a statistic. Images must be either all masked or all unmasked.

    Each input file's specified HDU is overscan-corrected (and trimmed
    when imaging is given) before stacking; extra kwargs are forwarded
    to unbias_and_trim.
    """
    ims = [afwImage.ImageF(f, hdu) for f in files]
    bias_frames = [unbias_and_trim(im, overscan, imaging, dxmin, dxmax,
                                   bias_method, **kwargs) for im in ims]
    return stack(bias_frames, statistic)
def superbias_file(files, overscan, outfile, imaging=None, dxmin=5, dxmax=2,
                   bias_method='row', bitpix=-32, clobber=True, **kwargs):
    """Write a superbias frame (one stacked image per amp) to outfile.

    NOTE(review): clobber is accepted but never used — writeFits always
    overwrites; confirm whether it should be honored.
    """
    images = {amp: superbias(files, overscan, imaging, dxmin, dxmax,
                             bias_method, hdu=dm_hdu(amp), **kwargs)
              for amp in allAmps(files[0])}
    writeFits(images, outfile, files[0], bitpix=bitpix)
def writeFits(images, outfile, template_file, bitpix=32):
    """Write a dict of per-amp afw images to a multi-extension FITS file,
    copying headers from template_file.

    Args:
        images: Dict mapping amp number to an afw image; an optional
            'METADATA' entry supplies extra primary-header keywords.
        outfile: Output FITS file name (always overwritten).
        template_file: FITS file providing headers and trailing HDUs.
        bitpix: Output BITPIX; negative selects uncompressed float HDUs,
            positive selects RICE-compressed integer HDUs.
    """
    output = fits.HDUList()
    output.append(fits.PrimaryHDU())
    all_amps = allAmps(template_file)
    for amp in all_amps:
        if bitpix < 0:
            output.append(fits.ImageHDU(data=images[amp].getArray()))
        else:
            output.append(fits.CompImageHDU(data=images[amp].getArray(),
                                            compression_type='RICE_1'))
    with warnings.catch_warnings():
        # Suppress astropy header-normalization chatter while copying
        # template headers.
        warnings.filterwarnings('ignore', category=UserWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyWarning, append=True)
        warnings.filterwarnings('ignore', category=AstropyUserWarning,
                                append=True)
        with fits.open(template_file) as template:
            output[0].header.update(template[0].header)
            output[0].header['FILENAME'] = outfile
            # Extra primary-header keywords, if supplied.
            metadata = images.get('METADATA', None)
            if metadata is not None:
                for key, val in metadata.items():
                    output[0].header[key] = val
            for amp in all_amps:
                output[amp].header.update(template[amp].header)
                set_bitpix(output[amp], bitpix)
            # Carry over the last three template HDUs — presumably
            # non-amp extensions; confirm against the file layout.
            for i in (-3, -2, -1):
                output.append(template[i])
            fitsWriteto(output, outfile, overwrite=True, checksum=True)
def check_temperatures(files, tol, setpoint=None, warn_only=False):
    """Check that each file's measured CCD temperature is within tol of
    the expected temperature.

    Args:
        files: FITS files whose primary headers are checked.
        tol: Allowed |CCDTEMP - reference| difference.
        setpoint: Reference temperature; when None, each file's TEMP_SET
            header keyword is used instead.
        warn_only: When True, print a message instead of raising.

    Raises:
        RuntimeError: A file is out of tolerance and warn_only is False.
    """
    for infile in files:
        md = Metadata(infile)  # Read PHDU keywords
        if setpoint is not None:
            ref_temp = setpoint
        else:
            ref_temp = md.get('TEMP_SET')
        try:
            ccd_temp = md.get('CCDTEMP')
        except Exception:
            # Narrowed from a bare except; the lookup failure type varies
            # by backend (KeyError from astropy, pex exceptions from DM).
            # NOTE(review): returning here skips the remaining files —
            # confirm this early exit is intended rather than `continue`.
            print("Missing CCDTEMP keyword:", infile)
            return
        if np.abs(ccd_temp - ref_temp) > tol:
            # f-string replaces the fragile "% locals()" formatting;
            # the message text is unchanged.
            what = (f"Measured operating temperature {ccd_temp} departs "
                    f"from expected temperature {ref_temp} by more than "
                    f"the {tol} tolerance for file {infile}")
            if warn_only:
                print(what)
            else:
                raise RuntimeError(what)
class SubRegionSampler(object):
    """Sample fixed-size dx-by-dy sub-regions at random locations within
    a segment's imaging region."""
    def __init__(self, dx, dy, nsamp, imaging):
        # Sub-region dimensions in pixels.
        self.dx = dx
        self.dy = dy
        #
        # Generate sub-regions at random locations on the segment
        # imaging region.
        #
        # Draw nsamp lower-left corners; the -dx-1/-dy-1 bounds keep each
        # sub-region inside the imaging area.
        self.xarr = random.randint(imaging.getWidth() - dx - 1, size=nsamp)
        self.yarr = random.randint(imaging.getHeight() - dy - 1, size=nsamp)
        self.imaging = imaging
    def bbox(self, x, y):
        """Return a dx-by-dy Box2I with lower-left corner at (x, y)."""
        return lsstGeom.Box2I(lsstGeom.Point2I(int(x), int(y)),
                              lsstGeom.Extent2I(self.dx, self.dy))
    def subim(self, im, x, y):
        """Return the sub-image of im over the box at (x, y)."""
        return im.Factory(im, self.bbox(x, y))
def bad_column(column_indices, threshold):
    """
    Return True if any contiguous sequence of masked pixels in the
    column is at least threshold pixels long.

    Args:
        column_indices: Pixel indices (row positions) that are masked
            in this column.
        threshold: Minimum run length that marks the column as bad.
    """
    if len(column_indices) < threshold:
        # Not enough masked pixels in total to form a long-enough run.
        return False
    # Zero-filled array with ones at the masked positions.
    flags = np.zeros(max(column_indices) + 1)
    flags[(column_indices,)] = 1
    # Track the longest contiguous run of masked pixels.
    longest = 0
    current = 0
    for value in flags:
        current = current + 1 if value != 0 else 0
        if current > longest:
            longest = current
    return longest >= threshold
def rebin_array(arr, binsize, use_mean=False):
"See http://scipython.com/blog/binning-a-2d-array-in-numpy/"
if binsize == 1:
return arr
shape = (arr.shape[0]//binsize, binsize, arr.shape[1]//binsize, binsize)
if use_mean:
result = arr.reshape(shape).mean(-1).mean(1)
else:
result = arr.reshape(shape).sum(-1).sum(1)
return result
def rebin(image, binsize, use_mean=False):
    """Rebin an afw image by an integer factor, summing (default) or
    averaging each binsize-by-binsize block."""
    rebinned_array = rebin_array(image.getArray(), binsize, use_mean=use_mean)
    output_image = image.Factory(*rebinned_array.shape)
    out_array = output_image.getArray()
    # NOTE(review): the numpy array is (rows, cols) while Factory appears
    # to take (width, height), so the rebinned array is transposed to
    # match the output array's axis order — confirm against afw's
    # Image constructor semantics.
    out_array += rebinned_array.transpose()
    return output_image
if __name__ == '__main__':
    # Example usage: median-combine a set of dark frames.
    import glob
    files = glob.glob('data/dark*.fits')
    im = fits_median(files)