In [0]:
# SILVER LAYER – CUSTOMERS
# Purpose:
# - Enforce schema & data contracts
# - Reject rescued/malformed rows
# - Deduplicate records
# - Publish clean Silver table
#
# Upstream: git_analysis.bronze.customers
# Downstream: git_analysis.silver.customers

In [0]:
# Importing necessary libraries

In [0]:
%run /Workspace/Users/n_avazpour@yahoo.com/DataBricks/utils/import_needed_lib_functions

In [0]:
# Define the expected Silver schema (necessary if no schema was defined at the Bronze layer)

In [0]:
# Expected shape of the Silver customers table.
# customer_id is the only field declared non-nullable.
customers_silver_schema = StructType(
    [
        StructField("customer_id", StringType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("region", StringType(), nullable=True),
        StructField("country", StringType(), nullable=True),
        StructField("signup_date", DateType(), nullable=True),
    ]
)

In [0]:
# Reading bronze table into a dataframe

In [0]:
# Load the full Bronze customers table as the input of this Silver run.
customer_bronze_df = spark.read.table("git_analysis.bronze.customers")


In [0]:
# Saving rows whose "_rescued_data" column (added by Databricks in the Bronze layer) is populated into a separate quarantine table


In [0]:
%python
# Rows with a non-null "_rescued_data" value are set aside for inspection
# instead of flowing into the Silver table.
customer_quarantine_df = customer_bronze_df.filter(F.col("_rescued_data").isNotNull())

# The Bronze table is read in full on every run, so appending would insert the
# same quarantined rows again each time this notebook executes. Overwriting
# keeps the quarantine table a snapshot of the current Bronze contents, the
# same full-refresh behaviour used when writing git_analysis.silver.customers.
(
    customer_quarantine_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("git_analysis.silver.customers_quarantine")
)


In [0]:
# Keeping only the rows whose "_rescued_data" column (added by Databricks in the Bronze layer) is null

In [0]:
# Only rows with an empty "_rescued_data" column continue through the pipeline.
customer_clean_df = customer_bronze_df.where(F.col("_rescued_data").isNull())


In [0]:
# Casting each column to its Silver type and selecting only the Silver columns

In [0]:
       
# Project the Silver columns, casting each one to its target Spark SQL type.
# Columns not listed here (such as "_rescued_data") are dropped by the select.
customer_typed_df = customer_clean_df.select(
    *(
        F.col(column_name).cast(spark_type).alias(column_name)
        for column_name, spark_type in (
            ("customer_id", "string"),
            ("name", "string"),
            ("region", "string"),
            ("country", "string"),
            ("signup_date", "date"),
        )
    )
)

In [0]:

#checking for data quality

In [0]:
%python
# Data-quality gate: a row is kept only when customer_id, name, country and
# signup_date are all present and signup_date is not in the future.
# "region" is not part of these checks.
# current_date is referenced through F like every other Spark function in this
# notebook, so the cell does not depend on a bare `current_date` name being
# exported by the helper notebook.
customer_dq_df = customer_typed_df.filter(
    F.col("customer_id").isNotNull()
    & F.col("name").isNotNull()
    & F.col("country").isNotNull()
    & F.col("signup_date").isNotNull()
    & (F.col("signup_date") <= F.current_date())
)

In [0]:
#setting alert for bad data entry for checking data quality

In [0]:
# Compare row counts before and after the DQ filter; each count() runs a Spark job.
rows_before_dq = customer_typed_df.count()
rows_after_dq = customer_dq_df.count()
bad_count = rows_before_dq - rows_after_dq

# Only report when at least one row was rejected.
if bad_count > 0:
    print(f"⚠️ Dropped {bad_count} rows due to DQ rules")


In [0]:
#checking for duplicate customer_id values and dropping them


In [0]:
# Rank the rows of each customer_id so that exactly one survives.
# The most recent signup_date wins. The remaining columns act as tie-breakers:
# without them, row_number() picks an arbitrary row when a customer has several
# rows with the same signup_date, and the Silver table could differ between
# runs over identical input. Rows that still tie are identical in every Silver
# column, so the choice between them does not matter.
window = Window.partitionBy("customer_id").orderBy(
    F.col("signup_date").desc(),
    F.col("name").asc(),
    F.col("country").asc(),
    # region is nullable; prefer a row with a populated region over a null one.
    F.col("region").asc_nulls_last(),
)

customer_dedup_df = (
    customer_dq_df
    .withColumn("rn", F.row_number().over(window))
    .filter(F.col("rn") == 1)  # keep the top-ranked row per customer_id
    .drop("rn")                # helper column is not part of the Silver schema
)


In [0]:
#checking schema of result silver table to avoid mismatch

In [0]:
#assert customer_dedup_df.schema == customers_silver_schema, "Schema mismatch in Silver!"
#assert customer_dedup_df.schema.simpleString() == customers_silver_schema.simpleString(), \
 #   f"Schema mismatch in Silver! Expected {customers_silver_schema}, got {customer_dedup_df.schema}"


In [0]:
#write df to silver table

In [0]:
# Publish the deduplicated result as the Silver customers Delta table.
# The table is fully replaced on each run, including its schema.
customer_dedup_df.write.saveAsTable(
    "git_analysis.silver.customers",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)


In [0]:
# Pipeline metrics write (currently disabled).
# Every line of the two calls must stay commented together: commenting only the
# opening and closing lines leaves indented argument lines behind, which makes
# the cell fail with "IndentationError: unexpected indent".
#metrics_df = spark.createDataFrame(
#    [(customer_dedup_df.count(), customer_quarantine_df.count())],
#    ["silver_row_count", "quarantined_row_count"]
#)

#metrics_df.write.mode("append").saveAsTable(
#    "git_analysis.monitoring.customer_pipeline_metrics"
#)
