Skip to content

Commit

Permalink
Merge 09b43c1 into 31b0b51
Browse files Browse the repository at this point in the history
  • Loading branch information
keflavich committed Feb 14, 2020
2 parents 31b0b51 + 09b43c1 commit d54613b
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 127 deletions.
232 changes: 108 additions & 124 deletions spectral_cube/io/casa_image.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import print_function, absolute_import, division

import os
import six
import uuid
import warnings
import tempfile
import shutil
Expand Down Expand Up @@ -64,120 +66,6 @@ def wcs_casa2astropy(ia, coordsys):
return WCS(tmpfitsfile)


class ArraylikeCasaData:

def __init__(self, filename, ia_kwargs={}):

try:
import casatools
self.iatool = casatools.image
tb = casatools.table()
except ImportError:
try:
from taskinit import iatool, tbtool
self.iatool = iatool
tb = tbtool()
except ImportError:
raise ImportError("Could not import CASA (casac) and therefore cannot read CASA .image files")


self.ia_kwargs = ia_kwargs

self.filename = filename

self._cache = {}

log.debug("Creating ArrayLikeCasa object")

# try to trick CASA into destroying the ia object
def getshape():
ia = self.iatool()
# use the ia tool to get the file contents
try:
ia.open(self.filename, cache=False)
except AssertionError as ex:
if 'must be of cReqPath type' in str(ex):
raise IOError("File {0} not found. Error was: {1}"
.format(self.filename, str(ex)))
else:
raise ex

self.shape = tuple(ia.shape()[::-1])
self.dtype = np.dtype(ia.pixeltype())

ia.done()
ia.close()

getshape()

self.ndim = len(self.shape)

tb.open(self.filename)
dminfo = tb.getdminfo()
tb.done()

# unclear if this is always the correct callspec!!!
# (transpose requires this be backwards)
self.chunksize = dminfo['*1']['SPEC']['DEFAULTTILESHAPE'][::-1]


log.debug("Finished with initialization of ArrayLikeCasa object")



def __getitem__(self, value):


log.debug(f"Retrieving slice {value} from {self}")

value = sanitize_slices(value[::-1], self.ndim)

# several cases:
# integer: just use an integer
# slice starting w/number: start number
# slice starting w/None: use -1
blc = [(-1 if slc.start is None else slc.start)
if hasattr(slc, 'start') else slc
for slc in value]
# slice ending w/number >= 1: end number -1 (CASA is end-inclusive)
# slice ending w/zero: use zero, not -1.
# slice ending w/negative: use it, but ?????
# slice ending w/None: use -1
trc = [((slc.stop-1 if slc.stop >= 1 else slc.stop)
if slc.stop is not None else -1)
if hasattr(slc, 'stop') else slc for slc in value]
inc = [(slc.step or 1) if hasattr(slc, 'step') else 1 for slc in value]


ia = self.iatool()
# use the ia tool to get the file contents
try:
ia.open(self.filename, cache=False)
except AssertionError as ex:
if 'must be of cReqPath type' in str(ex):
raise IOError("File {0} not found. Error was: {1}"
.format(self.filename, str(ex)))
else:
raise ex

log.debug(f'blc={blc}, trc={trc}, inc={inc}, kwargs={self.ia_kwargs}')
data = ia.getchunk(blc=blc, trc=trc, inc=inc, **self.ia_kwargs)
ia.done()
ia.close()

log.debug(f"Done retrieving slice {value} from {self}")

# keep all sliced axes but drop all integer axes
new_view = [slice(None) if isinstance(slc, slice) else 0
for slc in value]

transposed_data = data[tuple(new_view)].transpose()

log.debug(f"Done transposing data with view {new_view}")

return transposed_data


def load_casa_image(filename, skipdata=False,
skipvalid=False, skipcs=False, target_cls=None, **kwargs):
"""
Expand Down Expand Up @@ -214,19 +102,11 @@ def load_casa_image(filename, skipdata=False,

# read in the data
if not skipdata:
arrdata = ArraylikeCasaData(filename)
# CASA data are apparently transposed.
data = dask.array.from_array(arrdata,
chunks=arrdata.chunksize,
name=filename
)
data = casa_image_dask_reader(filename)

# CASA stores validity of data as a mask
if not skipvalid:
boolarr = ArraylikeCasaData(filename, ia_kwargs={'getmask': True})
valid = dask.array.from_array(boolarr, chunks=boolarr.chunksize,
name=filename+".mask"
)
valid = casa_image_dask_reader(filename, mask=True)

# transpose is dealt with within the cube object

Expand Down Expand Up @@ -356,6 +236,110 @@ def load_casa_image(filename, skipdata=False,
return normalize_cube_stokes(cube, target_cls=target_cls)


class MemmapWrapper:
"""
A wrapper class for dask that opens memmap only when __getitem__ is called,
which prevents issues with too many open files if opening a memmap for all
chunks at the same time.
"""

def __init__(self, filename, **kwargs):
self._filename = filename
self._kwargs = kwargs
self.shape = kwargs['shape'][::-1]
self.dtype = kwargs['dtype']
self.ndim = len(self.shape)

def __getitem__(self, item):
# We open a file manually and return an in-memory copy of the array
# otherwise the file doesn't get closed properly.
with open(self._filename) as f:
return np.memmap(f, mode='readonly', **self._kwargs).T[item].copy()


def from_array_fast(arrays, asarray=None, lock=False):
"""
This is a more efficient alternative to doing::
[dask.array.from_array(array) for array in arrays]
that avoids a lot of the overhead in from_array by using the Array
initializer directly.
"""
slices = tuple(slice(0, size) for size in arrays[0].shape)
chunk = tuple((size,) for size in arrays[0].shape)
meta = np.zeros((0,), dtype=arrays[0].dtype)
dask_arrays = []
for array in arrays:
name = str(uuid.uuid4())
dask_arrays.append(dask.array.Array({(name,) + (0,) * array.ndim: (dask.array.core.getter, name,
slices, asarray, lock),
name: array},
name, chunk, meta=meta))
return dask_arrays


def casa_image_dask_reader(imagename, memmap=True, mask=False):
"""
Read a CASA image (a folder containing a ``table.f0_TSM0`` file) into a
numpy array.
"""
from casatools import table
tb = table()

# load the metadata from the image table
tb.open(str(imagename))
dminfo = tb.getdminfo()
tb.close()

# chunkshape definse how the chunks (array subsets) are written to disk
chunkshape = tuple(dminfo['*1']['SPEC']['DEFAULTTILESHAPE'])
chunksize = np.product(chunkshape)

# the total size defines the final output array size
totalshape = dminfo['*1']['SPEC']['HYPERCUBES']['*1']['CubeShape']
totalsize = np.product(totalshape)

# the ratio between these tells you how many chunks must be combined
# to create a final stack
stacks = totalshape // chunkshape
nchunks = np.product(totalshape) // np.product(chunkshape)

# the data is stored in the following binary file
# each of the chunks is stored on disk in fortran-order
if mask:
img_fn = os.path.join(str(imagename), 'mask0', 'table.f0_TSM0')
else:
img_fn = os.path.join(str(imagename), 'table.f0_TSM0')

if memmap:
chunks = [MemmapWrapper(img_fn, dtype='float32', offset=ii*chunksize*4,
shape=chunkshape, order='F')
for ii in range(nchunks)]
else:
full_array = np.fromfile(img_fn, dtype='float32', count=totalsize)
chunks = [full_array[ii*chunksize:(ii+1)*chunksize].reshape(chunkshape, order='F').T
for ii in range(nchunks)]

# convert all chunks to dask arrays - and set name and meta appropriately
# to prevent dask trying to access the data to determine these
# automatically.
chunks = from_array_fast(chunks)

# make a nested list of all chunks then use block() to construct the final
# dask array.
def make_nested_list(chunks, stacks):
chunks = [chunks[i*stacks[0]:(i+1)*stacks[0]] for i in range(len(chunks) // stacks[0])]
if len(stacks) > 1:
return make_nested_list(chunks, stacks[1:])
else:
return chunks[0]

chunks = make_nested_list(chunks, stacks)

return dask.array.block(chunks)


io_registry.register_reader('casa', BaseSpectralCube, load_casa_image)
io_registry.register_reader('casa_image', BaseSpectralCube, load_casa_image)
io_registry.register_identifier('casa', BaseSpectralCube, is_casa_image)
Expand Down
45 changes: 42 additions & 3 deletions spectral_cube/tests/test_casafuncs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
from astropy import units as u

from ..io.casa_masks import make_casa_mask
from ..io.casa_image import wcs_casa2astropy
from ..io.casa_image import wcs_casa2astropy, casa_image_dask_reader
from .. import SpectralCube, StokesSpectralCube, BooleanArrayMask, VaryingResolutionSpectralCube
from . import path

try:
import casatools
from casatools import image
casaOK = True
except ImportError:
try:
from taskinit import ia
from taskinit import ia as image
casaOK = True
except ImportError:
print("Run in CASA environment.")
Expand All @@ -33,7 +34,7 @@ def make_casa_testimage(infile, outname):
raise Exception("Attempted to make a CASA test image in a non-CASA "
"environment")

ia = casatools.image()
ia = image()

ia.fromfits(infile=infile, outfile=outname, overwrite=True)
ia.unlock()
Expand Down Expand Up @@ -63,6 +64,16 @@ def make_casa_testimage(infile, outname):
ia.done()


def make_casa_testimage_of_shape(shape, outfile):
ia = image()

size = np.product(shape)
im = np.arange(size).reshape(shape).T

ia.fromarray(outfile=str(outfile), pixels=im, overwrite=True)
ia.close()


@pytest.fixture
def filename(request):
return request.getfixturevalue(request.param)
Expand Down Expand Up @@ -96,6 +107,34 @@ def test_casa_read_stokes(data_advs, tmp_path):
assert casacube.I.shape == cube.I.shape


@pytest.mark.skipif(not casaOK, reason='CASA tests must be run in a CASA environment.')
@pytest.mark.parametrize('filename', ('data_adv', 'data_advs', 'data_sdav',
'data_vad', 'data_vsad'),
indirect=['filename'])
def test_casa_numpyreader_read(filename, tmp_path):

cube = SpectralCube.read(filename)

make_casa_testimage(filename, tmp_path / 'casa.image')

casacube = SpectralCube.read(tmp_path / 'casa.image')

assert casacube.shape == cube.shape


@pytest.mark.skipif(not casaOK, reason='CASA tests must be run in a CASA environment.')
@pytest.mark.parametrize('shape', ((129,128,130), (513,128,128), (128,513,128),
(128,128,513), (512,64,64)),
)
def test_casa_numpyreader_read_shape(shape, tmp_path):

make_casa_testimage_of_shape(shape, tmp_path / 'casa.image')

casacube = SpectralCube.read(tmp_path / 'casa.image')

assert casacube.shape == shape


@pytest.mark.skipif(not casaOK, reason='CASA tests must be run in a CASA environment.')
def test_casa_mask(data_adv, tmp_path):

Expand Down

0 comments on commit d54613b

Please sign in to comment.