# Configuration

In [7]:
from pyspark.sql import SparkSession, DataFrame

from sparkenforce import validate, infer_dataframe_annotation, DataFrameValidationError

# Build (or reuse) a SparkSession that runs locally on a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .getOrCreate()
)

In [8]:
from datetime import date

# Sample rows in the column order declared by `schema` below:
# (firstname, middlename, lastname, dob, salary).
# Some name parts are empty strings and the last row has a salary of -1.
data = [
    ("James", "", "Smith", date.fromisoformat("1991-04-01"), 3000),
    ("Michael", "Rose", "", date.fromisoformat("2000-05-19"), 4000),
    ("Robert", "", "Williams", date.fromisoformat("1978-09-05"), 4000),
    ("Maria", "Anne", "Jones", date.fromisoformat("1967-12-01"), 4000),
    ("Jen", "Mary", "Brown", date.fromisoformat("1980-02-17"), -1),
]

# DDL-style schema string, one "<column> <type>" entry per tuple position.
schema = (
    "firstname string, "
    "middlename string, "
    "lastname string, "
    "dob date, "
    "salary int"
)

In [9]:
# Turn the sample rows into a DataFrame using the DDL schema string, then
# display its structure followed by its contents.
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+----------+------+
|firstname|middlename|lastname|       dob|salary|
+---------+----------+--------+----------+------+
|    James|          |   Smith|1991-04-01|  3000|
|  Michael|      Rose|        |2000-05-19|  4000|
|   Robert|          |Williams|1978-09-05|  4000|
|    Maria|      Anne|   Jones|1967-12-01|  4000|
|      Jen|      Mary|   Brown|1980-02-17|    -1|
+---------+----------+--------+----------+------+



                                                                                

# Demonstration

### Example 0: Get an annotation from a DataFrame

In [10]:
# Derive the DataFrame[...] annotation that corresponds to df's schema.
inferred_annotation = infer_dataframe_annotation(df)
print(inferred_annotation)

DataFrame["firstname": str, "middlename": str, "lastname": str, "dob": date, "salary": int]


### Example 1: Successful return value validation

In [None]:
from pyspark.sql import functions as fn


@validate
def transform_data(df: DataFrame["firstname":str, ...]) -> DataFrame["name":str, "length":int]:
    """Project ``firstname`` as ``name`` together with its character count.

    The input annotation requires a string ``firstname`` column and, via the
    trailing ``...``, tolerates additional columns. The return annotation
    declares a string ``name`` column and an integer ``length`` column.
    """
    first_name = df.firstname
    name_col = first_name.alias("name")
    length_col = fn.length(first_name).alias("length")
    return df.select(name_col, length_col)


# Both the argument and the result satisfy their annotations, so no error
# is expected here.
result = transform_data(df)
print("✅ Validation successful!")
result.show()

✅ Validation successful!
+-------+------+
|   name|length|
+-------+------+
|  James|     5|
|Michael|     7|
| Robert|     6|
|  Maria|     5|
|    Jen|     3|
+-------+------+



### Example 2: Error due to incorrect schema in return value

In [None]:
@validate
def incorrect_return_schema(df: DataFrame["firstname":str, ...]) -> DataFrame["name":str, "length":int]:
    """Deliberately return columns that differ from the declared return schema."""
    # The annotation promises name/length, but firstname/lastname are selected.
    wrong_columns = ("firstname", "lastname")
    return df.select(*wrong_columns)


# The call is expected to fail return-value validation. The error is caught
# and printed rather than re-raised, so the cell completes and a full
# "Restart & Run All" can continue past this example.
try:
    result = incorrect_return_schema(df)
except DataFrameValidationError as e:
    print("✅ Expected error in return validation:")
    print(f"{type(e).__name__}: {e}")
else:
    # Only reached if validation unexpectedly let the wrong schema through.
    print("❌ Should not reach here")

✅ Expected error in return validation:


DataFrameValidationError: return value columns mismatch. missing required columns: {'length', 'name'}, unexpected columns: {'lastname', 'firstname'}

### Example 3: Error due to returning incorrect type (not DataFrame)

In [None]:
@validate
def non_dataframe_return(df: DataFrame["firstname":str, ...]) -> DataFrame["result":str]:
    """Deliberately return a plain string where a DataFrame is declared."""
    not_a_frame = "Not a DataFrame"
    return not_a_frame


# The call is expected to fail because a str is returned instead of a
# DataFrame. The error is caught and printed rather than re-raised, so the
# cell completes and later cells still run on "Restart & Run All".
try:
    result = non_dataframe_return(df)
except DataFrameValidationError as e:
    print("✅ Expected error - not DataFrame:")
    print(f"{type(e).__name__}: {e}")
else:
    # Only reached if validation unexpectedly accepted the non-DataFrame value.
    print("❌ Should not reach here")

✅ Expected error - not DataFrame:


DataFrameValidationError: return value must be a PySpark DataFrame, got <class 'str'>

### Example 4: No return annotation

In [None]:
@validate
def no_return_annotation(df: DataFrame["firstname":str, ...]):
    """Return an arbitrary value; with no return annotation it is not validated."""
    unchecked_value = "Can return anything"
    return unchecked_value


@validate
def explicit_none_return(df: DataFrame["firstname":str, ...]) -> None:
    """Return an int despite ``-> None``; the return value is not validated."""
    unchecked_value = 42
    return unchecked_value


# Neither call is subject to return validation, so both succeed and hand
# back whatever the function returned.
result1 = no_return_annotation(df)
result2 = explicit_none_return(df)
for label, value in (("No annotation", result1), ("With explicit None", result2)):
    print(f"✅ {label}: {value}")

✅ No annotation: Can return anything
✅ With explicit None: 42


### Example 5: Validation with ellipsis (minimum columns)

In [None]:
@validate
def ellipsis_return_example(df: DataFrame["firstname":str, ...]) -> DataFrame["firstname":str, "summary":str, ...]:
    """Return the declared columns plus extras permitted by the trailing ``...``."""
    # Columns named in the return annotation.
    declared = ["firstname", fn.lit("processed").alias("summary")]
    # Columns not named in the annotation; the ellipsis allows them.
    extras = ["lastname", "salary"]
    return df.select(*declared, *extras)


# The additional columns do not cause a validation error.
result = ellipsis_return_example(df)
print("✅ Ellipsis validation successful!")
result.select("firstname", "summary", "lastname").show(3)

✅ Ellipsis validation successful!
+---------+---------+--------+
|firstname|  summary|lastname|
+---------+---------+--------+
|    James|processed|   Smith|
|  Michael|processed|        |
|   Robert|processed|Williams|
+---------+---------+--------+
only showing top 3 rows


### Example 6: Validation with both Python and Spark types

In [None]:
from pyspark.sql.types import DateType
from datetime import date


@validate
def validation_py_date(df: DataFrame["firstname":str, "dob":date, ...]) -> DataFrame["firstname":str, "age":int]:
    """Compute an approximate age per person, with ``dob`` annotated as a Python ``date``.

    The age is the difference between the current year and the birth year;
    month and day are not taken into account.

    Returns:
        A DataFrame with ``firstname`` and an integer ``age`` column.
    """
    # current_date() is used instead of now() so this function matches
    # validation_spark_date and does not depend on the newer now() helper.
    current_year = fn.year(fn.current_date())
    birth_year = fn.year(df.dob)
    return df.select(
        "firstname",
        (current_year - birth_year).alias("age"),
    )


result = validation_py_date(df)
print("✅ Date (Python) validation successful!")


@validate
def validation_spark_date(
    df: DataFrame["firstname":str, "dob" : DateType(), ...],
) -> DataFrame["firstname":str, "age":int]:
    """Compute current year minus birth year, with ``dob`` annotated as Spark ``DateType()``."""
    this_year = fn.year(fn.current_date())
    birth_year = fn.year(df.dob)
    age_col = (this_year - birth_year).alias("age")
    return df.select("firstname", age_col)


result = validation_spark_date(df)
print("✅ Date (Spark) validation successful!")

✅ Date (Python) validation successful!
✅ Date (Spark) validation successful!


### Example 7: Using custom types

In [None]:
from sparkenforce import register_type_mapping
from dataclasses import dataclass
from pyspark.sql import types as spark_types


class Name:
    """Marker class used as a column type in DataFrame annotations."""


# Associate the marker class with Spark's string type for validation.
register_type_mapping(Name, spark_types.StringType())


@validate
def custom_class_example_ok(df) -> DataFrame["name":Name]:
    """Return ``firstname`` renamed to ``name``, annotated with the custom ``Name`` type."""
    name_col = df.firstname.alias("name")
    return df.select(name_col)


result = custom_class_example_ok(df)
print("✅ Name class to string validation successful!")


@validate
def custom_class_example_nok(df) -> DataFrame["name":Name]:
    """Return the ``dob`` column under the name ``name`` so that validation against ``Name`` fails."""
    mistyped_col = df.dob.alias("name")
    return df.select(mistyped_col)


# The call is expected to fail type validation for the custom class. The
# error is caught and printed rather than re-raised, so the cell completes
# and the following example still runs on "Restart & Run All".
try:
    result = custom_class_example_nok(df)
except DataFrameValidationError as e:
    print("✅ DataFrameValidationError correctly raised for custom class with wrong type!")
    print(f"{type(e).__name__}: {e}")
else:
    # Only reached if validation unexpectedly accepted the wrong column type.
    print("❌ Should not reach here")

✅ Name class to string validation successful!
✅ DataFrameValidationError correctly raised for custom class with wrong type!


DataFrameValidationError: return value column 'name' has incorrect type. Expected StringType(), got DateType()

### Example 8: Using custom types for structs

In [None]:
from sparkenforce import register_type_mapping
from pyspark.sql import types as spark_types


# Imported here so this cell does not depend on an import made in an
# earlier example's cell.
from dataclasses import dataclass


@dataclass
class Name:
    """Dataclass used as a custom column type in DataFrame annotations.

    This definition replaces the plain ``Name`` class from the previous example.

    NOTE(review): the field names here (firstname/middlename) differ from the
    struct fields (forename/surname) registered for this class further down.
    The recorded run validates successfully regardless; confirm whether the
    mismatch is intentional.
    """

    firstname: str
    middlename: str = ""


# Spark struct made of two nullable string fields; registered as the Spark
# type that the Name class stands for in annotations.
struct_type = spark_types.StructType(
    [
        spark_types.StructField(field_name, spark_types.StringType(), True)
        for field_name in ("forename", "surname")
    ]
)

register_type_mapping(Name, struct_type)


@validate
def dataclass_example(df) -> DataFrame["name":Name]:
    """Pack ``firstname`` and ``lastname`` into one struct column called ``name``."""
    # Struct field names are forename/surname, taken from firstname/lastname.
    name_struct = fn.struct(
        df.firstname.alias("forename"),
        df.lastname.alias("surname"),
    )
    return df.select(name_struct.alias("name"))


result = dataclass_example(df)
print("✅ Dataclass validation successful!")
result.show(truncate=False)

✅ Dataclass validation successful!
+------------------+
|name              |
+------------------+
|{James, Smith}    |
|{Michael, }       |
|{Robert, Williams}|
|{Maria, Jones}    |
|{Jen, Brown}      |
+------------------+

