Skip to content

Commit

Permalink
Support for Native PySpark (#1185)
Browse files Browse the repository at this point in the history
* init

* init

* init structure

* disable imports'

* adding structure for pyspark

* setting dependency

* update class import

* fixing datatype cls

* adding dtypes for pyspark

* keep only bool type

* remove pydantic schema

* register pyspark data types

* add check method

* updating equivalents to native types

* update column schema

* refactor array to column

* rename array to column_schema

* remove pandas imports

* remove index and multiindex functionality

* adding pydantic schema class

* adding model components

* add model config

* define pyspark BaseConfig class

* removing index and multi-indexes

* remove modify schema

* Pyspark backend components, base, container, accessor, test file for accessor

* Pyspark backend components, base, container, accessor, test file for accessor

* Pyspark backend components, base, container, accessor, test file for accessor

* Pyspark backend components, base, container, accessor, test file for accessor

* add pyspark model components and types

* remove hypothesis

* remove synthesis and hypothesis

* Pyspark backend components, base, container, accessor, test file for accessor

* test for pyspark dataframeschema class

* test schema with alias types

* ensuring treat dataframes as tables types

* update container for pyspark dataframe

* adding negative test flow

* removing series and index on pysparrk dataframes

* remove series

* revert series from pyspark.pandas

* adding checks for pyspark

* registering pysparkCheckBackend

* cleaning base

* Fixing the broken type cast check, validation of schema fix.

* define spark level schema

* fixing check flow

* setting apply fn

* add sub sample functionality

* adjusting test case against common attributes

* need apply for column level check

* adding builtin checks for pyspark

* adding checks for pyspark df

* getting check registered

* fixing a bug a in error handling for schema check

* check_name validation fixed

* implementing dtype checks for pyspark

* updating error msg

* fixing dtype reason_code

* updating builtin checks for pyspark

* registeration

* Implementation of checks import and spark columns information check

* enhancing __call__, checks classes and builtin_checks

* delete junk files

* Changes to fix the implemtation of checks. Changed Apply function to send list with dataframe and column name, builtin function registers functions with lists which inculdes the dataframe

* extending pyspark checks

* Fixed builtin check bug and added test for supported builtin checks for pyspark

* add todos

* bydefault validate all checks

* fixing issue with sqlctx

* add dtypes pytests

* setting up schema

* add negative and positive tests

* add fixtures and refactor tests

* generalize spark_df func

* refactor to use conftest

* use conftest

* add support for decimal dtype and fixing other types

* Added new Datatypes support for pyspark, test cases for dtypes pyspark, created test file for error

* refactor ArraySchema

* rename array to column.py

* 1) Changes in test cases to look for summarised error raise instead of fast fail, since default behaviour is changed to summarised.
2) Added functionality to accept and check the precision and scale in Decimal Datatypes.

* add neg test

* add custom ErrorHandler

* Added functionality to DayTimeIntervalType datatype to accept parameters

* Added functionality to DayTimeIntervalType datatype to accept parameters

* return summarized error report

* replace dataframe to dict for return obj

* Changed checks input datatype to custom named tuple from the existing list. Also started changing the pyspark checks to include more datatypes

* refactor

* introduce error categories

* rename error categories

* fixing bug in schema.dtype.check

* fixing error category to by dynamic

* Added checks for each datatype in test cases. Reduced the code redundancy of the code in test file. Refactored the name of custom datatype object for checks.

* error_handler pass through

* add ErrorHandler to column api

* removed SchemaErrors since we now aggregate in errorHandler

* fixing dict keys

* Added Decorator to raise TypeError in case of unexpected input type for the check function.

* replace validator with report_errors

* cleaning debugs

* Support DataModels and Field

* Added Decorator to raise TypeError in case of unexpected input type for the check function. Merged with Develop

* Fix to run using the class schema type

* use alias types

* clean up

* add new typing for pyspark.sql

* Added Decorator to raise TypeError in case of unexpected input type for the check function. Merged with Develop

* Added changes to support raising error for use of datatype not supported by the check and support for map and array type.

* support bare dtypes for DataFrameModel

* remove resolved TODOs and breakpoints

* change to bare types

* use spark types instead of bare types

* using SchemaErrorReason instead of hardcode in container

* fixing an issue with error reason codes

* minor fix

* fixing checks and errors in pyspark

* Changes include the following:
1) Updated dtypes test functionality to make it more readable
2) Changes in accessor tests to support the new functionality
3) Changes in engine class to conform to check class everywhere else

* enhancing dataframeschema and model classes

* Changes to remove the pandas dependency

* Refactoring of the checks test functions

* Fixing the test case breaking

* Isort and Black formatting

* Container Test function failure

* Isort and black linting

* Changes to remove the pandas dependency

* Refactoring of the checks test functions

* Isort and black linting

* Added Changes to refactor the checks class. Fixes to some test cases failures.

* Removing breakpoint

* fixing raise error

* adding metadata dict

* Removing the reference of pandas from docstrings

* Removing redundant code block in utils

* Changes to return dataframe with errors property

* add accessor for errorHandler

* support errors access on pyspark.sql

* updating pyspark error tcs

* fixing model test cases

* adjusting errors to use pandera.errors

* use accessor instead of dict

* revert to  develop

* Removal of imports which are not needed and improved test case.

* setting independent pyspark import

* pyspark imports

* revert comments

* store and retrieve metadata at schema levels

* adding metadata support

* Added changes to support parameter based run.
1) Added parameters.yaml file to hold the configurations
2) Added code in utility to read the config
3) Updated the test cases to support the parameter based run
4) Moved pyspark decorators to a new file decorators.py in backend
5) Type fix in get_matadata property in container.py file

* Changing the default value in config

* change to consistent interface

* cleaning api/pyspark

* backend and tests

* adding setter on errors accessors for pyspark

* reformatting error dict

* doc

* run black linter

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* fix lint

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

* update pylintrc

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>

---------

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Co-authored-by: jaskaransinghsidana <jaskaran_singh_sidana@mckinsey.com>
Co-authored-by: jaskaransinghsidana <112083212+jaskaransinghsidana@users.noreply.github.com>
Co-authored-by: Niels Bantilan <niels.bantilan@gmail.com>
  • Loading branch information
4 people committed May 13, 2023
1 parent f401617 commit 74be58c
Show file tree
Hide file tree
Showing 83 changed files with 7,455 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
- id: check-yaml
description: Attempts to load all yaml files to verify syntax
- id: debug-statements
description: Check for debugger imports and py37+ breakpoint() calls in python source
description: Check for debugger imports and py37+ calls in python source
- id: end-of-file-fixer
description: Makes sure files end in a newline and only a newline
- id: trailing-whitespace
Expand Down
6 changes: 4 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[BASIC]
ignore=mypy.py,noxfile.py
ignore=mypy.py,noxfile.py,pandera/accessors/pyspark_sql_accessor.py,pandera/engines/pyspark_engine.py,pandera/pyspark.py,pandera/typing/pyspark_sql.py,
ignore-patterns=pandera/api/pyspark/*,tests/pyspark/*
good-names=
T,
F,
Expand Down Expand Up @@ -45,4 +46,5 @@ disable=
function-redefined,
arguments-differ,
unnecessary-dunder-call,
use-dict-literal
use-dict-literal,
invalid-name
3 changes: 3 additions & 0 deletions conf/pyspark/parameters.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Params are case sensitive use only upper case
VALIDATION: ENABLE # Supported Value [ENABLE/DISABLE]
DEPTH: SCHEMA_AND_DATA #[Supported values: SCHEMA_ONLY, DATA_ONLY, SCHEMA_AND_DATA]
8 changes: 7 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@ allow_redefinition = True
warn_return_any = False
warn_unused_configs = True
show_error_codes = True
exclude = tests/mypy/modules
exclude=(?x)(
^tests/mypy/modules
| ^pandera/engines/pyspark_engine
| ^pandera/api/pyspark
| ^pandera/backends/pyspark
| ^tests/pyspark
)
2 changes: 1 addition & 1 deletion pandera/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""A flexible and expressive pandas validation library."""
import platform

import pandera.backends
from pandera import errors, external_config, typing
from pandera.accessors import pandas_accessor
from pandera.api import extensions
Expand Down Expand Up @@ -85,7 +86,6 @@
except ImportError:
pass


try:
import modin.pandas

Expand Down
136 changes: 136 additions & 0 deletions pandera/accessors/pyspark_sql_accessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Custom accessor functionality for PySpark.Sql.
"""

import warnings
from functools import wraps
from typing import Optional, Union


from pandera.api.pyspark.container import DataFrameSchema
from pandera.api.pyspark.error_handler import ErrorHandler

"""Register pyspark accessor for pandera schema metadata."""


Schemas = Union[DataFrameSchema]
Errors = Union[ErrorHandler]


# Todo Refactor to create a seperate module for panderaAccessor
class PanderaAccessor:
"""Pandera accessor for pyspark object."""

def __init__(self, pyspark_obj):
"""Initialize the pandera accessor."""
self._pyspark_obj = pyspark_obj
self._schema: Optional[Schemas] = None
self._errors: Optional[Errors] = None

@staticmethod
def check_schema_type(schema: Schemas):
"""Abstract method for checking the schema type."""
raise NotImplementedError

def add_schema(self, schema):
"""Add a schema to the pyspark object."""
self.check_schema_type(schema)
self._schema = schema
return self._pyspark_obj

@property
def schema(self) -> Optional[Schemas]:
"""Access schema metadata."""
return self._schema

@property
def errors(self) -> Optional[Errors]:
"""Access errors data."""
return self._errors

@errors.setter
def errors(self, value: dict):
"""Set errors data."""
self._errors = value


class CachedAccessor:
"""
Custom property-like object.
A descriptor for caching accessors:
:param name: Namespace that accessor's methods, properties, etc will be
accessed under, e.g. "foo" for a dataframe accessor yields the accessor
``df.foo``
:param cls: Class with the extension methods.
For accessor, the class's __init__ method assumes that you are registering
an accessor for one of ``Series``, ``DataFrame``, or ``Index``.
"""

def __init__(self, name, accessor):
self._name = name
self._accessor = accessor

def __get__(self, obj, cls):
if obj is None: # pragma: no cover
return self._accessor
accessor_obj = self._accessor(obj)
object.__setattr__(obj, self._name, accessor_obj)
return accessor_obj


def _register_accessor(name, cls):
"""
Register a custom accessor on {class} objects.
:param name: Name under which the accessor should be registered. A warning
is issued if this name conflicts with a preexisting attribute.
:returns: A class decorator callable.
"""

def decorator(accessor):
if hasattr(cls, name):
msg = (
f"registration of accessor {accessor} under name '{name}' for "
"type {cls.__name__} is overriding a preexisting attribute "
"with the same name."
)

warnings.warn(
msg,
UserWarning,
stacklevel=2,
)
setattr(cls, name, CachedAccessor(name, accessor))
return accessor

return decorator


def register_dataframe_accessor(name):
"""
Register a custom accessor with a DataFrame
:param name: name used when calling the accessor after its registered
:returns: a class decorator callable.
"""
# pylint: disable=import-outside-toplevel
from pyspark.sql import DataFrame

return _register_accessor(name, DataFrame)


class PanderaDataFrameAccessor(PanderaAccessor):
"""Pandera accessor for pyspark DataFrame."""

@staticmethod
def check_schema_type(schema):
if not isinstance(schema, DataFrameSchema):
raise TypeError(
f"schema arg must be a DataFrameSchema, found {type(schema)}"
)


register_dataframe_accessor("pandera")(PanderaDataFrameAccessor)
# register_series_accessor("pandera")(PanderaSeriesAccessor)
3 changes: 2 additions & 1 deletion pandera/api/base/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Union,
no_type_check,
)

import pandas as pd
from multimethod import multidispatch as _multidispatch

Expand Down Expand Up @@ -173,7 +174,6 @@ def from_builtin_check_name(
# by the check object
if statistics is None:
statistics = check_kwargs

return cls(
cls.get_builtin_check_fn(name),
statistics=statistics,
Expand All @@ -188,6 +188,7 @@ def register_backend(cls, type_: Type, backend: Type[BaseCheckBackend]):
@classmethod
def get_backend(cls, check_obj: Any) -> Type[BaseCheckBackend]:
"""Get the backend associated with the type of ``check_obj`` ."""

check_obj_cls = type(check_obj)
classes = inspect.getmro(check_obj_cls)
for _class in classes:
Expand Down
1 change: 0 additions & 1 deletion pandera/api/base/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pandera.api.checks import Check
from pandera.typing import AnnotationInfo


TBaseModel = TypeVar("TBaseModel", bound="BaseModel")


Expand Down
6 changes: 6 additions & 0 deletions pandera/api/base/model_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class BaseFieldInfo:
"title",
"description",
"default",
"metadata",
)

def __init__(
Expand All @@ -58,6 +59,7 @@ def __init__(
title: Optional[str] = None,
description: Optional[str] = None,
default: Optional[Any] = None,
metadata: Optional[dict] = None,
) -> None:
self.checks = to_checklist(checks)
self.nullable = nullable
Expand All @@ -71,6 +73,7 @@ def __init__(
self.title = title
self.description = description
self.default = default
self.metadata = metadata

@property
def name(self) -> str:
Expand Down Expand Up @@ -107,6 +110,9 @@ def __ne__(self, other):
def __set__(self, instance: Any, value: Any) -> None: # pragma: no cover
raise AttributeError(f"Can't set the {self.original_name} field.")

def __get_metadata__(self):
return self.metadata


class BaseCheckInfo: # pylint:disable=too-few-public-methods
"""Captures extra information about a Check."""
Expand Down
19 changes: 16 additions & 3 deletions pandera/api/base/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import inspect
from abc import ABC
from functools import wraps
from typing import Any, Dict, Tuple, Type, Union
from typing import Any, Dict, Tuple, Type, Optional, Union

from pandera.backends.base import BaseSchemaBackend
from pandera.errors import BackendNotFoundError
Expand All @@ -32,6 +32,7 @@ def __init__(
name=None,
title=None,
description=None,
metadata=None,
):
"""Abstract base schema initializer."""
self.dtype = dtype
Expand All @@ -40,6 +41,7 @@ def __init__(
self.name = name
self.title = title
self.description = description
self.metadata = metadata

def validate(
self,
Expand Down Expand Up @@ -69,9 +71,20 @@ def register_backend(cls, type_: Type, backend: Type[BaseSchemaBackend]):
cls.BACKEND_REGISTRY[(cls, type_)] = backend

@classmethod
def get_backend(cls, check_obj: Any) -> BaseSchemaBackend:
def get_backend(
cls,
check_obj: Optional[Any] = None,
check_type: Optional[Type] = None,
) -> BaseSchemaBackend:
"""Get the backend associated with the type of ``check_obj`` ."""
check_obj_cls = type(check_obj)
if check_obj is not None:
check_obj_cls = type(check_obj)
elif check_type is not None:
check_obj_cls = check_type
else:
raise ValueError(
"Must pass in one of `check_obj` or `check_type`."
)
classes = inspect.getmro(check_obj_cls)
for _class in classes:
try:
Expand Down
3 changes: 1 addition & 2 deletions pandera/api/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pandera.api.base.checks import BaseCheck, CheckResult
from pandera.strategies import SearchStrategy


T = TypeVar("T")


Expand Down Expand Up @@ -165,7 +164,6 @@ def __init__(
"""
super().__init__(name=name, error=error)

if element_wise and groupby is not None:
raise errors.SchemaInitError(
"Cannot use groupby when element_wise=True."
Expand Down Expand Up @@ -480,6 +478,7 @@ def str_startswith(cls, string: str, **kwargs) -> "Check":
:param string: String all values should start with
:param kwargs: key-word arguments passed into the `Check` initializer.
"""

return cls.from_builtin_check_name(
"str_startswith",
kwargs,
Expand Down
9 changes: 6 additions & 3 deletions pandera/api/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ def register_check_method(
check_fn=None,
*,
statistics: Optional[List[str]] = None,
supported_types: Union[type, Tuple, List] = (pd.DataFrame, pd.Series),
supported_types: Union[type, Tuple, List] = (
pd.DataFrame,
pd.Series,
),
check_type: Union[CheckType, str] = "vectorized",
strategy=None,
):
Expand All @@ -156,8 +159,8 @@ def register_check_method(
which serve as the statistics needed to serialize/de-serialize the
check and generate data if a ``strategy`` function is provided.
:param supported_types: the pandas type(s) supported by the check function.
Valid values are ``pd.DataFrame``, ``pd.Series``, or a list/tuple of
``(pa.DataFrame, pa.Series)`` if both types are supported.
Valid values are ``pd.DataFrame``, ``pd.Series``, ``ps.DataFrame``, or a list/tuple of
``(pa.DataFrame, pa.Series, ps.DataFrame)`` if both types are supported.
:param check_type: the expected input of the check function. Valid values
are :class:`~pandera.extensions.CheckType` enums or
``{"vectorized", "element_wise", "groupby"}``. The input signature of
Expand Down
1 change: 0 additions & 1 deletion pandera/api/hypotheses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pandera.api.checks import Check
from pandera.strategies import SearchStrategy


DEFAULT_ALPHA = 0.01


Expand Down
6 changes: 1 addition & 5 deletions pandera/api/pandas/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
from pandera.api.base.schema import BaseSchema, inferred_schema_guard
from pandera.api.checks import Check
from pandera.api.hypotheses import Hypothesis
from pandera.api.pandas.types import (
CheckList,
PandasDtypeInputTypes,
is_field,
)
from pandera.api.pandas.types import CheckList, PandasDtypeInputTypes, is_field
from pandera.dtypes import DataType, UniqueSettings
from pandera.engines import pandas_engine

Expand Down
2 changes: 1 addition & 1 deletion pandera/api/pandas/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def get_regex_columns(
from pandera.backends.pandas.components import ColumnBackend

return cast(
ColumnBackend, self.get_backend(pd.DataFrame())
ColumnBackend, self.get_backend(check_type=pd.DataFrame)
).get_regex_columns(self, columns)

def __eq__(self, other):
Expand Down
1 change: 0 additions & 1 deletion pandera/api/pandas/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ def _validate(
lazy: bool = False,
inplace: bool = False,
) -> pd.DataFrame:

if self._is_inferred:
warnings.warn(
f"This {type(self)} is an inferred schema that hasn't been "
Expand Down

0 comments on commit 74be58c

Please sign in to comment.