Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: figuring out how to implement ak.from_cudf #3027

Merged
merged 5 commits into from
Feb 16, 2024

Conversation

jpivarski
Copy link
Member

For now, this is just tinkering in the studies directory to try to figure stuff out.

@jpivarski
Copy link
Member Author

@martindurant, I've started on this approach, in which I'm implementing

######################### equivalent for CuDF
def recurse_finalize(
out: ak.contents.Content,
column: cudf.core.column.column.ColumnBase,
validbits: None | cudf.core.buffer.buffer.Buffer,
generate_bitmasks: bool,
fix_offsets: bool = True,
):
if validbits is None:
return revertable(ak.contents.UnmaskedArray.simplified(out), out)
else:
return revertable(
ak.contents.BitMaskedArray.simplified(
ak.index.IndexU8(cupy.asarray(validbits)),
out,
valid_when=True,
length=len(out),
lsb_order=True,
),
out,
)
def recurse(
column: cudf.core.column.column.ColumnBase,
arrow_type: pyarrow.lib.DataType,
generate_bitmasks: bool,
):
if isinstance(arrow_type, pyarrow.lib.DictionaryType):
raise NotImplementedError
elif isinstance(arrow_type, pyarrow.lib.FixedSizeListType):
raise NotImplementedError
elif isinstance(arrow_type, (pyarrow.lib.LargeListType, pyarrow.lib.ListType)):
validbits = column.base_mask
paoffsets = column.offsets.base_data
if isinstance(arrow_type, pyarrow.lib.LargeListType):
akoffsets = ak.index.Index64(cupy.asarray(paoffsets).view(cupy.int64))
else:
akoffsets = ak.index.Index32(cupy.asarray(paoffsets).view(cupy.int32))
akcontent = recurse(
column.base_children[-1], arrow_type.value_type, generate_bitmasks
)
if not arrow_type.value_field.nullable:
# strip the dummy option-type node
akcontent = remove_optiontype(akcontent)
out = ak.contents.ListOffsetArray(akoffsets, akcontent, parameters=None)
return recurse_finalize(out, column, validbits, generate_bitmasks)
elif isinstance(arrow_type, pyarrow.lib.MapType):
raise NotImplementedError
elif isinstance(
arrow_type, (pyarrow.lib.Decimal128Type, pyarrow.lib.Decimal256Type)
):
# Note: Decimal128Type and Decimal256Type are subtypes of FixedSizeBinaryType.
# NumPy doesn't support decimal: https://github.com/numpy/numpy/issues/9789
raise ValueError(
"Arrow arrays containing pyarrow.decimal128 or pyarrow.decimal256 types can't be converted into Awkward Arrays"
)
elif isinstance(arrow_type, pyarrow.lib.FixedSizeBinaryType):
raise NotImplementedError
elif arrow_type in _string_like:
raise NotImplementedError
elif isinstance(arrow_type, pyarrow.lib.StructType):
raise NotImplementedError
elif isinstance(arrow_type, pyarrow.lib.UnionType):
raise NotImplementedError
elif arrow_type == pyarrow.null():
raise NotImplementedError
elif arrow_type == pyarrow.bool_():
raise NotImplementedError
elif isinstance(arrow_type, pyarrow.lib.DataType):
validbits = column.base_mask
dt = arrow_type.to_pandas_dtype()
out = ak.contents.NumpyArray(
cupy.asarray(column.base_data).view(dt),
parameters=None,
backend=CupyBackend.instance(),
)
return recurse_finalize(out, column, validbits, generate_bitmasks)
else:
raise TypeError(f"unrecognized Arrow array type: {arrow_type!r}")
def handle_cudf(cudf_series: cudf.core.series.Series, generate_bitmasks):
column = cudf_series._data[cudf_series.name]
dtype = column.dtype
if isinstance(dtype, numpy.dtype):
arrow_type = pyarrow.from_numpy_dtype(dtype)
else:
arrow_type = dtype.to_arrow()
return recurse(column, arrow_type, generate_bitmasks)
def cudf_to_awkward(
cudf_series: cudf.core.series.Series,
generate_bitmasks=False,
highlevel=True,
behavior=None,
attrs=None,
):
ctx = ak._layout.HighLevelContext(behavior=behavior, attrs=attrs).finalize()
out = handle_cudf(cudf_series, generate_bitmasks)
if isinstance(out, ak.contents.UnmaskedArray):
out = remove_optiontype(out)
def remove_revertable(layout, **kwargs):
if hasattr(layout, "__pyarrow_original"):
del layout.__pyarrow_original
ak._do.recursively_apply(out, remove_revertable)
return ctx.wrap(out, highlevel=highlevel)

to be like

def popbuffers_finalize(out, array, validbits, generate_bitmasks, fix_offsets=True):
# Every buffer from Arrow must be offsets-corrected.
if fix_offsets and (array.offset != 0 or len(array) != len(out)):
out = out[array.offset : array.offset + len(array)]
# Everything must leave popbuffers as option-type; the mask_node will be
# removed by the next level up in popbuffers recursion if appropriate.
if validbits is None and generate_bitmasks:
# ceildiv(len(out), 8) = -(len(out) // -8)
validbits = numpy.full(-(len(out) // -8), numpy.uint8(0xFF), dtype=numpy.uint8)
if validbits is None:
return revertable(ak.contents.UnmaskedArray.simplified(out), out)
else:
return revertable(
ak.contents.BitMaskedArray.simplified(
ak.index.IndexU8(numpy.frombuffer(validbits, dtype=numpy.uint8)),
out,
valid_when=True,
length=len(out),
lsb_order=True,
),
out,
)
def popbuffers(paarray, arrow_type, buffers, generate_bitmasks):
### Beginning of the big if-elif-elif chain!
if isinstance(arrow_type, pyarrow.lib.DictionaryType):
masked_index = popbuffers(
paarray.indices,
arrow_type.index_type,
buffers,
generate_bitmasks,
)
index = masked_index.content.data
if not isinstance(masked_index, ak.contents.UnmaskedArray):
mask = masked_index.mask_as_bool(valid_when=False)
if mask.any():
index = numpy.asarray(index, copy=True)
index[mask] = -1
content = handle_arrow(paarray.dictionary, generate_bitmasks)
parameters = {"__array__": "categorical"}
return revertable(
ak.contents.IndexedOptionArray.simplified(
ak.index.Index(index),
content,
parameters=parameters,
),
ak.contents.IndexedArray(
ak.index.Index(index),
remove_optiontype(content) if content.is_option else content,
parameters=parameters,
),
)
elif isinstance(arrow_type, pyarrow.lib.FixedSizeListType):
assert arrow_type.num_buffers == 1
validbits = buffers.pop(0)
akcontent = popbuffers(
paarray.values, arrow_type.value_type, buffers, generate_bitmasks
)
if not arrow_type.value_field.nullable:
# strip the dummy option-type node
akcontent = remove_optiontype(akcontent)
out = ak.contents.RegularArray(
akcontent,
arrow_type.list_size,
parameters=None,
)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
elif isinstance(arrow_type, (pyarrow.lib.LargeListType, pyarrow.lib.ListType)):
assert arrow_type.num_buffers == 2
validbits = buffers.pop(0)
paoffsets = buffers.pop(0)
if isinstance(arrow_type, pyarrow.lib.LargeListType):
akoffsets = ak.index.Index64(numpy.frombuffer(paoffsets, dtype=numpy.int64))
else:
akoffsets = ak.index.Index32(numpy.frombuffer(paoffsets, dtype=numpy.int32))
akcontent = popbuffers(
paarray.values, arrow_type.value_type, buffers, generate_bitmasks
)
if not arrow_type.value_field.nullable:
# strip the dummy option-type node
akcontent = remove_optiontype(akcontent)
out = ak.contents.ListOffsetArray(akoffsets, akcontent, parameters=None)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
elif isinstance(arrow_type, pyarrow.lib.MapType):
# FIXME: make a ListOffsetArray of 2-tuples with __array__ == "sorted_map".
# (Make sure the keys are sorted).
raise NotImplementedError
elif isinstance(
arrow_type, (pyarrow.lib.Decimal128Type, pyarrow.lib.Decimal256Type)
):
# Note: Decimal128Type and Decimal256Type are subtypes of FixedSizeBinaryType.
# NumPy doesn't support decimal: https://github.com/numpy/numpy/issues/9789
raise ValueError(
"Arrow arrays containing pyarrow.decimal128 or pyarrow.decimal256 types can't be converted into Awkward Arrays"
)
elif isinstance(arrow_type, pyarrow.lib.FixedSizeBinaryType):
assert arrow_type.num_buffers == 2
validbits = buffers.pop(0)
pacontent = buffers.pop(0)
parameters = {"__array__": "bytestring"}
sub_parameters = {"__array__": "byte"}
out = ak.contents.RegularArray(
ak.contents.NumpyArray(
numpy.frombuffer(pacontent, dtype=numpy.uint8),
parameters=sub_parameters,
backend=NumpyBackend.instance(),
),
arrow_type.byte_width,
parameters=parameters,
)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
elif arrow_type in _string_like:
assert arrow_type.num_buffers == 3
validbits = buffers.pop(0)
paoffsets = buffers.pop(0)
pacontent = buffers.pop(0)
if arrow_type in _string_like[::2]:
akoffsets = ak.index.Index32(numpy.frombuffer(paoffsets, dtype=numpy.int32))
else:
akoffsets = ak.index.Index64(numpy.frombuffer(paoffsets, dtype=numpy.int64))
if arrow_type in _string_like[:2]:
parameters = {"__array__": "string"}
sub_parameters = {"__array__": "char"}
else:
parameters = {"__array__": "bytestring"}
sub_parameters = {"__array__": "byte"}
out = ak.contents.ListOffsetArray(
akoffsets,
ak.contents.NumpyArray(
numpy.frombuffer(pacontent, dtype=numpy.uint8),
parameters=sub_parameters,
backend=NumpyBackend.instance(),
),
parameters=parameters,
)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
elif isinstance(arrow_type, pyarrow.lib.StructType):
assert arrow_type.num_buffers == 1
validbits = buffers.pop(0)
keys = []
contents = []
for i in range(arrow_type.num_fields):
field = arrow_type[i]
field_name = field.name
keys.append(field_name)
akcontent = popbuffers(
paarray.field(field_name), field.type, buffers, generate_bitmasks
)
if not field.nullable:
# strip the dummy option-type node
akcontent = remove_optiontype(akcontent)
contents.append(akcontent)
out = ak.contents.RecordArray(
contents, keys, length=len(paarray), parameters=None
)
return popbuffers_finalize(
out, paarray, validbits, generate_bitmasks, fix_offsets=False
)
elif isinstance(arrow_type, pyarrow.lib.UnionType):
if isinstance(arrow_type, pyarrow.lib.SparseUnionType):
assert arrow_type.num_buffers == 2
validbits = buffers.pop(0)
nptags = numpy.frombuffer(buffers.pop(0), dtype=numpy.int8)
npindex = numpy.arange(len(nptags), dtype=numpy.int32)
else:
assert arrow_type.num_buffers == 3
validbits = buffers.pop(0)
nptags = numpy.frombuffer(buffers.pop(0), dtype=numpy.int8)
npindex = numpy.frombuffer(buffers.pop(0), dtype=numpy.int32)
akcontents = []
for i in range(arrow_type.num_fields):
field = arrow_type[i]
akcontent = popbuffers(
paarray.field(i), field.type, buffers, generate_bitmasks
)
if not field.nullable:
# strip the dummy option-type node
akcontent = remove_optiontype(akcontent)
akcontents.append(akcontent)
out = ak.contents.UnionArray.simplified(
ak.index.Index8(nptags),
ak.index.Index32(npindex),
akcontents,
parameters=None,
)
return popbuffers_finalize(out, paarray, None, generate_bitmasks)
elif arrow_type == pyarrow.null():
validbits = buffers.pop(0)
assert arrow_type.num_fields == 0
# This is already an option-type and offsets-corrected, so no popbuffers_finalize.
return ak.contents.IndexedOptionArray(
ak.index.Index64(numpy.full(len(paarray), -1, dtype=numpy.int64)),
ak.contents.EmptyArray(parameters=None),
parameters=None,
)
elif arrow_type == pyarrow.bool_():
assert arrow_type.num_buffers == 2
validbits = buffers.pop(0)
bitdata = buffers.pop(0)
bytedata = numpy.unpackbits(
numpy.frombuffer(bitdata, dtype=numpy.uint8), bitorder="little"
)
out = ak.contents.NumpyArray(
bytedata.view(numpy.bool_),
parameters=None,
backend=NumpyBackend.instance(),
)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
elif isinstance(arrow_type, pyarrow.lib.DataType):
assert arrow_type.num_buffers == 2
validbits = buffers.pop(0)
data = buffers.pop(0)
to64, dt = _pyarrow_to_numpy_dtype.get(str(arrow_type), (False, None))
if to64:
data = numpy.astype(
numpy.frombuffer(data, dtype=numpy.int32), dtype=numpy.int64
)
if dt is None:
dt = arrow_type.to_pandas_dtype()
out = ak.contents.NumpyArray(
numpy.frombuffer(data, dtype=dt),
parameters=None,
backend=NumpyBackend.instance(),
)
return popbuffers_finalize(out, paarray, validbits, generate_bitmasks)
else:
raise TypeError(f"unrecognized Arrow array type: {arrow_type!r}")
def handle_arrow(obj, generate_bitmasks):
buffers = obj.buffers()
out = popbuffers(obj, obj.type, buffers, generate_bitmasks)
assert len(buffers) == 0
return out
def pyarrow_to_awkward(
pyarrow_array: pyarrow.lib.Array,
generate_bitmasks=False,
highlevel=True,
behavior=None,
attrs=None,
):
ctx = ak._layout.HighLevelContext(behavior=behavior, attrs=attrs).finalize()
out = handle_arrow(pyarrow_array, generate_bitmasks)
if isinstance(out, ak.contents.UnmaskedArray):
out = remove_optiontype(out)
def remove_revertable(layout, **kwargs):
if hasattr(layout, "__pyarrow_original"):
del layout.__pyarrow_original
ak._do.recursively_apply(out, remove_revertable)
return ctx.wrap(out, highlevel=highlevel)

substituting CuDF accessors (base_data, base_mask, base_children1) for pyarrow accessors (buffers in a particular order) and NumPy casting (np.frombuffer) for CuPy casting (cp.asarray). The tree of CuDF ColumnBase nodes is a safer interface than the list of pyarrow Buffer objects because you can't accidentally be on the wrong tree node; it's less sensitive to how many None values should be associated with a particular node, etc.

In

examples = [
[1.1, 2.2, 3.3],
[[1, 2, 3], [], [4, 5]],
[[[1, 2], [3]], [], [[]], [[4], [], [5, 6, 7]], [[8, 9]]],
[1.1, 2.2, None, 3.3],
[[1, 2, None, 3], [], [4, 5]],
[[1, 2, 3], None, [], [4, 5]],
[[[1, 2, None], [3]], [], [[]], [[4], [], [5, 6, 7]], [[8, 9]]],
[[[1, 2], None, [3]], [], [[]], [[4], [], [5, 6, 7]], [[8, 9]]],
[[[1, 2], [3]], None, [], [[]], [[4], [], [5, 6, 7]], [[8, 9]]],
]
for example in examples:
print(f"---- {example}")
df = cudf.DataFrame({"column": example})
awkward_array = cudf_to_awkward(df["column"])
assert ak.backend(awkward_array) == "cuda"
assert awkward_array.tolist() == example

you can see that I've started testing some nested lists and missing values. (We're not checking expected types because Arrow → Awkward has an ambiguity about top-level option-types.)

I can continue with this, though not right now. We still don't know how multiple children in a StructArray or multiple children and multiple Indexes (tags & index) in a UnionArray are laid out, but I think we've seen enough to know that CuDF exposing Series._data would be sufficient for our needs.

Footnotes

  1. For CuDF, base_children includes what we call Indexes as well as the subtrees, with the Indexes presented as NumericColumn. CuDF is not making a distinction between what we call Index and what we call Content.

@martindurant
Copy link
Contributor

Let me know if you want me to try anything here. Of course you know the code much better.

I feel like somewhere in https://github.com/rapidsai/cudf/blob/branch-24.04/python/cudf/cudf/core/column/column.py is a way to make a series (column) out of buffers.

@jpivarski
Copy link
Member Author

I think it's straightforward from this point. I'll just have to do it.

If you have an alternative idea, please go ahead and try it. Also, none of this touches ak.to_cudf, so if you can find out how to build a CuDF Column, that will help. (In Awkward, the to_arrow functions are implemented as methods on all of the Content node types—that direction is much easier because of the direction of the projection.)

@jpivarski
Copy link
Member Author

I finished off the implementation. Since this PR doesn't touch the codebase, I'll merge it without review.

Some comments on CuDF's implementation of Arrow:

  • We have to go down a different rabbithole to find the indexes and dictionary of a categorical variable than we do with pyarrow (not surprising). The index is not necessarily 32-bit though: for small numbers of categories, it can be 8-bit (surprising; I think that goes against the Arrow spec). I couldn't include a test of categoricals because IndexedOptionArray.simplified's kernel has not been implemented (though I tested it offline by swapping IndexedOptionArray for IndexedArray, which would be wrong with nullable data, but it's enough of a test to be confident in the implementation.)
  • CuDF has not implemented Arrow's FixedSizeListType or FixedSizeBinaryType. I included a very plausible implementation. (The new data, the fixed size, comes from the arrow_type, which we already have.)
  • CuDF has not implemented Arrow's binary (as opposed to Unicode string) type, but I included a very plausible implementation (because it's just like the Unicode string implementation).
  • CuDF's boolean arrays are implemented as bytes, rather than bits. (That's better when you want to apply masks, but it's against the Arrow spec.)
  • I tested date-time and time-delta types, but I don't have a way to do timezone-aware date-times because Awkward inherits NumPy's timezone-naivety.
  • CuDF does have decimal types, but I can't test them (just as for Arrow) because NumPy doesn't have them.
  • I don't know what the IntervalColumn is. Interval types are new to me.

@jpivarski jpivarski merged commit 1e7f923 into main Feb 16, 2024
18 checks passed
@jpivarski jpivarski deleted the jpivarski/figuring-out-how-to-do-ak.from_cudf branch February 16, 2024 23:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants