In [0]:
%sql

-- Bootstrap the catalog and schema that hold the rule metadata.
create catalog if not exists unified;
-- Qualify the schema with its catalog: creating a catalog does not make it the
-- current one, so an unqualified name would be created in whichever catalog
-- the session currently has selected.
create schema if not exists unified.system;

In [0]:
%sql
-- Make unified.system the session default so that later two-part names such as
-- system.rule are looked up inside the unified catalog.
-- The catalog is selected first so the schema is resolved within it.
use catalog unified;
use schema system;

In [0]:
%sql
-- Rebuild the rule registry from scratch: the table is dropped and recreated,
-- so any previously stored rules are discarded on every run of this cell.
drop table if exists system.rule;
create table system.rule(
    `id`            bigint generated always as identity,
    `dataset`       string    not null comment 'Name of the dataset to apply the rule',
    `column`        string    not null comment 'Name of the column to apply the rule',
    `data_type`     string    not null comment 'Data type of the column to apply the rule',
    `description`   string    not null comment 'A human readable description of rule and why it''s being applied',
    `class`         string    not null comment '''validation'' that evaluates to true or false or ''cleanse'' transforms a value to another value',
    `type`          string    not null comment 'Expression, replacement, reference set',
    `function`      string    not null comment 'The function to apply to the column',
    `parameters`    string             comment 'Arguments to pass into the function, supports using {column} and {replacement} variables',
    `replacement`   string             comment 'A replacement value',
    `cast_as`       string    not null comment 'Cast the cleanse transform to a specific type',
    -- Stored as a number so rules sort numerically (2 before 10); as text they
    -- would sort lexicographically ('10' before '2').
    `order`         int       not null comment 'The order in which to apply the rule for the table and column',
    `created_by`    string    not null comment 'The user who created the rule',
    `created_at`    timestamp not null comment 'The time the rule was created'
);

-- Seed the cleanse rules for the 'test' dataset.
-- * The regexp pattern is written with four backslashes so that it is stored
--   with two: the parameters text is parsed again as a SQL expression when the
--   rule is applied, which turns the stored '\\s+' into the regex \s+.
-- * A `column` of '*' is treated as a wildcard by the notebook's get_rules
--   helper and matches every column of the given data type.
insert into system.rule (
    `dataset`,
    `column`,
    `data_type`,
    `description`,
    `class`,
    `type`,
    `function`,
    `parameters`,
    `replacement`,
    `cast_as`,
    `order`,
    `created_by`,
    `created_at`
)
values
('test', 'value', 'string', 'replace all whitespace', 'cleanse', 'function', 'regexp_replace', '{column}, \'\\\\s+\', \'{replacement}\'', ' ', 'string', 0, current_user(), now()),
('test', 'value', 'string', 'trim whitespace', 'cleanse', 'function', 'trim', '{column}', null, 'string', 1, current_user(), now()),
('test', '*', 'string', 'trim whitespace', 'cleanse', 'function', 'trim', '{column}', null, 'string', 2, current_user(), now());

In [0]:
%sql
-- Fixture used to exercise the cleanse rules registered for the 'test' dataset.
create schema if not exists `test`;

-- Drop first so that re-running the cell does not accumulate fixture rows.
drop table if exists `test`.`rule`;
create table if not exists `test`.`rule` (
    `id`             bigint generated always as identity,
    `value`          string,  -- raw input handed to the cleanse rules
    `expected_value` string,  -- what `value` should look like after cleansing
    `expected_type`  string   -- expected type name; not checked by the visible notebook code
);

-- One row with leading, repeated and embedded-newline whitespace.
insert into `test`.`rule` (
    `value`,
    `expected_value`,
    `expected_type`
)
values (
    '           stuff  stuff    stuff \n stuff',
    'stuff stuff stuff stuff',
    'string'
)

In [0]:
from pyspark.sql import functions as fn
from pyspark.sql import DataFrame

def cleanse_rule(rule:dict):
  """Turn a raw rule record into the compact form used when applying rules.

  The rule's ``parameters`` text is treated as a template: every ``{column}``
  and ``{replacement}`` token is replaced by the rule's own ``column`` and
  ``replacement`` values (a missing or ``None`` value becomes an empty
  string). The result is wrapped in parentheses and appended to the rule's
  ``function`` name, e.g. ``trim`` + ``{column}`` -> ``trim(value)``.
  When there are no parameters the call is rendered as ``function()``.

  Args:
    rule: mapping with at least ``column``, ``order``, ``data_type``,
      ``function``, ``cast_as`` and ``description``; ``parameters`` and
      ``replacement`` are optional.

  Returns:
    A dict with the keys ``column``, ``order``, ``data_type``, ``function``
    (the rendered call text), ``cast_as`` and ``description``.

  Raises:
    KeyError: if one of the required keys is absent from ``rule``.
  """
  template = rule.get("parameters")

  if template:
    for name in ("column", "replacement"):
      value = rule.get(name)
      # None (or an absent key) is rendered as an empty string.
      template = template.replace("{" + name + "}", "" if value is None else value)
    call_args = f"({template})"
  else:
    # No arguments configured: render an empty call.
    call_args = "()"

  return {
    "column": rule["column"],
    "order": rule["order"],
    "data_type": rule["data_type"],
    "function": f"{rule['function']}{call_args}",
    "cast_as": rule["cast_as"],
    "description": rule["description"]
  }


def get_rules(column:str, data_type:str, rules:list):
  """Select the rules that apply to one column, in the order they should run.

  A rule applies when its ``column`` equals ``column`` or is the ``"*"``
  wildcard, and its ``data_type`` equals ``data_type`` or is ``"*"``.

  Args:
    column: name of the column being processed.
    data_type: type name of that column.
    rules: list of rule dicts carrying ``column``, ``data_type`` and ``order``.

  Returns:
    A new list of the matching rules sorted by ``order``. Numeric orders
    (numbers or numeric text such as ``"10"``) are compared by value; any
    other order value sorts after them, compared as text. Rules with equal
    order keep their relative input order.
  """

  def _order_key(rule):
    # `order` can arrive as text (for example from a string column); comparing
    # the raw text would place "10" before "2", so compare numerically.
    order = rule['order']
    try:
      return (0, float(order), "")
    except (TypeError, ValueError):
      # Non-numeric orders still sort deterministically instead of raising.
      return (1, 0.0, str(order))

  matching = [
    r for r in rules
    if r['column'] in [column, "*"] and r['data_type'] in [data_type, "*"]
  ]
  return sorted(matching, key=_order_key)


def apply_cleanse(
  df:DataFrame, 
  dataset:str,
  enable_pre_cleansed:bool=False
):
  """Apply the 'cleanse' rules registered for a dataset to every column of df.

  Rules are read from ``system.rule`` (rows whose ``dataset`` matches and whose
  ``class`` is ``'cleanse'``), converted with ``cleanse_rule`` and, for each
  column of ``df``, filtered and ordered with ``get_rules``. Each matching rule
  overwrites the column with ``cast(<function> as <cast_as>)``. A progress
  line is printed for every rule applied.

  Args:
    df: DataFrame to cleanse.
    dataset: dataset name used to look up rules.
    enable_pre_cleansed: when True, the original value of every column is
      first copied into ``_pre_cleansed_<column>``.

  Returns:
    The DataFrame with the cleansed (and optionally the copied) columns.

  Note:
    Relies on a module-level ``spark`` session (presumably the one the
    notebook runtime provides) and on keyword-argument formatting in
    ``spark.sql``.
  """

  # Only the columns cleanse_rule reads are selected, so schema changes to the
  # rule table cannot silently alter what is collected here.
  df_rules = spark.sql("""
    select
      `column`,
      `data_type`,
      `description`,
      `function`,
      `parameters`,
      `replacement`,
      `cast_as`,
      `order`
    from system.rule
    where dataset = {dataset}
    and class = 'cleanse'
    order by `column`, `order`
  """, dataset=dataset).collect()
  
  rules = [cleanse_rule(rule.asDict()) for rule in df_rules]

  # df.columns is evaluated once, so the _pre_cleansed_ columns added below are
  # not themselves iterated.
  for column in df.columns:

    data_type = df.schema[column].dataType.typeName()
    column_rules = get_rules(column, data_type, rules)

    if enable_pre_cleansed:
      df = df.withColumn(f"_pre_cleansed_{column}", fn.col(column))

    for rule in column_rules:
      function = rule['function']
      if rule['column'] == "*":
        # Wildcard rules were rendered with "*" standing in for the column
        # name. Column-specific rules already contain the real name, so any
        # asterisk in them (regex quantifier, multiplication) must be left
        # untouched.
        function = function.replace("*", column)
      function = f"cast({function} as {rule['cast_as']})"
      print(f"Applying rule {rule['order']}:{rule['description']} using function:{rule['function']} to dataset column {dataset}.{column} casting as {rule['cast_as']}")
      df = df.withColumn(column, fn.expr(function))

  return df





In [0]:


# Load the fixture rows; columns are listed explicitly so the check below does
# not depend on whatever else the table may contain.
df = spark.sql("""
  select
    `id`,
    `value`,
    `expected_value`,
    `expected_type`
  from test.rule
""")

# Run the 'test' dataset's cleanse rules, keeping a copy of each original value.
df = apply_cleanse(
  df, 
  dataset='test', 
  enable_pre_cleansed=True
) 

# Null-safe equality: a plain comparison yields NULL when either side is NULL,
# which would leave the outcome of the check undefined instead of true/false.
df = (df
  .withColumn("succeeded", fn.expr("value <=> expected_value"))
)

df.display()
