## Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, trim


# Read Bronze Table

In [0]:
# Load the bronze ERP customer table (erp_cust_az12) as the starting DataFrame.
df = spark.read.table("workspace.bronze.erp_cust_az12")

In [0]:
# Preview the bronze data as loaded, before any transformation is applied.
df.display()

# Silver Transformations

## Trimming

In [0]:
# Trim leading/trailing spaces from every string column in ONE projection.
# Non-string columns pass through unchanged and column order is preserved.
# A single select() is used instead of calling withColumn() in a loop: each
# withColumn() call adds another projection to the plan, which can produce
# very large plans on wide tables.
df = df.select([
    trim(col(field.name)).alias(field.name)
    if isinstance(field.dataType, StringType)
    else col(field.name)
    for field in df.schema.fields
])


In [0]:
# Preview the DataFrame after the string columns have been trimmed.
df.display()


## Customer ID Cleanup

In [0]:

# Strip the "NAS" prefix from customer IDs that carry it; IDs without the
# prefix (and null IDs) are left unchanged.
#
# F.when(condition, value) works like SQL CASE WHEN: the value is returned when
# the condition is true, and .otherwise(...) supplies the fallback (without it
# the result would be null).
#
# Spark string positions are 1-based, so a substring starting at position 4
# drops exactly the first THREE characters ("NAS"). The requested length is the
# full string length, which is more than enough to reach the end of the value.
#
# Column.substr is called with two Column arguments because it accepts them on
# all PySpark versions, whereas F.substring takes a Column length only on newer
# releases (NOTE(review): believed to be 4.0+ - confirm against the runtime).
cid_col = col("cid")
df = df.withColumn(
    "cid",
    F.when(cid_col.startswith("NAS"),
           cid_col.substr(F.lit(4), F.length(cid_col)))
     .otherwise(cid_col)
)

In [0]:

# Preview the DataFrame after the customer ID cleanup.
df.display()

## Birthdate Validation

In [0]:
# Birth dates later than today's date are replaced with null. Every other
# value is kept as is, including rows where the comparison itself evaluates
# to null (e.g. a null bdate), because those fall through to otherwise().
is_future_bdate = col("bdate") > F.current_date()
df = df.withColumn(
    "bdate",
    F.when(is_future_bdate, F.lit(None)).otherwise(col("bdate")),
)

In [0]:
# Preview the DataFrame after the birth date validation.
df.display()

## Gender Normalization

In [0]:
# Normalize the gender codes to a fixed set of labels.
# The value is upper-cased first so the comparison is case-insensitive:
#   "F" / "FEMALE" -> "Female"
#   "M" / "MALE"   -> "Male"
#   anything else (including null or empty) -> "n/a"
gen_upper = F.upper(col("gen"))
gender_label = (
    F.when(gen_upper.isin("F", "FEMALE"), "Female")
     .when(gen_upper.isin("M", "MALE"), "Male")
     .otherwise("n/a")
)
df = df.withColumn("gen", gender_label)


## Renaming Columns

In [0]:
# Source column name -> descriptive column name used in the output table.
RENAME_MAP = dict(
    cid="customer_number",
    bdate="birth_date",
    gen="gender",
)

# Apply the renames one by one; withColumnRenamed leaves the DataFrame
# unchanged when the source column is not present.
for source_col in RENAME_MAP:
    df = df.withColumnRenamed(source_col, RENAME_MAP[source_col])

## Sanity Checks of DataFrame

In [0]:
# Show at most 10 rows of the transformed DataFrame as a quick visual check
# of the renamed columns before writing.
df.limit(10).display()

# Writing Silver Table

In [0]:
# Persist the transformed DataFrame as a Delta table, replacing any existing
# contents of workspace.silver.erp_customers.
(
    df.write
      .format("delta")
      .mode("overwrite")
      .saveAsTable("workspace.silver.erp_customers")
)

## Sanity Checks of Silver Table

In [0]:
%sql
-- Spot-check up to 10 rows of the silver table written above
-- (no ORDER BY, so which rows are returned is not deterministic).
SELECT * FROM workspace.silver.erp_customers LIMIT 10