#### Import the necessary packages

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import count,col

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 6, Finished, Available, Finished)

#### Reading raw data

In [5]:
def readRawData(file_name):
    base_path = "abfss://efb0e5a4-308e-4a79-84ec-25af3ae281d3@onelake.dfs.fabric.microsoft.com/eea3f7b4-4cac-435f-8e42-4a4740b53b47/Files/Bronze"
    file_path=f'{base_path}/{file_name}.dlm.gz'
    df=spark.read.format("csv").option("compression","gzip")\
            .option("header","True")\
            .option("delimiter","|")\
            .load(file_path)
    return df

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 7, Finished, Available, Finished)

In [6]:
# read the raw data
fact_averagecosts = readRawData("fact.averagecosts")
fact_transactions = readRawData("fact.transactions")

hier_clnd = readRawData("hier.clnd")
hier_hldy = readRawData("hier.hldy")
hier_invloc = readRawData("hier.invloc")
hier_invstatus = readRawData("hier.invstatus")
hier_possite = readRawData("hier.possite")
hier_pricestate= readRawData("hier.pricestate")
hier_prod= readRawData("hier.prod")
hier_rtlloc= readRawData("hier.rtlloc")

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 8, Finished, Available, Finished)

In [7]:
fact_transactions.createOrReplaceTempView("factTransView")

hier_prod.createOrReplaceTempView("dimProdView")
hier_clnd.createOrReplaceTempView("dimClndView")
hier_hldy.createOrReplaceTempView("dimHldyView")
hier_invloc.createOrReplaceTempView("dimInvlocView")
hier_invstatus.createOrReplaceTempView("dimInvstatusView")
hier_possite.createOrReplaceTempView("dimPossiteView")
hier_pricestate.createOrReplaceTempView("dimPricestateView")
hier_rtlloc.createOrReplaceTempView("dimRtllocView")

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 9, Finished, Available, Finished)

#### Non null check for primary key

In [8]:
%%sql
select *
from dimProdView
where sku_id is null

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 10, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 15 fields>

#### Uniqueness of primary key

In [9]:
%%sql
select count(sku_id) 
from dimProdView
group by sku_id
having count(sku_id) >1

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 11, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 1 fields>

#### Counting the nulls from all columns

In [10]:
# null on dimension table
Dict_Null = {col:hier_prod.filter(hier_prod[col].isNull()).count() for col in hier_prod.columns}
Dict_Null

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 12, Finished, Available, Finished)

{'sku_id': 0,
 'sku_label': 0,
 'stylclr_id': 0,
 'stylclr_label': 0,
 'styl_id': 0,
 'styl_label': 0,
 'subcat_id': 0,
 'subcat_label': 0,
 'cat_id': 0,
 'cat_label': 0,
 'dept_id': 0,
 'dept_label': 0,
 'issvc': 0,
 'isasmbly': 0,
 'isnfs': 0}

In [11]:
# null check on fact table
Dict_Null = {col:fact_transactions.filter(fact_transactions[col].isNull()).count() for col in fact_transactions.columns}
Dict_Null

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 13, Finished, Available, Finished)

{'order_id': 0,
 'line_id': 0,
 'type': 0,
 'dt': 0,
 'pos_site_id': 0,
 'sku_id': 0,
 'fscldt_id': 0,
 'price_substate_id': 0,
 'sales_units': 0,
 'sales_dollars': 0,
 'discount_dollars': 0,
 'original_order_id': 3993143,
 'original_line_id': 3987518}

#### Foreign Key Check

In [12]:
%%sql
-- btween prod and transaction table

SELECT f.sku_id 
FROM factTransView f
LEFT JOIN dimProdView d ON f.sku_id = d.sku_id
WHERE d.sku_id IS NULL;

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 14, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 1 fields>

#### Casting of Columns

In [13]:
df_final = fact_transactions.withColumn("order_id", col("order_id").cast(LongType())) \
            .withColumn("line_id", col("line_id").cast(LongType())) \
            .withColumn("pos_site_id", col("pos_site_id").cast(LongType())) \
            .withColumn("sku_id", col("sku_id").cast(LongType())) \
            .withColumn("fscldt_id", col("fscldt_id").cast(LongType())) \
            .withColumn("price_substate_id", col("price_substate_id").cast(LongType())) \
            .withColumn("original_order_id", col("original_order_id").cast(LongType())) \
            .withColumn("original_line_id", col("original_line_id").cast(LongType())) \
            .withColumn("type", col("type").cast(StringType())) \
            .withColumn("dt", col("dt").cast(StringType())) \
            .withColumn("sales_units", col("sales_units").cast(StringType())) \
            .withColumn("sales_dollars", col("sales_dollars").cast(StringType())) \
            .withColumn("discount_dollars", col("discount_dollars").cast(StringType()))

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 15, Finished, Available, Finished)

#### Create staging tables

In [14]:
hier_clnd.write.mode("overwrite").saveAsTable("stg_hier_clnd")
hier_hldy.write.mode("overwrite").saveAsTable("stg_hier_hldy")
hier_invloc.write.mode("overwrite").saveAsTable("stg_hier_invloc")
hier_invstatus.write.mode("overwrite").saveAsTable("stg_hier_invstatus")
hier_possite.write.mode("overwrite").saveAsTable("stg_hier_possite")
hier_pricestate.write.mode("overwrite").saveAsTable("stg_hier_pricestate")
hier_prod.write.mode("overwrite").saveAsTable("stg_hier_prod")
hier_rtlloc.write.mode("overwrite").saveAsTable("stg_hier_rtlloc")
hier_rtlloc.write.mode("overwrite").saveAsTable("stg_hier_rtlloc")

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 16, Finished, Available, Finished)

In [19]:
df = spark.sql("""
        select T.*,fsclwk_id 
        from factTransView T
        LEFT JOIN dimProdView P
        ON T.sku_id = P.sku_id
        LEFT JOIN dimPossiteView S
        on T.pos_site_id = S.site_id
        LEFT JOIN dimClndView C
        ON C.fscldt_id = T.fscldt_id
        LEFT JOIN dimPricestateView PR 
        ON PR.substate_id = T.price_substate_id
    """)

df.write.mode("overwrite").saveAsTable("stg_fact_transactions")

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 21, Finished, Available, Finished)

#### Fact table

In [20]:
spark.table("stg_fact_transactions").write.mode("append").saveAsTable("fact_transactions")

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 22, Finished, Available, Finished)

#### The Final refined table

In [24]:
'''Create a refined table called mview_weekly_sales which totals sales_units, sales_dollars, and discount_dollars by pos_site_id, sku_id, fsclwk_id, price_substate_id and type'''

df=spark.sql(""" SELECT pos_site_id,
                    sku_id,
                    fsclwk_id,
                    price_substate_id,
                    type,
                    sum(sales_units) as totals_sales_units, 
                    sum(sales_dollars) as total_sales_dollars, 
                    sum(discount_dollars) as total_discount_dollars
                FROM fact_transactions 
                GROUP BY
                    pos_site_id,
                    sku_id,
                    fsclwk_id,
                    price_substate_id,
                    type

        """) 
df.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("mview_weekly_sales") 

StatementMeta(, ca637f8b-e71b-40f7-a92e-3f061733b255, 26, Finished, Available, Finished)