##### This notebook generates, cleans, and transforms data from an ADW data source
##### in the Bronze layer,
####  
##### and then loads the incremental output into a Lakehouse Silver layer.
##### 
##### 
##### 
##### The dataset used in this example is the ADWLt sample SQL source.

In [1]:
# Optional manual reset (kept commented out): uncommenting these lines drops
# the Silver fact table and the two temp tables.
#df = spark.sql("DROP TABLE SIlver.fact_order")
#df = spark.sql("DROP TABLE SIlver.orderdetail_temp")
#df = spark.sql("DROP TABLE SIlver.orderhead_temp ")



StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 3, Finished, Available)

In [2]:

# Loading the necessary libraries
from datetime import datetime
from dateutil import parser,relativedelta
import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, to_date, col, quarter, explode, sequence, expr,current_timestamp,lit
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType, FloatType, ArrayType, LongType
from delta.tables import DeltaTable
from notebookutils import mssparkutils


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 4, Finished, Available)

In [3]:
%%sql
-- Default the change data feed table property to true for Delta tables created in this session
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 5, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

##### Generate a Date Dimension Table

In [4]:
# Drop the product category table if it is present; IF EXISTS keeps a fresh
# workspace (where the table has not been created yet) from failing here.
spark.sql("DROP TABLE IF EXISTS SIlver.productcategory")

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 6, Finished, Available)

In [5]:
# Build a calendar (date dimension) DataFrame for a date range
def generate_date_dimension(start_date: str, end_date: str):
    """Return a DataFrame with one row per day between two dates.

    Args:
        start_date: First day of the range, as a string Spark's ``to_date`` can parse.
        end_date: Last day of the range, in the same format.

    Returns:
        A DataFrame with the columns ``date``, ``year``, ``quarter``, ``month``,
        ``day``, ``day_of_week`` and ``date_key``.

    Uses the notebook-provided ``spark`` session.
    """
    # Single-row frame that carries the two range bounds
    bounds_df = spark.createDataFrame([(start_date, end_date)], ["start", "end"])

    # Expand the bounds into one row per day
    one_day = expr("interval 1 day")
    day_sequence = sequence(to_date("start"), to_date("end"), one_day)
    calendar_df = bounds_df.select(explode(day_sequence).alias("date"))

    # Derive the calendar attributes from each date
    attribute_exprs = [
        "date",
        "year(date) as year",
        "quarter(date) as quarter",
        "month(date) as month",
        "day(date) as day",
        "dayofweek(date) as day_of_week",
        "to_date(date) as date_key",
    ]
    return calendar_df.selectExpr(*attribute_exprs)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 7, Finished, Available)

##### Loading Date/ Customer / Product / Product Category Dimension in Silver Layer

In [6]:
# Build the calendar for 2008-01-01 .. 2023-12-31 and store it as the "Dates" Delta table
date_dimension_df = generate_date_dimension("2008-01-01", "2023-12-31")
(
    date_dimension_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("Dates")
)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 8, Finished, Available)

In [7]:
# Customer dimension: Bronze parquet files -> "customer" Delta table
df_customer = spark.read.parquet('abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/Customer')
(
    df_customer.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("customer")
)

# Product and product category dimensions: read both sources first, then write both tables
df_product = spark.read.parquet('abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/Product')
df_product_category = spark.read.parquet('abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/ProductCategory')

(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("product")
)
(
    df_product_category.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("productcategory")
)


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 9, Finished, Available)

In [8]:
# Base paths of the order header / order detail temp tables
basepath_orderhead = "Tables/orderhead_temp/"
basepath_orderdetail = "Tables/orderdetail_temp/"

# Checkpoint folders where the stream metadata lives while each stream is written
checkpoint_dir_orderhead = "Tables/orderhead_temp/_checkpoint"
checkpoint_dir_orderdetail = "Tables/orderdetail_temp/_checkpoint"

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 10, Finished, Available)

##### Load data from Bronze depending on the `delta_load` flag.
##### If `delta_load` is 0, the checkpoint directories are removed (if they exist) before the data is loaded.
##### Data is read from the Bronze parquet files into `Readdf_orderhead` and `Readdf_orderdetail` as streams.

In [9]:
# Load-mode flag: the value 0 selects the branch that removes the stream checkpoints before reading
delta_load = 1

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 11, Finished, Available)

In [10]:
# Batch-read each Bronze parquet folder to capture its schema; the schemas are
# handed to the streaming readers later in the notebook.
df_orderhead = spark.read.parquet('abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/SalesLT.SalesOrderHeader')
df_orderdetail = spark.read.parquet('abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/SalesLT.SalesOrderDetail')

inferred_schema_orderhead = df_orderhead.schema
inferred_schema_orderdetail = df_orderdetail.schema



StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 12, Finished, Available)

In [11]:
# Bronze parquet folders that feed the two streams
orderhead_source = 'abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/SalesLT.SalesOrderHeader'
orderdetail_source = 'abfss://DataBash23@onelake.dfs.fabric.microsoft.com/Bronze.Lakehouse/Files/ADW/SalesLT.SalesOrderDetail'

if delta_load == 0:
    # Reset mode: remove each stream's checkpoint directory so the next run
    # starts without saved progress. Each directory is checked individually,
    # because one may exist without the other.
    for checkpoint_path in (checkpoint_dir_orderhead, checkpoint_dir_orderdetail):
        if mssparkutils.fs.exists(checkpoint_path):
            mssparkutils.fs.rm(checkpoint_path, True)

# The readers are identical in both modes; only the checkpoint handling differs.
# NOTE(review): "ignoreChanges" looks like a Delta-source option and the source
# here is parquet - kept as in the original, confirm whether it is needed.
Readdf_orderhead = (
    spark.readStream.format("parquet")
    .option("ignoreChanges", True)
    .schema(inferred_schema_orderhead)
    .load(orderhead_source)
)
Readdf_orderdetail = (
    spark.readStream.format("parquet")
    .option("ignoreChanges", True)
    .schema(inferred_schema_orderdetail)
    .load(orderdetail_source)
)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 13, Finished, Available)

In [12]:
# Stamp every streamed row with the time it was loaded into Silver
Readdf_orderhead = Readdf_orderhead.withColumn(
    "Silver_Loadedtimestamp", f.current_timestamp()
)
Readdf_orderdetail = Readdf_orderdetail.withColumn(
    "Silver_Loadedtimestamp", f.current_timestamp()
)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 14, Finished, Available)

 ##### Define the functions `writeToorderhead_inc` and `writeToorderdetail_inc`, which write a DataFrame to the tables "orderhead_temp" and "orderdetail_temp" in delta format in overwrite mode

In [13]:
def writeToorderhead_inc(df, epoch_id):
    """foreachBatch sink: overwrite the ``orderhead_temp`` Delta table with one micro-batch.

    Args:
        df: The micro-batch DataFrame passed in by ``foreachBatch``.
        epoch_id: Batch identifier passed in by ``foreachBatch``; not used here.
    """
    (
        df.write.format("delta")
        .mode("overwrite")
        # Correctly spelled key, so the overwrite may also replace the table schema
        .option("overwriteSchema", "true")
        .saveAsTable("orderhead_temp")
    )

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 15, Finished, Available)

In [14]:
def writeToorderdetail_inc(df, epoch_id):
    """foreachBatch sink: overwrite the ``orderdetail_temp`` Delta table with one micro-batch.

    Args:
        df: The micro-batch DataFrame passed in by ``foreachBatch``.
        epoch_id: Batch identifier passed in by ``foreachBatch``; not used here.
    """
    (
        df.write.format("delta")
        .mode("overwrite")
        # Correctly spelled key, so the overwrite may also replace the table schema
        .option("overwriteSchema", "true")
        .saveAsTable("orderdetail_temp")
    )

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 16, Finished, Available)

##### Write `Readdf_orderhead` and `Readdf_orderdetail` to the tables "orderhead_temp" and "orderdetail_temp" in delta format as a stream run in batch fashion, i.e. with trigger once set to true.
##### Checkpointing is enabled; the checkpoint locations are defined by `checkpoint_dir_orderhead` and `checkpoint_dir_orderdetail`.

In [15]:
# Order header stream: every micro-batch goes through writeToorderhead_inc,
# progress is tracked in the order header checkpoint folder, and the
# once-trigger makes this a single run rather than a continuous stream.
Inc_Query = (
    Readdf_orderhead.writeStream
    .format("delta")
    .foreachBatch(writeToorderhead_inc)
    .option("checkpointLocation", checkpoint_dir_orderhead)
    .trigger(once=True)
    .start()
)

# Block until that run has finished
Inc_Query.awaitTermination()


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 17, Finished, Available)

In [16]:
# Order detail stream: every micro-batch goes through writeToorderdetail_inc,
# progress is tracked in the order detail checkpoint folder, and the
# once-trigger makes this a single run rather than a continuous stream.
Inc_Query = (
    Readdf_orderdetail.writeStream
    .format("delta")
    .foreachBatch(writeToorderdetail_inc)
    .option("checkpointLocation", checkpoint_dir_orderdetail)
    .trigger(once=True)
    .start()
)

# Block until that run has finished
Inc_Query.awaitTermination()


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 18, Finished, Available)

##### Analyse the Temp table 

In [17]:
# Row count of the order detail temp table
df = spark.sql(
    "SELECT count(*) FROM silver.orderdetail_temp LIMIT 1000"
)
display(df)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, f30be7ca-20df-4796-afd4-51a6eaddb7a9)

In [18]:
# Latest modification date present in the order detail temp table
df = spark.sql(
    "SELECT max(modifieddate) FROM silver.orderdetail_temp LIMIT 1000"
)
display(df)

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 20, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7731dcab-bb11-47ef-8a2a-984933424c92)

##### Cleanup: Disable the retention duration check and remove old data files from the "orderdetail_temp" and "orderhead_temp" tables.

In [46]:
# Turn off Delta's retention-duration check so the RETAIN 0 HOURS vacuums below are accepted
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Vacuum both temp tables and show the result of each command
for vacuum_stmt in (
    "VACUUM orderdetail_temp RETAIN 0 HOURS",
    "VACUUM orderhead_temp RETAIN 0 HOURS",
):
    spark.sql(vacuum_stmt).show()


StatementMeta(, ad9153cf-0b95-4250-be9c-0920b77cdb77, 48, Finished, Available)

+--------------------+
|                path|
+--------------------+
|abfss://9978d83b-...|
+--------------------+

+--------------------+
|                path|
+--------------------+
|abfss://9978d83b-...|
+--------------------+



In [47]:
# Preview up to 1000 rows of the order header temp table
df = spark.sql(
    "SELECT * FROM SIlver.orderhead_temp LIMIT 1000"
)
display(df)

StatementMeta(, ad9153cf-0b95-4250-be9c-0920b77cdb77, 49, Finished, Available)

SynapseWidget(Synapse.DataFrame, a1ba6fca-0af5-414e-979e-bbc735c2e673)

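##### SQL queries to create the temporary views "orderhead_removeduplicates" and "orderdetail_removeduplicates", which number the rows of the "orderhead_temp" and "orderdetail_temp" tables per key (newest ModifiedDate first) so that duplicates can be filtered out later with row_id = 1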

In [19]:
%%sql
-- Number the rows of each sales order header, newest modifieddate first.
-- Consumers keep row_id = 1 to get one row per SalesOrderID.
CREATE OR REPLACE TEMP VIEW orderhead_removeduplicates AS
SELECT
    ROW_NUMBER() OVER (PARTITION BY SalesOrderID ORDER BY modifieddate DESC) AS row_id,
    SalesOrderID,
    RevisionNumber,
    OrderDate,
    DueDate,
    ShipDate,
    Status,
    OnlineOrderFlag,
    SalesOrderNumber,
    PurchaseOrderNumber,
    AccountNumber,
    CustomerID,
    ShipToAddressID,
    BillToAddressID,
    ShipMethod,
    CreditCardApprovalCode,
    SubTotal,
    TaxAmt,
    Freight,
    TotalDue,
    Comment,
    rowguid,
    ModifiedDate,
    Silver_Loadedtimestamp
FROM SIlver.orderhead_temp



StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 21, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [20]:
%%sql
-- Number the rows of each sales order detail line, newest modifieddate first.
-- Consumers keep row_id = 1 to get one row per SalesOrderDetailID.
CREATE OR REPLACE TEMP VIEW orderdetail_removeduplicates AS
SELECT
    ROW_NUMBER() OVER (PARTITION BY SalesOrderDetailID ORDER BY modifieddate DESC) AS row_id,
    SalesOrderID,
    SalesOrderDetailID,
    OrderQty,
    ProductID,
    UnitPrice,
    UnitPriceDiscount,
    LineTotal,
    rowguid,
    ModifiedDate,
    Silver_Loadedtimestamp
FROM SIlver.orderdetail_temp


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 22, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [21]:
%%sql
-- Preview the numbered order detail rows
SELECT *
FROM orderdetail_removeduplicates

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 23, Finished, Available)

<Spark SQL result set with 10 rows and 11 fields>

##### Creating the temporary view "fact_order_enrich", which joins the latest row (row_id = 1) of "orderhead_removeduplicates" with the latest row of "orderdetail_removeduplicates" on SalesOrderID, merging both into one table

In [22]:
%%sql
-- One row per order line: the latest header row joined to the latest detail
-- row (row_id = 1 on both sides) on SalesOrderID.
CREATE OR REPLACE TEMP VIEW fact_order_enrich AS
SELECT
    -- order header columns
    hdr.SalesOrderID, hdr.RevisionNumber, hdr.OrderDate, hdr.DueDate, hdr.ShipDate,
    hdr.Status, hdr.OnlineOrderFlag, hdr.SalesOrderNumber, hdr.PurchaseOrderNumber,
    hdr.AccountNumber, hdr.CustomerID, hdr.ShipToAddressID, hdr.BillToAddressID,
    hdr.ShipMethod, hdr.CreditCardApprovalCode,
    hdr.SubTotal, hdr.TaxAmt, hdr.Freight, hdr.TotalDue, hdr.Comment,
    hdr.rowguid                AS header_rowguid,
    hdr.ModifiedDate           AS header_modifieddate,
    hdr.Silver_Loadedtimestamp AS header_loadedtimestamp,
    -- order detail columns
    dtl.SalesOrderDetailID, dtl.OrderQty, dtl.ProductID,
    dtl.UnitPrice, dtl.UnitPriceDiscount, dtl.LineTotal,
    dtl.rowguid                AS detail_rowguid,
    dtl.ModifiedDate           AS detail_modifieddate,
    dtl.Silver_Loadedtimestamp AS detail_loadedtimestamp
FROM orderhead_removeduplicates AS hdr
INNER JOIN orderdetail_removeduplicates AS dtl
    ON hdr.SalesOrderID = dtl.SalesOrderID
WHERE hdr.row_id = 1
  AND dtl.row_id = 1;



StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 24, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [23]:
%%sql
-- Preview the enriched order rows
SELECT *
FROM fact_order_enrich

StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 25, Finished, Available)

<Spark SQL result set with 10 rows and 32 fields>

In [24]:
# Job metadata and target location for the Silver fact table
JobName = "Fact_Order_Bronze_to_Silver"
FinalView = "fact_order_enrich"  # final enriched view
lakehouse = "Silver"  # target lakehouse
table = "fact_order"  # target table
path_to_delta_table = f"Tables/{table}"

# Read the enriched view and tag every row with the name of the loading job
fact_order_df = spark.sql(f"SELECT * FROM {FinalView}").withColumn(
    "Fabricnloadingjobname", lit(JobName)
)


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 26, Finished, Available)

 ##### Merging the Incremental captured Enriched view into a Target Silver Table 

In [25]:
# A fact row is identified by its order id plus its order line id
merge_condition = (
    "sourceview.SalesOrderID = target.SalesOrderID AND "
    "sourceview.SalesOrderDetailID = target.SalesOrderDetailID"
)

if mssparkutils.fs.exists(path_to_delta_table):
    # Table already exists: upsert the incremental rows
    # (update rows whose keys match, insert the rest).
    deltaTable = DeltaTable.forPath(spark, path_to_delta_table)
    (
        deltaTable.alias("target")
        .merge(fact_order_df.alias("sourceview"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the table from the current rows. No merge follows,
    # because merging the same rows back in would only rewrite them and add
    # an extra table version of updates.
    fact_order_df.write.format("delta").save(path_to_delta_table)
    deltaTable = DeltaTable.forPath(spark, path_to_delta_table)


StatementMeta(, 5aaadb16-e605-4a69-9614-89f54ac74b6e, 27, Finished, Available)

#### Conclusion
#### The code demonstrates an incremental pipeline for processing data at scale using the checkpoint method from Structured Streaming. By running a streaming pipeline as a batch, data professionals can manage and analyze data effectively, derive valuable insights, and focus on decision-making without worrying about metadata management.