### **Analysis**

In [0]:
%sql
-- 1. Look for first-name / last-name combinations that occur more than once.
--    Groups are formed on the two name columns (not on cst_id).
SELECT
    cst_firstname,
    cst_lastname,
    COUNT(cst_firstname),
    COUNT(cst_lastname)
FROM bronze.crm_cust_info
GROUP BY
    cst_firstname,
    cst_lastname
HAVING COUNT(cst_firstname) > 1
   AND COUNT(cst_lastname) > 1;

-- Spot check: list the raw rows for one name pair
-- (presumably one of the pairs reported by the query above).
SELECT *
FROM bronze.crm_cust_info
WHERE cst_firstname = 'Jordan'
  AND cst_lastname = 'King';





In [0]:
%sql
-- 2. Find rows whose first or last name carries leading/trailing whitespace,
--    i.e. where trimming the value changes it.
SELECT *
FROM bronze.crm_cust_info
WHERE TRIM(cst_firstname) != cst_firstname
   OR TRIM(cst_lastname) != cst_lastname;



In [0]:
%sql
-- 3. Validate abbreviations: list rows whose marital status is not one of
--    'M' / 'S', or whose gender is not one of 'M' / 'F'.
--    The earlier form (col != 'M' OR col != 'S') was true for every non-null
--    value, so it could not single out unexpected codes; NOT IN expresses
--    "none of the allowed codes".
--    NULL values are not returned by NOT IN; they are covered by the null
--    check in step 4.
SELECT *
FROM bronze.crm_cust_info
WHERE cst_marital_status NOT IN ('M', 'S')
   OR cst_gndr NOT IN ('M', 'F');
-- NOTE(review): the previously recorded result ("no data found") was obtained
-- with the old predicate; re-run this cell and record the new result.


In [0]:
%sql
-- 4. Find rows with a NULL in any of the name, marital status, gender or
--    creation date columns.
SELECT *
FROM bronze.crm_cust_info
WHERE cst_firstname IS NULL
   OR cst_lastname IS NULL
   OR cst_marital_status IS NULL
   OR cst_gndr IS NULL
   OR cst_create_date IS NULL;
-- Recorded observation: null values are present in all of these columns.


In [0]:
%sql
-- 5. Check the format of cst_id and cst_key.

-- 5a. cst_id must consist of digits only; list rows where it does not.
SELECT *
FROM bronze.crm_cust_info
WHERE cst_id NOT REGEXP '^[0-9]+$';
-- Recorded observation: no alphanumeric cst_id found.

-- 5b. cst_key is expected to start with 'AW'; list rows where it does not.
--     (Only the prefix is tested here, not the characters that follow it.)
SELECT *
FROM bronze.crm_cust_info
WHERE cst_key NOT LIKE 'AW%';
-- Recorded observation: no cst_key without the 'AW' prefix found.


In [0]:
%sql
-- 6. Check that cst_create_date matches the yyyy-mm-dd pattern
--    (four digits, dash, two digits, dash, two digits); list rows that do not.
--    Only the shape is tested, not whether the value is a real calendar date.
SELECT *
FROM bronze.crm_cust_info
WHERE cst_create_date NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$';


### **Transformation of data**

## Remove Duplicates

In [0]:
import pyspark.sql.functions as F
# Load the raw CRM customer records from the bronze table.
df = spark.read.table("bronze.crm_cust_info")

# Keep a single row per (first name, last name, customer id) combination.
# NOTE(review): no ordering is applied before de-duplicating, so which row of
# each group survives is not controlled here.
dedup_keys = ['cst_firstname', 'cst_lastname', 'cst_id']
dfdropduplicates = df.dropDuplicates(dedup_keys)

# Optional spot check: filter dfdropduplicates on cst_firstname == 'Jordan' and
# cst_lastname == 'King' and display the result.




## Trimming

In [0]:
# Strip leading and trailing whitespace from every listed column.
# NOTE(review): F.trim is a string function, so this presumably expects all of
# these columns (including cst_id and cst_create_date) to be strings - confirm
# against the bronze schema.
columnsfilter = [
    "cst_id",
    "cst_key",
    "cst_firstname",
    "cst_lastname",
    "cst_marital_status",
    "cst_gndr",
    "cst_create_date",
]

# Start from the de-duplicated frame and replace each column with its trimmed value.
dftrimspace = dfdropduplicates
for column_name in columnsfilter:
    trimmed_value = F.trim(F.col(column_name))
    dftrimspace = dftrimspace.withColumn(column_name, trimmed_value)

## Handling null values

In [0]:

# Handle missing values in the trimmed customer data.

# 1. Drop rows that lack an identifier (cst_id, cst_key) or a first/last name.
dfhandlenull = dftrimspace.dropna(
    subset=['cst_id', 'cst_key', 'cst_firstname', 'cst_lastname']
)

# 2. A missing creation date falls back to today's date, rendered as a
#    yyyy-MM-dd string.
dfhandlenull = dfhandlenull.withColumn(
    'cst_create_date',
    F.coalesce(
        F.col("cst_create_date"),
        F.date_format(F.current_date(), 'yyyy-MM-dd'),
    ),
)

# 3. A missing gender or marital status is marked as unknown ('n/a').
#    Filling with 'M' (as before) would be expanded to 'Male' / 'Married' by the
#    normalisation step, presenting unknown values as real data; 'n/a' passes
#    through that step's fallback branch unchanged.
dfhandlenull = dfhandlenull.fillna('n/a', subset=['cst_gndr', 'cst_marital_status'])

display(dfhandlenull)


## Normalisation

In [0]:
# Expand the single-letter codes into readable labels. Any value that matches
# neither code falls through to 'n/a'.
# NOTE: this cell overwrites dfhandlenull, so running it a second time on its
# own output turns the labels ('Single', 'Male', ...) into 'n/a'.
marital_status_label = (
    F.when(F.col('cst_marital_status') == 'S', 'Single')
    .when(F.col('cst_marital_status') == 'M', 'Married')
    .otherwise('n/a')
)
gender_label = (
    F.when(F.col('cst_gndr') == 'M', 'Male')
    .when(F.col('cst_gndr') == 'F', 'Female')
    .otherwise('n/a')
)

dfhandlenull = (
    dfhandlenull
    .withColumn('cst_marital_status', marital_status_label)
    .withColumn('cst_gndr', gender_label)
)

## Rename columns

In [0]:

# Mapping from the bronze column names to the descriptive names used in silver.
RENAME_MAP = {
    "cst_id": "customer_id",
    "cst_key": "customer_number",
    "cst_firstname": "first_name",
    "cst_lastname": "last_name",
    "cst_marital_status": "marital_status",
    "cst_gndr": "gender",
    "cst_create_date": "created_date",
}

# Apply the renames one column at a time, in the order listed above.
for source_col, target_col in RENAME_MAP.items():
    dfhandlenull = dfhandlenull.withColumnRenamed(source_col, target_col)

## Write to Silver table

In [0]:
# Publish the cleaned customer data as the silver table.
# The existing table (if any) is dropped first, then the DataFrame is saved
# under the same name in overwrite mode.
spark.sql("DROP TABLE IF EXISTS silver.crm_cust_info")
(
    dfhandlenull.write
    .mode("overwrite")
    .saveAsTable("silver.crm_cust_info")
)
