Merge pull request #663 from astrofrog/single-pass-stats
Added initial implementation of global 'single-pass' statistics
keflavich committed Oct 1, 2020
2 parents 650334f + 59265aa commit 1bd6feb
Showing 4 changed files with 125 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -18,6 +18,9 @@ None yet
- Add new dask-friendly classes ``DaskSpectralCube`` and
  ``DaskVaryingResolutionSpectralCube`` which use dask to efficiently
  carry out calculations. #618
- Add new ``statistics`` method on ``DaskSpectralCube`` which allows
  several global statistics to be computed by loading each cube chunk
  only once. #663

0.4.5 (2019-11-30)
------------------
22 changes: 22 additions & 0 deletions docs/dask.rst
@@ -238,3 +238,25 @@ needs to include ``axis=0`` in the call to :func:`~astropy.stats.sigma_clip` as
[########################################] | 100% Completed | 56.8s

This leads to an improvement in performance of 1.8x in this case.

Efficient global statistics
---------------------------

If you want to compute several global statistics (e.g. min, max, mean) for a whole
cube, calling each reduction separately would cause the data to be read once per
statistic. The :meth:`~spectral_cube.DaskSpectralCube.statistics` method instead
computes these statistics in a single pass, accessing each chunk only once, and
returns a dictionary of results named using the same convention as CASA's
`ia.statistics <https://casa.nrao.edu/Release4.1.0/doc/CasaRef/image.statistics.html>`_::

    >>> stats = cube.statistics()  # doctest: +IGNORE_OUTPUT
    >>> sorted(stats)
    ['max', 'mean', 'min', 'npts', 'rms', 'sigma', 'sum', 'sumsq']
    >>> stats['min']
    <Quantity -0.01408793 Jy / beam>
    >>> stats['mean']
    <Quantity 0.00338361 Jy / beam>

This method respects the scheduler currently configured on the cube, so you may be
able to get better performance by switching to a multi-threaded scheduler.
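
For instance, a minimal sketch (assuming the ``use_dask_scheduler`` method
described earlier on this page)::

    >>> cube.use_dask_scheduler('threads', num_workers=4)  # doctest: +SKIP
    >>> stats = cube.statistics()  # doctest: +SKIP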
48 changes: 48 additions & 0 deletions spectral_cube/dask_spectral_cube.py
@@ -707,6 +707,54 @@ def argmin(self, axis=None, **kwargs):
"""
return self._compute(da.nanargmin(self._get_filled_data(fill=np.inf), axis=axis))

    @ignore_warnings
    def statistics(self):
        """
        Return a dictionary of global basic statistics for the data.

        This method is designed to minimize the number of times each chunk is
        accessed. The statistics are computed for each chunk in turn before
        being aggregated.

        The names for each statistic are adopted from CASA's ia.statistics
        (see https://casa.nrao.edu/Release4.1.0/doc/CasaRef/image.statistics.html)
        """

        data = self._get_filled_data(fill=np.nan)

        try:
            from bottleneck import nanmin, nanmax, nansum
        except ImportError:
            from numpy import nanmin, nanmax, nansum

        def compute_stats(chunk, *args):
            # Per-chunk reduction: [npts, min, max, sum, sum of squares]
            return np.array([np.sum(~np.isnan(chunk)),
                             nanmin(chunk),
                             nanmax(chunk),
                             nansum(chunk),
                             nansum(chunk * chunk)])

        with dask.config.set(**self._scheduler_kwargs):
            results = da.map_blocks(compute_stats, data.reshape(-1)).compute()

        count_values, min_values, max_values, sum_values, ssum_values = results.reshape((-1, 5)).T

        stats = {'npts': count_values.sum(),
                 'min': min_values.min() * self._unit,
                 'max': max_values.max() * self._unit,
                 'sum': sum_values.sum() * self._unit,
                 'sumsq': ssum_values.sum() * self._unit ** 2}

        stats['mean'] = stats['sum'] / stats['npts']

        # FIXME: for now this uses the simple 'textbook' algorithm which is not
        # numerically stable, so this should be replaced by a more robust approach
        stats['sigma'] = ((stats['sumsq'] - stats['sum'] ** 2 / stats['npts']) / (stats['npts'] - 1)) ** 0.5

        stats['rms'] = np.sqrt(stats['sumsq'] / stats['npts'])

        return stats
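
    # Editorial note, not part of this commit: a hypothetical sketch of how the
    # FIXME above could be addressed. Each chunk would return (count, mean, M2)
    # instead of raw sums, and the partial results would be merged with Chan et
    # al.'s parallel update, avoiding the cancellation in sumsq - sum**2 / npts:
    #
    #     def _merge_moments(n_a, mean_a, m2_a, n_b, mean_b, m2_b):
    #         n = n_a + n_b
    #         delta = mean_b - mean_a
    #         mean = mean_a + delta * n_b / n
    #         m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    #         return n, mean, m2
    #
    # with sigma recovered at the end as (m2 / (n - 1)) ** 0.5.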

    def _map_blocks_to_cube(self, function, additional_arrays=None, fill=np.nan, rechunk=None, **kwargs):
        """
        Call dask's map_blocks, returning a new spectral cube.
52 changes: 52 additions & 0 deletions spectral_cube/tests/test_dask.py
@@ -2,7 +2,23 @@

import pytest

from numpy.testing import assert_allclose
from astropy.tests.helper import assert_quantity_allclose
from astropy import units as u

from spectral_cube import DaskSpectralCube
from .test_casafuncs import make_casa_testimage

try:
    import casatools
    from casatools import image
    CASA_INSTALLED = True
except ImportError:
    try:
        from taskinit import ia as image
        CASA_INSTALLED = True
    except ImportError:
        CASA_INSTALLED = False


class Array:
@@ -58,3 +74,39 @@ def test_rechunk(data_adv):
    # note last element is 2 because the chunk size we asked for
    # is larger than cube - this is fine and deliberate in this test
    assert cube_new._data.chunksize == (1, 2, 2)


def test_statistics(data_adv):
    cube = DaskSpectralCube.read(data_adv).rechunk(chunks=(1, 2, 3))
    stats = cube.statistics()
    assert_quantity_allclose(stats['npts'], 24)
    assert_quantity_allclose(stats['mean'], 0.4941651776136591 * u.K)
    assert_quantity_allclose(stats['sigma'], 0.3021908870982011 * u.K)
    assert_quantity_allclose(stats['sum'], 11.85996426272782 * u.K)
    assert_quantity_allclose(stats['sumsq'], 7.961125988022091 * u.K ** 2)
    assert_quantity_allclose(stats['min'], 0.0363300285196364 * u.K)
    assert_quantity_allclose(stats['max'], 0.9662900439556562 * u.K)
    assert_quantity_allclose(stats['rms'], 0.5759458158839716 * u.K)
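    # Editorial cross-check, not part of this commit: the expected values above
    # are mutually consistent, e.g. mean == sum / npts (11.85996... / 24) and
    # rms == sqrt(sumsq / npts) == sqrt(7.96113... / 24) ~= 0.57595.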


@pytest.mark.skipif(not CASA_INSTALLED, reason='Requires CASA to be installed')
def test_statistics_consistency_casa(data_adv, tmp_path):

    # Similar to test_statistics but compares to CASA directly.

    cube = DaskSpectralCube.read(data_adv)
    stats = cube.statistics()

    make_casa_testimage(data_adv, tmp_path / 'casa.image')

    ia = casatools.image()
    ia.open(str(tmp_path / 'casa.image'))
    stats_casa = ia.statistics()
    ia.close()

    for key in stats:
        if isinstance(stats[key], u.Quantity):
            value = stats[key].value
        else:
            value = stats[key]
        assert_allclose(value, stats_casa[key])
