
Support pyspark.sql.DataFrame #1138

Closed
cosmicBboy opened this issue Mar 23, 2023 · 26 comments · Fixed by #1243
Labels: enhancement (New feature or request)

@cosmicBboy
Collaborator

Is your feature request related to a problem? Please describe.

Currently, pandera only supports validation of pyspark.pandas.DataFrame objects. This issue will track the work needed to support pyspark.sql.DataFrame objects.

Describe the solution you'd like

The solution will require work in three major areas:

  • engines: create a pyspark_engine.py that implements just a few of the basic data types (bool, int, float, str). These can be expanded later when we have a working POC.
  • backends: create a pyspark backend for validating dataframes and columns contained in the dataframe
  • api: depending on how well the current DataFrameSchema and Column abstraction fits pyspark.sql, we can simply refactor the codebase so that we have a base class that both pandas and pyspark inherit from.

The initial POC for pyspark support can live in the main pandera codebase itself, since the pandera[pyspark] extra already exists. If, after getting a working POC, we feel like it would be easier to maintain a separate pandera-pyspark package, we can do so (see discussion here), either as a pandera monorepo, or separate repos per data framework (e.g. pandera-pyspark, pandera-polars, etc).

Describe alternatives you've considered
NA

Additional context
NA

cosmicBboy added the enhancement (New feature or request) label Mar 23, 2023
@NeerajMalhotra-QB
Collaborator

NeerajMalhotra-QB commented Mar 23, 2023

hi @cosmicBboy, I am working on implementing pyspark.sql.DataFrame integration for Pandera. As I understand it, the purpose of engines is to add type checks for framework-specific data types: an engine is a wrapper around native data types. Feel free to correct this understanding!

With this in mind, I have added a new engine, pyspark_engine.py, specifically for pyspark:

"""PySpark engine and data types."""
# pylint:disable=too-many-ancestors

# docstrings are inherited
# pylint:disable=missing-class-docstring

# pylint doesn't know about __init__ generated with dataclass
# pylint:disable=unexpected-keyword-arg,no-value-for-parameter
import builtins
import dataclasses
import datetime
import decimal
import inspect
import warnings
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Type,
    Union,
    cast,
)

from pydantic import BaseModel, ValidationError

from pandera import dtypes, errors
from pandera.dtypes import immutable
from pandera.engines import engine
import pyspark.sql.types as pst

try:
    import pyarrow  # pylint:disable=unused-import

    PYARROW_INSTALLED = True
except ImportError:
    PYARROW_INSTALLED = False

try:
    from typing import Literal  # type: ignore
except ImportError:
    from typing_extensions import Literal  # type: ignore


@immutable(init=True)
class DataType(dtypes.DataType):
    """Base `DataType` for boxing PySpark data types."""

    type: Any = dataclasses.field(repr=False, init=False)
    """Native pyspark dtype boxed by the data type."""

    def __init__(self, dtype: Any):
        super().__init__()
        object.__setattr__(self, "type", dtype)
        dtype_cls = dtype if inspect.isclass(dtype) else dtype.__class__
        warnings.warn(
            f"'{dtype_cls}' support is not guaranteed.\n"
            + "Usage Tip: Consider writing a custom "
            + "pandera.dtypes.DataType or opening an issue at "
            + "https://github.com/pandera-dev/pandera"
        )

    def __post_init__(self):
        # this method isn't called if __init__ is defined
        object.__setattr__(self, "type", self.type)  # pragma: no cover

    def check(
        self,
        pandera_dtype: dtypes.DataType,
    ) -> Union[bool, Iterable[bool]]:
        try:
            pandera_dtype = Engine.dtype(pandera_dtype)

        except TypeError:
            return False

        # attempts to compare the pyspark native type if possible
        # to let subclass inherit check
        # (super will compare that DataType classes are exactly the same)
        try:
            return self.type == pandera_dtype.type or super().check(pandera_dtype)
        except TypeError:
            return super().check(pandera_dtype)

    def __str__(self) -> str:
        return str(self.type)

    def __repr__(self) -> str:
        return f"DataType({self})"


class Engine(  # pylint:disable=too-few-public-methods
    metaclass=engine.Engine,
    base_pandera_dtypes=(DataType,),
):
    """PySpark data type engine."""

    @classmethod
    def dtype(cls, data_type: Any) -> dtypes.DataType:
        """Convert input into a pyspark-compatible
        Pandera :class:`~pandera.dtypes.DataType` object."""
        try:
            return engine.Engine.dtype(cls, data_type)
        except TypeError:
            raise


###############################################################################
# boolean
###############################################################################


@Engine.register_dtype(
    equivalents=["bool", pst.BooleanType()],
)
@immutable
class Bool(DataType, dtypes.Bool):
    """Semantic representation of a :class:`pyspark.sql.types.BooleanType`."""

    type = pst.BooleanType()
    _bool_like = frozenset({True, False})

    def coerce_value(self, value: Any) -> Any:
        """Coerce an value to specified boolean type."""
        if value not in self._bool_like:
            raise TypeError(f"value {value} cannot be coerced to type {self.type}")
        return super().coerce_value(value)


@Engine.register_dtype(
    equivalents=["string", pst.StringType()],  # type: ignore
)
@immutable
class String(DataType, dtypes.String):  # type: ignore
    """Semantic representation of a :class:`pyspark.sql.StringType`."""

    type = pst.StringType()  # type: ignore


@Engine.register_dtype(
    equivalents=["int", pst.IntegerType()],  # type: ignore
)
@immutable
class Int(DataType, dtypes.Int):  # type: ignore
    """Semantic representation of a :class:`pyspark.sql.IntegerType`."""

    type = pst.IntegerType()  # type: ignore


@Engine.register_dtype(
    equivalents=["float", pst.FloatType()],  # type: ignore
)
@immutable
class Float(DataType, dtypes.Float):  # type: ignore
    """Semantic representation of a :class:`pyspark.sql.FloatType`."""

    type = pst.FloatType()  # type: ignore

Let me know if this is in the right direction. Specifically, please pay close attention to @Engine.register_dtype. Do we need equivalents to include pandera dtypes too?

Secondly, what is the difference between

@Engine.register_dtype(
    equivalents=["int", pst.IntegerType()],  # type: ignore
)

vs

@Engine.register_dtype(
    pst.IntegerType(),
    equivalents=["int", pst.IntegerType()],  # type: ignore
)

Looking forward to your thoughts on the above questions. Thanks :)

@cosmicBboy
Collaborator Author

I am working on implementing pyspark.sql.DataFrame integration for Pandera. As I understand it, the purpose of engines is to add type checks for framework-specific data types: an engine is a wrapper around native data types. Feel free to correct this understanding!

Yes, this is correct! The engines subpackage (which really should be called type_engines) contains a pandera-specific type system that allows users to do data-type and logical-type checking, providing utilities to alias a type to a known set of strings or equivalent types.
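
For example, once the Bool type above is registered with equivalents=["bool", pst.BooleanType()], the string alias and the native pyspark type should both resolve to the same pandera data type. A minimal sketch (assuming the module above ends up importable as pandera.engines.pyspark_engine; the final import path may differ):

import pyspark.sql.types as pst
from pandera.engines import pyspark_engine  # assumed location of the engine sketched above

# both aliases resolve to the registered Bool data type
bool_from_str = pyspark_engine.Engine.dtype("bool")
bool_from_native = pyspark_engine.Engine.dtype(pst.BooleanType())

# DataType.check compares the boxed pyspark types, so the two are interchangeable
assert bool_from_str.check(bool_from_native)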

@cosmicBboy
Collaborator Author

cosmicBboy commented Apr 6, 2023

Notes 4/6/2023

@NeerajMalhotra-QB @cosmicBboy

  • Parameterize checks to have access to check attributes, e.g. n_failure_cases, so that the user can control how many rows to check.
  • Switches:
    • Killswitch: an env var to skip validation entirely, or to not raise an error
    • Just validate schemas, don't validate the actual data values

@cosmicBboy
Collaborator Author

cosmicBboy commented Apr 14, 2023

Notes 4/14/2023

@NeerajMalhotra-QB @cosmicBboy

  • Datatypes: do we support map and array data types? For now, perhaps only support data type validation, not custom checks.
  • Error handling: create a new error handler class for the pyspark integration
  • Clean up the way backend containers call validate on the schema components: currently they call, e.g., the Column.validate method
  • For the pyspark use case, introduce a new method like schema.report_error to return a data structure containing the errors.
  • Metadata (tags): be able to create sub-schemas from a big schema based on use cases (expressed as tags or metadata), so that users can see which columns are needed for a particular use case (e.g. "house_price" and "gas_prices" use cases); see the sketch below.
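
To make the metadata idea concrete, selecting a sub-schema by use case could look something like the sketch below. This is purely illustrative: get_schema_for_usecase is a hypothetical helper, not an existing pandera API, and it assumes DataFrameSchema and Column accept a metadata argument as discussed above.

import pandera as pa

schema = pa.DataFrameSchema(
    {
        "price": pa.Column(float, metadata={"usecase": ["house_price"]}),
        "mpg": pa.Column(float, metadata={"usecase": ["gas_prices"]}),
        "region": pa.Column(str, metadata={"usecase": ["house_price", "gas_prices"]}),
    },
    metadata={"category": "pricing"},
)


def get_schema_for_usecase(
    schema: pa.DataFrameSchema, usecase: str
) -> pa.DataFrameSchema:
    """Hypothetical helper: keep only the columns tagged with the given use case."""
    columns = {
        name: column
        for name, column in schema.columns.items()
        if usecase in (column.metadata or {}).get("usecase", [])
    }
    return pa.DataFrameSchema(columns, metadata=schema.metadata)


house_price_schema = get_schema_for_usecase(schema, "house_price")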

@NeerajMalhotra-QB
Collaborator


Thanks Niels (@cosmicBboy) for summarizing our discussion.

@cosmicBboy
Collaborator Author

cosmicBboy commented Apr 20, 2023

Notes 4/20/2023

  • pinpoint where in model.py changes need to be made so that DataFrameModel can support using bare types instead of needing to write Series[<TYPE>]
  • need to recommend a path forward re: Map and Array types, to be able to handle parameterized checks

@NeerajMalhotra-QB
Collaborator

Notes 4/20/2023

  • pinpoint where in model.py changes need to be made so that DataFrameModel can support using bare types instead of needing to write Series[<TYPE>]
  • need to recommend a path forward re: Map and Array types, to be able to handle parameterized checks

For pt1, I think this is one of the areas which needs to be changed to support pyspark generics:

pandera/api/pandas/model.py
if (
    annotation.origin in SERIES_TYPES
    or annotation.raw_annotation in SERIES_TYPES
):
    col_constructor = field.to_column if field else Column

    if check_name is False:
        raise SchemaInitError(
            f"'check_name' is not supported for {field_name}."
        )

    columns[field_name] = col_constructor(  # type: ignore
        dtype,
        required=not annotation.optional,
        checks=field_checks,
        name=field_name,
    )

@NeerajMalhotra-QB
Collaborator

hi Niels (@cosmicBboy)
One of the challenges I am seeing with attempting to support generic types ('str', 'int', etc.) in Field is that AnnotationInfo isn't being set properly: it's closely coupled with pandera.typing and expects all types to be either Series or Index.

Here's a sample schema definition using the DataFrameModel class & Field, and its representation in AnnotationInfo.

class pandera_schema(DataFrameModel):
    product: str = Field(str_startswith="B")
    price: int = Field(gt=5)
    id: int

in pandera/api/pyspark/model.py
_build_columns_index returns None for column types.

{'product': <Schema Column(name=product, type=None)>, 'price': <Schema Column(name=price, type=None)>}
annotation.metadata = {}
annotation.arg = {}

I guess this will need changes in common areas too (e.g. pandas flows). WDYT?

@NeerajMalhotra-QB
Collaborator

NeerajMalhotra-QB commented Apr 21, 2023

If we define the pandera schema with native pyspark types as follows, it gets past the previous issue, but type=None still persists, which leads to an issue in check_dtype.

class pandera_schema(DataFrameModel):
    product: T.StringType() = Field(str_startswith="B")
    price: T.IntegerType() = Field(gt=5)
    id: T.IntegerType() = Field()

Now I feel that if we can find a way to set the type correctly using the aliases ('str', 'int', etc.) we defined in the pyspark engine for native pyspark.sql types, it will work.

@cosmicBboy
Collaborator Author

Hi @NeerajMalhotra-QB, will do a little digging and get back to you early next week!

@NeerajMalhotra-QB
Collaborator

Hi Niels (@cosmicBboy), I just pushed a PR to support DataFrameModel for pyspark.sql, more specifically Field(). Please feel free to look at the above PR.

There are some areas we need to redesign under pandera.typing, which I will be looking at next.

@cosmicBboy
Collaborator Author

Hi @NeerajMalhotra-QB, check out this PR which adds support for the bare data type (instead of using pandera.typing.Series)

Re: your PR, I'd rather not introduce a pandera.typing.Column generic, as #1166 establishes a pattern for using the data type directly in the DataFrameModel definition.
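
For context, the difference between the two annotation styles looks like this (a minimal sketch of the pandas flavor; the pyspark DataFrameModel would follow the same pattern):

import pandera as pa
from pandera.typing import Series


# existing style: wrap the data type in pandera.typing.Series
class SchemaWithSeries(pa.DataFrameModel):
    price: Series[int] = pa.Field(gt=5)


# with bare-type support: annotate with the data type directly
class SchemaWithBareTypes(pa.DataFrameModel):
    price: int = pa.Field(gt=5)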

@cosmicBboy
Collaborator Author

need to recommend a path forward re: Map and Array types, to be able to handle parameterized checks

Looking into this issue now

@cosmicBboy
Collaborator Author

Notes 4/27/2023

@NeerajMalhotra-QB @cosmicBboy

  • Use data_container in the check method of the pyspark types
  • Consider generalizing the parameterized type system

TODO:

  • The validation killswitch: some global config that disables all validation
  • Control over schema-level checks, data-level checks, or both

Release Plan

  • Cut a beta release, 0.16.0b0, by next week
  • Write a blog post to announce the PySpark native integration

@NeerajMalhotra-QB
Collaborator

@cosmicBboy, I will draft an initial version of the blog by next week and we can collaborate on it. Let's plan to publish it right after we merge the changes in the beta release. :)

@cosmicBboy
Collaborator Author

Awesome, thanks @NeerajMalhotra-QB !

@cosmicBboy
Collaborator Author

cosmicBboy commented May 4, 2023

Notes 5/4/2023

@NeerajMalhotra-QB @cosmicBboy

  • pyspark native integration has its own namespace, pandera.pyspark: basically, create a pyspark.py file and import all public classes/functions there.
  • metadata tags for fields/columns and also at the dataframe level: these are meant to be used in userspace, for folks to define their own ways of leveraging the metadata
  • disabling validation globally: set as a configuration at the framework level, e.g. for the pandas api, pyspark api (a rough sketch follows below)
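
A rough sketch of how a backend could honor such a switch; the PANDERA_VALIDATION_ENABLED variable name and its semantics are assumptions here, not a settled API:

import os


def validation_enabled() -> bool:
    """Return False when the (assumed) PANDERA_VALIDATION_ENABLED env var disables validation."""
    return os.environ.get("PANDERA_VALIDATION_ENABLED", "True").lower() != "false"


def validate(check_obj, schema):
    """Illustrative wrapper: short-circuit validation when the killswitch is on."""
    if not validation_enabled():
        # killswitch engaged: skip all validation and return the dataframe untouched
        return check_obj
    return schema.validate(check_obj)

With something like this in place, users could set an environment variable (e.g. PANDERA_VALIDATION_ENABLED=False) in production to turn validation off without touching code.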

TODO:

  • Release 0.16.0b0 next week, 5/11/2023

@NeerajMalhotra-QB
Collaborator

Thanks @cosmicBboy for capturing notes.

@NeerajMalhotra-QB
Collaborator

Hi Niels (@cosmicBboy). I have a quick question on DataFrameModel.

I noticed that we need to use the Config class to add any dataframe-level details if we are defining a schema using the DataFrameModel class. Here's an example from the pandera docs:

class Schema(pa.DataFrameModel):

    year: Series[int] = pa.Field(gt=2000, coerce=True)
    month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
    day: Series[int] = pa.Field(ge=0, le=365, coerce=True)

    class Config:
        name = "BaseSchema"
        strict = True
        coerce = True
        foo = "bar"  # Interpreted as dataframe check

But we follow a different style for schema definitions in the case of the DataFrameSchema class (everything at one level):

import pandera as pa

from pandera import Column, DataFrameSchema, Check, Index

schema = DataFrameSchema(
    {
        "column1": Column(int),
        "column2": Column(float, Check(lambda s: s < -1.2)),
        # you can provide a list of validators
        "column3": Column(str, [
           Check(lambda s: s.str.startswith("value")),
           Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
        ]),
    },
    index=Index(int),
    strict=True,
    coerce=True,
)

So is my understanding correct in saying that if we want to support metadata in DataFrameModel, then it has to be part of the Config class?

In other words, this is how I would add metadata for a DataFrameModel-based schema:

class pandera_schema(DataFrameModel):
    id: T.IntegerType() = Field(
        gt=5,
        metadata={"usecase": ["t1", "r1"], "type": "product_pricing"},
    )
    product_name: T.StringType() = Field(str_startswith="B")
    price: T.DecimalType(20, 5) = Field()

    class Config:
        name = "product_info"
        strict = True
        coerce = True
        metadata = {"category": "product-details"}

@cosmicBboy
Collaborator Author

So is my understanding correct in saying that if we want to support metadata in DataFrameModel, then it has to be part of the Config class?

Correct. The reason behind this is that class attributes are reserved for fields, so Config forwards its arguments to what would typically be the kwargs to DataFrameSchema.
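
In DataFrameSchema terms, the example above maps roughly onto this (a sketch using pandas-flavored types for brevity, and assuming DataFrameSchema/Column accept a metadata argument as discussed):

import pandera as pa

schema = pa.DataFrameSchema(
    {
        "id": pa.Column(
            int,
            pa.Check.gt(5),
            metadata={"usecase": ["t1", "r1"], "type": "product_pricing"},
        ),
        "product_name": pa.Column(str, pa.Check.str_startswith("B")),
        "price": pa.Column(float),
    },
    name="product_info",
    strict=True,
    coerce=True,
    metadata={"category": "product-details"},
)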

@NeerajMalhotra-QB
Collaborator

Thanks Niels, it makes sense.

@cosmicBboy
Collaborator Author

cosmicBboy commented May 5, 2023

That said, I'm trying to think about other syntax for doing this... perhaps dunder attributes like __dtype__, so that you don't need a Config class at all. Perhaps an improvement for the future.

@cosmicBboy
Collaborator Author

cosmicBboy commented Jun 1, 2023

Notes 6/1/2023

@NeerajMalhotra-QB @cosmicBboy

  • Update on the current state of the PySpark fork.
    • Almost done with features
    • Added a lot of test coverage
    • Resolved pylint issues
    • Implemented killswitch
    • Added metadata support in pandas
    • Flag for determining validation depth
    • Implemented error dictionaries
    • Docstrings have been updated
    • RST documentation in progress
    • Blog is being worked on. Target date: mid-June
    • @cosmicBboy to look at PR: introducing kill switch, parameters and metadata features #1196
  • Discuss supporting multiple frameworks
    • today: to support dataframe library "x", add an "x" extra here
    • future state: (maybe) pandera becomes a monorepo of n different Python packages, one per framework
      • an alternative would be for the core pandera modules to be Python-based, not specific to pandas
      • e.g. pip install pandera[pandas,pyspark,strategies]
  • Discuss removing the pandas dependency for PySpark use cases.
    • future state: decouple the base imports of core pandera from the pandas modules

@cosmicBboy
Collaborator Author

cosmicBboy commented Jun 8, 2023

Notes 6/8/2023

@NeerajMalhotra-QB @cosmicBboy

  • Merge this PR: Support native PySpark.sql on Pandera #1213
  • Write docs in a separate PR, based on dev
  • Merge dev onto main
  • Target for beta release 0.16.0b0: 6/16
    • Announcement: blog post on QB and UnionAI side
    • Set up pandera community sync

@NeerajMalhotra-QB
Collaborator

thanks @cosmicBboy for taking meeting notes :)

NeerajMalhotra-QB self-assigned this Jun 8, 2023
@cosmicBboy
Collaborator Author

Notes 6/15/2023

@NeerajMalhotra-QB @cosmicBboy

  • create docs PR
  • create clean-up PR
  • push beta release
  • announce in discord channel
