In [1]:
import polars as pl
import numpy as np
from datetime import datetime, timedelta

# Sample dataset with quality issues:
# 1000 generated rows followed by 3 hand-written rows that are deliberately bad.
np.random.seed(42)

clean_ids = list(range(1, 1001))
first_signup = datetime(2024, 1, 1)

# The random draws are made in column order (age, salary, status) so the seeded
# sequence is consumed exactly as the column definitions below expect.
random_ages = list(np.random.randint(18, 80, 1000))
random_salaries = list(np.random.randint(30000, 150000, 1000))
random_statuses = np.random.choice(['active', 'inactive', 'pending'], 1003)

sample_data = pl.DataFrame({
    # The last three ids repeat ids 1-3 (duplicates)
    'user_id': clean_ids + [1, 2, 3],
    # Malformed, empty and missing addresses
    'email': [f'user{uid}@example.com' for uid in clean_ids] + ['invalid-email', '', None],
    # Negative, implausibly large and missing ages
    'age': random_ages + [-5, 150, None],
    # Zero, negative and missing salaries
    'salary': random_salaries + [0, -1000, None],
    # One signup per day starting 2024-01-01, then two missing dates and one in 2030.
    # NOTE: checks that compare against datetime.now() will also flag any generated
    # date that lies after the moment the notebook is run.
    'signup_date': [first_signup + timedelta(days=offset) for offset in range(1000)]
                   + [None, None, datetime(2030, 1, 1)],
    'status': random_statuses,
})

print(f"Dataset shape: {sample_data.shape}")
print("Sample data:")
print(sample_data.tail())

Dataset shape: (1003, 6)
Sample data:
shape: (5, 6)
┌─────────┬──────────────────────┬──────┬────────┬─────────────────────┬──────────┐
│ user_id ┆ email                ┆ age  ┆ salary ┆ signup_date         ┆ status   │
│ ---     ┆ ---                  ┆ ---  ┆ ---    ┆ ---                 ┆ ---      │
│ i64     ┆ str                  ┆ i32  ┆ i32    ┆ datetime[μs]        ┆ str      │
╞═════════╪══════════════════════╪══════╪════════╪═════════════════════╪══════════╡
│ 999     ┆ user999@example.com  ┆ 64   ┆ 142573 ┆ 2026-09-25 00:00:00 ┆ inactive │
│ 1000    ┆ user1000@example.com ┆ 66   ┆ 149076 ┆ 2026-09-26 00:00:00 ┆ inactive │
│ 1       ┆ invalid-email        ┆ -5   ┆ 0      ┆ null                ┆ pending  │
│ 2       ┆                      ┆ 150  ┆ -1000  ┆ null                ┆ active   │
│ 3       ┆ null                 ┆ null ┆ null   ┆ 2030-01-01 00:00:00 ┆ active   │
└─────────┴──────────────────────┴──────┴────────┴─────────────────────┴──────────┘


In [2]:
def check_null_values(df: pl.DataFrame, columns: list = None) -> pl.DataFrame:
    """
    Check for null values in specified columns or all columns.

    Args:
        df: Frame to inspect.
        columns: Column names to check; defaults to every column of ``df``.

    Returns:
        A DataFrame with one row per checked column and the fields
        ``column``, ``null_count``, ``null_percentage`` (rounded to two
        decimals) and ``is_valid`` (True when the column has no nulls).
        For a frame with zero rows the percentage is reported as 0.0.
    """
    if columns is None:
        columns = df.columns

    total_rows = df.height
    null_checks = []
    for col in columns:
        null_count = df.select(pl.col(col).is_null().sum()).item()
        # A zero-row frame has nothing to divide by; report 0% rather than
        # raising ZeroDivisionError.
        null_percentage = (null_count / total_rows) * 100 if total_rows else 0.0
        null_checks.append({
            'column': col,
            'null_count': null_count,
            'null_percentage': round(null_percentage, 2),
            'is_valid': null_count == 0
        })

    # An explicit schema keeps the report's columns and dtypes stable even
    # when there is nothing to report (empty `columns`).
    return pl.DataFrame(
        null_checks,
        schema={
            'column': pl.Utf8,
            'null_count': pl.Int64,
            'null_percentage': pl.Float64,
            'is_valid': pl.Boolean,
        },
    )

# Per-column null summary for the whole sample frame
null_report = check_null_values(sample_data)
print("Null Value Report:")
print(null_report)

# Rows where at least one of the critical columns is missing
critical_columns = ['user_id', 'email']
has_critical_null = pl.any_horizontal(
    *(pl.col(name).is_null() for name in critical_columns)
)
null_rows = sample_data.filter(has_critical_null)
print(f"\nRows with nulls in critical columns: {null_rows.height}")

Null Value Report:
shape: (6, 4)
┌─────────────┬────────────┬─────────────────┬──────────┐
│ column      ┆ null_count ┆ null_percentage ┆ is_valid │
│ ---         ┆ ---        ┆ ---             ┆ ---      │
│ str         ┆ i64        ┆ f64             ┆ bool     │
╞═════════════╪════════════╪═════════════════╪══════════╡
│ user_id     ┆ 0          ┆ 0.0             ┆ true     │
│ email       ┆ 1          ┆ 0.1             ┆ false    │
│ age         ┆ 1          ┆ 0.1             ┆ false    │
│ salary      ┆ 1          ┆ 0.1             ┆ false    │
│ signup_date ┆ 2          ┆ 0.2             ┆ false    │
│ status      ┆ 0          ┆ 0.0             ┆ true     │
└─────────────┴────────────┴─────────────────┴──────────┘

Rows with nulls in critical columns: 1


In [3]:
def check_uniqueness(df: pl.DataFrame, columns: list) -> dict:
    """
    Summarise how many distinct values each of the given columns holds.

    Returns a dict keyed by column name. Each value is a dict with
    ``total_rows``, ``unique_values``, ``duplicate_count`` (total rows minus
    distinct values, i.e. the number of surplus rows rather than the number of
    rows that share a value) and ``is_unique``.
    """
    row_total = df.height

    def _summarize(name):
        # Distinct-value count as computed by Polars' n_unique for this column
        distinct = df.select(pl.col(name).n_unique()).item()
        surplus = row_total - distinct
        return {
            'total_rows': row_total,
            'unique_values': distinct,
            'duplicate_count': surplus,
            'is_unique': surplus == 0
        }

    return {name: _summarize(name) for name in columns}

# Uniqueness summary for the columns that are expected to identify a user
key_columns = ['user_id', 'email']
uniqueness_report = check_uniqueness(sample_data, key_columns)
print("Uniqueness Report:")
for column_name in uniqueness_report:
    print(f"{column_name}: {uniqueness_report[column_name]}")

# Every row whose user_id occurs more than once (all occurrences are kept)
is_repeated_id = pl.col('user_id').is_duplicated()
duplicates = sample_data.filter(is_repeated_id)
print(f"\nDuplicate user_id rows: {duplicates.height}")
print(duplicates.select(['user_id', 'email']).head())

Uniqueness Report:
user_id: {'total_rows': 1003, 'unique_values': 1000, 'duplicate_count': 3, 'is_unique': False}
email: {'total_rows': 1003, 'unique_values': 1003, 'duplicate_count': 0, 'is_unique': True}

Duplicate user_id rows: 6
shape: (5, 2)
┌─────────┬───────────────────┐
│ user_id ┆ email             │
│ ---     ┆ ---               │
│ i64     ┆ str               │
╞═════════╪═══════════════════╡
│ 1       ┆ user1@example.com │
│ 2       ┆ user2@example.com │
│ 3       ┆ user3@example.com │
│ 1       ┆ invalid-email     │
│ 2       ┆                   │
└─────────┴───────────────────┘


In [4]:
def validate_age_range(df: pl.DataFrame, min_age: int = 0, max_age: int = 120) -> pl.DataFrame:
    """
    Append a boolean ``age_valid`` column.

    A row is valid when ``age`` is present and lies within
    ``[min_age, max_age]`` (both bounds inclusive); missing ages are invalid.
    """
    age = pl.col('age')
    # Leading with the null check makes a missing age resolve to False
    # rather than to a null flag.
    within_bounds = (~age.is_null()) & (age >= min_age) & (age <= max_age)
    return df.with_columns(within_bounds.alias('age_valid'))

def validate_email_format(df: pl.DataFrame) -> pl.DataFrame:
    """
    Append a boolean ``email_valid`` column based on a basic regex check.

    Missing emails and emails that do not match the pattern are invalid.
    """
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    email = pl.col('email')
    # The explicit null check turns a missing email into False instead of null.
    well_formed = (~email.is_null()) & email.str.contains(email_pattern)
    return df.with_columns(well_formed.alias('email_valid'))

def validate_salary_range(df: pl.DataFrame, min_salary: int = 1000) -> pl.DataFrame:
    """
    Append a boolean ``salary_valid`` column.

    A row is valid when ``salary`` is present and at least ``min_salary``.
    """
    salary = pl.col('salary')
    # A missing salary short-circuits to False through the null check.
    meets_minimum = (~salary.is_null()) & (salary >= min_salary)
    return df.with_columns(meets_minimum.alias('salary_valid'))

def validate_date_range(df: pl.DataFrame, min_date: datetime = None, max_date: datetime = None) -> pl.DataFrame:
    """
    Append a boolean ``signup_date_valid`` column.

    A row is valid when ``signup_date`` is present and lies within
    ``[min_date, max_date]`` (inclusive). ``min_date`` defaults to 2020-01-01
    and ``max_date`` to the time of the call, so results for recent dates
    depend on when the function is run.
    """
    earliest = datetime(2020, 1, 1) if min_date is None else min_date
    latest = datetime.now() if max_date is None else max_date

    signup = pl.col('signup_date')
    # A missing date resolves to False via the leading null check.
    in_window = (~signup.is_null()) & (signup >= earliest) & (signup <= latest)
    return df.with_columns(in_window.alias('signup_date_valid'))

# Apply validations
# Run the four column validators in sequence; each one appends its own flag column.
validated_data = sample_data
for validator in (
    validate_age_range,
    validate_email_format,
    validate_salary_range,
    validate_date_range,
):
    validated_data = validated_data.pipe(validator)

# Flag column -> label used in the summary table
flag_to_label = {
    'age_valid': 'valid_ages',
    'email_valid': 'valid_emails',
    'salary_valid': 'valid_salaries',
    'signup_date_valid': 'valid_signup_dates',
}

# Summary of validation results: count of True flags per check plus the row total
summary_exprs = [pl.col(flag).sum().alias(label) for flag, label in flag_to_label.items()]
summary_exprs.append(pl.len().alias('total_rows'))
validation_summary = validated_data.select(summary_exprs)

print("Validation Summary:")
print(validation_summary)

# Show invalid records: a record is invalid as soon as any single flag is False
passes_every_check = pl.all_horizontal([pl.col(flag) for flag in flag_to_label])
invalid_records = validated_data.filter(~passes_every_check)

display_columns = ['user_id', 'age', 'email', 'salary', 'signup_date'] + list(flag_to_label)

print(f"\nInvalid records: {invalid_records.height}")
print(invalid_records.select(display_columns).head())

Validation Summary:
shape: (1, 5)
┌────────────┬──────────────┬────────────────┬────────────────────┬────────────┐
│ valid_ages ┆ valid_emails ┆ valid_salaries ┆ valid_signup_dates ┆ total_rows │
│ ---        ┆ ---          ┆ ---            ┆ ---                ┆ ---        │
│ u32        ┆ u32          ┆ u32            ┆ u32                ┆ u32        │
╞════════════╪══════════════╪════════════════╪════════════════════╪════════════╡
│ 1000       ┆ 1000         ┆ 1000           ┆ 733                ┆ 1003       │
└────────────┴──────────────┴────────────────┴────────────────────┴────────────┘

Invalid records: 270
shape: (5, 9)
┌─────────┬─────┬──────────────┬────────┬───┬───────────┬─────────────┬──────────────┬─────────────┐
│ user_id ┆ age ┆ email        ┆ salary ┆ … ┆ age_valid ┆ email_valid ┆ salary_valid ┆ signup_date │
│ ---     ┆ --- ┆ ---          ┆ ---    ┆   ┆ ---       ┆ ---         ┆ ---          ┆ _valid      │
│ i64     ┆ i32 ┆ str          ┆ i32    ┆   ┆ bool      ┆ bo

In [5]:
def validate_schema(df: pl.DataFrame, expected_schema: dict) -> dict:
    """
    Validate DataFrame schema against expected structure.

    expected_schema: {'column_name': pl.DataType}

    Returns a dict with:
        valid:            False when a column is missing or has the wrong type.
        missing_columns:  expected columns absent from ``df``, in the order
                          they appear in ``expected_schema``.
        extra_columns:    columns of ``df`` not listed in ``expected_schema``,
                          in frame order. Extra columns are reported but do
                          not make the schema invalid.
        type_mismatches:  one dict per mistyped column with ``column``,
                          ``expected`` and ``actual`` (types as strings).
    """
    actual_columns = set(df.columns)

    # Build the lists from the ordered inputs rather than from set differences
    # so the report is identical from run to run.
    missing = [col for col in expected_schema if col not in actual_columns]
    extra = [col for col in df.columns if col not in expected_schema]

    # Check data types only for columns that exist in the frame
    type_mismatches = []
    for col, expected_type in expected_schema.items():
        if col not in actual_columns:
            continue
        actual_type = df.schema[col]
        if actual_type != expected_type:
            type_mismatches.append({
                'column': col,
                'expected': str(expected_type),
                'actual': str(actual_type)
            })

    return {
        'valid': not missing and not type_mismatches,
        'missing_columns': missing,
        'extra_columns': extra,
        'type_mismatches': type_mismatches
    }

# Define expected schema
expected_schema = {
    'user_id': pl.Int64,
    'email': pl.Utf8,
    'age': pl.Int64,
    'salary': pl.Int64,
    'signup_date': pl.Datetime,
    'status': pl.Utf8
}

# Validate schema
schema_validation = validate_schema(sample_data, expected_schema)
print("Schema Validation Report:")
print(f"Valid: {schema_validation['valid']}")
# Always print the breakdown: extra columns are reported without making the
# schema invalid, so gating the details on failure would hide them.
print(f"Missing columns: {schema_validation['missing_columns']}")
print(f"Extra columns: {schema_validation['extra_columns']}")
print(f"Type mismatches: {schema_validation['type_mismatches']}")

Schema Validation Report:
Valid: False
Missing columns: []
Extra columns: []
Type mismatches: [{'column': 'age', 'expected': 'Int64', 'actual': 'Int32'}, {'column': 'salary', 'expected': 'Int64', 'actual': 'Int32'}]


In [6]:
def create_validation_rule(name: str, condition: pl.Expr, error_message: str = None):
    """
    Bundle a named boolean expression into a rule dict.

    The returned dict has the keys ``name``, ``condition`` and
    ``error_message``; when no (or an empty) message is supplied a generic
    one mentioning ``name`` is used.
    """
    message = error_message if error_message else f"Validation failed for {name}"
    return dict(name=name, condition=condition, error_message=message)

def apply_validation_rules(df: pl.DataFrame, rules: list) -> pl.DataFrame:
    """
    Apply multiple validation rules to a DataFrame.

    For every rule a boolean column named ``<rule name>_valid`` is added (or
    overwritten if it already exists). Rules are applied one after another in
    list order.

    Args:
        df: Frame to validate.
        rules: Rule dicts with at least the keys ``name`` and ``condition``.

    Returns:
        ``df`` with one flag column per rule. A condition that evaluates to
        null (for example a comparison on a missing value) is recorded as
        False, so the flag columns never contain nulls.
    """
    result_df = df

    for rule in rules:
        validation_col = f"{rule['name']}_valid"
        result_df = result_df.with_columns(
            # Comparisons on null inputs yield null; treat "could not be
            # checked" as a failure so that filters such as `~flag` do not
            # silently skip those rows.
            rule['condition'].fill_null(False).alias(validation_col)
        )

    return result_df

def create_validation_report(df: pl.DataFrame, rules: list) -> pl.DataFrame:
    """
    Create a comprehensive validation report.

    Args:
        df: Frame that already carries the ``<rule name>_valid`` flag columns.
        rules: Rule dicts with the keys ``name`` and ``error_message``.

    Returns:
        One row per rule whose flag column is present in ``df`` (rules without
        a flag column are skipped), with the fields ``rule_name``,
        ``total_records``, ``valid_records``, ``invalid_records``,
        ``success_rate`` (percent, two decimals) and ``error_message``.
        Null flags are counted as invalid. For a frame with zero rows
        ``success_rate`` is null because the ratio is undefined.
    """
    total_count = df.height
    report_data = []

    for rule in rules:
        validation_col = f"{rule['name']}_valid"
        if validation_col not in df.columns:
            continue

        # sum() skips null flags; `or 0` guards against a null result on an
        # empty or all-null flag column.
        valid_count = df.select(pl.col(validation_col).sum()).item() or 0
        invalid_count = total_count - valid_count
        # Avoid ZeroDivisionError on an empty frame.
        success_rate = round((valid_count / total_count) * 100, 2) if total_count else None

        report_data.append({
            'rule_name': rule['name'],
            'total_records': total_count,
            'valid_records': valid_count,
            'invalid_records': invalid_count,
            'success_rate': success_rate,
            'error_message': rule['error_message']
        })

    # An explicit schema keeps the columns available (e.g. for a later
    # `.select`) even when no rule produced a row.
    return pl.DataFrame(
        report_data,
        schema={
            'rule_name': pl.Utf8,
            'total_records': pl.Int64,
            'valid_records': pl.Int64,
            'invalid_records': pl.Int64,
            'success_rate': pl.Float64,
            'error_message': pl.Utf8,
        },
    )

# Define validation rules as (name, condition, message) triples, then turn each
# triple into a rule dict.
age = pl.col('age')
rule_specs = [
    (
        'positive_age',
        (age > 0) & (age <= 120),
        'Age must be between 1 and 120',
    ),
    (
        'valid_email',
        pl.col('email').str.contains(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'),
        'Email must be in valid format',
    ),
    (
        'positive_salary',
        pl.col('salary') >= 1000,
        'Salary must be at least 1000',
    ),
    (
        'valid_status',
        pl.col('status').is_in(['active', 'inactive', 'pending']),
        'Status must be active, inactive, or pending',
    ),
    (
        # "Now" is captured once, when this cell runs
        'recent_signup',
        pl.col('signup_date') <= datetime.now(),
        'Signup date cannot be in the future',
    ),
]
validation_rules = [
    create_validation_rule(rule_name, rule_condition, rule_message)
    for rule_name, rule_condition, rule_message in rule_specs
]

# Apply validation rules
validated_df = apply_validation_rules(sample_data, validation_rules)

# Generate validation report
validation_report = create_validation_report(validated_df, validation_rules)
print("Comprehensive Validation Report:")
print(validation_report)

# Create overall data quality score: share of rows that pass every rule
validation_columns = [f"{rule['name']}_valid" for rule in validation_rules]
passes_all_rules = pl.all_horizontal(
    [pl.col(col) for col in validation_columns]
).alias('all_valid')

quality_counts = validated_df.with_columns(passes_all_rules).select(
    pl.col('all_valid').sum().alias('fully_valid_records'),
    pl.len().alias('total_records')
)
overall_quality = quality_counts.with_columns(
    (pl.col('fully_valid_records') / pl.col('total_records') * 100).alias('data_quality_score')
)

print("\nOverall Data Quality Score:")
print(overall_quality)


Comprehensive Validation Report:
shape: (5, 6)
┌─────────────────┬───────────────┬───────────────┬────────────────┬──────────────┬────────────────┐
│ rule_name       ┆ total_records ┆ valid_records ┆ invalid_record ┆ success_rate ┆ error_message  │
│ ---             ┆ ---           ┆ ---           ┆ s              ┆ ---          ┆ ---            │
│ str             ┆ i64           ┆ i64           ┆ ---            ┆ f64          ┆ str            │
│                 ┆               ┆               ┆ i64            ┆              ┆                │
╞═════════════════╪═══════════════╪═══════════════╪════════════════╪══════════════╪════════════════╡
│ positive_age    ┆ 1003          ┆ 1000          ┆ 3              ┆ 99.7         ┆ Age must be    │
│                 ┆               ┆               ┆                ┆              ┆ between 1 and  │
│                 ┆               ┆               ┆                ┆              ┆ 120            │
│ valid_email     ┆ 1003          ┆ 1000    

In [8]:
class DataQualityPipeline:
    """
    Chainable collection of validation and cleaning rules for a DataFrame.

    Rules are registered with ``add_validation_rule`` / ``add_cleaning_rule``
    (both return ``self`` so calls can be chained) and executed with
    ``validate``, ``clean`` or ``process``.
    """

    def __init__(self):
        # Rule dicts with keys 'name', 'condition', 'error_message'
        self.validation_rules = []
        # Rule dicts with keys 'name', 'transformation'
        self.cleaning_rules = []

    def add_validation_rule(self, name: str, condition: pl.Expr, error_message: str = None):
        """Register a boolean expression as a validation rule and return ``self``.

        When no message is given, a generic one mentioning ``name`` is stored.
        """
        self.validation_rules.append({
            'name': name,
            'condition': condition,
            'error_message': error_message or f"Validation failed for {name}"
        })
        return self

    def add_cleaning_rule(self, name: str, transformation: pl.Expr, column: str = None):
        """Register a cleaning expression and return ``self``.

        Args:
            name: Label for the rule.
            transformation: Expression passed to ``with_columns`` by ``clean``.
            column: Optional name of the column the result should be written
                to. When given, the expression is aliased to it so that it
                overwrites that column. Without it the expression's own
                output name decides where the result lands; a ``when/then``
                whose first branch is a literal is named ``literal`` and
                would be added as a new column instead of replacing the
                source column.
        """
        if column is not None:
            transformation = transformation.alias(column)
        self.cleaning_rules.append({
            'name': name,
            'transformation': transformation
        })
        return self

    def validate(self, df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]:
        """Return validated DataFrame and validation report.

        Delegates to the module-level ``apply_validation_rules`` and
        ``create_validation_report`` helpers.
        """
        validated_df = apply_validation_rules(df, self.validation_rules)
        report = create_validation_report(validated_df, self.validation_rules)
        return validated_df, report

    def clean(self, df: pl.DataFrame) -> pl.DataFrame:
        """Apply cleaning transformations one after another, in registration order."""
        result_df = df
        for rule in self.cleaning_rules:
            result_df = result_df.with_columns(rule['transformation'])
        return result_df

    def process(self, df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
        """Full pipeline: validate, clean, re-validate.

        Returns:
            ``(final_validated_df, initial_report, final_report)``: the
            cleaned frame with the flag columns from the second validation
            pass, the report from before cleaning and the report from after.
        """
        # Initial validation
        validated_df, initial_report = self.validate(df)

        # Clean data (the flag columns from the first pass are still present)
        cleaned_df = self.clean(validated_df)

        # Re-validate after cleaning; this recomputes the flag columns
        final_validated_df, final_report = self.validate(cleaned_df)

        return final_validated_df, initial_report, final_report

# Create and configure pipeline
pipeline = DataQualityPipeline()

# Add validation rules
(pipeline
 .add_validation_rule('positive_age', (pl.col('age') > 0) & (pl.col('age') <= 120))
 .add_validation_rule('valid_email', pl.col('email').str.contains(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'))
 .add_validation_rule('positive_salary', pl.col('salary') >= 1000)
)

# Add cleaning rules
# The when/then expressions are aliased to the column they clean. Without the
# alias their output takes its name from the first `then` branch (a literal),
# so the result would land in a new column called "literal" and leave
# 'age' / 'salary' untouched.
(pipeline
 .add_cleaning_rule('clean_email', pl.col('email').str.to_lowercase().str.strip_chars())
 .add_cleaning_rule('cap_age', pl.when(pl.col('age') > 120).then(pl.lit(None)).otherwise(pl.col('age')).alias('age'))
 .add_cleaning_rule('fix_negative_salary', pl.when(pl.col('salary') < 0).then(pl.lit(None)).otherwise(pl.col('salary')).alias('salary'))
)

# Process data
processed_df, initial_report, final_report = pipeline.process(sample_data)

# The cleaning rules replace bad values with nulls, which still fail validation,
# so the success rates are not expected to rise after cleaning.
print("Initial Validation Report:")
print(initial_report.select(['rule_name', 'success_rate']))
print("\nFinal Validation Report (after cleaning):")
print(final_report.select(['rule_name', 'success_rate']))

Initial Validation Report:
shape: (3, 2)
┌─────────────────┬──────────────┐
│ rule_name       ┆ success_rate │
│ ---             ┆ ---          │
│ str             ┆ f64          │
╞═════════════════╪══════════════╡
│ positive_age    ┆ 99.7         │
│ valid_email     ┆ 99.7         │
│ positive_salary ┆ 99.7         │
└─────────────────┴──────────────┘

Final Validation Report (after cleaning):
shape: (3, 2)
┌─────────────────┬──────────────┐
│ rule_name       ┆ success_rate │
│ ---             ┆ ---          │
│ str             ┆ f64          │
╞═════════════════╪══════════════╡
│ positive_age    ┆ 99.7         │
│ valid_email     ┆ 99.7         │
│ positive_salary ┆ 99.7         │
└─────────────────┴──────────────┘


In [None]:
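# Example of using Great Expectations with Polars data
# Note: This is an untested sketch. It requires great_expectations, and the
# `PolarsDatasource` import and `add_polars` call below assume that a Polars
# integration exists in the installed version. Verify this against the Great
# Expectations documentation; if it is not available, convert the frame with
# `sample_data.to_pandas()` and use the pandas datasource instead.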

# import great_expectations as gx
# from great_expectations.datasource.fluent import PolarsDatasource

# # Create Great Expectations context
# context = gx.get_context()

# # Add Polars datasource
# datasource = context.sources.add_polars("polars_datasource")

# # Add data asset
# data_asset = datasource.add_dataframe_asset(
#     name="user_data",
#     dataframe=sample_data
# )

# # Create expectations
# batch_request = data_asset.build_batch_request()
# validator = context.get_validator(batch_request=batch_request)

# # Add expectations
# validator.expect_column_values_to_not_be_null("user_id")
# validator.expect_column_values_to_be_unique("user_id")
# validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
# validator.expect_column_values_to_match_regex("email", r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

# # Run validation
# results = validator.validate()
# print("Great Expectations Results:")
# print(results)