Data Vault 2.0 Implementation in Databricks Community Edition with **PySpark** and Spark **SQL**
Introduction
This notebook walks through a Data Vault 2.0 implementation in Databricks Community Edition. We'll:

- Generate realistic fake data with valid names, addresses, product names, etc.
- Provide code examples in both PySpark and Spark SQL.
- Build the Raw Vault and Business Vault layers.
- Create at least three Output Business Tables (OBTs).
- Use a scenario that closely resembles real-world industry situations.
Let's get started!

# Section 1:

## 1. Industry Scenario
### Scenario: E-Commerce Platform - "ShopStream"
#### Business Context:

"ShopStream" is a global e-commerce platform connecting millions of customers with thousands of sellers offering a wide range of products. The platform aims to enhance customer experience, optimize supply chain operations, and increase sales through data-driven insights.

Business Entities:

- Customers: Individuals purchasing products.
- Sellers: Businesses or individuals selling products.
- Products: Items available for sale.
- Orders: Transactions made by customers.
- Shipments: Delivery details of orders.

Business Questions:

1. Which products are trending in different regions?
2. What is the average delivery time, and how does it affect customer satisfaction?
3. Who are the top-performing sellers in the last month?

# Section 2:

## 2. Generating Realistic Source Data
We'll generate synthetic but realistic datasets to simulate source systems.

Libraries and Setup
First, import the libraries used throughout the notebook:

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import random
from datetime import datetime, timedelta

In [0]:
%sql
-- For Spark SQL, no additional setup is required

## section2.1

PySpark Implementation
### 2.1 Generating Realistic Names and Addresses
We'll use the Faker library to generate realistic names and addresses.

Install Faker Library:

In [0]:
%pip install Faker


In [0]:
%restart_python

In [0]:
from faker import Faker
fake = Faker()

In [0]:
num_customers = 1000
customer_data = []

for _ in range(num_customers):
    customer = {
        'CustomerID': fake.unique.uuid4(),
        'FirstName': fake.first_name(),
        'LastName': fake.last_name(),
        'Email': fake.email(),
        'PhoneNumber': fake.phone_number(),
        'Country': fake.country(),
        'RegistrationDate': fake.date_between(start_date='-2y', end_date='today')
    }
    customer_data.append(customer)

customers_df = spark.createDataFrame(customer_data)
display(customers_df)


Country,CustomerID,Email,FirstName,LastName,PhoneNumber,RegistrationDate
Saint Kitts and Nevis,567a8408-ec6c-4ceb-98bd-c07264948c85,ogray@example.org,Eric,Dennis,266-393-7546,2024-02-01
Ethiopia,49756f4a-ec43-4b5e-ab14-620f35f46200,martinezseth@example.net,Christopher,Thompson,643.527.1215,2023-03-14
New Zealand,5cae3ee2-d1fe-4496-b8a2-b9b554e7a407,josewilliams@example.org,Wendy,Vance,001-585-872-4754x2794,2023-04-15
Nicaragua,5ea682d2-e28b-488a-8a72-64a04b77b4e0,kblack@example.net,Karen,Young,612.363.8996,2024-06-02
Somalia,60ece845-3834-4922-93fe-cba74e39360f,ralph25@example.net,Melissa,Cooper,(536)921-8312,2023-05-24
Turkmenistan,f3413088-286e-498e-b471-dc42613d949b,rubiojudy@example.com,Kenneth,Lowe,553-509-1381,2023-06-19
Norway,d18f0d75-c270-44e8-ad06-18b80d2b24c1,erodriguez@example.net,Jacqueline,Joseph,+1-965-410-3266x7763,2023-10-26
Latvia,ca21e16d-c0dc-4d30-b295-417b96a68049,ascott@example.org,Terry,Alexander,+1-687-535-6643,2024-03-04
Costa Rica,4dc1c249-18c1-4524-826f-e4e38a24628c,barbarawallace@example.com,James,Williams,496.705.7966,2022-11-20
Cote d'Ivoire,0b26973f-64be-443f-810c-77379591bbea,thomas49@example.com,Pamela,Allen,6317060041,2022-12-10


In [0]:
num_sellers = 200
seller_data = []

for _ in range(num_sellers):
    seller = {
        'SellerID': fake.unique.uuid4(),
        'SellerName': fake.company(),
        'Email': fake.company_email(),
        'PhoneNumber': fake.phone_number(),
        'Country': fake.country(),
        'JoinDate': fake.date_between(start_date='-5y', end_date='today')
    }
    seller_data.append(seller)

sellers_df = spark.createDataFrame(seller_data)
display(sellers_df)


Country,Email,JoinDate,PhoneNumber,SellerID,SellerName
South Georgia and the South Sandwich Islands,oscott@booker-hale.com,2019-11-24,568.766.5223,7e2ddaf7-60a4-4186-a1e9-f8a702e7c7b0,Rivas-Nguyen
United Arab Emirates,lcampbell@flores.com,2024-01-15,+1-363-241-3453x90170,ded97ba2-bf1c-44bb-af05-59c16b3df91d,Lane PLC
Monaco,mary43@rivera-meyer.com,2022-11-09,+1-852-986-1181,8cbdaac9-8bc2-46df-96ad-53f379aa7b29,"Jones, Morris and Jones"
Tonga,zoe14@garner-moyer.org,2023-10-08,+1-645-663-1628x275,a0c9bace-ac94-4007-b481-e875e8eb7d7b,Green Ltd
Oman,melissa94@collins-zuniga.org,2024-03-26,504.771.3023x15727,a7cd3eb3-9f57-47d2-990f-96625c604e9d,"Myers, Charles and Mays"
Hong Kong,pbaker@hawkins-montgomery.biz,2023-11-29,(302)388-2207x572,87b7f4ac-ccd5-459f-8235-a3e0c138a070,"Williams, Jackson and Gonzalez"
Denmark,mblanchard@lopez-scott.com,2020-02-24,(974)623-0812x4532,9178ba4c-2aed-45d7-9153-89a198966240,"Williamson, Gregory and Martin"
Mali,ashley04@andrews.biz,2020-11-04,(565)967-6867x0582,6e9208a7-5be6-4cdf-88bb-1d185f8c4315,Le-Clark
Guernsey,bellbrian@martinez.info,2022-06-09,+1-423-412-9045x349,7a89e138-c300-45f9-91fc-d3a47c39dca1,"Castillo, French and Jordan"
Pitcairn Islands,derekmcpherson@mcgee.com,2024-08-04,+1-278-805-9809x703,bba25dd4-27b1-492a-9927-a06e404e8154,"Sandoval, Bright and Gaines"


In [0]:
num_products = 500
product_data = []

categories = ['Electronics', 'Clothing', 'Home & Kitchen', 'Books', 'Sports']
for _ in range(num_products):
    product = {
        'ProductID': fake.unique.uuid4(),
        'ProductName': fake.catch_phrase(),
        'Category': random.choice(categories),
        'Price': round(random.uniform(5, 500), 2),
        'SellerID': random.choice(seller_data)['SellerID']
    }
    product_data.append(product)

products_df = spark.createDataFrame(product_data)
display(products_df)


Category,Price,ProductID,ProductName,SellerID
Clothing,179.62,af625207-801f-4a63-85a9-7cca4b1bb7ec,Compatible bottom-line throughput,8251adbe-c06d-4894-bd08-0f922b2c05f7
Sports,271.03,63499fa9-7da3-4ef7-917a-26c29143d08b,Open-architected next generation synergy,3c149096-f587-4ee7-aade-586948cb39ef
Electronics,429.76,2d8a539b-b92e-4547-af80-61303eb0405f,Integrated actuating functionalities,8c7c3ff5-ee2d-4abc-95e0-5a05a844bfe2
Electronics,77.64,a524ab28-c7ab-43f2-bb29-b7f64e1577f5,De-engineered hybrid open system,c983fbfe-85fa-4bc2-9728-c0cd49c9ea6a
Clothing,284.25,415c0138-980f-42d5-b13a-fe968a3e3e26,Persevering interactive function,d94ed150-f4c1-4549-b8de-ef4c59738e56
Clothing,400.83,8686fbd2-9013-4fde-9a02-dde43cfe08a4,Virtual bottom-line moratorium,f69f03e4-b148-4bd3-9c95-f27aad87e204
Sports,103.75,34b1d634-6405-462b-9bb4-3e65234dbb05,Face-to-face national help-desk,00c1507e-aace-4b41-9611-c86313fe4c4d
Electronics,320.74,9dbbd150-ffb3-4a7f-b208-0dd0947e5fb8,Inverse impactful moratorium,8747ad4e-3400-4369-a915-45ae99977544
Home & Kitchen,367.64,19c1b252-99bd-467d-971c-99beaa7c8ec8,Optional hybrid budgetary management,109d935a-dda5-49be-a780-789df14e7591
Clothing,35.64,54d9f3d7-0f0e-4a01-a89a-353cc5097156,Pre-emptive content-based groupware,231efd9f-8422-4b10-bff9-0322814c87e0


In [0]:
num_orders = 5000
order_data = []

for _ in range(num_orders):
    order_date = fake.date_between(start_date='-1y', end_date='today')
    order = {
        'OrderID': fake.unique.uuid4(),
        'CustomerID': random.choice(customer_data)['CustomerID'],
        'OrderDate': order_date,
        'TotalAmount': round(random.uniform(20, 1000), 2),
        'Currency': random.choice(['USD', 'EUR', 'GBP', 'AUD', 'CAD']),
    }
    order_data.append(order)

orders_df = spark.createDataFrame(order_data)
display(orders_df)


Currency,CustomerID,OrderDate,OrderID,TotalAmount
GBP,179958da-f5a4-4cfa-8df5-6f95b7c82b47,2023-10-20,087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,725.17
CAD,448385dd-6652-41c3-b030-79f2434776c1,2024-02-16,e014218d-b1d5-4192-bbd6-2b59e66b89a1,690.87
EUR,a29a4838-6511-43f3-8b44-eab2a7f9a80f,2024-09-08,dd74c066-b0b7-475a-bd55-a3c5e88e0d8f,727.5
AUD,7ed04ad3-eb95-4ffa-9aa5-e44cdef3c69d,2024-04-29,6a8022c2-e4ed-4ffb-8407-4b7de2e728ca,497.66
EUR,f9eb991b-8bc7-4505-a0f5-998e441fcd91,2024-05-05,343c739a-e143-4ef3-8371-a09c426f3224,818.92
AUD,2274c915-0f91-4add-a585-050d9b02bc09,2024-05-06,a812c432-edc9-4bef-8825-71e18c573e46,809.89
GBP,cc919c37-b284-400a-b2cc-1d7e8e90de04,2023-11-01,9825fdaa-e4f5-41aa-af41-980083197c11,55.56
USD,019e2645-be22-4982-86e5-5beca56b1c72,2024-04-18,7b212976-423a-468e-95f9-dc0984e88223,272.77
CAD,7538cf57-95fe-4fc4-802a-075e373ea0a9,2024-07-12,bbf68308-c50b-498c-88fb-b7021cabd016,58.9
EUR,fa2a638f-654d-4f8f-9c89-8231e6631684,2024-09-12,7a0b5f93-ed96-4954-a61d-41739211ce44,981.08


In [0]:
order_items_data = []

for order in order_data:
    num_items = random.randint(1, 5)
    purchased_products = random.sample(product_data, num_items)
    for product in purchased_products:
        order_item = {
            'OrderID': order['OrderID'],
            'ProductID': product['ProductID'],
            'Quantity': random.randint(1, 3),
            'UnitPrice': product['Price']
        }
        order_items_data.append(order_item)

order_items_df = spark.createDataFrame(order_items_data)
display(order_items_df)


OrderID,ProductID,Quantity,UnitPrice
087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,8ec59e74-ec60-4444-9ea6-e56b72719db9,1,404.78
087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,98dc800c-ba02-4940-b3f7-c8497cd4ed84,2,251.5
087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,2c6019bf-3121-4fd8-9248-d24bd56788b9,1,267.66
087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,4c2cbb0a-3071-402b-9839-301065b8e42a,2,105.69
087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,be42f086-4388-461b-970e-daab95ab71aa,1,175.11
e014218d-b1d5-4192-bbd6-2b59e66b89a1,a524ab28-c7ab-43f2-bb29-b7f64e1577f5,2,77.64
e014218d-b1d5-4192-bbd6-2b59e66b89a1,547369b5-bdc4-4bbd-89f2-f3564b0af409,3,82.75
e014218d-b1d5-4192-bbd6-2b59e66b89a1,e58a964b-fbc3-410c-abf4-26deb1579782,3,447.61
dd74c066-b0b7-475a-bd55-a3c5e88e0d8f,98712da3-8fc0-4707-bf54-c93b2c05e8db,1,377.45
dd74c066-b0b7-475a-bd55-a3c5e88e0d8f,1a6e672d-db1b-4b18-872d-7e24df465580,1,287.68


In [0]:
shipment_data = []

for order in order_data:
    # order['OrderDate'] is already a datetime.date object
    shipment_date = order['OrderDate'] + timedelta(days=random.randint(1, 14))
    shipment = {
        'OrderID': order['OrderID'],
        'ShipmentID': fake.unique.uuid4(),
        'ShipmentDate': shipment_date,  # keep as datetime.date so Spark infers DateType, like OrderDate
        'Carrier': random.choice(['DHL', 'FedEx', 'UPS', 'USPS']),
        'Status': random.choice(['Delivered', 'In Transit', 'Delayed'])
    }
    shipment_data.append(shipment)

shipments_df = spark.createDataFrame(shipment_data)


display(shipments_df)


Carrier,OrderID,ShipmentDate,ShipmentID,Status
DHL,087ff7d6-5fcd-4f2c-a9ec-7c50acb69443,2023-11-03,1fe0f7a7-318f-4623-abe5-6d4cffc9e9de,Delayed
FedEx,e014218d-b1d5-4192-bbd6-2b59e66b89a1,2024-02-20,a17507c0-31f3-493b-af5c-c23fd651c717,Delayed
USPS,dd74c066-b0b7-475a-bd55-a3c5e88e0d8f,2024-09-09,cc11213f-c7a1-4fd0-adbc-2015685f19c6,Delayed
UPS,6a8022c2-e4ed-4ffb-8407-4b7de2e728ca,2024-05-05,efa88f39-01db-44dc-95f5-a2189f59a3a2,In Transit
USPS,343c739a-e143-4ef3-8371-a09c426f3224,2024-05-17,79077b00-693c-4f74-bb09-08b61cc4ff47,Delayed
FedEx,a812c432-edc9-4bef-8825-71e18c573e46,2024-05-19,3efd5b2b-98e4-47cb-a443-7a257736bc9a,Delayed
DHL,9825fdaa-e4f5-41aa-af41-980083197c11,2023-11-03,29bbd509-05b2-4897-b429-42cfbf316a0c,Delayed
FedEx,7b212976-423a-468e-95f9-dc0984e88223,2024-04-22,91c95882-e414-4e90-9abc-7dabdd8ebd44,Delivered
UPS,bbf68308-c50b-498c-88fb-b7021cabd016,2024-07-17,1d02a6e8-0f78-4c44-8fd8-bfe810938bac,Delivered
UPS,7a0b5f93-ed96-4954-a61d-41739211ce44,2024-09-24,d6fa54b8-e9f8-418d-9fd2-74364b378823,Delayed


In [0]:
customers_df.write.format('delta').mode('overwrite').saveAsTable('src_customers')
sellers_df.write.format('delta').mode('overwrite').saveAsTable('src_sellers')
products_df.write.format('delta').mode('overwrite').saveAsTable('src_products')
orders_df.write.format('delta').mode('overwrite').saveAsTable('src_orders')
order_items_df.write.format('delta').mode('overwrite').saveAsTable('src_order_items')
shipments_df.write.format('delta').mode('overwrite').saveAsTable('src_shipments')
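
Before the source tables feed the vault, it can be worth checking that the generated foreign keys actually resolve. The sketch below is a plain-Python check on the in-memory lists; `find_orphans` and the tiny sample lists are hypothetical names introduced here for illustration and are not part of the original notebook.

```python
def find_orphans(children, parents, key):
    """Return child records whose `key` value has no match among the parents."""
    parent_keys = {parent[key] for parent in parents}
    return [child for child in children if child[key] not in parent_keys]


# Tiny stand-in lists shaped like seller_data and product_data above.
sample_sellers = [{'SellerID': 's1'}, {'SellerID': 's2'}]
sample_products = [
    {'ProductID': 'p1', 'SellerID': 's1'},
    {'ProductID': 'p2', 'SellerID': 's9'},  # deliberately broken reference
]

orphans = find_orphans(sample_products, sample_sellers, 'SellerID')
print([o['ProductID'] for o in orphans])  # ['p2']
```

In the notebook itself, `find_orphans(product_data, seller_data, 'SellerID')` would be expected to return an empty list, since every product picks its seller from `seller_data`.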


## section2.2

Spark SQL Implementation
To query the DataFrames with Spark SQL, first register them as temporary views.

In [0]:
customers_df.createOrReplaceTempView('customers_temp')
sellers_df.createOrReplaceTempView('sellers_temp')
products_df.createOrReplaceTempView('products_temp')
orders_df.createOrReplaceTempView('orders_temp')
order_items_df.createOrReplaceTempView('order_items_temp')
shipments_df.createOrReplaceTempView('shipments_temp')


Now we can query these views with Spark SQL.

In [0]:
%sql
-- Example: preview five customer rows (no ordering is implied)
SELECT * FROM customers_temp LIMIT 5;


Country,CustomerID,Email,FirstName,LastName,PhoneNumber,RegistrationDate
Saint Kitts and Nevis,567a8408-ec6c-4ceb-98bd-c07264948c85,ogray@example.org,Eric,Dennis,266-393-7546,2024-02-01
Ethiopia,49756f4a-ec43-4b5e-ab14-620f35f46200,martinezseth@example.net,Christopher,Thompson,643.527.1215,2023-03-14
New Zealand,5cae3ee2-d1fe-4496-b8a2-b9b554e7a407,josewilliams@example.org,Wendy,Vance,001-585-872-4754x2794,2023-04-15
Nicaragua,5ea682d2-e28b-488a-8a72-64a04b77b4e0,kblack@example.net,Karen,Young,612.363.8996,2024-06-02
Somalia,60ece845-3834-4922-93fe-cba74e39360f,ralph25@example.net,Melissa,Cooper,(536)921-8312,2023-05-24


In [0]:
# %sql
# -- Only execute this if not using the PySpark approach
# DROP TABLE IF EXISTS src_customers;
# CREATE TABLE src_customers AS SELECT * FROM customers_temp;

# DROP TABLE IF EXISTS src_sellers;
# CREATE TABLE src_sellers AS SELECT * FROM sellers_temp;

# DROP TABLE IF EXISTS src_products;
# CREATE TABLE src_products AS SELECT * FROM products_temp;

# DROP TABLE IF EXISTS src_orders;
# CREATE TABLE src_orders AS SELECT * FROM orders_temp;

# DROP TABLE IF EXISTS src_order_items;
# CREATE TABLE src_order_items AS SELECT * FROM order_items_temp;

# DROP TABLE IF EXISTS src_shipments;
# CREATE TABLE src_shipments AS SELECT * FROM shipments_temp;


## section3
3. Building the Raw Vault
- Creating Hubs

In [0]:
from pyspark.sql.functions import sha2, concat_ws

hub_customer = (
    customers_df.select(
        sha2(F.col('CustomerID'), 256).alias('CustomerHashKey'),
        'CustomerID',
        F.current_timestamp().alias('LoadDate'),
        F.lit('CustomerSource').alias('RecordSource')
    ).dropDuplicates(['CustomerID'])
)

hub_customer.write.format('delta').mode('overwrite').saveAsTable('hub_customer')
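
To make the hub hash key less opaque, here is a minimal plain-Python sketch of the same calculation. For string input, `hashlib.sha256(...).hexdigest()` should yield the same 64-character hex string as Spark's `sha2(col, 256)`. The `hash_key` helper and its `normalize` option are assumptions added for illustration; the notebook hashes the raw ID without trimming or upper-casing.

```python
import hashlib


def hash_key(business_key, normalize=False):
    """SHA-256 hex digest of a business key, as a 64-character string.

    normalize=True trims whitespace and upper-cases the key first; this is
    an assumption for illustration and is not what the notebook does.
    """
    if normalize:
        business_key = business_key.strip().upper()
    return hashlib.sha256(business_key.encode('utf-8')).hexdigest()


key = hash_key('567a8408-ec6c-4ceb-98bd-c07264948c85')
print(len(key))  # 64 hex characters

# With normalization, formatting differences no longer change the key.
print(hash_key(' abc ', normalize=True) == hash_key('ABC', normalize=True))  # True
```

Normalizing business keys before hashing is a common Data Vault convention when several source systems deliver the same key with different casing or padding; with generated UUIDs it makes no practical difference.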


In [0]:
# %sql
# DROP TABLE IF EXISTS hub_customer;
# CREATE TABLE hub_customer AS
# SELECT
#     SHA2(CustomerID, 256) AS CustomerHashKey,
#     CustomerID,
#     current_timestamp() AS LoadDate,
#     'CustomerSource' AS RecordSource
# FROM src_customers
# GROUP BY CustomerID;


In [0]:
hub_seller = (
    sellers_df.select(
        sha2(F.col('SellerID'), 256).alias('SellerHashKey'),
        'SellerID',
        F.current_timestamp().alias('LoadDate'),
        F.lit('SellerSource').alias('RecordSource')
    ).dropDuplicates(['SellerID'])
)

hub_seller.write.format('delta').mode('overwrite').saveAsTable('hub_seller')


In [0]:
# %sql
# DROP TABLE IF EXISTS hub_seller;
# CREATE TABLE hub_seller AS
# SELECT
#     SHA2(SellerID, 256) AS SellerHashKey,
#     SellerID,
#     current_timestamp() AS LoadDate,
#     'SellerSource' AS RecordSource
# FROM src_sellers
# GROUP BY SellerID;


In [0]:
hub_product = (
    products_df.select(
        sha2(F.col('ProductID'), 256).alias('ProductHashKey'),
        'ProductID',
        F.current_timestamp().alias('LoadDate'),
        F.lit('ProductSource').alias('RecordSource')
    ).dropDuplicates(['ProductID'])
)

hub_product.write.format('delta').mode('overwrite').saveAsTable('hub_product')


In [0]:
# %sql
# DROP TABLE IF EXISTS hub_product;
# CREATE TABLE hub_product AS
# SELECT
#     SHA2(ProductID, 256) AS ProductHashKey,
#     ProductID,
#     current_timestamp() AS LoadDate,
#     'ProductSource' AS RecordSource
# FROM src_products
# GROUP BY ProductID;


In [0]:
hub_order = (
    orders_df.select(
        sha2(F.col('OrderID'), 256).alias('OrderHashKey'),
        'OrderID',
        F.current_timestamp().alias('LoadDate'),
        F.lit('OrderSource').alias('RecordSource')
    ).dropDuplicates(['OrderID'])
)

hub_order.write.format('delta').mode('overwrite').saveAsTable('hub_order')


In [0]:
# %sql
# DROP TABLE IF EXISTS hub_order;
# CREATE TABLE hub_order AS
# SELECT
#     SHA2(OrderID, 256) AS OrderHashKey,
#     OrderID,
#     current_timestamp() AS LoadDate,
#     'OrderSource' AS RecordSource
# FROM src_orders
# GROUP BY OrderID;


## section 3.2
Creating Link Tables

In [0]:
link_order_customer = (
    orders_df.select(
        sha2(concat_ws('-', F.col('OrderID'), F.col('CustomerID')), 256).alias('OrderCustomerHashKey'),
        sha2(F.col('OrderID'), 256).alias('OrderHashKey'),
        sha2(F.col('CustomerID'), 256).alias('CustomerHashKey'),
        F.current_timestamp().alias('LoadDate'),
        F.lit('OrderCustomerSource').alias('RecordSource')
    )
)

link_order_customer.write.format('delta').mode('overwrite').saveAsTable('link_order_customer')
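
A link hash key is built from the business keys of every hub the link connects. The sketch below mimics that in plain Python. The local `concat_ws` function is a hypothetical stand-in that imitates one property of Spark's `concat_ws`: NULL inputs are skipped, whereas Spark's `CONCAT` returns NULL as soon as any input is NULL.

```python
import hashlib


def concat_ws(separator, *values):
    """Join values with a separator, skipping None (like Spark's concat_ws)."""
    return separator.join(v for v in values if v is not None)


def link_hash_key(*business_keys):
    """Hash the '-'-joined business keys of the hubs a link connects."""
    combined = concat_ws('-', *business_keys)
    return hashlib.sha256(combined.encode('utf-8')).hexdigest()


print(concat_ws('-', 'order-1', 'cust-1'))  # order-1-cust-1
print(concat_ws('-', 'order-1', None))      # order-1

# Order of the keys matters: the same pair in reverse gives a different key.
print(link_hash_key('order-1', 'cust-1') == link_hash_key('cust-1', 'order-1'))  # False
```

Because a missing key is silently dropped rather than producing NULL, a link row with an absent `CustomerID` would still receive a hash key, so a separate NULL check on the source columns may be worthwhile.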


In [0]:
# %sql
# DROP TABLE IF EXISTS link_order_customer;
# CREATE TABLE link_order_customer AS
# SELECT
#     SHA2(CONCAT_WS('-', OrderID, CustomerID), 256) AS OrderCustomerHashKey,
#     SHA2(OrderID, 256) AS OrderHashKey,
#     SHA2(CustomerID, 256) AS CustomerHashKey,
#     current_timestamp() AS LoadDate,
#     'OrderCustomerSource' AS RecordSource
# FROM src_orders;


In [0]:
link_product_seller = (
    products_df.select(
        sha2(concat_ws('-', F.col('ProductID'), F.col('SellerID')), 256).alias('ProductSellerHashKey'),
        sha2(F.col('ProductID'), 256).alias('ProductHashKey'),
        sha2(F.col('SellerID'), 256).alias('SellerHashKey'),
        F.current_timestamp().alias('LoadDate'),
        F.lit('ProductSellerSource').alias('RecordSource')
    )
)

link_product_seller.write.format('delta').mode('overwrite').saveAsTable('link_product_seller')


In [0]:
# %sql
# DROP TABLE IF EXISTS link_product_seller;
# CREATE TABLE link_product_seller AS
# SELECT
#     SHA2(CONCAT_WS('-', ProductID, SellerID), 256) AS ProductSellerHashKey,
#     SHA2(ProductID, 256) AS ProductHashKey,
#     SHA2(SellerID, 256) AS SellerHashKey,
#     current_timestamp() AS LoadDate,
#     'ProductSellerSource' AS RecordSource
# FROM src_products;


In [0]:
link_order_product = (
    order_items_df.select(
        sha2(concat_ws('-', F.col('OrderID'), F.col('ProductID')), 256).alias('OrderProductHashKey'),
        sha2(F.col('OrderID'), 256).alias('OrderHashKey'),
        sha2(F.col('ProductID'), 256).alias('ProductHashKey'),
        F.current_timestamp().alias('LoadDate'),
        F.lit('OrderProductSource').alias('RecordSource')
    )
)

link_order_product.write.format('delta').mode('overwrite').saveAsTable('link_order_product')


In [0]:
# %sql
# DROP TABLE IF EXISTS link_order_product;
# CREATE TABLE link_order_product AS
# SELECT
#     SHA2(CONCAT_WS('-', OrderID, ProductID), 256) AS OrderProductHashKey,
#     SHA2(OrderID, 256) AS OrderHashKey,
#     SHA2(ProductID, 256) AS ProductHashKey,
#     current_timestamp() AS LoadDate,
#     'OrderProductSource' AS RecordSource
# FROM src_order_items;


## section 3.3
- Creating Satellites

In [0]:
sat_customer_details = (
    customers_df.select(
        sha2(F.col('CustomerID'), 256).alias('CustomerHashKey'),
        'FirstName',
        'LastName',
        'Email',
        'PhoneNumber',
        'Country',
        'RegistrationDate',
        F.current_timestamp().alias('LoadDate'),
        F.lit('CustomerSource').alias('RecordSource')
    )
)

sat_customer_details.write.format('delta').mode('overwrite').saveAsTable('sat_customer_details')
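
The satellites in this notebook are loaded once with `overwrite`, so change detection never comes into play. For incremental loads, Data Vault 2.0 satellites commonly carry a hash over the descriptive attributes (often called a HashDiff) so that a new row is written only when something changed. The following plain-Python sketch illustrates the idea; `hash_diff`, the `||` separator, and the changed email address are assumptions for illustration and do not appear in the original notebook.

```python
import hashlib


def hash_diff(record, attributes, separator='||'):
    """SHA-256 over the descriptive attributes, taken in a fixed order.

    None is rendered as an empty string so a missing value keeps its slot.
    """
    parts = ['' if record.get(name) is None else str(record[name]) for name in attributes]
    return hashlib.sha256(separator.join(parts).encode('utf-8')).hexdigest()


ATTRIBUTES = ['FirstName', 'LastName', 'Email', 'PhoneNumber', 'Country']

stored = {'FirstName': 'Eric', 'LastName': 'Dennis', 'Email': 'ogray@example.org',
          'PhoneNumber': '266-393-7546', 'Country': 'Saint Kitts and Nevis'}
incoming = dict(stored, Email='eric.dennis@example.org')  # hypothetical new email

# A new satellite row is needed only when the hash differs.
print(hash_diff(stored, ATTRIBUTES) == hash_diff(dict(stored), ATTRIBUTES))  # True
print(hash_diff(stored, ATTRIBUTES) == hash_diff(incoming, ATTRIBUTES))      # False
```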


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_customer_details;
# CREATE TABLE sat_customer_details AS
# SELECT
#     SHA2(CustomerID, 256) AS CustomerHashKey,
#     FirstName,
#     LastName,
#     Email,
#     PhoneNumber,
#     Country,
#     RegistrationDate,
#     current_timestamp() AS LoadDate,
#     'CustomerSource' AS RecordSource
# FROM src_customers;


In [0]:
sat_seller_details = (
    sellers_df.select(
        sha2(F.col('SellerID'), 256).alias('SellerHashKey'),
        'SellerName',
        'Email',
        'PhoneNumber',
        'Country',
        'JoinDate',
        F.current_timestamp().alias('LoadDate'),
        F.lit('SellerSource').alias('RecordSource')
    )
)

sat_seller_details.write.format('delta').mode('overwrite').saveAsTable('sat_seller_details')


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_seller_details;
# CREATE TABLE sat_seller_details AS
# SELECT
#     SHA2(SellerID, 256) AS SellerHashKey,
#     SellerName,
#     Email,
#     PhoneNumber,
#     Country,
#     JoinDate,
#     current_timestamp() AS LoadDate,
#     'SellerSource' AS RecordSource
# FROM src_sellers;


In [0]:
sat_product_details = (
    products_df.select(
        sha2(F.col('ProductID'), 256).alias('ProductHashKey'),
        'ProductName',
        'Category',
        'Price',
        'SellerID',
        F.current_timestamp().alias('LoadDate'),
        F.lit('ProductSource').alias('RecordSource')
    )
)

sat_product_details.write.format('delta').mode('overwrite').saveAsTable('sat_product_details')


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_product_details;
# CREATE TABLE sat_product_details AS
# SELECT
#     SHA2(ProductID, 256) AS ProductHashKey,
#     ProductName,
#     Category,
#     Price,
#     SellerID,
#     current_timestamp() AS LoadDate,
#     'ProductSource' AS RecordSource
# FROM src_products;


In [0]:
sat_order_details = (
    orders_df.select(
        sha2(F.col('OrderID'), 256).alias('OrderHashKey'),
        'OrderDate',
        'TotalAmount',
        'Currency',
        F.current_timestamp().alias('LoadDate'),
        F.lit('OrderSource').alias('RecordSource')
    )
)

sat_order_details.write.format('delta').mode('overwrite').saveAsTable('sat_order_details')


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_order_details;
# CREATE TABLE sat_order_details AS
# SELECT
#     SHA2(OrderID, 256) AS OrderHashKey,
#     OrderDate,
#     TotalAmount,
#     Currency,
#     current_timestamp() AS LoadDate,
#     'OrderSource' AS RecordSource
# FROM src_orders;


In [0]:
sat_shipment_details = (
    shipments_df.select(
        sha2(F.col('OrderID'), 256).alias('OrderHashKey'),
        'ShipmentID',
        'ShipmentDate',
        'Carrier',
        'Status',
        F.current_timestamp().alias('LoadDate'),
        F.lit('ShipmentSource').alias('RecordSource')
    )
)

sat_shipment_details.write.format('delta').mode('overwrite').saveAsTable('sat_shipment_details')


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_shipment_details;
# CREATE TABLE sat_shipment_details AS
# SELECT
#     SHA2(OrderID, 256) AS OrderHashKey,
#     ShipmentID,
#     ShipmentDate,
#     Carrier,
#     Status,
#     current_timestamp() AS LoadDate,
#     'ShipmentSource' AS RecordSource
# FROM src_shipments;


# Section 4: Building the Business Vault

In [0]:
product_sales_stats = (
    order_items_df.groupBy('ProductID')
    .agg(
        F.sum(F.col('Quantity') * F.col('UnitPrice')).alias('TotalSales'),
        F.countDistinct('OrderID').alias('OrderCount'),
        F.sum('Quantity').alias('TotalQuantitySold')
    )
    .withColumn('LoadDate', F.current_timestamp())
    .withColumn('RecordSource', F.lit('BusinessCalculation'))
    .withColumn('ProductHashKey', sha2(F.col('ProductID'), 256))
    .select('ProductHashKey', 'TotalSales', 'OrderCount', 'TotalQuantitySold', 'LoadDate', 'RecordSource')
)

product_sales_stats.write.format('delta').mode('overwrite').saveAsTable('sat_product_sales_stats')


In [0]:
# %sql
# DROP TABLE IF EXISTS sat_product_sales_stats;
# CREATE TABLE sat_product_sales_stats AS
# SELECT
#     SHA2(ProductID, 256) AS ProductHashKey,
#     SUM(Quantity * UnitPrice) AS TotalSales,
#     COUNT(DISTINCT OrderID) AS OrderCount,
#     SUM(Quantity) AS TotalQuantitySold,
#     current_timestamp() AS LoadDate,
#     'BusinessCalculation' AS RecordSource
# FROM src_order_items
# GROUP BY ProductID;


### What is a Point-In-Time (PIT) Table?
A Point-In-Time (PIT) table is a structure in the Data Vault that provides a snapshot of the data at a specific point in time. It serves as a bridge between the historical, granular data in the Raw Vault and the current, consolidated view needed for reporting and analytics.

### Key Characteristics:
- Simplifies Data Retrieval: Combines multiple satellite effective dates into a single record per business key.
- Performance Optimization: Reduces the complexity of joins required to retrieve data from multiple satellites.
- Historical Consistency: Ensures that data is consistent and accurate as of a specific timestamp.

### When and Where to Use PIT Tables:
- Historical Reporting: When you need to report on data as it was at a specific point in time.
- Slowly Changing Dimensions: To track changes in attributes over time for dimensions.
- Simplifying Queries: To make querying large datasets more efficient by pre-joining necessary data.

### Creating a Point-In-Time (PIT) Table
We'll create a PIT table to capture the state of customer and order data as of the most recent load date.

Steps:
1. Identify the Hubs and Satellites to include.
2. Determine the Effective Dates from the satellites.
3. Join the Hubs and Satellites on the hash keys and effective dates.
4. Create the PIT Table with the relevant keys and timestamps.
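
The steps above can be sketched on plain Python data before turning to Spark. This is a simplified illustration, not the notebook's implementation: `latest_load_dates` and `build_pit` are hypothetical helpers, and the sample rows are made up. Picking the maximum `LoadDate` per key plays the role of the `row_number() == 1` filter, and `dict.get` plays the role of the left join from the hub.

```python
from datetime import datetime


def latest_load_dates(satellite_rows, key):
    """Map each hash key to its most recent LoadDate."""
    latest = {}
    for row in satellite_rows:
        current = latest.get(row[key])
        if current is None or row['LoadDate'] > current:
            latest[row[key]] = row['LoadDate']
    return latest


def build_pit(hub_keys, satellite_rows, key):
    """One PIT row per hub key; LoadDate is None when no satellite row exists."""
    latest = latest_load_dates(satellite_rows, key)
    return [{key: k, 'LoadDate': latest.get(k)} for k in hub_keys]


sat_rows = [
    {'CustomerHashKey': 'a', 'LoadDate': datetime(2024, 1, 1)},
    {'CustomerHashKey': 'a', 'LoadDate': datetime(2024, 3, 1)},
    {'CustomerHashKey': 'b', 'LoadDate': datetime(2024, 2, 1)},
]
for row in build_pit(['a', 'b', 'c'], sat_rows, 'CustomerHashKey'):
    print(row)
```

Key `a` resolves to its March load, `b` to its only load, and `c` keeps a `None` load date because it has no satellite row, which mirrors what the left join produces.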

In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Get the latest load date for each satellite
customer_sat = spark.table('sat_customer_details')
order_sat = spark.table('sat_order_details')

# Create window specifications
customer_window = Window.partitionBy('CustomerHashKey').orderBy(F.col('LoadDate').desc())
order_window = Window.partitionBy('OrderHashKey').orderBy(F.col('LoadDate').desc())

# Get the latest record for each customer
latest_customer_sat = (
    customer_sat.withColumn('RowNum', F.row_number().over(customer_window))
    .filter(F.col('RowNum') == 1)
    .select('CustomerHashKey', F.col('LoadDate').alias('LatestCustomerLoadDate'))  # Alias here to avoid ambiguity
)

# Get the latest record for each order
latest_order_sat = (
    order_sat.withColumn('RowNum', F.row_number().over(order_window))
    .filter(F.col('RowNum') == 1)
    .select('OrderHashKey', F.col('LoadDate').alias('LatestOrderLoadDate'))  # Alias here to avoid ambiguity
)

# Join hubs with the latest satellite load dates
customer_pit = (
    spark.table('hub_customer')
    .join(latest_customer_sat, 'CustomerHashKey', 'left')
    .select('CustomerHashKey', F.col('LatestCustomerLoadDate').alias('CustomerLoadDate'))  # Use aliased column
)

order_pit = (
    spark.table('hub_order')
    .join(latest_order_sat, 'OrderHashKey', 'left')
    .select('OrderHashKey', F.col('LatestOrderLoadDate').alias('OrderLoadDate'))  # Use aliased column
)

# Combine customer and order PIT tables
customer_order_pit = (
    customer_pit.join(spark.table('link_order_customer'), 'CustomerHashKey', 'inner')
    .join(order_pit, 'OrderHashKey', 'inner')
    .select('CustomerHashKey', 'OrderHashKey', 'CustomerLoadDate', 'OrderLoadDate')
)

# Save the PIT table
customer_order_pit.write.format('delta').mode('overwrite').saveAsTable('pit_customer_order')


In [0]:
# %sql
# -- Get the latest record for each customer
# WITH latest_customer AS (
#     SELECT
#         CustomerHashKey,
#         LoadDate,
#         ROW_NUMBER() OVER (PARTITION BY CustomerHashKey ORDER BY LoadDate DESC) AS RowNum
#     FROM sat_customer_details
# )
# SELECT * FROM latest_customer WHERE RowNum = 1;

# -- Create the customer PIT
# DROP TABLE IF EXISTS customer_pit;
# CREATE TABLE customer_pit AS
# SELECT
#     h.CustomerHashKey,
#     s.LoadDate AS CustomerLoadDate
# FROM hub_customer h
# LEFT JOIN (
#     SELECT CustomerHashKey, LoadDate
#     FROM (
#         SELECT
#             CustomerHashKey,
#             LoadDate,
#             ROW_NUMBER() OVER (PARTITION BY CustomerHashKey ORDER BY LoadDate DESC) AS RowNum
#         FROM sat_customer_details
#     ) WHERE RowNum = 1
# ) s ON h.CustomerHashKey = s.CustomerHashKey;

# -- Get the latest record for each order
# WITH latest_order AS (
#     SELECT
#         OrderHashKey,
#         LoadDate,
#         ROW_NUMBER() OVER (PARTITION BY OrderHashKey ORDER BY LoadDate DESC) AS RowNum
#     FROM sat_order_details
# )
# SELECT * FROM latest_order WHERE RowNum = 1;

# -- Create the order PIT
# DROP TABLE IF EXISTS order_pit;
# CREATE TABLE order_pit AS
# SELECT
#     h.OrderHashKey,
#     s.LoadDate AS OrderLoadDate
# FROM hub_order h
# LEFT JOIN (
#     SELECT OrderHashKey, LoadDate
#     FROM (
#         SELECT
#             OrderHashKey,
#             LoadDate,
#             ROW_NUMBER() OVER (PARTITION BY OrderHashKey ORDER BY LoadDate DESC) AS RowNum
#         FROM sat_order_details
#     ) WHERE RowNum = 1
# ) s ON h.OrderHashKey = s.OrderHashKey;

# -- Combine customer and order PIT tables
# DROP TABLE IF EXISTS pit_customer_order;
# CREATE TABLE pit_customer_order AS
# SELECT
#     c.CustomerHashKey,
#     o.OrderHashKey,
#     c.CustomerLoadDate,
#     o.OrderLoadDate
# FROM customer_pit c
# JOIN link_order_customer loc ON c.CustomerHashKey = loc.CustomerHashKey
# JOIN order_pit o ON loc.OrderHashKey = o.OrderHashKey;


### Why is the PIT Table Useful?
- Efficient Data Retrieval: Simplifies queries by reducing the number of joins and lookups.
- Consistent Reporting: Ensures that all data is reported as of the same point in time.
- Historical Accuracy: Allows for accurate historical analysis without complex time-travel queries.

# Section 5: Creating Output Business Tables (OBTs)

In [0]:
trending_products = (
    orders_df.alias('o')
    .join(order_items_df.alias('oi'), 'OrderID')
    .join(products_df.alias('p'), 'ProductID')
    .join(customers_df.alias('c'), 'CustomerID')
    .groupBy('c.Country', 'p.Category', 'p.ProductName')
    .agg(
        F.count('o.OrderID').alias('OrderCount'),
        F.sum('oi.Quantity').alias('TotalUnitsSold')
    )
    .orderBy(F.col('OrderCount').desc())
)

trending_products.write.format('delta').mode('overwrite').saveAsTable('obt_trending_products')


In [0]:
# %sql
# DROP TABLE IF EXISTS obt_trending_products;
# CREATE TABLE obt_trending_products AS
# SELECT
#     c.Country,
#     p.Category,
#     p.ProductName,
#     COUNT(o.OrderID) AS OrderCount,
#     SUM(oi.Quantity) AS TotalUnitsSold
# FROM src_orders o
# JOIN src_order_items oi ON o.OrderID = oi.OrderID
# JOIN src_products p ON oi.ProductID = p.ProductID
# JOIN src_customers c ON o.CustomerID = c.CustomerID
# GROUP BY c.Country, p.Category, p.ProductName
# ORDER BY OrderCount DESC;


In [0]:
# The source data has no customer satisfaction score, so we simulate one
# as a uniform random value in [0, 5).
orders_with_satisfaction = orders_df.withColumn('SatisfactionScore', F.rand() * 5)

delivery_times = (
    orders_with_satisfaction.alias('o')
    .join(shipments_df.alias('s'), 'OrderID')
    .join(customers_df.alias('c'), 'CustomerID')
    .withColumn('DeliveryTimeDays', F.datediff('s.ShipmentDate', 'o.OrderDate'))
    .groupBy('c.Country')
    .agg(
        F.avg('DeliveryTimeDays').alias('AvgDeliveryTime'),
        F.avg('o.SatisfactionScore').alias('AvgSatisfactionScore'),
        F.count('o.OrderID').alias('TotalOrders')
    )
    .orderBy('AvgDeliveryTime')
)

delivery_times.write.format('delta').mode('overwrite').saveAsTable('obt_delivery_time_satisfaction')


In [0]:
# %sql
# DROP TABLE IF EXISTS obt_delivery_time_satisfaction;
# CREATE TABLE obt_delivery_time_satisfaction AS
# SELECT
#     c.Country,
#     AVG(DATEDIFF(s.ShipmentDate, o.OrderDate)) AS AvgDeliveryTime,
#     AVG(RAND() * 5) AS AvgSatisfactionScore,
#     COUNT(o.OrderID) AS TotalOrders
# FROM src_orders o
# JOIN src_shipments s ON o.OrderID = s.OrderID
# JOIN src_customers c ON o.CustomerID = c.CustomerID
# GROUP BY c.Country
# ORDER BY AvgDeliveryTime;


In [0]:
last_month_orders = orders_df.filter(
    F.col('OrderDate') >= F.date_sub(F.current_date(), 30)
)

top_sellers = (
    last_month_orders.alias('o')
    .join(order_items_df.alias('oi'), 'OrderID')
    .join(products_df.alias('p'), 'ProductID')
    .join(sellers_df.alias('s'), 'SellerID')
    .groupBy('s.SellerID', 's.SellerName')
    .agg(
        F.sum(F.col('oi.Quantity') * F.col('oi.UnitPrice')).alias('TotalSales'),
        F.countDistinct('o.OrderID').alias('OrderCount')
    )
    .orderBy(F.col('TotalSales').desc())
    .limit(10)
)

top_sellers.write.format('delta').mode('overwrite').saveAsTable('obt_top_sellers_last_month')


In [0]:
# %sql
# DROP TABLE IF EXISTS obt_top_sellers_last_month;
# CREATE TABLE obt_top_sellers_last_month AS
# SELECT
#     s.SellerID,
#     s.SellerName,
#     SUM(oi.Quantity * oi.UnitPrice) AS TotalSales,
#     COUNT(DISTINCT o.OrderID) AS OrderCount
# FROM src_orders o
# JOIN src_order_items oi ON o.OrderID = oi.OrderID
# JOIN src_products p ON oi.ProductID = p.ProductID
# JOIN src_sellers s ON p.SellerID = s.SellerID
# WHERE o.OrderDate >= DATE_SUB(CURRENT_DATE(), 30)
# GROUP BY s.SellerID, s.SellerName
# ORDER BY TotalSales DESC
# LIMIT 10;


### Creating an OBT Using the PIT Table
#### Business Question:
- How has customer purchasing behavior changed over time, considering only the most recent customer and order information?

- We aim to analyze customer purchases using the latest available data without dealing with historical changes in customer details or orders.

OBT: Customer Purchase Behavior Snapshot

In [0]:
# Read the PIT table
pit = spark.table('pit_customer_order')

# Join with satellites to get the latest customer and order details
customer_details = spark.table('sat_customer_details')
order_details = spark.table('sat_order_details')

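# Join PIT with customer details.
# The PIT has one row per customer/order pair, so deduplicate to one row per
# customer; otherwise the re-join with the PIT below multiplies rows.
customer_snapshot = (
    pit.join(customer_details, (pit.CustomerHashKey == customer_details.CustomerHashKey) &
             (pit.CustomerLoadDate == customer_details.LoadDate), 'inner')
    .select(pit.CustomerHashKey, 'FirstName', 'LastName', 'Email', 'Country')
    .distinct()
)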

# Join PIT with order details
order_snapshot = (
    pit.join(order_details, (pit.OrderHashKey == order_details.OrderHashKey) &
             (pit.OrderLoadDate == order_details.LoadDate), 'inner')
    .select(pit.OrderHashKey, 'OrderDate', 'TotalAmount', 'Currency')
)

# Combine customer and order snapshots
customer_order_snapshot = (
    customer_snapshot.join(pit, 'CustomerHashKey', 'inner')
    .join(order_snapshot, 'OrderHashKey', 'inner')
    .select('CustomerHashKey', 'FirstName', 'LastName', 'Email', 'Country',
            'OrderHashKey', 'OrderDate', 'TotalAmount', 'Currency')
)

# Save the OBT
customer_order_snapshot.write.format('delta').mode('overwrite').saveAsTable('obt_customer_purchase_snapshot')


In [0]:
# %sql
# -- Create customer snapshot (DISTINCT because the PIT has one row per customer/order pair)
# DROP TABLE IF EXISTS customer_snapshot;
# CREATE TABLE customer_snapshot AS
# SELECT DISTINCT
#     p.CustomerHashKey,
#     s.FirstName,
#     s.LastName,
#     s.Email,
#     s.Country
# FROM pit_customer_order p
# JOIN sat_customer_details s
# ON p.CustomerHashKey = s.CustomerHashKey AND p.CustomerLoadDate = s.LoadDate;

# -- Create order snapshot
# DROP TABLE IF EXISTS order_snapshot;
# CREATE TABLE order_snapshot AS
# SELECT
#     p.OrderHashKey,
#     s.OrderDate,
#     s.TotalAmount,
#     s.Currency
# FROM pit_customer_order p
# JOIN sat_order_details s
# ON p.OrderHashKey = s.OrderHashKey AND p.OrderLoadDate = s.LoadDate;

# -- Combine customer and order snapshots
# DROP TABLE IF EXISTS obt_customer_purchase_snapshot;
# CREATE TABLE obt_customer_purchase_snapshot AS
# SELECT
#     c.CustomerHashKey,
#     c.FirstName,
#     c.LastName,
#     c.Email,
#     c.Country,
#     p.OrderHashKey,
#     o.OrderDate,
#     o.TotalAmount,
#     o.Currency
# FROM customer_snapshot c
# JOIN pit_customer_order p ON c.CustomerHashKey = p.CustomerHashKey
# JOIN order_snapshot o ON p.OrderHashKey = o.OrderHashKey;


# Section 6: Answering Business Questions

## section 6.1
Question 1: Which products are trending in different regions?
Answer:

Query the obt_trending_products table to identify top products by country and category.

In [0]:
%sql
SELECT * FROM obt_trending_products ORDER BY OrderCount DESC LIMIT 20;


Country,Category,ProductName,OrderCount,TotalUnitsSold
Mali,Electronics,Future-proofed interactive access,4,7
United Arab Emirates,Electronics,Reactive even-keeled website,4,7
Cuba,Sports,Ameliorated regional solution,3,6
Guam,Home & Kitchen,Programmable bandwidth-monitored core,3,6
Mali,Clothing,Mandatory user-facing protocol,3,3
Saint Barthelemy,Clothing,Profit-focused multi-tasking frame,3,7
Romania,Electronics,Reactive discrete pricing structure,3,3
Hungary,Sports,Organic methodical Local Area Network,3,5
Western Sahara,Sports,Enhanced explicit core,3,3
Greenland,Sports,Enterprise-wide 24hour Internet solution,3,7
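
The query above ranks products globally. To read the result per region, one option is to keep only the top row for each country. Below is a minimal pure-Python sketch over a few of the rows shown above. The helper name `top_product_per_country` and the tie-breaking rule are assumptions made for illustration; in Spark the same idea would typically use a window function partitioned by Country.

```python
# A few rows copied from the query output above:
# (Country, Category, ProductName, OrderCount, TotalUnitsSold)
rows = [
    ("Mali", "Electronics", "Future-proofed interactive access", 4, 7),
    ("United Arab Emirates", "Electronics", "Reactive even-keeled website", 4, 7),
    ("Cuba", "Sports", "Ameliorated regional solution", 3, 6),
    ("Mali", "Clothing", "Mandatory user-facing protocol", 3, 3),
]

def top_product_per_country(rows):
    """Keep the row with the highest OrderCount for each country.

    Ties are broken by TotalUnitsSold (an assumption made for this sketch).
    """
    best = {}
    for row in rows:
        country, order_count, units = row[0], row[3], row[4]
        current = best.get(country)
        if current is None or (order_count, units) > (current[3], current[4]):
            best[country] = row
    return best

top = top_product_per_country(rows)
for country, row in sorted(top.items()):
    print(country, "->", row[2])
```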


## section 6.2
Question 2: What is the average delivery time, and how does it affect customer satisfaction?
Answer:

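Query the obt_delivery_time_satisfaction table. Delivery time is measured as the number of days between OrderDate and ShipmentDate. Because SatisfactionScore was simulated with random values in Section 5, the output demonstrates the query pattern rather than a real relationship between delivery time and satisfaction.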

In [0]:
%sql
SELECT * FROM obt_delivery_time_satisfaction ORDER BY AvgDeliveryTime;


Country,AvgDeliveryTime,AvgSatisfactionScore,TotalOrders
Russian Federation,3.6,2.359235649139258,5
Poland,4.142857142857143,2.702707090277352,7
Niger,5.0,0.962579808402847,2
Saint Lucia,5.0,2.1537124924038604,4
Morocco,5.466666666666667,2.7609975912827807,15
Heard Island and McDonald Islands,5.5,3.325477591381899,6
Bulgaria,5.533333333333333,2.567520361931209,15
Nigeria,5.565217391304348,2.43107107459153,23
Kenya,5.666666666666667,2.028934722245221,9
American Samoa,5.75,2.040257041293837,8
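
One way to quantify "how does it affect" is a correlation coefficient between the two averages. The sketch below computes an unweighted Pearson correlation over the ten country rows shown above, in plain Python. The helper `pearson` is written here for illustration. Since the satisfaction score in this notebook is generated with `F.rand()`, the resulting number carries no business meaning; the same computation would apply to real scores, and in Spark `F.corr` offers an equivalent aggregate.

```python
import math

# (AvgDeliveryTime, AvgSatisfactionScore) pairs copied from the output above
pairs = [
    (3.6, 2.359235649139258),
    (4.142857142857143, 2.702707090277352),
    (5.0, 0.962579808402847),
    (5.0, 2.1537124924038604),
    (5.466666666666667, 2.7609975912827807),
    (5.5, 3.325477591381899),
    (5.533333333333333, 2.567520361931209),
    (5.565217391304348, 2.43107107459153),
    (5.666666666666667, 2.028934722245221),
    (5.75, 2.040257041293837),
]

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

r = pearson([p[0] for p in pairs], [p[1] for p in pairs])
print(f"Pearson r = {r:.3f}")
```

A value near -1 would suggest that longer delivery times go with lower satisfaction; a value near 0, as expected for random scores, suggests no linear relationship.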


## section 6.3:
Question 3: Who are the top-performing sellers in the last month?
Answer:

Query the obt_top_sellers_last_month table.

In [0]:
%sql
SELECT * FROM obt_top_sellers_last_month;


SellerID,SellerName,TotalSales,OrderCount
854c0e57-8449-4082-be32-7f16ed802b50,Ryan Inc,11400.05,14
7f1012be-e05d-41c7-8743-d6de52d8ad2a,Johnson-Jones,11126.47,17
f69f03e4-b148-4bd3-9c95-f27aad87e204,"Buckley, Wilson and Ford",9832.670000000002,14
18adc41f-291a-40ff-886b-b43afa1582c5,Bauer Ltd,9654.77,13
0631d223-97c0-40f1-bd9d-d9da9ac1b519,Green Inc,9117.05,19
6bbf9213-474f-4c51-910f-6149f4d34e72,Thomas PLC,9035.61,12
6d9da4b6-9eee-4713-aaf6-000267c3fcea,Schmidt-Summers,9001.56,15
c9d2ec10-e1ff-4258-a667-a6716c24ecad,Le-George,8514.3,13
8c7c3ff5-ee2d-4abc-95e0-5a05a844bfe2,"Odom, Sanders and Jenkins",8249.37,11
30b88931-ec3c-43a1-a351-e5188af71b74,"Greene, Hardy and Mendoza",8120.3499999999985,11
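
Values such as 9832.670000000002 in the TotalSales column are floating-point artifacts of summing double-precision prices. A small sketch of cleaning such totals to two decimal places with Python's `decimal` module follows; the helper name `to_money` and the half-up rounding mode are assumptions. In the Spark job itself, a comparable effect could be obtained by wrapping the sum in `F.round(..., 2)`.

```python
from decimal import Decimal, ROUND_HALF_UP

def to_money(value):
    """Round a float total to two decimal places using Decimal arithmetic."""
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

# Values copied from the TotalSales column above
raw_totals = [9832.670000000002, 8120.3499999999985, 11400.05]
clean_totals = [to_money(v) for v in raw_totals]

for raw, clean in zip(raw_totals, clean_totals):
    print(raw, "->", clean)
```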


## section 6.4 business question with pit tables
### Business Question:
- Which customers have increased their average order value over the last six months compared to the previous six months?

### Solution:
- We'll use the obt_customer_purchase_snapshot OBT to calculate the average order value for each customer in two time periods and identify those with increased spending.

In [0]:
from pyspark.sql.functions import col, avg

# Define date thresholds
six_months_ago = F.add_months(F.current_date(), -6)

# Filter orders in the last six months
recent_orders = customer_order_snapshot.filter(col('OrderDate') >= six_months_ago)

# Filter orders between six and twelve months ago
previous_orders = customer_order_snapshot.filter((col('OrderDate') < six_months_ago) &
                                                 (col('OrderDate') >= F.add_months(F.current_date(), -12)))

# Calculate average order value for recent orders
recent_avg = (
    recent_orders.groupBy('CustomerHashKey', 'FirstName', 'LastName')
    .agg(avg('TotalAmount').alias('RecentAvgOrderValue'))
)

# Calculate average order value for previous orders
previous_avg = (
    previous_orders.groupBy('CustomerHashKey')
    .agg(avg('TotalAmount').alias('PreviousAvgOrderValue'))
)

# Join the two datasets
avg_order_comparison = (
    recent_avg.join(previous_avg, 'CustomerHashKey', 'inner')
    .withColumn('IncreaseInAvgOrderValue', col('RecentAvgOrderValue') - col('PreviousAvgOrderValue'))
    .filter(col('IncreaseInAvgOrderValue') > 0)
    .select('CustomerHashKey', 'FirstName', 'LastName', 'RecentAvgOrderValue', 'PreviousAvgOrderValue', 'IncreaseInAvgOrderValue')
    .orderBy(col('IncreaseInAvgOrderValue').desc())
)

# Save the results
avg_order_comparison.write.format('delta').mode('overwrite').saveAsTable('obt_customer_avg_order_increase')


In [0]:
# %sql
# -- Date thresholds are computed inline with add_months, matching the PySpark version

# -- Recent orders
# DROP TABLE IF EXISTS recent_orders;
# CREATE TABLE recent_orders AS
# SELECT *
# FROM obt_customer_purchase_snapshot
# WHERE OrderDate >= add_months(current_date(), -6);

# -- Previous orders
# DROP TABLE IF EXISTS previous_orders;
# CREATE TABLE previous_orders AS
# SELECT *
# FROM obt_customer_purchase_snapshot
# WHERE OrderDate < add_months(current_date(), -6)
#   AND OrderDate >= add_months(current_date(), -12);

# -- Calculate average order value for recent orders
# DROP TABLE IF EXISTS recent_avg;
# CREATE TABLE recent_avg AS
# SELECT
#     CustomerHashKey,
#     FirstName,
#     LastName,
#     AVG(TotalAmount) AS RecentAvgOrderValue
# FROM recent_orders
# GROUP BY CustomerHashKey, FirstName, LastName;

# -- Calculate average order value for previous orders
# DROP TABLE IF EXISTS previous_avg;
# CREATE TABLE previous_avg AS
# SELECT
#     CustomerHashKey,
#     AVG(TotalAmount) AS PreviousAvgOrderValue
# FROM previous_orders
# GROUP BY CustomerHashKey;

# -- Join and compare
# DROP TABLE IF EXISTS obt_customer_avg_order_increase;
# CREATE TABLE obt_customer_avg_order_increase AS
# SELECT
#     r.CustomerHashKey,
#     r.FirstName,
#     r.LastName,
#     r.RecentAvgOrderValue,
#     p.PreviousAvgOrderValue,
#     (r.RecentAvgOrderValue - p.PreviousAvgOrderValue) AS IncreaseInAvgOrderValue
# FROM recent_avg r
# JOIN previous_avg p ON r.CustomerHashKey = p.CustomerHashKey
# WHERE (r.RecentAvgOrderValue - p.PreviousAvgOrderValue) > 0
# ORDER BY IncreaseInAvgOrderValue DESC;
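
The comparison logic can be traced by hand on a tiny dataset. The following pure-Python sketch uses hypothetical customers, amounts, and fixed window boundaries (the notebook code computes the boundaries relative to the current date instead). It also shows a consequence of the inner join: a customer with no orders in the previous window is dropped from the result.

```python
from collections import defaultdict
from datetime import date

def avg_by_customer(orders, start, end):
    """Average TotalAmount per customer for orders with start <= OrderDate < end."""
    amounts = defaultdict(list)
    for customer, order_date, amount in orders:
        if start <= order_date < end:
            amounts[customer].append(amount)
    return {c: sum(v) / len(v) for c, v in amounts.items()}

# Hypothetical orders: (customer, OrderDate, TotalAmount)
sample_orders = [
    ("alice", date(2024, 2, 10), 40.0),
    ("alice", date(2024, 8, 5), 70.0),
    ("alice", date(2024, 9, 1), 90.0),
    ("bob", date(2024, 3, 1), 100.0),
    ("bob", date(2024, 10, 1), 60.0),
    ("carol", date(2024, 11, 1), 500.0),  # no order in the previous window
]

# Fixed window boundaries chosen for the example
previous_start, split, recent_end = date(2024, 1, 1), date(2024, 7, 1), date(2025, 1, 1)

prev = avg_by_customer(sample_orders, previous_start, split)
recent = avg_by_customer(sample_orders, split, recent_end)

# Inner-join semantics: only customers present in both windows are compared
increased = {
    c: recent[c] - prev[c]
    for c in recent
    if c in prev and recent[c] > prev[c]
}
print(increased)
```

If new customers such as "carol" should count as having increased their spending, the join would need to be a left join with a default previous value, which is a business decision rather than a technical one.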


# Section 7:  Conclusion
In this session, we've:

- Generated realistic source data using the Faker library.
- Implemented the Data Vault 2.0 model in Databricks Community Edition.
- Provided code examples in both PySpark and Spark SQL.
- Built the Raw Vault (hubs, links, satellites).
- Constructed the Business Vault with derived satellites.
- Created Output Business Tables (OBTs) for the three business questions, plus two PIT-based OBTs for customer purchase analysis.
- Used a scenario that closely mirrors real-world industry challenges.

# Section 8: Next Steps and Best Practices

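### Data Quality and Governance
- Consistent Hashing: Use the same hashing algorithm, input normalization, and delimiter for every hub, link, and satellite so that the same business key always yields the same hash key.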
- Record Lineage: Maintain RecordSource and LoadDate for auditability.
- Data Validation: Implement checks to validate data integrity.
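
As an illustration of the hashing point, here is a minimal sketch of a reusable hash-key helper in plain Python. The function name `hash_key`, the choice of SHA-256, the trim/upper-case normalization, and the `||` delimiter are all assumptions made for this example; whichever algorithm and normalization were used when loading the hubs earlier should be reused unchanged.

```python
import hashlib

def hash_key(*business_keys, delimiter="||"):
    """Build a deterministic hash key from one or more business key parts.

    Each part is trimmed and upper-cased before hashing so that formatting
    differences between source systems do not produce different keys.
    """
    normalized = delimiter.join(str(part).strip().upper() for part in business_keys)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

# Hub key: the same customer ID with different formatting gives the same key
customer_hk = hash_key(" cust-001 ")
same_customer_hk = hash_key("CUST-001")

# Link key: the delimiter keeps ("ORD-42", "CUST-001") distinct from "ORD-42CUST-001"
link_hk = hash_key("ORD-42", "CUST-001")
```

Keeping this logic in one function (or one SQL expression) and calling it from every load is a simple way to avoid keys drifting apart between tables.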

### Performance Optimization
- Partitioning: Partition large tables by date or key columns.
- Indexing: Delta tables have no traditional indexes. Where supported, OPTIMIZE with ZORDER BY on frequently filtered columns can improve data skipping and speed up queries.
- Caching: Cache frequently accessed tables during sessions.

### Further Learning
#### Books:
- "Building a Scalable Data Warehouse with Data Vault 2.0" by Dan Linstedt and Michael Olschimke
- "Data Engineering with Apache Spark, Delta Lake, and Lakehouse" by Manoj Kukreja