## Bronze to Silver : Data Cleaning and Transformations for Dimension Tables

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType

# Catalog name used as the first part of every `catalog.schema.table` reference in this notebook
catalog_name = 'my_databricks_workspace'

In [0]:
# Load the bronze brands table and preview its first 10 rows
df_bronze = spark.table(
    f"{catalog_name}.ecommerce_bronze.brz_brands"
)
df_bronze.show(n=10)

In [0]:
# Strip leading and trailing whitespace from brand_name
df_silver = df_bronze.withColumn(
    "brand_name",
    F.trim("brand_name"),
)
df_silver.show(n=10)

In [0]:
# Keep only ASCII letters and digits in brand_code; every other character is removed
df_silver = df_silver.withColumn(
    "brand_code",
    F.regexp_replace("brand_code", '[^a-zA-Z0-9]', ''),
)
df_silver.show(n=10)

In [0]:
# Inspect the distinct category_code values to spot several codes used for the same category
df_silver.select("category_code").distinct().show()

In [0]:
# Mapping of anomalous category codes (keys) to the codes that replace them (values)
anomalies_dict = dict(
    GROCERY="GRCY",
    BOOKS="BKS",
    TOYS="TOY",
)

# Apply the mapping to the category_code column only, then preview the result
df_silver = df_silver.na.replace(anomalies_dict, subset=["category_code"])
df_silver.show(n=10)

In [0]:
# Re-list the distinct category_code values to verify the replacement step
df_silver.select("category_code").distinct().show()

- What you have done above is "Transformations" on the Bronze table.
- Deciding which transformations to apply comes from experience and from talking to the domain expert.

In [0]:
# Persist the transformed brands data as a Delta table
# (schema: ecommerce_silver, table: slv_brands), overwriting any existing contents
df_silver.write.saveAsTable(
    f"{catalog_name}.ecommerce_silver.slv_brands",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

## Category Table Transformations

In [0]:
# Load the bronze category table and preview its first 10 rows
df_bronze = spark.table(
    f"{catalog_name}.ecommerce_bronze.brz_category"
)
df_bronze.show(n=10)

In [0]:
# Count rows per category_code and keep only the codes that occur more than once
df_duplicates = (
    df_bronze
    .groupBy("category_code")
    .count()
    .filter("count > 1")
)
display(df_duplicates)

In [0]:
# Duplicated codes were found above: keep a single row per category_code
df_silver = (
    df_bronze
    .dropDuplicates(["category_code"])
)
display(df_silver)

In [0]:
# Convert category_code to upper case.
# Codes that differed only by letter case become identical after this step,
# so de-duplicate again on the normalised value to keep one row per category_code.
df_silver = (
    df_silver
    .withColumn('category_code', F.upper(F.col('category_code')))
    .dropDuplicates(['category_code'])
)
df_silver.show(10)

In [0]:
# Persist the transformed category data as a Delta table
# (schema: ecommerce_silver, table: slv_category), overwriting any existing contents
df_silver.write.saveAsTable(
    f"{catalog_name}.ecommerce_silver.slv_category",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

## Product Table Transformations

In [0]:
# Load the raw products data from the bronze table (ecommerce_bronze.brz_products)
df_bronze = spark.table(
    f"{catalog_name}.ecommerce_bronze.brz_products"
)

# Record the table dimensions: number of rows and number of columns
row_count = df_bronze.count()
column_count = len(df_bronze.columns)

# Report both figures, one per line
print(
    f"Row count: {row_count}",
    f"Column count: {column_count}",
    sep="\n",
)

In [0]:
# Preview five rows of the bronze products table
display(df_bronze.limit(5))

In [0]:
# Inspect weight_grams: check whether the values contain the letter "g",
# which must be removed before the column can be cast to a numeric type
df_bronze.select("weight_grams").show(5, truncate=False)

In [0]:
# Remove every "g" from weight_grams and cast the remainder to an integer
df_silver = df_bronze.withColumn(
    "weight_grams",
    F.regexp_replace("weight_grams", "g", "").cast("int"),
)
df_silver.select("weight_grams").show(n=5, truncate=False)

In [0]:
# Inspect length_cm: initial inspection showed a comma used instead of "." as the decimal separator

df_silver.select("length_cm").show(3)

In [0]:
# Swap the comma for a decimal point in length_cm, then cast the column to float
df_silver = df_silver.withColumn(
    "length_cm",
    F.regexp_replace("length_cm", ",", ".").cast("float"),
)
df_silver.select("length_cm").show(n=3)

In [0]:
# Inspect category_code and brand_code: they are in lower case and need converting to upper case

df_silver.select("category_code", "brand_code").show(2)

In [0]:
# Upper-case both code columns, one after the other
for code_column in ("category_code", "brand_code"):
    df_silver = df_silver.withColumn(code_column, F.upper(F.col(code_column)))

df_silver.select("category_code", "brand_code").show(n=2)

In [0]:
# Inspect the distinct material values for spelling mistakes

df_silver.select("material").distinct().show()

In [0]:
# Correct the known misspellings in material; every other value is kept unchanged
material = F.col("material")
material_corrected = (
    F.when(material == "Coton", "Cotton")
    .when(material == "Alumium", "Aluminum")
    .when(material == "Ruber", "Rubber")
    .otherwise(material)
)
df_silver = df_silver.withColumn("material", material_corrected)
df_silver.select("material").distinct().show()

In [0]:
# Show a few of the negative values present in rating_count
negative_ratings = df_silver.filter(F.col("rating_count") < 0)
negative_ratings.select("rating_count").show(n=3)

In [0]:
# Per the domain expert, negative rating_count values are turned into their absolute value.
# Null values are replaced with 0.
rating_count = F.col("rating_count")
df_silver = df_silver.withColumn(
    "rating_count",
    F.when(rating_count.isNull(), F.lit(0)).otherwise(F.abs(rating_count)),
)



In [0]:
# Preview the columns that were cleaned above
cleaned_columns = [
    "weight_grams",
    "length_cm",
    "category_code",
    "brand_code",
    "material",
    "rating_count",
]
df_silver.select(*cleaned_columns).show(n=10, truncate=False)

In [0]:
# Persist the transformed products data as a Delta table
# (schema: ecommerce_silver, table: slv_products), overwriting any existing contents
df_silver.write.saveAsTable(
    f"{catalog_name}.ecommerce_silver.slv_products",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

## Customers table transformation

In [0]:
# Load the raw customers data from the bronze table (ecommerce_bronze.brz_customers)
df_bronze = spark.read.table(
    f"{catalog_name}.ecommerce_bronze.brz_customers"
)

# Record the table dimensions: number of rows and number of columns
row_count = df_bronze.count()
column_count = len(df_bronze.columns)

# Report both figures, one per line
print(
    f"Row count: {row_count}",
    f"Column count: {column_count}",
    sep="\n",
)

df_bronze.show(n=10)

In [0]:
# Count the rows whose customer_id is NULL
rows_without_id = df_bronze.filter("customer_id IS NULL")
null_count = rows_without_id.count()
null_count

In [0]:
# Display a few of the rows whose customer_id is null
# (a previous run reported 300 such rows; the count comes from the cell above)
df_bronze.filter(F.col("customer_id").isNull()).show(3)

In [0]:
# Discard every row that has no customer_id
df_silver = df_bronze.na.drop(subset=["customer_id"])

# Report how many rows remain
row_count = df_silver.count()
print(f"Row count after droping null values: {row_count}")

In [0]:
# Count the rows whose phone is NULL
rows_without_phone = df_silver.filter("phone IS NULL")
null_count = rows_without_phone.count()
print(f"Number of nulls in phone: {null_count}")

In [0]:
# Preview a few rows where phone is null
df_silver.filter(F.col("phone").isNull()).show(3)

In [0]:
# Replace null phone values with the placeholder 'Not Available'
# NOTE(review): a string fill value only applies to string columns - confirm phone is a string
df_silver = df_silver.na.fill("Not Available", subset=["phone"])

# Sanity check: this should list no rows if every null was filled
remaining_nulls = df_silver.filter(F.col("phone").isNull())
remaining_nulls.show()

In [0]:
# Persist the cleaned customers data as a Delta table
# (schema: ecommerce_silver, table: slv_customers), overwriting any existing contents
df_silver.write.saveAsTable(
    f"{catalog_name}.ecommerce_silver.slv_customers",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

## Calendar / Date

In [0]:
# Load the raw calendar data from the bronze table (ecommerce_bronze.brz_calendar)
df_bronze = spark.read.table(
    f"{catalog_name}.ecommerce_bronze.brz_calendar"
)

# Record the table dimensions: number of rows and number of columns
row_count = df_bronze.count()
column_count = len(df_bronze.columns)

# Report both figures, one per line
print(
    f"Row count: {row_count}",
    f"Column count: {column_count}",
    sep="\n",
)

df_bronze.show(n=3)

In [0]:
# Print the column names and data types of the bronze calendar table
df_bronze.printSchema()

In [0]:
# Convert the string `date` column to a date type, parsing it with the dd-MM-yyyy pattern.
# F.to_date comes from the pyspark.sql.functions alias imported in the first cell,
# so no additional import is needed in this cell.
df_silver = df_bronze.withColumn("date", F.to_date(F.col("date"), "dd-MM-yyyy"))

In [0]:
# printSchema() writes the schema to stdout itself and returns None,
# so it is called directly; wrapping it in print() would emit an extra "None" line.
df_silver.printSchema()

df_silver.show(5)

In [0]:
# Detect duplicates

# Count rows per date and keep only the dates that occur more than once
duplicates = (
    df_silver
    .groupBy("date")
    .count()
    .filter(F.col("count") > 1)
)

# Report how many dates are duplicated, then list them
print("Total duplicated Rows: ", duplicates.count())
display(duplicates)

In [0]:
# Keep a single row per date
df_silver = (
    df_silver
    .dropDuplicates(["date"])
)

# Report how many rows remain
row_count = df_silver.count()
print("Rows After removing Duplicates: ", row_count)

In [0]:
# Title-case day_name: first letter of each word upper case, the rest lower case
df_silver = df_silver.withColumn(
    "day_name",
    F.initcap("day_name"),
)

df_silver.show(n=5)

In [0]:
# Replace week_of_year with its absolute value so negative week numbers become positive
df_silver = df_silver.withColumn(
    "week_of_year",
    F.abs("week_of_year"),
)

df_silver.show(n=3)

In [0]:
# Turn quarter and week_of_year into labels that include the year:
#   quarter      -> "Q<quarter>-<year>"
#   week_of_year -> "Week<week_of_year>-<year>"
# F.concat is used directly: wrapping its single result in F.concat_ws added no
# separator and only turned a NULL label into an empty string. With F.concat alone,
# a NULL quarter, week_of_year or year yields a NULL label.
# NOTE: this cell rewrites both columns from their own values, so running it twice
# prefixes and suffixes the labels a second time.
df_silver = df_silver.withColumn(
    "quarter",
    F.concat(F.lit("Q"), F.col("quarter"), F.lit("-"), F.col("year")),
)

df_silver = df_silver.withColumn(
    "week_of_year",
    F.concat(F.lit("Week"), F.col("week_of_year"), F.lit("-"), F.col("year")),
)

df_silver.show(3)

In [0]:
# Rename column week_of_year to week; all other columns are left unchanged
df_silver = df_silver.withColumnRenamed("week_of_year", "week")

In [0]:
# Persist the transformed calendar data as a Delta table
# (schema: ecommerce_silver, table: slv_calendar), overwriting any existing contents
df_silver.write.saveAsTable(
    f"{catalog_name}.ecommerce_silver.slv_calendar",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)