In [0]:
# Custom Logger
import logging

def get_custom_logger(
    v_Notebook_Name, v_Pipeline_RunId="None", v_Actvt_RunId="None", v_Log_Level="INFO"
):
    if v_Log_Level.upper() == "INFO":
        v_Log_Level = logging.INFO
    elif v_Log_Level.upper() == "WARNING":
        v_Log_Level = logging.WARNING
    elif v_Log_Level.upper() == "DEBUG":
        v_Log_Level = logging.DEBUG
    elif v_Log_Level.upper() == "ERROR":
        v_Log_Level = logging.ERROR
    elif v_Log_Level.upper() == "CRITICAL":
        v_Log_Level = logging.CRITICAL
    else:
        v_Log_Level = logging.INFO

    FORMAT = (
        "%(asctime)s - [pipeline.runid="
        + v_Pipeline_RunId
        + ", pipeline.activity.runid="
        + v_Actvt_RunId
        + "] - %(name)s:%(lineno)d - %(levelname)s - %(message)s"
    )
    formatter = logging.Formatter(fmt=FORMAT)

    logger = logging.getLogger(v_Notebook_Name)
    logger.setLevel(v_Log_Level)

    if not logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)

    v_Logger_Response = (
        "Created a logger object with logging level="
        + str(v_Log_Level)
        + " [10=DEBUG, 20=INFO, 30=WARNING, 40=ERROR, 50=CRITICAL]"
    )
    logger.info(v_Logger_Response)
    return logger


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
import traceback

# Initialize Logger
logger = get_custom_logger(
    v_Notebook_Name="DataQualityFramework",
    v_Pipeline_RunId="DQ_RUN_001",
    v_Actvt_RunId="DQ_ACT_001",
    v_Log_Level="DEBUG"
)

try:
    logger.info("Starting the Data Quality Framework.")

    # Create Spark session
    logger.info("Creating Spark session.")
    spark = SparkSession.builder \
        .appName("DQFramework") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    logger.info("Spark session created successfully.")

    # Sample Data
    logger.info("Creating the input DataFrame.")
    data = [("John", 28, "USA"), ("Jane", 35, "UK"), ("Sam", None, "USA"), (None, 22, "USA"), ("John", 28, "USA")]
    columns = ["Name", "Age", "Country"]
    df = spark.createDataFrame(data, schema=columns)

    # Show the input DataFrame
    logger.debug("Input DataFrame:")
    df.show()

    try: 
	    value = columns[5] 
    except Exception as e: 
	    logger.debug(traceback.format_exc())

    # Data Quality Checks
    dq_results = {}

    # Check 1: Null Value Check
    try:
        logger.info("Performing Null Value Check.")
        null_checks = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
        null_checks.show()
        dq_results["Null Value Check"] = null_checks.collect()
    except Exception as e:
        logger.error(f"Error in Null Value Check: {e}")
        logger.debug(traceback.format_exc())

    # Check 2: Duplicate Record Check
    try:
        logger.info("Performing Duplicate Record Check.")
        duplicate_count = df.count() - df.dropDuplicates().count()
        logger.info(f"Number of duplicate records: {duplicate_count}")
        dq_results["Duplicate Record Count"] = duplicate_count
    except Exception as e:
        logger.error(f"Error in Duplicate Record Check: {e}")
        logger.debug(traceback.format_exc())

    # Check 3: Schema Validation (Expected Schema)
    try:
        logger.info("Performing Schema Validation.")
        expected_schema = ["Name", "Age", "Country"]
        if set(expected_schema) == set(df.columns):
            logger.info("Schema Validation Passed.")
            dq_results["Schema Validation"] = "Passed"
        else:
            logger.error("Schema Validation Failed.")
            dq_results["Schema Validation"] = "Failed"
    except Exception as e:
        logger.error(f"Error in Schema Validation: {e}")
        logger.debug(traceback.format_exc())

    # Check 4: Age Column Data Type Validation
    try:
        logger.info("Performing Age Data Type Validation.")
        if dict(df.dtypes)["Age"] == "int":
            logger.info("Age Data Type Validation Passed.")
            dq_results["Age Data Type Validation"] = "Passed"
        else:
            logger.error("Age Data Type Validation Failed.")
            dq_results["Age Data Type Validation"] = "Failed"
    except Exception as e:
        logger.error(f"Error in Age Data Type Validation: {e}")
        logger.debug(traceback.format_exc())

    # Log Final DQ Results
    logger.info("Final Data Quality Results:")
    for check, result in dq_results.items():
        logger.info(f"{check}: {result}")

except Exception as main_e:
    logger.error(f"An error occurred during the DQ Framework execution: {main_e}")
    logger.debug(traceback.format_exc())

finally:
    try:
        # Stop Spark session
        logger.info("Stopping the Spark session.")
        #spark.stop()
        logger.info("Data Quality Framework execution completed.")
    except Exception as stop_e:
        logger.error(f"Error while stopping Spark session: {stop_e}")
        logger.debug(traceback.format_exc())


2024-12-23 12:20:16,109 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:14 - INFO - Starting the Data Quality Framework.
2024-12-23 12:20:16,110 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:17 - INFO - Creating Spark session.
2024-12-23 12:20:16,133 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:23 - INFO - Spark session created successfully.
2024-12-23 12:20:16,134 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:26 - INFO - Creating the input DataFrame.
2024-12-23 12:20:16,224 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:32 - DEBUG - Input DataFrame:
2024-12-23 12:20:17,116 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:38 - DEBUG - Traceback (most recent call last):
  File "<command-1619780761519303>", line 36, in <module>
    value =

+----+----+-------+
|Name| Age|Country|
+----+----+-------+
|John|  28|    USA|
|Jane|  35|     UK|
| Sam|null|    USA|
|null|  22|    USA|
|John|  28|    USA|
+----+----+-------+

+----+---+-------+
|Name|Age|Country|
+----+---+-------+
|   1|  1|      0|
+----+---+-------+



2024-12-23 12:20:18,481 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:55 - INFO - Performing Duplicate Record Check.
2024-12-23 12:20:19,866 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:57 - INFO - Number of duplicate records: 1
2024-12-23 12:20:19,867 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:65 - INFO - Performing Schema Validation.
2024-12-23 12:20:19,867 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:68 - INFO - Schema Validation Passed.
2024-12-23 12:20:19,868 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:79 - INFO - Performing Age Data Type Validation.
2024-12-23 12:20:19,869 - [pipeline.runid=DQ_RUN_001, pipeline.activity.runid=DQ_ACT_001] - DataQualityFramework:84 - ERROR - Age Data Type Validation Failed.
2024-12-23 12:20:19,872 - [pipeline.runid=DQ_RUN_001, pip