Compatible with pandas 1.5.0 (#3276)
hekaisheng committed Oct 13, 2022
1 parent 075ea11 commit 98b5ac2
Showing 17 changed files with 169 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/platform-ci.yml
@@ -81,7 +81,7 @@ jobs:
       else
         pip install virtualenv flaky
         if [ -n "$WITH_KUBERNETES" ]; then
-          pip install kubernetes
+          pip install kubernetes "pandas<1.5.0"
           kubectl get pods -A
         fi
         if [ -n "$WITH_HADOOP" ]; then
10 changes: 8 additions & 2 deletions mars/_utils.pyx
@@ -46,6 +46,9 @@ from .lib.mmh3 import hash as mmh_hash, hash_bytes as mmh_hash_bytes, \
 cdef bint _has_cupy = bool(pkgutil.find_loader('cupy'))
 cdef bint _has_cudf = bool(pkgutil.find_loader('cudf'))
 cdef bint _has_sqlalchemy = bool(pkgutil.find_loader('sqlalchemy'))
+cdef bint _has_interval_array_inclusive = hasattr(
+    pd.arrays.IntervalArray, "inclusive"
+)


 cdef extern from "MurmurHash3.h":
@@ -322,8 +325,11 @@ cdef list tokenize_pandas_tick(ob):
     return iterative_tokenize([ob.freqstr])


-cdef list tokenize_pandas_interval_arrays(ob):
-    return iterative_tokenize([ob.left, ob.right, ob.closed])
+cdef list tokenize_pandas_interval_arrays(ob):  # pragma: no cover
+    if _has_interval_array_inclusive:
+        return iterative_tokenize([ob.left, ob.right, ob.inclusive])
+    else:
+        return iterative_tokenize([ob.left, ob.right, ob.closed])


 cdef list tokenize_sqlalchemy_data_type(ob):
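Note: the hasattr() probe above exists because pandas builds around 1.5.0 renamed IntervalArray.closed to IntervalArray.inclusive. A minimal standalone sketch of the same probe (interval_sides is a hypothetical helper, not part of this commit):

import pandas as pd

# Read whichever boundary attribute the installed pandas exposes.
def interval_sides(arr):
    side = arr.inclusive if hasattr(arr, "inclusive") else arr.closed
    return arr.left, arr.right, side

arr = pd.arrays.IntervalArray.from_breaks([0, 1, 2])
print(interval_sides(arr)[2])  # "right" for default intervals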
17 changes: 17 additions & 0 deletions mars/dataframe/arrays.py
@@ -51,13 +51,23 @@
 except ImportError:  # pragma: no cover
     pa = None
     pa_null = None
+try:
+    import pyarrow.compute as pc
+except ImportError:  # pragma: no cover
+    pc = None

 from ..config import options
 from ..core import is_kernel_mode
 from ..utils import pd_release_version, tokenize

 _use_bool_any_all = pd_release_version[:2] >= (1, 3)
 _use_extension_index = pd_release_version[:2] >= (1, 4)
+_object_engine_for_string_array = pd_release_version[:2] >= (1, 5)
+
+if _object_engine_for_string_array:
+    StringArrayBase = type(StringArrayBase)(
+        "StringArrayBase", StringArrayBase.__bases__, dict(StringArrayBase.__dict__)
+    )


 class ArrowDtype(ExtensionDtype):
@@ -254,6 +264,8 @@ def _init_by_arrow(self, values, dtype: ArrowDtype = None, copy=False):
             arrow_array = values
         elif isinstance(values, pa.Array):
             arrow_array = pa.chunked_array([values])
+        elif len(values) == 0:  # pragma: no cover
+            arrow_array = pa.chunked_array([pa.array([], type=dtype.arrow_type)])
         else:
             arrow_array = pa.chunked_array([pa.array(values, type=dtype.arrow_type)])
@@ -561,6 +573,11 @@ def copy(self):
         else:
             return type(self)(self._ndarray.copy())

+    def unique(self):
+        if self._force_use_pandas or not self._use_arrow or not hasattr(pc, "unique"):
+            return type(self)(np.unique(self.to_numpy()), dtype=self._dtype)
+        return type(self)(pc.unique(self._arrow_array), dtype=self._dtype)
+
     def value_counts(self, dropna=False):
         if self._use_arrow:
             series = self._arrow_array.to_pandas()
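The new unique() prefers pyarrow.compute when it is available. A small sketch of that fast path (illustrative only, assuming pyarrow is installed):

import pyarrow as pa
import pyarrow.compute as pc

# pc.unique() operates on a ChunkedArray directly, so the Arrow branch skips
# the to_numpy()/np.unique round-trip taken by the fallback.
chunked = pa.chunked_array([pa.array(["a", "b", "a"]), pa.array(["b", "c"])])
print(pc.unique(chunked))  # -> ["a", "b", "c"]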
12 changes: 10 additions & 2 deletions mars/dataframe/base/shift.py
@@ -18,11 +18,13 @@
 from ... import opcodes as OperandDef
 from ...core import OutputType
 from ...serialization.serializables import KeyField, AnyField, Int8Field, Int64Field
-from ...utils import has_unknown_shape
+from ...utils import has_unknown_shape, no_default, pd_release_version
 from ..operands import DataFrameOperand, DataFrameOperandMixin
 from ..utils import parse_index, build_df, build_series, validate_axis

 _need_consolidate = pd.__version__ in ("1.1.0", "1.3.0", "1.3.1")
+_enable_no_default = pd_release_version[:2] > (1, 1)
+_with_column_freq_bug = (1, 2, 0) <= pd_release_version < (1, 4, 3)


 class DataFrameShift(DataFrameOperand, DataFrameOperandMixin):
@@ -456,7 +458,13 @@ def shift(df_or_series, periods=1, freq=None, axis=0, fill_value=None):
     axis = validate_axis(axis, df_or_series)
     if periods == 0:
         return df_or_series.copy()

+    if fill_value is no_default:  # pragma: no cover
+        if not _enable_no_default or (
+            _with_column_freq_bug and axis == 1 and freq is not None
+        ):
+            # pandas shift shows different behavior for axis=1 when freq is specified,
+            # see https://github.com/pandas-dev/pandas/issues/47039 for details.
+            fill_value = None
     op = DataFrameShift(periods=periods, freq=freq, axis=axis, fill_value=fill_value)
     return op(df_or_series)

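For context, a standalone sketch of the fill_value normalization above, assuming a pandas new enough to expose the no_default sentinel (>= 1.1); the helper and its flag arguments are illustrative, not mars API:

import pandas as pd

no_default = pd.api.extensions.no_default

def normalize_fill_value(fill_value, axis, freq,
                         enable_no_default=True, column_freq_bug=False):
    # Swap the sentinel for an explicit None where pandas cannot accept it,
    # or where axis=1 together with freq would hit pandas GH#47039.
    if fill_value is no_default and (
        not enable_no_default
        or (column_freq_bug and axis == 1 and freq is not None)
    ):
        return None
    return fill_value

print(normalize_fill_value(no_default, axis=1, freq="D", column_freq_bug=True))  # None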
21 changes: 15 additions & 6 deletions mars/dataframe/base/tests/test_base_execution.py
@@ -31,13 +31,14 @@
 from ....tensor import arange, tensor
 from ....tensor.random import rand
 from ....tests.core import require_cudf
-from ....utils import lazy_import, pd_release_version
+from ....utils import lazy_import, pd_release_version, no_default
 from ... import eval as mars_eval, cut, qcut, get_dummies
 from ...datasource.dataframe import from_pandas as from_pandas_df
 from ...datasource.series import from_pandas as from_pandas_series
 from ...datasource.index import from_pandas as from_pandas_index
 from .. import to_gpu, to_cpu
 from ..bloom_filter import filter_by_bloom_filter
+from ..shift import _enable_no_default, _with_column_freq_bug
 from ..to_numeric import to_numeric
 from ..rebalance import DataFrameRebalance

@@ -46,6 +47,7 @@
 cudf = lazy_import("cudf")

 _explode_with_ignore_index = pd_release_version[:2] >= (1, 1)
+_interval_range_closed_arg = pd_release_version[:2] >= (1, 5)


 @require_cudf
@@ -815,7 +817,10 @@ def test_cut_execution(setup):
     raw = rs.random(15) * 1000
     s = pd.Series(raw, index=[f"i{i}" for i in range(15)])
     bins = [10, 100, 500]
-    ii = pd.interval_range(10, 500, 3)
+    if _interval_range_closed_arg:
+        ii = pd.interval_range(10, 500, 3, closed="right")
+    else:
+        ii = pd.interval_range(10, 500, 3)
     labels = ["a", "b"]

     t = tensor(raw, chunk_size=4)
@@ -1154,6 +1159,10 @@ def test_q_cut_execution(setup):


 def test_shift_execution(setup):
+    fill_value_default = no_default
+    if not _enable_no_default or _with_column_freq_bug:
+        fill_value_default = None
+
     # test dataframe
     rs = np.random.RandomState(0)
     raw = pd.DataFrame(
@@ -1164,7 +1173,7 @@

     for periods in (2, -2, 6, -6):
         for axis in (0, 1):
-            for fill_value in (None, 0, 1.0):
+            for fill_value in (fill_value_default, 0, 1.0):
                 r = df.shift(periods=periods, axis=axis, fill_value=fill_value)

                 try:
@@ -1187,7 +1196,7 @@
     # test freq not None
     for periods in (2, -2):
         for axis in (0, 1):
-            for fill_value in (None, 0, 1.0):
+            for fill_value in (fill_value_default, 0, 1.0):
                 r = df2.shift(
                     periods=periods, freq="D", axis=axis, fill_value=fill_value
                 )
@@ -1217,7 +1226,7 @@

     series = from_pandas_series(s, chunk_size=5)
     for periods in (0, 2, -2, 6, -6):
-        for fill_value in (None, 0, 1.0):
+        for fill_value in (fill_value_default, 0, 1.0):
             r = series.shift(periods=periods, fill_value=fill_value)

             try:
@@ -1234,7 +1243,7 @@
     # test freq not None
     series2 = from_pandas_series(s2, chunk_size=5)
     for periods in (2, -2):
-        for fill_value in (None, 0, 1.0):
+        for fill_value in (fill_value_default, 0, 1.0):
             r = series2.shift(periods=periods, freq="D", fill_value=fill_value)

             try:
9 changes: 8 additions & 1 deletion mars/dataframe/core.py
@@ -284,9 +284,16 @@ def names(self) -> list:
     def to_pandas(self):
         data = getattr(self, "_data", None)
         sortorder = getattr(self, "_sortorder", None)
+
+        def _build_empty_array(dtype):
+            try:
+                return np.array([], dtype=dtype)
+            except TypeError:  # pragma: no cover
+                return pd.array([], dtype=dtype)
+
         if data is None:
             return pd.MultiIndex.from_arrays(
-                [np.array([], dtype=dtype) for dtype in self._dtypes],
+                [_build_empty_array(dtype) for dtype in self._dtypes],
                 sortorder=sortorder,
                 names=self._names,
             )
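The fallback matters because np.array() only understands numpy dtypes. A quick illustration with a pandas extension dtype (assumes a pandas version providing StringDtype):

import numpy as np
import pandas as pd

dtype = pd.StringDtype()
try:
    np.array([], dtype=dtype)  # numpy cannot interpret extension dtypes
except TypeError:
    print(pd.array([], dtype=dtype))  # empty <StringArray>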
10 changes: 5 additions & 5 deletions mars/dataframe/datasource/date_range.py
@@ -26,7 +26,7 @@
 from ...core import OutputType
 from ...serialization.serializables import AnyField, Int64Field, BoolField, StringField
 from ...tensor.utils import decide_chunk_sizes
-from ...utils import pd_release_version, NoDefault
+from ...utils import pd_release_version, no_default
 from ..operands import DataFrameOperand, DataFrameOperandMixin
 from ..utils import parse_index
@@ -300,7 +300,7 @@ def date_range(
     tz=None,
     normalize=False,
     name=None,
-    closed=NoDefault.no_default,
+    closed=no_default,
     inclusive=None,
     chunk_size=None,
     **kwargs,
@@ -461,14 +461,14 @@
     )
     freq = to_offset(freq)

-    if _date_range_use_inclusive and closed is not NoDefault.no_default:
+    if _date_range_use_inclusive and closed is not no_default:
         warnings.warn(
             "Argument `closed` is deprecated in favor of `inclusive`.", FutureWarning
         )
-    elif closed is NoDefault.no_default:
+    elif closed is no_default:
         closed = None

-    if inclusive is None and closed is not NoDefault.no_default:
+    if inclusive is None and closed is not no_default:
         inclusive = closed

     if start is not None:
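A hedged sketch of the closed/inclusive forwarding above; _NO_DEFAULT stands in for mars' no_default sentinel, and compat_date_range is a hypothetical wrapper, not the mars implementation. Newer pandas spells the boundary keyword inclusive, older pandas closed:

import inspect
import pandas as pd

_NO_DEFAULT = object()
_use_inclusive = "inclusive" in inspect.signature(pd.date_range).parameters

def compat_date_range(start, end, freq="D", closed=_NO_DEFAULT, inclusive=None):
    if inclusive is None and closed is not _NO_DEFAULT:
        inclusive = closed  # legacy spelling wins only when inclusive is unset
    kwargs = {}
    if inclusive is not None:
        kwargs["inclusive" if _use_inclusive else "closed"] = inclusive
    return pd.date_range(start, end, freq=freq, **kwargs)

print(compat_date_range("2022-01-01", "2022-01-04", closed="left"))
# DatetimeIndex(['2022-01-01', '2022-01-02', '2022-01-03'], freq='D')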
6 changes: 2 additions & 4 deletions mars/dataframe/datasource/tests/test_datasource_execution.py
@@ -513,7 +513,7 @@ def test_read_csv_execution(setup):
             "col3": np.arange(100),
         }
     )
-    df.iloc[20:, :] = pd.NA
+    df.iloc[:100, :] = pd.NA
     df.to_csv(file_path)

     pdf = pd.read_csv(file_path, index_col=0)
@@ -522,9 +522,7 @@
     pd.testing.assert_frame_equal(pdf, result)

     # dtypes is inferred as expected
-    pd.testing.assert_series_equal(
-        mdf.dtypes, pd.Series(["float64", "object", "int64"], index=df.columns)
-    )
+    pd.testing.assert_series_equal(mdf.dtypes, pdf.dtypes)

     # test compression
     with tempfile.TemporaryDirectory() as tempdir:
12 changes: 9 additions & 3 deletions mars/dataframe/groupby/aggregation.py
@@ -43,6 +43,7 @@
     pd_release_version,
     estimate_pandas_size,
 )
+from ..arrays import ArrowArray
 from ..core import GROUPBY_TYPE
 from ..merge import DataFrameConcat
 from ..operands import DataFrameOperand, DataFrameOperandMixin, DataFrameShuffleProxy
@@ -949,10 +950,15 @@ def _get_grouped(cls, op: "DataFrameGroupByAgg", df, ctx, copy=False, grouper=No
                 new_by.append(v)
             params["by"] = new_by

-        if op.stage == OperandStage.agg:
-            grouped = df.groupby(**params)
-        else:
+        try:
+            grouped = df.groupby(**params)
+        except ValueError:  # pragma: no cover
+            if isinstance(df.index.values, ArrowArray):
+                df = df.copy()
+                df.index = pd.Index(df.index.to_numpy(), name=df.index.name)
+                grouped = df.groupby(**params)
+            else:
+                raise

         if selection is not None:
             grouped = grouped[selection]
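The try/except above retries with a numpy-backed index for pandas versions that raise ValueError when grouping over an Arrow-backed index. A self-contained sketch of the same fallback (the helper name is hypothetical):

import pandas as pd

def groupby_with_index_fallback(df, **params):
    # Rebuild the index on numpy data and retry when pandas rejects it.
    try:
        return df.groupby(**params)
    except ValueError:
        df = df.copy()
        df.index = pd.Index(df.index.to_numpy(), name=df.index.name)
        return df.groupby(**params)

df = pd.DataFrame({"v": [1, 2, 3]}, index=pd.Index(list("aab"), name="k"))
print(groupby_with_index_fallback(df, level=0)["v"].sum())  # a -> 3, b -> 3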
13 changes: 9 additions & 4 deletions mars/dataframe/groupby/core.py
@@ -23,7 +23,7 @@
 from ...core.operand import OperandStage, MapReduceOperand
 from ...lib.groupby_wrapper import wrapped_groupby
 from ...serialization.serializables import BoolField, Int32Field, AnyField
-from ...utils import lazy_import
+from ...utils import lazy_import, pd_release_version, no_default
 from ..align import align_dataframe_series, align_series_series
 from ..initializer import Series as asseries
 from ..core import SERIES_TYPE, SERIES_CHUNK_TYPE
@@ -40,6 +40,9 @@

 cudf = lazy_import("cudf")

+_GROUP_KEYS_NO_DEFAULT = pd_release_version >= (1, 5, 0)
+_default_group_keys = no_default if _GROUP_KEYS_NO_DEFAULT else True
+

 class DataFrameGroupByOperand(MapReduceOperand, DataFrameOperandMixin):
     _op_type_ = OperandDef.GROUPBY
@@ -497,11 +500,13 @@ def execute(cls, ctx, op: "DataFrameGroupByOperand"):
             level=op.level,
             as_index=op.as_index,
             sort=op.sort,
-            group_keys=op.group_keys,
+            group_keys=op.group_keys if op.group_keys is not None else no_default,
         )


-def groupby(df, by=None, level=None, as_index=True, sort=True, group_keys=True):
+def groupby(
+    df, by=None, level=None, as_index=True, sort=True, group_keys=_default_group_keys
+):
     if not as_index and df.op.output_types[0] == OutputType.series:
         raise TypeError("as_index=False only valid with DataFrame")

@@ -519,7 +524,7 @@ def groupby(df, by=None, level=None, as_index=True, sort=True, group_keys=True):
         level=level,
         as_index=as_index,
         sort=sort,
-        group_keys=group_keys,
+        group_keys=group_keys if group_keys is not no_default else None,
         output_types=output_types,
     )
     return op(df)
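Illustration of the version-dependent default: pandas 1.5 changed groupby's group_keys default to the no_default sentinel so that apply() can warn when the value actually matters, and mars mirrors that default only on pandas >= 1.5. A sketch of the 1.x-era logic (the naive version parse is for this example only):

import pandas as pd

no_default = pd.api.extensions.no_default
_pd_ge_15 = tuple(int(x) for x in pd.__version__.split(".")[:2]) >= (1, 5)
default_group_keys = no_default if _pd_ge_15 else True

df = pd.DataFrame({"k": [1, 1, 2], "v": [1, 2, 3]})
print(df.groupby("k", group_keys=default_group_keys)["v"].sum())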
