In [15]:
# ==============================================================================
# STANDARD LIBRARIES & SPARK CONFIGURATION
# ==============================================================================

# mssparkutils: Microsoft Fabric utility toolset for file system operations,
# secret retrieval, and cross-notebook orchestration.
from notebookutils import mssparkutils

# col: Function to select and manipulate specific DataFrame columns.
# trim: Removes leading and trailing whitespace from strings.
# to_timestamp: Converts a string column to a Timestamp type based on a format.
from pyspark.sql.functions import col, trim, to_timestamp

# F: Standard alias for PySpark SQL functions, providing a centralized
# namespace for transformations (aggregations, math, logic).
from pyspark.sql import functions as F

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 17, Finished, Available, Finished)

In [16]:
# Function to interface with the Spark catalog and retrieve data
def extract_table(table_name: str):
    """
    Load a table from the Spark catalog as a DataFrame.

    Uses the notebook's global ``spark`` session; nothing is filtered or
    transformed on the way in.

    Args:
        table_name (str): Catalog name of the table to read (e.g., 'gold_sales_data').

    Returns:
        pyspark.sql.dataframe.DataFrame: DataFrame backed by the requested table.
    """
    return spark.read.table(table_name)

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 18, Finished, Available, Finished)

In [17]:
# Function to automate whitespace removal across the entire DataFrame
def trim_all_string_cols(df):
    """
    Strip leading and trailing whitespace from every string-typed column.

    Columns of any other type are passed through untouched.

    Args:
        df (pyspark.sql.DataFrame): DataFrame whose string columns should be trimmed.

    Returns:
        pyspark.sql.DataFrame: New DataFrame in which each string column has
        been replaced by its trimmed version.
    """
    # Map each string column name to its trimmed replacement expression.
    replacements = {}
    for name, dtype in df.dtypes:
        if dtype == 'string':
            replacements[name] = trim(col(name))

    # Swap in all trimmed columns in a single projection.
    return df.withColumns(replacements)

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 19, Finished, Available, Finished)

In [18]:
# Wrapper function to orchestrate data cleaning tasks
def trim_data(df):
    """
    Run the cleaning steps applied to the freshly extracted DataFrame.

    At present the only step is trimming whitespace on all string columns.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to clean.

    Returns:
        pyspark.sql.DataFrame: The DataFrame with trimmed string columns.
    """
    return trim_all_string_cols(df)

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 20, Finished, Available, Finished)

In [19]:
# Function to persist DataFrame results back to the Lakehouse as a Delta Table
def load_df_to_delta(df, table_name: str, mode: str = "overwrite"):
    """
    Save a DataFrame as a Delta table.

    Args:
        df (spark DataFrame): DataFrame to persist.
        table_name (str): Name of the destination table.
        mode (str): Spark save mode; defaults to 'overwrite', which replaces
            the table.

    Raises:
        Exception: Any error raised by the write is logged and then
            re-raised, so a failed load stops the run instead of being
            silently reported as a normal completion.
    """
    try:
        # The "overwriteSchema" option is always set, as in the original
        # behaviour, so an overwrite may also replace the table schema.
        df.write.format("delta") \
            .mode(mode) \
            .option("overwriteSchema", "true") \
            .saveAsTable(table_name)
    except Exception as e:
        # Log the failure (schema mismatch, permissions, ...) for the notebook
        # output, then propagate it: swallowing the error here would let the
        # pipeline finish "successfully" without having written any data.
        print(f"Error processing table {table_name}: {str(e)}")
        raise
    else:
        # Confirm successful completion
        print(f"Table '{table_name}' loaded successfully !")


StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 21, Finished, Available, Finished)

In [20]:
# Function to perform Data Quality (DQ) checks for null values in key seller fields
def check_null_columns(df, columns=None):
    """
    Report rows that have a NULL in any of the mandatory columns.

    Prints the number of affected rows and, when there are any, shows a
    preview of them. The input DataFrame is not modified and nothing is
    returned.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to be inspected.
        columns (str | iterable of str, optional): Column names that must not
            be NULL. Defaults to the seller identity and location columns
            ('seller_id', 'seller_zip_code_prefix', 'seller_city',
            'seller_state').

    Raises:
        ValueError: If ``columns`` is given but empty.
    """
    if columns is None:
        columns = ("seller_id", "seller_zip_code_prefix", "seller_city", "seller_state")
    elif isinstance(columns, str):
        # A bare string would otherwise be iterated character by character.
        columns = (columns,)

    columns = list(columns)
    if not columns:
        raise ValueError("columns must contain at least one column name")

    # Build "c1 IS NULL OR c2 IS NULL OR ..." over the requested columns.
    any_null = F.col(columns[0]).isNull()
    for name in columns[1:]:
        any_null = any_null | F.col(name).isNull()

    # Identify rows where any of the mandatory fields are null.
    null_df = df.filter(any_null)

    null_count = null_df.count()
    print(f"Records with null columns: {null_count}")

    # If the filter captured any records, display them for manual inspection.
    if null_count > 0:
        null_df.show()

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 22, Finished, Available, Finished)

In [21]:
# Function acting as a central hub for data cleaning and filtering logic
def clean_dirty_data(df):
    """
    Placeholder hook for record-level cleaning.

    No filtering is implemented yet: the function prints a status message
    and hands the input back unchanged.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to clean.

    Returns:
        pyspark.sql.DataFrame: The same DataFrame that was passed in.
    """
    cleaned = df  # no cleaning rules defined so far
    print("Cleaning script executed ! ")
    return cleaned


StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 23, Finished, Available, Finished)

In [22]:
# Function to validate foreign key relationships between the source and a reference table
def check_referencing_key(df, ref_table_name: str, df_col: str, ref_col: str, error_col: str): 
    """
    Flags records in the source DataFrame whose key is missing from a
    reference table.

    The returned DataFrame always carries the ``error_col`` column: 'Y' for
    rows whose key has no match in the reference table, NULL otherwise. The
    schema therefore does not depend on whether mismatches happen to exist.

    Args:
        df (pyspark.sql.DataFrame): The source DataFrame to validate.
        ref_table_name (str): The name of the catalog table used for validation.
        df_col (str): The key column name in the source DataFrame.
        ref_col (str): The key column name in the reference table.
        error_col (str): The name of the flag column to be created (e.g., 'invalid_zip_flag').

    Returns:
        pyspark.sql.DataFrame: The source rows plus the error flag column.
    """

    # 1. Load only the distinct keys of the reference table. Joining against
    # the full table would emit one output row per matching reference row, so
    # a reference with repeated keys would duplicate source records.
    ref_keys_df = spark.read.table(ref_table_name).select(ref_col).distinct()

    # 2. Left join on the aliased columns. Referring to the keys through the
    # "main"/"ref" aliases keeps the condition unambiguous even when both
    # sides use the same column name.
    main_key = F.col("main." + df_col)
    ref_key = F.col("ref." + ref_col)
    joined_df = df.alias("main").join(
        ref_keys_df.alias("ref"),
        main_key == ref_key,
        how="left"
    )

    # 3. A NULL reference key after the left join means no match was found.
    is_missing = ref_key.isNull()
    invalid_count = joined_df.filter(is_missing).count()

    if invalid_count > 0:
        print(f"Found {invalid_count} Keys: {df_col} not present in the referencing table: {ref_table_name}")
        print(f"{error_col} column set to Y")

        # Display the specific values from the main table that failed the lookup
        joined_df.filter(is_missing).select(main_key).show()

        # 4. Flag unmatched rows with 'Y' (NULL otherwise) and keep only the
        # original source columns plus the flag.
        return joined_df.withColumn(
            error_col,
            F.when(is_missing, "Y")
        ).select("main.*", error_col)

    # Confirms all data has a valid corresponding entry in the reference table
    print(f"All {df_col} found to be valid and present in the referencing table: {ref_table_name}.\n")

    # No mismatches: add the flag column as all-NULL so the output schema is
    # the same as in the mismatch case, without keeping the join in the plan.
    return df.withColumn(error_col, F.lit(None).cast("string"))

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 24, Finished, Available, Finished)

In [23]:
# Function to verify data uniqueness and identify duplicate primary keys
def check_records_count(df, col_name: str): 
    """
    Print a uniqueness report for one column.

    Reports the total and distinct row counts for the column, then shows
    every value that occurs more than once. Nothing is returned and the
    DataFrame is not modified.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to be audited.
        col_name (str): The name of the column that should be unique (e.g., 'customer_id').
    """
    key_values = df.select(col_name)

    # If the column is a unique key, these two numbers are equal.
    total_rows = key_values.count()
    print(f"Total count: {total_rows}")
    distinct_rows = key_values.distinct().count()
    print(f"Distinct count: {distinct_rows}")

    # List the offending values; an empty table means no duplicates.
    print("Records to investigate (if any not unique key)")
    duplicates = df.groupBy(col_name).count().filter("count > 1")
    duplicates.show()

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 25, Finished, Available, Finished)

In [24]:
# ==============================================================================
# MAIN ETL PIPELINE EXECUTION: Sellers Bronze to Silver
# ==============================================================================
# Runs extract -> trim -> data-quality reports -> reference checks -> load.
# The checks in steps 4 and 7 only print their findings; they do not filter
# rows or stop the run.

# 1. Configuration: Define source and destination entry points in the Lakehouse
# NOTE(review): the destination name is unqualified; presumably it resolves
# against the notebook's default lakehouse — confirm.
from_table_name = 'BronzeLakeHouse.dbo.sellers_bronze'
to_table_name = 'sellers_silver'

# 2. Extract: Load the raw seller data from the Bronze layer
df = extract_table(from_table_name)

# 3. Transform (Cleaning): Remove leading/trailing whitespace from all string columns
df = trim_data(df)

# 4. Data Quality Check: Print the count of rows with null values in the
# mandatory seller fields (and preview them if any exist). Report only.
check_null_columns(df)

# 5. Referential Integrity Check A: Validate seller zip codes against geolocation master data
# Rows whose zip prefix has no match are flagged 'Y' in 'not_in_geo_flag'.
df = check_referencing_key(df, 'geolocation_silver', 'seller_zip_code_prefix', 'geolocation_zip_code_prefix', 'not_in_geo_flag')

# 6. Referential Integrity Check B: Validate seller existence within the order items records
# Sellers without any associated order item are flagged 'Y' in 'not_in_order_items_flag'.
df = check_referencing_key(df, 'BronzeLakeHouse.dbo.order_items_bronze', 'seller_id', 'seller_id', 'not_in_order_items_flag')

# 7. Uniqueness Validation: Print total vs. distinct counts for 'seller_id'
# and list any duplicated ids. Report only; nothing is asserted.
check_records_count(df, 'seller_id')

# 8. Conditional Cleaning: Optional hook to filter or drop records based on flags
# Currently disabled; enable the call below if dirty records need to be cleaned.
# df = clean_dirty_data(df)

# 9. Load: Persist the final validated DataFrame as a Delta table in the Silver layer
# No mode is passed, so load_df_to_delta uses its default ('overwrite').
load_df_to_delta(df, to_table_name)

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 26, Finished, Available, Finished)

Records with null columns: 0
Found 7 Keys: seller_zip_code_prefix not present in the referencing table: geolocation_silver
not_in_geo_flag column set to Y
+----------------------+
|seller_zip_code_prefix|
+----------------------+
|                 82040|
|                 91901|
|                 72580|
|                 02285|
|                 07412|
|                 71551|
|                 37708|
+----------------------+

All seller_id found to be valid and present in the referencing table: BronzeLakeHouse.dbo.order_items_bronze.

Total count: 3095
Distinct count: 3095
Records to investigate (if any not unique key)
+---------+-----+
|seller_id|count|
+---------+-----+
+---------+-----+

Table 'sellers_silver' loaded successfully !


In [25]:
# spark.sql("REFRESH TABLE order_items_silver")

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 27, Finished, Available, Finished)

In [26]:
# df.createOrReplaceTempView("test")
# check_df = spark.sql(""" select count(*) from test where not_in_geo_flag = 'Y' """)
# # check_df = spark.sql(""" select count(*) from test where not_in_orders_flag IS NULL """)

# check_df.show()

StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 28, Finished, Available, Finished)

In [27]:
# ==============================================================================
# SESSION TERMINATION
# ==============================================================================

# mssparkutils.session.stop(): Manually shuts down the active Spark session.
# This releases the allocated compute (V-Cores) back to the capacity pool.
# Keep this as the last cell: presumably any cell run afterwards needs a new
# session to be started first — confirm.

mssparkutils.session.stop()


StatementMeta(, 349e4202-63e3-4f78-b954-ab3739ad03ee, 29, Finished, Available, Finished)