In [0]:
from pyspark.sql.types import StringType , IntegerType , DateType , FloatType , TimestampType 
import pyspark.sql.functions as F 

# Cleaning the Brands Table

In [0]:
# Load the raw brand table from the bronze layer.
df = spark.table("ecommerce.bronze.brand_base_table")

# Strip leading/trailing whitespace from brand names. The functions are reached
# through the `F` alias because `trim` and `col` are never imported as bare
# names in this notebook (calling them unqualified raises NameError).
df_space = df.withColumn("brand_name", F.trim(F.col("brand_name")))

# Remove every character of brand_code that is not an ASCII letter or digit.
# The character class intentionally contains no commas: inside [...] a comma is
# a literal member, so a pattern such as [^A-Z,a-z,0-9] would let commas
# survive the cleanup.
df = df_space.withColumn(
    "brand_code",
    F.regexp_replace(F.col("brand_code"), r"[^A-Za-z0-9]", ""),
)

# Preview the first rows of the cleaned brand frame.
df.show(10)

In [0]:
# Print the distinct category_code values of the current `df` so that
# inconsistent spellings can be spotted before they are remapped.
distinct_category_codes = df.select("category_code").distinct()
distinct_category_codes.show()

In [0]:
# category_code values to rewrite, keyed by the value found in the data and
# mapped to the value that should replace it.
category_code_fixes = dict(BOOKS="BKS", GROCERY="GRCY", TOYS="TOY")

# Apply the rewrites to the category_code column only; every other column is
# passed through unchanged.
df_silver = df.replace(category_code_fixes, subset=["category_code"])
df_silver.show(10)

# Persist the result as the silver brand table, replacing any previous contents.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.silver.brand_base_table")
)

# Cleaning Category Table

In [0]:
# Load the raw category table from the bronze layer.
bronze_categories = spark.table("ecommerce.bronze.category_base_table")

# category_code values that occur more than once in the bronze data. This is a
# lazy frame kept for ad-hoc inspection; nothing below reads it.
df_duplicates = (
    bronze_categories
    .groupBy("category_code")
    .count()
    .filter(F.col("count") > 1)
)

# Keep a single row per category_code and upper-case the category name.
df = (
    bronze_categories
    .dropDuplicates(subset=["category_code"])
    .withColumn("category_name", F.upper(F.col("category_name")))
)
df.show(10)

In [0]:
# Persist the current `df` (the category frame built in the previous cell) as
# the silver category table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.silver.category_base_table")
)


# Cleaning Product Table


In [0]:
# Column expressions for the product clean-up, named up front so the pipeline
# below reads as a list of steps.
# length_cm: swap "," for "." after casting to string.
# NOTE(review): the result stays a string column; cast it if downstream
# consumers need a numeric length.
length_with_dot = F.regexp_replace(F.col("length_cm").cast("string"), ",", ".")
# weight_grams: drop every "g" character, then cast what is left to int.
weight_as_int = F.regexp_replace(F.col("weight_grams"), "g", "").cast("int")

df = (
    spark.table("ecommerce.bronze.product_base_table")
    .withColumn("length_cm", length_with_dot)
    .withColumn("weight_grams", weight_as_int)
    # Upper-case both lookup codes.
    .withColumn("category_code", F.upper(F.col("category_code")))
    .withColumn("brand_code", F.upper(F.col("brand_code")))
    # Take the absolute value of rating_count, then keep rows where it is <= 5.
    # NOTE(review): rows with a null rating_count do not satisfy the
    # comparison and are dropped as well — confirm that is intended.
    .withColumn("rating_count", F.abs(F.col("rating_count")))
    .filter(F.col("rating_count") <= 5)
)
df.show(10)

# Persist the result as the silver product table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.silver.product_base_table")
)

# Cleaning Customer Table


In [0]:
# phone with nulls replaced by the literal placeholder "Not Available".
phone_or_placeholder = (
    F.when(F.col("phone").isNull(), "Not Available")
    .otherwise(F.col("phone"))
)

df = (
    spark.table("ecommerce.bronze.customer_base_table")
    # Rows without a customer_id are discarded.
    .where(F.col("customer_id").isNotNull())
    .withColumn("phone", phone_or_placeholder)
)

# Persist the result as the silver customer table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.silver.customer_base_table")
)

# Cleaning Date Table

In [0]:
# Per-column normalisation of the bronze date table.
parsed_dates = (
    spark.table("ecommerce.bronze.date_base_table")
    # Parse the date column using the MM-dd-yyyy pattern.
    # NOTE(review): try_to_date is used instead of to_date, presumably so that
    # unparsable values become null rather than failing — confirm.
    .withColumn("date", F.try_to_date(F.col("date"), "MM-dd-yyyy"))
    .withColumn("day_name", F.upper("day_name"))
    .withColumn("week_of_year", F.abs(F.col("week_of_year")))
)

# Parsed dates that occur more than once. This is a lazy frame kept for ad-hoc
# inspection; nothing below reads it.
duplicates = parsed_dates.groupBy("date").count().filter(F.col("count") > 1)

year_col = F.col("year")
df = (
    parsed_dates
    # Keep a single row per parsed date.
    .dropDuplicates(subset=["date"])
    # Build the labels "Q<quarter> <year>" and "Week<week_of_year> <year>".
    .withColumn(
        "quarter",
        F.concat(F.lit("Q"), F.col("quarter"), F.lit(" "), year_col),
    )
    .withColumn(
        "week_of_year",
        F.concat(F.lit("Week"), F.col("week_of_year"), F.lit(" "), year_col),
    )
)

# Persist the result as the silver date table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("ecommerce.silver.date_base_table")
)