
# Pandera: Beyond Pandas Data Validation 🐼✅

**Niels Bantilan**, Chief ML Engineer @ Union.ai

*SciPy 2023, July 12th 2023*

# Background

- 📜 B.A. in Biology and Dance
- 📜 M.P.H. in Sociomedical Science and Public Health Informatics
- 🤖 Chief Machine Learning Engineer @ Union.ai
- 🛩 Flytekit OSS Maintainer
- ✅ Author and Maintainer of Pandera
- 🦾 Author of UnionML
- 🛠 Make DS/ML practitioners more productive

### This is a talk about open source development 🧑🏾‍💻

# Outline 📝

- 🐣 Origins: solving a local problem
- 🐓 Evolution: solving other people's problems
- 🦩 Revolution: rewriting Pandera's internals
- 🌅 What's next?

### Where's the Code?

🖼 **Slides**: https://unionai-oss.github.io/pandera-presentations/slides/20230712_scipy_beyond_pandas.slides.html

📓 **Notebook**: https://github.com/unionai-oss/pandera-presentations/blob/master/notebooks/20230712_scipy_beyond_pandas.ipynb

# 🐣 Origins

In [1]:
import os
import warnings
import pyspark

from IPython.display import display, Markdown

warnings.simplefilter("ignore")
pyspark.SparkContext().setLogLevel("OFF")
os.environ["MODIN_ENGINE"] = "ray"

## 🤷‍♂️ Why Should I Validate Data?

## What's a `DataFrame`?

In [2]:
import uuid

import numpy as np
import pandas as pd

dataframe = pd.DataFrame({
    "person_id": [str(uuid.uuid4())[:7] for _ in range(6)],
    "hours_worked": [38.5, 41.25, "35.0", 27.75, 22.25, -20.5],
    "wage_per_hour": [15.1, 15, 21.30, 17.5, 19.50, 25.50],
}).set_index("person_id")

df = dataframe

In [3]:
dataframe.head()

| person_id | hours_worked | wage_per_hour |
| --- | --- | --- |
| 1ac0c42 | 38.5 | 15.1 |
| edda497 | 41.25 | 15.0 |
| e8c6fb7 | 35.0 | 21.3 |
| 1332472 | 27.75 | 17.5 |
| beb478b | 22.25 | 19.5 |


## What's Data Validation?

Data validation is the act of _falsifying_ data against explicit assumptions
for some downstream purpose, like analysis, modeling, and visualization.

> "All swans are white"

In [4]:
%%html
<p>
    <a href="https://commons.wikimedia.org/wiki/File:Black_Swans.jpg#/media/File:Black_Swans.jpg">
    <img src="https://upload.wikimedia.org/wikipedia/commons/6/60/Black_Swans.jpg" alt="Pair of black swans swimming" height="275" width="275"
     style="display: block; margin-left: auto; margin-right: auto;"/>
    </a>
    <p style="font-size: x-small; text-align: center;">
    <a href="http://creativecommons.org/licenses/by-sa/3.0/" title="Creative Commons Attribution-Share Alike 3.0">CC BY-SA 3.0</a>,
    <a href="https://commons.wikimedia.org/w/index.php?curid=1243220">Link</a>
    </p>
</p>
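
To make the idea of falsification concrete, here is a minimal sketch in plain Python. The `swans` records and the `falsify` helper are hypothetical names chosen for illustration and are not part of pandera: an assumption is written as a predicate, and validating means searching for counterexamples.

```python
from typing import Callable, Iterable, List


def falsify(records: Iterable[dict], assumption: Callable[[dict], bool]) -> List[dict]:
    """Return every record that contradicts the assumption."""
    return [record for record in records if not assumption(record)]


# hypothetical observations, made up for this example
swans = [
    {"id": 1, "color": "white"},
    {"id": 2, "color": "white"},
    {"id": 3, "color": "black"},
]

# the explicit assumption: "all swans are white"
counterexamples = falsify(swans, lambda swan: swan["color"] == "white")
print(counterexamples)
```

A single counterexample is enough to reject the assumption, which is why validation errors are most useful when they report the offending records.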

## Why Do I Need it?

#### 🐞 It can be difficult to reason about and debug data processing pipelines.

#### ⚠️ It's critical to ensuring data quality in many contexts, especially when the end product informs business decisions, supports scientific findings, or generates predictions in a production setting.

## Everyone has a personal relationship with their data

### Story Time 📖

##### Imagine that you're a data scientist maintaining an existing data processing pipeline 👩‍💻👨‍💻...

In [5]:
def process_data(df):
    return df.assign(weekly_income=lambda x: x.hours_worked * x.wage_per_hour)

try:
    process_data(dataframe)
except TypeError as exc:
    print(exc)

can't multiply sequence by non-int of type 'float'


### One day, you encounter an error log trail and decide to follow it...

```python
/usr/local/miniconda3/envs/pandera-presentations/lib/python3.7/site-packages/pandas/core/ops/__init__.py in masked_arith_op(x, y, op)
    445         if mask.any():
    446             with np.errstate(all="ignore"):
--> 447                 result[mask] = op(xrav[mask], com.values_from_object(yrav[mask]))
    448 
    449     else:

TypeError: can't multiply sequence by non-int of type 'float'
```

### And you find yourself at the top of a function...

In [6]:
def process_data(df):
    ...

### You look around, and see some hints of what had happened...

In [7]:
def process_data(df):
    return df.assign(weekly_income=lambda x: x.hours_worked * x.wage_per_hour)

### You sort of know what's going on, but you want to take a closer look!

In [8]:
def process_data(df):
    import pdb; pdb.set_trace()  # <- insert breakpoint
    return df.assign(weekly_income=lambda x: x.hours_worked * x.wage_per_hour)

### And you find some funny business going on...

In [9]:
print(df)

          hours_worked  wage_per_hour
person_id                            
1ac0c42           38.5           15.1
edda497          41.25           15.0
e8c6fb7           35.0           21.3
1332472          27.75           17.5
beb478b          22.25           19.5
40e25f2          -20.5           25.5


In [10]:
df.dtypes

hours_worked      object
wage_per_hour    float64
dtype: object

In [11]:
df.hours_worked.map(type)

person_id
1ac0c42    <class 'float'>
edda497    <class 'float'>
e8c6fb7      <class 'str'>
1332472    <class 'float'>
beb478b    <class 'float'>
40e25f2    <class 'float'>
Name: hours_worked, dtype: object

### You squash the bug and add documentation for the next weary traveler who happens upon this code.

In [12]:
def process_data(df):
    return (
        df
        # make sure columns are floats
        .astype({"hours_worked": float, "wage_per_hour": float})
        # replace negative values with nans
        .assign(hours_worked=lambda x: x.hours_worked.where(x.hours_worked >= 0, np.nan))
        # compute weekly income
        .assign(weekly_income=lambda x: x.hours_worked * x.wage_per_hour)
    )

In [13]:
process_data(df)

| person_id | hours_worked | wage_per_hour | weekly_income |
| --- | --- | --- | --- |
| 1ac0c42 | 38.5 | 15.1 | 581.35 |
| edda497 | 41.25 | 15.0 | 618.75 |
| e8c6fb7 | 35.0 | 21.3 | 745.5 |
| 1332472 | 27.75 | 17.5 | 485.625 |
| beb478b | 22.25 | 19.5 | 433.875 |
| 40e25f2 | NaN | 25.5 | NaN |


### ⏱ A few months later...

### You find yourself at a familiar function, but it looks a little different from when you left it...

In [14]:
# This needs to be here, but skipped for story-telling effect in the slides
import pandera as pa
from pandera.typing import DataFrame

class RawData(pa.DataFrameModel):
    hours_worked: float = pa.Field(coerce=True, nullable=True)
    wage_per_hour: float = pa.Field(coerce=True, nullable=True)

class ProcessedData(RawData):
    hours_worked: float = pa.Field(ge=0, coerce=True, nullable=True)
    weekly_income: float = pa.Field(nullable=True)

In [15]:
@pa.check_types
def process_data(df: DataFrame[RawData]) -> DataFrame[ProcessedData]:
    return (
        # replace negative values with nans
        df.assign(hours_worked=lambda x: x.hours_worked.where(x.hours_worked >= 0, np.nan))
        # compute weekly income
        .assign(weekly_income=lambda x: x.hours_worked * x.wage_per_hour)
    )

### You look above and see what `RawData` and `ProcessedData` are, finding a `NOTE` that a fellow traveler has left for you.

In [16]:
import pandera as pa

# NOTE: this is what's supposed to be in `df` going into `process_data`
class RawData(pa.DataFrameModel):
    hours_worked: float = pa.Field(coerce=True, nullable=True)
    wage_per_hour: float = pa.Field(coerce=True, nullable=True)


# ... and this is what `process_data` is supposed to return.
class ProcessedData(RawData):
    hours_worked: float = pa.Field(ge=0, coerce=True, nullable=True)
    weekly_income: float = pa.Field(nullable=True)


@pa.check_types
def process_data(df: DataFrame[RawData]) -> DataFrame[ProcessedData]:
    ...

## Moral of the Story

> ##### The better you can reason about the contents of a dataframe, the faster you can debug.

> ##### The faster you can debug, the sooner you can focus on downstream tasks that you care about.

> ##### By validating data through explicit contracts, you also create data documentation *and* a simple, stateless data shift detector.

## Pandera Design Principles

From [SciPy 2020 - pandera: Statistical Data Validation of Pandas Dataframes](https://conference.scipy.org/proceedings/scipy2020/niels_bantilan.html)

<img src="../static/pandera_design_principles.png" width="550px">

## Pandera Programming Model

The pandera programming model is an iterative loop of building
statistical domain knowledge, implementing data transforms and schemas,
and verifying data.

<br>

<img src="../static/pandera_programming_model.png" width="700px">

### Meta Comment

##### This presentation notebook is validated by pandera 🤯

![mindblown](https://media.giphy.com/media/xT0xeJpnrWC4XWblEk/giphy-downsized-large.gif)

# 🤔 What's Data Testing

### And How Can I Put it Into Practice?

> **Data validation:** The act of falsifying data against explicit assumptions for some downstream purpose, like
> analysis, modeling, and visualization.

> **Data Testing:** Validating not only real data, but also the functions that produce them.


# In the Real World 🌍

#### Validate real data in production


# In the Test Suite 🧪

#### Validate functions that produce data, given some test cases

<img src="https://raw.githubusercontent.com/pandera-dev/pandera/master/docs/source/_static/pandera-logo.png" width="125px" style="margin: 0;"/>

<h2 style="margin-top: 0;">Pandera</h2>

#### An expressive and light-weight statistical typing tool for dataframe-like containers

- Check the types and properties of dataframes
- Easily integrate with existing data pipelines via function decorators
- Synthesize data from schema objects for property-based testing

#### Object-based API

Defining a schema looks and feels like defining a pandas dataframe

In [17]:
import pandera as pa

clean_data_schema = pa.DataFrameSchema(
    columns={
        "continuous": pa.Column(float, pa.Check.ge(0), nullable=True),
        "categorical": pa.Column(str, pa.Check.isin(["A", "B", "C"]), nullable=True),
    },
    coerce=True,
)

#### Class-based API

Define complex types with modern Python, inspired by [pydantic](https://pydantic-docs.helpmanual.io/) and `dataclasses`

In [18]:
from pandera.typing import DataFrame, Series

class CleanData(pa.SchemaModel):
    continuous: Series[float] = pa.Field(ge=0, nullable=True)
    categorical: Series[str] = pa.Field(isin=["A", "B", "C"], nullable=True)

    class Config:
        coerce = True

Pandera comes in two flavors: the object-based API and the class-based API.

#### Pandera Raises Informative Errors

Know Exactly What Went Wrong with Your Data

In [19]:
raw_data = pd.DataFrame({
    "continuous": ["-1.1", "4.0", "10.25", "-0.1", "5.2"],
    "categorical": ["A", "B", "C", "Z", "X"],
})

try:
    CleanData.validate(raw_data, lazy=True)
except pa.errors.SchemaErrors as exc:
    display(exc.failure_cases)

|   | schema_context | column | check | check_number | failure_case | index |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | Column | continuous | greater_than_or_equal_to(0) | 0 | -1.1 | 0 |
| 1 | Column | continuous | greater_than_or_equal_to(0) | 0 | -0.1 | 3 |
| 2 | Column | categorical | isin(['A', 'B', 'C']) | 0 | Z | 3 |
| 3 | Column | categorical | isin(['A', 'B', 'C']) | 0 | X | 4 |


### Pandera Supports Schema Transformations/Inheritance

#### Object-based API

Dynamically transform schema objects on the fly

In [20]:
raw_data_schema = pa.DataFrameSchema(
    columns={
        "continuous": pa.Column(float),
        "categorical": pa.Column(str),
    },
    coerce=True,
)

clean_data_schema = raw_data_schema.update_columns({
    "continuous": {"nullable": True},
    "categorical": {"checks": pa.Check.isin(["A", "B", "C"]), "nullable": True},
});

#### Class-based API

Inherit from `pandera.SchemaModel` to Define Type Hierarchies

In [21]:
class RawData(pa.SchemaModel):
    continuous: Series[float]
    categorical: Series[str]

    class Config:
        coerce = True

class CleanData(RawData):
    continuous = pa.Field(ge=0, nullable=True)
    categorical = pa.Field(isin=["A", "B", "C"], nullable=True);

### Integrate Seamlessly with your Pipeline

Use decorators to add IO checkpoints to the critical functions in your pipeline

In [22]:
@pa.check_types
def fn(raw_data: DataFrame[RawData]) -> DataFrame[CleanData]:
    return raw_data.assign(
        continuous=lambda df: df["continuous"].where(lambda x: x > 0, np.nan),
        categorical=lambda df: df["categorical"].where(lambda x: x.isin(["A", "B", "C"]), np.nan),
    )


fn(raw_data)

|   | continuous | categorical |
| --- | --- | --- |
| 0 | NaN | A |
| 1 | 4.0 | B |
| 2 | 10.25 | C |
| 3 | NaN | NaN |
| 4 | 5.2 | NaN |


### Generative Schemas

Schemas that synthesize valid data under their constraints

In [23]:
CleanData.example(size=5)

|   | continuous | categorical |
| --- | --- | --- |
| 0 | NaN | C |
| 1 | 5e-324 | A |
| 2 | 284777800000000.0 | A |
| 3 | NaN | NaN |
| 4 | NaN | NaN |


**Data Testing:** Test the functions that produce clean data

In [24]:
from hypothesis import given


@given(RawData.strategy(size=5))
def test_fn(raw_data):
    fn(raw_data)


def run_test_suite():
    test_fn()
    print("tests passed ✅")


run_test_suite()



tests passed ✅


# 🐓 Evolution

## Major Events


> ##### 📖 Documentation Improvements

> ##### 🔤 Class-based API

> ##### 📊 Data Synthesis Strategies

> ##### ⌨️ Pandera Type System

## Expanding scope

Adding support for `geopandas`, `dask`, `modin`, and `pyspark.pandas` was
relatively straightforward.

In [25]:
display(raw_data)

|   | continuous | categorical |
| --- | --- | --- |
| 0 | -1.1 | A |
| 1 | 4.0 | B |
| 2 | 10.25 | C |
| 3 | -0.1 | Z |
| 4 | 5.2 | X |


#### `dask`

In [26]:
import dask.dataframe as dd

dask_dataframe = dd.from_pandas(raw_data, npartitions=1)

try:
    CleanData(dask_dataframe, lazy=True).compute()
except pa.errors.SchemaErrors as exc:
    display(exc.failure_cases.sort_index())

|   | schema_context | column | check | check_number | failure_case | index |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | Column | continuous | greater_than_or_equal_to(0) | 0 | -1.1 | 0 |
| 1 | Column | continuous | greater_than_or_equal_to(0) | 0 | -0.1 | 3 |
| 2 | Column | categorical | isin(['A', 'B', 'C']) | 0 | Z | 3 |
| 3 | Column | categorical | isin(['A', 'B', 'C']) | 0 | X | 4 |


#### `modin`

In [30]:
import modin.pandas as mpd

modin_dataframe = mpd.DataFrame(raw_data)

try:
    CleanData(modin_dataframe, lazy=True)
except pa.errors.SchemaErrors as exc:
    display(exc.failure_cases.sort_index())



|   | schema_context | column | check | check_number | failure_case | index |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | Column | continuous | greater_than_or_equal_to(0) | 0 | -1.1 | 0 |
| 1 | Column | continuous | greater_than_or_equal_to(0) | 0 | -0.1 | 3 |
| 2 | Column | categorical | isin(['A', 'B', 'C']) | 0 | Z | 3 |
| 3 | Column | categorical | isin(['A', 'B', 'C']) | 0 | X | 4 |


#### `pyspark.pandas`

In [28]:
import pyspark.pandas as ps

pyspark_pd_dataframe = ps.DataFrame(raw_data)

try:
    CleanData(pyspark_pd_dataframe, lazy=True)
except pa.errors.SchemaErrors as exc:
    display(exc.failure_cases)

                                                                                

|   | schema_context | column | check | check_number | failure_case | index |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | DataFrameSchema |  | column_in_dataframe |  | continuous |  |
| 1 | DataFrameSchema |  | column_in_dataframe |  | categorical |  |


### Problem: What about non-pandas-compliant dataframes?

### 😩 Design weaknesses

- Schemas and checks were strongly coupled with pandas
- Error reporting and eager validation assumed in-memory data
- Leaky pandas abstractions

### 💪 Design strengths

- Generic schema interface
- Flexible check abstraction
- Flexible type system

# 🦩 Revolution

## Re-writing pandera internals

**High-level approach:** decoupling schema specification from backend

- A `pandera.api` subpackage, which contains the schema specification that
  defines the properties of an underlying data structure.
- A `pandera.backends` subpackage, which leverages the schema specification and
  implements the actual validation logic.
- A backend registry, which maps a particular API specification to a backend
  based on the DataFrame type being validated.
- A common type-aware `Check` namespace and registry, which registers
  type-specific implementations of built-in checks and allows contributors to
  easily add new built-in checks.
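
The registry idea can be sketched in a few lines of plain Python. This is a simplified illustration under assumed names (`Schema`, `Backend`, `ListBackend`, `min_length`), not pandera's actual implementation: the schema only stores properties, and validation is dispatched to whichever backend is registered for the type of the object being validated.

```python
from typing import Any, Dict, Type


class Backend:
    """Validation logic for one kind of data container."""

    def validate(self, check_obj: Any, schema: "Schema") -> Any:
        raise NotImplementedError


class Schema:
    """Specification only: holds properties and delegates validation."""

    _backends: Dict[type, Type[Backend]] = {}

    def __init__(self, min_length: int = 0):
        self.min_length = min_length

    @classmethod
    def register_backend(cls, data_type: type, backend: Type[Backend]) -> None:
        cls._backends[data_type] = backend

    def validate(self, check_obj: Any) -> Any:
        # pick the backend based on the type of the object being validated
        for data_type, backend in self._backends.items():
            if isinstance(check_obj, data_type):
                return backend().validate(check_obj, self)
        raise TypeError(f"no backend registered for {type(check_obj).__name__}")


class ListBackend(Backend):
    """Toy backend that validates plain Python lists."""

    def validate(self, check_obj: list, schema: Schema) -> list:
        if len(check_obj) < schema.min_length:
            raise ValueError("too few elements")
        return check_obj


Schema.register_backend(list, ListBackend)
print(Schema(min_length=2).validate([1, 2, 3]))
```

Supporting a new container type then means writing one backend class and registering it, while the schema specification stays unchanged.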

### Writing your own schema

```python
import sloth as sl  # placeholder for the dataframe library you want to support
from pandera.api.base.schema import BaseSchema
from pandera.backends.base import BaseSchemaBackend

class DataFrameSchema(BaseSchema):
    def __init__(self, **kwargs):
        # add properties that this dataframe would contain
        ...

class DataFrameSchemaBackend(BaseSchemaBackend):
    def validate(
        self,
        check_obj: sl.DataFrame,
        schema: DataFrameSchema,
        **kwargs,
    ):
        # implement custom validation logic
        ...

# register the backend
DataFrameSchema.register_backend(
    sl.DataFrame,
    DataFrameSchemaBackend,
)
```

### 📢 Pandera now supports `pyspark.sql.DataFrame` in `0.16.0b`!

https://pandera.readthedocs.io/en/latest/

```python
import pandera.pyspark as pa
import pyspark.sql.types as T

from decimal import Decimal
from pyspark.sql import DataFrame
from pandera.pyspark import DataFrameModel


class PanderaSchema(DataFrameModel):
    id: T.IntegerType() = pa.Field(gt=5)
    product_name: T.StringType() = pa.Field(str_startswith="B")
    price: T.DecimalType(20, 5) = pa.Field()
    description: T.ArrayType(T.StringType()) = pa.Field()
    meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()
```

## Organizational and Development Challenges

- **Multi-tasking the rewrite with PR reviews**
- **Centralized knowledge**
- **Informal governance**

## Retrospective

> #### Things in place that reduced the risk of regressions
> 
> - Unit tests.
> - Localized pandas coupling.
> - Lessons learned from pandas-compliant integrations.

> #### Additional approaches to put into practice in the future:
> 
> - Thoughtful design work.
> - Library-independent error reporting.
> - Decoupling metadata from data.
> - Investing in governance and community.

## Updated Principles

<img src="../static/pandera_updated_principles.png" width="600px">

## Announcement

### 🎉 Pandera has joined Union.ai 🎉


What does this mean?

- It will continue to be open source.
- It will have more resources to maintain and govern it.
- We can learn from enterprise users.

## 🛣️ Roadmap

- 🤝 **Integrations:** support more data manipulation libraries:
  - Polars support: https://github.com/unionai-oss/pandera/issues/1064
  - Ibis support: https://github.com/unionai-oss/pandera/issues/1105
  - Investigate the dataframe-api standard: https://github.com/data-apis/dataframe-api
  - Open an issue! https://github.com/unionai-oss/pandera/issues
- 💻 **User Experience:** polish the API:
  - better error-reporting
  - more built-in checks
  - conditional validation
- 🤝 **Interoperability:** tighter integrations with the python ecosystem:
  - `pydantic v2`
  - `pytest`: collect data coverage statistics
  - `hypothesis`: faster data synthesis
- 🏆 **Innovations:** new capabilities:
  - stateful data validation
  - model-based types

# Join the Community!

![badge](https://img.shields.io/github/stars/pandera-dev/pandera?style=social)
[![badge](https://img.shields.io/pypi/pyversions/pandera.svg)](https://pypi.python.org/pypi/pandera/)
[![badge](https://img.shields.io/pypi/v/pandera.svg)](https://pypi.org/project/pandera/)
![badge](https://img.shields.io/github/contributors/pandera-dev/pandera)
[![badge](https://pepy.tech/badge/pandera)](https://pepy.tech/project/pandera)
[![badge](https://pepy.tech/badge/pandera/month)](https://pepy.tech/project/pandera)
[![badge](https://img.shields.io/badge/discord-chat-purple?color=%235765F2&label=discord&logo=discord)](https://discord.gg/vyanhWuaKB)


- **Twitter**: [@cosmicbboy](https://twitter.com/cosmicBboy)
- **Discord**: https://discord.gg/vyanhWuaKB
- **Email**: [niels@union.ai](mailto:niels@union.ai)
- **Repo**: https://github.com/unionai-oss/pandera
- **Docs**: https://pandera.readthedocs.io
- **Contributing Guide**: https://pandera.readthedocs.io/en/stable/CONTRIBUTING.html
- **Become a Sponsor**: https://github.com/sponsors/cosmicBboy

# Join me at the sprints!

Contribute to the `Flyte` project: https://www.flyte.org

<img src="../static/flyte_sprint.png" width="600px">