In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, md5, concat_ws, lit
from pyspark.sql.types import StringType, StructType, StructField, TimestampType
from datetime import datetime

def update_audit_table(table_name, progress=None, status=None, start=False, end=False):
    """Record the start or the end of a load for ``table_name`` in ``Audit.Audit_table``.

    The Delta table ``Audit.Audit_table`` is created empty on first use.

    Args:
        table_name: Value written to, and matched against, the ``Table_Name`` column.
        progress: Value for the ``Progress`` column. Required when ``start`` is
            True; not used when ``end`` is True.
        status: Value for the ``Status`` column. Required for both ``start``
            and ``end``.
        start: If True, append a new row with ``Start_time`` set to the current
            driver time and a null ``End_time``. Takes precedence over ``end``
            when both are True.
        end: If True, set ``Status`` and ``End_time`` (current driver time) on
            the row for ``table_name`` with the latest non-null ``Start_time``.

    If neither ``start`` nor ``end`` is True, the call only makes sure the
    audit table exists.

    Raises:
        ValueError: If a required argument is missing, or if ``end`` is True
            and no row with a non-null ``Start_time`` exists for ``table_name``.
    """
    audit_table = "Audit.Audit_table"
    # One schema for both table creation and row inserts, so the two cannot drift apart.
    audit_schema = StructType([
        StructField("Table_Name", StringType(), True),
        StructField("Progress", StringType(), True),
        StructField("Status", StringType(), True),
        StructField("Start_time", TimestampType(), True),
        StructField("End_time", TimestampType(), True),
    ])
    # Naive (timezone-less) timestamp taken on the driver.
    current_time = datetime.now()

    if not spark.catalog.tableExists(audit_table):
        empty_df = spark.createDataFrame([], audit_schema)
        empty_df.write.format("delta").mode("overwrite").saveAsTable(audit_table)

    if start:
        if progress is None or status is None:
            raise ValueError("Progress and Status must be provided when starting a new record.")
        # Deliberately no check for an already-open record: every start appends a new row.
        new_df = spark.createDataFrame(
            [(table_name, progress, status, current_time, None)], audit_schema
        )
        new_df.write.mode("append").format("delta").saveAsTable(audit_table)

    elif end:
        if status is None:
            raise ValueError("Status must be provided when ending a record.")

        audit_df = spark.table(audit_table)
        # Fetch the latest start time with a single Spark action. Null start
        # times are skipped: a null key never satisfies the equality below, so
        # the "update" would silently change nothing.
        latest = (
            audit_df
            .filter((col("Table_Name") == table_name) & col("Start_time").isNotNull())
            .orderBy(col("Start_time").desc())
            .select("Start_time")
            .first()
        )
        if latest is None:
            raise ValueError("No existing record found to update.")

        is_target = (col("Table_Name") == table_name) & (col("Start_time") == latest["Start_time"])
        updated_df = (
            audit_df
            .withColumn("Status", when(is_target, status).otherwise(col("Status")))
            .withColumn("End_time", when(is_target, lit(current_time)).otherwise(col("End_time")))
        )
        # Rewrites every row to change one. Reading and overwriting the same
        # table in one job relies on it being a Delta table.
        # NOTE(review): a Delta UPDATE/MERGE would avoid the full rewrite.
        updated_df.write.mode("overwrite").format("delta").saveAsTable(audit_table)

In [0]:
def check_and_store_md5(df, layer_name, table_name, schema_flag="N"):
    """Compare ``df``'s schema with the fingerprint stored in ``Audit.Schema_Metadata``.

    The fingerprint is the MD5 hex digest of ``df.schema.simpleString()``:
    column names and data types, in order (nullability is not included).

    - The first call for a (``layer_name``, ``table_name``) pair stores the
      fingerprint together with ``schema_flag``.
    - Later calls compare against that stored row. This function never updates
      the row, so a ``schema_flag`` passed on a later call has no effect.

    Args:
        df: DataFrame whose schema is checked. Its rows are not read.
        layer_name: Value for, and filter on, the ``Layer_Name`` column.
        table_name: Value for, and filter on, the ``Table_Name`` column.
        schema_flag: Stored on first insert only. A stored "N" accepts schema
            changes; a stored "Y" rejects them.

    Returns:
        True if any of the following holds:
        - no entry existed yet (one is inserted);
        - the fingerprint matches;
        - the stored flag is "N".
        False if the fingerprint differs and the stored flag is "Y" or any
        other value.

    NOTE(review): an earlier version hashed the values of the first data row
    rather than the schema. ``Md5_Hash`` values already stored in
    ``Audit.Schema_Metadata`` will therefore not match; clear or re-seed those
    rows when deploying this change.
    """
    import hashlib  # function-scope import keeps this notebook cell self-contained

    metadata_table = "Audit.Schema_Metadata"
    metadata_schema = "Layer_Name STRING, Table_Name STRING, Schema_Flag STRING, Md5_Hash STRING"

    # Hash the schema, not the data. The result depends only on the
    # DataFrame's structure, so it is deterministic and also works for an
    # empty DataFrame (where first() would return None).
    md5_hash = hashlib.md5(df.schema.simpleString().encode("utf-8")).hexdigest()

    if not spark.catalog.tableExists(metadata_table):
        # IF NOT EXISTS tolerates another job creating the table after the check above.
        spark.sql(f"CREATE TABLE IF NOT EXISTS {metadata_table} ({metadata_schema}) USING DELTA")

    # Single Spark action: None when there is no entry for this layer/table yet.
    stored = (
        spark.table(metadata_table)
        .filter((col("Layer_Name") == layer_name) & (col("Table_Name") == table_name))
        .select("Schema_Flag", "Md5_Hash")
        .first()
    )

    if stored is None:
        new_df = spark.createDataFrame(
            [(layer_name, table_name, schema_flag, md5_hash)], metadata_schema
        )
        new_df.write.mode("append").format("delta").saveAsTable(metadata_table)
        return True

    if stored["Md5_Hash"] == md5_hash:
        return True

    # Schema differs from the stored fingerprint: the stored flag decides.
    if stored["Schema_Flag"] == "N":
        return True
    # "Y" rejects the change. Any unrecognized flag also fails closed (the
    # previous implementation fell through and returned None, which is falsy).
    return False