/
image_collection.py
629 lines (513 loc) · 22.1 KB
/
image_collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
import fnmatch
from os import listdir, path
import logging
import numpy as np
import numpy.ma as ma
from astropy.table import Table, vstack
import astropy.io.fits as fits
logger = logging.getLogger(__name__)
__all__ = ['ImageFileCollection']
class ImageFileCollection(object):
"""
Representation of a collection of image files.
The class offers a table summarizing values of
keywords in the FITS headers of the files in the collection and offers
convenient methods for iterating over the files in the collection. The
generator methods use simple filtering syntax and can automate storage
of any FITS files modified in the loop using the generator.
Parameters
----------
location : str, optional
path to directory containing FITS files
keywords : list of str or '*', optional
Keywords that should be used as column headings in the summary table.
If the value is or includes '*' then all keywords that appear in any
of the FITS headers of the files in the collection become table
columns.
info_file : str, optional
Path to file that contains a table of information about FITS files.
Attributes
----------
location
keywords
files
summary_info
Raises
------
ValueError
Raised if keywords are set to a combination of '*' and any other
value.
"""
def __init__(self, location=None, keywords=None, info_file=None):
    """
    Build the collection, optionally scanning a directory for FITS files
    and/or loading a previously saved summary table.

    Parameters
    ----------
    location : str, optional
        Path to directory containing FITS files; when given, the
        directory is scanned immediately.
    keywords : list of str or '*', optional
        Keywords to use as columns of the summary table.
    info_file : str, optional
        Path to a comma-delimited ASCII table of information about the
        FITS files; read relative to ``location`` when one is given.
    """
    self._location = location
    self._files = []
    if location:
        self._files = self._fits_files_in_directory()
    self._summary_info = {}
    if keywords is None:
        keywords = []
    if info_file is not None:
        # Prefer the table file inside ``location``; fall back to using
        # info_file as-is when location is None.
        # NOTE(review): historically path.join raised AttributeError for
        # a None component (Python 2); on Python 3 it raises TypeError,
        # so the fallback may no longer trigger -- confirm.
        try:
            info_path = path.join(self.location, info_file)
        except AttributeError:
            info_path = info_file
        try:
            self._summary_info = Table.read(info_path,
                                            format='ascii',
                                            delimiter=',')
            # Force a masked table so missing values can be flagged.
            self._summary_info = Table(self._summary_info,
                                       masked=True)
        except IOError:
            # A bad info file is fatal only when there is no directory
            # from which the summary could be rebuilt.
            if location:
                logger.warning('Unable to open table file %s, will try '
                               'initializing from location instead',
                               info_path)
            else:
                raise
    # Used internally to keep track of whether the user asked for all
    # keywords or a specific list. The keywords setter takes care of
    # actually setting the correct value, this just ensure that there
    # is always *some* value.
    self._all_keywords = False
    if keywords:
        self.keywords = keywords
@property
def summary(self):
"""
astropy.table.Table of values of FITS keywords for files in the
collection.
Each keyword is a column heading. In addition, there is a column
called 'file' that contains the name of the FITS file. The directory
is not included as part of that name.
"""
return self._summary_info
@property
def summary_info(self):
"""
Deprecated -- use summary instead -- astropy.table.Table of values of
FITS keywords for files in the collection.
Each keyword is a column heading. In addition, there is a column
called 'file' that contains the name of the FITS file. The directory
is not included as part of that name.
"""
return self._summary_info
@property
def location(self):
"""
str, Path name to directory containing FITS files
"""
return self._location
@property
def keywords(self):
"""
list of str, Keywords currently in the summary table.
Setting the keywords causes the summary table to be regenerated unless
the new keywords are a subset of the old.
"""
if self.summary_info:
return self.summary_info.keys()
else:
return []
@keywords.setter
def keywords(self, keywords=None):
# since keywords are drawn from self.summary_info, setting
# summary_info sets the keywords.
if keywords is None:
self._summary_info = []
return
if keywords == '*':
self._all_keywords = True
else:
self._all_keywords = False
logging.debug('keywords in setter before pruning: %s', keywords)
# remove duplicates and force a copy
new_keys = list(set(keywords))
logging.debug('keywords after pruning %s', new_keys)
full_new_keys = list(set(new_keys))
full_new_keys.append('file')
full_new_set = set(full_new_keys)
current_set = set(self.keywords)
if full_new_set.issubset(current_set):
logging.debug('table columns before trimming: %s',
' '.join(current_set))
cut_keys = current_set.difference(full_new_set)
logging.debug('will try removing columns: %s',
' '.join(cut_keys))
for key in cut_keys:
self._summary_info.remove_column(key)
logging.debug('after removal column names are: %s',
' '.join(self.keywords))
else:
logging.debug('should be building new table...')
self._summary_info = self._fits_summary(header_keywords=new_keys)
@property
def files(self):
"""
list of str, Unfiltered list of FITS files in location.
"""
return self._files
def values(self, keyword, unique=False):
"""
List of values for a keyword.
Parameters
----------
keyword : str
Keyword (i.e. table column) for which values are desired.
unique : bool, optional
If True, return only the unique values for the keyword.
Returns
-------
list
Values as a list.
"""
if keyword not in self.keywords:
raise ValueError(
'keyword %s is not in the current summary' % keyword)
if unique:
return list(set(self.summary_info[keyword]))
else:
return list(self.summary_info[keyword])
def files_filtered(self, **kwd):
    """Determine files whose keywords have listed values.

    `**kwd` is list of keywords and values the files must have.
    The value '*' represents any value.
    A missing keyword is indicated by value ''

    Example:
    >>> keys = ['imagetyp','filter']
    >>> collection = ImageFileCollection('test/data', keywords=keys)
    >>> collection.files_filtered(imagetyp='LIGHT', filter='R')
    >>> collection.files_filtered(imagetyp='*', filter='')

    NOTE: Value comparison is case *insensitive* for strings.

    NOTE(review): _find_keywords_by_values actually treats ``None`` --
    not the empty string -- as "keyword is missing"; confirm which of
    the two is intended before relying on ''.

    Returns
    -------
    Unmasked values of the 'file' column that match the criteria (the
    result of ``.compressed()``); the mask on the 'file' column is
    restored to its previous state before returning.
    """
    # Save the current mask of the 'file' column -- force a copy by
    # explicitly converting to a list -- so that the filtering below
    # does not permanently change the summary.
    current_file_mask = list(self.summary_info['file'].mask)
    # Mask out every file that does not match the keyword criteria...
    self._find_keywords_by_values(**kwd)
    # ...collect the survivors, then restore the saved mask.
    filtered_files = self.summary_info['file'].compressed()
    self.summary_info['file'].mask = current_file_mask
    return filtered_files
def refresh(self):
    """
    Refresh the collection by re-reading the FITS headers and
    rebuilding the summary table.
    """
    # Rebuild with '*' if the user originally asked for every keyword,
    # otherwise rebuild with the current column set.
    if self._all_keywords:
        wanted = '*'
    else:
        wanted = self.keywords
    self._summary_info = self._fits_summary(header_keywords=wanted)
def _dict_from_fits_header(self, file_name, input_summary=None,
                           missing_marker=None):
    """
    Construct an ordered dictionary whose keys are header keywords and
    whose values are lists of the values from this file appended to any
    values already present in the input dictionary.

    Parameters
    ----------
    file_name : str
        Name of FITS file.
    input_summary : dict, optional
        Existing dictionary to which new values should be appended.
    missing_marker : object, optional
        Value used to pad keywords that are absent from some files.

    Returns
    -------
    collections.OrderedDict
        Maps lower-cased keyword name to a list of values with one entry
        per file processed so far; the 'file' key holds base file names.
    """
    from collections import OrderedDict

    def _add_val_to_dict(key, value, tbl_dict, n_prev):
        # Append this keyword's value; if the keyword has not been seen
        # before, backfill the earlier files with missing_marker.
        # (Bug fix: the parameter ``n_prev`` was previously ignored in
        # favor of the closed-over ``n_previous`` -- same value at the
        # call sites, but the parameter is what the calls intend.)
        try:
            tbl_dict[key.lower()].append(value)
        except KeyError:
            tbl_dict[key.lower()] = [missing_marker] * n_prev
            tbl_dict[key.lower()].append(value)

    if input_summary is None:
        summary = OrderedDict()
        n_previous = 0
    else:
        summary = input_summary
        n_previous = len(summary['file'])
    header = fits.getheader(file_name)
    assert 'file' not in header
    # Header is opened before the file name is recorded so that the name
    # is only added if the file is valid FITS.
    try:
        summary['file'].append(path.basename(file_name))
    except KeyError:
        summary['file'] = [path.basename(file_name)]
    missing_in_this_file = [k for k in summary if (k not in header and
                                                   k != 'file')]
    # COMMENT and HISTORY cards may appear many times in one header;
    # accumulate them separately and store one comma-joined string, so
    # only a single entry per file is added to the summary.
    multi_entry_keys = {'comment': [],
                        'history': []}
    # Py3 fix: ``Header.iteritems`` does not exist on Python 3; use
    # ``items`` (equivalent on Python 2 as well).
    for k, v in header.items():
        if k == '':
            continue
        if k.lower() in ['comment', 'history']:
            multi_entry_keys[k.lower()].append(str(v))
            continue
        _add_val_to_dict(k, v, summary, n_previous)
    for k, v in multi_entry_keys.items():
        if v:
            _add_val_to_dict(k, ','.join(v), summary, n_previous)
    # Pad keywords that this file lacks so all columns stay equal length.
    for missing in missing_in_this_file:
        summary[missing].append(missing_marker)
    return summary
def _set_column_name_case_to_match_keywords(self, header_keys,
summary_table):
key_name_dict = {k.lower(): k for k in header_keys
if k != k.lower()}
for lcase, user_case in key_name_dict.iteritems():
try:
summary_table.rename_column(lcase, user_case)
except KeyError:
pass
def _fits_summary(self, header_keywords=None):
    """
    Generate a summary table of keywords from FITS headers.

    Parameters
    ----------
    header_keywords : list of str or '*', optional
        Keywords whose value should be extracted from FITS headers.
        Default value is ``None``.
        NOTE(review): actually passing ``None`` raises TypeError at
        ``set(header_keywords)`` below -- every caller in this class
        supplies a value; confirm the documented default.

    Returns
    -------
    astropy.table.Table (masked) or None
        One row per readable FITS file, or ``None`` when the collection
        contains no files at all.
    """
    from astropy.table import MaskedColumn
    if not self.files:
        return None
    # Get rid of any duplicate keywords, also forces a copy.
    header_keys = set(header_keywords)
    header_keys.add('file')
    file_name_column = MaskedColumn(name='file', data=self.files)
    # Short-circuit: nothing requested beyond the file names themselves.
    if not header_keys or (header_keys == set(['file'])):
        summary_table = Table(masked=True)
        summary_table.add_column(file_name_column)
        return summary_table
    summary_dict = None
    missing_marker = None
    # Accumulate one dict of keyword -> list-of-values over all files;
    # unreadable files are skipped (and never appear in the dict, since
    # _dict_from_fits_header records the name only after a good read).
    for file_name in file_name_column:
        file_path = path.join(self.location, file_name)
        try:
            summary_dict = self._dict_from_fits_header(
                file_path, input_summary=summary_dict,
                missing_marker=missing_marker)
        except IOError as e:
            logger.warning('Unable to get FITS header for file %s: %s',
                           file_path, e)
            continue
    summary_table = Table(summary_dict, masked=True)
    # Cells still holding missing_marker are keywords absent from that
    # file; mask them.
    for column in summary_table.colnames:
        summary_table[column].mask = [v is missing_marker
                                      for v in summary_table[column]]
    self._set_column_name_case_to_match_keywords(header_keys,
                                                 summary_table)
    # Keywords the user asked for that appeared in no header at all get
    # a fully masked placeholder column ('*' itself is not a column).
    missing_columns = header_keys - set(summary_table.colnames)
    missing_columns -= set(['*'])
    length = len(summary_table)
    for column in missing_columns:
        all_masked = MaskedColumn(name=column, data=np.zeros(length),
                                  mask=np.ones(length))
        summary_table.add_column(all_masked)
    # With '*' every discovered keyword is kept; otherwise trim to the
    # requested set.
    if '*' not in header_keys:
        summary_table.keep_columns(header_keys)
    if not summary_table.masked:
        summary_table = Table(summary_table, masked=True)
    return summary_table
def _find_keywords_by_values(self, **kwd):
    """
    Mask the summary 'file' column so that only files whose keywords
    have the given values remain unmasked.

    `**kwd` is list of keywords and values the files must have.
    The value '*' represents any value.  A value of ``None`` selects
    files in which the keyword is missing.

    Example:
    >>> keys = ['imagetyp','filter']
    >>> collection = ImageFileCollection('test/data', keywords=keys)
    >>> collection.files_filtered(imagetyp='LIGHT', filter='R')
    >>> collection.files_filtered(imagetyp='*', filter='')

    NOTE: Value comparison is case *insensitive* for strings.
    """
    keywords = kwd.keys()
    values = kwd.values()
    if set(keywords).issubset(set(self.keywords)):
        # we already have the information in memory
        use_info = self.summary_info
    else:
        # we need to load information about these keywords.
        use_info = self._fits_summary(header_keywords=keywords)
    matches = np.ones(len(use_info), dtype=bool)
    for key, value in zip(keywords, values):
        logger.debug('Key %s, value %s', key, value)
        logger.debug('Value in table %s', use_info[key])
        value_missing = use_info[key].mask
        logger.debug('Value missing: %s', value_missing)
        value_not_missing = np.logical_not(value_missing)
        if value == '*':
            # Any present value matches.
            have_this_value = value_not_missing
        elif value is not None:
            # Py3 fix: ``basestring`` does not exist on Python 3.
            if isinstance(value, str):
                # need to loop explicitly over array rather than using
                # where to correctly do string comparison.
                have_this_value = np.zeros(len(use_info), dtype=bool)
                for idx, file_key_value in enumerate(use_info[key]):
                    if value_not_missing[idx]:
                        have_this_value[idx] = (file_key_value.lower() ==
                                                value.lower())
            else:
                # Bug fix: the old code aliased have_this_value to
                # value_not_missing and then assigned into it through a
                # boolean index of itself; use a fresh array instead.
                have_this_value = np.zeros(len(use_info), dtype=bool)
                have_this_value[value_not_missing] = (
                    use_info[key][value_not_missing] == value)
        else:
            # this case--when value==None--is asking for the files which
            # are missing a value for this keyword
            have_this_value = value_missing
        matches &= have_this_value
    # the numpy convention is that the mask is True for values to
    # be omitted, hence use ~matches.
    logger.debug('Matches: %s', matches)
    self.summary_info['file'].mask = ma.nomask
    self.summary_info['file'][~matches] = ma.masked
def _fits_files_in_directory(self, extensions=None,
compressed=True):
"""
Get names of FITS files in directory, based on filename extension.
Parameters
----------
extension : list of str, optional
List of filename extensions that are FITS files. Default is
``['fit', 'fits']``
compressed : bool, optional
If ``True``, compressed files should be included in the list
(e.g. `.fits.gz`)
Returns
-------
list
*Names* of the files (with extension), not the full pathname.
"""
full_extensions = extensions or ['fit', 'fits']
if compressed:
with_gz = [extension + '.gz' for extension in full_extensions]
full_extensions.extend(with_gz)
all_files = listdir(self.location)
files = []
for extension in full_extensions:
files.extend(fnmatch.filter(all_files, '*' + extension))
return files
def _generator(self, return_type,
               save_with_name="", save_location='',
               clobber=False,
               overwrite=False,
               do_not_scale_image_data=True,
               return_fname=False,
               **kwd):
    """
    Generator that yields each {name} in the collection.

    If any of the parameters ``save_with_name``, ``save_location`` or
    ``overwrite`` evaluates to ``True`` the generator will write a copy of
    each FITS file it is iterating over. In other words, if
    ``save_with_name`` and/or ``save_location`` is a string with non-zero
    length, and/or ``overwrite`` is ``True``, a copy of each FITS file will
    be made.

    Parameters
    ----------
    save_with_name : str
        string added to end of file name (before extension) if
        FITS file should be saved after iteration. Unless
        `save_location` is set, files will be saved to location of
        the source files `self.location`
    save_location : str
        Directory in which to save FITS files; implies that FITS
        files will be saved. Note this provides an easy way to
        copy a directory of files--loop over the {name} with
        `save_location` set.
    clobber : bool
        Deprecated alias for ``overwrite``; either flag causes the
        input files to be overwritten.
    overwrite : bool
        If ``True``, overwrite input FITS files.
    do_not_scale_image_data : bool
        If ``True``, prevents fits from scaling images. Default is
        ``{default_scaling}``.
    return_fname : bool, default is False
        If True, return the tuple (header, file_name) instead of just
        header.
    kwd : dict
        Any additional keywords are used to filter the items returned; see
        Examples for details.

    Returns
    -------
    {return_type}
        If ``return_fname`` is ``False``, yield the next {name} in the
        collection
    ({return_type}, str)
        If ``return_fname`` is ``True``, yield a tuple of
        ({name}, `file path`) for next the item in the collection.
    """
    # store mask so we can reset at end--must COPY, otherwise
    # current_mask just points to the mask of summary_info
    # NOTE(review): this stores a reference to each column's mask object
    # rather than an explicit copy; restoring below relies on the
    # filtering code replacing (not mutating) the 'file' mask -- confirm.
    if not self.summary_info:
        return
    current_mask = {}
    for col in self.summary_info.columns:
        current_mask[col] = self.summary_info[col].mask
    # Narrow the 'file' mask to only the files matching the filters.
    if kwd:
        self._find_keywords_by_values(**kwd)
    for full_path in self._paths():
        no_scale = do_not_scale_image_data
        hdulist = fits.open(full_path,
                            do_not_scale_image_data=no_scale)
        # Map each supported return_type to the object to yield; an
        # unknown return_type surfaces as ValueError below.
        return_options = {'header': hdulist[0].header,
                          'hdu': hdulist[0],
                          'data': hdulist[0].data}
        try:
            yield (return_options[return_type]  # pragma: no branch
                   if (not return_fname) else
                   (return_options[return_type], full_path))
        except KeyError:
            raise ValueError('No generator for {}'.format(return_type))
        # Decide where a (possibly modified) copy should be written.
        if save_location:
            destination_dir = save_location
        else:
            destination_dir = path.dirname(full_path)
        basename = path.basename(full_path)
        if save_with_name:
            base, ext = path.splitext(basename)
            basename = base + save_with_name + ext
        new_path = path.join(destination_dir, basename)
        # I really should have called the option overwrite from
        # the beginning. The hack below ensures old code works,
        # at least...
        nuke_existing = clobber or overwrite
        # Write only when the destination differs from the source or the
        # caller explicitly asked for overwriting.
        if (new_path != full_path) or nuke_existing:
            try:
                hdulist.writeto(new_path, clobber=nuke_existing)
            except IOError:
                logger.error('Error writing file %s', new_path)
                raise
        hdulist.close()
    # reset mask
    for col in self.summary_info.columns:
        self.summary_info[col].mask = current_mask[col]
def _paths(self):
"""
Full path to each file.
"""
unmasked_files = self.summary_info['file'].compressed()
return [path.join(self.location, file_) for file_ in unmasked_files]
def headers(self, do_not_scale_image_data=True, **kwd):
return self._generator('header',
do_not_scale_image_data=do_not_scale_image_data,
**kwd)
headers.__doc__ = _generator.__doc__.format(name='header',
default_scaling='True',
return_type='astropy.io.fits.Header')
def hdus(self, do_not_scale_image_data=False, **kwd):
return self._generator('hdu',
do_not_scale_image_data=do_not_scale_image_data,
**kwd)
hdus.__doc__ = _generator.__doc__.format(name='HDU',
default_scaling='False',
return_type='astropy.io.fits.HDU')
def data(self, do_not_scale_image_data=False, **kwd):
return self._generator('data',
do_not_scale_image_data=do_not_scale_image_data,
**kwd)
data.__doc__ = _generator.__doc__.format(name='image',
default_scaling='False',
return_type='numpy.ndarray')