In [None]:
#---------------------------------------------------------------------------------------------------------
# notebook_name : data_quality_framework
# notebook_description : this framework notebook used to run a generic data quality checks on a given table
# notebook_use : 1) have the below steps as part of a workflow task after the load notebook task (or)
#                2) add below command tothe load note book at end of it in a seperate cell
#           RunDataQualityNotebookTests(DatabaseName = DatabaseName, TableName = TableName, InsertQuery = Query)
#
# author : nava
# created : 11/24/2021
#---------------------------------------------------------------------------------------------------------

In [None]:
# notebook parameters
dbutils.widgets.text('jobEnv','')
dbutils.widgets.text('DatabaseName','')
dbutils.widgets.text('TableName','')
# dbutils.widgets.text('InsertQuery','')

# get notebook parametrs into local variables
jobEnv=dbutils.widgets.get('jobEnv')
DatabaseName=dbutils.widgets.get('DatabaseName')
TableName=dbutils.widgets.get('TableName')

#InsertQuery=dbutils.widgets.get('InsertQuery')
if jobEnv=='qa':
    job_catalog=''
elif jobEnv=='prod':
    job_catalog=''
else:
    job_catalog=''


In [None]:
# Define Constants

print(jobEnv, DatabaseName, TableName)

  


In [None]:
from dataclasses import dataclass

@dataclass
class DataQualityNotebookTesting:
  job_catalog: str
  DatabaseName: str
  TableName: str
  InsertQuery: str


def RunDataQualityNotebookTests(job_catalog, DatabaseName, TableName):
  """
  function to run the generic tests on a delta table
  DatabaseName, TableName : load DatabaseName, TableName
  """
  
  # if WorkspaceID in [WS_ENV_PROD, WS_ENV_QA, WS_ENV_DEV]:
  print (f'\n -- Generic Tests Started for {job_catalog}.{DatabaseName}.{TableName}')

  try:
    print ('-- -- RefTableConfig_test Tests Started')
    RefTableConfig_test(job_catalog, DatabaseName, TableName)
    print ('-- -- RefTableConfig_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- NaturalKey_test Tests Started')
    NaturalKey_test(job_catalog, DatabaseName, TableName)
    print ('-- -- NaturalKey_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- NullOrEmptyStringColumns_test Tests Started')
    NullOrEmptyStringColumns_test(job_catalog, DatabaseName, TableName)
    print ('-- -- NullOrEmptyStringColumns_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- EmptyString_test Tests Started')
    EmptyString_test(job_catalog, DatabaseName, TableName)
    print ('-- -- EmptyString_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- LeadingTrailingSpaces_test Tests Started')
    LeadingTrailingSpaces_test(job_catalog, DatabaseName, TableName)
    print ('-- -- LeadingTrailingSpaces_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- RowCount_test Tests Started')
    RowCount_test(job_catalog, DatabaseName, TableName)
    print ('-- -- RowCount_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  try:
    print ('-- -- AuditColumns_test Tests Started')
    AuditColumns_test(job_catalog, DatabaseName, TableName)
    print ('-- -- AuditColumns_test Tests Completed')
  except Exception as Error:
    print(Error)
    exit()

  print (f'\n -- Generic Tests Completed for {job_catalog}.{DatabaseName}.{TableName}')
  
  # else:
    # print (f'\n -- Generic Tests Failed for {DatabaseName}.{TableName}')



In [None]:
def Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, StatusMessage):
  """
  Insert data quality check status into data_quality_monitor table
  """
  try:
    #check the configs from data_quality_checks
    Query = (f'''
      insert into {job_catalog}._utils.data_quality_monitor
      select 
        coalesce(max_id, 0) + 1,
        current_date(),
        "{StatusMessage}" dq_run_status,
        c.dq_id,
        current_timestamp()
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
        left join (select dq_id, max(dq_run_id) max_id from {job_catalog}.data_quality_monitor group by dq_id) m
          on c.dq_id = m.dq_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "{DataQualityType}" --"RowCount_test"
        and r.is_validate = "Y"
        and c.is_validate = "Y"
      ''')
    
    query_data = spark.sql(Query).collect()
    row_count = query_data[0]
    
    if row_count != 0:
      return 'Data Quality Check status inserted into monitor table'
    else:
      print ('\n FAIL. Data Quality Check status NOT inserted into monitor table, please review.')
      return 'FAIL'
    
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def RefTableConfig_test(job_catalog, DatabaseName, TableName):
  """
  Check if table is in data_quality_configs
  """
  
  try:
    #declare variables
    DataQualityType = 'RefTableConfig_test'
    
    #data_quality_configs tables
    ref_Query = (f'SELECT table_name FROM {job_catalog}._utils.data_quality_configs where is_validate ="Y"')
    ref_TableInfo = spark.sql(ref_Query).collect()
    ref_Tables = [x.table_name for x in ref_TableInfo]
    
    if TableName in ref_Tables:

      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')

      return 'PASS'
    else: 
      print('\n data_quality_configs Availability Test' +
            f'\n FAIL. Table {TableName} is not available in data_quality_configs. Please review.')
      
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')

      return 'FAIL'
  
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def NaturalKey_test(job_catalog, DatabaseName, TableName):
  """
  Function to check natural key violations
  """

  try:
    #declare variables
    DataQualityType = 'NaturalKey_test'
    
    #get natural key columns from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "NaturalKey_test"
        and c.is_validate = "Y"
      ''')
    query_data = spark.sql(Query).collect()
    if query_data == []:
      #no data for specified table
      print('\n Natural Key Test' +
            '\n FAIL. Natural key info is missing in config table. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    NK_str = (query_data[0].ValidateColumns)
    
    if NK_str == '':
      #no NK data
      print('\n Natural Key Test' +
            '\n FAIL. No natural key provided. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    
    #check NK violation
    query_count_NK = (f'SELECT {NK_str} FROM {job_catalog}.{DatabaseName}.{TableName} GROUP BY {NK_str} HAVING COUNT(*) > 1')
    DF_qc_NK = spark.sql(query_count_NK).collect()

    if len(DF_qc_NK) > 0:
      print('\n Natural Key Test' +
            '\n Query used for testing:'+
            f'\n {query_count_NK}'
            '\n FAIL. Natural Key Violation! Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    else:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
  
  except Exception as Error:
    print(Error)
    exit()

In [None]:
def NullOrEmptyStringColumns_test(job_catalog, DatabaseName, TableName):
  """Check for columns with only NULL or empty strings"""

  try:
    #declare variables
    DataQualityType = 'NullOrEmptyStringColumns_test'

    #check the configs from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "NullOrEmptyStringColumns_test"
        and c.is_validate = "Y"
      ''')
    
    query_data = spark.sql(Query).collect()
    
    if query_data == []:
      #no data for specified table
      print('\n Null or Empty String Columns Test' +
            '\n FAIL. NullOrEmptyStringColumns_test config data is missing. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    
    #get table row count
    table_count_query = (f'select count(1) as CNT FROM {job_catalog}.{DatabaseName}.{TableName}')
    count_data = spark.sql(table_count_query).collect()
    table_len = count_data[0].CNT
    
    #get table columns, remove standard columns
    if (len(query_data) == 1 and query_data[0][2] == 'all_columns'):
      TableCols = [column.name for column in spark.catalog.listColumns(TableName,DatabaseName)]
      for val in ['CreateUser','CreateDate','UpdateUser','UpdateDate']:
        TableCols.remove(val)
    else:
      TableCols = [i for i in query_data[0][2].split(',')]
      
     
    print(TableCols)

    #potential improvement. Auto create 1 query with all column counts via union statement.
    null_cols = []
    for column in TableCols:
      
      
      Query = (f'''
        SELECT {table_len} - 
        (
          SELECT 
            COUNT(1) FROM {job_catalog}.{DatabaseName}.{TableName} 
           WHERE 
             {column} IS NULL OR LTRIM(RTRIM({column})) = ""
         ) AS RecordCnt
        ''')
      
      record_count_data = spark.sql(Query).collect()
      record_count = record_count_data[0].RecordCnt

      if record_count == 0:
        null_cols.append(column)

    if null_cols == []:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
    else:
      null_col_str = ', '.join(null_cols)
      print('\n Null or Empty String Test' +
            f'\n FAIL. Columns {null_col_str} have only NULL or empty string values. Please review')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
  
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def EmptyString_test(job_catalog, DatabaseName, TableName):
  """
  Check for columns with empty string records
  """
  try:
    #declare variables
    DataQualityType = 'EmptyString_test'

    #check the configs from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "EmptyString_test"
        and c.is_validate = "Y"
      ''')
    query_data = spark.sql(Query).collect()
    if query_data == []:
      #no data for specified table
      print('\n Empty String Columns Test' +
            '\n FAIL. EmptyString_test config data is missing. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'

    #get table columns, remove standard columns
    if (len(query_data) == 1 and query_data[0][2] == 'all_columns'):
      TableCols = [column.name for column in spark.catalog.listColumns(TableName,DatabaseName)]
      for val in ['CreateUser','CreateDate','UpdateUser','UpdateDate']: # do not include audit columns
        TableCols.remove(val)
    else:
      TableCols = [i for i in query_data[0][2].split(',')]
      
    print(TableCols)
    
    empty_string_cols = []
    for column in TableCols:
      Query = f'SELECT COUNT(1) AS RecordCnt FROM {job_catalog}.{DatabaseName}.{TableName} WHERE LTRIM(RTRIM({column})) = ""'
      
      record_count_data = spark.sql(Query).collect()
      record_count = record_count_data[0].RecordCnt

      if record_count > 0:
        empty_string_cols.append(column)
      
    if empty_string_cols == []:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
    else:
      empty_string_cols_str = ', '.join(empty_string_cols)
      print('\n Empty String Test' +
            f'\n FAIL. Column(s) {empty_string_cols_str} have empty string values. Please review. ' +
            '\n Consider using RunEmptyStringToNull function from Import_class_library')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
  
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def LeadingTrailingSpaces_test(job_catalog, DatabaseName, TableName):
  """
  Check for columns with leading/trailing spaces
  """
  try:
    #declare variables
    DataQualityType = 'LeadingTrailingSpaces_test'

    #check the configs from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "LeadingTrailingSpaces_test"
        and c.is_validate = "Y"
      ''')
    query_data = spark.sql(Query).collect()
    if query_data == []:
      #no data for specified table
      print('\n leading trailing spaces Test' +
            '\n FAIL. LeadingTrailingSpaces_test config data is missing. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'

    #get table columns, remove standard columns
    if (len(query_data) == 1 and query_data[0][2] == 'all_columns'):
      TableCols = [column.name for column in spark.catalog.listColumns(TableName,DatabaseName)]
      for val in ['CreateUser','CreateDate','UpdateUser','UpdateDate']: # do not include audit columns
        TableCols.remove(val)
    else:
      TableCols = [i for i in query_data[0][2].split(',')]
      
    print(TableCols)
    
    #Build testing query
    Query = (f"""SELECT count(1) as CNT FROM {job_catalog}.{DatabaseName}.{TableName} WHERE""")
    for x in range (len(TableCols)):
      col = TableCols[x]
      if x==0:
        Query += f"\n(left({col},1) = ' ' OR right({col},1) = ' ')"
      else:
        Query += f"\nOR (left({col},1) = ' ' OR right({col},1) = ' ')"
    
    #Collect query data
    query_data = spark.sql(Query).collect()
    row_count = query_data[0]['CNT']
        
    if row_count == 0:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
    else:
      print('\n Leading/Trailing Spaces Test' +
            '\n FAIL. There are column(s) that have leading/trailing spaces. Please review.' +
            '\n Consider using RunTrimColumns function from Import_class_library')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def RowCount_test(job_catalog, DatabaseName, TableName):
  """
  Check row count
  """
  try:
    #declare variables
    DataQualityType = 'RowCount_test'

    #check the configs from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "RowCount_test"
        and c.is_validate = "Y"
      ''')
    query_data = spark.sql(Query).collect()
    if query_data == []:
      #no data for specified table
      print('\n table row count Test' +
            '\n FAIL. RowCount_test config data is missing. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'

    #get data
    Query = (f'select count(1) as CNT from {job_catalog}.{DatabaseName}.{TableName}')
    query_data = spark.sql(Query).collect()
    
    row_count = query_data[0]['CNT']
    
    if row_count != 0:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
    else:
      print ('\n Row Count Test' +
             '\n FAIL. The result table has 0 rows. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    
  except Exception as Error:
    print(Error)
    exit()


In [None]:
def AuditColumns_test(job_catalog, DatabaseName, TableName):
  """
  Check audit columns existence
  """
  try:
    #declare variables
    DataQualityType = 'AuditColumns_test'

    #check the configs from data_quality_checks
    Query = (f'''
      select 
        c.dq_id, c.dq_type, c.dq_rule as ValidateColumns 
      from
        {job_catalog}._utils.data_quality_configs r
        join {job_catalog}._utils.data_quality_checks c
          on r.dq_config_id = c.dq_config_id
      where 
        r.database_name = "{DatabaseName}"
        and r.table_name = "{TableName}"
        and c.dq_type = "AuditColumns_test"
        and c.is_validate = "Y"
      ''')
    query_data = spark.sql(Query).collect()
    if query_data == []:
      #no data for specified table
      print('\n audit columns Test' +
            '\n FAIL. AuditColumns_test config data is missing. Please review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'

    dq_columns = [i for i in query_data[0][2].split(',')]
    print(dq_columns)

    audit_columns = []
    for col in dq_columns:
        Query = (f'''SELECT distinct column_name FROM system.information_schema.columns where table_schema = "{DatabaseName}" AND table_name = "{TableName}" AND column_name = "{col}"''')

        query_data = spark.sql(Query).collect()
        audit_columns.append(query_data[0][0])
    print(audit_columns)
    
    #check existence of audit columns
    missing_cols = []
    for val in dq_columns:
      if val in audit_columns:
        pass
      else:
        missing_cols.append(val)

    if missing_cols == []:
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'PASS')
      return 'PASS'
    else:
      missing_cols_str = ', '.join(missing_cols)
      print('\n Audit Columns Test' +
            f'\n FAIL. The following audit column(s) are missing: {missing_cols_str}.' +
            '\n Please add column(s) and review.')
      Insert_Data_Quality_Monitor(DatabaseName, TableName, DataQualityType, 'FAIL')
      return 'FAIL'
    
  except Exception as Error:
    print(Error)
    exit()


In [None]:
RunDataQualityNotebookTests(job_catalog, DatabaseName, TableName)


 -- Generic Tests Started for .
-- -- RefTableConfig_test Tests Started

 data_quality_configs Availability Test
 FAIL. Table  is not available in data_quality_configs. Please review.
-- -- RefTableConfig_test Tests Completed
-- -- NaturalKey_test Tests Started

 Natural Key Test
 FAIL. Natural key info is missing in config table. Please review.
-- -- NaturalKey_test Tests Completed
-- -- NullOrEmptyStringColumns_test Tests Started

 Null or Empty String Columns Test
 FAIL. NullOrEmptyStringColumns_test config data is missing. Please review.
-- -- NullOrEmptyStringColumns_test Tests Completed
-- -- EmptyString_test Tests Started

 Empty String Columns Test
 FAIL. EmptyString_test config data is missing. Please review.
-- -- EmptyString_test Tests Completed
-- -- LeadingTrailingSpaces_test Tests Started

 leading trailing spaces Test
 FAIL. LeadingTrailingSpaces_test config data is missing. Please review.
-- -- LeadingTrailingSpaces_test Tests Completed
-- -- RowCount_test Tests Starte

In [None]:
dbutils.notebook.exit("PASS")