**Notebook Set**

In [0]:
from pyspark.sql import functions as F

Define the Bronze and Silver paths and load the Bronze data into DataFrames

In [0]:
# Bronze layer: Delta locations the retail and customer source data is read from.
retail_bronze_path, customer_bronze_path = (
    "/Volumes/project4cat/project4db/p4bronze/retail_bronze",
    "/Volumes/project4cat/project4db/p4bronze/customer_bronze",
)

# Silver layer: Delta locations the cleaned retail and customer data is written to.
retail_silver_path, customer_silver_path = (
    "/Volumes/project4cat/project4db/p4silver/retail_silver",
    "/Volumes/project4cat/project4db/p4silver/customer_silver",
)


In [0]:
# Read the two Bronze inputs as Delta tables; the format is passed straight to
# load() instead of through a separate format() call.
df_retail_bronze = spark.read.load(retail_bronze_path, format="delta")
df_customer_bronze = spark.read.load(customer_bronze_path, format="delta")

# display(df_retail_bronze)
# display(df_customer_bronze)

In [0]:
# Working aliases: the cleaning cells below reassign df_retail / df_customer,
# while the *_bronze names keep referring to the data as it was loaded.
df_retail, df_customer = df_retail_bronze, df_customer_bronze

2.2 Standardize Gender

In [0]:
# Collapse free-form gender values into "Male" / "Female" / "Unknown".
# The value is trimmed as well as lower-cased before matching, so padded
# entries such as " M" or "female " are recognised instead of falling through
# to "Unknown". Nulls and unrecognised values map to "Unknown".
gender_normalized = F.lower(F.trim(F.col("gender")))

df_customer = df_customer.withColumn(
    "gender",
    F.when(gender_normalized.isin("m", "male"), "Male")
    .when(gender_normalized.isin("f", "female"), "Female")
    .otherwise("Unknown"),
)
# display(df_customer)

2.3 Clean City Column

In [0]:
# Normalise City in a single pass: trim surrounding whitespace, substitute a
# placeholder for missing values, then upper-case.
# Empty / whitespace-only cities are treated the same as nulls; checking only
# for null would let "" through as an empty city name.
# Upper-casing is applied last, so the placeholder is stored as "UNKNOWN".
city_trimmed = F.trim(F.col("City"))
city_is_missing = city_trimmed.isNull() | (city_trimmed == "")

df_customer = df_customer.withColumn(
    "City",
    F.upper(F.when(city_is_missing, "Unknown").otherwise(city_trimmed)),
)

# display(df_customer)

2.4 Clean Age Column

In [0]:
# Clean Age: convert it to a double and null out implausible values.
# - Whitespace is trimmed BEFORE the numeric conversion; trimming an
#   already-cast double only round-trips the number through a string.
# - try_cast is used so that non-numeric text becomes NULL rather than raising
#   when ANSI mode is enabled (with ANSI off it behaves like a plain cast).
# - Only ages within [MIN_VALID_AGE, MAX_VALID_AGE] are kept. when() without
#   otherwise() yields NULL, which covers out-of-range and NULL ages alike.
MIN_VALID_AGE = 0
MAX_VALID_AGE = 100

age_as_double = F.expr("try_cast(trim(Age) AS DOUBLE)")

df_customer = df_customer.withColumn(
    "Age",
    F.when(age_as_double.between(MIN_VALID_AGE, MAX_VALID_AGE), age_as_double),
)
# display(df_customer)

In [0]:

# Render the customer DataFrame as it stands at this point in the notebook.
display(df_customer)

2.5 Normalize Loyalty Tier

In [0]:

# display(df_customer)

In [0]:
# Normalise Loyalty_tier in a single pass: trim surrounding whitespace,
# substitute a placeholder for missing values, then upper-case.
# Empty / whitespace-only tiers are treated the same as nulls; checking only
# for null would let "" through as an empty tier.
# Upper-casing is applied last, so the placeholder is stored as "UNKNOWN".
tier_trimmed = F.trim(F.col("Loyalty_tier"))
tier_is_missing = tier_trimmed.isNull() | (tier_trimmed == "")

df_customer = df_customer.withColumn(
    "Loyalty_tier",
    F.upper(F.when(tier_is_missing, "Unknown").otherwise(tier_trimmed)),
)
# display(df_customer)

2.6 Standardize Signup Date

In [0]:
# Patterns tried, in priority order, when parsing signup_date.
# coalesce() keeps the first non-null parse, so list order matters: a value
# like "03/04/2024" is taken by "MM/dd/yyyy" before "dd/MM/yyyy" is reached.
# NOTE(review): confirm month-first is the intended reading for ambiguous dates.
date_formats = [
    "yyyy-MM-dd",
    "MM/dd/yyyy",
    "dd-MM-yyyy",
    "yyyy/MM/dd",
    "dd/MM/yyyy",
    "dd-MMM-yyyy",
]

# One parse attempt per pattern; the first that succeeds becomes the value.
signup_date_attempts = [
    F.expr(f"try_to_date(signup_date, '{fmt}')") for fmt in date_formats
]

df_customer = df_customer.withColumn("signup_date", F.coalesce(*signup_date_attempts))
# display(df_customer)

2.7 Remove Duplicates

In [0]:
# Keep one row per customer_id.
# NOTE(review): this call does not say which of several duplicate rows is
# kept; add an explicit ordering first if a specific record must win.
customer_key_columns = ["customer_id"]
df_customer = df_customer.dropDuplicates(customer_key_columns)

# display(df_customer)

2.8 Write Silver Output

In [0]:
# Persist the cleaned customer data to the Silver path as Delta, in
# "overwrite" mode.
customer_silver_writer = df_customer.write.format("delta").mode("overwrite")
customer_silver_writer.save(customer_silver_path)

2.9 Describe version history

In [0]:
# Print the Delta history of the customer Silver table without truncating
# column values.
customer_history_query = f"DESCRIBE HISTORY delta.`{customer_silver_path}`"
spark.sql(customer_history_query).show(truncate=False)

# **Retail Data Cleaning**

In [0]:
# Render the retail DataFrame before any of the retail cleaning cells run.
display(df_retail)

1. Clean Order Date

In [0]:
# Patterns tried, in priority order, when parsing order_date; coalesce() keeps
# the first attempt that yields a non-null date.
order_date_formats = ["yyyy-MM-dd", "dd-MMM-yyyy", "dd/MM/yyyy"]

# One parse attempt per pattern.
order_date_attempts = [
    F.expr(f"try_to_date(order_date, '{fmt}')") for fmt in order_date_formats
]

df_retail = df_retail.withColumn("order_date", F.coalesce(*order_date_attempts))

# display(df_retail)

3. Clean price (remove $ and cast)

In [0]:
# Remove every "$" from price, then convert what is left to a double.
# NOTE(review): other decorations (e.g. thousands separators) are not removed
# here — confirm the Bronze data never contains them.
price_without_symbol = F.regexp_replace("price", "[$]", "")
df_retail = df_retail.withColumn("price", price_without_symbol.cast("double"))
# display(df_retail)

4. Clean Quantity

In [0]:
# Replace missing or negative quantities with 0; all other values pass
# through unchanged.
quantity_col = F.col("quantity")
quantity_is_unusable = (quantity_col < 0) | quantity_col.isNull()

df_retail = df_retail.withColumn(
    "quantity",
    F.when(quantity_is_unusable, F.lit(0)).otherwise(quantity_col),
)
# display(df_retail)

5. Clean Payment Type

In [0]:
# Canonical form for payment_type: surrounding whitespace removed, upper-cased.
payment_type_clean = F.upper(F.trim(F.col("payment_type")))
df_retail = df_retail.withColumn("payment_type", payment_type_clean)
# display(df_retail)

6. Clean order_status

In [0]:
# Canonical form for order_status: surrounding whitespace removed, upper-cased.
order_status_clean = F.upper(F.trim(F.col("order_status")))
df_retail = df_retail.withColumn("order_status", order_status_clean)
# display(df_retail)

7. Clean Returned Field

In [0]:
# Turn the free-text returned flag into a boolean: True only when the trimmed,
# lower-cased value is exactly "yes"; every other value, including null,
# becomes False.
returned_is_yes = F.lower(F.trim(F.col("returned"))) == 'yes'

df_retail = df_retail.withColumn(
    "returned",
    F.when(returned_is_yes, True).otherwise(False),
)

# display(df_retail)

8. Compute Total Amount (useful for Gold Layer)

In [0]:
# Order value: cleaned unit price multiplied by cleaned quantity.
order_total = F.col("price") * F.col("quantity")
df_retail = df_retail.withColumn("TotalAmount", order_total)
# display(df_retail)

9. Remove Duplicates

In [0]:
# Keep one row per order_id, then show the result.
# NOTE(review): this call does not say which of several duplicate rows is
# kept; add an explicit ordering first if a specific record must win.
order_key_columns = ["order_id"]
df_retail = df_retail.dropDuplicates(order_key_columns)
display(df_retail)

10. Write Silver Output

In [0]:
# Persist the cleaned retail data to the Silver path as Delta, in
# "overwrite" mode.
retail_silver_writer = df_retail.write.format("delta").mode("overwrite")
retail_silver_writer.save(retail_silver_path)

Clean old files (older than 168 hours, i.e. 7 days)

In [0]:
# Run VACUUM on both Silver tables with a 168-hour retention window.
retail_vacuum_stmt = f"VACUUM delta.`{retail_silver_path}` RETAIN 168 HOURS"
customer_vacuum_stmt = f"VACUUM delta.`{customer_silver_path}` RETAIN 168 HOURS"

spark.sql(retail_vacuum_stmt)
spark.sql(customer_vacuum_stmt)