
Commit

Backport PR pandas-dev#57764 on branch 2.2.x (BUG: PyArrow dtypes were not supported in the interchange protocol) (pandas-dev#57947)
MarcoGorelli committed Mar 21, 2024
1 parent 78f7a02 commit 7e8d492
Showing 7 changed files with 326 additions and 38 deletions.
4 changes: 3 additions & 1 deletion doc/source/whatsnew/v2.2.2.rst
@@ -14,14 +14,16 @@ including other versions of pandas.
Fixed regressions
~~~~~~~~~~~~~~~~~
- :meth:`DataFrame.__dataframe__` was producing incorrect data buffers when a column's type was a pandas nullable dtype with missing values (:issue:`56702`)
- :meth:`DataFrame.__dataframe__` was producing incorrect data buffers when a column's type was a pyarrow nullable dtype with missing values (:issue:`57664`)
-

.. ---------------------------------------------------------------------------
.. _whatsnew_222.bug_fixes:

Bug fixes
~~~~~~~~~
-
- :meth:`DataFrame.__dataframe__` was showing a bytemask instead of a bitmask for the ``'string[pyarrow]'`` validity buffer (:issue:`57762`)
- :meth:`DataFrame.__dataframe__` was showing a non-null validity buffer (instead of ``None``) for ``'string[pyarrow]'`` columns without missing values (:issue:`57761`)

.. ---------------------------------------------------------------------------
.. _whatsnew_222.other:
58 changes: 58 additions & 0 deletions pandas/core/interchange/buffer.py
@@ -12,6 +12,7 @@

if TYPE_CHECKING:
import numpy as np
import pyarrow as pa


class PandasBuffer(Buffer):
@@ -76,3 +77,60 @@ def __repr__(self) -> str:
)
+ ")"
)


class PandasBufferPyarrow(Buffer):
"""
Data in the buffer is guaranteed to be contiguous in memory.
"""

def __init__(
self,
buffer: pa.Buffer,
*,
length: int,
) -> None:
"""
Handle pyarrow chunked arrays.
"""
self._buffer = buffer
self._length = length

@property
def bufsize(self) -> int:
"""
Buffer size in bytes.
"""
return self._buffer.size

@property
def ptr(self) -> int:
"""
Pointer to start of the buffer as an integer.
"""
return self._buffer.address

def __dlpack__(self) -> Any:
"""
Represent this structure as DLPack interface.
"""
raise NotImplementedError()

def __dlpack_device__(self) -> tuple[DlpackDeviceType, int | None]:
"""
Device type and device ID for where the data in the buffer resides.
"""
return (DlpackDeviceType.CPU, None)

def __repr__(self) -> str:
return (
"PandasBuffer[pyarrow]("
+ str(
{
"bufsize": self.bufsize,
"ptr": self.ptr,
"device": "CPU",
}
)
+ ")"
)
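For orientation, here is a minimal sketch of how this new wrapper behaves. This is an illustrative example, not part of the commit; it assumes pyarrow is installed, and in practice the interchange protocol constructs these buffers internally:

import pyarrow as pa
from pandas.core.interchange.buffer import PandasBufferPyarrow

arr = pa.array([1, None, 3], type=pa.int64())
validity, data = arr.buffers()  # primitive arrays expose [validity bitmask, data]
buf = PandasBufferPyarrow(data, length=len(arr))
print(buf.bufsize)  # byte size of the underlying pyarrow buffer
print(buf.ptr)      # memory address, as exposed by pa.Buffer.address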
66 changes: 56 additions & 10 deletions pandas/core/interchange/column.py
@@ -1,6 +1,9 @@
from __future__ import annotations

from typing import Any
from typing import (
TYPE_CHECKING,
Any,
)

import numpy as np

@@ -9,15 +12,18 @@
from pandas.errors import NoBufferPresent
from pandas.util._decorators import cache_readonly

from pandas.core.dtypes.dtypes import (
from pandas.core.dtypes.dtypes import BaseMaskedDtype

import pandas as pd
from pandas import (
ArrowDtype,
BaseMaskedDtype,
DatetimeTZDtype,
)

import pandas as pd
from pandas.api.types import is_string_dtype
from pandas.core.interchange.buffer import PandasBuffer
from pandas.core.interchange.buffer import (
PandasBuffer,
PandasBufferPyarrow,
)
from pandas.core.interchange.dataframe_protocol import (
Column,
ColumnBuffers,
@@ -30,6 +36,9 @@
dtype_to_arrow_c_fmt,
)

if TYPE_CHECKING:
from pandas.core.interchange.dataframe_protocol import Buffer

_NP_KINDS = {
"i": DtypeKind.INT,
"u": DtypeKind.UINT,
@@ -157,6 +166,16 @@ def _dtype_from_pandasdtype(self, dtype) -> tuple[DtypeKind, int, str, str]:
else:
byteorder = dtype.byteorder

if dtype == "bool[pyarrow]":
# return early to avoid the `* 8` below, as this is a bitmask
# rather than a bytemask
return (
kind,
dtype.itemsize, # pyright: ignore[reportGeneralTypeIssues]
ArrowCTypes.BOOL,
byteorder,
)

return kind, dtype.itemsize * 8, dtype_to_arrow_c_fmt(dtype), byteorder

@property
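To illustrate the early return above: with this fix a ``'bool[pyarrow]'`` column reports a 1-bit-wide dtype through the protocol, matching its bitmask storage. An illustrative sketch, not part of the diff (assumes pandas with this fix and pyarrow installed):

import pandas as pd

df = pd.DataFrame({"flag": pd.array([True, None], dtype="bool[pyarrow]")})
kind, bit_width, fmt, _ = df.__dataframe__().get_column(0).dtype
print(kind, bit_width, fmt)  # DtypeKind.BOOL 1 'b' -- 1 bit per value, not 8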
@@ -194,6 +213,12 @@ def describe_null(self):
column_null_dtype = ColumnNullType.USE_BYTEMASK
null_value = 1
return column_null_dtype, null_value
if isinstance(self._col.dtype, ArrowDtype):
# We already rechunk (if necessary / allowed) upon initialization, so this
# is already single-chunk by the time we get here.
if self._col.array._pa_array.chunks[0].buffers()[0] is None: # type: ignore[attr-defined]
return ColumnNullType.NON_NULLABLE, None
return ColumnNullType.USE_BITMASK, 0
kind = self.dtype[0]
try:
null, value = _NULL_DESCRIPTION[kind]
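The effect of the new ArrowDtype branch in describe_null, sketched below. This is illustrative, not part of the diff; the exact output for the all-valid case depends on whether pyarrow allocated a validity buffer:

import pandas as pd

with_nulls = pd.DataFrame({"a": pd.array([1, None], dtype="int64[pyarrow]")})
print(with_nulls.__dataframe__().get_column(0).describe_null)
# (ColumnNullType.USE_BITMASK, 0): a validity bitmask where 0 marks a null

no_nulls = pd.DataFrame({"a": pd.array([1, 2], dtype="int64[pyarrow]")})
print(no_nulls.__dataframe__().get_column(0).describe_null)
# typically (ColumnNullType.NON_NULLABLE, None), since pyarrow usually
# allocates no validity buffer for a fully-valid array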
@@ -278,10 +303,11 @@ def get_buffers(self) -> ColumnBuffers:

def _get_data_buffer(
self,
) -> tuple[PandasBuffer, Any]: # Any is for self.dtype tuple
) -> tuple[Buffer, tuple[DtypeKind, int, str, str]]:
"""
Return the buffer containing the data and the buffer's associated dtype.
"""
buffer: Buffer
if self.dtype[0] in (
DtypeKind.INT,
DtypeKind.UINT,
@@ -291,18 +317,25 @@ def _get_data_buffer(
):
# self.dtype[2] is an ArrowCTypes.TIMESTAMP where the tz will make
# it longer than 4 characters
dtype = self.dtype
if self.dtype[0] == DtypeKind.DATETIME and len(self.dtype[2]) > 4:
np_arr = self._col.dt.tz_convert(None).to_numpy()
else:
arr = self._col.array
if isinstance(self._col.dtype, BaseMaskedDtype):
np_arr = arr._data # type: ignore[attr-defined]
elif isinstance(self._col.dtype, ArrowDtype):
raise NotImplementedError("ArrowDtype not handled yet")
# We already rechunk (if necessary / allowed) upon initialization,
# so this is already single-chunk by the time we get here.
arr = arr._pa_array.chunks[0] # type: ignore[attr-defined]
buffer = PandasBufferPyarrow(
arr.buffers()[1], # type: ignore[attr-defined]
length=len(arr),
)
return buffer, dtype
else:
np_arr = arr._ndarray # type: ignore[attr-defined]
buffer = PandasBuffer(np_arr, allow_copy=self._allow_copy)
dtype = self.dtype
elif self.dtype[0] == DtypeKind.CATEGORICAL:
codes = self._col.values._codes
buffer = PandasBuffer(codes, allow_copy=self._allow_copy)
@@ -330,13 +363,26 @@ def _get_data_buffer(

return buffer, dtype

def _get_validity_buffer(self) -> tuple[PandasBuffer, Any]:
def _get_validity_buffer(self) -> tuple[Buffer, Any] | None:
"""
Return the buffer containing the mask values indicating missing data and
the buffer's associated dtype.
Raises NoBufferPresent if null representation is not a bit or byte mask.
"""
null, invalid = self.describe_null
buffer: Buffer
if isinstance(self._col.dtype, ArrowDtype):
# We already rechunk (if necessary / allowed) upon initialization, so this
# is already single-chunk by the time we get here.
arr = self._col.array._pa_array.chunks[0] # type: ignore[attr-defined]
dtype = (DtypeKind.BOOL, 1, ArrowCTypes.BOOL, Endianness.NATIVE)
if arr.buffers()[0] is None:
return None
buffer = PandasBufferPyarrow(
arr.buffers()[0],
length=len(arr),
)
return buffer, dtype

if isinstance(self._col.dtype, BaseMaskedDtype):
mask = self._col.array._mask # type: ignore[attr-defined]
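Taken together, the column.py changes mean a pyarrow-backed column now hands out its validity bitmask directly, or ``None`` when pyarrow allocated none. A sketch of the observable behaviour (illustrative, not part of the commit; assumes pandas with these fixes and pyarrow installed):

import pandas as pd

df = pd.DataFrame({"s": pd.array(["x", None], dtype="string[pyarrow]")})
validity_buf, validity_dtype = df.__dataframe__().get_column(0).get_buffers()["validity"]
print(validity_dtype)  # roughly (DtypeKind.BOOL, 1, 'b', '='): a 1-bit bitmask (GH 57762)

all_valid = pd.DataFrame({"s": pd.array(["x", "y"], dtype="string[pyarrow]")})
print(all_valid.__dataframe__().get_column(0).get_buffers()["validity"])
# None (GH 57761): no validity buffer when nothing is missing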
5 changes: 5 additions & 0 deletions pandas/core/interchange/dataframe.py
@@ -5,6 +5,7 @@

from pandas.core.interchange.column import PandasColumn
from pandas.core.interchange.dataframe_protocol import DataFrame as DataFrameXchg
from pandas.core.interchange.utils import maybe_rechunk

if TYPE_CHECKING:
from collections.abc import (
@@ -34,6 +35,10 @@ def __init__(self, df: DataFrame, allow_copy: bool = True) -> None:
"""
self._df = df.rename(columns=str, copy=False)
self._allow_copy = allow_copy
for i, _col in enumerate(self._df.columns):
rechunked = maybe_rechunk(self._df.iloc[:, i], allow_copy=allow_copy)
if rechunked is not None:
self._df.isetitem(i, rechunked)

def __dataframe__(
self, nan_as_null: bool = False, allow_copy: bool = True
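The rechunking loop in ``__init__`` means consumers always see single-chunk columns; when a copy is forbidden, multi-chunk input fails early instead. An illustrative sketch, not part of the diff:

import pyarrow as pa
import pandas as pd

arr = pd.arrays.ArrowExtensionArray(pa.chunked_array([[1, 2], [3]]))
df = pd.DataFrame({"a": arr})
df.__dataframe__()                  # fine: the chunks are combined (a copy)
df.__dataframe__(allow_copy=False)  # raises RuntimeError: rechunking needs a copy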
17 changes: 10 additions & 7 deletions pandas/core/interchange/from_dataframe.py
@@ -295,13 +295,14 @@ def string_column_to_ndarray(col: Column) -> tuple[np.ndarray, Any]:

null_pos = None
if null_kind in (ColumnNullType.USE_BITMASK, ColumnNullType.USE_BYTEMASK):
assert buffers["validity"], "Validity buffers cannot be empty for masks"
valid_buff, valid_dtype = buffers["validity"]
null_pos = buffer_to_ndarray(
valid_buff, valid_dtype, offset=col.offset, length=col.size()
)
if sentinel_val == 0:
null_pos = ~null_pos
validity = buffers["validity"]
if validity is not None:
valid_buff, valid_dtype = validity
null_pos = buffer_to_ndarray(
valid_buff, valid_dtype, offset=col.offset, length=col.size()
)
if sentinel_val == 0:
null_pos = ~null_pos

# Assemble the strings from the code units
str_list: list[None | float | str] = [None] * col.size()
@@ -486,6 +487,8 @@ def set_nulls(
np.ndarray or pd.Series
Data with the nulls being set.
"""
if validity is None:
return data
null_kind, sentinel_val = col.describe_null
null_pos = None

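With ``set_nulls`` and ``string_column_to_ndarray`` now tolerating a missing validity buffer, a pyarrow-backed frame round-trips through the protocol. An illustrative sketch, not part of the diff:

import pandas as pd
from pandas.api.interchange import from_dataframe

df = pd.DataFrame({"s": pd.array(["x", None], dtype="string[pyarrow]")})
result = from_dataframe(df.__dataframe__())
print(result["s"].tolist())  # ['x', nan]: the missing value survives the round trip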
28 changes: 28 additions & 0 deletions pandas/core/interchange/utils.py
@@ -16,6 +16,8 @@
DatetimeTZDtype,
)

import pandas as pd

if typing.TYPE_CHECKING:
from pandas._typing import DtypeObj

Expand Down Expand Up @@ -145,3 +147,29 @@ def dtype_to_arrow_c_fmt(dtype: DtypeObj) -> str:
raise NotImplementedError(
f"Conversion of {dtype} to Arrow C format string is not implemented."
)


def maybe_rechunk(series: pd.Series, *, allow_copy: bool) -> pd.Series | None:
"""
Rechunk a multi-chunk pyarrow array into a single-chunk array, if necessary.
- Returns `None` if the input series is not backed by a multi-chunk pyarrow array
(and so doesn't need rechunking)
- Returns a single-chunk-backed Series if the input is backed by a multi-chunk
pyarrow array and `allow_copy` is `True`.
- Raises a `RuntimeError` if `allow_copy` is `False` and the input is backed
by a multi-chunk pyarrow array.
"""
if not isinstance(series.dtype, pd.ArrowDtype):
return None
chunked_array = series.array._pa_array # type: ignore[attr-defined]
if len(chunked_array.chunks) == 1:
return None
if not allow_copy:
raise RuntimeError(
"Found multi-chunk pyarrow array, but `allow_copy` is False. "
"Please rechunk the array before calling this function, or set "
"`allow_copy=True`."
)
arr = chunked_array.combine_chunks()
return pd.Series(arr, dtype=series.dtype, name=series.name, index=series.index)
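
A usage sketch for ``maybe_rechunk`` (illustrative, not part of the commit; assumes pyarrow is installed):

import pyarrow as pa
import pandas as pd
from pandas.core.interchange.utils import maybe_rechunk

s = pd.Series(pd.arrays.ArrowExtensionArray(pa.chunked_array([[1, 2], [3]])))
out = maybe_rechunk(s, allow_copy=True)
print(len(out.array._pa_array.chunks))      # 1: the chunks were combined into one
print(maybe_rechunk(out, allow_copy=True))  # None: already single-chunk, nothing to do
maybe_rechunk(s, allow_copy=False)          # raises RuntimeError: combining needs a copy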
