# Data Understanding - DQ Rules as SQL

This is an exploration for writing (and visualizing) data quality rules. Think of the problem
when you need to communicate data issues between two teams (companies). Data issues may range
from simple things like "collumn missing" or "value is not supposed to be null" to something
more advanced like "codes should be joinable with industry catalog foo after 1/1/2020".

There is not much explanation here. But with a bit of luck it will reappear soon(-ish) as usable
open source project (with spark or clickhouse in python or scala).


In [1]:
import contextlib, csv, importlib, json, os, pwd, re, subprocess, tempfile, textwrap
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Generator, Iterator, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from dataclasses_json import dataclass_json

import pyspark
from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T


In [2]:
spark_builder = pyspark.sql.SparkSession.builder.appName('pysparktest')
# spark_builder = spark_builder.config('spark.jars', '/some/path/my.jar')
# spark_builder = spark_builder.config('some.key', 'value') 
spark = spark_builder.getOrCreate()


In [3]:
data_dir = 'data'
os.makedirs(data_dir, exist_ok=True) 
# !mkdir -p '{data_dir}'
tmp_dir = 'tmp'
os.makedirs(tmp_dir, exist_ok=True) 


## Test Data

In [4]:
quality_codes = [{'quality': i, 'category': ['bad', 'average', 'good'][max(0, (i-1)//3)]} for i in range(10)]
products = [{'id': i, 'name': f'product_{i}'} for i in range(10)]
orders = [{
    'id': i,
    'customer_id': f'customer_{i % 3}',
    'product_id': f'product_{abs(hash(str(i))) % 5}',
    'quality': abs(hash(str(i))) % 10
} for i in range(10)]

orders[1]['id'] = None
orders[2]['id'] = -42
orders[3]['id'] = 42
orders[4]['id'] = 42
orders[5]['customer_id'] = ''
orders[6]['product_id'] = None
orders[7]['product_id'] = 'product_non_existing'
orders[8]['quality'] = None
orders[9]['quality'] = 42
orders[9]['product_id'] = 'malformed_product_id'

def create_and_register_data_frame(data, table_name=None):
    rows = [ Row(**v) for v in data ]
    df = spark.createDataFrame(rows)
    if table_name:
        df.createOrReplaceTempView(table_name)
#         csv_file_name = f'{data_dir}/{table_name}.csv'
#         df.to_csv(csv_file_name, header=True, index=False, index_label='index')
#         df = spark.read.format('csv').option('header', 'true').load(csv_file_name)
    return df

quality_code_dfs = create_and_register_data_frame(quality_codes, 'quality_code')
product_dfs = create_and_register_data_frame(products, 'product')
order_dfs = create_and_register_data_frame(orders, 'order')
order_dfs.toPandas()


Unnamed: 0,id,customer_id,product_id,quality
0,0.0,customer_0,product_1,6.0
1,,customer_1,product_1,1.0
2,-42.0,customer_2,product_1,1.0
3,42.0,customer_0,product_0,5.0
4,42.0,customer_1,product_4,4.0
5,5.0,,product_1,1.0
6,6.0,customer_0,,7.0
7,7.0,customer_1,product_non_existing,4.0
8,8.0,customer_2,product_0,
9,9.0,customer_0,malformed_product_id,42.0


## Metadata

We assume they come from outside. We can get them from file system stats (if the input is a file).

In [5]:
metadata = {
    'filename': 'orders_20200202_200002.csv',
    'size': '123456789',
    'upload_date': '2020-02-02T20:20:20Z',
}
# metadata_df = pd.DataFrame(metadata.items(), columns=['key', 'value'])
# metadata_df
metadata


{'filename': 'orders_20200202_200002.csv',
 'size': '123456789',
 'upload_date': '2020-02-02T20:20:20Z'}

## DQ Rules in SQL

In [6]:
dq_sql = '''
WITH wide_order AS (
  SELECT * FROM order
)
SELECT
  table.*
  ,(id IS NOT NULL) AND (id >= 0) AS id_valid
  ,(COUNT(*) OVER (PARTITION BY id)) = 1 AS id_unique
  ,(customer_id IS NOT NULL) AND (LENGTH(customer_id) > 0) AS customer_id_valid
  ,product_id IS NOT NULL AS product_id_not_null
  ,product_id RLIKE 'product_\\\\d+' AS product_id_valid
  ,regexp_extract(customer_id, 'customer_(\\\\d+)') <= regexp_extract(product_id, 'product_(\\\\d+)')
      AS customer_id_less_or_equal_product_id
  ,(table.quality IS NOT NULL) AND (table.quality >= 0) AS quality_valid
--  ,EXISTS (SELECT * FROM quality_code AS qc WHERE table.quality = qc.quality) AS quality_in_quality_code_exists
  ,(SELECT FIRST(qcss.quality) FROM quality_code AS qcss WHERE table.quality = qcss.quality) IS NOT NULL
      AS quality_in_quality_code_sub_select
  ,qc.quality IS NOT NULL AS quality_in_quality_code_join
FROM wide_order AS table
LEFT JOIN quality_code AS qc ON table.quality = qc.quality
ORDER BY id
'''

dfs = spark.sql(dq_sql)
dfs.toPandas()


Unnamed: 0,id,customer_id,product_id,quality,id_valid,id_unique,customer_id_valid,product_id_not_null,product_id_valid,customer_id_less_or_equal_product_id,quality_valid,quality_in_quality_code_sub_select,quality_in_quality_code_join
0,,customer_1,product_1,1.0,False,True,True,True,True,True,True,True,True
1,-42.0,customer_2,product_1,1.0,False,True,True,True,True,False,True,True,True
2,0.0,customer_0,product_1,6.0,True,True,True,True,True,True,True,True,True
3,5.0,,product_1,1.0,True,True,False,True,True,True,True,True,True
4,6.0,customer_0,,7.0,True,True,True,False,,,True,True,True
5,7.0,customer_1,product_non_existing,4.0,True,True,True,True,False,False,True,True,True
6,8.0,customer_2,product_0,,True,True,True,True,True,False,False,False,False
7,9.0,customer_0,malformed_product_id,42.0,True,True,True,True,False,False,True,False,False
8,42.0,customer_0,product_0,5.0,True,False,True,True,True,True,True,True,True
9,42.0,customer_1,product_4,4.0,True,False,True,True,True,True,True,True,True


## Parsed SQL

This is generated by scala code using spark libraries if the dq rules are authored as the sql statement above.

In [7]:
@dataclass_json
@dataclass
class DqColumn:
    name: str
    is_key: bool

@dataclass_json
@dataclass
class DqRule:
    name: str
    expression: str
    referenced_columns: List[str]
    description: Optional[str] = None

columns = [
    DqColumn('id', True),
    DqColumn('customer_id', False),
    DqColumn('product_id', False),
    DqColumn('quality', False),
]
dq_rules = [
    DqRule('id_valid', '(id IS NOT NULL) AND (id >= 0)', ['id']),
    DqRule('id_unique', '(COUNT(*) OVER (PARTITION BY id)) = 1', ['id']),
    DqRule('customer_id_valid', '(customer_id IS NOT NULL) AND (LENGTH(customer_id) > 0)', ['customer_id']),
    DqRule('product_id_not_null', 'product_id IS NOT NULL', ['product_id']),
    DqRule('product_id_valid', 'product_id RLIKE \'product_\\\\d+\'', ['product_id']),
    DqRule(
        'customer_id_less_or_equal_product_id',
        'regexp_extract(customer_id, \'customer_(\\\\d+)\') <= regexp_extract(product_id, \'product_(\\\\d+)\')',
        ['customer_id', 'product_id']
    ),
    DqRule('quality_valid', '(quality IS NOT NULL) AND (quality >= 0)', ['quality'],),
    DqRule(
        'quality_in_quality_code_sub_select',
        '(SELECT FIRST(qcss.quality) FROM quality_code AS qcss WHERE table.quality = qcss.quality) IS NOT NULL',
        ['quality'],
        description = 'Quality code must be one of industry agreed codes in XYZ spec.'
    ),
]

column_names = [column.name for column in columns]
key_column_names = [column.name for column in columns if column.is_key]
rule_names = [rule.name for rule in dq_rules]
rule_names_by_column_name = defaultdict(list)
for rule in dq_rules:
    for column in rule.referenced_columns:
        rule_names_by_column_name[column].append(rule.name)


In [8]:
def _named_struct_str_expr(**kwargs):
    named_fields = [f"'{k}', {v}" for k, v in kwargs.items()]
    return f"NAMED_STRUCT({', '.join(named_fields)})"
# print(_named_struct_str_expr(rows='COUNT(1)'))


## Statistics

In [9]:
counts_dfs = dfs.selectExpr(
    _named_struct_str_expr(rows='COUNT(1)', rows_valid=f'COUNT_IF({" AND ".join(rule_names)})') + ' AS table_stats',
    _named_struct_str_expr(**{cn: f'COUNT_IF(NOT({" AND ".join(rns)}))' for cn, rns in rule_names_by_column_name.items()}) + ' AS column_errors',
    _named_struct_str_expr(**{rn: f'COUNT_IF(NOT({rn}))' for rn in rule_names}) + ' AS rule_violations',
    _named_struct_str_expr(**{rn: f'COUNT(1)' for rn in rule_names}) + ' AS rule_rows',
)
counts_dfs.toPandas()


Unnamed: 0,table_stats,column_errors,rule_violations,rule_rows
0,"(10, 1)","(4, 5, 5, 2)","(2, 2, 1, 1, 2, 4, 1, 2)","(10, 10, 10, 10, 10, 10, 10, 10)"


In [10]:
counts = counts_dfs.collect()[0].asDict()
table_stats = counts['table_stats'].asDict()
column_errors = counts['column_errors'].asDict()
rule_violations = counts['rule_violations'].asDict()
rule_rows = counts['rule_rows'].asDict()

stack_expr = ', '.join([f'"{cn}", column_errors.{cn}' for cn in column_names])
column_stats_dfs = counts_dfs.selectExpr(f'STACK({len(column_names)}, {stack_expr}) AS (column_name, errors)')
column_stats_df = column_stats_dfs.toPandas()
column_stats_df

stack_expr = ', '.join([f'"{rn}", rule_violations.{rn}, rule_rows.{rn}' for rn in rule_names])
rule_stats_dfs = counts_dfs.selectExpr(f'STACK({len(dq_rules)}, {stack_expr}) AS (rule_name, violations, rows)')
rule_stats_df = rule_stats_dfs.toPandas()
rule_stats_df


Unnamed: 0,rule_name,violations,rows
0,id_valid,2,10
1,id_unique,2,10
2,customer_id_valid,1,10
3,product_id_not_null,1,10
4,product_id_valid,2,10
5,customer_id_less_or_equal_product_id,4,10
6,quality_valid,1,10
7,quality_in_quality_code_sub_select,2,10


## Sampling

In [11]:
hashes_exprs = [f'IF({r.name}, -1, ABS(XXHASH64({", ".join(r.referenced_columns)}, 42))) AS {r.name}' for r in dq_rules]
# key_hash_expr = f'XXHASH64({", ".join(key_column_names)}, 42) AS key_hash'
hashes_dfs = dfs.selectExpr(*key_column_names, *hashes_exprs)
hashes_dfs.toPandas()


Unnamed: 0,id,id_valid,id_unique,customer_id_valid,product_id_not_null,product_id_valid,customer_id_less_or_equal_product_id,quality_valid,quality_in_quality_code_sub_select
0,,387659249110444264,-1,-1,-1,-1,-1,-1,-1
1,-42.0,302398896908130140,-1,-1,-1,-1,4079665484095093443,-1,-1
2,0.0,-1,-1,-1,-1,-1,-1,-1,-1
3,5.0,-1,-1,3524361793169708440,-1,-1,-1,-1,-1
4,6.0,-1,-1,-1,387659249110444264,387659249110444264,5071635315020205719,-1,-1
5,7.0,-1,-1,-1,-1,61245963751297218,580177245197876706,-1,-1
6,8.0,-1,-1,-1,-1,-1,3747556350684387284,387659249110444264,387659249110444264
7,9.0,-1,-1,-1,-1,769905491925726899,4955671045544078839,-1,7302487111805052767
8,42.0,-1,7302487111805052767,-1,-1,-1,-1,-1,-1
9,42.0,-1,7302487111805052767,-1,-1,-1,-1,-1,-1


In [12]:
# TODO: Should we use STACK or UNNION? Shoul we do row numbers? The custom aggregte function the only reasonable option.

max_rows_per_rule = 1

# samples_expr = f'{", ".join(key_column_names)}, {{rn}} AS hash, "{{rn}}" AS rule_name'
# samples_query_single = f'(SELECT {samples_expr} FROM hashes WHERE {{rn}} >= 0 ORDER BY hash ASC LIMIT {max_rows_per_rule})'
# samples_query = '\nUNION\n'.join([samples_query_single.format(rn=rn) for rn in rule_names])
# # print(samples_query)
# hashes_dfs.createOrReplaceTempView('hashes')
# samples_dfs = spark.sql(samples_query)
# spark.catalog.dropTempView('hashes')
# samples_dfs.orderBy(key_column_names).toPandas()

stack_expr = ', '.join([f'"{rn}", {rn}' for rn in rule_names])
samples_dfs = hashes_dfs.selectExpr(*key_column_names, f'STACK({len(rule_names)}, {stack_expr}) AS (rule_name, hash)')
samples_dfs = samples_dfs.where('hash >= 0')#.orderBy('hash')
samples_rn_expr = 'ROW_NUMBER() OVER (PARTITION BY rule_name ORDER BY hash ASC) AS rn'
samples_dfs = samples_dfs.selectExpr(*key_column_names, 'hash', 'rule_name', samples_rn_expr)
samples_dfs = samples_dfs.where(f'rn <= {max_rows_per_rule}').drop('rn')
samples_dfs.orderBy(key_column_names).toPandas()


Unnamed: 0,id,hash,rule_name
0,-42,302398896908130140,id_valid
1,5,3524361793169708440,customer_id_valid
2,6,387659249110444264,product_id_not_null
3,7,61245963751297218,product_id_valid
4,7,580177245197876706,customer_id_less_or_equal_product_id
5,8,387659249110444264,quality_in_quality_code_sub_select
6,8,387659249110444264,quality_valid
7,42,7302487111805052767,id_unique


In [13]:
# TODO: This will wrong in number of ways if there is collision on key. The cutom aggregate function is necessary.

sampled_keys_dfs = samples_dfs.select(key_column_names).distinct()
sampled_dfs = dfs.join(F.broadcast(sampled_keys_dfs), key_column_names).orderBy(key_column_names)
sampled_dfs.toPandas()


Unnamed: 0,id,customer_id,product_id,quality,id_valid,id_unique,customer_id_valid,product_id_not_null,product_id_valid,customer_id_less_or_equal_product_id,quality_valid,quality_in_quality_code_sub_select,quality_in_quality_code_join
0,-42,customer_2,product_1,1.0,False,True,True,True,True,False,True,True,True
1,5,,product_1,1.0,True,True,False,True,True,True,True,True,True
2,6,customer_0,,7.0,True,True,True,False,,,True,True,True
3,7,customer_1,product_non_existing,4.0,True,True,True,True,False,False,True,True,True
4,8,customer_2,product_0,,True,True,True,True,True,False,False,False,False
5,42,customer_0,product_0,5.0,True,False,True,True,True,True,True,True,True
6,42,customer_1,product_4,4.0,True,False,True,True,True,True,True,True,True


## Rendering

In [14]:
def get_row_sample_styler(df):
    violations_df = pd.DataFrame({cn: df[rns].all(axis=1) for cn, rns in rule_names_by_column_name.items()})
    violations_style_df = violations_df.replace({True: None, False: 'background-color: #ffa39e'})
    styler = df[column_names].style
#     styler.set_properties(subset=key_column_names, **{'background-color': '#fffb8f'})
    styler.apply(lambda data: violations_style_df, axis=None)
#     styler.set_precision(0)
#     styler.format({'id': '{:.0f}', 'quality': '{:.0f}'})
#     styler.hide_index()
    styler.set_na_rep('NULL')
    return styler

get_row_sample_styler(dfs.toPandas())
# get_row_sample_styler(sampled_dfs.toPandas())


Unnamed: 0,id,customer_id,product_id,quality
0,,customer_1,product_1,1.0
1,-42.0,customer_2,product_1,1.0
2,0.0,customer_0,product_1,6.0
3,5.0,,product_1,1.0
4,6.0,customer_0,,7.0
5,7.0,customer_1,product_non_existing,4.0
6,8.0,customer_2,product_0,
7,9.0,customer_0,malformed_product_id,42.0
8,42.0,customer_0,product_0,5.0
9,42.0,customer_1,product_4,4.0


In [15]:
from IPython.core.display import display, display_html, HTML

# HTML(f'''
# <table>
#     <tr><th>Rule Stats</th><th>Rows</th></tr>
#     <tr><td>{rule_stats_df._repr_html_()}</td><td>{pretty_print_dq_sample(sampled_dfs)._repr_html_()}</td></tr>
# </table>
# ''')

def get_rule_description(rule_name):
    rules_by_name = {rule.name: rule for rule in dq_rules}
    rule = rules_by_name[rule_name]
    return '  <br/>  '.join([s for s in [rule.description, rule.expression] if s is not None])

render_rules_df = rule_stats_df.copy()
render_rules_df['description'] = render_rules_df['rule_name'].map(get_rule_description)
render_rules_df = render_rules_df.set_index('rule_name').rename_axis(None)
render_rules_df = render_rules_df.style
render_rules_df.set_properties(subset=['description'], **{'max-width': '32em'})
render_rules_df.set_properties(**{'text-align': 'left'})
render_rules_df.set_table_styles([{'selector': 'th', 'props': [('text-align', 'left')]}])

render_metadata_df = pd.DataFrame({**metadata, **table_stats}.items(), columns=['key', 'value'])
render_metadata_df = render_metadata_df.set_index('key').rename_axis(None)

render_row_sample_df = sampled_dfs.toPandas()
# render_row_sample_df['product_id'] = render_row_sample_df['product_id'] + ' ~=[,,_,,]:3' * 20
# render_row_sample_df = render_row_sample_df.rename(columns=lambda c: f'{c} ({column_stats.get(c, 0)})')
# render_row_sample_df = render_row_sample_df.rename(columns={'id': 'qqqq'})
# render_row_sample_df = render_row_sample_df[['id']]
render_row_sample_df = get_row_sample_styler(render_row_sample_df)
render_row_sample_df.set_properties(**{'white-space': 'nowrap'})

HTML(f'''
<table>
    <tr><th>Metadata</th><th>Rule Stats</th></tr>
    <tr>
        <td>
            <div style="height: 16em; overflow: auto;">
                {render_metadata_df._repr_html_()}</td>
            </div>
        <td>
            <div style="height: 16em; overflow-y: scroll;">
                {render_rules_df._repr_html_()}
            </div>
        </td>
    </tr>
    <tr><th colspan="2">Row Sample</th></tr>
    <tr>
        <td colspan="2">
            <div style="max-height: 32em; max-width: 100em; overflow: auto;">
                {render_row_sample_df._repr_html_()}
            </div>
        </td>
    </tr>
</table>
''')


Unnamed: 0_level_0,value,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0
Unnamed: 0_level_1,violations,rows,description,Unnamed: 4_level_1
Unnamed: 0_level_2,id,customer_id,product_id,quality
filename,orders_20200202_200002.csv,,,
size,123456789,,,
upload_date,2020-02-02T20:20:20Z,,,
rows,10,,,
rows_valid,1,,,
id_valid,2,10,(id IS NOT NULL) AND (id >= 0),
id_unique,2,10,(COUNT(*) OVER (PARTITION BY id)) = 1,
customer_id_valid,1,10,(customer_id IS NOT NULL) AND (LENGTH(customer_id) > 0),
product_id_not_null,1,10,product_id IS NOT NULL,
product_id_valid,2,10,product_id RLIKE 'product_\\d+',

Unnamed: 0,value
filename,orders_20200202_200002.csv
size,123456789
upload_date,2020-02-02T20:20:20Z
rows,10
rows_valid,1

Unnamed: 0,violations,rows,description
id_valid,2,10,(id IS NOT NULL) AND (id >= 0)
id_unique,2,10,(COUNT(*) OVER (PARTITION BY id)) = 1
customer_id_valid,1,10,(customer_id IS NOT NULL) AND (LENGTH(customer_id) > 0)
product_id_not_null,1,10,product_id IS NOT NULL
product_id_valid,2,10,product_id RLIKE 'product_\\d+'
customer_id_less_or_equal_product_id,4,10,"regexp_extract(customer_id, 'customer_(\\d+)') <= regexp_extract(product_id, 'product_(\\d+)')"
quality_valid,1,10,(quality IS NOT NULL) AND (quality >= 0)
quality_in_quality_code_sub_select,2,10,Quality code must be one of industry agreed codes in XYZ spec. (SELECT FIRST(qcss.quality) FROM quality_code AS qcss WHERE table.quality = qcss.quality) IS NOT NULL

Unnamed: 0,id,customer_id,product_id,quality
0,-42,customer_2,product_1,1.0
1,5,,product_1,1.0
2,6,customer_0,,7.0
3,7,customer_1,product_non_existing,4.0
4,8,customer_2,product_0,
5,42,customer_0,product_0,5.0
6,42,customer_1,product_4,4.0


## Joins - equi-join

In [16]:
def generate_equi_join_rate_query(
        left_table_name: str, right_table_name: str, left_key: List[str], right_key: List[str]) -> str:
    join_rate_query = f"""
        WITH left AS (
          (SELECT {', '.join(left_key)}, COUNT(1) AS cnt FROM {left_table_name} GROUP BY {', '.join(left_key)})
        ), right AS (
          (SELECT {', '.join(right_key)}, COUNT(1) AS cnt FROM {right_table_name} GROUP BY {', '.join(right_key)})
        )
        SELECT
          {', '.join([f'COALESCE(left.{lkey}, right.{rkey}) AS {lkey}' for lkey, rkey in zip(left_key, right_key)])},
          left.cnt AS left_cnt, right.cnt AS right_cnt
        FROM left FULL JOIN right
        ON {' AND '.join([f'left.{lkey} <=> right.{rkey}' for lkey, rkey in zip(left_key, right_key)])}
        ORDER BY {', '.join(left_key)}
    """
    return textwrap.dedent(join_rate_query)

join_rate_query = generate_equi_join_rate_query('order', 'product', ['product_id'], ['name'])
# print(join_rate_query)
join_rate_dfs = spark.sql(join_rate_query)
join_rate_dfs.toPandas()


Unnamed: 0,product_id,left_cnt,right_cnt
0,,1.0,
1,malformed_product_id,1.0,
2,product_0,2.0,1.0
3,product_1,4.0,1.0
4,product_2,,1.0
5,product_3,,1.0
6,product_4,1.0,1.0
7,product_5,,1.0
8,product_6,,1.0
9,product_7,,1.0


In [17]:
join_rate_dfs.selectExpr(
    'SUM(left_cnt) AS left_rows',
    'SUM(right_cnt) AS right_rows',
    'COUNT(left_cnt > 0) AS left_distinct',
    'COUNT(right_cnt > 0) AS right_distinct',
    'SUM(left_cnt * right_cnt) AS inner',
    'SUM(left_cnt * GREATEST(1, right_cnt)) AS left',
    'SUM(GREATEST(1, left_cnt) * right_cnt) AS right',
    'SUM(GREATEST(1, left_cnt) * GREATEST(1, right_cnt)) AS full',
    'SUM(left_cnt) * SUM(right_cnt) AS cross',
).toPandas()#.transpose().rename(columns={0: 'count'})


Unnamed: 0,left_rows,right_rows,left_distinct,right_distinct,inner,left,right,full,cross
0,10,10,6,10,7,10,14,17,100


In [18]:
join_rate_dfs.selectExpr(
    '*',
    'left_cnt * right_cnt AS inner',
    'left_cnt * GREATEST(1, right_cnt) AS left',
    'GREATEST(1, left_cnt) * right_cnt AS right',
    'GREATEST(1, left_cnt) * GREATEST(1, right_cnt) AS full',
).toPandas()


Unnamed: 0,product_id,left_cnt,right_cnt,inner,left,right,full
0,,1.0,,,1.0,,1
1,malformed_product_id,1.0,,,1.0,,1
2,product_0,2.0,1.0,2.0,2.0,2.0,2
3,product_1,4.0,1.0,4.0,4.0,4.0,4
4,product_2,,1.0,,,1.0,1
5,product_3,,1.0,,,1.0,1
6,product_4,1.0,1.0,1.0,1.0,1.0,1
7,product_5,,1.0,,,1.0,1
8,product_6,,1.0,,,1.0,1
9,product_7,,1.0,,,1.0,1


In [19]:
join_rate_dfs.selectExpr(
    '*',
    'left_cnt IS NOT NULL AND left_cnt > 0 AS left_not_missing',
    'right_cnt IS NOT NULL AND right_cnt > 0 AS right_not_missing',
    'left_cnt IS NULL OR left_cnt < 2 AS left_unique',
    'right_cnt IS NULL OR right_cnt < 2 AS right_unique',
    # TODO: these are ugly, make them work for composite key, report them per key part?
    'NOT (product_id IS NULL AND left_cnt IS NOT NULL AND left_cnt > 0) AS left_not_complete',
    'NOT (product_id IS NULL AND right_cnt IS NOT NULL AND right_cnt > 0) AS right_not_complete',
).toPandas()


Unnamed: 0,product_id,left_cnt,right_cnt,left_not_missing,right_not_missing,left_unique,right_unique,left_not_complete,right_not_complete
0,,1.0,,True,False,True,True,False,True
1,malformed_product_id,1.0,,True,False,True,True,True,True
2,product_0,2.0,1.0,True,True,False,True,True,True
3,product_1,4.0,1.0,True,True,False,True,True,True
4,product_2,,1.0,False,True,True,True,True,True
5,product_3,,1.0,False,True,True,True,True,True
6,product_4,1.0,1.0,True,True,True,True,True,True
7,product_5,,1.0,False,True,True,True,True,True
8,product_6,,1.0,False,True,True,True,True,True
9,product_7,,1.0,False,True,True,True,True,True


## Joins - arbitrary join expressions

In [20]:
# TODO: Can we do better (without risking cartesian product)?
spark.sql(f"""
SELECT
  COUNT(1) AS total,
  COUNT_IF(ISNULL(product.name)) AS miss,
  COUNT(order.product_id) AS left_cnt,
  COUNT(product.name) AS right_cnt,
  COUNT(DISTINCT order.product_id) AS left_distinct_cnt,
  COUNT(DISTINCT product.name) AS right_distinct_cnt
FROM order LEFT JOIN product
--ON order.product_id = product.name
ON MOD(XXHASH64(order.product_id, product.name), 2) = 0
GROUP BY 'dummy'
""").toPandas()

# spark.sql(f"""
# SELECT
#   *
# FROM order LEFT JOIN product ON order.product_id = product.name
# ORDER BY order.id
# """).toPandas()


Unnamed: 0,total,miss,left_cnt,right_cnt,left_distinct_cnt,right_distinct_cnt
0,60,0,56,60,5,10


## Schema Checks Automation

In [21]:
# TODO: Completely redo this. The type check actually does not work this way.

# this can come from excel except they are not standardized or it can come from dataclass
# but both will fall short once one want to express anything usual like min max value, length, regex, ...
expected_schema = T.StructType([
    T.StructField('id', T.LongType(), False),
    T.StructField('customer_id', T.StringType(), False),
    T.StructField('product_id', T.StringType(), False),
    T.StructField('quality', T.FloatType(), True),
#     T.StructField('quality', T.BooleanType(), True),
#     T.StructField('meh', T.StringType(), True),
])

@dataclass
class NamedExpression(object):
    name: Union[str, Tuple[str, ...]]
    expression: str

    def name_str(self) -> str:
        if isinstance(self.name, str):
            return self.name
        if isinstance(self.name, tuple):
            return '__'.join(self.name)

    def named_expression_str(self) -> str:
        return f'{self.expression} AS {self.name_str()}'

def _generate_field_check_expression(field, existing_columns: List[str]) -> Generator[NamedExpression, None, None]:
    if field.name in existing_columns:
        expr = f'{field.name} IS NULL OR CAST({field.name} AS {field.dataType.typeName()}) IS NOT NULL'
        yield NamedExpression((field.name, 'has_valid_type'), expr)
        if not field.nullable:
            yield NamedExpression((field.name, 'is_not_null'), f'{field.name} IS NOT NULL')
    else:
        yield NamedExpression((field.name, 'is_present'), f'false')

def generate_schema_check_statement(table_name, schema):
    existing_columns = spark.table(table_name).columns
    expressions = [expr for field in schema for expr in _generate_field_check_expression(field, existing_columns)]
    nl = '\n'
    expressions_str = ',\n    '.join(['*', *[e.named_expression_str() for e in expressions]])
    return f"SELECT{nl}    {expressions_str}{nl}FROM {table_name}"

schema_check_query = generate_schema_check_statement('order', expected_schema)
# print(schema_check_query)
schema_check_dfs = spark.sql(schema_check_query)
schema_check_dfs.toPandas()


Unnamed: 0,id,customer_id,product_id,quality,id__has_valid_type,id__is_not_null,customer_id__has_valid_type,customer_id__is_not_null,product_id__has_valid_type,product_id__is_not_null,quality__has_valid_type
0,0.0,customer_0,product_1,6.0,True,True,True,True,True,True,True
1,,customer_1,product_1,1.0,True,False,True,True,True,True,True
2,-42.0,customer_2,product_1,1.0,True,True,True,True,True,True,True
3,42.0,customer_0,product_0,5.0,True,True,True,True,True,True,True
4,42.0,customer_1,product_4,4.0,True,True,True,True,True,True,True
5,5.0,,product_1,1.0,True,True,True,True,True,True,True
6,6.0,customer_0,,7.0,True,True,True,True,True,False,True
7,7.0,customer_1,product_non_existing,4.0,True,True,True,True,True,True,True
8,8.0,customer_2,product_0,,True,True,True,True,True,True,True
9,9.0,customer_0,malformed_product_id,42.0,True,True,True,True,True,True,True


## Custom Format

In [22]:
(table_name, timestamp) = re.search('(?P<table_name>.*)_(?P<timestamp>\d+_\d+).csv', metadata['filename']).groups()

def _escape_quotes(s):
    return s.replace('\'', '\\\'').replace('"', '\\"')

stack_exprs = [
    f'{rule.name}, "{rc}", CAST({rc} AS STRING), "{rule.name}", "{_escape_quotes(rule.expression)}"'
    for rule in dq_rules for rc in rule.referenced_columns]
stacked_dfs = dfs.selectExpr(
    f'ARRAY({", ".join(key_column_names)}) AS key',
    f'STACK({len(stack_exprs)}, {", ".join(stack_exprs)}) AS (passed, column, value, rule_name, expression)')
stacked_dfs = stacked_dfs.where('NOT passed')
custom_dfs = stacked_dfs.selectExpr(
    f'"{table_name}" AS table', f'"{timestamp}" AS version', 'ARRAY_JOIN(key, ":") AS key',
    'column', 'value', 'rule_name AS rule', 'expression AS description')
custom_dfs.toPandas()


Unnamed: 0,table,version,key,column,value,rule,description
0,orders,20200202_200002,,id,,id_valid,(id IS NOT NULL) AND (id >= 0)
1,orders,20200202_200002,-42.0,id,-42,id_valid,(id IS NOT NULL) AND (id >= 0)
2,orders,20200202_200002,-42.0,customer_id,customer_2,customer_id_less_or_equal_product_id,"regexp_extract(customer_id, 'customer_(\d+)') ..."
3,orders,20200202_200002,-42.0,product_id,product_1,customer_id_less_or_equal_product_id,"regexp_extract(customer_id, 'customer_(\d+)') ..."
4,orders,20200202_200002,5.0,customer_id,,customer_id_valid,(customer_id IS NOT NULL) AND (LENGTH(customer...
5,orders,20200202_200002,6.0,product_id,,product_id_not_null,product_id IS NOT NULL
6,orders,20200202_200002,7.0,product_id,product_non_existing,product_id_valid,product_id RLIKE 'product_\d+'
7,orders,20200202_200002,7.0,customer_id,customer_1,customer_id_less_or_equal_product_id,"regexp_extract(customer_id, 'customer_(\d+)') ..."
8,orders,20200202_200002,7.0,product_id,product_non_existing,customer_id_less_or_equal_product_id,"regexp_extract(customer_id, 'customer_(\d+)') ..."
9,orders,20200202_200002,8.0,customer_id,customer_2,customer_id_less_or_equal_product_id,"regexp_extract(customer_id, 'customer_(\d+)') ..."


## Output Json Blob

Note: The whole json structure will be redone to something more logical.

In [23]:
violations_exprs = [f'IF(NOT {rn}, \'{rn}\', null)' for rn in rule_names]
violations_expr = f'FILTER(ARRAY({", ".join(violations_exprs)}), v -> v IS NOT NULL)'
row_expr = _named_struct_str_expr(**{cn: cn for cn in column_names})
row_sample_expr = _named_struct_str_expr(row=row_expr, violations=violations_expr)
row_sample_dfs = sampled_dfs.selectExpr(f'{row_sample_expr} AS row_sample')
row_sample_df = row_sample_dfs.toPandas()
row_sample_df.style.set_properties(subset=['row_sample'], **{'max-width': '100%', 'text-align': 'left'})


Unnamed: 0,row_sample
0,"Row(row=Row(id=-42, customer_id='customer_2', product_id='product_1', quality=1), violations=['id_valid', 'customer_id_less_or_equal_product_id'])"
1,"Row(row=Row(id=5, customer_id='', product_id='product_1', quality=1), violations=['customer_id_valid'])"
2,"Row(row=Row(id=6, customer_id='customer_0', product_id=None, quality=7), violations=['product_id_not_null'])"
3,"Row(row=Row(id=7, customer_id='customer_1', product_id='product_non_existing', quality=4), violations=['product_id_valid', 'customer_id_less_or_equal_product_id'])"
4,"Row(row=Row(id=8, customer_id='customer_2', product_id='product_0', quality=None), violations=['customer_id_less_or_equal_product_id', 'quality_valid', 'quality_in_quality_code_sub_select'])"
5,"Row(row=Row(id=42, customer_id='customer_0', product_id='product_0', quality=5), violations=['id_unique'])"
6,"Row(row=Row(id=42, customer_id='customer_1', product_id='product_4', quality=4), violations=['id_unique'])"


In [24]:
row_sample = [{**rs.asDict(), 'row': rs.asDict()['row'].asDict()} for rs in row_sample_df['row_sample'].to_list()]

dq_json_blob = {
    'metadata': {**metadata, **table_stats},
    'schema': [c.to_dict() for c in columns],
    'dq_rules': [r.to_dict() for r in dq_rules],
    'dq_rules_stats': rule_stats_df.to_dict(orient='records'),
    'column_stats': column_stats_df.to_dict(orient='records'),
    'row_sample': row_sample,
}

with open(f'{tmp_dir}/dq_json_blob.json', 'w') as outfile:
    json.dump(dq_json_blob, outfile, indent=2)
print(json.dumps(dq_json_blob, indent=2))


{
  "metadata": {
    "filename": "orders_20200202_200002.csv",
    "size": "123456789",
    "upload_date": "2020-02-02T20:20:20Z",
    "rows": 10,
    "rows_valid": 1
  },
  "schema": [
    {
      "name": "id",
      "is_key": true
    },
    {
      "name": "customer_id",
      "is_key": false
    },
    {
      "name": "product_id",
      "is_key": false
    },
    {
      "name": "quality",
      "is_key": false
    }
  ],
  "dq_rules": [
    {
      "name": "id_valid",
      "expression": "(id IS NOT NULL) AND (id >= 0)",
      "referenced_columns": [
        "id"
      ],
      "description": null
    },
    {
      "name": "id_unique",
      "expression": "(COUNT(*) OVER (PARTITION BY id)) = 1",
      "referenced_columns": [
        "id"
      ],
      "description": null
    },
    {
      "name": "customer_id_valid",
      "expression": "(customer_id IS NOT NULL) AND (LENGTH(customer_id) > 0)",
      "referenced_columns": [
        "customer_id"
      ],
      "description": 