### **Clean & Load crm_cust_info**

**Check Duplicates or null in primary key**

In [1]:
%%sql
-- Primary-key audit: cst_id values that occur more than once, plus the NULL-key group.
SELECT
    cst_id,
    COUNT(*)
FROM Bronze.cust_info
GROUP BY cst_id
HAVING COUNT(*) > 1
    OR cst_id IS NULL

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 2, Finished, Available, Finished)

<Spark SQL result set with 6 rows and 2 fields>

In [2]:
%%sql
-- Keep one row per non-NULL cst_id: the row ranked first by cst_create_date descending.
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
    FROM Bronze.cust_info
    WHERE cst_id IS NOT NULL
)
SELECT *
FROM ranked
WHERE flag_last = 1

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 3, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 8 fields>

**Check for unwanted spaces**

In [3]:
%%sql
-- First names whose stored value differs from its trimmed form.
SELECT cst_firstname
FROM Bronze.cust_info
WHERE TRIM(cst_firstname) <> cst_firstname

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 4, Finished, Available, Finished)

<Spark SQL result set with 17 rows and 1 fields>

In [4]:
%%sql
-- Deduplicated customers (first row per cst_id by cst_create_date descending) with trimmed names.
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
    FROM Bronze.cust_info
    WHERE cst_id IS NOT NULL
)
SELECT
    cst_id,
    cst_key,
    TRIM(cst_firstname) AS cst_firstname,
    TRIM(cst_lastname)  AS cst_lastname,
    cst_marital_status,
    cst_gndr,
    cst_create_date
FROM ranked
WHERE flag_last = 1

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 5, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 7 fields>

**Standardize values in low-cardinality columns**

In [5]:
%%sql
-- Inventory of the distinct raw gender codes.
SELECT DISTINCT
    cst_gndr
FROM Bronze.cust_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 6, Finished, Available, Finished)

<Spark SQL result set with 3 rows and 1 fields>

In [6]:
%%sql
-- Inventory of the distinct raw marital-status codes.
SELECT DISTINCT
    cst_marital_status
FROM Bronze.cust_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 7, Finished, Available, Finished)

<Spark SQL result set with 3 rows and 1 fields>

In [7]:
%%sql
-- Preview of the full customer clean-up: dedup per cst_id, trimmed names,
-- single-letter codes expanded to labels ('n/a' for anything else), load date stamped.
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
    FROM Bronze.cust_info
    WHERE cst_id IS NOT NULL
)
SELECT
    cst_id,
    cst_key,
    TRIM(cst_firstname) AS cst_firstname,
    TRIM(cst_lastname)  AS cst_lastname,
    CASE UPPER(TRIM(cst_marital_status))
        WHEN 'S' THEN 'Single'
        WHEN 'M' THEN 'Married'
        ELSE 'n/a'
    END AS cst_marital_status,
    CASE UPPER(TRIM(cst_gndr))
        WHEN 'F' THEN 'Female'
        WHEN 'M' THEN 'Male'
        ELSE 'n/a'
    END AS cst_gndr,
    cst_create_date,
    CURRENT_DATE() AS dwh_create_date
FROM ranked
WHERE flag_last = 1

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 8, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 8 fields>

In [8]:
# Build the cleaned customer DataFrame from Bronze.cust_info:
#   - rows with a NULL cst_id are dropped and one row per cst_id is kept
#     (the first by cst_create_date descending)
#   - first/last names are trimmed
#   - marital-status and gender codes are expanded to labels, 'n/a' otherwise
#   - dwh_create_date records the load date
df_cust_info = spark.sql(
    """
    WITH ranked AS (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY cst_id ORDER BY cst_create_date DESC) AS flag_last
        FROM Bronze.cust_info
        WHERE cst_id IS NOT NULL
    )
    SELECT
        cst_id,
        cst_key,
        TRIM(cst_firstname) AS cst_firstname,
        TRIM(cst_lastname)  AS cst_lastname,
        CASE UPPER(TRIM(cst_marital_status))
            WHEN 'S' THEN 'Single'
            WHEN 'M' THEN 'Married'
            ELSE 'n/a'
        END AS cst_marital_status,
        CASE UPPER(TRIM(cst_gndr))
            WHEN 'F' THEN 'Female'
            WHEN 'M' THEN 'Male'
            ELSE 'n/a'
        END AS cst_gndr,
        cst_create_date,
        CURRENT_DATE() AS dwh_create_date
    FROM ranked
    WHERE flag_last = 1
    """
)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 10, Finished, Available, Finished)

In [9]:
# Render the cleaned customer DataFrame for visual inspection.
display(df_cust_info)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 908556f1-9f7a-44fd-8988-d592e384fcf6)

In [10]:
# Persist the cleaned customers as the Delta table Silver.crm_cust_info,
# replacing whatever the table held before ("overwrite" mode).
(
    df_cust_info.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.crm_cust_info")
)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 12, Finished, Available, Finished)

### **Clean & Load crm_prd_info**

**Check Duplicates or null in primary key**

In [11]:
%%sql
-- Primary-key audit: prd_id values that occur more than once, plus the NULL-key group.
SELECT
    prd_id,
    COUNT(*)
FROM Bronze.prd_info
GROUP BY prd_id
HAVING COUNT(*) > 1
    OR prd_id IS NULL

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 13, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 2 fields>

In [12]:
%%sql
-- Derive cat_id: first five characters of prd_key with '-' replaced by '_'.
SELECT
    prd_id,
    prd_key,
    REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
    prd_nm,
    prd_cost,
    prd_line,
    prd_start_dt,
    prd_end_dt
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 14, Finished, Available, Finished)

<Spark SQL result set with 397 rows and 8 fields>

In [13]:
%%sql
-- Distinct category ids in the category table, for comparison with the derived cat_id.
SELECT DISTINCT
    id
FROM Bronze.PX_CAT_G1V2

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 15, Finished, Available, Finished)

<Spark SQL result set with 37 rows and 1 fields>

In [14]:
%%sql
-- Split the raw prd_key: the first five characters become cat_id ('-' -> '_'),
-- everything from position 7 onward becomes the new prd_key.
-- The raw prd_key is kept alongside for comparison, so the name appears twice in the output.
SELECT
    prd_id,
    prd_key,
    REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
    SUBSTRING(prd_key, 7, LEN(prd_key))         AS prd_key,
    prd_nm,
    prd_cost,
    prd_line,
    prd_start_dt,
    prd_end_dt
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 16, Finished, Available, Finished)

<Spark SQL result set with 397 rows and 9 fields>

In [15]:
%%sql
-- Product keys as they appear in the sales table, to compare with the derived prd_key.
SELECT
    sls_prd_key
FROM Bronze.sales_details

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 17, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 1 fields>

**Check Unwanted spaces**

In [16]:
%%sql
-- Product names whose stored value differs from its trimmed form.
SELECT prd_nm
FROM Bronze.prd_info
WHERE TRIM(prd_nm) <> prd_nm

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 18, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 1 fields>

**Check null or negative prices**

In [17]:
%%sql
-- Product costs that are missing or negative.
SELECT prd_cost
FROM Bronze.prd_info
WHERE prd_cost IS NULL
   OR prd_cost < 0

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 19, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 1 fields>

In [18]:
%%sql
-- Same key split as before, with a NULL prd_cost replaced by 0.
SELECT
    prd_id,
    prd_key,
    REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
    SUBSTRING(prd_key, 7, LEN(prd_key))         AS prd_key,
    prd_nm,
    COALESCE(prd_cost, 0)                       AS prd_cost,
    prd_line,
    prd_start_dt,
    prd_end_dt
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 20, Finished, Available, Finished)

<Spark SQL result set with 397 rows and 9 fields>

**Data standardization and normalization**

In [19]:
%%sql
-- Inventory of the distinct raw product-line codes.
SELECT DISTINCT
    prd_line
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 21, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 1 fields>

In [20]:
%%sql
-- Adds the product-line mapping: single-letter code (case- and space-insensitive)
-- expanded to a label, anything else becomes 'n/a'.
SELECT
    prd_id,
    prd_key,
    REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
    SUBSTRING(prd_key, 7, LEN(prd_key))         AS prd_key,
    prd_nm,
    COALESCE(prd_cost, 0)                       AS prd_cost,
    CASE UPPER(TRIM(prd_line))
        WHEN 'S' THEN 'Other Sales'
        WHEN 'M' THEN 'Mountain'
        WHEN 'R' THEN 'Road'
        WHEN 'T' THEN 'Touring'
        ELSE 'n/a'
    END AS prd_line,
    prd_start_dt,
    prd_end_dt
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 22, Finished, Available, Finished)

<Spark SQL result set with 397 rows and 9 fields>

**Check invalid date orders**

In [21]:
%%sql
-- Products whose end date precedes their start date.
SELECT *
FROM Bronze.prd_info
WHERE prd_start_dt > prd_end_dt

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 23, Finished, Available, Finished)

<Spark SQL result set with 200 rows and 7 fields>

In [22]:
%%sql
-- Preview of the full product clean-up.
--   cat_id       : first five characters of the raw prd_key, '-' replaced by '_'
--   prd_key      : raw prd_key from position 7 onward (raw key kept alongside for comparison)
--   prd_cost     : NULL replaced by 0
--   prd_line     : single-letter code expanded to a label, 'n/a' otherwise
--   prd_start_dt : cast to DATE; aliased explicitly so the column name does not depend on
--                  how Spark auto-names an unaliased CAST
--   prd_end_dt   : one day before the next prd_start_dt in the same prd_key partition;
--                  NULL when there is no later row
-- NOTE(review): prd_key inside the window presumably resolves to the Bronze column rather
-- than the derived alias - confirm on the runtime in use.
SELECT
    prd_id,
    prd_key,
    REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
    SUBSTRING(prd_key, 7, LEN(prd_key))         AS prd_key,
    prd_nm,
    COALESCE(prd_cost, 0)                       AS prd_cost,
    CASE WHEN UPPER(TRIM(prd_line)) = 'S' THEN 'Other Sales'
         WHEN UPPER(TRIM(prd_line)) = 'M' THEN 'Mountain'
         WHEN UPPER(TRIM(prd_line)) = 'R' THEN 'Road'
         WHEN UPPER(TRIM(prd_line)) = 'T' THEN 'Touring'
         ELSE 'n/a'
    END AS prd_line,
    CAST(prd_start_dt AS DATE) AS prd_start_dt,
    CAST(
        LEAD(CAST(prd_start_dt AS DATE)) OVER (PARTITION BY prd_key ORDER BY prd_start_dt)
        - INTERVAL '1 day'
        AS DATE
    ) AS prd_end_dt,
    CURRENT_DATE() AS dwh_create_date
FROM Bronze.prd_info

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 24, Finished, Available, Finished)

<Spark SQL result set with 397 rows and 10 fields>

In [23]:
# Build the cleaned product DataFrame from Bronze.prd_info.
#   cat_id       : first five characters of the raw prd_key, '-' replaced by '_'
#   prd_key      : raw prd_key from position 7 onward
#   prd_cost     : NULL replaced by 0
#   prd_line     : single-letter code expanded to a label, 'n/a' otherwise
#   prd_start_dt : cast to DATE; aliased explicitly so the persisted column name does not
#                  depend on how Spark auto-names an unaliased CAST
#   prd_end_dt   : one day before the next prd_start_dt in the same prd_key partition;
#                  NULL when there is no later row (the supplied prd_end_dt is not used)
# NOTE(review): prd_key inside the window presumably resolves to the Bronze column rather
# than the derived alias - confirm on the runtime in use.
df_prod_info = spark.sql(
    """
    SELECT
        prd_id,
        REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id,
        SUBSTRING(prd_key, 7, LEN(prd_key))         AS prd_key,
        prd_nm,
        COALESCE(prd_cost, 0)                       AS prd_cost,
        CASE WHEN UPPER(TRIM(prd_line)) = 'S' THEN 'Other Sales'
             WHEN UPPER(TRIM(prd_line)) = 'M' THEN 'Mountain'
             WHEN UPPER(TRIM(prd_line)) = 'R' THEN 'Road'
             WHEN UPPER(TRIM(prd_line)) = 'T' THEN 'Touring'
             ELSE 'n/a'
        END AS prd_line,
        CAST(prd_start_dt AS DATE) AS prd_start_dt,
        CAST(
            LEAD(CAST(prd_start_dt AS DATE)) OVER (PARTITION BY prd_key ORDER BY prd_start_dt)
            - INTERVAL '1 day'
            AS DATE
        ) AS prd_end_dt,
        CURRENT_DATE() AS dwh_create_date
    FROM Bronze.prd_info
    """
)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 25, Finished, Available, Finished)

In [24]:
# Render the cleaned product DataFrame for visual inspection.
display(df_prod_info)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0b3a8959-d28b-43f7-a9cf-97801d8e0dc4)

In [25]:
# Persist the cleaned products as the Delta table Silver.crm_prd_info,
# replacing whatever the table held before ("overwrite" mode).
(
    df_prod_info.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.crm_prd_info")
)

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 27, Finished, Available, Finished)

### **Clean & Load crm_sales_details**

In [26]:
%%sql
-- Raw look at the sales columns that the following cells clean.
SELECT
    sls_ord_num,
    sls_prd_key,
    sls_cust_id,
    sls_order_dt,
    sls_ship_dt,
    sls_due_dt,
    sls_sales,
    sls_quantity,
    sls_price
FROM Bronze.sales_details

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 28, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 9 fields>

In [27]:
%%sql
-- Sales rows whose order number differs from its trimmed form.
SELECT
    sls_ord_num,
    sls_prd_key,
    sls_cust_id,
    sls_order_dt,
    sls_ship_dt,
    sls_due_dt,
    sls_sales,
    sls_quantity,
    sls_price
FROM Bronze.sales_details
WHERE TRIM(sls_ord_num) <> sls_ord_num

StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 29, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 9 fields>

**Check invalid dates**

In [28]:
%%sql
-- Order dates that cannot be a valid yyyyMMdd value: non-positive, not eight characters
-- long, or outside the 19000101..20250101 window. Zero is shown as NULL.
SELECT
    NULLIF(sls_order_dt, 0) AS sls_order_dt
FROM Bronze.sales_details
WHERE sls_order_dt <= 0
   OR LEN(sls_order_dt) != 8
   OR sls_order_dt NOT BETWEEN 19000101 AND 20250101


StatementMeta(, 4bc288b1-8e49-4a44-810e-f91debec3f4a, 30, Finished, Available, Finished)

<Spark SQL result set with 19 rows and 1 fields>

In [1]:
%%sql
-- Convert the three yyyyMMdd date columns to DATE; a value of 0 or one that is not
-- eight characters long becomes NULL.
SELECT
    sls_ord_num,
    sls_prd_key,
    sls_cust_id,
    CASE
        WHEN sls_order_dt = 0 OR LEN(sls_order_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_order_dt AS STRING), 'yyyyMMdd')
    END AS sls_order_dt,
    CASE
        WHEN sls_ship_dt = 0 OR LEN(sls_ship_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_ship_dt AS STRING), 'yyyyMMdd')
    END AS sls_ship_dt,
    CASE
        WHEN sls_due_dt = 0 OR LEN(sls_due_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_due_dt AS STRING), 'yyyyMMdd')
    END AS sls_due_dt,
    sls_sales,
    sls_quantity,
    sls_price
FROM Bronze.sales_details

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 2, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 9 fields>

In [13]:
%%sql
-- For rows that break any sales/quantity/price rule, show the raw values next to the
-- recalculated ones.
--   sls_sales : recomputed as quantity * |price| when missing, non-positive or inconsistent
--   sls_price : recomputed as sales / quantity when missing or non-positive
SELECT
    sls_sales AS OLD_SALES,
    sls_quantity,
    sls_price AS OLD_PRICE,
    CASE
        WHEN sls_sales IS NULL
          OR sls_sales <= 0
          OR sls_sales != sls_quantity * ABS(sls_price)
        THEN sls_quantity * ABS(sls_price)
        ELSE sls_sales
    END AS sls_sales,
    CASE
        WHEN sls_price IS NULL OR sls_price <= 0
        THEN sls_sales / NULLIF(sls_quantity, 0)
        ELSE sls_price
    END AS sls_price
FROM Bronze.sales_details
WHERE sls_sales != sls_quantity * sls_price
   OR sls_sales IS NULL
   OR sls_quantity IS NULL
   OR sls_price IS NULL
   OR sls_sales <= 0
   OR sls_quantity <= 0
   OR sls_price <= 0

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 14, Finished, Available, Finished)

<Spark SQL result set with 35 rows and 5 fields>

In [15]:
%%sql
-- Preview of the full sales clean-up.
--   dates     : yyyyMMdd values converted to DATE; non-positive values or values that are
--               not eight characters long become NULL (`<= 0` matches the audit query,
--               so negative values never reach TO_DATE)
--   sls_sales : recomputed as quantity * |price| when missing, non-positive or inconsistent
--   sls_price : a negative price is treated as a sign error, exactly as the sales rule does
--               (ABS), so price and corrected sales stay consistent; a missing or zero price
--               is derived from sales / quantity (NULL when quantity is 0)
-- NOTE(review): sls_sales in the price fallback presumably resolves to the raw Bronze
-- column, not the corrected alias - confirm on the runtime in use.
SELECT
    sls_ord_num,
    sls_prd_key,
    sls_cust_id,
    CASE
        WHEN sls_order_dt <= 0 OR LEN(sls_order_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_order_dt AS STRING), 'yyyyMMdd')
    END AS sls_order_dt,
    CASE
        WHEN sls_ship_dt <= 0 OR LEN(sls_ship_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_ship_dt AS STRING), 'yyyyMMdd')
    END AS sls_ship_dt,
    CASE
        WHEN sls_due_dt <= 0 OR LEN(sls_due_dt) != 8 THEN NULL
        ELSE TO_DATE(CAST(sls_due_dt AS STRING), 'yyyyMMdd')
    END AS sls_due_dt,
    CASE
        WHEN sls_sales IS NULL
          OR sls_sales <= 0
          OR sls_sales != sls_quantity * ABS(sls_price)
        THEN sls_quantity * ABS(sls_price)
        ELSE sls_sales
    END AS sls_sales,
    sls_quantity,
    CASE
        WHEN sls_price < 0 THEN ABS(sls_price)
        WHEN sls_price IS NULL OR sls_price = 0 THEN sls_sales / NULLIF(sls_quantity, 0)
        ELSE sls_price
    END AS sls_price,
    CURRENT_DATE() AS dwh_create_date
FROM Bronze.sales_details

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 16, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 10 fields>

In [16]:
# Build the cleaned sales DataFrame from Bronze.sales_details.
#   dates     : yyyyMMdd values converted to DATE; non-positive values or values that are
#               not eight characters long become NULL (`<= 0` matches the audit query,
#               so negative values never reach TO_DATE)
#   sls_sales : recomputed as quantity * |price| when missing, non-positive or inconsistent
#   sls_price : a negative price is treated as a sign error, exactly as the sales rule does
#               (ABS), so price and corrected sales stay consistent; a missing or zero price
#               is derived from sales / quantity (NULL when quantity is 0)
# NOTE(review): sls_sales in the price fallback presumably resolves to the raw Bronze
# column, not the corrected alias - confirm on the runtime in use.
df_sales_details = spark.sql(
    """
    SELECT
        sls_ord_num,
        sls_prd_key,
        sls_cust_id,
        CASE
            WHEN sls_order_dt <= 0 OR LEN(sls_order_dt) != 8 THEN NULL
            ELSE TO_DATE(CAST(sls_order_dt AS STRING), 'yyyyMMdd')
        END AS sls_order_dt,
        CASE
            WHEN sls_ship_dt <= 0 OR LEN(sls_ship_dt) != 8 THEN NULL
            ELSE TO_DATE(CAST(sls_ship_dt AS STRING), 'yyyyMMdd')
        END AS sls_ship_dt,
        CASE
            WHEN sls_due_dt <= 0 OR LEN(sls_due_dt) != 8 THEN NULL
            ELSE TO_DATE(CAST(sls_due_dt AS STRING), 'yyyyMMdd')
        END AS sls_due_dt,
        CASE
            WHEN sls_sales IS NULL
              OR sls_sales <= 0
              OR sls_sales != sls_quantity * ABS(sls_price)
            THEN sls_quantity * ABS(sls_price)
            ELSE sls_sales
        END AS sls_sales,
        sls_quantity,
        CASE
            WHEN sls_price < 0 THEN ABS(sls_price)
            WHEN sls_price IS NULL OR sls_price = 0 THEN sls_sales / NULLIF(sls_quantity, 0)
            ELSE sls_price
        END AS sls_price,
        CURRENT_DATE() AS dwh_create_date
    FROM Bronze.sales_details
    """
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 18, Finished, Available, Finished)

In [17]:
# Persist the cleaned sales rows as the Delta table Silver.crm_sales_details,
# replacing whatever the table held before ("overwrite" mode).
(
    df_sales_details.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.crm_sales_details")
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 19, Finished, Available, Finished)

### **Clean & Load erp_cust_az12**

In [22]:
%%sql
-- Strip the first three characters from cid values that start with 'NAS' and blank out
-- birth dates that lie in the future.
SELECT
    CASE
        WHEN cid LIKE 'NAS%' THEN SUBSTRING(cid, 4, LEN(cid))
        ELSE cid
    END AS cid,
    CASE
        WHEN bdate > CURRENT_DATE() THEN NULL
        ELSE bdate
    END AS bdate,
    gen
FROM Bronze.cust_az12

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 24, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 3 fields>

In [24]:
%%sql
-- Each distinct raw gen value next to the label it maps to
-- (both output columns are named gen).
SELECT DISTINCT
    gen,
    CASE
        WHEN UPPER(TRIM(gen)) IN ('F', 'FEMALE') THEN 'Female'
        WHEN UPPER(TRIM(gen)) IN ('M', 'MALE')   THEN 'Male'
        ELSE 'n/a'
    END AS gen
FROM Bronze.cust_az12

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 26, Finished, Available, Finished)

<Spark SQL result set with 9 rows and 2 fields>

In [26]:
%%sql
-- Preview of the full cust_az12 clean-up: 'NAS' prefix removed from cid, future birth
-- dates set to NULL, gender normalised to Female / Male / n/a, load date stamped.
SELECT
    CASE
        WHEN cid LIKE 'NAS%' THEN SUBSTRING(cid, 4, LEN(cid))
        ELSE cid
    END AS cid,
    CASE
        WHEN bdate > CURRENT_DATE() THEN NULL
        ELSE bdate
    END AS bdate,
    CASE
        WHEN UPPER(TRIM(gen)) IN ('F', 'FEMALE') THEN 'Female'
        WHEN UPPER(TRIM(gen)) IN ('M', 'MALE')   THEN 'Male'
        ELSE 'n/a'
    END AS gen,
    CURRENT_DATE() AS dwh_create_date
FROM Bronze.cust_az12

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 28, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 4 fields>

In [27]:
# Build the cleaned ERP customer DataFrame from Bronze.cust_az12:
#   - cid values starting with 'NAS' lose their first three characters
#   - birth dates later than today become NULL
#   - gen is normalised to 'Female' / 'Male', anything else becomes 'n/a'
#   - dwh_create_date records the load date
df_cust_az12 = spark.sql(
    """
    SELECT
        CASE
            WHEN cid LIKE 'NAS%' THEN SUBSTRING(cid, 4, LEN(cid))
            ELSE cid
        END AS cid,
        CASE
            WHEN bdate > CURRENT_DATE() THEN NULL
            ELSE bdate
        END AS bdate,
        CASE
            WHEN UPPER(TRIM(gen)) IN ('F', 'FEMALE') THEN 'Female'
            WHEN UPPER(TRIM(gen)) IN ('M', 'MALE')   THEN 'Male'
            ELSE 'n/a'
        END AS gen,
        CURRENT_DATE() AS dwh_create_date
    FROM Bronze.cust_az12
    """
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 29, Finished, Available, Finished)

In [28]:
# Persist the cleaned ERP customers as the Delta table Silver.erp_cust_az12,
# replacing whatever the table held before ("overwrite" mode).
(
    df_cust_az12.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.erp_cust_az12")
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 30, Finished, Available, Finished)

### **Clean & Load erp_loc_a101**

In [32]:
%%sql
-- Remove '-' from cid so it can be compared with the customer keys.
-- The table is schema-qualified like the other Bronze reads in this notebook, so the
-- cell does not depend on the session's default lakehouse.
SELECT
    REPLACE(cid, '-', ''),
    cntry
FROM Bronze.loc_a101

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 34, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 2 fields>

In [33]:
%%sql
-- Inventory of the distinct raw country values.
-- DISTINCT alone is enough (the former GROUP BY on the same column was redundant);
-- the table is schema-qualified so the cell does not depend on the default lakehouse.
SELECT DISTINCT
    cntry
FROM Bronze.loc_a101

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 35, Finished, Available, Finished)

<Spark SQL result set with 13 rows and 1 fields>

In [37]:
%%sql
-- Preview of the full loc_a101 clean-up.
--   cid   : '-' removed
--   cntry : 'DE' -> 'Germany', 'US'/'USA' -> 'United States', blank or NULL -> 'n/a',
--           any other value kept as-is after trimming
-- The table is schema-qualified so the cell does not depend on the default lakehouse.
SELECT
    REPLACE(cid, '-', '') AS cid,
    CASE WHEN UPPER(TRIM(cntry)) = 'DE' THEN 'Germany'
         WHEN UPPER(TRIM(cntry)) IN ('US', 'USA') THEN 'United States'
         WHEN UPPER(TRIM(cntry)) = '' OR cntry IS NULL THEN 'n/a'
         ELSE TRIM(cntry)
    END AS cntry,
    CURRENT_DATE() AS dwh_create_date
FROM Bronze.loc_a101

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 39, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 3 fields>

In [38]:
# Build the cleaned ERP location DataFrame from Bronze.loc_a101.
#   cid   : '-' removed
#   cntry : 'DE' -> 'Germany', 'US'/'USA' -> 'United States', blank or NULL -> 'n/a',
#           any other value kept as-is after trimming
#   dwh_create_date records the load date
# The source table is schema-qualified (as in the other Bronze reads) so the load does
# not depend on which lakehouse is the session default.
df_loc_a101 = spark.sql(
    """
    SELECT
        REPLACE(cid, '-', '') AS cid,
        CASE WHEN UPPER(TRIM(cntry)) = 'DE' THEN 'Germany'
             WHEN UPPER(TRIM(cntry)) IN ('US', 'USA') THEN 'United States'
             WHEN UPPER(TRIM(cntry)) = '' OR cntry IS NULL THEN 'n/a'
             ELSE TRIM(cntry)
        END AS cntry,
        CURRENT_DATE() AS dwh_create_date
    FROM Bronze.loc_a101
    """
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 40, Finished, Available, Finished)

In [39]:
# Persist the cleaned ERP locations as the Delta table Silver.erp_loc_a101,
# replacing whatever the table held before ("overwrite" mode).
(
    df_loc_a101.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.erp_loc_a101")
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 41, Finished, Available, Finished)

### **Clean & Load erp_px_cat_g1v2**

In [43]:
%%sql
-- Raw look at the product-category table.
-- Schema-qualified so the cell does not depend on the session's default lakehouse.
SELECT
    id,
    cat,
    subcat,
    maintenance
FROM Bronze.px_cat_g1v2

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 45, Finished, Available, Finished)

<Spark SQL result set with 37 rows and 4 fields>

In [49]:
%%sql
-- Category rows whose id differs from its trimmed form.
-- Schema-qualified so the cell does not depend on the session's default lakehouse.
SELECT
    id,
    cat,
    subcat,
    maintenance
FROM Bronze.px_cat_g1v2
WHERE id != TRIM(id)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 51, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 4 fields>

In [53]:
%%sql
-- Inventory of the distinct maintenance flag values.
-- Schema-qualified so the cell does not depend on the session's default lakehouse.
SELECT DISTINCT
    maintenance
FROM Bronze.px_cat_g1v2

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 55, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 1 fields>

In [54]:
%%sql
-- Preview of the category load: columns passed through unchanged, load date stamped.
-- Schema-qualified so the cell does not depend on the session's default lakehouse.
SELECT
    id,
    cat,
    subcat,
    maintenance,
    CURRENT_DATE() AS dwh_create_date
FROM Bronze.px_cat_g1v2

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 56, Finished, Available, Finished)

<Spark SQL result set with 37 rows and 5 fields>

In [55]:
# Build the product-category DataFrame from Bronze.px_cat_g1v2: the four source columns
# pass through unchanged and dwh_create_date records the load date.
# The source table is schema-qualified (an earlier cell already reads Bronze.PX_CAT_G1V2)
# so the load does not depend on which lakehouse is the session default.
df_px_cat_g1v2 = spark.sql(
    """
    SELECT
        id,
        cat,
        subcat,
        maintenance,
        CURRENT_DATE() AS dwh_create_date
    FROM Bronze.px_cat_g1v2
    """
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 57, Finished, Available, Finished)

In [56]:
# Persist the product categories as a Delta table, replacing whatever the table held
# before ("overwrite" mode).
# NOTE(review): the target is named "erp_df_px_cat_g1v2" while the other ERP tables follow
# "erp_<source>" (erp_cust_az12, erp_loc_a101); the "df_" looks like a leaked variable
# prefix. Left unchanged because downstream readers may use this name - confirm and rename.
(
    df_px_cat_g1v2.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("Silver.erp_df_px_cat_g1v2")
)

StatementMeta(, eef91e93-fbc5-4190-8cb3-675a04835c99, 58, Finished, Available, Finished)