In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

In [0]:
# Catalog name interpolated into the three-part table identifiers used below.
catalog_name = "ecommerce"

# Brand Table

In [0]:
# Load the bronze-layer brand table; the brand cleaning cells start from this frame.
df_bronze = spark.read.table(f"{catalog_name}.bronze.bronze_brand")

In [0]:
# Preview three rows of the bronze brand table before transforming it.
df_bronze.limit(3).display()

Trim leading and trailing spaces from the **brand_name** column

In [0]:
from pyspark.sql.functions import col, trim

# Overwrite brand_name with a copy that has its leading and trailing spaces stripped.
df_clean = df_bronze.withColumn("brand_name", trim(col("brand_name")))

display(df_clean)

Brand name should not have special characters, so remove them

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Remove every character matched by the negated class below (i.e. anything outside
# digits, ASCII letters and, presumably, the literal hyphen — confirm how the regex
# engine reads the stray "-" between the ranges).
# NOTE(review): the notebook text for this step talks about the brand name, but the
# column rewritten here is brand_id — confirm which column was intended.
df_clean = df_clean.withColumn("brand_id", regexp_replace(col("brand_id"), r'[^0-9-A-Z-a-z]', ''))

In [0]:
# Print a plain-text preview of the cleaned brand frame (show() defaults to 20 rows).
df_clean.show()


In [0]:
# List the distinct category_code values so inconsistent codes can be spotted.
df_clean.select("category_code").distinct().display()


In [0]:
# Print four rows of the cleaned brand frame as plain text.
df_clean.show(4)


In [0]:
# Long-form category codes mapped to the three-letter code that should replace them.
anomolies = dict(
    BOOKS="BKS",
    GROCERY="GRC",
    TOYS="TOY",
)

In [0]:

# Apply the anomolies mapping to category_code only; other columns are left untouched.
df_clean = df_clean.replace(anomolies, subset=["category_code"])

In [0]:
# Re-list the distinct category_code values to check the result of the replacement.
df_clean.select("category_code").distinct().display()


In [0]:
# DataFrameWriter.saveAsTable returns None, so assigning the write chain to df_silver
# left that name bound to None. Bind df_silver to the frame that is persisted instead.
df_silver = df_clean

# Overwrite the silver brand Delta table with the cleaned frame.
(
    df_silver.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .format("delta")
    .saveAsTable(f"{catalog_name}.silver.silver_brand")
)

# Category

In [0]:
# Load the bronze-layer category table.
df_bronze_cat = spark.table(f"{catalog_name}.bronze.bronze_category")

In [0]:
# Render the bronze category table for inspection.
df_bronze_cat.display()

Drop duplicate categories

In [0]:

# Keep a single row per category_id; which of the duplicate rows survives is not
# controlled here.
df_silver_cat = df_bronze_cat.dropDuplicates(["category_id"])
display(df_silver_cat)

In [0]:
# Persist the de-duplicated categories, overwriting the silver category Delta table.
(
    df_silver_cat.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.silver_category")
)

# Customers

In [0]:
# Glob pattern for the raw customer CSV files.
# NOTE(review): no cell visible in this notebook reads this variable — confirm it is still needed.
raw_path_customer = "/Volumes/ecommerce/raw/source_date/customers/*.csv"

In [0]:
# Load the bronze-layer customers table and render it for inspection.
df_bronze_customer = spark.table(f"{catalog_name}.bronze.bronze_customers")
df_bronze_customer.display()

In [0]:
# Total number of rows in the bronze customers table.
df_bronze_customer.count()


In [0]:
# Compare the total row count with the number of distinct customer_id values.
# Only the last expression of a cell is rendered, so a bare count() on the first line
# ran a Spark job whose result was thrown away; capture both numbers and print them.
total_rows = df_bronze_customer.count()
distinct_customer_ids = df_bronze_customer.select("customer_id").distinct().count()
print(f"rows={total_rows}, distinct customer_id={distinct_customer_ids}")

In [0]:
# Count the rows whose customer_id is NULL and print the number.
null = df_bronze_customer.filter(col("customer_id").isNull()).count()
print(null)

In [0]:
# Print up to five rows whose customer_id is NULL.
# show() prints and returns None, so it is called for its output only; assigning its
# result would overwrite `null` (the count computed in the previous cell) with None.
df_bronze_customer.filter(col("customer_id").isNull()).show(5)


In [0]:
# Drop the rows that have no customer_id.
df_silver_customer = df_bronze_customer.dropna( subset=["customer_id"])

In [0]:
# Row count after removing rows with a NULL customer_id.
df_silver_customer.count()

In [0]:
# Count the remaining rows whose phone is NULL and print the number.
null1 = df_silver_customer.filter(col("phone").isNull()).count()
print(null1)

In [0]:
# Print up to three rows whose phone is NULL.
# show() prints and returns None, so it is called for its output only; assigning its
# result would overwrite `null1` (the count computed in the previous cell) with None.
df_silver_customer.filter(col("phone").isNull()).show(3)

In [0]:
# Replace NULL phone values with the placeholder string "NotApplicable".
# NOTE(review): fillna with a string value only affects string-typed columns — confirm phone is a string.
df_silver_customer = df_silver_customer.fillna("NotApplicable",subset=['phone'])

In [0]:
# Check for rows that still have a NULL phone; an empty result means the fill worked.
df_silver_customer.filter(col("phone").isNull()).show(3)


In [0]:
# Persist the cleaned customers, overwriting the silver customer Delta table.
# The catalog comes from catalog_name, like the other silver writes in this notebook,
# instead of being hard-coded inside an f-string that had no placeholder.
(
    df_silver_customer.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.silver_customer")
)

# Date

In [0]:
# Load the bronze-layer date table under the name the following cells actually read:
# they call df_bronze_date.printSchema() and df_bronze_date.withColumn(...), which
# raised NameError on a fresh run because the frame was only bound to df_bronze.
df_bronze_date = spark.table(f"{catalog_name}.bronze.bronze_date")
# Keep the old name bound as well, since the next cell still displays via df_bronze.
df_bronze = df_bronze_date

In [0]:
# Render the frame currently bound to df_bronze (the date table loaded in the previous cell).
df_bronze.display()

In [0]:
# Print column names and types before parsing the date column.
df_bronze_date.printSchema()

In [0]:
from pyspark.sql.functions import col, expr

# Overwrite `date` with the result of parsing it using the dd-MM-yyyy pattern.
# try_to_date is the SQL variant that is expected to yield NULL for values it
# cannot parse instead of failing the query — confirm on the target runtime.
df_silver_date = df_bronze_date.withColumn("date", expr("try_to_date(date, 'dd-MM-yyyy')"))
display(df_silver_date)

In [0]:
# List the date values that occur on more than one row.
dup = df_silver_date.groupBy("date").count().filter(col("count") > 1)
dup.show()


In [0]:
# Keep a single row per date value; which of the duplicate rows survives is not
# controlled here.
df_silver_date = df_silver_date.dropDuplicates(["date"])


In [0]:
# Re-run the duplicate check; an empty result means every date is now unique.
dup = df_silver_date.groupBy("date").count().filter(col("count") > 1)
dup.show()

In [0]:
# Note: importing abs here rebinds the built-in abs() in the notebook namespace to
# the Spark column function.
from pyspark.sql.functions import col, initcap, abs

# Capitalise the first letter of each word in day_name and replace week_of_year
# with its absolute value.
df_silver_date = (
    df_silver_date
    .withColumn("day_name", initcap(col("day_name")))
    .withColumn("week_of_year", abs(col("week_of_year")))
)
display(df_silver_date)

In [0]:
# Persist the cleaned date dimension, overwriting the silver date Delta table.
(
    df_silver_date.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.silver_date")
)

# Product

In [0]:

# Re-assigns catalog_name to the same value set near the top of the notebook, then
# loads the bronze-layer products table and renders it for inspection.
catalog_name = 'ecommerce'  # replace with your actual catalog name
df_bronze_product = spark.table(f"{catalog_name}.bronze.bronze_products")

df_bronze_product.display()


In [0]:
# Note: importing round here rebinds the built-in round() in the notebook namespace
# to the Spark column function.
from pyspark.sql.functions import col, regexp_replace, round

df_silver_product = (
    df_bronze_product
    # Delete every "g" character from weight_grams, then cast what is left to double.
    .withColumn("weight_grams", regexp_replace(col("weight_grams"), "g", "").cast("double"))
    # Swap commas for dots in length_cm, cast to double and round to two decimals.
    .withColumn("length_cm", round(regexp_replace(col("length_cm"), ",", ".").cast("double"), 2))
)

display(df_silver_product)

In [0]:
from pyspark.sql.functions import col, when

# Rewrite three specific material spellings; every other value passes through unchanged.
df_silver_product = df_silver_product.withColumn(
    "material",
    when(col("material") == "Coton", "Cotton")
    .when(col("material") == "Aluminum", "Aluminium")
    .when(col("material") == "Ruber", "Rubber")
    .otherwise(col("material")),
)

In [0]:
# Note: importing abs here rebinds the built-in abs() in the notebook namespace to
# the Spark column function.
from pyspark.sql.functions import abs, col

# Replace rating_count with its absolute value so negative counts become positive.
df_silver_product = df_silver_product.withColumn("rating_count", abs(col("rating_count")))

display(df_silver_product)

In [0]:
# Persist the cleaned products, overwriting the silver products Delta table.
# The stray "vbcb" that followed the closing parenthesis (a SyntaxError) is removed.
(
    df_silver_product.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.silver_products")
)