<u>**1. Silver Layer - medallion architecture**</u>

This notebook cleanses data from the bronze layer. In the bronze layer, data is loaded as-is and not cleansed, so it may contain duplicates or values that are invalid according to domain knowledge. In the silver layer, data unsuitable for analysis is removed.

The first step is to import the required PySpark functions.

In [15]:
from pyspark.sql.functions import lit, col, trim


<u>**2. Data cleaning**</u>

Data cleansing involves removing duplicates, rows with missing values in critical columns, and rows with values that are invalid in the domain.


This notebook is run from a Data Factory pipeline. A variable named time_pipeline, set on the pipeline canvas, is passed to the notebook.

In [None]:
if not time_pipeline:
    raise ValueError("Error: time_pipeline variable is empty or not provided by the pipeline.")    
time_pipeline = str(time_pipeline)

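
The check above only rejects an empty value. If the pipeline passes an ISO 8601 timestamp string (an assumption, the notebook does not state the format), the value could also be parsed so that a malformed timestamp fails early. A minimal sketch; parse_pipeline_time is a hypothetical helper, not part of the notebook:

```python
from datetime import datetime


def parse_pipeline_time(value):
    """Parse the pipeline timestamp, assuming an ISO 8601 string (hypothetical helper)."""
    if value is None or not str(value).strip():
        raise ValueError("time_pipeline is empty or was not provided by the pipeline.")
    text = str(value).strip()
    try:
        # Assumption: the pipeline sends a value such as "2024-05-01T12:30:00".
        return datetime.fromisoformat(text)
    except ValueError as exc:
        raise ValueError(f"time_pipeline is not an ISO 8601 timestamp: {text!r}") from exc


print(parse_pipeline_time("2024-05-01T12:30:00"))  # 2024-05-01 12:30:00
```

Parsing the value would also make it possible to store processed_at as a timestamp instead of a string, if that is preferred downstream.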

First, the list of tables in the bronze schema is retrieved. Each of these tables later receives a processing-time column whose value comes from the variable passed by the pipeline.

In [17]:
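try:
    # Collect the names of all tables registered in the bronze schema.
    all_tables = [t.name for t in spark.catalog.listTables("bronze")]
except Exception as e:
    # Chain the original error so the root cause stays in the traceback.
    raise Exception(f"Error accessing bronze schema: {e}") from e

if not all_tables:
    raise ValueError("Error: No tables found in the bronze schema.")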

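
The cell above only verifies that the schema is not empty. Since the rest of the notebook has specific rules for the diamonds, countries, customers and transactions tables, it may also be useful to check that those tables are actually present. A minimal sketch in plain Python; find_missing_tables and EXPECTED_TABLES are hypothetical names introduced for illustration:

```python
# Table names referenced by the processing functions later in this notebook.
EXPECTED_TABLES = {"diamonds", "countries", "customers", "transactions"}


def find_missing_tables(found_tables, expected_tables=EXPECTED_TABLES):
    """Return the expected table names absent from found_tables, sorted (hypothetical helper)."""
    found = {name.lower() for name in found_tables}
    return sorted(name for name in expected_tables if name.lower() not in found)


print(find_missing_tables(["diamonds", "customers"]))  # ['countries', 'transactions']
```

In the notebook this could be called as find_missing_tables(all_tables), raising an error or logging a warning when the result is not empty.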

Then a column with the processing time is added to each table.

In [18]:
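for table_name in all_tables:
    full_table_name = f"bronze.{table_name}"
    df_bronze = spark.read.table(full_table_name)

    # Add the pipeline run time as a constant column. Nothing is written here;
    # the final loop at the end of the notebook repeats this step and saves the result.
    df_silver = df_bronze.withColumn("processed_at", lit(time_pipeline))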


To remove duplicates, a list of primary key columns is created. It is assumed that, unlike in an OLTP database, integrity constraints are not enforced here, so duplicate keys can occur.

In [19]:
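# Order matters: a table is deduplicated on the first listed key it contains, so TransactionID precedes the foreign keys.
ids_list = ["TransactionID", "CustomerID", "DiamondID", "CountryID"]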

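
The deduplication loop later in this notebook uses the first entry of ids_list that exists in a table, so the order of the list decides which column is used for a table that holds several ID columns. The plain-Python sketch below illustrates this for the transactions table; pick_dedup_key is a hypothetical helper that mirrors the loop's selection logic, and the column list is taken from the transactions filter in this notebook:

```python
def pick_dedup_key(columns, key_priority):
    """Return the first name in key_priority that is also in columns, else None."""
    for key in key_priority:
        if key in columns:
            return key
    return None


# Columns referenced by the transactions filter in this notebook.
transactions_columns = ["TransactionID", "CustomerID", "DiamondID", "Quantity"]

# With DiamondID listed first, only one transaction per diamond would be kept.
print(pick_dedup_key(transactions_columns, ["DiamondID", "CustomerID", "TransactionID", "CountryID"]))  # DiamondID

# With TransactionID listed first, the table is deduplicated on its own key.
print(pick_dedup_key(transactions_columns, ["TransactionID", "CustomerID", "DiamondID", "CountryID"]))  # TransactionID
```

An explicit mapping from table name to key column would remove the dependence on list order, at the cost of having to list every table.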

Next, a filtering function is defined for each table.

In [20]:
def processing_diamonds(df):
    """Keep diamonds with a non-negative price, positive carat and dimensions, and known cut, color and clarity."""
    result = df.filter(
        (col("price").isNotNull()) & (col("price") >= 0) &
        (col("carat").isNotNull()) & (col("carat") > 0) &
        (col("x").isNotNull()) & (col("x") > 0) &
        (col("y").isNotNull()) & (col("y") > 0) &
        (col("z").isNotNull()) & (col("z") > 0) &
        (col("cut").isNotNull()) &
        (col("color").isNotNull()) &
        (col("clarity").isNotNull())
    )
    return result


In [21]:
def processing_countries(df):
    """Keep countries with a non-blank name and region and a non-negative GNI per capita."""
    result = df.filter(
        (col("country").isNotNull()) & (trim(col("country")) != "") &
        (col("region").isNotNull()) & (trim(col("region")) != "") &
        (col("GNI_per_capita").isNotNull()) & (col("GNI_per_capita") >= 0)
    )
    return result


In [22]:
def processing_customers(df):
    """Keep customers whose CustomerID and CountryID are present and positive."""
    result = df.filter(
        (col("CustomerID").isNotNull()) &
        (col("CountryID").isNotNull()) &
        (col("CustomerID") > 0) &
        (col("CountryID") > 0)
    )
    return result


In [23]:
def processing_transactions(df):
    """Keep transactions with all IDs present and a positive quantity."""
    result = df.filter(
        (col("TransactionID").isNotNull()) &
        (col("CustomerID").isNotNull()) &
        (col("DiamondID").isNotNull()) &
        (col("Quantity").isNotNull()) &
        (col("Quantity") > 0)
    )
    return result

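
To try a rule on a handful of records without a Spark session, it can be mirrored as a plain Python predicate. The sketch below does this for the diamonds rule only; is_valid_diamond is a hypothetical helper that is not part of the notebook, and the sample record is made up for illustration:

```python
# Columns that must be strictly positive and columns that must simply be present,
# following the diamonds filter in this notebook.
POSITIVE_COLUMNS = ("carat", "x", "y", "z")
REQUIRED_COLUMNS = ("cut", "color", "clarity")


def is_valid_diamond(row):
    """Return True if a dict-like record passes the diamonds rule (hypothetical helper)."""
    price = row.get("price")
    if price is None or price < 0:
        return False
    for name in POSITIVE_COLUMNS:
        value = row.get(name)
        if value is None or value <= 0:
            return False
    return all(row.get(name) is not None for name in REQUIRED_COLUMNS)


# Made-up sample record for illustration only.
sample = {"price": 326, "carat": 0.23, "x": 3.95, "y": 3.98, "z": 2.43,
          "cut": "Ideal", "color": "E", "clarity": "SI2"}
print(is_valid_diamond(sample))              # True
print(is_valid_diamond({**sample, "z": 0}))  # False
```

The same pattern could be repeated for the other tables if quick local checks of the rules are wanted.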

Then the transformation is performed for all tables, and the results are saved to the silver schema.

In [24]:
# Map each table to its validation function; tables without an entry are only deduplicated.
processors = {
    "diamonds": processing_diamonds,
    "countries": processing_countries,
    "customers": processing_customers,
    "transactions": processing_transactions,
}

for table_name in all_tables:
    df_bronze = spark.read.table(f"bronze.{table_name}")

    # Stamp every row with the pipeline run time.
    df_silver = df_bronze.withColumn("processed_at", lit(time_pipeline))

    # Deduplicate on the first key from ids_list that exists in this table.
    for id_col in ids_list:
        if id_col in df_silver.columns:
            df_silver = df_silver.dropDuplicates([id_col])
            break

    # Apply the table-specific validation rules, if any are defined.
    if table_name in processors:
        df_silver = processors[table_name](df_silver)

    # Overwrite the silver table with the cleansed data.
    df_silver.write.mode("overwrite").saveAsTable(f"silver.{table_name}")

