Skip to content

Commit

Permalink
Merge 171b263 into 31b0b51
Browse files Browse the repository at this point in the history
  • Loading branch information
keflavich committed Feb 19, 2020
2 parents 31b0b51 + 171b263 commit de5aba7
Show file tree
Hide file tree
Showing 3 changed files with 327 additions and 156 deletions.
334 changes: 199 additions & 135 deletions spectral_cube/io/casa_image.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import print_function, absolute_import, division

import os
import six
from math import ceil, floor
import uuid
import warnings
import tempfile
import shutil
Expand Down Expand Up @@ -35,8 +38,11 @@ def is_casa_image(origin, filepath, fileobj, *args, **kwargs):

# See note before StringWrapper definition
from .core import StringWrapper
if len(args) > 0 and isinstance(args[0], StringWrapper):
filepath = args[0].value
if filepath is None and len(args) > 0:
if isinstance(args[0], StringWrapper):
filepath = args[0].value
elif isinstance(args[0], str):
filepath = args[0]

return filepath is not None and filepath.lower().endswith('.image')

Expand Down Expand Up @@ -64,120 +70,6 @@ def wcs_casa2astropy(ia, coordsys):
return WCS(tmpfitsfile)


class ArraylikeCasaData:
    """
    Array-like wrapper around a CASA ``.image`` file.

    Exposes ``shape``, ``dtype``, ``ndim`` and ``chunksize`` plus
    ``__getitem__`` so the object can be handed to ``dask.array.from_array``.
    Data are read lazily through the CASA ``ia`` (image analysis) tool.
    CASA stores arrays in Fortran order, so axes are reversed relative to
    the on-disk order.
    """

    def __init__(self, filename, ia_kwargs=None):
        """
        Parameters
        ----------
        filename : str
            Path to the CASA ``.image`` directory.
        ia_kwargs : dict, optional
            Extra keyword arguments forwarded to ``ia.getchunk`` (e.g.
            ``{'getmask': True}`` to read the validity mask instead of the
            data).  Defaults to no extra arguments.
        """
        try:
            import casatools
            self.iatool = casatools.image
            tb = casatools.table()
        except ImportError:
            try:
                from taskinit import iatool, tbtool
                self.iatool = iatool
                tb = tbtool()
            except ImportError:
                raise ImportError("Could not import CASA (casac) and therefore cannot read CASA .image files")

        # BUGFIX: avoid the mutable-default-argument pitfall (a shared dict
        # across all instances); use a None sentinel and a fresh dict instead.
        self.ia_kwargs = {} if ia_kwargs is None else ia_kwargs

        self.filename = filename

        self._cache = {}

        log.debug("Creating ArrayLikeCasa object")

        # try to trick CASA into destroying the ia object
        def getshape():
            ia = self.iatool()
            # use the ia tool to get the file contents
            try:
                ia.open(self.filename, cache=False)
            except AssertionError as ex:
                if 'must be of cReqPath type' in str(ex):
                    raise IOError("File {0} not found. Error was: {1}"
                                  .format(self.filename, str(ex)))
                else:
                    raise ex

            # CASA reports shape in Fortran (reversed) order
            self.shape = tuple(ia.shape()[::-1])
            self.dtype = np.dtype(ia.pixeltype())

            ia.done()
            ia.close()

        getshape()

        self.ndim = len(self.shape)

        tb.open(self.filename)
        dminfo = tb.getdminfo()
        tb.done()

        # unclear if this is always the correct callspec!!!
        # (transpose requires this be backwards)
        self.chunksize = dminfo['*1']['SPEC']['DEFAULTTILESHAPE'][::-1]

        log.debug("Finished with initialization of ArrayLikeCasa object")

    def __getitem__(self, value):
        """
        Read the requested slice from the CASA image via ``ia.getchunk``.

        ``value`` is a numpy-order tuple of slices/ints; it is reversed and
        translated into CASA's blc/trc/inc convention (end-inclusive, with
        -1 meaning "unbounded").
        """
        log.debug(f"Retrieving slice {value} from {self}")

        value = sanitize_slices(value[::-1], self.ndim)

        # several cases:
        # integer: just use an integer
        # slice starting w/number: start number
        # slice starting w/None: use -1
        blc = [(-1 if slc.start is None else slc.start)
               if hasattr(slc, 'start') else slc
               for slc in value]
        # slice ending w/number >= 1: end number -1 (CASA is end-inclusive)
        # slice ending w/zero: use zero, not -1.
        # slice ending w/negative: use it, but ?????
        # slice ending w/None: use -1
        trc = [((slc.stop-1 if slc.stop >= 1 else slc.stop)
                if slc.stop is not None else -1)
               if hasattr(slc, 'stop') else slc for slc in value]
        inc = [(slc.step or 1) if hasattr(slc, 'step') else 1 for slc in value]

        ia = self.iatool()
        # use the ia tool to get the file contents
        try:
            ia.open(self.filename, cache=False)
        except AssertionError as ex:
            if 'must be of cReqPath type' in str(ex):
                raise IOError("File {0} not found. Error was: {1}"
                              .format(self.filename, str(ex)))
            else:
                raise ex

        log.debug(f'blc={blc}, trc={trc}, inc={inc}, kwargs={self.ia_kwargs}')
        data = ia.getchunk(blc=blc, trc=trc, inc=inc, **self.ia_kwargs)
        ia.done()
        ia.close()

        log.debug(f"Done retrieving slice {value} from {self}")

        # keep all sliced axes but drop all integer axes
        new_view = [slice(None) if isinstance(slc, slice) else 0
                    for slc in value]

        # CASA data are Fortran-ordered; transpose back to numpy (C) order
        transposed_data = data[tuple(new_view)].transpose()

        log.debug(f"Done transposing data with view {new_view}")

        return transposed_data


def load_casa_image(filename, skipdata=False,
skipvalid=False, skipcs=False, target_cls=None, **kwargs):
"""
Expand Down Expand Up @@ -214,19 +106,16 @@ def load_casa_image(filename, skipdata=False,

# read in the data
if not skipdata:
arrdata = ArraylikeCasaData(filename)
# CASA data are apparently transposed.
data = dask.array.from_array(arrdata,
chunks=arrdata.chunksize,
name=filename
)
data = casa_image_dask_reader(filename)

# CASA stores validity of data as a mask
if not skipvalid:
boolarr = ArraylikeCasaData(filename, ia_kwargs={'getmask': True})
valid = dask.array.from_array(boolarr, chunks=boolarr.chunksize,
name=filename+".mask"
)
if skipvalid:
valid = None
else:
try:
valid = casa_image_dask_reader(filename, mask=True)
except FileNotFoundError:
valid = None

# transpose is dealt with within the cube object

Expand Down Expand Up @@ -305,9 +194,13 @@ def load_casa_image(filename, skipdata=False,

if wcs.naxis == 3:
data, wcs_slice = cube_utils._orient(data, wcs)
valid, _ = cube_utils._orient(valid, wcs)

mask = BooleanArrayMask(valid, wcs_slice)
if valid is not None:
valid, _ = cube_utils._orient(valid, wcs)
mask = BooleanArrayMask(valid, wcs_slice)
else:
mask = None

if 'beam' in locals():
cube = SpectralCube(data, wcs_slice, mask, meta=meta, beam=beam)
elif 'beams' in locals():
Expand All @@ -318,16 +211,21 @@ def load_casa_image(filename, skipdata=False,
# we've already loaded the cube into memory because of CASA
# limitations, so there's no reason to disallow operations
# cube.allow_huge_operations = True
assert cube.mask.shape == cube.shape
if mask is not None:
assert cube.mask.shape == cube.shape

elif wcs.naxis == 4:
valid, _ = cube_utils._split_stokes(valid, wcs)
if valid is not None:
valid, _ = cube_utils._split_stokes(valid, wcs)
data, wcs = cube_utils._split_stokes(data, wcs)
mask = {}
for component in data:
data_, wcs_slice = cube_utils._orient(data[component], wcs)
valid_, _ = cube_utils._orient(valid[component], wcs)
mask[component] = BooleanArrayMask(valid_, wcs_slice)
if valid is not None:
valid_, _ = cube_utils._orient(valid[component], wcs)
mask[component] = BooleanArrayMask(valid_, wcs_slice)
else:
mask[component] = None

if 'beam' in locals():
data[component] = SpectralCube(data_, wcs_slice, mask[component],
Expand All @@ -346,8 +244,9 @@ def load_casa_image(filename, skipdata=False,


cube = StokesSpectralCube(stokes_data=data)
assert cube.I.mask.shape == cube.shape
assert wcs_utils.check_equality(cube.I.mask._wcs, cube.wcs)
if mask is not None:
assert cube.I.mask.shape == cube.shape
assert wcs_utils.check_equality(cube.I.mask._wcs, cube.wcs)
else:
raise ValueError("CASA image has {0} dimensions, and therefore "
"is not readable by spectral-cube.".format(wcs.naxis))
Expand All @@ -356,6 +255,171 @@ def load_casa_image(filename, skipdata=False,
return normalize_cube_stokes(cube, target_cls=target_cls)


class MemmapWrapper:
    """
    A wrapper class for dask that opens a memmap only when __getitem__ is
    called, which prevents issues with too many open files that would arise
    from opening a memmap for all chunks at the same time.
    """

    def __init__(self, filename, **kwargs):
        """
        Parameters
        ----------
        filename : str
            The binary file to memory-map.
        **kwargs
            Forwarded to ``np.memmap``; must include ``shape`` and ``dtype``,
            and may include ``offset`` (in bytes).
        """
        self._filename = filename
        self._kwargs = kwargs
        # on-disk chunks are Fortran-ordered, so the logical shape is reversed
        self.shape = kwargs['shape'][::-1]
        self.dtype = kwargs['dtype']
        self.ndim = len(self.shape)

    def __getitem__(self, item):
        # We open a file manually and return an in-memory copy of the array
        # otherwise the file doesn't get closed properly.
        # BUGFIX: open in binary mode ('rb') - memmap operates on raw bytes,
        # and text mode is incorrect for binary data.
        with open(self._filename, 'rb') as f:
            return np.memmap(f, mode='readonly', order='F', **self._kwargs).T[item].copy()


class MaskWrapper:
    """
    A wrapper class for dask that reads a CASA binary (bit-packed) mask from
    file and returns the requested chunk on-the-fly.
    """

    def __init__(self, filename, offset, count, shape):
        """
        Parameters
        ----------
        filename : str
            The binary mask file.
        offset, count : int
            Start position and length of this chunk, both in *bits*.
        shape : tuple
            On-disk (Fortran-order) shape of the chunk.
        """
        self._filename = filename
        self._offset = offset
        self._count = count
        # on-disk chunks are Fortran-ordered, so the logical shape is reversed
        self.shape = shape[::-1]
        self.dtype = np.bool_
        self.ndim = len(self.shape)

    def __getitem__(self, item):
        # The offset is expressed in bits, but the file can only be read a
        # whole byte at a time, so align the read window to byte boundaries.
        first_byte = self._offset // 8
        last_byte = -((self._offset + self._count) // -8)  # ceiling division
        raw = np.fromfile(self._filename, dtype=np.uint8,
                          offset=first_byte, count=last_byte - first_byte)
        bits = np.unpackbits(raw, bitorder='little')
        # trim the alignment padding on either side of the chunk
        lo = self._offset - first_byte * 8
        chunk = bits[lo:lo + self._count]
        mask = chunk.astype(np.bool_).reshape(self.shape[::-1], order='F')
        return mask.T[item]


def from_array_fast(arrays, asarray=False, lock=False):
    """
    A more efficient alternative to doing::

        [dask.array.from_array(array) for array in arrays]

    that avoids much of the overhead of ``from_array`` by constructing each
    ``dask.array.Array`` directly.  All input arrays must share the same
    shape and dtype.
    """
    shape = arrays[0].shape
    full_slices = tuple(slice(0, n) for n in shape)
    single_chunk = tuple((n,) for n in shape)
    meta = np.zeros((0,), dtype=arrays[0].dtype)

    out = []
    for arr in arrays:
        getter_key = str(uuid.uuid4())
        data_key = str(uuid.uuid4())
        graph = {
            (getter_key,) + (0,) * arr.ndim:
                (dask.array.core.getter, data_key, full_slices, asarray, lock),
            data_key: arr,
        }
        out.append(dask.array.Array(graph, getter_key, single_chunk,
                                    meta=meta, dtype=arr.dtype))
    return out


def casa_image_dask_reader(imagename, memmap=True, mask=False):
    """
    Read a CASA image (a folder containing a ``table.f0_TSM0`` file) into a
    lazily-evaluated dask array.

    Parameters
    ----------
    imagename : str or path-like
        Path to the CASA ``.image`` directory.
    memmap : bool
        If `True`, chunks are read from disk on demand; otherwise the whole
        array is loaded into memory up front.
    mask : bool or str
        If `True`, read the default mask (``mask0``) instead of the data;
        a string selects a named mask.

    Raises
    ------
    FileNotFoundError
        If the (mask) directory does not exist.
    ValueError
        If the binary file size is inconsistent with the image shape.
    """
    from casatools import table
    tb = table()

    # a mask is stored as a sub-image (sub-directory) of the main image
    if mask:
        if mask is True:
            mask = 'mask0'
        imagename = os.path.join(str(imagename), mask)

    if not os.path.exists(imagename):
        raise FileNotFoundError(imagename)

    # the data is stored in the following binary file
    # each of the chunks is stored on disk in fortran-order
    img_fn = os.path.join(str(imagename), 'table.f0_TSM0')

    # load the metadata from the image table
    tb.open(str(imagename))
    dminfo = tb.getdminfo()
    tb.close()

    # chunkshape defines how the chunks (array subsets) are written to disk
    chunkshape = tuple(dminfo['*1']['SPEC']['DEFAULTTILESHAPE'])
    # NOTE: np.product is deprecated (removed in NumPy 2.0); use np.prod
    chunksize = np.prod(chunkshape)

    # the total size defines the final output array size
    totalshape = dminfo['*1']['SPEC']['HYPERCUBES']['*1']['CubeShape']
    totalsize = np.prod(totalshape)

    # the file size helps us figure out what the dtype of the array is
    filesize = os.stat(img_fn).st_size

    # check that the file size is as expected and determine the data dtype
    if mask:
        # masks are stored with 1 bit per pixel
        if filesize != ceil(totalsize / 8):
            raise ValueError("Unexpected file size for mask, found {0} but "
                             "expected {1}".format(filesize, ceil(totalsize / 8)))
    else:
        if filesize == totalsize * 4:
            dtype = np.float32
        elif filesize == totalsize * 8:
            dtype = np.float64
        else:
            raise ValueError("Unexpected file size for data, found {0} but "
                             "expected {1} or {2}".format(filesize, totalsize * 4, totalsize * 8))

    # the ratio between these tells you how many chunks must be combined
    # to create a final stack
    stacks = totalshape // chunkshape
    nchunks = np.prod(totalshape) // np.prod(chunkshape)

    if memmap:
        if mask:
            chunks = [MaskWrapper(img_fn, offset=ii * chunksize, count=chunksize,
                                  shape=chunkshape)
                      for ii in range(nchunks)]
        else:
            # BUGFIX: the byte offset must scale with the actual itemsize
            # (8 bytes for float64), not a hard-coded 4 bytes.
            itemsize = np.dtype(dtype).itemsize
            chunks = [MemmapWrapper(img_fn, dtype=dtype,
                                    offset=ii * chunksize * itemsize,
                                    shape=chunkshape)
                      for ii in range(nchunks)]
    else:
        if mask:
            full_array = np.fromfile(img_fn, dtype='uint8')
            full_array = np.unpackbits(full_array, bitorder='little').astype(np.bool_)
        else:
            full_array = np.fromfile(img_fn, dtype=dtype)
        chunks = [full_array[ii * chunksize:(ii + 1) * chunksize].reshape(chunkshape, order='F').T
                  for ii in range(nchunks)]

    # convert all chunks to dask arrays - and set name and meta appropriately
    # to prevent dask trying to access the data to determine these
    # automatically.
    chunks = from_array_fast(chunks)

    # make a nested list of all chunks then use block() to construct the final
    # dask array.
    def make_nested_list(chunks, stacks):
        chunks = [chunks[i * stacks[0]:(i + 1) * stacks[0]] for i in range(len(chunks) // stacks[0])]
        if len(stacks) > 1:
            return make_nested_list(chunks, stacks[1:])
        else:
            return chunks[0]

    chunks = make_nested_list(chunks, stacks)

    return dask.array.block(chunks)


io_registry.register_reader('casa', BaseSpectralCube, load_casa_image)
io_registry.register_reader('casa_image', BaseSpectralCube, load_casa_image)
io_registry.register_identifier('casa', BaseSpectralCube, is_casa_image)
Expand Down

0 comments on commit de5aba7

Please sign in to comment.