In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Obtain the notebook's Spark session (reuses the active one when present)
spark = (
    SparkSession.builder
    .appName("Data Quality Checks")
    .getOrCreate()
)

In [0]:
# JDBC configurations
# Target SQL Server endpoint and database
jdbcHostname = "asql-server-salesproject-2448-pramodpotghan.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "ASQL_SalesProject_2448PramodPotghan"
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"

# Credentials: the password is read from the secret scope, never hard-coded
jdbcUsername = "asqlserveradmin"
jdbcPassword = dbutils.secrets.get(scope="ADB-KV-2448", key="asqlserveradmin-password")

# Property bundle in the shape accepted by the DataFrame JDBC reader/writer
connectionProperties = dict(
    user=jdbcUsername,
    password=jdbcPassword,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

In [0]:
# Azure ADLS configuration
# OAuth client-credentials settings handed to the mount call below; the client
# id, client secret and tenant id are all read from the "ADB-KV-2448" secret scope.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="ADB-KV-2448", key="ADLS-CLIENT-ID"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="ADB-KV-2448", key="ADLS-CLIENT-SECRET"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{dbutils.secrets.get(scope='ADB-KV-2448', key='ADLS-TENANT-ID')}/oauth2/token"
}

# Mount the ADLS container
storage_account = "adlssalesprojectppwork"
container_name = "raw"
mount_point = f"/mnt/{container_name}"

# Look at the existing mounts first instead of string-matching the exception
# text of a failed mount. This keeps the cell safe to re-run while letting
# genuine mount failures (bad credentials, wrong tenant, network) raise here
# rather than being swallowed and resurfacing later as obscure read errors.
already_mounted = any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts())

if already_mounted:
    print("Mount point already exists.")
else:
    dbutils.fs.mount(
        source=f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )
    print("Mounted")

In [0]:
# Every table handled by this notebook, in processing order
tables = [
    "Retailer",
    "WAREHOUSE",
    "COUNTRY",
    "RETURN_REASON",
    "ORDER_METHOD",
    "PRODUCT",
    "ORDER_HEADER",
    "BRANCH",
    "PRODUCT_NAME_LOOKUP",
    "INVENTORY_LEVELS",
    "ORDER_DETAILS",
    "RETURNED_ITEM",
]

# Tables archived under /mnt/raw/database
database = [
    "BRANCH",
    "ORDER_HEADER",
    "PRODUCT",
    "ORDER_DETAILS",
    "RETURNED_ITEM",
    "INVENTORY_LEVELS",
]
# Tables archived under /mnt/raw/blobstorage
blobstorage = [
    "ORDER_METHOD",
    "RETURN_REASON",
    "COUNTRY",
    "WAREHOUSE",
    "Retailer",
    "PRODUCT_NAME_LOOKUP",
]

# Run timestamp supplied through a notebook widget (empty string by default)
dbutils.widgets.text("PRM_PL_CURRENT_TIMESTAMP", "")
current_timestamp = dbutils.widgets.get("PRM_PL_CURRENT_TIMESTAMP")

# Archive parquet path per table: blobstorage tables are registered first,
# then database tables, each group following the order of `tables`.
file_path = {}
file_path.update(
    (table, f"/mnt/raw/blobstorage/Archive/{table}/{table}_{current_timestamp}.parquet")
    for table in tables
    if table in blobstorage
)
file_path.update(
    (table, f"/mnt/raw/database/Archive/{table}/{table}_{current_timestamp}.parquet")
    for table in tables
    if table in database
)

# Per-table columns passed to null_check: a row is rejected when any of these
# columns is NULL.
nc_column_names = {
    "ORDER_HEADER": {"ORDER_NUMBER","RETAILER_NAME","ORDER_DATE"},
    "PRODUCT": {"PRODUCT_NUMBER"},
    "ORDER_DETAILS":{"ORDER_DETAIL_CODE"},
    "RETURNED_ITEM":{"RETURN_CODE"},
    "INVENTORY_LEVELS":{"INVENTORY_YEAR"},
    "ORDER_METHOD":{"ORDER_METHOD_CODE","ORDER_METHOD_EN"}
}

# Per-table columns passed to duplicate_check: rows whose value in one of these
# columns occurs more than once are rejected.
dp_column_names = {
    "ORDER_HEADER": {"ORDER_NUMBER"},
    "PRODUCT": {"PRODUCT_NUMBER"}
}

# Per-table columns passed to date_format_check (expected pattern: yyyy-MM-dd).
date_column_names = {
    "PRODUCT": {"INTRODUCTION_DATE","DISCONTINUED_DATE"},
    "ORDER_DETAILS":{"SHIP_DATE"}
}

# Per-table columns passed to null_zero_negative_check: NULL, zero and negative
# values are rejected.
null_zero_negative_column_names = {
    "ORDER_DETAILS":{"QUANTITY","UNIT_COST","UNIT_PRICE","UNIT_SALE_PRICE"},
}

# Child table -> column used by foreign_key_check. The same column name is
# looked up on both the child table and its reference table.
foreign_key_columns = {
    "ORDER_DETAILS":"ORDER_NUMBER",
    "RETURNED_ITEM":"ORDER_DETAIL_CODE",
    "INVENTORY_LEVELS":"PRODUCT_NUMBER"
}

# Per-table {old column name: new column name} mapping applied by
# rename_columns to the clean data before it is written out.
new_column_names = {
    "ORDER_HEADER":{"SALES_STAFF_CODE":"RAW_STAFF_CODE","SALES_BRANCH_CODE":"RAW_BRANCH_CODE"},
    "ORDER_METHOD":{"ORDER_METHOD_CODE":"METHOD_CODE","ORDER_METHOD_EN":"METHOD_NAME"},
    "RETURN_REASON":{"REASON_DESCRIPTION_EN":"RETURN_REASON_DESC"},
    "WAREHOUSE":{"ADDRESS1":"ADDRESS"}
}

# Define table data type casts
# Maps table name -> {column name: Spark data type}. cast_columns applies the
# inner mapping to each table right after its parquet file is loaded.
# NOTE(review): FloatType is single-precision; confirm that is adequate for the
# cost / price / margin columns below.
table_casts = {
    # NOTE(review): BRANCH casts its code columns to FloatType, whereas
    # WAREHOUSE casts the same-named columns to LongType - confirm this
    # difference is intentional.
    "BRANCH": {
        "BRANCH_CODE": FloatType(), 
        "COUNTRY_CODE": FloatType(), 
        "WAREHOUSE_BRANCH_CODE": FloatType()
    },
    "ORDER_HEADER": {
        "ORDER_NUMBER": LongType(), 
        "RETAILER_SITE_CODE": LongType(), 
        "RETAILER_CONTACT_CODE": LongType(), 
        "SALES_STAFF_CODE": LongType(), 
        "SALES_BRANCH_CODE": LongType(), 
        "ORDER_DATE": DateType(), 
        "ORDER_CLOSE_DATE": DateType(), 
        "ORDER_METHOD_CODE": LongType()
    },
    "PRODUCT": {
        "PRODUCT_NUMBER": LongType(),
        "BASE_PRODUCT_NUMBER": LongType(),
        "INTRODUCTION_DATE": DateType(),
        "DISCONTINUED_DATE": DateType(),
        "PRODUCT_TYPE_CODE": LongType(),
        "PRODUCT_COLOR_CODE": LongType(),
        "PRODUCT_SIZE_CODE": LongType(),
        "PRODUCT_BRAND_CODE": LongType(),
        "PRODUCTION_COST": FloatType(),
        "GROSS_MARGIN": FloatType()
    },
    "ORDER_DETAILS": {
        "ORDER_DETAIL_CODE": LongType(),
        "ORDER_NUMBER": LongType(),
        "SHIP_DATE": DateType(),
        "PRODUCT_NUMBER": LongType(),
        "PROMOTION_CODE": LongType(),
        "QUANTITY": LongType(),
        "UNIT_COST": FloatType(),
        "UNIT_PRICE": FloatType(),
        "UNIT_SALE_PRICE": FloatType()
    },
    "RETURNED_ITEM": {
        "RETURN_CODE": LongType(),
        "RETURN_DATE": DateType(),
        "ORDER_DETAIL_CODE": LongType(),
        "RETURN_REASON_CODE": LongType(),
        "RETURN_QUANTITY": LongType(),
        "ASSIGNED_TO": LongType(),
        "FOLLOW_UP_CODE": LongType(),
        "DATE_ADVISED": DateType()
    },
    "INVENTORY_LEVELS": {
        "INVENTORY_YEAR": LongType(),
        "INVENTORY_MONTH": LongType(),
        "WAREHOUSE_BRANCH_CODE": LongType(),
        "PRODUCT_NUMBER": LongType(),
        "OPENING_INVENTORY": LongType(),
        "QUANTITY_SHIPPED": LongType(),
        "ADDITIONS": LongType(),
        "UNIT_COST": FloatType(),
        "CLOSING_INVENTORY": LongType(),
        "AVERAGE_UNIT_COST": FloatType()
    },
    "ORDER_METHOD": {
        "ORDER_METHOD_CODE": LongType()
    },
    "RETURN_REASON": {
        "RETURN_REASON_CODE": LongType()
    },
    "COUNTRY": {
        "COUNTRY_CODE": LongType()
    },
    "WAREHOUSE": {
        "BRANCH_CODE": LongType(),
        "COUNTRY_CODE": LongType(),
        "WAREHOUSE_BRANCH_CODE": LongType()
    },
    "Retailer": {
        "RETAILER_SITE_CODE": LongType(),
        "RETAILER_CONTACT_CODE": LongType()
    },
    "PRODUCT_NAME_LOOKUP": {
        "PRODUCT_NUMBER": LongType()
    }
}

In [0]:
# Function to read parquet files
def read_files(file_path):
    """Load the parquet data stored at ``file_path`` into a DataFrame.

    Args:
        file_path: Location of the parquet data to load.

    Returns:
        The DataFrame produced by the notebook's ``spark`` session reader.
    """
    parquet_reader = spark.read.format("parquet")
    return parquet_reader.load(file_path)

# Casting columns to appropriate types
def cast_columns(df, column_casts):
    """Cast the listed columns of ``df`` to their configured types.

    Args:
        df: DataFrame whose columns are to be cast.
        column_casts: Mapping of column name -> target Spark data type.

    Returns:
        A DataFrame in which each listed column has been replaced by its cast
        version; ``df`` itself is returned when the mapping is empty.
    """
    result = df
    for name in column_casts:
        target_type = column_casts[name]
        result = result.withColumn(name, F.col(name).cast(target_type))
    return result

# Null value check function
def null_check(df, columns):
    """Collect the rows of ``df`` holding a NULL in any of ``columns``.

    The result is the union of one filter per column, with no de-duplication
    applied here, so a row that is NULL in several of the columns is included
    once per offending column.

    Args:
        df: DataFrame to inspect.
        columns: Iterable of column names that must not be NULL.

    Returns:
        A DataFrame with the schema of ``df`` containing the rejected rows.
    """
    rejected = spark.createDataFrame([], schema=df.schema)
    for column_name in columns:
        is_missing = F.col(column_name).isNull()
        rejected = rejected.unionByName(df.filter(is_missing))
    return rejected
    
# Duplicate check function
def duplicate_check(df, columns):
    """Collect every row of ``df`` whose value in one of ``columns`` repeats.

    All rows sharing a repeated value are returned (not just the extra copies).

    Args:
        df: DataFrame to inspect.
        columns: Iterable of column names whose values are expected to be unique.

    Returns:
        A DataFrame with the columns of ``df`` containing the distinct rejected rows.
    """
    rejected = spark.createDataFrame([], schema=df.schema)
    for key in columns:
        occurrences = df.groupBy(key).count()
        repeated_keys = occurrences.filter(F.col("count") > 1)
        # Nothing repeats for this column: leave the accumulated result untouched.
        if repeated_keys.count() == 0:
            continue
        offenders = df.join(repeated_keys.drop("count"), key, "inner")
        rejected = rejected.unionByName(offenders).distinct()
    return rejected
    
# Date Format check
def date_format_check(df, columns):
    """Collect rows whose non-NULL value in ``columns`` is not a yyyy-MM-dd date.

    A row is rejected for a column when the value is present but parsing it
    with the pattern 'yyyy-MM-dd' yields NULL. NULL values are not rejected here.

    Args:
        df: DataFrame to inspect.
        columns: Iterable of column names expected to hold yyyy-MM-dd dates.

    Returns:
        A DataFrame with the schema of ``df`` containing the rejected rows
        (one copy per offending column).
    """
    rejected = spark.createDataFrame([], schema=df.schema)
    for column_name in columns:
        raw_value = F.col(column_name)
        parsed_value = F.to_date(raw_value, 'yyyy-MM-dd')
        unparsable = parsed_value.isNull() & raw_value.isNotNull()
        rejected = rejected.unionByName(df.filter(unparsable))
    return rejected

# Null, Zero, Negative value check
def null_zero_negative_check(df, columns):
    """Collect rows whose value in any of ``columns`` is NULL, zero or negative.

    Args:
        df: DataFrame to inspect.
        columns: Iterable of column names that must hold strictly positive values.

    Returns:
        A DataFrame with the schema of ``df`` containing the rejected rows
        (one copy per offending column).
    """
    rejected = spark.createDataFrame([], schema=df.schema)
    for column_name in columns:
        value = F.col(column_name)
        # "zero or negative" collapsed into a single <= comparison
        not_positive = value.isNull() | (value <= 0)
        rejected = rejected.unionByName(df.filter(not_positive))
    return rejected


# Foreign key check
def foreign_key_check(df, reference_df, foreign_key_column):
    """Return the rows of ``df`` that have no matching key in ``reference_df``.

    Args:
        df: Child DataFrame whose foreign key is validated.
        reference_df: Parent DataFrame providing the valid key values.
        foreign_key_column: Column name, present in both DataFrames, to match on.

    Returns:
        The left-anti join of ``df`` against ``reference_df`` on that column.
    """
    keys_match = df[foreign_key_column] == reference_df[foreign_key_column]
    return df.join(reference_df, keys_match, "left_anti")

# Add columns to dataframe
def add_columns_to_df(df, table, file_paths):
    """Append the SOURCE_ID, DataDate and UpdateDate columns to ``df``.

    * SOURCE_ID  - "LKP_File" when the table's path contains "blobstorage",
      "INTERNAL_DB" when it contains "database", otherwise NULL.
    * DataDate   - the PRM_PL_CURRENT_TIMESTAMP widget value parsed with the
      pattern "yyyyMMddHHmmss".
    * UpdateDate - a NULL date column.

    Args:
        df: DataFrame to extend.
        table: Table name used to look up the source path.
        file_paths: Mapping of table name -> source file path.

    Returns:
        The DataFrame with the three columns added.
    """
    source_path = F.lit(file_paths[table])
    source_id = (
        F.when(source_path.contains("blobstorage"), F.lit("LKP_File"))
        .when(source_path.contains("database"), F.lit("INTERNAL_DB"))
        .otherwise(F.lit(None))
    )

    run_timestamp = dbutils.widgets.get("PRM_PL_CURRENT_TIMESTAMP")
    data_date = F.to_date(F.lit(run_timestamp), "yyyyMMddHHmmss")
    update_date = F.lit(None).cast(DateType())

    df = df.withColumn("SOURCE_ID", source_id)
    df = df.withColumn("DataDate", data_date)
    return df.withColumn("UpdateDate", update_date)
    

# Rename columns
def rename_columns(df, table, new_column_names):
    """Rename the columns of ``df`` according to the mapping configured for ``table``.

    Args:
        df: DataFrame whose columns are to be renamed.
        table: Table name used to look up the rename mapping.
        new_column_names: Mapping of table name -> {old column name: new column name}.

    Returns:
        The DataFrame with the renames applied. When ``table`` has no entry in
        ``new_column_names`` a message is printed and ``df`` is returned
        unchanged, so the caller never receives ``None`` in place of its data.
    """
    if table not in new_column_names:
        print(f"Table {table} not found in new_column_names")
        return df

    for old_col, new_col in new_column_names[table].items():
        df = df.withColumnRenamed(old_col, new_col)
    return df


In [0]:
# Process tables
# Load every archived parquet file and apply the casts configured for its table
dataframes = {}
for table, path in file_path.items():
    loaded = read_files(path)
    dataframes[table] = cast_columns(loaded, table_casts[table]) if table in table_casts else loaded

# Child table -> DataFrame of the parent table its foreign key is checked against
reference_df = {
    child: dataframes[parent]
    for child, parent in (
        ("ORDER_DETAILS", "ORDER_HEADER"),
        ("RETURNED_ITEM", "ORDER_DETAILS"),
        ("INVENTORY_LEVELS", "PRODUCT"),
    )
}

# Column-level checks that share the signature check(df, columns), each paired
# with the per-table configuration that decides where it applies.
# NOTE(review): the columns in date_column_names are already cast to DateType by
# table_casts when the DataFrames are loaded, so date_format_check no longer sees
# the raw text - confirm whether it should run on the un-cast data instead.
column_checks = [
    (nc_column_names, null_check),
    (dp_column_names, duplicate_check),
    (date_column_names, date_format_check),
    (null_zero_negative_column_names, null_zero_negative_check),
]

for table in tables:
    df = dataframes[table]
    reject_df = spark.createDataFrame([], schema=df.schema)

    # Perform Checks
    # Rejected rows are accumulated lazily. The per-check debug show() calls
    # were dropped because each one triggered a Spark job that re-ran every
    # check collected so far; the final show() below still reports the rejects.
    for columns_by_table, check in column_checks:
        if table in columns_by_table:
            reject_df = reject_df.unionByName(check(df, columns_by_table[table]))

    if table in foreign_key_columns:
        foreign_key_column = foreign_key_columns[table]
        foreign_key_df = foreign_key_check(df, reference_df[table], foreign_key_column)
        reject_df = reject_df.unionByName(foreign_key_df)

    # A row can fail several checks; keep a single copy of it.
    reject_df = reject_df.distinct()
    reject_df.show()

    # Persist the rejects next to the source area the table was read from.
    for source_name in ("blobstorage", "database"):
        if source_name in file_path[table]:
            reject_df.write.format("parquet").mode("overwrite").save(
                f"/mnt/raw/{source_name}/Reject/{table}/REJECT_{table}_{current_timestamp}.parquet"
            )

    # Clean rows = input rows minus rejected rows, then renamed and enriched.
    clean_df = df.subtract(reject_df)
    clean_df = clean_df.dropDuplicates()
    if table in new_column_names:
        clean_df = rename_columns(clean_df, table, new_column_names)
    clean_df = add_columns_to_df(clean_df, table, file_path)
    clean_df.show()

    # ORDER_HEADER and ORDER_DETAILS are appended to; every other table is overwritten.
    write_mode = "append" if table in ["ORDER_HEADER", "ORDER_DETAILS"] else "overwrite"
    (
        clean_df.write.format("jdbc")
        .mode(write_mode)
        .option("url", jdbcUrl)
        .option("dbtable", f"Ingestion.{table}")
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .save()
    )

    print(f"Table {table} successfully ingested")

+-------------+------------------+---------------------+
|RETAILER_NAME|RETAILER_SITE_CODE|RETAILER_CONTACT_CODE|
+-------------+------------------+---------------------+
+-------------+------------------+---------------------+

+--------------------+------------------+---------------------+---------+----------+----------+
|       RETAILER_NAME|RETAILER_SITE_CODE|RETAILER_CONTACT_CODE|SOURCE_ID|  DataDate|UpdateDate|
+--------------------+------------------+---------------------+---------+----------+----------+
|      The Sport Pros|             20556|                 3575| LKP_File|2024-10-08|      NULL|
|Connor Department...|             20130|                 3141| LKP_File|2024-10-08|      NULL|
|Silm�asema Karell Oy|             20663|                 3682| LKP_File|2024-10-08|      NULL|
|Chen Yu Enterpris...|             20365|                 3356| LKP_File|2024-10-08|      NULL|
|Outdoor-Fachgesch...|             20438|                 3457| LKP_File|2024-10-08|      NULL|
|  