Commit 0778e36

Merge pull request #6 from NeerajMalhotra-QB/feature_pyspark_backend

Feature pyspark backend

jaskaransinghsidana committed Mar 27, 2023
2 parents 90707dc + 26cbdaa commit 0778e36
Showing 18 changed files with 1,593 additions and 158 deletions.
1 change: 1 addition & 0 deletions 1.19.0
@@ -0,0 +1 @@
Requirement already satisfied: numpy in /opt/anaconda3/envs/pandera/lib/python3.8/site-packages (1.24.2)
1 change: 1 addition & 0 deletions =
@@ -0,0 +1 @@
Requirement already satisfied: numpy in /opt/anaconda3/envs/pandera/lib/python3.8/site-packages (1.24.2)
21 changes: 16 additions & 5 deletions pandera/__init__.py
@@ -6,12 +6,16 @@
from pandera.api import extensions
from pandera.api.checks import Check
from pandera.api.hypotheses import Hypothesis
from pandera.api.pandas import (
Column,
# from pandera.api.pandas import (
# Column,
# DataFrameSchema,
# Index,
# MultiIndex,
# SeriesSchema,
# )
from pandera.api.pyspark import (
    DataFrameSchema,
    Index,
    MultiIndex,
    SeriesSchema,
    Column,
)
from pandera.api.pandas.model import DataFrameModel, SchemaModel
from pandera.api.pandas.model_components import Field, check, dataframe_check
@@ -87,6 +91,13 @@
except ImportError:
    pass

try:
    import pyspark.sql

    from pandera.accessors import pyspark_sql_accessor
except ImportError:
    pass
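With this guard in place, simply importing pandera registers the `pandera` accessor on pyspark DataFrames whenever pyspark is installed. A minimal check, assuming pyspark is available in the environment:

import pandera  # import side effect registers the accessor
from pyspark.sql import DataFrame

# the accessor is attached to the DataFrame class itself
assert hasattr(DataFrame, "pandera")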


try:
    import modin.pandas
185 changes: 185 additions & 0 deletions pandera/accessors/pyspark_sql_accessor.py
@@ -0,0 +1,185 @@
"""Custom accessor functionality for modin.
Source code adapted from pyspark.pandas implementation:
https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.extensions.register_dataframe_accessor.html?highlight=register_dataframe_accessor#pyspark.pandas.extensions.register_dataframe_accessor
"""

import warnings
from functools import wraps
from typing import Optional, Union

from pandera.api.pyspark.container import DataFrameSchema

# Union alias so further pyspark schema types can be added alongside DataFrameSchema.
Schemas = Union[DataFrameSchema]

# TODO: refactor to create a separate module for PanderaAccessor
class PanderaAccessor:
    """Pandera accessor for a pyspark object."""

    def __init__(self, pandas_obj):
        """Initialize the pandera accessor."""
        self._pandas_obj = pandas_obj
        self._schema: Optional[Schemas] = None

    @staticmethod
    def check_schema_type(schema: Schemas):
        """Abstract method for checking the schema type."""
        raise NotImplementedError

    def add_schema(self, schema):
        """Add a schema to the pyspark object."""
        self.check_schema_type(schema)
        self._schema = schema
        return self._pandas_obj

    @property
    def schema(self) -> Optional[Schemas]:
        """Access schema metadata."""
        return self._schema




class CachedAccessor:
    """
    Custom property-like object (descriptor) for caching accessors.

    Parameters
    ----------
    name : str
        The namespace this will be accessed under, e.g. ``df.foo``.
    accessor : cls
        The class with the extension methods.

    NOTE
    ----
    Modified based on pandas.core.accessor.
    """

    def __init__(self, name, accessor):
        self._name = name
        self._accessor = accessor

    def __get__(self, obj, cls):
        if obj is None:
            # we're accessing the attribute of the class, i.e., Dataset.geo
            return self._accessor
        accessor_obj = self._accessor(obj)
        # Replace the property with the accessor object. Inspired by:
        # http://www.pydanny.com/cached-property.html
        setattr(obj, self._name, accessor_obj)
        return accessor_obj
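To illustrate the caching: the first attribute access runs `__get__`, builds the accessor, and stores it on the instance, so later lookups bypass the descriptor entirely. A minimal sketch, with hypothetical `EchoAccessor` and `Dummy` classes:

class EchoAccessor:
    """Trivial accessor that wraps the object it is given."""
    def __init__(self, obj):
        self.obj = obj

class Dummy:
    pass

Dummy.echo = CachedAccessor("echo", EchoAccessor)

d = Dummy()
first = d.echo          # __get__ constructs EchoAccessor(d) and caches it on d
assert d.echo is first  # later lookups hit the cached instance attribute
assert Dummy.echo is EchoAccessor  # class-level access returns the accessor class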




def _register_accessor(name, cls):
    """
    Register a custom accessor on {class} objects.

    :param name: Name under which the accessor should be registered. A warning
        is issued if this name conflicts with a preexisting attribute.
    :returns: A class decorator callable.
    """

    def decorator(accessor):
        if hasattr(cls, name):
            msg = (
                f"registration of accessor {accessor} under name '{name}' for "
                f"type {cls.__name__} is overriding a preexisting attribute "
                "with the same name."
            )
            warnings.warn(
                msg,
                UserWarning,
                stacklevel=2,
            )
        setattr(cls, name, CachedAccessor(name, accessor))
        return accessor

    return decorator


def register_dataframe_accessor(name):
    """
    Register a custom accessor with a pyspark DataFrame.

    :param name: name used when calling the accessor after it's registered
    :returns: a class decorator callable.
    """
    # pylint: disable=import-outside-toplevel
    from pyspark.sql import DataFrame

    return _register_accessor(name, DataFrame)
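For illustration, any class registered this way becomes available as an attribute on every DataFrame; the `shape_info` accessor and the session setup below are hypothetical:

from pyspark.sql import SparkSession

@register_dataframe_accessor("shape_info")  # hypothetical accessor name
class ShapeInfoAccessor:
    def __init__(self, df):
        self._df = df

    def counts(self):
        # rows and columns of the wrapped DataFrame
        return self._df.count(), len(self._df.columns)

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "label"])
print(df.shape_info.counts())  # (1, 2)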


# def register_series_accessor(name):
# """
# Register a custom accessor with a Series object
#
# :param name: name used when calling the accessor after its registered
# :returns: a callable class decorator
# """
# # pylint: disable=import-outside-toplevel
#
# from pyspark.sql.functions import col
#
# return _register_accessor(name, col)
#


def register_dataframe_method(method):
    """Register a function as a method attached to the pyspark DataFrame.

    NOTE
    ----
    Modified based on pandas_flavor.register.
    """

    def inner(*args, **kwargs):
        class AccessorMethod:
            def __init__(self, pyspark_obj):
                self._obj = pyspark_obj

            @wraps(method)
            def __call__(self, *args, **kwargs):
                return method(self._obj, *args, **kwargs)

        register_dataframe_accessor(method.__name__)(AccessorMethod)
        return method

    # register immediately and hand the original function back to the decorated name
    return inner()
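Used as a decorator, this turns a plain function into a DataFrame method; `with_upper` below is a hypothetical example:

from pyspark.sql import functions as F

@register_dataframe_method  # registers df.with_upper(...) as a method
def with_upper(df, colname):
    """Return df with an upper-cased copy of `colname`."""
    return df.withColumn(f"{colname}_upper", F.upper(F.col(colname)))

# afterwards, on any pyspark DataFrame:
# df.with_upper("label")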

@register_dataframe_accessor("pandera")
class PanderaDataFrameAccessor(PanderaAccessor):
    """Pandera accessor for pyspark DataFrame."""

    @staticmethod
    def check_schema_type(schema):
        if not isinstance(schema, DataFrameSchema):
            raise TypeError(
                f"schema arg must be a DataFrameSchema, found {type(schema)}"
            )


# @register_series_accessor("pandera")
# class PanderaSeriesAccessor(PanderaAccessor):
# """Pandera accessor for pandas Series."""
#
# @staticmethod
# def check_schema_type(schema):
# if not isinstance(schema, SeriesSchema):
# raise TypeError(
# f"schema arg must be a SeriesSchema, found {type(schema)}"
# )


# The class is already registered via the decorator above; a second explicit
# registration would only trigger the preexisting-attribute warning.
# register_dataframe_accessor("pandera")(PanderaDataFrameAccessor)
# register_series_accessor("pandera")(PanderaSeriesAccessor)
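Putting it together, the accessor attaches schema metadata to a DataFrame and hands the DataFrame back. The sketch below assumes the exact `DataFrameSchema`/`Column` constructor signatures for the pyspark backend, which this diff does not show:

from pyspark.sql import SparkSession
from pandera.api.pyspark import DataFrameSchema, Column

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "label"])

schema = DataFrameSchema({"id": Column(), "label": Column()})  # assumed minimal constructor
df = df.pandera.add_schema(schema)  # type-checks the schema, stores it, returns the DataFrame
print(df.pandera.schema)            # the schema attached above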
2 changes: 2 additions & 0 deletions pandera/api/pyspark/__init__.py
@@ -1 +1,3 @@
"""PySpark native core."""
from pandera.api.pyspark.container import DataFrameSchema
from pandera.api.pyspark.components import Column
