# Silver layer

In [0]:
# Root folder of the bronze-layer Delta tables that this notebook reads.
bronze_base = "/Volumes/practise/ecommerce/bronze"
# Root folder under which the cleaned (silver) Delta tables are written.
silver_base = "/Volumes/practise/ecommerce/silver"

In [0]:
from pyspark.sql.functions import to_date, col, length

# -------------------------
# Clean orders table
# -------------------------
# order_id,customer_id,payment,order_date,delivery_date
# Silver orders: bronze "order" rows with typed dates/payment, keeping only
# rows where every column passes its validity check.
orders_clean = (
    spark.read.format("delta")
    .load(f"{bronze_base}/order")
    # Convert the date columns with the pattern "M/d/yyyy HH:mm:ss"
    # (assumes bronze stores them as strings in that layout — TODO confirm).
    .withColumn("delivery_date", to_date(col("delivery_date"), "M/d/yyyy HH:mm:ss"))
    .withColumn("order_date", to_date(col("order_date"), "M/d/yyyy HH:mm:ss"))
    .withColumn("payment", col("payment").cast("double"))
    # A row survives only if all of the following hold.
    .where(
        # keys must be present and non-empty
        col("order_id").isNotNull()
        & (length(col("order_id")) > 0)
        & col("customer_id").isNotNull()
        & (length(col("customer_id")) > 0)
        # payment must be a non-negative number
        & col("payment").isNotNull()
        & (col("payment") >= 0)
        # both dates must be non-null after conversion
        & col("order_date").isNotNull()
        & col("delivery_date").isNotNull()
    )
)

# -------------------------
# Clean customers table
# -------------------------
# customer_id,customer_name,gender,age,home_address,zip_code,city,state,country
# Silver customers: bronze "customer" rows where every column is populated
# and age lies in the inclusive range 0..120.
# NOTE(review): age is compared numerically without an explicit cast here,
# unlike the numeric columns of the other tables — confirm its bronze dtype.
customers_clean = (
    spark.read.format("delta")
    .load(f"{bronze_base}/customer")
    .where(
        # identifier and name must be present and non-empty
        col("customer_id").isNotNull()
        & (length(col("customer_id")) > 0)
        & col("customer_name").isNotNull()
        & (length(col("customer_name")) > 0)
        & col("gender").isNotNull()
        # between() is inclusive on both ends: 0 <= age <= 120
        & col("age").isNotNull()
        & col("age").between(0, 120)
        # address fields only need to be non-null
        & col("home_address").isNotNull()
        & col("zip_code").isNotNull()
        & col("city").isNotNull()
        & col("state").isNotNull()
        & col("country").isNotNull()
    )
)

# -------------------------
# Clean products table
# -------------------------
# product_ID,product_type,product_name,size,colour,price,quantity,description
# Silver products: bronze "product" rows with numeric price/quantity, keeping
# only rows whose validated columns are populated and non-negative.
# Note: "colour" is not validated by this step.
products_clean = (
    spark.read.format("delta").load(f"{bronze_base}/product")
        # Cast first so the >= 0 checks below compare numeric values.
        .withColumn("price", col("price").cast("double"))
        .withColumn("quantity", col("quantity").cast("int"))
        .filter((col("product_ID").isNotNull()) & (length(col("product_ID")) > 0))
        .filter((col("product_name").isNotNull()) & (length(col("product_name")) > 0))
        .filter((col("product_type").isNotNull()) & (length(col("product_type")) > 0))
        # Fixed: the emptiness check must test "size" itself; it previously
        # re-tested product_ID, which let empty-string sizes through.
        .filter((col("size").isNotNull()) & (length(col("size")) > 0))
        .filter((col("price").isNotNull()) & (col("price") >= 0))
        .filter((col("quantity").isNotNull()) & (col("quantity") >= 0))
        .filter((col("description").isNotNull()) & (length(col("description")) > 0))
)

# -------------------------
# Clean sales table
# -------------------------
# sales_id,order_id,product_id,price_per_unit,quantity,total_price
# Silver sales: bronze "sale" rows with numeric quantity/prices, keeping only
# rows with non-empty identifiers, a strictly positive quantity and
# non-negative prices.
sales_clean = (
    spark.read.format("delta")
    .load(f"{bronze_base}/sale")
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("price_per_unit", col("price_per_unit").cast("double"))
    .withColumn("total_price", col("total_price").cast("double"))
    # Identifier checks: present and non-empty.
    .where(
        col("sales_id").isNotNull()
        & (length(col("sales_id")) > 0)
        & col("order_id").isNotNull()
        & (length(col("order_id")) > 0)
        & col("product_id").isNotNull()
        & (length(col("product_id")) > 0)
    )
    # Numeric checks: quantity must be > 0 (not >= 0); prices may be zero.
    .where(
        col("quantity").isNotNull()
        & (col("quantity") > 0)
        & col("price_per_unit").isNotNull()
        & (col("price_per_unit") >= 0)
        & col("total_price").isNotNull()
        & (col("total_price") >= 0)
    )
)

In [0]:
# Show the silver output root, then the schema of each cleaned table
# (customers, orders, products, sales — in that order).
print(silver_base)
for cleaned_df in (customers_clean, orders_clean, products_clean, sales_clean):
    cleaned_df.printSchema()

In [0]:
# Persist each cleaned table as a Delta table under the silver root.
# mode("overwrite") replaces whatever is already stored at the target path.
silver_targets = [
    (customers_clean, f"{silver_base}/customers"),
    (orders_clean, f"{silver_base}/orders"),
    (products_clean, f"{silver_base}/products"),
    (sales_clean, f"{silver_base}/sales"),
]
for cleaned_df, target_path in silver_targets:
    cleaned_df.write.format("delta").mode("overwrite").save(target_path)