# Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import trim,col

In [0]:
# Source (bronze) column name -> target (silver) column name.
# Kept as ordered pairs so the rename sequence is explicit and easy to extend.
RENAME_MAP = dict(
    [
        ("cst_id", "customer_id"),
        ("cst_key", "customer_key"),
        ("cst_firstname", "customer_firstname"),
        ("cst_lastname", "customer_lastname"),
        ("cst_marital_status", "customer_marital_status"),
        ("cst_gndr", "customer_gender"),
        ("cst_create_date", "customer_create_date"),
    ]
)

# Reading from bronze

In [0]:
# Fully qualified name of the bronze source table.
# NOTE(review): "crm_cusr_info" may be a typo for "crm_cust_info" - confirm against the catalog.
BRONZE_CUSTOMER_TABLE = "workspace.bronze.crm_cusr_info"

# Load the bronze table as the starting DataFrame for the transformations below.
df = spark.table(BRONZE_CUSTOMER_TABLE)


# Data Transformations

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
#
# All trims are applied in one withColumns() call instead of a withColumn() loop:
# every withColumn() call adds another projection to the query plan, so looping
# over the schema grows the plan with the number of string columns. A single
# withColumns() call yields the same values in the same column positions with
# one projection.
trimmed_columns = {
    field.name: trim(col(field.name))
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
}

# Skip the call entirely when the table has no string columns.
if trimmed_columns:
    df = df.withColumns(trimmed_columns)



# Normalization 

In [0]:
# Upper-cased views of the raw code columns, so matching is case-insensitive.
marital_code = F.upper(F.col("cst_marital_status"))
gender_code = F.upper(F.col("cst_gndr"))

# Expand the single-letter codes into readable labels; any value that matches
# neither code falls through to "n/a".
marital_label = (
    F.when(marital_code == "S", "Single")
    .when(marital_code == "M", "Married")
    .otherwise("n/a")
)
gender_label = (
    F.when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise("n/a")
)

# Overwrite both code columns in place with their expanded labels.
df = df.withColumn("cst_marital_status", marital_label).withColumn(
    "cst_gndr", gender_label
)



# Renaming The Columns


In [0]:
# Apply each bronze -> silver rename from RENAME_MAP, one column at a time.
# withColumnRenamed() leaves the frame unchanged for a column that is not present.
for source_column in RENAME_MAP:
    target_column = RENAME_MAP[source_column]
    df = df.withColumnRenamed(source_column, target_column)

In [0]:
# Preview the transformed DataFrame before it is written out.
display(df)

# Write into Silver Table

In [0]:
# Fully qualified name of the silver target table.
SILVER_CUSTOMER_TABLE = "workspace.silver.customer"

# Configure a Delta writer in overwrite mode, then save the frame as the silver table.
silver_writer = df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable(SILVER_CUSTOMER_TABLE)

In [0]:
%sql
-- Read back the silver customer table to check the write.
SELECT *
FROM workspace.silver.customer