In [0]:


# Register the storage-account access key with Spark so abfss:// paths on this
# account can be read and written.
# The account name is assigned here because this cell uses it; the load cell
# further down assigns the same value again, so the two stay consistent.
storage_account_name = "bronzelayer41"

# NOTE(review): `storage_account_access_key` is not assigned anywhere in the
# visible notebook. It has to exist before this cell runs; supply it from a
# secret store rather than pasting the key into the notebook — confirm source.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_access_key
)



In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [0]:


# Customer schema: explicit column types for the customer CSV (crm/cust_info.csv).
# Every column is nullable; the create date is kept as a plain string.
customers_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("cst_id", IntegerType()),
        ("cst_key", StringType()),
        ("cst_firstname", StringType()),
        ("cst_lastname", StringType()),
        ("cst_marital_status", StringType()),
        ("cst_gndr", StringType()),
        ("cst_create_date", StringType()),
    ]
])

# Product schema: explicit column types for the product CSV (crm/prd_info.csv).
# Every column is nullable; start/end dates are kept as plain strings.
product_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("prd_id", IntegerType()),
        ("prd_key", StringType()),
        ("prd_nm", StringType()),
        ("prd_cost", IntegerType()),
        ("prd_line", StringType()),
        ("prd_startdate", StringType()),
        ("prd_enddate", StringType()),
    ]
])


# Sales schema: explicit column types for the sales CSV (crm/sales_details.csv).
# Every column is nullable; the three date columns are kept as plain strings.
sales_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("sls_ord_num", StringType()),
        ("sls_prd_key", StringType()),
        ("sls_cust_id", IntegerType()),
        ("sls_order_dt", StringType()),
        ("sls_ship_dt", StringType()),
        ("sls_due_dt", StringType()),
        ("sls_sales", IntegerType()),
        ("sls_quantity", IntegerType()),
        ("sls_price", IntegerType()),
    ]
])


In [0]:
# Storage location of the raw CRM extracts.
container = "datawarehouse"
storage_account_name = "bronzelayer41"

print("Customer Data")

# Full abfss:// URIs of the three CRM CSV files.
customers_path = f"abfss://{container}@{storage_account_name}.dfs.core.windows.net/crm/cust_info.csv"
product_path = f"abfss://{container}@{storage_account_name}.dfs.core.windows.net/crm/prd_info.csv"
sales_path = f"abfss://{container}@{storage_account_name}.dfs.core.windows.net/crm/sales_details.csv"

# Each file is read as CSV with a header row and its explicit schema
# (no schema inference). A fresh reader is taken from `spark.read` per file.
df_customers = spark.read.schema(customers_schema).option("header", "true").csv(customers_path)

df_product = spark.read.schema(product_schema).option("header", "true").csv(product_path)

df_sales = spark.read.schema(sales_schema).option("header", "true").csv(sales_path)






In [0]:
# Data-quality probe on the raw customers: keep only rows whose first name
# differs from its trimmed form (i.e. has leading/trailing whitespace).
first_name_has_padding = F.col("cst_firstname") != F.trim(F.col("cst_firstname"))
df_trimmed = (
    df_customers
    .where(first_name_has_padding)
    .select("cst_firstname")
)
display(df_trimmed)

### Checking duplicates in primary key


In [0]:
# Show the column names of the raw customer DataFrame (inspection only).
df_customers.columns

In [0]:
# Number the rows of each cst_id, ordered by cst_create_date descending, so the
# row numbered 1 is the one that sorts first on create date for that customer.
window_partition = Window.partitionBy("cst_id").orderBy(F.desc("cst_create_date"))

df_clean = (
    df_customers
    .withColumn("rank", F.row_number().over(window_partition))
    # Keep a single row per cst_id ...
    .where(F.col("rank") == 1)
    # ... and discard rows that have no customer id at all.
    .where(F.col("cst_id").isNotNull())
    # Strip surrounding whitespace and capitalise each word of the names.
    .withColumn("cst_firstname", F.initcap(F.trim(F.col("cst_firstname"))))
    .withColumn("cst_lastname", F.initcap(F.trim(F.col("cst_lastname"))))
    # Project back to the original columns, which drops the helper "rank" column.
    .select(
        "cst_id",
        "cst_key",
        "cst_firstname",
        "cst_lastname",
        "cst_marital_status",
        "cst_gndr",
        "cst_create_date",
    )
)

display(df_clean)

 

In [0]:
# Normalised (trimmed, init-capped) views of the two coded columns.
marital_code = F.initcap(F.trim(F.col("cst_marital_status")))
gender_code = F.initcap(F.trim(F.col("cst_gndr")))

# Expand the one-letter codes into labels; anything else (including null)
# falls through to "NA".
marital_label = (
    F.when(marital_code == "M", "Married")
    .when(marital_code == "S", "Single")
    .otherwise("NA")
)
gender_label = (
    F.when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise("NA")
)

# NOTE(review): the audit column is spelled "dhw_created_date"; possibly meant
# "dwh_created_date" — confirm before renaming, since it changes the output schema.
df_clean2 = (
    df_clean
    .withColumn("cst_marital_status", marital_label)
    .withColumn("cst_gndr", gender_label)
    .withColumn("dhw_created_date", F.current_timestamp())
)

display(df_clean2)

In [0]:
# Re-run the whitespace probe on the cleaned data: select first names that
# still differ from their trimmed form.
first_name_still_padded = F.col("cst_firstname") != F.trim(F.col("cst_firstname"))
df_trimmed = (
    df_clean2
    .where(first_name_still_padded)
    .select("cst_firstname")
)
display(df_trimmed)


In [0]:
# Persist the cleaned customers as Parquet, replacing any previous output at
# the target path.
# NOTE(review): the container is spelled "sliver"; possibly meant "silver" —
# confirm against the actual container name before changing it.
customer_output_path = f"abfss://sliver@{storage_account_name}.dfs.core.windows.net/customer.parquet"

(
    df_clean2.write
    .mode("overwrite")
    .parquet(customer_output_path)
)