diff --git a/docs/requirements-doc.txt b/docs/requirements-doc.txt
index 5de4f55aea5..79e78299466 100644
--- a/docs/requirements-doc.txt
+++ b/docs/requirements-doc.txt
@@ -13,7 +13,7 @@ recommonmark
 sphinx<6.0.0
 sphinx-click
 # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100
-ray==2.10.0
+ray>=2.1.0,!=2.5.0
 grpcio
 # Override to latest version of modin-spreadsheet
 git+https://github.com/modin-project/modin-spreadsheet.git@49ffd89f683f54c311867d602c55443fb11bf2a5
diff --git a/environment-dev.yml b/environment-dev.yml
index 0ab5e695a19..4d5dad714b0 100644
--- a/environment-dev.yml
+++ b/environment-dev.yml
@@ -13,7 +13,7 @@ dependencies:
   # optional dependencies
   # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100
-  - ray-core==2.10.0
+  - ray-default>=2.1.0,!=2.5.0
   - pyarrow>=7.0.0
   # workaround for https://github.com/conda/conda/issues/11744
   - grpcio!=1.45.*
diff --git a/examples/tutorial/jupyter/execution/pandas_on_ray/cluster/modin-cluster.yaml b/examples/tutorial/jupyter/execution/pandas_on_ray/cluster/modin-cluster.yaml
index 4d3710edfbc..c940dccc447 100644
--- a/examples/tutorial/jupyter/execution/pandas_on_ray/cluster/modin-cluster.yaml
+++ b/examples/tutorial/jupyter/execution/pandas_on_ray/cluster/modin-cluster.yaml
@@ -166,7 +166,7 @@ setup_commands:
     # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
     # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
     # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"
-    - conda create -n "modin" -c conda-forge modin "ray-default"==2.7.1 -y
+    - conda create -n "modin" -c conda-forge modin "ray-default>=2.1.0,!=2.5.0" -y
     - conda activate modin && pip install -U fsspec>=2022.11.0 boto3
     - echo "conda activate modin" >> ~/.bashrc
     - wget https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv
diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py
index 8d9a94f40e5..b2ffc2e6788 100644
--- a/modin/core/dataframe/algebra/binary.py
+++ b/modin/core/dataframe/algebra/binary.py
@@ -80,7 +80,10 @@ def maybe_compute_dtypes_common_cast(
         # belong to the intersection, these will be NaN columns in the result
         mismatch_columns = columns_first ^ columns_second
     elif isinstance(second, dict):
-        dtypes_second = {key: np.dtype(type(value)) for key, value in second.items()}
+        dtypes_second = {
+            key: pandas.api.types.pandas_dtype(type(value))
+            for key, value in second.items()
+        }
         columns_first = set(first.columns)
         columns_second = set(second.keys())
         common_columns = columns_first.intersection(columns_second)
@@ -90,16 +93,21 @@ def maybe_compute_dtypes_common_cast(
     else:
         if isinstance(second, (list, tuple)):
             second_dtypes_list = (
-                [np.dtype(type(value)) for value in second]
+                [pandas.api.types.pandas_dtype(type(value)) for value in second]
                 if axis == 1
                 # Here we've been given a column so it has only one dtype,
                 # Inferring the dtype using `np.array`. TODO: maybe there's a more efficient way?
else [np.array(second).dtype] * len(dtypes_first) ) elif is_scalar(second) or isinstance(second, np.ndarray): - second_dtypes_list = [ - getattr(second, "dtype", np.dtype(type(second))) - ] * len(dtypes_first) + try: + dtype = getattr(second, "dtype", None) or pandas.api.types.pandas_dtype( + type(second) + ) + except TypeError: + # For example, dtype '' not understood + dtype = pandas.Series(second).dtype + second_dtypes_list = [dtype] * len(dtypes_first) else: raise NotImplementedError( f"Can't compute common type for {type(first)} and {type(second)}." @@ -117,7 +125,7 @@ def maybe_compute_dtypes_common_cast( mismatch_columns = [] # If at least one column doesn't match, the result of the non matching column would be nan. - nan_dtype = np.dtype(type(np.nan)) + nan_dtype = pandas.api.types.pandas_dtype(type(np.nan)) dtypes = None if func is not None: try: @@ -242,7 +250,9 @@ def try_compute_new_dtypes( try: if infer_dtypes == "bool" or is_bool_dtype(result_dtype): - dtypes = maybe_build_dtypes_series(first, second, dtype=np.dtype(bool)) + dtypes = maybe_build_dtypes_series( + first, second, dtype=pandas.api.types.pandas_dtype(bool) + ) elif infer_dtypes == "common_cast": dtypes = maybe_compute_dtypes_common_cast( first, second, axis=axis, func=None diff --git a/modin/core/dataframe/base/interchange/dataframe_protocol/utils.py b/modin/core/dataframe/base/interchange/dataframe_protocol/utils.py index d4cdeef0c97..3f4d181d758 100644 --- a/modin/core/dataframe/base/interchange/dataframe_protocol/utils.py +++ b/modin/core/dataframe/base/interchange/dataframe_protocol/utils.py @@ -151,7 +151,7 @@ def pandas_dtype_to_arrow_c(dtype: Union[np.dtype, pandas.CategoricalDtype]) -> """ if isinstance(dtype, pandas.CategoricalDtype): return ArrowCTypes.INT64 - elif dtype == np.dtype("O"): + elif dtype == pandas.api.types.pandas_dtype("O"): return ArrowCTypes.STRING format_str = getattr(ArrowCTypes, dtype.name.upper(), None) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 213740755ef..df0e5c46e3a 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -28,7 +28,7 @@ from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype from pandas.core.indexes.api import Index, RangeIndex -from modin.config import IsRayCluster, NPartitions +from modin.config import Engine, IsRayCluster, NPartitions from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe from modin.core.dataframe.base.dataframe.utils import Axis, JoinType from modin.core.dataframe.pandas.dataframe.utils import ( @@ -1647,13 +1647,13 @@ def astype(self, col_dtypes, errors: str = "raise"): if new_dtypes is None: new_dtypes = self_dtypes.copy() # Update the new dtype series to the proper pandas dtype - try: - new_dtype = np.dtype(dtype) - except TypeError: - new_dtype = dtype + new_dtype = pandas.api.types.pandas_dtype(dtype) + if Engine.get() == "Dask" and hasattr(dtype, "_is_materialized"): + # FIXME: https://github.com/dask/distributed/issues/8585 + _ = dtype._materialize_categories() # We cannot infer without computing the dtype if - if isinstance(new_dtype, str) and new_dtype == "category": + if isinstance(new_dtype, pandas.CategoricalDtype): new_dtypes[column] = LazyProxyCategoricalDtype._build_proxy( # Actual parent will substitute `None` at `.set_dtypes_cache` parent=None, @@ -2187,7 +2187,8 @@ def map( # Materializing lazy columns in order to build dtype's index 
new_columns = new_columns.get(return_lengths=False) dtypes = pandas.Series( - [np.dtype(dtypes)] * len(new_columns), index=new_columns + [pandas.api.types.pandas_dtype(dtypes)] * len(new_columns), + index=new_columns, ) return self.__constructor__( new_partitions, @@ -3425,21 +3426,33 @@ def broadcast_apply_full_axis( else: if new_columns is None: kw["dtypes"] = ModinDtypes( - DtypesDescriptor(remaining_dtype=np.dtype(dtypes)) + DtypesDescriptor( + remaining_dtype=pandas.api.types.pandas_dtype(dtypes) + ) ) else: kw["dtypes"] = ( pandas.Series(dtypes, index=new_columns) if is_list_like(dtypes) else pandas.Series( - [np.dtype(dtypes)] * len(new_columns), index=new_columns + [pandas.api.types.pandas_dtype(dtypes)] * len(new_columns), + index=new_columns, ) ) - - if not keep_partitioning: - if kw["row_lengths"] is None and ModinIndex.is_materialized_index( - new_index + is_index_materialized = ModinIndex.is_materialized_index(new_index) + is_columns_materialized = ModinIndex.is_materialized_index(new_columns) + if axis == 0: + if ( + is_columns_materialized + and len(new_partitions.shape) > 1 + and new_partitions.shape[1] == 1 ): + kw["column_widths"] = [len(new_columns)] + elif axis == 1: + if is_index_materialized and new_partitions.shape[0] == 1: + kw["row_lengths"] = [len(new_index)] + if not keep_partitioning: + if kw["row_lengths"] is None and is_index_materialized: if axis == 0: kw["row_lengths"] = get_length_list( axis_len=len(new_index), num_splits=new_partitions.shape[0] @@ -3449,11 +3462,7 @@ def broadcast_apply_full_axis( self._row_lengths_cache ): kw["row_lengths"] = self._row_lengths_cache - elif len(new_index) == 1 and new_partitions.shape[0] == 1: - kw["row_lengths"] = [1] - if kw["column_widths"] is None and ModinIndex.is_materialized_index( - new_columns - ): + if kw["column_widths"] is None and is_columns_materialized: if axis == 1: kw["column_widths"] = get_length_list( axis_len=len(new_columns), @@ -3464,29 +3473,27 @@ def broadcast_apply_full_axis( new_columns ) == sum(self._column_widths_cache): kw["column_widths"] = self._column_widths_cache - elif len(new_columns) == 1 and new_partitions.shape[1] == 1: - kw["column_widths"] = [1] else: - if ( - axis == 0 - and kw["row_lengths"] is None - and self._row_lengths_cache is not None - and ModinIndex.is_materialized_index(new_index) - and len(new_index) == sum(self._row_lengths_cache) - # to avoid problems that may arise when filtering empty dataframes - and all(r != 0 for r in self._row_lengths_cache) - ): - kw["row_lengths"] = self._row_lengths_cache - if ( - axis == 1 - and kw["column_widths"] is None - and self._column_widths_cache is not None - and ModinIndex.is_materialized_index(new_columns) - and len(new_columns) == sum(self._column_widths_cache) - # to avoid problems that may arise when filtering empty dataframes - and all(w != 0 for w in self._column_widths_cache) - ): - kw["column_widths"] = self._column_widths_cache + if axis == 0: + if ( + kw["row_lengths"] is None + and self._row_lengths_cache is not None + and is_index_materialized + and len(new_index) == sum(self._row_lengths_cache) + # to avoid problems that may arise when filtering empty dataframes + and all(r != 0 for r in self._row_lengths_cache) + ): + kw["row_lengths"] = self._row_lengths_cache + elif axis == 1: + if ( + kw["column_widths"] is None + and self._column_widths_cache is not None + and is_columns_materialized + and len(new_columns) == sum(self._column_widths_cache) + # to avoid problems that may arise when filtering empty dataframes + and 
all(w != 0 for w in self._column_widths_cache) + ): + kw["column_widths"] = self._column_widths_cache result = self.__constructor__( new_partitions, index=new_index, columns=new_columns, **kw diff --git a/modin/core/dataframe/pandas/metadata/dtypes.py b/modin/core/dataframe/pandas/metadata/dtypes.py index 9f8cec8a0d5..b904c6fbff6 100644 --- a/modin/core/dataframe/pandas/metadata/dtypes.py +++ b/modin/core/dataframe/pandas/metadata/dtypes.py @@ -504,7 +504,7 @@ def _merge_dtypes( # meaning that we shouldn't try computing a new dtype for this column, # so marking it as 'unknown' i: ( - np.dtype(float) + pandas.api.types.pandas_dtype(float) if val._know_all_names and val._remaining_dtype is None else "unknown" ) @@ -531,7 +531,7 @@ def _merge_dtypes( def combine_dtypes(row): if (row == "unknown").any(): return "unknown" - row = row.fillna(np.dtype("float")) + row = row.fillna(pandas.api.types.pandas_dtype("float")) return find_common_type(list(row.values)) dtypes = dtypes_matrix.apply(combine_dtypes, axis=1) @@ -1238,6 +1238,9 @@ def extract_dtype(value): elif hasattr(value, "dtypes"): return value.dtypes elif is_scalar(value): - return np.dtype(type(value)) + if value is None: + # previous type was object instead of 'float64' + return pandas.api.types.pandas_dtype(value) + return pandas.api.types.pandas_dtype(type(value)) else: return np.array(value).dtype diff --git a/modin/core/storage_formats/pandas/aggregations.py b/modin/core/storage_formats/pandas/aggregations.py index e383a38a040..454b75c442b 100644 --- a/modin/core/storage_formats/pandas/aggregations.py +++ b/modin/core/storage_formats/pandas/aggregations.py @@ -65,7 +65,8 @@ def corr_method( qc._modin_frame.copy_columns_cache(), ) new_dtypes = pandas.Series( - np.repeat(np.dtype("float"), len(new_columns)), index=new_columns + np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)), + index=new_columns, ) elif numeric_only and qc._modin_frame.has_materialized_dtypes: old_dtypes = qc._modin_frame.dtypes @@ -73,7 +74,8 @@ def corr_method( new_columns = old_dtypes[old_dtypes.map(is_numeric_dtype)].index new_index = new_columns.copy() new_dtypes = pandas.Series( - np.repeat(np.dtype("float"), len(new_columns)), index=new_columns + np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)), + index=new_columns, ) else: new_index, new_columns, new_dtypes = None, None, None diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index bbbe6c5b6e6..60306307b42 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2734,7 +2734,7 @@ def _set_item(df, row_loc): # pragma: no cover # we would like to convert it and get its proper internal dtype item_type = item.to_numpy().dtype else: - item_type = np.dtype(type(item)) + item_type = pandas.api.types.pandas_dtype(type(item)) if isinstance(old_dtypes, pandas.Series): new_dtypes[col_loc] = [ @@ -3062,7 +3062,7 @@ def _compute_duplicated(df): # pragma: no cover hashed_modin_frame = self._modin_frame.reduce( axis=1, function=_compute_hash, - dtypes=np.dtype("O"), + dtypes=pandas.api.types.pandas_dtype("O"), ) else: hashed_modin_frame = self._modin_frame diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py index e4a4ab12436..0172d7a0ceb 100644 --- 
a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py @@ -554,7 +554,7 @@ def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool: return False try: pandas_dtype = dtype.to_pandas_dtype() - return pandas_dtype != np.dtype("O") + return pandas_dtype != pandas.api.types.pandas_dtype("O") except NotImplementedError: return False diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/utils.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/utils.py index 4b9c757ddd5..1a4bd54592c 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/test/utils.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/test/utils.py @@ -13,7 +13,6 @@ import datetime -import numpy as np import pandas import pytest from pandas.core.dtypes.common import is_datetime64_any_dtype, is_object_dtype @@ -108,7 +107,7 @@ def align_datetime_dtypes(*dfs): # datetime.time is considered to be an 'object' dtype in pandas that's why # we have to explicitly check the values type in the column elif ( - dtype == np.dtype("O") + dtype == pandas.api.types.pandas_dtype("O") and col not in time_cols # HDK has difficulties with empty frames, so explicitly skip them # https://github.com/modin-project/modin/issues/3428 diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index cf66de98b2d..7ff6bb3f16c 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -915,5 +915,5 @@ def dtypes(self): def _check_int_or_float(op, dtypes): # noqa: GL08 for t in dtypes: - if t.char not in _SUPPORTED_NUM_TYPE_CODES: + if not isinstance(t, np.dtype) or t.char not in _SUPPORTED_NUM_TYPE_CODES: raise NotImplementedError(f"Operation '{op}' on type '{t.name}'") diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 3bc0f6d9bc3..f63bc04db6c 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -1864,7 +1864,7 @@ def idxmax(self, axis=0, skipna=True, numeric_only=False): # noqa: PR01, RT01, """ Return index of first occurrence of maximum over requested axis. """ - if not all(d != np.dtype("O") for d in self._get_dtypes()): + if not all(d != pandas.api.types.pandas_dtype("O") for d in self._get_dtypes()): raise TypeError("reduce operation 'argmax' not allowed for this dtype") axis = self._get_axis_number(axis) return self._reduce_dimension( @@ -1877,7 +1877,7 @@ def idxmin(self, axis=0, skipna=True, numeric_only=False): # noqa: PR01, RT01, """ Return index of first occurrence of minimum over requested axis. 
""" - if not all(d != np.dtype("O") for d in self._get_dtypes()): + if not all(d != pandas.api.types.pandas_dtype("O") for d in self._get_dtypes()): raise TypeError("reduce operation 'argmin' not allowed for this dtype") axis = self._get_axis_number(axis) return self._reduce_dimension( diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index b0efd4ceb24..b354542c187 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1590,7 +1590,9 @@ def prod( ): new_index = self.columns if not axis else self.index return Series( - [np.nan] * len(new_index), index=new_index, dtype=np.dtype("object") + [np.nan] * len(new_index), + index=new_index, + dtype=pandas.api.types.pandas_dtype("object"), ) data = self._validate_dtypes_sum_prod_mean(axis, numeric_only, ignore_axis=True) @@ -2107,7 +2109,9 @@ def sum( ): new_index = self.columns if not axis else self.index return Series( - [np.nan] * len(new_index), index=new_index, dtype=np.dtype("object") + [np.nan] * len(new_index), + index=new_index, + dtype=pandas.api.types.pandas_dtype("object"), ) data = self._validate_dtypes_sum_prod_mean( @@ -2991,8 +2995,8 @@ def _validate_dtypes_min_max(self, axis, numeric_only): ): # check if there are columns with dtypes datetime or timedelta if all( - dtype != np.dtype("datetime64[ns]") - and dtype != np.dtype("timedelta64[ns]") + dtype != pandas.api.types.pandas_dtype("datetime64[ns]") + and dtype != pandas.api.types.pandas_dtype("timedelta64[ns]") for dtype in self.dtypes ): raise TypeError("Cannot compare Numeric and Non-Numeric Types") @@ -3024,7 +3028,10 @@ def _validate_dtypes_sum_prod_mean(self, axis, numeric_only, ignore_axis=False): if ( not axis and numeric_only is False - and any(dtype == np.dtype("datetime64[ns]") for dtype in self.dtypes) + and any( + dtype == pandas.api.types.pandas_dtype("datetime64[ns]") + for dtype in self.dtypes + ) ): raise TypeError("Cannot add Timestamp Types") @@ -3042,8 +3049,8 @@ def _validate_dtypes_sum_prod_mean(self, axis, numeric_only, ignore_axis=False): ): # check if there are columns with dtypes datetime or timedelta if all( - dtype != np.dtype("datetime64[ns]") - and dtype != np.dtype("timedelta64[ns]") + dtype != pandas.api.types.pandas_dtype("datetime64[ns]") + and dtype != pandas.api.types.pandas_dtype("timedelta64[ns]") for dtype in self.dtypes ): raise TypeError("Cannot operate on Numeric and Non-Numeric Types") diff --git a/modin/pandas/test/dataframe/test_map_metadata.py b/modin/pandas/test/dataframe/test_map_metadata.py index 8c80d1be480..cd583e7e5bd 100644 --- a/modin/pandas/test/dataframe/test_map_metadata.py +++ b/modin/pandas/test/dataframe/test_map_metadata.py @@ -409,6 +409,11 @@ def test_astype(): expected_df_casted = expected_df.astype(str) df_equals(modin_df_casted, expected_df_casted) + # pandas nullable dtype + modin_df_casted = modin_df.astype("Float64") + expected_df_casted = expected_df.astype("Float64") + df_equals(modin_df_casted, expected_df_casted) + modin_df_casted = modin_df.astype("category") expected_df_casted = expected_df.astype("category") df_equals(modin_df_casted, expected_df_casted) diff --git a/modin/pandas/test/dataframe/test_reduce.py b/modin/pandas/test/dataframe/test_reduce.py index ac94b970080..c301308af4a 100644 --- a/modin/pandas/test/dataframe/test_reduce.py +++ b/modin/pandas/test/dataframe/test_reduce.py @@ -17,7 +17,7 @@ import pytest import modin.pandas as pd -from modin.config import Engine, NPartitions, StorageFormat +from modin.config import NPartitions, StorageFormat from 
modin.pandas.test.utils import ( arg_keys, assert_dtypes_equal, @@ -316,7 +316,6 @@ def test_sum(data, axis, skipna, is_transposed, request): df_equals(modin_result, pandas_result) -@pytest.mark.skipif(Engine.get() == "Native", reason="Fails on HDK") @pytest.mark.parametrize("dtype", ["int64", "Int64"]) def test_dtype_consistency(dtype): # test for issue #6781 diff --git a/modin/utils.py b/modin/utils.py index b390281e02a..98eb5abab0c 100644 --- a/modin/utils.py +++ b/modin/utils.py @@ -82,7 +82,7 @@ def _to_numpy(self) -> Any: # noqa: GL08 pass -MIN_RAY_VERSION = version.parse("1.13.0") +MIN_RAY_VERSION = version.parse("2.1.0") MIN_DASK_VERSION = version.parse("2.22.0") MIN_UNIDIST_VERSION = version.parse("0.2.1") diff --git a/requirements-dev.txt b/requirements-dev.txt index f992ebebbcf..458a912d691 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -7,7 +7,7 @@ psutil>=5.8.0 ## optional dependencies # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100 -ray==2.10.0 +ray[default]>=2.1.0,!=2.5.0 pyarrow>=7.0.0 grpcio dask[complete]>=2.22.0 diff --git a/setup.py b/setup.py index 8758edd9a27..8d931e21dce 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ dask_deps = ["dask>=2.22.0", "distributed>=2.22.0"] # ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100 -ray_deps = ["ray==2.10.0", "pyarrow>=7.0.0", "grpcio"] +ray_deps = ["ray[default]>=2.1.0,!=2.5.0", "pyarrow>=7.0.0", "grpcio"] mpi_deps = ["unidist[mpi]>=0.2.1"] spreadsheet_deps = ["modin-spreadsheet>=0.1.0"] # Currently, Modin does not include `mpi` option in `all`.
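
A note on the `np.dtype` -> `pandas.api.types.pandas_dtype` sweep that makes up most of this diff: `pandas_dtype` accepts everything `np.dtype` does and additionally resolves pandas extension dtypes (nullable `Int64`/`Float64`, `category`, ...), which `np.dtype` rejects with a `TypeError`. A minimal sketch of the difference, using only public NumPy/pandas API and independent of Modin internals:

    import numpy as np
    import pandas

    # Plain NumPy dtypes resolve identically through both entry points.
    assert pandas.api.types.pandas_dtype("int64") == np.dtype("int64")

    # Extension dtypes are where they diverge: np.dtype rejects them...
    try:
        np.dtype("Float64")
    except TypeError as err:
        print(err)  # e.g. "data type 'Float64' not understood" (message varies by version)

    # ...while pandas_dtype resolves them, which is what lets operations
    # like astype("Float64") / astype("category") keep exact dtype metadata.
    print(pandas.api.types.pandas_dtype("Float64"))   # Float64
    print(pandas.api.types.pandas_dtype("category"))  # category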
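Similarly, the `try/except TypeError` fallback added to `maybe_compute_dtypes_common_cast` in `modin/core/dataframe/algebra/binary.py` covers scalars whose Python class neither `np.dtype` nor `pandas_dtype` can interpret; inferring through a one-element `Series` is the escape hatch. A small illustration of that code path, with `decimal.Decimal` assumed as an example of such a class:

    import decimal

    import pandas

    value = decimal.Decimal("1.5")
    try:
        dtype = pandas.api.types.pandas_dtype(type(value))
    except TypeError:
        # pandas cannot interpret the raw Python class, so let a
        # one-element Series infer the dtype instead ('object' here).
        dtype = pandas.Series(value).dtype
    print(dtype)  # object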