#### 1. Load the business rules configuration (NO NEED TO UPDATE)

In [None]:
import pandas as pd
import json
import re
from datetime import datetime

# Business-rule configuration for the data-quality (DQ) checks, kept as JSON text.
#   input_columns        - the columns that are checked.
#   generic_rule_library - reusable rule definitions, referenced as "<library>.<variant>".
#   column_ruleset       - per-column rule reference, optionally with "unique": true.
#
# Backslashes are escaped twice (once for this Python string literal, once for
# JSON), so "\\\\d" below reaches the regex engine as "\d".
#
# Every regex is written as ^( alt1 | alt2 )$ so that both anchors apply to
# every alternative, and a literal hyphen inside a character class is placed
# last so it cannot be read as a range.
#
# NOTE(review): Policy_Start_Date is configured inline (no "rule" key) and
# points at "date.formats" by name. Confirm that the code consuming this
# configuration resolves inline entries; if it only follows "rule", the date
# formats are not enforced for that column.
json_config_with_uniqueness = """
{
    "input_columns": ["Entity", "Policy_ID", "NRIC", "Holder_Name", "Holder_Age", "Holder_Sex","Holder_Marital_Status","PostCode", "Vehicle_Age", "Email_Address", "Policy_Start_Date", "Policy_Status", "Mobile_Numbers"],

    "generic_rule_library": {
        "text": {"standard": {"complete": true, "regex": "^[a-zA-Z ,.'/-]+$"}},
        "email": {"standard": {"complete": true, "regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\\\.[a-zA-Z]{2,}$"}},
        "date": {"formats": ["%Y-%m-%d", "%d/%m/%Y", "%m-%d-%Y"]},
        "integer": {
            "age": {"type": "integer", "min": 18, "max": 99},
            "vehicle_age": {"type": "integer", "min": 0, "max": 20}
        },
        "enum": {
            "policy_status_variant_1": {"complete": true, "values": ["Active", "Cancelled", "Expired", "Suspended"]},
            "policy_status_variant_2": {"complete": true, "values": ["Under Review", "Pending", "Approved", "Denied"]},
            "policy_status_my_LIFT": {"complete": true, "values": ["B","E","H","I","N","P","Q","R","U","W","Y","Z","A","C","D","F","G","J","K","L","M","S","T","V","X"]}
        },
        "policy_id": {
            "variant_1": {"complete": true, "regex": "^POL\\\\d{4}$"},
            "variant_2": {"complete": true, "regex": "^INS\\\\d{6}$"},
            "variant_3": {"complete": true, "regex": "^(\\\\d{10}|[A-Z]\\\\d{9})$"}
        },
        "entity": {
            "variant_1": {"complete": true, "values": ["ZI","ZT","ZL","ZF"]},
            "variant_2": {"complete": true, "values": ["GI","GT","LI","FT"]},
            "variant_3": {"complete": true, "values": ["ZGIMB","ZGTMB","ZLIMB","ZTMB"]}
        },
        "mobile_phone": {
            "variant_my": {"complete": true, "regex": "^\\\\d{10,12}$"}
        },
        "national_identity": {
            "variant_my_new_nric": {"complete": true, "regex": "^(\\\\d{12}|\\\\d{6}-\\\\d{2}-\\\\d{4})$"}
        },
        "sex": {"standard": {"complete": true, "values": ["M","F","MALE","FEMALE"]}},
        "postcode": {"variant_my": {"complete": true, "regex": "^\\\\d{5}$"}}
    },

    "column_ruleset": {
        "Entity": "entity.variant_3",
        "PostCode": "postcode.variant_my",
        "Policy_ID": {"rule": "policy_id.variant_3", "unique": true},
        "NRIC": {"rule": "national_identity.variant_my_new_nric", "unique": true},
        "Holder_Name": "text.standard",
        "Holder_Age": "integer.age",
        "Holder_Sex": "sex.standard",
        "Holder_Marital_Status": "text.standard",
        "Vehicle_Age": "integer.vehicle_age",
        "Email_Address": {"rule": "email.standard", "unique": true},
        "Policy_Start_Date": {"complete": true, "formats": "date.formats"},
        "Policy_Status": "enum.policy_status_my_LIFT",
        "Mobile_Numbers": {"rule": "mobile_phone.variant_my", "unique": true}
    }
}
"""

# Parse the JSON text into the dict consumed by the DQ functions.
config_with_uniqueness = json.loads(json_config_with_uniqueness)


#### 2. Load the DQ functions

In [None]:

# Parse the JSON configuration text into a dict. This repeats the identical
# json.loads call made right after the configuration string is defined.
# NOTE(review): presumably repeated so this cell picks up edits to the JSON
# text when it is re-run on its own -- confirm, otherwise it is redundant.
config_with_uniqueness = json.loads(json_config_with_uniqueness)

# Define the functions for DQ checks
def check_completeness(value, rules):
    """Return whether ``value`` meets the completeness requirement in ``rules``.

    If ``rules`` has no truthy ``'complete'`` entry the field is treated as
    optional and every value passes. Otherwise the value must be neither null
    (as judged by ``pd.isnull``) nor the empty string.
    """
    completeness_required = rules.get('complete', False)
    if not completeness_required:
        return True
    if pd.isnull(value):
        return False
    return value != ""

def check_validity(value, rules):
    """Return True when ``value`` satisfies every constraint present in ``rules``.

    Null values and the empty string are always invalid. The remaining checks
    run only for the keys that ``rules`` contains:

    * ``'regex'``   - ``str(value)`` must match the pattern (``re.match``).
    * ``'type'`` == ``'integer'`` - the value must convert to an int without
      losing a fractional part and lie within the optional ``'min'``/``'max'``.
    * ``'values'``  - the value must be a member of the list.
    * ``'formats'`` - the value must parse with at least one ``strptime``
      format, or already be a ``datetime`` instance.
    """
    if pd.isnull(value) or value == "":
        return False

    if 'regex' in rules and not re.match(rules['regex'], str(value)):
        return False

    if rules.get('type') == 'integer':
        try:
            int_value = int(value)
        except (ValueError, TypeError, OverflowError):
            # Non-numeric text, unsupported types and infinities are invalid
            # data, not a reason to abort the whole run.
            return False
        # int() truncates floats silently; 25.5 must not pass as an integer.
        if isinstance(value, float) and not value.is_integer():
            return False
        if 'min' in rules and int_value < rules['min']:
            return False
        if 'max' in rules and int_value > rules['max']:
            return False

    if 'values' in rules and value not in rules['values']:
        return False

    # A value that is already a datetime (pandas Timestamp included) needs no
    # parsing. Everything else is parsed from its string form, because
    # strptime raises TypeError when handed a non-string.
    if 'formats' in rules and not isinstance(value, datetime):
        text = str(value)
        for date_format in rules['formats']:
            try:
                datetime.strptime(text, date_format)
                break
            except ValueError:
                continue
        else:
            # No format matched.
            return False

    return True

def check_accuracy(value, rules):
    """Return True for every input.

    Placeholder: no accuracy rule is implemented, so neither ``value`` nor
    ``rules`` is inspected and every value counts as accurate.
    """
    return True

def check_uniqueness(df, field, unique_required):
    """Return a boolean Series, aligned with ``df.index``, of uniqueness flags.

    When ``unique_required`` is truthy, a row is flagged True unless
    ``df.duplicated`` reports its ``field`` value as a repeat of an earlier
    row. When uniqueness is not required, every row is flagged True.
    """
    if unique_required:
        # duplicated() is True for repeats, so invert it to get "passes".
        return ~df.duplicated(subset=[field])
    all_pass = [True] * len(df)
    return pd.Series(all_pass, index=df.index)

def get_rules_for_field(field_rule, generic_rule_library):
    """Look up the rule definition that ``field_rule`` refers to.

    Args:
        field_rule: Either a reference string or a dict holding the reference
            under ``'rule'``. A reference is ``"<library>.<variant>"`` or a
            bare ``"<library>"`` name.
        generic_rule_library: Mapping of library name to its rule definitions.

    Returns:
        The referenced entry of ``generic_rule_library``. An empty dict is
        returned when there is no reference at all (``None``, a dict without
        ``'rule'``) or when a bare library name is not present.

    Raises:
        KeyError: A dotted reference names a library or variant that does not
            exist.
    """
    if isinstance(field_rule, dict):
        # A dict without 'rule' carries no reference; previously it fell
        # through to a dict-keyed lookup and raised TypeError (unhashable).
        field_rule = field_rule.get('rule')
    if not isinstance(field_rule, str):
        return {}

    # partition() never fails on extra dots, unlike unpacking split(".").
    library, dot, rule = field_rule.partition(".")
    if not dot:
        return generic_rule_library.get(field_rule, {})
    try:
        return generic_rule_library[library][rule]
    except KeyError:
        raise KeyError(
            f"Unknown rule reference {field_rule!r} in generic_rule_library"
        ) from None

def calculate_data_quality_metrics(df, dq_flags):
    """Summarise the per-row DQ flag columns as percentage strings per field.

    Args:
        df: DataFrame holding the flag columns named in ``dq_flags``.
        dq_flags: Mapping of field name to a 4-tuple of column names in the
            order (completeness, validity, accuracy, uniqueness).

    Returns:
        Dict mapping each field to a dict with the keys "Completeness",
        "Validity", "Accuracy", "Uniqueness" and "Data Quality". Each value is
        formatted as ``"NN.NN%"``. "Data Quality" is the product of the four
        percentages rescaled to a percentage.
    """
    def percent_true(column):
        # Mean of a flag column times 100 gives the share of passing rows.
        return df[column].mean() * 100

    summary = {}
    for field, flag_columns in dq_flags.items():
        comp_col, valid_col, acc_col, uniq_col = flag_columns
        completeness = percent_true(comp_col)
        validity = percent_true(valid_col)
        accuracy = percent_true(acc_col)
        if uniq_col in df.columns:
            uniqueness = percent_true(uniq_col)
        else:
            # No uniqueness flags recorded for this field: count it as fully unique.
            uniqueness = 100
        data_quality = completeness * validity * accuracy * uniqueness / 1000000

        scores = {
            "Completeness": completeness,
            "Validity": validity,
            "Accuracy": accuracy,
            "Uniqueness": uniqueness,
            "Data Quality": data_quality,
        }
        summary[field] = {label: f"{score:.2f}%" for label, score in scores.items()}
    return summary

def _resolve_field_rules(rule_entry, generic_rule_library):
    """Build the effective rule dict for one normalised column_ruleset entry."""
    # Copy so inline overrides never modify the shared library definition.
    rules = dict(get_rules_for_field(rule_entry.get('rule'), generic_rule_library))
    for key, setting in rule_entry.items():
        if key in ('rule', 'unique'):
            continue
        if key == 'formats' and isinstance(setting, str):
            # 'formats' must be a list; a string is a dotted reference into
            # the library (e.g. "date.formats"), so follow it.
            resolved = generic_rule_library
            for part in setting.split('.'):
                resolved = resolved[part]
            setting = resolved
        rules[key] = setting
    return rules


def calculate_data_quality(df, config):
    """Run all DQ checks on ``df`` and return the flagged frame plus a summary.

    For every column in ``config['input_columns']`` four boolean columns are
    added to ``df`` in place: ``<col>__Completeness``, ``<col>__Validity``,
    ``<col>__Accuracy`` and ``<col>__Uniqueness``.

    A ``column_ruleset`` entry may be a reference string, or a dict with an
    optional ``'rule'`` reference, an optional ``'unique'`` flag and inline
    rule settings (such as ``'complete'`` or ``'formats'``). Inline settings
    are merged over the referenced library rule; they were previously ignored,
    so an entry without ``'rule'`` was never validated.

    Args:
        df: Input DataFrame; it is modified in place.
        config: Parsed configuration with the keys ``'input_columns'``,
            ``'column_ruleset'`` and ``'generic_rule_library'``.

    Returns:
        Tuple ``(df, dq_summary)`` where ``dq_summary`` is a DataFrame with one
        row per field and the metric names as columns.

    Raises:
        KeyError: An input column is absent from ``df``, or a rule reference
            cannot be resolved.
    """
    dq_flags = {}
    column_ruleset = config['column_ruleset']
    generic_rule_library = config['generic_rule_library']
    input_columns = config['input_columns']

    # Fail before touching df so a bad input never leaves it half-annotated.
    missing = [column for column in input_columns if column not in df.columns]
    if missing:
        raise KeyError(f"Input columns missing from DataFrame: {missing}")

    for field in input_columns:
        # "or {}" also covers a ruleset entry that is explicitly null.
        rule_entry = column_ruleset.get(field) or {}
        if isinstance(rule_entry, str):
            rule_entry = {'rule': rule_entry}
        rules = _resolve_field_rules(rule_entry, generic_rule_library)

        completeness_col = f"{field}__Completeness"
        validity_col = f"{field}__Validity"
        accuracy_col = f"{field}__Accuracy"
        uniqueness_col = f"{field}__Uniqueness"

        df[completeness_col] = df[field].apply(lambda x: check_completeness(x, rules))
        df[validity_col] = df[field].apply(lambda x: check_validity(x, rules))
        df[accuracy_col] = df[field].apply(lambda x: check_accuracy(x, rules))
        df[uniqueness_col] = check_uniqueness(df, field, rule_entry.get('unique', False))

        dq_flags[field] = (completeness_col, validity_col, accuracy_col, uniqueness_col)

    dq_metrics = calculate_data_quality_metrics(df, dq_flags)
    dq_summary = pd.DataFrame.from_dict(dq_metrics, orient='index')

    return df, dq_summary

