In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import * 

In [0]:
# Bronze-layer inputs: the normalized sales fact table plus the six
# dimension tables that are cleaned further down in this notebook.
bronze_reader = spark.read

df_sales = bronze_reader.table('learning_pradeep.pipeline.bronze_fact_sales_normalized')

df_campaign = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_campaigns')
df_cust = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_customers')
df_date = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_dates')
df_prod = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_products')
df_salesperson = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_salespersons')
df_store = bronze_reader.table('learning_pradeep.pipeline.bronze_dim_stores')

# dim cleaning 

# Customers

### DIM CUSTOMERS
## Rules:
customer_sk NOT NULL\
Remove duplicates\
First/Last name Proper Case\
Email lowercase\
Trim text\
Customer segment meaningful, else mark as Unknown

In [0]:
# Preview five raw customer rows before any cleaning is applied.
df_cust.limit(5).display()

# no nulls

In [0]:
# Null-key check: show every customer row whose surrogate key is missing.
# The rows are bound to a name *before* displaying; `.display()` returns None,
# so assigning its result would leave the variable empty.
df_cust_null_keys = df_cust.filter(col('customer_sk').isNull())
display(df_cust_null_keys)

In [0]:
# Duplicate-key check: list each customer_sk that appears on more than one row.
customer_key_counts = df_cust.groupBy('customer_sk').count()
display(customer_key_counts.where(col('count') > 1))

In [0]:
# Look at the raw customer columns again right before writing the cleaning chain.
df_cust.limit(5).display()

In [0]:
# Customer dimension cleaning, following the rules listed in the markdown above:
#   * drop rows without a customer_sk and keep one row per customer_sk
#   * trim + Proper Case the first/last name, trim + lowercase the email
#   * trim customer_segment; NULL or blank (empty after trimming) -> 'Unknown'
#   * trim residential_location
# NOTE(review): dropDuplicates keeps an arbitrary row per key — confirm that is
# acceptable when duplicate keys carry different attribute values.
dim_cust_clean = (
    df_cust
    .filter(col('customer_sk').isNotNull())
    .dropDuplicates(['customer_sk'])
    .withColumn('first_name', initcap(trim(col('first_name'))))
    .withColumn('last_name', initcap(trim(col('last_name'))))
    .withColumn('email', lower(trim(col('email'))))
    # Trim first so whitespace-only segments collapse to '' and are caught below.
    .withColumn('customer_segment', trim(col('customer_segment')))
    .withColumn(
        'customer_segment',
        when(col('customer_segment').isNull() | (col('customer_segment') == ''), 'Unknown')
        .otherwise(col('customer_segment')),
    )
    .withColumn('residential_location', trim(col('residential_location')))
)


In [0]:
# Spot-check the first 25 cleaned customer rows.
dim_cust_clean.limit(25).display()

In [0]:
# List the distinct raw customer_segment values present in the bronze table.
raw_segments = df_cust.select('customer_segment').distinct()
display(raw_segments)

# products

In [0]:
# Product dimension cleaning:
#   * keep only rows with a product_sk, one row per product_sk
#   * product_name is trimmed and Proper Cased
#   * category / brand / origin_location: NULL -> "Unknown",
#     otherwise trimmed and Proper Cased
dim_products_clean = (
    df_prod
    .where(col("product_sk").isNotNull())
    .dropDuplicates(["product_sk"])
    .withColumn("product_name", initcap(trim(col("product_name"))))
)
for product_attr in ("category", "brand", "origin_location"):
    dim_products_clean = dim_products_clean.withColumn(
        product_attr,
        when(col(product_attr).isNull(), "Unknown")
        .otherwise(initcap(trim(col(product_attr)))),
    )


In [0]:
# Spot-check the first 25 cleaned product rows.
dim_products_clean.limit(25).display()

# stores

In [0]:
# Store dimension cleaning:
#   * keep only rows with a store_sk, one row per store_sk
#   * store_name is trimmed and Proper Cased
#   * store_type / store_location: NULL -> "Unknown",
#     otherwise trimmed and Proper Cased
dim_stores_clean = (
    df_store
    .where(col("store_sk").isNotNull())
    .dropDuplicates(["store_sk"])
    .withColumn("store_name", initcap(trim(col("store_name"))))
)
for store_attr in ("store_type", "store_location"):
    dim_stores_clean = dim_stores_clean.withColumn(
        store_attr,
        when(col(store_attr).isNull(), "Unknown")
        .otherwise(initcap(trim(col(store_attr)))),
    )


In [0]:
# Spot-check the first 25 cleaned store rows.
dim_stores_clean.limit(25).display()

# SALESPERSONS

In [0]:
# Salesperson dimension cleaning: drop rows without a salesperson_sk, keep one
# row per key, then trim + Proper Case the name and role columns.
dim_salespersons_clean = (
    df_salesperson
    .where(col("salesperson_sk").isNotNull())
    .dropDuplicates(["salesperson_sk"])
)
for text_col in ("salesperson_name", "salesperson_role"):
    dim_salespersons_clean = dim_salespersons_clean.withColumn(
        text_col, initcap(trim(col(text_col)))
    )


In [0]:
# Spot-check the first 25 cleaned salesperson rows.
dim_salespersons_clean.limit(25).display()

# CAMPAIGNS

In [0]:
# Campaign dimension cleaning: keep keyed rows only (one per campaign_sk),
# Proper Case the trimmed campaign name and cast the budget to double.
campaigns_keyed = (
    df_campaign
    .where(col("campaign_sk").isNotNull())
    .dropDuplicates(["campaign_sk"])
)
campaign_name_clean = initcap(trim(col("campaign_name")))
campaign_budget_double = col("campaign_budget").cast("double")

dim_campaigns_clean = (
    campaigns_keyed
    .withColumn("campaign_name", campaign_name_clean)
    .withColumn("campaign_budget", campaign_budget_double)
)


In [0]:
# Spot-check the first 25 cleaned campaign rows.
dim_campaigns_clean.limit(25).display()

# DATES

In [0]:
# Date dimension cleaning: keep keyed rows only (one per date_sk), parse
# full_date with the yyyy-MM-dd pattern, and cast the calendar parts to int.
dim_dates_clean = (
    df_date
    .where(col("date_sk").isNotNull())
    .dropDuplicates(["date_sk"])
    .withColumn("full_date", to_date(col("full_date"), "yyyy-MM-dd"))
)
for calendar_part in ("year", "month", "day", "quarter"):
    dim_dates_clean = dim_dates_clean.withColumn(
        calendar_part, col(calendar_part).cast("int")
    )


In [0]:
# Spot-check the first 25 cleaned date rows.
dim_dates_clean.limit(25).display()

# FACT SALES

### Rules:
All FK keys must be NOT NULL\
sales_date cast to DATE\
total_amount numeric and ≥ 0\
deduplicate rows

In [0]:
# Fact table cleaning:
#   1. drop rows where any of the five dimension keys is NULL
#   2. cast total_amount to double and keep only non-NULL, non-negative amounts
#   3. parse sales_date with the yyyy-MM-dd pattern via try_to_date
#   4. keep one row per sales_sk
fact_sales_clean = df_sales
for fk_col in ("customer_sk", "product_sk", "store_sk", "salesperson_sk", "campaign_sk"):
    fact_sales_clean = fact_sales_clean.where(col(fk_col).isNotNull())

fact_sales_clean = (
    fact_sales_clean
    .withColumn("total_amount", col("total_amount").cast("double"))
    .where(col("total_amount").isNotNull() & (col("total_amount") >= 0))
    .withColumn("sales_date", try_to_date(col("sales_date"), "yyyy-MM-dd"))
    .dropDuplicates(["sales_sk"])
)

fact_sales_clean.limit(25).display()

# INTEGRITY VALIDATION WITH JOINS, SO IT WON'T FAIL DOWNSTREAM

In [0]:
# Referential-integrity check between the cleaned fact and the cleaned dimensions.
# Inner joins are used on purpose: a fact row survives only when every one of its
# dimension keys exists in the corresponding cleaned dimension. (A left join keeps
# every fact row regardless, so the two counts below could never differ.)
# Each dimension was de-duplicated on its key above, so the joins cannot fan out.
valid_fact = (
    fact_sales_clean
    .join(dim_cust_clean, "customer_sk", "inner")
    .join(dim_products_clean, "product_sk", "inner")
    .join(dim_stores_clean, "store_sk", "inner")
    .join(dim_salespersons_clean, "salesperson_sk", "inner")
    .join(dim_campaigns_clean, "campaign_sk", "inner")
)

# Compute each count once; a valid count lower than the original count means
# some fact rows reference keys that are missing from a dimension.
original_fact_count = fact_sales_clean.count()
valid_fact_count = valid_fact.count()

print('original_fact_count :' + str(original_fact_count))
print('valid_fact_count :' + str(valid_fact_count))

In [0]:
# Count fact rows whose customer_sk has no match in the cleaned customer
# dimension (left_anti keeps only the unmatched left-side rows).
fact_without_customer = fact_sales_clean.join(dim_cust_clean, on="customer_sk", how="left_anti")
fact_without_customer.count()


# WRITING TO SILVER TABLES

In [0]:
# Persist the cleaned fact table, replacing any existing silver table contents.
(
    fact_sales_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_fact_sales')
)
    

In [0]:
# Persist the cleaned campaign dimension, replacing any existing silver table contents.
(
    dim_campaigns_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_campaigns')
)
    

In [0]:
# Persist the cleaned customer dimension, replacing any existing silver table contents.
(
    dim_cust_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_customers')
)
    

In [0]:
# Persist the cleaned date dimension, replacing any existing silver table contents.
(
    dim_dates_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_dates')
)
    

In [0]:
# Persist the cleaned product dimension, replacing any existing silver table contents.
(
    dim_products_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_products')
)
    

In [0]:
# Persist the cleaned salesperson dimension, replacing any existing silver table contents.
(
    dim_salespersons_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_salespersons')
)
    

In [0]:
# Persist the cleaned store dimension, replacing any existing silver table contents.
(
    dim_stores_clean.write
    .mode('overwrite')
    .saveAsTable('learning_pradeep.pipeline.silver_dim_stores')
)
    