In [0]:
%pip install databricks-labs-dqx==0.12.0

In [0]:
%restart_python

In [0]:
import yaml
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQRowRule

In [0]:
# Create a sample DataFrame.
# The `id` column has to hold an empty string, so every id is written as a
# string and the schema is declared explicitly. Mixing str and int values in
# one column would leave the result to PySpark's schema inference, whose
# handling of mixed types differs between PySpark versions.
data = [
    ("", "abc@example.com"),     # empty id
    ("2", None),                 # missing email_address
    ("3", "kafka@example.com"),  # both fields populated
]

columns = ["id", "email_address"]
# DDL schema string: "id string, email_address string"
schema = ", ".join(f"{column} string" for column in columns)
input_df = spark.createDataFrame(data, schema)

print("Original DataFrame:")
display(input_df)

In [0]:
# YAML text declaring two checks, both using the `is_not_null_and_not_empty`
# function: one with criticality `error` on column `id`, and one with
# criticality `warn` on column `email_address`. The text is kept as a plain
# string here; it is not parsed in this cell.
checks_yaml_str = """
# checks.yml file
# Error: rows with null or empty `id` are quarantined under `_errors`.
# Warn: rows with null or empty `email_address` are quarantined under `_warnings`.

- criticality: error
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: id

- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: email_address

"""

In [0]:
# Engine instance bound to the current workspace client
dq_engine = DQEngine(WorkspaceClient())

# Parse the YAML check definitions into plain Python objects
checks = yaml.safe_load(checks_yaml_str)

# Run the metadata-defined checks; the engine hands back two DataFrames
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# Show each result under its heading, valid rows first
for heading, frame in (
    ("\nValid Rows (passed all checks):", valid_df),
    ("\nQuarantined Rows (failed one or more checks):", quarantine_df),
):
    print(heading)
    display(frame)

In [0]:
# Same two checks as code instead of YAML: `id` (error) and `email_address`
# (warn) must each be neither null nor empty.
checks = [
    DQRowRule(
        criticality=level,
        check_func=check_funcs.is_not_null_and_not_empty,
        column=column_name,
    )
    for level, column_name in (("error", "id"), ("warn", "email_address"))
]

# Run the rule objects; the engine hands back two DataFrames
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)

# Show each result under its heading, valid rows first
for heading, frame in (
    ("\nValid Rows (passed all checks):", valid_df),
    ("\nQuarantined Rows (failed one or more checks):", quarantine_df),
):
    print(heading)
    display(frame)