Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/modin-project/modin into …
Browse files Browse the repository at this point in the history
…try-another-ray-target

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Apr 2, 2024
2 parents 9cef2f9 + 3bf5d8f commit cf1aff5
Show file tree
Hide file tree
Showing 19 changed files with 108 additions and 76 deletions.
2 changes: 1 addition & 1 deletion docs/requirements-doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ recommonmark
sphinx<6.0.0
sphinx-click
# ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100
ray==2.10.0
ray>=2.1.0,!=2.5.0
grpcio
# Override to latest version of modin-spreadsheet
git+https://github.com/modin-project/modin-spreadsheet.git@49ffd89f683f54c311867d602c55443fb11bf2a5
Expand Down
2 changes: 1 addition & 1 deletion environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies:

# optional dependencies
# ray==2.5.0 broken: https://github.com/conda-forge/ray-packages-feedstock/issues/100
- ray-core==2.10.0
- ray-default>=2.1.0,!=2.5.0
- pyarrow>=7.0.0
# workaround for https://github.com/conda/conda/issues/11744
- grpcio!=1.45.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ setup_commands:
# To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
# that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
# - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"
- conda create -n "modin" -c conda-forge modin "ray-default"==2.7.1 -y
- conda create -n "modin" -c conda-forge modin "ray-default">=2.1.0,!=2.5.0 -y
- conda activate modin && pip install -U fsspec>=2022.11.0 boto3
- echo "conda activate modin" >> ~/.bashrc
- wget https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv
Expand Down
24 changes: 17 additions & 7 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ def maybe_compute_dtypes_common_cast(
# belong to the intersection, these will be NaN columns in the result
mismatch_columns = columns_first ^ columns_second
elif isinstance(second, dict):
dtypes_second = {key: np.dtype(type(value)) for key, value in second.items()}
dtypes_second = {
key: pandas.api.types.pandas_dtype(type(value))
for key, value in second.items()
}
columns_first = set(first.columns)
columns_second = set(second.keys())
common_columns = columns_first.intersection(columns_second)
Expand All @@ -90,16 +93,21 @@ def maybe_compute_dtypes_common_cast(
else:
if isinstance(second, (list, tuple)):
second_dtypes_list = (
[np.dtype(type(value)) for value in second]
[pandas.api.types.pandas_dtype(type(value)) for value in second]
if axis == 1
# Here we've been given a column so it has only one dtype,
# Infering the dtype using `np.array`, TODO: maybe there's more efficient way?
else [np.array(second).dtype] * len(dtypes_first)
)
elif is_scalar(second) or isinstance(second, np.ndarray):
second_dtypes_list = [
getattr(second, "dtype", np.dtype(type(second)))
] * len(dtypes_first)
try:
dtype = getattr(second, "dtype", None) or pandas.api.types.pandas_dtype(
type(second)
)
except TypeError:
# For example, dtype '<class 'datetime.datetime'>' not understood
dtype = pandas.Series(second).dtype
second_dtypes_list = [dtype] * len(dtypes_first)
else:
raise NotImplementedError(
f"Can't compute common type for {type(first)} and {type(second)}."
Expand All @@ -117,7 +125,7 @@ def maybe_compute_dtypes_common_cast(
mismatch_columns = []

# If at least one column doesn't match, the result of the non matching column would be nan.
nan_dtype = np.dtype(type(np.nan))
nan_dtype = pandas.api.types.pandas_dtype(type(np.nan))
dtypes = None
if func is not None:
try:
Expand Down Expand Up @@ -242,7 +250,9 @@ def try_compute_new_dtypes(

try:
if infer_dtypes == "bool" or is_bool_dtype(result_dtype):
dtypes = maybe_build_dtypes_series(first, second, dtype=np.dtype(bool))
dtypes = maybe_build_dtypes_series(
first, second, dtype=pandas.api.types.pandas_dtype(bool)
)
elif infer_dtypes == "common_cast":
dtypes = maybe_compute_dtypes_common_cast(
first, second, axis=axis, func=None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def pandas_dtype_to_arrow_c(dtype: Union[np.dtype, pandas.CategoricalDtype]) ->
"""
if isinstance(dtype, pandas.CategoricalDtype):
return ArrowCTypes.INT64
elif dtype == np.dtype("O"):
elif dtype == pandas.api.types.pandas_dtype("O"):
return ArrowCTypes.STRING

format_str = getattr(ArrowCTypes, dtype.name.upper(), None)
Expand Down
87 changes: 47 additions & 40 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import IsRayCluster, NPartitions
from modin.config import Engine, IsRayCluster, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -1647,13 +1647,13 @@ def astype(self, col_dtypes, errors: str = "raise"):
if new_dtypes is None:
new_dtypes = self_dtypes.copy()
# Update the new dtype series to the proper pandas dtype
try:
new_dtype = np.dtype(dtype)
except TypeError:
new_dtype = dtype
new_dtype = pandas.api.types.pandas_dtype(dtype)
if Engine.get() == "Dask" and hasattr(dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = dtype._materialize_categories()

# We cannot infer without computing the dtype if
if isinstance(new_dtype, str) and new_dtype == "category":
if isinstance(new_dtype, pandas.CategoricalDtype):
new_dtypes[column] = LazyProxyCategoricalDtype._build_proxy(
# Actual parent will substitute `None` at `.set_dtypes_cache`
parent=None,
Expand Down Expand Up @@ -2187,7 +2187,8 @@ def map(
# Materializing lazy columns in order to build dtype's index
new_columns = new_columns.get(return_lengths=False)
dtypes = pandas.Series(
[np.dtype(dtypes)] * len(new_columns), index=new_columns
[pandas.api.types.pandas_dtype(dtypes)] * len(new_columns),
index=new_columns,
)
return self.__constructor__(
new_partitions,
Expand Down Expand Up @@ -3425,21 +3426,33 @@ def broadcast_apply_full_axis(
else:
if new_columns is None:
kw["dtypes"] = ModinDtypes(
DtypesDescriptor(remaining_dtype=np.dtype(dtypes))
DtypesDescriptor(
remaining_dtype=pandas.api.types.pandas_dtype(dtypes)
)
)
else:
kw["dtypes"] = (
pandas.Series(dtypes, index=new_columns)
if is_list_like(dtypes)
else pandas.Series(
[np.dtype(dtypes)] * len(new_columns), index=new_columns
[pandas.api.types.pandas_dtype(dtypes)] * len(new_columns),
index=new_columns,
)
)

if not keep_partitioning:
if kw["row_lengths"] is None and ModinIndex.is_materialized_index(
new_index
is_index_materialized = ModinIndex.is_materialized_index(new_index)
is_columns_materialized = ModinIndex.is_materialized_index(new_columns)
if axis == 0:
if (
is_columns_materialized
and len(new_partitions.shape) > 1
and new_partitions.shape[1] == 1
):
kw["column_widths"] = [len(new_columns)]
elif axis == 1:
if is_index_materialized and new_partitions.shape[0] == 1:
kw["row_lengths"] = [len(new_index)]
if not keep_partitioning:
if kw["row_lengths"] is None and is_index_materialized:
if axis == 0:
kw["row_lengths"] = get_length_list(
axis_len=len(new_index), num_splits=new_partitions.shape[0]
Expand All @@ -3449,11 +3462,7 @@ def broadcast_apply_full_axis(
self._row_lengths_cache
):
kw["row_lengths"] = self._row_lengths_cache
elif len(new_index) == 1 and new_partitions.shape[0] == 1:
kw["row_lengths"] = [1]
if kw["column_widths"] is None and ModinIndex.is_materialized_index(
new_columns
):
if kw["column_widths"] is None and is_columns_materialized:
if axis == 1:
kw["column_widths"] = get_length_list(
axis_len=len(new_columns),
Expand All @@ -3464,29 +3473,27 @@ def broadcast_apply_full_axis(
new_columns
) == sum(self._column_widths_cache):
kw["column_widths"] = self._column_widths_cache
elif len(new_columns) == 1 and new_partitions.shape[1] == 1:
kw["column_widths"] = [1]
else:
if (
axis == 0
and kw["row_lengths"] is None
and self._row_lengths_cache is not None
and ModinIndex.is_materialized_index(new_index)
and len(new_index) == sum(self._row_lengths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(r != 0 for r in self._row_lengths_cache)
):
kw["row_lengths"] = self._row_lengths_cache
if (
axis == 1
and kw["column_widths"] is None
and self._column_widths_cache is not None
and ModinIndex.is_materialized_index(new_columns)
and len(new_columns) == sum(self._column_widths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(w != 0 for w in self._column_widths_cache)
):
kw["column_widths"] = self._column_widths_cache
if axis == 0:
if (
kw["row_lengths"] is None
and self._row_lengths_cache is not None
and is_index_materialized
and len(new_index) == sum(self._row_lengths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(r != 0 for r in self._row_lengths_cache)
):
kw["row_lengths"] = self._row_lengths_cache
elif axis == 1:
if (
kw["column_widths"] is None
and self._column_widths_cache is not None
and is_columns_materialized
and len(new_columns) == sum(self._column_widths_cache)
# to avoid problems that may arise when filtering empty dataframes
and all(w != 0 for w in self._column_widths_cache)
):
kw["column_widths"] = self._column_widths_cache

result = self.__constructor__(
new_partitions, index=new_index, columns=new_columns, **kw
Expand Down
9 changes: 6 additions & 3 deletions modin/core/dataframe/pandas/metadata/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def _merge_dtypes(
# meaning that we shouldn't try computing a new dtype for this column,
# so marking it as 'unknown'
i: (
np.dtype(float)
pandas.api.types.pandas_dtype(float)
if val._know_all_names and val._remaining_dtype is None
else "unknown"
)
Expand All @@ -531,7 +531,7 @@ def _merge_dtypes(
def combine_dtypes(row):
if (row == "unknown").any():
return "unknown"
row = row.fillna(np.dtype("float"))
row = row.fillna(pandas.api.types.pandas_dtype("float"))
return find_common_type(list(row.values))

dtypes = dtypes_matrix.apply(combine_dtypes, axis=1)
Expand Down Expand Up @@ -1238,6 +1238,9 @@ def extract_dtype(value):
elif hasattr(value, "dtypes"):
return value.dtypes
elif is_scalar(value):
return np.dtype(type(value))
if value is None:
# previous type was object instead of 'float64'
return pandas.api.types.pandas_dtype(value)
return pandas.api.types.pandas_dtype(type(value))
else:
return np.array(value).dtype
6 changes: 4 additions & 2 deletions modin/core/storage_formats/pandas/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ def corr_method(
qc._modin_frame.copy_columns_cache(),
)
new_dtypes = pandas.Series(
np.repeat(np.dtype("float"), len(new_columns)), index=new_columns
np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)),
index=new_columns,
)
elif numeric_only and qc._modin_frame.has_materialized_dtypes:
old_dtypes = qc._modin_frame.dtypes

new_columns = old_dtypes[old_dtypes.map(is_numeric_dtype)].index
new_index = new_columns.copy()
new_dtypes = pandas.Series(
np.repeat(np.dtype("float"), len(new_columns)), index=new_columns
np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)),
index=new_columns,
)
else:
new_index, new_columns, new_dtypes = None, None, None
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,7 @@ def _set_item(df, row_loc): # pragma: no cover
# we would like to convert it and get its proper internal dtype
item_type = item.to_numpy().dtype
else:
item_type = np.dtype(type(item))
item_type = pandas.api.types.pandas_dtype(type(item))

if isinstance(old_dtypes, pandas.Series):
new_dtypes[col_loc] = [
Expand Down Expand Up @@ -3062,7 +3062,7 @@ def _compute_duplicated(df): # pragma: no cover
hashed_modin_frame = self._modin_frame.reduce(
axis=1,
function=_compute_hash,
dtypes=np.dtype("O"),
dtypes=pandas.api.types.pandas_dtype("O"),
)
else:
hashed_modin_frame = self._modin_frame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool:
return False
try:
pandas_dtype = dtype.to_pandas_dtype()
return pandas_dtype != np.dtype("O")
return pandas_dtype != pandas.api.types.pandas_dtype("O")
except NotImplementedError:
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import datetime

import numpy as np
import pandas
import pytest
from pandas.core.dtypes.common import is_datetime64_any_dtype, is_object_dtype
Expand Down Expand Up @@ -108,7 +107,7 @@ def align_datetime_dtypes(*dfs):
# datetime.time is considered to be an 'object' dtype in pandas that's why
# we have to explicitly check the values type in the column
elif (
dtype == np.dtype("O")
dtype == pandas.api.types.pandas_dtype("O")
and col not in time_cols
# HDK has difficulties with empty frames, so explicitly skip them
# https://github.com/modin-project/modin/issues/3428
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,5 +915,5 @@ def dtypes(self):

def _check_int_or_float(op, dtypes): # noqa: GL08
for t in dtypes:
if t.char not in _SUPPORTED_NUM_TYPE_CODES:
if not isinstance(t, np.dtype) or t.char not in _SUPPORTED_NUM_TYPE_CODES:
raise NotImplementedError(f"Operation '{op}' on type '{t.name}'")
4 changes: 2 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,7 +1864,7 @@ def idxmax(self, axis=0, skipna=True, numeric_only=False): # noqa: PR01, RT01,
"""
Return index of first occurrence of maximum over requested axis.
"""
if not all(d != np.dtype("O") for d in self._get_dtypes()):
if not all(d != pandas.api.types.pandas_dtype("O") for d in self._get_dtypes()):
raise TypeError("reduce operation 'argmax' not allowed for this dtype")
axis = self._get_axis_number(axis)
return self._reduce_dimension(
Expand All @@ -1877,7 +1877,7 @@ def idxmin(self, axis=0, skipna=True, numeric_only=False): # noqa: PR01, RT01,
"""
Return index of first occurrence of minimum over requested axis.
"""
if not all(d != np.dtype("O") for d in self._get_dtypes()):
if not all(d != pandas.api.types.pandas_dtype("O") for d in self._get_dtypes()):
raise TypeError("reduce operation 'argmin' not allowed for this dtype")
axis = self._get_axis_number(axis)
return self._reduce_dimension(
Expand Down
Loading

0 comments on commit cf1aff5

Please sign in to comment.