# Import the required libraries

## Importing the libraries

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import StringType, trim, col

In [0]:
# Source column name -> target column name for the customer table.
# Entries are listed in the order the renames are applied.
RENAME_MAP = {
    "cst_id": "customer_id",
    "cst_key": "customer_key",
    "cst_firstname": "firstname",
    "cst_lastname": "lastname",
    "cst_marital_status": "marital_status",
    "cst_gndr": "gender",
    "cst_create_date": "created_date",
}

# Reading From Bronze

In [0]:
# Load the CRM customer table from the bronze schema into a DataFrame.
bronze_table_name = "workspace.bronze.crm_cust_info"
df = spark.table(bronze_table_name)

## Trimming string columns

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
#
# All columns are projected in one select() instead of calling withColumn()
# once per column: every withColumn() call adds another projection to the
# query plan, and the PySpark documentation advises against using it in loops.
# Non-string columns pass through untouched; column order and names are kept.
#
# NOTE(review): StringType is canonically exported by pyspark.sql.types; the
# import from pyspark.sql.functions appears to rely on an incidental
# re-export -- confirm and consider importing it from pyspark.sql.types.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)
    

In [0]:
# Show the DataFrame after trimming for a visual sanity check.
df.display()

## Normalization

In [0]:
# Expand the single-letter marital-status and gender codes into readable
# labels. Codes are upper-cased before comparison, so matching is
# case-insensitive; any value that is not a recognised code (including NULL)
# falls through to "n/a".
marital_code = F.upper(F.col("cst_marital_status"))
marital_label = (
    F.when(marital_code == 'S', 'Single')
    .when(marital_code == 'M', 'Married')
    .otherwise("n/a")
)

gender_code = F.upper(F.col("cst_gndr"))
gender_label = (
    F.when(gender_code == 'M', 'Male')
    .when(gender_code == 'F', 'Female')
    .otherwise("n/a")
)

# Overwrite the two code columns in place with their decoded labels.
df = df.withColumn("cst_marital_status", marital_label)
df = df.withColumn("cst_gndr", gender_label)

df.display()

# Renaming the columns

In [0]:
# Apply each rename from RENAME_MAP in turn, then show the result.
for source_column in RENAME_MAP:
    target_column = RENAME_MAP[source_column]
    df = df.withColumnRenamed(source_column, target_column)

df.display()

# Write Into Silver Table

In [0]:
# Persist the cleaned DataFrame as a Delta table, replacing any existing
# contents of the target table.
# NOTE(review): the source table is read with a catalog-qualified name
# ("workspace.bronze...") while this target omits the catalog, so it resolves
# against the session's current catalog -- confirm that is intended.
silver_writer = df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("silver.crm_customers")