In [0]:
import pyspark.sql.functions as F

##### Config Tables Metadata Info

In [0]:
p_bds_db_name = dbutils.widgets.text(name="BDS_DB_NAME", defaultValue="release_test")
BDS_DB_NAME = dbutils.widgets.get(name="BDS_DB_NAME")

In [0]:
if BDS_DB_NAME in ("", None):
    raise Exception("Invalid Database name. Halting the process for further steps...")

In [0]:
batch_tables = {

  "test":{
        "columns":{"test":"string"}
    },

    "bds_batch_metadata": {  # key
        "columns": {
            "md_batch_id": "string",
            "md_batch_layer": "string",
            "md_batch_start_time": "timestamp",
            "md_batch_end_time": "timestamp",
            "md_batch_details": "string",
            "md_batch_status": "string",
            "md_batch_failure_reason": "string",
            "md_batch_failure_job_name": "struct", #error string
            "md_batch_warning_msg": "string",
            "md_batch_record_count_input": "int",
        }
        },

    "bds_job_audit_log": {
        "columns": {
            "md_batch_id": "string",
            "md_job_name": "string",
            "md_batch_layer": "string",
            "md_job_start_time": "timestamp",
            "md_job_end_time": "timestamp",
            "md_job_details": "string",
            "md_job_status": "string",
            "md_job_failure_reason": "string",
            "md_job_record_count_input": "int",
            "md_job_record_count_processed": "int",
            "md_job_record_count_quarantined": "int",
            "md_job_record_count_warning": "int",
        }
    },

    "bds_quarantine_metadata": {
        "columns": {
            "transactionId": "string",
            "json_message": "struct",
            "batch_id": "int",
            "created_timestamp": "timestamp",
            "md_batch_id": "string",
            "validation_errors": "string",
            "md_batch_job_name": "string",
            "md_batch_layer": "string",
            "inserted_timestamp": "timestamp",
        }
    },
    "bds_bronze_data": {
        "columns": {
            "batch_id": "int",
            "transactionId": "string",
            "com_kroger_desp_events_rss_rsssalestransaction": "struct",
            "created_timestamp": "timestamp",
            "md_batch_id": "string",
        }
    },
    "test1":{
        "columns":{"test":"string"}
    },
    "bds_bronze_validated": {
        "columns": {
            "transactionId": "string",
            "com_kroger_desp_events_rss_rsssalestransaction": "string", #error struct
            "created_timestamp": "timestamp",
            "md_batch_id": "struct", #error int
        }
    }
}

In [0]:
metadata_tables_df = spark.sql(f"SHOW TABLES IN {BDS_DB_NAME}")

In [0]:
# Step 1: Check for table existence and collect missing tables
missing_tables = []
for t in batch_tables:
    print(f"Checking for '{t}' table existence")
    is_table_exists_df = metadata_tables_df.filter(
        metadata_tables_df.tableName.isin([t])
    )
    if is_table_exists_df.count() == 0:
        missing_tables.append(t)
# Display missing tables
if missing_tables:
    raise Exception(f"Required Tables Missing: {', '.join(missing_tables)}")
print("All required tables are present.")

# Step 2: Collect schema validation issues
validation_issues = []
for t in batch_tables:
    print(f"Validating schema for '{t}' table")

    table_df = spark.sql(f"DESCRIBE {BDS_DB_NAME}.{t}")

    schema_list = (
        table_df.select("col_name", "data_type")
        .rdd.map(lambda row: (row["col_name"], row["data_type"]))
        .collect()
    )

    schema_df = spark.createDataFrame(schema_list, ["col_name", "data_type"])

    schema_df_with_hardcoded_type = schema_df.withColumn(
        "data_type",
        F.when(F.col("data_type").contains("struct"), "struct").otherwise(F.col("data_type")),
    )
 
    schema_dict = schema_df_with_hardcoded_type.rdd.map(
        lambda row: (row["col_name"], row["data_type"])
    ).collectAsMap()
 
    batch_columns = batch_tables[t]["columns"]
 
    # Check for missing columns and data type mismatches
    for col1, d_type in batch_columns.items():
        if col1 not in schema_dict:
            validation_issues.append(f"Required column {col1} is missing in {BDS_DB_NAME}.{t}.")
        elif schema_dict[col1] != d_type:
            validation_issues.append(
                f"Data type mismatch for column {col1} in {BDS_DB_NAME}.{t}: Expected {d_type}, Found {schema_dict[col1]}"
            )
 
    for col1 in schema_dict:
        if col1 not in batch_columns:
            validation_issues.append(f"Unexpected column {col1} found in {BDS_DB_NAME}.{t}.")

# Report all collected validation issues or confirm success
if validation_issues:
    raise Exception("Structure validation issues found:\n" + "\n".join(validation_issues))
else:
    print("All schema validations passed successfully.")





Checking for 'test' table existence
Checking for 'bds_batch_metadata' table existence
Checking for 'bds_job_audit_log' table existence
Checking for 'bds_quarantine_metadata' table existence
Checking for 'bds_bronze_data' table existence
Checking for 'test1' table existence
Checking for 'bds_bronze_validated' table existence


[0;31m---------------------------------------------------------------------------[0m
[0;31mException[0m                                 Traceback (most recent call last)
File [0;32m<command-2307311006834220>:12[0m
[1;32m      9[0m     if is_table_exists_df.count() == 0:
[1;32m     10[0m         missing_tables.append(t)
[0;32m---> 12[0m # Display missing tables
[1;32m     13[0m if missing_tables:
[1;32m     14[0m     raise Exception(f"Required Tables Missing: {', '.join(missing_tables)}")

[0;31mException[0m: Required Tables Missing: test, test1

In [0]:
print(f"PASS: All tables and columns are present and correctly typed, proceed with executing the data pipeline.")

