[WIP] support koalas and modin (#658)
add tests for koalas

fix type issues with koalas patch to pd.Series, DataFrame

add datatype koalas tests

finish writing initial test suite for koalas

fix regressions

configure koalas

fix regressions

update pylint dep

update deps

update black

fix lint

use context manager for koalas ops_on_diff_frames

updates

update pre-commit mypy

typing ignore

fix docs

install hypothesis for koalas ci

don't cover modin import check

better handling of timestamp

fix koalas

wip

wip

wip

coverage

hypothesis health check
cosmicBboy committed Nov 11, 2021
1 parent 0a72a51 commit 12378ea
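
One of the squashed messages above, "use context manager for koalas ops_on_diff_frames", refers to a koalas option guard. A minimal sketch of that pattern (not part of this commit, assuming koalas is installed):

    import databricks.koalas as ks

    s1 = ks.Series([1, 2, 3])
    s2 = ks.Series([4, 5, 6])

    # koalas refuses to combine two distinct frames unless this option is
    # enabled; the context manager scopes it to the block
    with ks.option_context("compute.ops_on_diff_frames", True):
        total = s1 + s2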
Showing 24 changed files with 986 additions and 118 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci-tests.yml
@@ -145,6 +145,13 @@ jobs:
          --non-interactive
          --session "tests-${{ matrix.python-version }}(extra='strategies', pandas='${{ matrix.pandas-version }}')"
+      - name: Unit Tests - Koalas
+        run: >
+          nox
+          -db virtualenv -r -v
+          --non-interactive
+          --session "tests-${{ matrix.python-version }}(extra='koalas', pandas='${{ matrix.pandas-version }}')"
      - name: Upload coverage to Codecov
        uses: "codecov/codecov-action@v1"

8 changes: 8 additions & 0 deletions docs/source/conf.py
@@ -64,10 +64,18 @@
else:
    SKIP_STRATEGY = False

+try:
+    import databricks.koalas
+except ImportError:
+    KOALAS_INSTALLED = False
+else:
+    KOALAS_INSTALLED = True

SKIP = sys.version_info < (3, 6)
PY36 = sys.version_info < (3, 7)
SKIP_PANDAS_LT_V1 = version.parse(pd.__version__).release < (1, 0) or PY36
+SKIP_SCALING = True
+SKIP_SCHEMA_MODEL = SKIP_PANDAS_LT_V1 or KOALAS_INSTALLED
"""

doctest_default_flags = (
4 changes: 2 additions & 2 deletions docs/source/schema_models.rst
@@ -194,13 +194,13 @@ You must give a **type**, not an **instance**.
:red:`✘` Bad:

.. testcode:: dataframe_schema_model
-    :skipif: SKIP_PANDAS_LT_V1
+    :skipif: SKIP_SCHEMA_MODEL

    class Schema(pa.SchemaModel):
        a: Series[pd.StringDtype()]

.. testoutput:: dataframe_schema_model
-    :skipif: SKIP_PANDAS_LT_V1
+    :skipif: SKIP_SCHEMA_MODEL

    Traceback (most recent call last):
    ...
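
For contrast, a sketch of the passing case (not shown in this hunk): annotating with the dtype type itself, assuming pandas >= 1.0:

    import pandas as pd
    import pandera as pa
    from pandera.typing import Series

    class Schema(pa.SchemaModel):
        a: Series[pd.StringDtype]  # a type, not an instance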
6 changes: 5 additions & 1 deletion environment.yml
@@ -19,14 +19,18 @@ dependencies:
  - frictionless
  - pyarrow

+  # koalas extra
+  - koalas
+  - pyspark
+
  # testing and dependencies
  - black >= 20.8b1

  # testing
  - isort >= 5.7.0
  - codecov
  - mypy >= 0.902  # mypy no longer bundles stubs for third-party libraries
-  - pylint = v2.11.1
+  - pylint = 2.11.1
  - pytest
  - pytest-cov
  - pytest-xdist
2 changes: 1 addition & 1 deletion noxfile.py
@@ -184,7 +184,7 @@ def install_extras(
            specs.append(
                spec if spec != "pandas" else f"pandas{pandas_version}"
            )
-    if extra == "core":
+    if extra in {"core", "koalas"}:
        specs.append(REQUIRES["all"]["hypothesis"])

    # this is a temporary measure to install setuptools due to this issue:
2 changes: 2 additions & 0 deletions pandera/__init__.py
@@ -1,6 +1,7 @@
"""A flexible and expressive pandas validation library."""
import platform

from pandera import external_config
from pandera.dtypes import (
Bool,
Category,
@@ -56,4 +57,5 @@
from .version import __version__

if platform.system() != "Windows":
+    # pylint: disable=ungrouped-imports
    from pandera.dtypes import Complex256, Float128
98 changes: 94 additions & 4 deletions pandera/check_utils.py
@@ -1,9 +1,88 @@
"""Utility functions for validation."""

from typing import Optional, Tuple, Union
from functools import lru_cache
from typing import NamedTuple, Optional, Tuple, Union

import pandas as pd

SupportedTypes = NamedTuple(
"SupportedTypes",
(
("table_types", Tuple[type]),
("field_types", Tuple[type]),
("index_types", Tuple[type]),
("multiindex_types", Tuple[type]),
),
)


@lru_cache(maxsize=None)
def _supported_types():
# pylint: disable=import-outside-toplevel
table_types = [pd.DataFrame]
field_types = [pd.Series]
index_types = [pd.Index]
multiindex_types = [pd.MultiIndex]

try:
import databricks.koalas as ks

table_types.append(ks.DataFrame)
field_types.append(ks.Series)
index_types.append(ks.Index)
multiindex_types.append(ks.MultiIndex)
except ImportError:
pass
try: # pragma: no cover
import modin.pandas as mpd

table_types.append(mpd.DataFrame)
field_types.append(mpd.Series)
index_types.append(mpd.Index)
multiindex_types.append(mpd.MultiIndex)
except ImportError:
pass

return SupportedTypes(
tuple(table_types),
tuple(field_types),
tuple(index_types),
tuple(multiindex_types),
)


def is_table(obj):
"""Verifies whether an object is table-like.
Where a table is a 2-dimensional data matrix of rows and columns, which
can be indexed in multiple different ways.
"""
return isinstance(obj, _supported_types().table_types)


def is_field(obj):
"""Verifies whether an object is field-like.
Where a field is a columnar representation of data in a table-like
data structure.
"""
return isinstance(obj, _supported_types().field_types)


def is_index(obj):
"""Verifies whether an object is a table index."""
return isinstance(obj, _supported_types().index_types)


def is_multiindex(obj):
"""Verifies whether an object is a multi-level table index."""
return isinstance(obj, _supported_types().multiindex_types)


def is_supported_check_obj(obj):
"""Verifies whether an object is table- or field-like."""
return is_table(obj) or is_field(obj)


def prepare_series_check_output(
check_obj: Union[pd.Series, pd.DataFrame],
@@ -25,9 +104,20 @@ def prepare_series_check_output(
        check_output = check_output | isna
    failure_cases = check_obj[~check_output]
    if not failure_cases.empty and n_failure_cases is not None:
-        failure_cases = failure_cases.groupby(check_output).head(
-            n_failure_cases
-        )
+        # NOTE: this is a hack to support koalas, since you can't use groupby
+        # on a dataframe with another dataframe
+        if type(failure_cases).__module__.startswith("databricks.koalas"):
+            failure_cases = (
+                failure_cases.rename("failure_cases")
+                .to_frame()
+                .assign(check_output=check_output)
+                .groupby("check_output")
+                .head(n_failure_cases)["failure_cases"]
+            )
+        else:
+            failure_cases = failure_cases.groupby(check_output).head(
+                n_failure_cases
+            )
    return check_output, failure_cases


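A usage sketch of the new helpers, assuming the koalas extra is installed; each predicate accepts pandas and koalas containers alike:

    import pandas as pd
    import databricks.koalas as ks
    from pandera import check_utils

    assert check_utils.is_table(pd.DataFrame({"a": [1]}))
    assert check_utils.is_table(ks.DataFrame({"a": [1]}))
    assert check_utils.is_field(ks.Series([1, 2]))
    assert check_utils.is_index(pd.Index([1, 2]))
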
29 changes: 14 additions & 15 deletions pandera/checks.py
@@ -367,27 +367,27 @@ def __call__(
            ``failure_cases``: subset of the check_object that failed.
        """
        # prepare check object
-        if isinstance(df_or_series, pd.Series) or (
-            column is not None and isinstance(df_or_series, pd.DataFrame)
+        if check_utils.is_field(df_or_series) or (
+            column is not None and check_utils.is_table(df_or_series)
        ):
            check_obj = self._prepare_series_input(df_or_series, column)
-        elif isinstance(df_or_series, pd.DataFrame):
+        elif check_utils.is_table(df_or_series):
            check_obj = self._prepare_dataframe_input(df_or_series)
        else:
            raise ValueError(
-                f"object of type {df_or_series} not supported. Must be a "
-                "Series, a dictionary of Series, or DataFrame"
+                f"object of type {type(df_or_series)} not supported. Must be "
+                "a Series, a dictionary of Series, or DataFrame"
            )

        # apply check function to check object
        check_fn = partial(self._check_fn, **self._check_kwargs)

        if self.element_wise:
            check_output = (
-                check_obj.apply(check_fn, axis=1)
-                if isinstance(check_obj, pd.DataFrame)
-                else check_obj.map(check_fn)
-                if isinstance(check_obj, pd.Series)
+                check_obj.apply(check_fn, axis=1)  # type: ignore
+                if check_utils.is_table(check_obj)
+                else check_obj.map(check_fn)  # type: ignore
+                if check_utils.is_field(check_obj)
                else check_fn(check_obj)
            )
        else:
@@ -399,12 +399,12 @@
        if (
            isinstance(check_obj, dict)
            or isinstance(check_output, bool)
-            or not isinstance(check_output, (pd.Series, pd.DataFrame))
+            or not check_utils.is_supported_check_obj(check_output)
            or check_obj.shape[0] != check_output.shape[0]
            or (check_obj.index != check_output.index).all()
        ):
            failure_cases = None
-        elif isinstance(check_output, pd.Series):
+        elif check_utils.is_field(check_output):
            (
                check_output,
                failure_cases,
@@ -414,7 +414,7 @@
                ignore_na=self.ignore_na,
                n_failure_cases=self.n_failure_cases,
            )
-        elif isinstance(check_output, pd.DataFrame):
+        elif check_utils.is_table(check_output):
            (
                check_output,
                failure_cases,
@@ -432,12 +432,11 @@

        check_passed = (
            check_output.all()
-            if isinstance(check_output, pd.Series)
+            if check_utils.is_field(check_output)
            else check_output.all(axis=None)
-            if isinstance(check_output, pd.DataFrame)
+            if check_utils.is_table(check_output)
            else check_output
        )
-
        return CheckResult(
            check_output, check_passed, check_obj, failure_cases
        )
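With the isinstance checks swapped for check_utils predicates, a Check should dispatch on koalas objects the same way it does on pandas ones. A hedged sketch, assuming the koalas extra is installed and the WIP support works end to end:

    import databricks.koalas as ks
    import pandera as pa

    check = pa.Check.greater_than(0)
    result = check(ks.Series([1, 2, -1]))
    print(result.check_passed)  # False: -1 fails the check
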
42 changes: 35 additions & 7 deletions pandera/engines/pandas_engine.py
@@ -444,12 +444,28 @@ class NpString(numpy_engine.String):
"""Specializes numpy_engine.String.coerce to handle pd.NA values."""

def coerce(self, data_container: PandasObject) -> np.ndarray:
# Convert to object first to avoid
# TypeError: object cannot be converted to an IntegerDtype
data_container = data_container.astype(object)
return data_container.where(
data_container.isna(), data_container.astype(str)
)
def _to_str(obj):
# NOTE: this is a hack to handle the following case:
# koalas.Index doesn't support .where method yet, use numpy
reverter = None
if type(obj).__module__.startswith("databricks.koalas"):
# pylint: disable=import-outside-toplevel
import databricks.koalas as ks

if isinstance(obj, ks.Index):
obj = obj.to_series()
reverter = ks.Index
else:
obj = obj.astype(object)

obj = (
obj.astype(str)
if obj.notna().all(axis=None)
else obj.where(obj.isna(), obj.astype(str))
)
return obj if reverter is None else reverter(obj)

return _to_str(data_container)

def check(self, pandera_dtype: dtypes.DataType) -> bool:
return isinstance(pandera_dtype, (numpy_engine.Object, type(self)))
@@ -471,6 +487,7 @@ def check(self, pandera_dtype: dtypes.DataType) -> bool:
        object,
        np.object_,
        np.bytes_,
+        np.string_,
    ],
)

@@ -517,7 +534,18 @@ def __post_init__(self):

    def coerce(self, data_container: PandasObject) -> PandasObject:
        def _to_datetime(col: pd.Series) -> pd.Series:
-            col = pd.to_datetime(col, **self.to_datetime_kwargs)
+            # NOTE: this is a hack to support koalas. It needs to be
+            # thoroughly tested; right now koalas returns NA when a value
+            # can't be coerced into the target dtype.
+            to_datetime_fn = pd.to_datetime
+            if type(col).__module__.startswith(
+                "databricks.koalas"
+            ):  # pragma: no cover
+                # pylint: disable=import-outside-toplevel
+                import databricks.koalas as ks
+
+                to_datetime_fn = ks.to_datetime
+            col = to_datetime_fn(col, **self.to_datetime_kwargs)
            return col.astype(self.type)

        if isinstance(data_container, pd.DataFrame):
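Both pandas_engine hunks rely on the same dispatch trick: inspect the container's module name instead of importing koalas eagerly. A distilled sketch of the pattern (helper names are illustrative, not from this diff):

    import pandas as pd

    def _is_koalas(obj) -> bool:
        # koalas containers live in modules named "databricks.koalas.*"
        return type(obj).__module__.startswith("databricks.koalas")

    def _select_to_datetime(col):
        if _is_koalas(col):
            # imported lazily so pandas-only installs never pay for it
            import databricks.koalas as ks  # pylint: disable=import-outside-toplevel
            return ks.to_datetime
        return pd.to_datetime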
