In [125]:
import yaml
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import date

In [126]:
def load_config(config_file):
    with open(config_file, 'r') as file:
        return yaml.safe_load(file)    

In [128]:
project_id = 'pb-data-tool-testing'
client = bigquery.Client(credentials= credentials,project=project_id)

In [129]:
dataset_name = "test_data"

In [130]:
table_name = "test_sample_1"

In [131]:
def null_check(table, column, filter_condition=None):
    query = f"""
    SELECT COUNT(*) AS null_count
    FROM `{table}`
    WHERE {column} IS NULL
    """
    if filter_condition:
        query = query + f" AND {filter_condition}"
        
    query_job = client.query(query)
    print(query)
    result = query_job.result().to_dataframe()
    return int(result['null_count'][0])

In [132]:
def negative_check(table, column1, filter_condition=None):
    query = f"""
    SELECT COUNT(*) AS non_equal_count
    FROM `{table}`
    WHERE {column1} < 0
    """
    if filter_condition:
        query = query + f" AND {filter_condition}"

    query_job = client.query(query)
    result = query_job.result().to_dataframe()
    return int(result['non_equal_count'][0])

In [133]:
table = f"{project_id}.{dataset_name}.{table_name}"

In [134]:
table

'pb-data-tool-testing.test_data.test_sample_1'

In [135]:
query_with_filter = null_check(table, "system_1_product_id", "system_1_PRODUCT_CODE = '6WUS'")


    SELECT COUNT(*) AS null_count
    FROM `pb-data-tool-testing.test_data.test_sample_1`
    WHERE system_1_product_id IS NULL
     AND system_1_PRODUCT_CODE = '6WUS'


In [136]:
query_with_filter

1

In [137]:
def load_config(config_file):
    with open(config_file, 'r') as file:
        return yaml.safe_load(file)    

In [138]:
config = load_config('test_config.yml')

In [139]:
config

{'tables': [{'name': 'pb-data-tool-testing.test_data.test_sample_1',
   'columns': [{'name': 'system_1_product_price',
     'data_area': 'Assets',
     'data_tested': 'Line Rental',
     'system': 'CRM',
     'field_type': 'Value',
     'date_value_checked': 'Line Rental Amount',
     'tests': [{'business_rule': 'Values missing', 'type': 'is_null'},
      {'business_rule': 'Values negative', 'type': 'is_negative'}]},
    {'name': 'system_2_product_price',
     'data_area': 'Assets',
     'data_tested': 'Line Rental',
     'system': 'CRM',
     'field_type': 'Value',
     'date_value_checked': 'Line Rental Amount',
     'tests': [{'business_rule': 'Values missing', 'type': 'is_null'},
      {'business_rule': 'Values negative', 'type': 'is_negative'}]}]}]}

In [151]:
def run_tests(config):
    test_results = []

    for table in config['tables']:
        table_name = table['name']
        for column in table['columns']:
            column_name = column['name']
            data_area = column['data_area']
            data_tested = column['data_tested']
            system = column['system']
            field_type = column['field_type']
            date_value_checked = column['date_value_checked']
            
            for test in column['tests']:
                print(test)
                business_rule = test['business_rule']  # Get custom test name
                test_type = test['type']  # Get test type
                print(test_type)
                
                if test_type == "is_null":
                    failure_count = null_check(table_name, column_name)
                    
                elif test_type == "is_negative":
                    # Implement positive check
                    failure_count = negative_check(table_name, column_name)

                elif test_type == "column_equality":
                    target_column = test['target_column']
                    failure_count = check_column_equality(table_name, column_name, target_column)
                        
                test_results.append({
                    'data_area': data_area,
                    'data_tested': data_tested,
                    'system': system,
                    'field_type': field_type,
                    'date_value_checked': date_value_checked,
                    'business_rule': business_rule,
                    'table': table_name,
                    'column': column_name,
                    'failure_count': failure_count,
                    'test_date': datetime.date.today().strftime("%Y-%m-%d")
                        })

    return test_results

In [152]:
test_results = run_tests(config)

{'business_rule': 'Values missing', 'type': 'is_null'}
is_null

    SELECT COUNT(*) AS null_count
    FROM `pb-data-tool-testing.test_data.test_sample_1`
    WHERE system_1_product_price IS NULL
    
{'business_rule': 'Values negative', 'type': 'is_negative'}
is_negative
{'business_rule': 'Values missing', 'type': 'is_null'}
is_null

    SELECT COUNT(*) AS null_count
    FROM `pb-data-tool-testing.test_data.test_sample_1`
    WHERE system_2_product_price IS NULL
    
{'business_rule': 'Values negative', 'type': 'is_negative'}
is_negative


In [144]:
test_table = f"{project_id}.{dataset_name}.{test_table_name}"

In [145]:
test_table

'pb-data-tool-testing.test_data.data_test_results'

In [150]:
errors = client.insert_rows_json(test_table, test_results)

In [114]:
rows_to_insert

[{'data_area': 'Assets',
  'data_tested': 'Line Rental',
  'system': 'CRM',
  'field_type': 'Value',
  'date_value_checked': 'Line Rental Amount',
  'business_rule': 'Values missing',
  'table': 'pb-data-tool-testing.test_data.test_sample_1',
  'column': 'system_1_product_price',
  'failure_count': 0,
  'test_date': '2024-10-23'},
 {'data_area': 'Assets',
  'data_tested': 'Line Rental',
  'system': 'CRM',
  'field_type': 'Value',
  'date_value_checked': 'Line Rental Amount',
  'business_rule': 'Values negative',
  'table': 'pb-data-tool-testing.test_data.test_sample_1',
  'column': 'system_1_product_price',
  'failure_count': 0,
  'test_date': '2024-10-23'},
 {'data_area': 'Assets',
  'data_tested': 'Line Rental',
  'system': 'CRM',
  'field_type': 'Value',
  'date_value_checked': 'Line Rental Amount',
  'business_rule': 'Values missing',
  'table': 'pb-data-tool-testing.test_data.test_sample_1',
  'column': 'system_2_product_price',
  'failure_count': 13,
  'test_date': '2024-10-23'},

In [109]:
def log_results_to_bigquery(results, table_id):
    # Prepare the rows to insert into BigQuery
    rows_to_insert = [
        {
            "test_name": r['test_name'],
            "table": r['table'],
            "column": r.get('column', ''),  # If column is None, use an empty string
            "target_column": r.get('target_column', ''),  # Optional, only present in some tests
            "failure_count": r['failure_count'],
            "test_date": r['test_date'].strftime("%Y-%m-%d %H:%M:%S")
        }
        for r in results
    ]
    
    # Insert the rows into the BigQuery table
    errors = client.insert_rows_json(table_id, rows_to_insert)
    
    if errors == []:
        print("New rows have been added to BigQuery.")
    else:
        print(f"Encountered errors while inserting rows: {errors}")

dict_keys(['null_test'])