Revert "Issue 176 (#194)"
This reverts commit e1f024c.
JSKenyon committed Apr 6, 2022
1 parent e1f024c commit 08cd86f
Showing 3 changed files with 10 additions and 60 deletions.
45 changes: 9 additions & 36 deletions daskms/apps/convert.py
@@ -152,15 +152,10 @@ def is_measurement_set(self):
     def reader(self, **kw):
         try:
             group_cols = kw.pop("group_columns", None)
-            index_cols = kw.pop("index_columns", None)
 
             if self.is_measurement_set():
                 from daskms import xds_from_ms
-                return partial(
-                    xds_from_ms,
-                    group_cols=group_cols,
-                    index_cols=index_cols
-                )
+                return partial(xds_from_ms, group_cols=group_cols)
             else:
                 from daskms import xds_from_table
                 return xds_from_table
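
For context, the one-liner restored here leans on functools.partial to freeze the grouping columns onto xds_from_ms, so the caller later supplies only the path and chunking. A minimal sketch of the pattern, using a toy stand-in for xds_from_ms rather than the real dask-ms reader (the path and column names are illustrative):

    from functools import partial

    # Toy stand-in for daskms.xds_from_ms, purely for illustration.
    def xds_from_ms(path, group_cols=None, chunks=None):
        return f"read {path} grouped by {group_cols} with chunks {chunks}"

    # Freeze the grouping columns now; convert_table later calls
    # reader(args.input, chunks=args.chunks) with the remaining arguments.
    reader = partial(xds_from_ms, group_cols=["FIELD_ID", "DATA_DESC_ID"])
    print(reader("observation.ms", chunks={"row": 10000}))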
@@ -201,13 +196,9 @@ def subtables(self):
 
     def reader(self, **kw):
         group_columns = kw.pop("group_columns", False)
-        index_columns = kw.pop("index_columns", False)
 
         if group_columns:
-            raise ValueError("\"group_columns\" is not supported "
-                             "for zarr inputs")
-        if index_columns:
-            raise ValueError("\"index_columns\" is not supported "
+            raise ValueError("\"group_column\" is not supported "
                              "for zarr inputs")
         try:
             from daskms.experimental.zarr import xds_from_zarr
@@ -242,14 +233,10 @@ def subtables(self):
 
     def reader(self, **kw):
         group_columns = kw.pop("group_columns", False)
-        index_columns = kw.pop("index_columns", False)
 
         if group_columns:
             raise ValueError("\"group_column\" is not supported "
-                             "for parquet inputs")
-        if index_columns:
-            raise ValueError("\"index_columns\" is not supported "
-                             "for parquet inputs")
+                             "for zarr inputs")
 
         try:
             from daskms.experimental.arrow.reads import xds_from_parquet
@@ -272,10 +259,7 @@ def convert_table(args):
     in_fmt = TableFormat.from_path(args.input)
     out_fmt = TableFormat.from_type(args.format)
 
-    reader = in_fmt.reader(
-        group_columns=args.group_columns,
-        index_columns=args.index_columns
-    )
+    reader = in_fmt.reader(group_columns=args.group_columns)
    writer = out_fmt.writer()
 
     datasets = reader(args.input, chunks=args.chunks)
@@ -346,34 +330,23 @@ def __init__(self, args, log):
         self.args = args
 
     @staticmethod
-    def col_converter(columns):
-        if not columns:
+    def group_col_converter(group_columns):
+        if not group_columns:
             return None
 
-        return [c.strip() for c in columns.split(",")]
+        return [c.strip() for c in group_columns.split(",")]
 
     @classmethod
     def setup_parser(cls, parser):
         parser.add_argument("input", type=_check_input_path)
         parser.add_argument("-o", "--output", type=Path)
         parser.add_argument("-g", "--group-columns",
-                            type=Convert.col_converter,
+                            type=Convert.group_col_converter,
                             default="",
                             help="Columns to group or partition "
                                  "the input dataset by. "
                                  "This defaults to the default "
-                                 "for the underlying storage mechanism."
-                                 "This is only supported when converting "
-                                 "from casa format.")
-        parser.add_argument("-i", "--index-columns",
-                            type=Convert.col_converter,
-                            default="",
-                            help="Columns to sort "
-                                 "the input dataset by. "
-                                 "This defaults to the default "
-                                 "for the underlying storage mechanism."
-                                 "This is only supported when converting "
-                                 "from casa format.")
+                                 "for the underlying storage mechanism")
         parser.add_argument("-f", "--format",
                             choices=["casa", "zarr", "parquet"],
                             default="zarr",
23 changes: 1 addition & 22 deletions daskms/experimental/arrow/extension_types.py
@@ -33,9 +33,6 @@ def _tensor_to_array(obj, pa_dtype):
         child_array = pa.ExtensionArray.from_storage(pa_dtype, storage)
     elif pa_dtype == pa.string():
         child_array = pa.array(list(flatten(obj.tolist())))
-    elif pa_dtype == pa.bool_():
-        flat_array = obj.ravel()
-        child_array = pa.array(flat_array)
     else:
         child_buf = pa.py_buffer(obj)
         child_array = pa.Array.from_buffers(pa_dtype, total_elements,
@@ -127,8 +124,7 @@ def to_numpy(self, zero_copy_only=True, writeable=False):
 
         shape = (len(self),) + self.type.shape
         storage_list_type = self.storage.type
-        value_type = storage_list_type.value_type
-        dtype = value_type.to_pandas_dtype()
+        dtype = storage_list_type.value_type.to_pandas_dtype()
         bufs = self.storage.buffers()
 
         # string case
@@ -138,23 +134,6 @@ def to_numpy(self, zero_copy_only=True, writeable=False):
             return np.array(self.storage.tolist(), dtype=object)
         elif np.issubdtype(dtype, np.complexfloating):
             return np.ndarray(shape, buffer=bufs[4], dtype=dtype)
-        elif pa.types.is_boolean(value_type):
-            # The following accounts for the fact that booleans are stored as
-            # bits in arrow but are represented as bytes in python. NOTE: This
-            # may be slower than other types as it is not zero-copy.
-            numpy_size = np.prod(shape)
-            arrow_size = int(np.ceil(numpy_size / 8))  # 8 bits in a byte.
-            packed_array = np.ndarray(
-                arrow_size,
-                buffer=bufs[3],
-                dtype=np.uint8
-            )
-            unpacked_array = np.unpackbits(
-                packed_array,
-                count=numpy_size,
-                bitorder='little'
-            )
-            return unpacked_array.view(np.bool_).reshape(shape)
         else:
             return np.ndarray(shape, buffer=bufs[3], dtype=dtype)
 
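The removed branch addressed a genuine representational mismatch: Arrow stores booleans bit-packed, one bit per value, while NumPy uses one byte per value, so the conversion cannot be zero-copy. A standalone sketch of the same pack/unpack round trip, independent of the pyarrow buffers the original code read from (the shape is chosen purely for illustration):

    import numpy as np

    shape = (3, 5)
    numpy_size = int(np.prod(shape))           # 15 boolean values
    arrow_size = int(np.ceil(numpy_size / 8))  # bit-packed into 2 bytes

    original = np.random.randint(0, 2, shape).astype(np.bool_)

    # Pack as Arrow would: little-endian bit order, one bit per value.
    packed = np.packbits(original.ravel(), bitorder="little")
    assert packed.size == arrow_size

    # Unpack exactly as the removed to_numpy() branch did.
    unpacked = np.unpackbits(packed, count=numpy_size, bitorder="little")
    restored = unpacked.view(np.bool_).reshape(shape)
    assert (restored == original).all()
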
2 changes: 0 additions & 2 deletions daskms/experimental/arrow/tests/test_parquet.py
@@ -42,14 +42,12 @@ def test_parquet_roundtrip(tmp_path_factory):
     shape = (nrow, nchan, ncorr)
     data = np.random.random(shape) + np.random.random(shape)*1j
     uvw = np.random.random((nrow, 3))
-    flag = np.random.randint(0, 2, shape).astype(np.bool_)
 
     columns = {
         "TIME": time,
         "ANTENNA1": ant1,
         "ANTENNA2": ant2,
         "UVW": uvw,
-        "FLAG": flag,
         "DATA": data,
     }
 
