#### **Silver Layer: Data Cleaning and Integration**

#### **Load Bronze Data**

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 3, Finished, Available, Finished)

In [2]:
# Load each bronze-layer CSV folder into its own Spark DataFrame.
def read_bronze_csv(path):
    """Read the CSV data at `path`, treating the first row as a header and inferring column types."""
    csv_reader = spark.read.format('csv')
    csv_reader = csv_reader.options(header = True, inferSchema = True)
    return csv_reader.load(path)


customers = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_customers")
date = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_date")
order_agg = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_order_agg")
order_lines = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_order_lines")
products = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_product")
targets = read_bronze_csv("abfss://87acb0e3-0dec-4a97-b910-b1a254cfaaf3@onelake.dfs.fabric.microsoft.com/370d5687-e604-4a7a-8842-829e7a70512b/Files/Supply_Chain_Raw/dim_target")

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 4, Finished, Available, Finished)

In [3]:
# Show the first three rows of every loaded table, in the same order they were read.
for bronze_table in (customers, date, order_agg, order_lines, products, targets):
    display(bronze_table.limit(3))

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 39669358-3d73-48f8-824b-cdb87473aa02)

SynapseWidget(Synapse.DataFrame, c89d93c0-15d3-4201-a394-566e104fa2a3)

SynapseWidget(Synapse.DataFrame, b89f6ce6-61b7-4b62-a824-6c2a5383181c)

SynapseWidget(Synapse.DataFrame, 38545f5d-67c6-4845-b25d-ac60f92b4844)

SynapseWidget(Synapse.DataFrame, d2237022-ef08-42fa-a693-252d0acfec0d)

SynapseWidget(Synapse.DataFrame, 1302609a-1b0e-4406-839a-0f22adb032b2)

#### **Data Cleaning and Enriching**

In [4]:
# Columns that must be populated for a row to be kept in each table.
order_lines_required = ['order_placement_date', 'order_id', 'customer_id', 'order_qty']
order_agg_required = ['order_placement_date', 'order_id', 'customer_id']

# A row is removed when any of its required columns is null.
order_lines = order_lines.na.drop(subset=order_lines_required)
order_agg = order_agg.na.drop(subset=order_agg_required)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 6, Finished, Available, Finished)

In [5]:
# --- order_agg ---
# Identifier columns are cast to string and the placement date is parsed
# from its "dd-MMM-yy" text form into a date.
order_agg = (
    order_agg
        .withColumn("order_id", col("order_id").cast("string"))
        .withColumn("customer_id", col("customer_id").cast("string"))
        .withColumn("order_placement_date", to_date(col("order_placement_date"), "dd-MMM-yy"))
)

# --- targets ---
# Cast the customer key to string, then strip the trailing "%" from the
# three target column names.
targets = targets.withColumn("customer_id", col("customer_id").cast("string"))

target_renames = {
    "ontime_target%": "ontime_target",
    "infull_target%": "infull_target",
    "otif_target%": "otif_target",
}
for old_name, new_name in target_renames.items():
    targets = targets.withColumnRenamed(old_name, new_name)

# --- date ---
# Parse the calendar column with the same "dd-MMM-yy" pattern used for order_agg.
parsed_calendar_day = to_date(col("date"), "dd-MMM-yy")
date = date.withColumn("date", parsed_calendar_day)

display(order_agg)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0c1af1c8-e4ee-431a-a37a-eac3865f8d79)

In [6]:
# Use the legacy datetime parser for the session from this point on.
# NOTE(review): Spark evaluates lazily, so this setting presumably also applies to
# any earlier to_date() expressions that are executed after this cell — confirm
# whether it should be set before the first date parse instead.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# --- order_lines: date columns ---
# Each value has a leading "<word>, " prefix removed (presumably the weekday
# name — confirm against the raw data) and the remainder is parsed as "MMMM d, yyyy".
for date_field in ("agreed_delivery_date", "actual_delivery_date", "order_placement_date"):
    without_prefix = regexp_replace(col(date_field), r'^\w+, ', '')
    order_lines = order_lines.withColumn(date_field, to_date(without_prefix, "MMMM d, yyyy"))

# --- order_lines: key columns stored as strings ---
order_lines = (
    order_lines
        .withColumn("product_id", col("product_id").cast("string"))
        .withColumn("customer_id", col("customer_id").cast("string"))
)

# --- order_lines: snake_case names for the delivery flag columns ---
flag_renames = {
    "In Full": "in_full",
    "On Time": "on_time",
    "On Time In Full": "on_time_in_full",
}
for old_name, new_name in flag_renames.items():
    order_lines = order_lines.withColumnRenamed(old_name, new_name)

display(order_lines)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2bef32c0-af3f-48ad-8eab-43ac722457b4)

In [7]:
# Discard the source's mmm_yy and week_no columns before deriving calendar attributes.
date = date.drop("mmm_yy", "week_no")

# Every derived attribute is computed from the parsed `date` column.
calendar_day = col("date")
derived_attributes = [
    ("year", year(calendar_day)),
    ("quarter", quarter(calendar_day)),
    ("month", month(calendar_day)),
    ("year_month", date_format(calendar_day, "yyyy-MM")),
    ("month_name", date_format(calendar_day, "MMMM")),
    ("week", weekofyear(calendar_day)),
    ("day_name", date_format(calendar_day, "EEEE")),
]
for attribute_name, attribute_value in derived_attributes:
    date = date.withColumn(attribute_name, attribute_value)

# The numeric calendar parts end up with plural column names.
plural_names = {
    "year": "years",
    "quarter": "quarters",
    "month": "months",
    "week": "weeks",
}
for singular, plural in plural_names.items():
    date = date.withColumnRenamed(singular, plural)

display(date)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dc459c42-7027-40dc-87f6-a132bee69f6c)

In [8]:
# Second null filter, run after the casts and date parsing earlier in the notebook:
# a row is removed when any of its key columns is null.
order_agg_keys = ["order_id", "customer_id", "order_placement_date"]
order_lines_keys = ["order_id", "customer_id", "order_placement_date","product_id"]

order_agg = order_agg.dropna(how="any", subset=order_agg_keys)
order_lines = order_lines.dropna(how="any", subset=order_lines_keys)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 10, Finished, Available, Finished)

In [9]:
# Rename the join keys so each lookup table carries a distinct key name:
# the calendar key matches order_lines, while the customer and target keys get
# their own names to avoid clashing with order_lines.customer_id.
date = date.withColumnRenamed("date", "order_placement_date")
customers_renamed = customers.withColumnRenamed("customer_id", "cust_id")
targets_renamed = targets.withColumnRenamed("customer_id", "target_customer_id")

# Join conditions for the two lookups that use differently named keys.
customer_match = order_lines.customer_id == customers_renamed.cust_id
target_match = customers_renamed.cust_id == targets_renamed.target_customer_id

# All joins are inner joins, so an order line without a matching calendar day,
# customer, product or target row is not carried forward.
lines_with_dates = order_lines.join(date, on="order_placement_date", how="inner")
lines_with_customers = lines_with_dates.join(customers_renamed, customer_match, "inner")
lines_with_products = lines_with_customers.join(products, on="product_id", how="inner")
lines_with_targets = lines_with_products.join(targets_renamed, target_match, "inner")

# NOTE(review): only target_customer_id is dropped; cust_id stays in the result
# next to customer_id — confirm whether downstream consumers rely on it.
# NOTE(review): order_lines is overwritten with the joined result, so running this
# cell a second time would join the lookup tables again — re-run from the top instead.
order_lines = lines_with_targets.drop("target_customer_id")
display(order_lines)

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c9b6ede3-eff0-41b4-9b5d-418dfa97d1d1)

In [10]:
# Save the enriched order lines as Parquet, replacing whatever is already at the target.
# NOTE(review): the path is relative — presumably resolved against the notebook's
# default lakehouse; confirm.
silver_output_path = "Files/Supply_Chain_Silver_Layer_data/Order_lines_silver_layer"
order_lines.write.parquet(silver_output_path, mode="overwrite")

StatementMeta(, 537bdd71-72ba-4769-99e0-72e72ba6d51e, 12, Finished, Available, Finished)