### LOAD TO SILVER Notebook
This notebook demonstrates a data pipeline in Databricks using the VELOCITY framework. The solution involves importing data, applying transformations, and loading the processed data into the SILVER storage layer. The steps include:
- Importing velocity library
- Defining input data sources
- Transforming the data using SQL
- Defining output data targets
- Writing the transformed data to the destination location

In [0]:
%sql
-- Inspect the bronze record(s) for a single order (OrderID = 1).
SELECT *
FROM lakehouse_vy.bronze.sales_orders_bronze
WHERE OrderID = 1


OrderID,CustomerID,SalespersonPersonID,PickedByPersonID,ContactPersonID,BackorderOrderID,OrderDate,ExpectedDeliveryDate,CustomerPurchaseOrderNumber,IsUndersupplyBackordered,Comments,DeliveryInstructions,InternalComments,PickingCompletedWhen,LastEditedBy,LastEditedWhen,inserted,updated,deleted
1,830,2,,3032,45,2013-01-01,2013-01-02,12126,1,,,,2013-01-01 12:00:00.0000000,7,2025-03-10 11:15:58.1400000,2025-03-10T11:18:21.824686Z,2025-03-10T11:18:21.824689Z,2025-03-10T11:15:58.14Z


In [0]:
from velocity.executor.notebook_executor import NotebookExecutor 

2025-07-01 10:05:09,794 — velocity.utils.spark_utils — INFO — Using the following default configs you may want to override them for your job: {'spark.databricks.delta.optimizeWrite.enabled': True, 'spark.sql.adaptive.enabled': True, 'spark.databricks.delta.merge.enableLowShuffle': True, 'spark.databricks.delta.schema.autoMerge.enabled': True, 'spark.secret_scope': 'kv-velocity', 'spark.metadata_store_mode': 'RDBMS', 'spark.credentials_store_mode': 'DATABRICKS'}
INFO:velocity.utils.spark_utils:Using the following default configs you may want to override them for your job: {'spark.databricks.delta.optimizeWrite.enabled': True, 'spark.sql.adaptive.enabled': True, 'spark.databricks.delta.merge.enableLowShuffle': True, 'spark.databricks.delta.schema.autoMerge.enabled': True, 'spark.secret_scope': 'kv-velocity', 'spark.metadata_store_mode': 'RDBMS', 'spark.credentials_store_mode': 'DATABRICKS'}
2025-07-01 10:05:09,796 — velocity.utils.spark_utils — INFO — Final config is: {'spark.databricks.

## Input Definition

In [0]:
# Bronze-layer sources consumed by the pipeline, grouped by layer and then by source name.
def _incremental_on_inserted():
    """Return a fresh read spec: incremental read watermarked on the `inserted` column."""
    return {
        "incremental_read": True,
        "override_watermark": False,
        "watermark_column": "inserted",
        "watermark_column_datatype": "DATETIME2",
    }


def _latest_instance_per(key_column):
    """Return a fresh versioning spec partitioned by `key_column`, newest edit first."""
    return {
        "last_instance": True,
        "order_by": ["LastEditedWhen", "inserted"],
        "sort_desc": True,
        "partition_by": [key_column],
    }


# Order headers: the only source that declares an explicit alias.
_sales_orders_source = {
    "alias": "sales_orders",
    "catalog": "lakehouse_vy",
    "schema": "bronze",
    "read_method": _incremental_on_inserted(),
    "versioning": _latest_instance_per("OrderID"),
}

# Order lines: soft-deleted rows are explicitly not excluded.
_sales_orderlines_source = {
    "catalog": "lakehouse_vy",
    "schema": "bronze",
    "read_method": _incremental_on_inserted(),
    "exclude_soft_deleted": False,
    "versioning": _latest_instance_per("OrderLineID"),
}

input = {
    "BRONZE": [
        {
            "sales_orders": _sales_orders_source,
            "sales_orderlines": _sales_orderlines_source,
        }
    ]
}

# Debug step for data discovery
Build views using the defined input data sources.

In [0]:
# Debug aid: build the views for the sources declared in `input` so they can be
# queried directly (e.g. vw_sales_orders in the next cell) before running the
# full pipeline.
NotebookExecutor.build_views(input)

2025-07-01 10:05:17,252 — velocity.utils.metadata_factory — INFO — Build Landing View: []
INFO:velocity.utils.metadata_factory:Build Landing View: []
2025-07-01 10:05:17,256 — velocity.utils.metadata_factory — INFO — read method: read_delta_standard
INFO:velocity.utils.metadata_factory:read method: read_delta_standard


['*']
['*']


2025-07-01 10:05:41,126 — velocity.utils.metadata_factory — INFO — read method: read_delta_standard
INFO:velocity.utils.metadata_factory:read method: read_delta_standard


read_perser select expression: [Column<'dense_rank() OVER (PARTITION BY OrderID ORDER BY LastEditedWhen DESC NULLS LAST, inserted DESC NULLS LAST unspecifiedframe$()) AS rank_col'>, Column<'unresolvedstar()'>]
watermark column: inserted
final filter condition: Column<'and(and(true, `=`(rank_col, 1)), `>`(inserted, 2025-03-10 11:45:16.233359))'>
['*']
['*']


2025-07-01 10:05:42,864 — velocity.utils.metadata_factory — INFO — Build View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c265bc99d0>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264fea610>]
INFO:velocity.utils.metadata_factory:Build View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c265bc99d0>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264fea610>]
2025-07-01 10:05:42,866 — velocity.utils.metadata_factory — INFO — Build Bronze View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c265bc99d0>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264fea610>]
INFO:velocity.utils.metadata_factory:Build Bronze View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c265bc99d0>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264fea610>]
2025-07-01 10:05:42,86

read_perser select expression: [Column<'dense_rank() OVER (PARTITION BY OrderLineID ORDER BY LastEditedWhen DESC NULLS LAST, inserted DESC NULLS LAST unspecifiedframe$()) AS rank_col'>, Column<'unresolvedstar()'>]
watermark column: inserted
final filter condition: Column<'and(and(true, `=`(rank_col, 1)), `>`(inserted, 2025-03-10 11:45:18.954820))'>


[<velocity.metadata.delta_table_watermark.DeltaTableWatermark at 0x71c265bc99d0>,
 <velocity.metadata.delta_table_watermark.DeltaTableWatermark at 0x71c264fea610>]

In [0]:
%sql
-- Check what the generated view exposes for OrderID = 1.
SELECT *
FROM vw_sales_orders
WHERE OrderID = 1

rank_col,OrderID,CustomerID,SalespersonPersonID,PickedByPersonID,ContactPersonID,BackorderOrderID,OrderDate,ExpectedDeliveryDate,CustomerPurchaseOrderNumber,IsUndersupplyBackordered,Comments,DeliveryInstructions,InternalComments,PickingCompletedWhen,LastEditedBy,LastEditedWhen,inserted,updated,deleted,watermark_value
1,1,830,2,,3032,45,2013-01-01,2013-01-02,12126,1,,,,2013-01-01 12:00:00.0000000,7,2025-03-10 11:15:58.1400000,2025-03-12T08:15:22.70632Z,2025-03-12T08:15:22.706323Z,2025-03-10T11:15:58.14Z,2025-03-10 11:45:16.233359


# Define Logic: Load to Silver Layer using SQL
##### - Handle missing values
##### - Convert data to correct types

The temporary view vw_source_set will be used as the reference when writing the output.

In [0]:
%sql
-- Preview of the transformed set, left commented out.
-- NOTE(review): vw_source_set is created by the sql_logic statement defined further down,
-- so presumably this query only works after NotebookExecutor.execute(...) has run — confirm.
-- select * from vw_source_set where OrderID = 1

OrderLineID,OrderID,CustomerID,BackorderOrderID,OrderDate,ExpectedDeliveryDate,CustomerPurchaseOrderNumber,IsUndersupplyBackordered,PickingCompletedWhen,StockItemID,Description,Quantity,UnitPrice,TaxRate,PickedQuantity,LinePickingCompletedWhen
2,1,,-1,,,,,1900-01-01T00:00:00Z,67,Ride on toy sedan car (Black) 1/12 scale,10,230.0,15.0,10,2013-01-01T11:00:00Z


In [0]:
# Transformation SQL passed to NotebookExecutor.execute(): (re)creates the temporary
# view vw_source_set with one row per order line and explicit casts on every column.
#   - vw_sales_orderlines is the driving side; order headers are LEFT JOINed on OrderID,
#     so a line without a matching row in vw_sales_orders keeps NULL header columns
#     (apart from the two COALESCEd columns below).
#   - A missing BackorderOrderID becomes -1 and a missing header PickingCompletedWhen
#     becomes 1900-01-01 00:00:00.
#   - The line-level PickingCompletedWhen is exposed as LinePickingCompletedWhen; the
#     header-level column keeps the name PickingCompletedWhen.
# NOTE(review): both views are read incrementally, so a changed line whose header is
# outside the current window would presumably get NULL header values — confirm this
# is intended.
sql_logic = """
CREATE OR REPLACE TEMPORARY VIEW vw_source_set
AS
SELECT 
    CAST(ol.OrderLineID AS INT) AS OrderLineID,
    CAST(ol.OrderID AS INT) AS OrderID,
    CAST(so.CustomerID AS INT) AS CustomerID,
    COALESCE(CAST(so.BackorderOrderID AS INT), -1) AS BackorderOrderID,
    CAST(so.OrderDate AS DATE) AS OrderDate,
    CAST(so.ExpectedDeliveryDate AS DATE) AS ExpectedDeliveryDate,
    CAST(so.CustomerPurchaseOrderNumber AS STRING) AS CustomerPurchaseOrderNumber,
    CAST(so.IsUndersupplyBackordered AS BOOLEAN) AS IsUndersupplyBackordered,
    COALESCE(CAST(so.PickingCompletedWhen AS TIMESTAMP), TIMESTAMP('1900-01-01 00:00:00')) AS PickingCompletedWhen,
    CAST(ol.StockItemID AS INT) AS StockItemID,
    CAST(ol.Description AS STRING) AS Description,
    CAST(ol.Quantity AS INT) AS Quantity,
    CAST(ol.UnitPrice AS DECIMAL(18, 2)) AS UnitPrice,
    CAST(ol.TaxRate AS DECIMAL(18, 3)) AS TaxRate,
    CAST(ol.PickedQuantity AS INT) AS PickedQuantity,
    CAST(ol.PickingCompletedWhen AS TIMESTAMP) AS LinePickingCompletedWhen
FROM
    vw_sales_orderlines ol
LEFT JOIN
    vw_sales_orders so
ON
    ol.OrderID = so.OrderID
"""

In [0]:
# Target definition for the write step, keyed by target table name.
# Annotated as `dict`: the previous annotation `map` named the builtin map()
# function, not a mapping type.
output: dict = {
    "orders": {
        "catalog": "lakehouse_vy",
        "schema": "silver",
        "target_type": "SILVER",
        "write_mode": "INSERT_ONLY",
        # Columns that together identify one row in the target.
        # NOTE(review): presumably the source of the `keyHash` column seen in the
        # execution log — confirm against the framework.
        "composite_key": ["OrderID", "OrderLineID"],
    }
}


In [0]:
# Run the pipeline end to end: read the sources declared in `input`, apply
# `sql_logic` (which defines vw_source_set), and write to the target(s) in `output`.
# NOTE(review): judging by the watermark-cleanup cell at the end of this notebook,
# this call presumably also records a new watermark — confirm before re-running.
NotebookExecutor.execute(input, sql_logic, output)

2025-07-01 10:06:05,179 — velocity.utils.metadata_factory — INFO — Build Landing View: []
INFO:velocity.utils.metadata_factory:Build Landing View: []
2025-07-01 10:06:05,181 — velocity.utils.metadata_factory — INFO — read method: read_delta_standard
INFO:velocity.utils.metadata_factory:read method: read_delta_standard


['*']
['*']


2025-07-01 10:06:07,106 — velocity.utils.metadata_factory — INFO — read method: read_delta_standard
INFO:velocity.utils.metadata_factory:read method: read_delta_standard


read_perser select expression: [Column<'dense_rank() OVER (PARTITION BY OrderID ORDER BY LastEditedWhen DESC NULLS LAST, inserted DESC NULLS LAST unspecifiedframe$()) AS rank_col'>, Column<'unresolvedstar()'>]
watermark column: inserted
final filter condition: Column<'and(and(true, `=`(rank_col, 1)), `>`(inserted, 2025-03-10 11:45:16.233359))'>
['*']
['*']


2025-07-01 10:06:08,507 — velocity.utils.metadata_factory — INFO — Build View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c2652a5190>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264e32310>]
INFO:velocity.utils.metadata_factory:Build View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c2652a5190>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264e32310>]
2025-07-01 10:06:08,508 — velocity.utils.metadata_factory — INFO — Build Bronze View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c2652a5190>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264e32310>]
INFO:velocity.utils.metadata_factory:Build Bronze View: [<velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c2652a5190>, <velocity.metadata.delta_table_watermark.DeltaTableWatermark object at 0x71c264e32310>]
2025-07-01 10:06:08,50

read_perser select expression: [Column<'dense_rank() OVER (PARTITION BY OrderLineID ORDER BY LastEditedWhen DESC NULLS LAST, inserted DESC NULLS LAST unspecifiedframe$()) AS rank_col'>, Column<'unresolvedstar()'>]
watermark column: inserted
final filter condition: Column<'and(and(true, `=`(rank_col, 1)), `>`(inserted, 2025-03-10 11:45:18.954820))'>


2025-07-01 10:06:08,665 — velocity.utils.metadata_factory — INFO — target format: UNITY_TABLE
INFO:velocity.utils.metadata_factory:target format: UNITY_TABLE
2025-07-01 10:06:08,666 — velocity.utils.metadata_factory — INFO — target layer: SILVER
INFO:velocity.utils.metadata_factory:target layer: SILVER
INFO:velocity.dataset.abs_dataset:output columns: ['inserted', 'updated', 'deleted', 'keyHash', 'rowHash', 'OrderLineID', 'OrderID', 'CustomerID', 'BackorderOrderID', 'OrderDate', 'ExpectedDeliveryDate', 'CustomerPurchaseOrderNumber', 'IsUndersupplyBackordered', 'PickingCompletedWhen', 'StockItemID', 'Description', 'Quantity', 'UnitPrice', 'TaxRate', 'PickedQuantity', 'LinePickingCompletedWhen']
INFO:velocity.dataset.abs_dataset:result output columns: ['inserted', 'updated', 'deleted', 'keyHash', 'rowHash', 'OrderLineID', 'OrderID', 'CustomerID', 'BackorderOrderID', 'OrderDate', 'ExpectedDeliveryDate', 'CustomerPurchaseOrderNumber', 'IsUndersupplyBackordered', 'PickingCompletedWhen', 'St

In [0]:
# WARNING: destructive reset of the bronze tables, intentionally left commented out.
# Uncomment and run only when the bronze data really has to be wiped.
# %sql
# truncate table lakehouse_vy.bronze.sales_orders_bronze;
# truncate table lakehouse_vy.bronze.sales_orderlines_bronze;
# truncate table lakehouse_vy.bronze.sales_customers_bronze;
# truncate table lakehouse_vy.bronze.warehouse_stockitems_bronze

In [0]:
%sql
-- Manual reset helper (kept commented out): removes the watermark row with the given value.
-- NOTE(review): presumably used so the next incremental run re-reads the same window — confirm.
-- NOTE(review): the target (matas_development.bricksmith_meta) is a different catalog from the
-- lakehouse_vy catalog used elsewhere in this notebook — verify it is the intended one.
-- delete 
-- from matas_development.bricksmith_meta.watermark
-- where watermark_value = '2025-07-01 10:06:05.183351'

num_affected_rows
1
