.. currentmodule:: pandera

Data Validation with Pyspark ⭐️ (New)
========================================

*new in 0.10.0*

Pyspark is a distributed compute framework that offers a drop-in replacement for pandas dataframes via the ``pyspark.pandas`` API. You can use pandera to validate :py:class:`~pyspark.pandas.DataFrame` and :py:class:`~pyspark.pandas.Series` objects directly. First, install pandera with the ``pyspark`` extra:

.. code:: bash

    pip install pandera[pyspark]

Then you can use pandera schemas to validate pyspark dataframes. In the example below we'll use the :ref:`class-based API <dataframe_models>` to define a :py:class:`~pandera.api.pandas.model.DataFrameModel` for validation.

.. testcode:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

    import pyspark.pandas as ps
    import pandas as pd
    import pandera as pa

    from pandera.typing.pyspark import DataFrame, Series


    class Schema(pa.DataFrameModel):
        state: Series[str]
        city: Series[str]
        price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})


    # create a pyspark.pandas dataframe that's validated on object initialization
    df = DataFrame[Schema](
        {
            'state': ['FL','FL','FL','CA','CA','CA'],
            'city': [
                'Orlando',
                'Miami',
                'Tampa',
                'San Francisco',
                'Los Angeles',
                'San Diego',
            ],
            'price': [8, 12, 10, 16, 20, 18],
        }
    )
    print(df)


.. testoutput:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

      state           city  price
    0    FL        Orlando      8
    1    FL          Miami     12
    2    FL          Tampa     10
    3    CA  San Francisco     16
    4    CA    Los Angeles     20
    5    CA      San Diego     18
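
To see what happens when validation fails, the sketch below builds a dataframe whose ``price`` falls outside the ``in_range`` bounds. It assumes that, as with pandas dataframes, pandera raises ``pa.errors.SchemaError`` when a ``pyspark.pandas`` dataframe violates the schema. The variable name ``error_message`` is illustrative only.

.. code-block:: python

    import pandera as pa
    from pandera.typing.pyspark import DataFrame, Series


    class Schema(pa.DataFrameModel):
        state: Series[str]
        city: Series[str]
        price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})


    error_message = None
    try:
        # price=100 is outside the allowed range [5, 20], so validation on
        # initialization is expected to fail (assumption: SchemaError is raised)
        DataFrame[Schema](
            {"state": ["FL"], "city": ["Orlando"], "price": [100]}
        )
    except pa.errors.SchemaError as exc:
        error_message = str(exc)
        print(f"validation failed: {error_message}")

Catching the error at construction time keeps invalid data from flowing into downstream transformations.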


You can also use the :py:func:`~pandera.check_types` decorator to validate pyspark pandas dataframes at runtime:

.. testcode:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

    @pa.check_types
    def function(df: DataFrame[Schema]) -> DataFrame[Schema]:
        return df[df["state"] == "CA"]

    print(function(df))


.. testoutput:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

      state           city  price
    3    CA  San Francisco     16
    4    CA    Los Angeles     20
    5    CA      San Diego     18


And of course, you can use the object-based API to validate pyspark pandas dataframes:

.. testcode:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

    schema = pa.DataFrameSchema({
        "state": pa.Column(str),
        "city": pa.Column(str),
        "price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
    })
    print(schema(df))


.. testoutput:: scaling_pyspark
    :skipif: SKIP_PANDAS_LT_V1_OR_GT_V2

      state           city  price
    0    FL        Orlando      8
    1    FL          Miami     12
    2    FL          Tampa     10
    3    CA  San Francisco     16
    4    CA    Los Angeles     20
    5    CA      San Diego     18
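
If you would rather collect every failure than stop at the first one, a lazy validation pass may help. The following is a minimal sketch that assumes ``lazy=True`` behaves for ``pyspark.pandas`` dataframes as it does for pandas dataframes, raising ``pa.errors.SchemaErrors`` with a ``failure_cases`` attribute. The names ``invalid_df`` and ``caught`` are illustrative.

.. code-block:: python

    import pyspark.pandas as ps
    import pandera as pa

    schema = pa.DataFrameSchema({
        "state": pa.Column(str),
        "price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20)),
    })

    # the second row has a price outside the allowed range [5, 20]
    invalid_df = ps.DataFrame({"state": ["FL", "CA"], "price": [8, 100]})

    caught = False
    try:
        # lazy=True asks pandera to gather all failures before raising
        schema.validate(invalid_df, lazy=True)
    except pa.errors.SchemaErrors as exc:
        caught = True
        # failure_cases summarizes which checks failed and on which values
        print(exc.failure_cases)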