Skip to content

Commit

Permalink
Merge d216902 into 1497c07
Browse files Browse the repository at this point in the history
  • Loading branch information
e-koch committed Aug 25, 2020
2 parents 1497c07 + d216902 commit a9ad8bb
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 101 deletions.
107 changes: 78 additions & 29 deletions spectral_cube/base_class.py
Expand Up @@ -498,7 +498,7 @@ def goodbeams_mask(self, value):

self._goodbeams_mask = value

def identify_bad_beams(self, threshold, reference_beam=None,
def identify_bad_beams(self, threshold=None, reference_beam=None,
criteria=['sr','major','minor'],
mid_value=np.nanmedian):
"""
Expand All @@ -507,8 +507,10 @@ def identify_bad_beams(self, threshold, reference_beam=None,
Parameters
----------
threshold : float
Fractional threshold
threshold : float, optional
        The fractional difference between beam major, minor, and pa to
        permit. The default is `~SpectralCube.beam_threshold`, which is initially set
        to 0.01 (i.e., <1% changes in the beam area are allowed).
reference_beam : Beam
A beam to use as the reference. If unspecified, ``mid_value`` will
be used to select a middle beam
Expand All @@ -525,6 +527,9 @@ def identify_bad_beams(self, threshold, reference_beam=None,
A boolean array where ``True`` indicates the good beams
"""

if threshold is None:
threshold = self.beam_threshold

includemask = np.ones(self.unmasked_beams.size, dtype='bool')

all_criteria = {'sr','major','minor','pa'}
Expand All @@ -541,6 +546,8 @@ def identify_bad_beams(self, threshold, reference_beam=None,
pa=mid_value(props['pa'])
)

# Change to deviation in areas with respect to a pixel area.

for prop in criteria:
val = props[prop]
mid = getattr(reference_beam, prop)
Expand All @@ -553,31 +560,41 @@ def identify_bad_beams(self, threshold, reference_beam=None,

return includemask

def average_beams(self, threshold, mask='compute', warn=False):
def set_common_beam(self,
threshold=None, mask='compute', warn=False,
combeam_kwargs={}):
"""
Average the beams. Note that this operation only makes sense in
limited contexts! Generally one would want to convolve all the beams
to a common shape, but this method is meant to handle the "simple" case
when all your beams are the same to within some small factor and can
therefore be arithmetically averaged.
Set the common beam for operations.
        Many cubes will have a beam that varies by a small factor (less than a single
        spatial pixel area) across spectral channels. In that case, this method will
        avoid spatially convolving to an exact common beam, which is an expensive
        operation. To avoid this behaviour entirely, a cube should be convolved to a
        common beam prior to applying further operations.
Parameters
----------
threshold : float
        threshold : float, optional
            The fractional difference between beam major, minor, and pa to
            permit. The default is `~SpectralCube.beam_threshold`, which is initially set
            to 0.01 (i.e., <1% changes in the beam area are allowed).
mask : 'compute', None, or boolean array
The mask to apply to the beams. Useful for excluding bad channels
and edge beams.
warn : bool
Warn if successful?
combeam_kwargs : dict
Additional kwargs for the common beam algorithm. See `~radio_beam.Beams.common_beam`.
Returns
-------
new_beam : radio_beam.Beam
A new radio beam object that is the average of the unmasked beams
"""

if threshold is None:
threshold = self.beam_threshold

use_dask = isinstance(self._data, da.Array)

if mask == 'compute':
Expand All @@ -601,24 +618,29 @@ def average_beams(self, threshold, mask='compute', warn=False):

# use private _beams here because the public one excludes the bad beams
# by default
new_beam = self._beams.average_beam(includemask=beam_mask)
# new_beam = self._beams.average_beam(includemask=beam_mask)
new_beam = self._beams.common_beam(includemask=beam_mask, **combeam_kwargs)

if np.isnan(new_beam):
raise ValueError("Beam was not finite after averaging. "
raise ValueError("Common beam is not finite. "
"This either indicates that there was a problem "
"with the include mask, one of the beam's values, "
"or a bug.")

self._check_beam_areas(threshold, mean_beam=new_beam, mask=beam_mask)
if warn:
warnings.warn("Arithmetic beam averaging is being performed. This is "
"not a mathematically robust operation, but is being "
"permitted because the beams differ by "
"<{0}".format(threshold),
BeamAverageWarning
)
return new_beam
# This will now print a warning describing whether a common beam convolution will
# be triggered. Or if small beam variations will be ignored.
self._check_beam_areas(threshold, new_beam, mask=beam_mask, raise_error=False)

self._common_beam = new_beam

@property
def common_beam(self):
    """The cached common beam used for beam-dependent operations.

    Computed lazily: the first access triggers `set_common_beam` with its
    default parameters; subsequent accesses return the cached result.
    """
    try:
        return self._common_beam
    except AttributeError:
        # Not computed yet: derive it with the default settings.
        self.set_common_beam()
    return self._common_beam

def _handle_beam_areas_wrapper(self, function, beam_threshold=None):
"""
Expand Down Expand Up @@ -652,7 +674,7 @@ def newfunc(*args, **kwargs):
if need_to_handle_beams:
# do this check *first* so we don't do an expensive operation
# and crash afterward
avg_beam = self.average_beams(beam_threshold, warn=True)
self._check_beam_areas(self.beam_threshold, self.common_beam)

result = function(*args, **kwargs)

Expand All @@ -661,14 +683,15 @@ def newfunc(*args, **kwargs):
return result

elif need_to_handle_beams:
result.meta['beam'] = avg_beam
result._beam = avg_beam
result.meta['beam'] = self.common_beam
result._beam = self.common_beam

return result

return newfunc

def _check_beam_areas(self, threshold, mean_beam, mask=None):
def _check_beam_areas(self, threshold, common_beam, mask=None,
raise_error=True):
"""
Check that the beam areas are the same to within some threshold
"""
Expand All @@ -691,7 +714,7 @@ def _check_beam_areas(self, threshold, mean_beam, mask=None):
for (qtyname, qty) in (qtys.items()):
minv = qty[mask].min()
maxv = qty[mask].max()
mn = getattr(mean_beam, qtyname)
mn = getattr(common_beam, qtyname)
maxdiff = (np.max(np.abs(u.Quantity((maxv-mn, minv-mn))))/mn).decompose()

if isinstance(threshold, dict):
Expand All @@ -706,20 +729,46 @@ def _check_beam_areas(self, threshold, mean_beam, mask=None):
qtyname
))
if errormessage != "":
raise ValueError(errormessage)

def mask_out_bad_beams(self, threshold, reference_beam=None,
if raise_error:
raise ValueError(f"{errormessage}\nConvolve to a common beam before applying any"
" spectral operation.")

else:
warnings.warn(errormessage)
warnings.warn("Convolution to a common beam will be triggered at an intermediate level."
" To avoid this step, first convolve the spectral-cube to a common beam size.")

else:
warnings.warn("Small beam differences are being ignored in this operation. "
" Beams differ by <{0}".format(threshold) +
" If this behavior is not desired, convolve to a common beam first.",
BeamAverageWarning
)

def mask_out_bad_beams(self, threshold=None, reference_beam=None,
criteria=['sr','major','minor'],
mid_value=np.nanmedian):
"""
See `identify_bad_beams`. This function returns a masked cube
Parameters
----------
        threshold : float, optional
            The fractional difference between beam major, minor, and pa to
            permit. The default is `~SpectralCube.beam_threshold`, which is initially set
            to 0.01 (i.e., <1% changes in the beam area are allowed).
Returns
-------
newcube : VaryingResolutionSpectralCube
The cube with bad beams masked out
"""

if threshold is None:
threshold = self.beam_threshold

goodbeams = self.identify_bad_beams(threshold=threshold,
reference_beam=reference_beam,
criteria=criteria,
Expand Down
29 changes: 29 additions & 0 deletions spectral_cube/conftest.py
Expand Up @@ -72,6 +72,21 @@ def prepare_4_beams():
return beams


def prepare_4_similar_beams():
    """Build a 4-channel beam table whose beam areas differ by <2%.

    Used to test that expensive common-beam operations are skipped when
    the per-channel beam variations are negligibly small.

    Returns
    -------
    fits.BinTableHDU
        A FITS binary table with one beam row per channel.
    """
    beam_dtype = [('BMAJ', '>f4'), ('BMIN', '>f4'), ('BPA', '>f4'),
                  ('CHAN', '>i4'), ('POL', '>i4')]
    table = np.recarray(4, dtype=beam_dtype)
    # Major/minor axes in arcseconds; position angles in degrees.
    table['BMAJ'] = [0.420, 0.419, 0.418, 0.419]
    table['BMIN'] = [0.200, 0.200, 0.200, 0.199]
    table['BPA'] = [0, 0, 0, 0]
    table['CHAN'] = [0, 1, 2, 3]
    table['POL'] = [0, 0, 0, 0]
    return fits.BinTableHDU(table)


def prepare_advs_data():
# Single Stokes
h = fits.header.Header.fromtextfile(HEADER_FILENAME)
Expand Down Expand Up @@ -282,6 +297,20 @@ def data_vda_beams(tmp_path):
return tmp_path / 'vda_beams.fits'


@pytest.fixture
def data_vda_similarbeams(tmp_path):
    """Write a varying-resolution test cube with nearly identical beams.

    Returns the path of a FITS file holding the cube plus a beam
    binary-table extension from `prepare_4_similar_beams`.
    """
    data, header = prepare_adv_data()
    # Shuffle the axes into the 'vda' ordering used by the sibling fixtures.
    data, header = transpose(data, header, [2, 0, 1])
    data, header = transpose(data, header, [2, 1, 0])
    header['BUNIT'] = ' Jy / beam '
    # Per-channel beams come from the beam table, not the header keywords.
    del header['BMAJ'], header['BMIN'], header['BPA']
    hdulist = fits.HDUList([fits.PrimaryHDU(data=data, header=header),
                            prepare_4_similar_beams()])
    outpath = tmp_path / 'vda_similarbeams.fits'
    hdulist.writeto(outpath)
    return outpath


def prepare_255_header():
# make a version with spatial pixels
h = fits.header.Header.fromtextfile(HEADER_FILENAME)
Expand Down
3 changes: 3 additions & 0 deletions spectral_cube/dask_spectral_cube.py
Expand Up @@ -121,6 +121,8 @@ def wrapper(self, *args, **kwargs):

if axis is None:

# TODO: avoid statistics for beam dependent units (e.g. Jy/beam)

# return is scalar
if unit is not None:
return u.Quantity(out, unit=unit)
Expand Down Expand Up @@ -281,6 +283,7 @@ def __exit__(self, *args):
return SchedulerHandler(self, original_scheduler_kwargs)

def _compute(self, array):

return array.compute(**self._scheduler_kwargs)

def _warn_slow(self, funcname):
Expand Down
3 changes: 2 additions & 1 deletion spectral_cube/io/tests/test_casa_low_level_io.py
Expand Up @@ -26,7 +26,8 @@
'data_adv_jybeam_whitespace', 'data_adv_beams',
'data_vad', 'data_vda', 'data_vda_jybeam_upper',
'data_vda_jybeam_lower', 'data_vda_jybeam_whitespace',
'data_vda_beams', 'data_255', 'data_255_delta',
'data_vda_beams', 'data_vda_similarbeams',
'data_255', 'data_255_delta',
# 'data_455_delta_beams',
'data_522_delta',
# 'data_522_delta_beams'
Expand Down
46 changes: 4 additions & 42 deletions spectral_cube/spectral_cube.py
Expand Up @@ -3768,47 +3768,6 @@ def _new_cube_with(self, goodbeams_mask=None, **kwargs):

_new_cube_with.__doc__ = BaseSpectralCube._new_cube_with.__doc__

def _check_beam_areas(self, threshold, mean_beam, mask=None):
    """
    Check that the beam areas are the same to within some threshold.

    Parameters
    ----------
    threshold : float or dict
        Maximum allowed fractional deviation from ``mean_beam``. A dict
        may give a separate threshold per quantity name
        (``'sr'``, ``'major'``, ``'minor'``).
    mean_beam : Beam
        Reference beam whose ``sr``/``major``/``minor`` the channel beams
        are compared against.
    mask : array-like of bool, optional
        Channels to include in the comparison; defaults to all channels.

    Raises
    ------
    ValueError
        If any quantity deviates from the reference by more than its
        threshold.
    """

    if mask is not None:
        assert len(mask) == len(self.unmasked_beams)
        mask = np.array(mask, dtype='bool')
    else:
        # No mask supplied: compare every channel's beam.
        mask = np.ones(len(self.unmasked_beams), dtype='bool')

    qtys = dict(sr=self.unmasked_beams.sr,
                major=self.unmasked_beams.major.to(u.deg),
                minor=self.unmasked_beams.minor.to(u.deg),
                # position angles are not really comparable
                #pa=u.Quantity([bm.pa for bm in self.unmasked_beams], u.deg),
                )

    errormessage = ""

    for (qtyname, qty) in (qtys.items()):
        minv = qty[mask].min()
        maxv = qty[mask].max()
        mn = getattr(mean_beam, qtyname)
        # Largest fractional deviation (above or below) from the reference.
        maxdiff = (np.max(np.abs(u.Quantity((maxv-mn, minv-mn))))/mn).decompose()

        if isinstance(threshold, dict):
            th = threshold[qtyname]
        else:
            th = threshold

        if maxdiff > th:
            # Report the per-quantity threshold actually used (``th``),
            # not the whole ``threshold`` object, which may be a dict.
            errormessage += ("Beam {2}s differ by up to {0}x, which is greater"
                             " than the threshold {1}\n".format(maxdiff,
                                                                th,
                                                                qtyname
                                                                ))
    if errormessage != "":
        raise ValueError(errormessage)


def __getattribute__(self, attrname):
"""
For any functions that operate over the spectral axis, perform beam
Expand All @@ -3823,7 +3782,10 @@ def __getattribute__(self, attrname):
# called by some of these, maybe *only* those should be wrapped to
# avoid redundant calls
if attrname in ('moment', 'apply_numpy_function', 'apply_function',
'apply_function_parallel_spectral'):
'apply_function_parallel_spectral', 'sum',
'mean', 'median', 'percentile', 'std', 'mad_std',
'max', 'min', 'argmax', 'argmin'):

origfunc = super(VRSC, self).__getattribute__(attrname)
return self._handle_beam_areas_wrapper(origfunc)
else:
Expand Down

0 comments on commit a9ad8bb

Please sign in to comment.