# MallardDV Demo Notebook

This notebook demonstrates how to use MallardDV, a lightweight Python implementation of the Data Vault 2.0 methodology built on DuckDB.

## Setup and Installation

First, let's make sure we have all the necessary dependencies installed.
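
As an alternative to searching the `pip list` output, the import can be checked from Python itself. This is a minimal sketch using only the standard library; `is_installed` is a hypothetical helper name, and `mallarddv` is the module name imported later in this notebook.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if the top-level module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


# 'mallarddv' is the module imported later in this notebook
print("mallarddv importable:", is_installed("mallarddv"))
```

If this prints `False`, the editable install most likely ran in a different environment than the notebook kernel.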

In [None]:
# Install MallardDV from the local repository
!pip install -e ..

# Verify installation (case-insensitive match on the package name)
!pip list | grep -i mallard

## 1. Creating a Sample Dataset

Let's create a sample retail dataset for the demonstration: customers, products, orders, and order items. Each table is saved as a CSV file.
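
Orders reference customers, and order items reference both orders and products, so it can be worth checking those references before loading anything. The following is a minimal sketch in plain Python; `missing_references` is a hypothetical helper, and the lists simply mirror the ID columns of the sample data in this section.

```python
def missing_references(child_keys, parent_keys):
    """Return child key values that have no matching parent key, sorted."""
    return sorted(set(child_keys) - set(parent_keys))


# ID values mirroring the sample data in this section
customer_ids = [1, 2, 3, 4, 5]
order_customer_ids = [1, 2, 3, 1, 4, 5]
product_ids = ["P001", "P002", "P003", "P004", "P005"]
item_product_ids = ["P001", "P002", "P003", "P002", "P004", "P005", "P001"]

# An empty list means every reference resolves to an existing parent row
print(missing_references(order_customer_ids, customer_ids))
print(missing_references(item_product_ids, product_ids))
```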

In [None]:
import pandas as pd
import os

# Create data directory if it doesn't exist
os.makedirs('../demo/data_notebook', exist_ok=True)

# Sample customers
customers = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'first_name': ['John', 'Jane', 'Robert', 'Sarah', 'Michael'],
    'last_name': ['Doe', 'Smith', 'Johnson', 'Williams', 'Brown'],
    'email': ['john.doe@example.com', 'jane.smith@example.com', 'robert.j@example.com', 
              'sarah.w@example.com', 'michael.b@example.com'],
    'created_date': ['2025-01-15 10:30:00', '2025-01-16 14:45:00', '2025-01-17 09:15:00',
                     '2025-01-18 16:20:00', '2025-01-19 11:35:00'],
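    # Nullable integer arrays keep these IDs as integers; a plain list
    # containing None would be stored as floats and written as 1.0, 101.0, ...
    'referenced_by': pd.array([None, 1, 1, 2, 3], dtype='Int64'),
    'reference_code': pd.array([None, 101, 102, 201, 301], dtype='Int64')
})

# Missing values are written as empty fields by to_csv, so no fillna('') is needed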

# Sample products
products = pd.DataFrame({
    'id': ['P001', 'P002', 'P003', 'P004', 'P005'],
    'name': ['Laptop', 'Smartphone', 'Coffee Maker', 'Running Shoes', 'Desk Chair'],
    'description': ['High-performance laptop', 'Latest smartphone model', 'Automatic coffee machine',
                   'Sports running shoes', 'Ergonomic office chair'],
    'price': [1299.99, 899.99, 149.99, 89.99, 249.99],
    'category': ['Electronics', 'Electronics', 'Kitchen', 'Sportswear', 'Furniture']
})

# Sample orders
orders = pd.DataFrame({
    'id': [10001, 10002, 10003, 10004, 10005, 10006],
    'customer_id': [1, 2, 3, 1, 4, 5],
    'order_date': ['2025-01-20', '2025-01-21', '2025-01-22', '2025-01-23', '2025-01-24', '2025-01-25'],
    'status': ['Delivered', 'Processing', 'Shipped', 'Delivered', 'Processing', 'Processing']
})

# Sample order items
order_items = pd.DataFrame({
    'order_id': [10001, 10001, 10002, 10003, 10004, 10005, 10006],
    'product_id': ['P001', 'P002', 'P003', 'P002', 'P004', 'P005', 'P001'],
    'quantity': [1, 1, 2, 1, 2, 1, 1],
    'unit_price': [1299.99, 899.99, 149.99, 899.99, 89.99, 249.99, 1299.99]
})

# Save to CSV files
customers.to_csv('../demo/data_notebook/customers.csv', index=False)
products.to_csv('../demo/data_notebook/products.csv', index=False)
orders.to_csv('../demo/data_notebook/orders.csv', index=False)
order_items.to_csv('../demo/data_notebook/order_items.csv', index=False)

print("Sample datasets created successfully!")

# Preview the data
print("\nCustomers Preview:")
display(customers.head())

print("\nProducts Preview:")
display(products.head())

print("\nOrders Preview:")
display(orders.head())

print("\nOrder Items Preview:")
display(order_items.head())

## 2. Creating Metadata for Data Vault Structure

Now let's define the metadata that describes our Data Vault structures. MallardDV is initialized from two metadata files:
1. `tables.csv` - Defines the Data Vault table structures
2. `transitions.csv` - Maps staging tables to Data Vault structures
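
Each row of the tables metadata describes one column of one table. Judging from the `target_table` values used in the transitions in this notebook (for example `hub_customer` and `hsat_customer_details`), hub, link, and satellite names appear to follow the pattern `<rel_type>_<base_name>`. That naming rule is an inference from this notebook, not documented behaviour, and staging tables may be named differently. Under that assumption, a minimal sketch that groups metadata rows into tables looks like this (`columns_by_table` is a hypothetical helper):

```python
from collections import defaultdict

# Three rows in the same shape as the tables metadata used in this notebook
rows = [
    {"base_name": "customer", "rel_type": "hub", "column_name": "id",
     "column_position": 1, "mapping": "bk"},
    {"base_name": "customer_details", "rel_type": "hsat", "column_name": "last_name",
     "column_position": 2, "mapping": "c"},
    {"base_name": "customer_details", "rel_type": "hsat", "column_name": "first_name",
     "column_position": 1, "mapping": "c"},
]


def columns_by_table(metadata_rows):
    """Group metadata rows into {table_name: [column names ordered by position]}."""
    grouped = defaultdict(list)
    for row in metadata_rows:
        # Assumption: the physical name is '<rel_type>_<base_name>'
        table_name = f"{row['rel_type']}_{row['base_name']}"
        grouped[table_name].append((row["column_position"], row["column_name"]))
    return {name: [col for _, col in sorted(cols)] for name, cols in grouped.items()}


print(columns_by_table(rows))
```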

In [None]:
# Define tables metadata
tables_data = [
    # Customer staging
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'id', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'first_name', 'column_type': 'VARCHAR(255)', 'column_position': 2, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'last_name', 'column_type': 'VARCHAR(255)', 'column_position': 3, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'email', 'column_type': 'VARCHAR(255)', 'column_position': 4, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'created_date', 'column_type': 'TIMESTAMP', 'column_position': 5, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'referenced_by', 'column_type': 'INTEGER', 'column_position': 6, 'mapping': 'c'},
    {'base_name': 'customer', 'rel_type': 'stg', 'column_name': 'reference_code', 'column_type': 'INTEGER', 'column_position': 7, 'mapping': 'c'},
    
    # Customer hub
    {'base_name': 'customer', 'rel_type': 'hub', 'column_name': 'id', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'bk'},
    
    # Customer satellite
    {'base_name': 'customer_details', 'rel_type': 'hsat', 'column_name': 'customer', 'column_type': '', 'column_position': 0, 'mapping': 'hk'},
    {'base_name': 'customer_details', 'rel_type': 'hsat', 'column_name': 'first_name', 'column_type': 'VARCHAR(255)', 'column_position': 1, 'mapping': 'c'},
    {'base_name': 'customer_details', 'rel_type': 'hsat', 'column_name': 'last_name', 'column_type': 'VARCHAR(255)', 'column_position': 2, 'mapping': 'c'},
    {'base_name': 'customer_details', 'rel_type': 'hsat', 'column_name': 'email', 'column_type': 'VARCHAR(255)', 'column_position': 3, 'mapping': 'c'},
    {'base_name': 'customer_details', 'rel_type': 'hsat', 'column_name': 'created_date', 'column_type': 'TIMESTAMP', 'column_position': 4, 'mapping': 'c'},
    
    # Customer referral link
    {'base_name': 'customer__referencer', 'rel_type': 'link', 'column_name': 'customer', 'column_type': '', 'column_position': 1, 'mapping': 'll'},
    {'base_name': 'customer__referencer', 'rel_type': 'link', 'column_name': 'referencer', 'column_type': '', 'column_position': 2, 'mapping': 'll'},
    {'base_name': 'customer__referencer', 'rel_type': 'link', 'column_name': 'reference_code', 'column_type': 'INTEGER', 'column_position': 3, 'mapping': 'dk'},
    
    # Product staging
    {'base_name': 'product', 'rel_type': 'stg', 'column_name': 'id', 'column_type': 'VARCHAR(20)', 'column_position': 1, 'mapping': 'c'},
    {'base_name': 'product', 'rel_type': 'stg', 'column_name': 'name', 'column_type': 'VARCHAR(255)', 'column_position': 2, 'mapping': 'c'},
    {'base_name': 'product', 'rel_type': 'stg', 'column_name': 'description', 'column_type': 'VARCHAR(1023)', 'column_position': 3, 'mapping': 'c'},
    {'base_name': 'product', 'rel_type': 'stg', 'column_name': 'price', 'column_type': 'DECIMAL(10,2)', 'column_position': 4, 'mapping': 'c'},
    {'base_name': 'product', 'rel_type': 'stg', 'column_name': 'category', 'column_type': 'VARCHAR(100)', 'column_position': 5, 'mapping': 'c'},
    
    # Product hub
    {'base_name': 'product', 'rel_type': 'hub', 'column_name': 'id', 'column_type': 'VARCHAR(20)', 'column_position': 1, 'mapping': 'bk'},
    
    # Product satellite
    {'base_name': 'product_details', 'rel_type': 'hsat', 'column_name': 'product', 'column_type': '', 'column_position': 0, 'mapping': 'hk'},
    {'base_name': 'product_details', 'rel_type': 'hsat', 'column_name': 'name', 'column_type': 'VARCHAR(255)', 'column_position': 1, 'mapping': 'f'},
    {'base_name': 'product_details', 'rel_type': 'hsat', 'column_name': 'description', 'column_type': 'VARCHAR(1023)', 'column_position': 2, 'mapping': 'f'},
    {'base_name': 'product_details', 'rel_type': 'hsat', 'column_name': 'price', 'column_type': 'DECIMAL(10,2)', 'column_position': 3, 'mapping': 'f'},
    {'base_name': 'product_details', 'rel_type': 'hsat', 'column_name': 'category', 'column_type': 'VARCHAR(100)', 'column_position': 4, 'mapping': 'f'},
    
    # Order staging
    {'base_name': 'order', 'rel_type': 'stg', 'column_name': 'id', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'c'},
    {'base_name': 'order', 'rel_type': 'stg', 'column_name': 'customer_id', 'column_type': 'INTEGER', 'column_position': 2, 'mapping': 'c'},
    {'base_name': 'order', 'rel_type': 'stg', 'column_name': 'order_date', 'column_type': 'DATE', 'column_position': 3, 'mapping': 'c'},
    {'base_name': 'order', 'rel_type': 'stg', 'column_name': 'status', 'column_type': 'VARCHAR(50)', 'column_position': 4, 'mapping': 'c'},
    
    # Order hub
    {'base_name': 'order', 'rel_type': 'hub', 'column_name': 'id', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'bk'},
    
    # Order satellite
    {'base_name': 'order_details', 'rel_type': 'hsat', 'column_name': 'order', 'column_type': '', 'column_position': 0, 'mapping': 'hk'},
    {'base_name': 'order_details', 'rel_type': 'hsat', 'column_name': 'order_date', 'column_type': 'DATE', 'column_position': 1, 'mapping': 'f'},
    {'base_name': 'order_details', 'rel_type': 'hsat', 'column_name': 'status', 'column_type': 'VARCHAR(50)', 'column_position': 2, 'mapping': 'f'},
    
    # Customer-Order link
    {'base_name': 'customer__order', 'rel_type': 'link', 'column_name': 'customer', 'column_type': '', 'column_position': 1, 'mapping': 'll'},
    {'base_name': 'customer__order', 'rel_type': 'link', 'column_name': 'order', 'column_type': '', 'column_position': 2, 'mapping': 'll'},
    
    # Order item staging
    {'base_name': 'order_item', 'rel_type': 'stg', 'column_name': 'order_id', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'c'},
    {'base_name': 'order_item', 'rel_type': 'stg', 'column_name': 'product_id', 'column_type': 'VARCHAR(20)', 'column_position': 2, 'mapping': 'c'},
    {'base_name': 'order_item', 'rel_type': 'stg', 'column_name': 'quantity', 'column_type': 'INTEGER', 'column_position': 3, 'mapping': 'c'},
    {'base_name': 'order_item', 'rel_type': 'stg', 'column_name': 'unit_price', 'column_type': 'DECIMAL(10,2)', 'column_position': 4, 'mapping': 'c'},
    
    # Order-Product link
    {'base_name': 'order__product', 'rel_type': 'link', 'column_name': 'order', 'column_type': '', 'column_position': 1, 'mapping': 'll'},
    {'base_name': 'order__product', 'rel_type': 'link', 'column_name': 'product', 'column_type': '', 'column_position': 2, 'mapping': 'll'},
    
    # Order-Product satellite
    {'base_name': 'order__product', 'rel_type': 'lsat', 'column_name': 'order__product', 'column_type': '', 'column_position': 0, 'mapping': 'hk'},
    {'base_name': 'order__product', 'rel_type': 'lsat', 'column_name': 'quantity', 'column_type': 'INTEGER', 'column_position': 1, 'mapping': 'f'},
    {'base_name': 'order__product', 'rel_type': 'lsat', 'column_name': 'unit_price', 'column_type': 'DECIMAL(10,2)', 'column_position': 2, 'mapping': 'f'}
]

# Define transitions metadata
transitions_data = [
    # Customer transitions
    {'source_table': 'customer', 'source_field': 'id', 'target_table': 'hub_customer', 'target_field': 'id_bk', 'group_name': 'customer', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'customer', 'source_field': 'first_name', 'target_table': 'hsat_customer_details', 'target_field': 'first_name', 'group_name': 'customer_details', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'customer', 'source_field': 'last_name', 'target_table': 'hsat_customer_details', 'target_field': 'last_name', 'group_name': 'customer_details', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'customer', 'source_field': 'email', 'target_table': 'hsat_customer_details', 'target_field': 'email', 'group_name': 'customer_details', 'position': 3, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'customer', 'source_field': 'created_date', 'target_table': 'hsat_customer_details', 'target_field': 'created_date', 'group_name': 'customer_details', 'position': 4, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'customer', 'source_field': 'customer_hk', 'target_table': 'hsat_customer_details', 'target_field': 'customer', 'group_name': 'customer_details', 'position': 0, 'raw': False, 'transformation': '', 'transfer_type': 'sat_delta'},
    
    # Customer referral transitions
    {'source_table': 'customer', 'source_field': 'referenced_by', 'target_table': 'hub_customer', 'target_field': 'id_bk', 'group_name': 'referencer', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'customer', 'source_field': 'customer', 'target_table': 'link_customer__referencer', 'target_field': 'customer_hk', 'group_name': 'l_reference', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    {'source_table': 'customer', 'source_field': 'referencer', 'target_table': 'link_customer__referencer', 'target_field': 'referencer_hk', 'group_name': 'l_reference', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    {'source_table': 'customer', 'source_field': 'reference_code', 'target_table': 'link_customer__referencer', 'target_field': 'reference_code_dk', 'group_name': 'l_reference', 'position': 3, 'raw': False, 'transformation': '', 'transfer_type': 'dk'},
    
    # Product transitions
    {'source_table': 'product', 'source_field': 'id', 'target_table': 'hub_product', 'target_field': 'id_bk', 'group_name': 'product', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'product', 'source_field': 'product_hk', 'target_table': 'hsat_product_details', 'target_field': 'product', 'group_name': 'product_details', 'position': 0, 'raw': False, 'transformation': '', 'transfer_type': 'sat_delta'},
    {'source_table': 'product', 'source_field': 'name', 'target_table': 'hsat_product_details', 'target_field': 'name', 'group_name': 'product_details', 'position': 1, 'raw': False, 'transformation': 'trim(#)', 'transfer_type': 'f'},
    {'source_table': 'product', 'source_field': 'description', 'target_table': 'hsat_product_details', 'target_field': 'description', 'group_name': 'product_details', 'position': 2, 'raw': False, 'transformation': 'trim(#)', 'transfer_type': 'f'},
    {'source_table': 'product', 'source_field': 'price', 'target_table': 'hsat_product_details', 'target_field': 'price', 'group_name': 'product_details', 'position': 3, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'product', 'source_field': 'category', 'target_table': 'hsat_product_details', 'target_field': 'category', 'group_name': 'product_details', 'position': 4, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    
    # Order transitions
    {'source_table': 'order', 'source_field': 'id', 'target_table': 'hub_order', 'target_field': 'id_bk', 'group_name': 'order', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'order', 'source_field': 'order_hk', 'target_table': 'hsat_order_details', 'target_field': 'order', 'group_name': 'order_details', 'position': 0, 'raw': False, 'transformation': '', 'transfer_type': 'sat_delta'},
    {'source_table': 'order', 'source_field': 'order_date', 'target_table': 'hsat_order_details', 'target_field': 'order_date', 'group_name': 'order_details', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'order', 'source_field': 'status', 'target_table': 'hsat_order_details', 'target_field': 'status', 'group_name': 'order_details', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    
    # Customer-Order link transitions
    {'source_table': 'order', 'source_field': 'customer_id', 'target_table': 'hub_customer', 'target_field': 'id_bk', 'group_name': 'customer_from_order', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'order', 'source_field': 'customer_from_order', 'target_table': 'link_customer__order', 'target_field': 'customer_hk', 'group_name': 'l_customer_order', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    {'source_table': 'order', 'source_field': 'order', 'target_table': 'link_customer__order', 'target_field': 'order_hk', 'group_name': 'l_customer_order', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    
    # Order item transitions
    {'source_table': 'order_item', 'source_field': 'order_id', 'target_table': 'hub_order', 'target_field': 'id_bk', 'group_name': 'order_from_item', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'order_item', 'source_field': 'product_id', 'target_table': 'hub_product', 'target_field': 'id_bk', 'group_name': 'product_from_item', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'bk'},
    {'source_table': 'order_item', 'source_field': 'order_from_item', 'target_table': 'link_order__product', 'target_field': 'order_hk', 'group_name': 'l_order_product', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    {'source_table': 'order_item', 'source_field': 'product_from_item', 'target_table': 'link_order__product', 'target_field': 'product_hk', 'group_name': 'l_order_product', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'll'},
    {'source_table': 'order_item', 'source_field': 'l_order_product_hk', 'target_table': 'lsat_order__product', 'target_field': 'order__product', 'group_name': 's_order_product', 'position': 0, 'raw': False, 'transformation': '', 'transfer_type': 'sat_delta'},
    {'source_table': 'order_item', 'source_field': 'quantity', 'target_table': 'lsat_order__product', 'target_field': 'quantity', 'group_name': 's_order_product', 'position': 1, 'raw': False, 'transformation': '', 'transfer_type': 'f'},
    {'source_table': 'order_item', 'source_field': 'unit_price', 'target_table': 'lsat_order__product', 'target_field': 'unit_price', 'group_name': 's_order_product', 'position': 2, 'raw': False, 'transformation': '', 'transfer_type': 'f'}
]

# Convert to dataframes
tables_df = pd.DataFrame(tables_data)
transitions_df = pd.DataFrame(transitions_data)

# Create metadata directory
os.makedirs('../demo/data_notebook/metadata', exist_ok=True)

# Save to CSV files
tables_df.to_csv('../demo/data_notebook/metadata/tables.csv', index=False)
transitions_df.to_csv('../demo/data_notebook/metadata/transitions.csv', index=False)

print("Metadata files created successfully!")

# Preview the metadata
print("\nTables Metadata Preview:")
display(tables_df.head(10))

print("\nTransitions Metadata Preview:")
display(transitions_df.head(10))

## 3. Initializing and Loading the Data Vault

Now let's initialize MallardDV and load our data into the Data Vault structures.
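
In the cells below, `init_mallard_db` and `execute_flow` are both treated as returning a list of `(sql, error)` pairs, and the same reporting code follows each call. A small helper can keep that reporting in one place. This is only a sketch: `format_errors` is a hypothetical name, and the sample error pair is made up for illustration.

```python
def format_errors(errors, label):
    """Return printable lines for a list of (sql, error) pairs."""
    if not errors:
        return [f"{label}: completed without errors"]
    lines = [f"{label}: {len(errors)} error(s)"]
    for sql, error in errors:
        lines.append(f"  Error: {error}")
        lines.append(f"  SQL: {sql}")
    return lines


# Made-up error pair, for illustration only
sample = [("SELECT * FROM missing_table", "Catalog Error: table not found")]
print("\n".join(format_errors(sample, "customer load")))
print("\n".join(format_errors([], "product load")))
```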

In [None]:
from mallarddv import MallardDataVault
import os

# Set file paths
db_path = '../demo/data_notebook/retail_dv.db'
metadata_dir = '../demo/data_notebook/metadata'
data_dir = '../demo/data_notebook'

# Remove existing database if it exists
if os.path.exists(db_path):
    os.remove(db_path)
    print(f"Removed existing database at {db_path}")

# Initialize the Data Vault with verbose output to see what's happening
with MallardDataVault(db_path) as mdv:
    print("Initializing MallardDV database...")
    
    # Initialize the database with metadata
    errors = mdv.init_mallard_db(
        meta_only=False,
        meta_tables_path=f"{metadata_dir}/tables.csv",
        meta_transitions_path=f"{metadata_dir}/transitions.csv",
        verbose=True
    )
    
    if errors:
        print(f"Encountered {len(errors)} errors during initialization:")
        for sql, error in errors:
            print(f"Error: {error}")
            print(f"SQL: {sql}")
    else:
        print("Database initialized successfully!")

In [None]:
# Load data into the Data Vault, one flow per staging table
flows = [
    # (flow name, CSV file, label used in messages)
    ("customer", "customers.csv", "customer"),
    ("product", "products.csv", "product"),
    ("order", "orders.csv", "order"),
    ("order_item", "order_items.csv", "order item"),
]

with MallardDataVault(db_path) as mdv:
    for flow_name, file_name, label in flows:
        print(f"\nLoading {label} data...")
        errors = mdv.execute_flow(
            flow_name,
            "notebook-demo",
            f"{data_dir}/{file_name}",
            force_load=True,
            verbose=True
        )

        if errors:
            print(f"Encountered {len(errors)} errors while loading {label}s:")
            for sql, error in errors:
                print(f"Error: {error}")
                print(f"SQL: {sql}")
        else:
            print(f"{label.capitalize()} data loaded successfully!")

    print("\nAll data loaded into the Data Vault!")

## 4. Querying Data from the Data Vault

Now that we have loaded our data, let's explore it with some queries to show how the Data Vault model works.

In [None]:
# Connect to the database and run some queries
with MallardDataVault(db_path) as mdv:
    # Query 1: List all customers with their details
    print("Query 1: All customers with details")
    result = mdv.sql("""
        SELECT 
            c.id_bk AS customer_id,
            d.first_name,
            d.last_name,
            d.email,
            d.created_date
        FROM 
            raw.hub_customer c
        JOIN 
            raw.current_hsat_customer_details d 
            ON c.customer_hk = d.customer_hk
        ORDER BY 
            c.id_bk
    """)
    display(result.fetchdf())
    
    # Query 2: Customer referrals
    print("\nQuery 2: Customer referrals")
    result = mdv.sql("""
        SELECT 
            c1.id_bk AS customer_id,
            cd1.first_name || ' ' || cd1.last_name AS customer_name,
            c2.id_bk AS referrer_id,
            cd2.first_name || ' ' || cd2.last_name AS referrer_name,
            l.reference_code_dk AS reference_code
        FROM 
            raw.link_customer__referencer l
        JOIN 
            raw.hub_customer c1 
            ON l.customer_hk = c1.customer_hk
        JOIN 
            raw.hub_customer c2 
            ON l.referencer_hk = c2.customer_hk
        JOIN 
            raw.current_hsat_customer_details cd1 
            ON c1.customer_hk = cd1.customer_hk
        JOIN 
            raw.current_hsat_customer_details cd2 
            ON c2.customer_hk = cd2.customer_hk
    """)
    display(result.fetchdf())
    
    # Query 3: Products with their details
    print("\nQuery 3: Products with details")
    result = mdv.sql("""
        SELECT 
            p.id_bk AS product_id,
            d.name,
            d.description,
            d.price,
            d.category
        FROM 
            raw.hub_product p
        JOIN 
            raw.current_hsat_product_details d 
            ON p.product_hk = d.product_hk
        ORDER BY 
            d.category, d.name
    """)
    display(result.fetchdf())
    
    # Query 4: Orders by customer
    print("\nQuery 4: Orders by customer")
    result = mdv.sql("""
        SELECT 
            c.id_bk AS customer_id,
            cd.first_name || ' ' || cd.last_name AS customer_name,
            o.id_bk AS order_id,
            od.order_date,
            od.status
        FROM 
            raw.link_customer__order l
        JOIN 
            raw.hub_customer c 
            ON l.customer_hk = c.customer_hk
        JOIN 
            raw.hub_order o 
            ON l.order_hk = o.order_hk
        JOIN 
            raw.current_hsat_customer_details cd 
            ON c.customer_hk = cd.customer_hk
        JOIN 
            raw.current_hsat_order_details od 
            ON o.order_hk = od.order_hk
        ORDER BY 
            c.id_bk, od.order_date
    """)
    display(result.fetchdf())
    
    # Query 5: Order details with products
    print("\nQuery 5: Order details with products")
    result = mdv.sql("""
        SELECT 
            o.id_bk AS order_id,
            p.id_bk AS product_id,
            pd.name AS product_name,
            s.quantity,
            s.unit_price,
            (s.quantity * s.unit_price) AS total_price
        FROM 
            raw.link_order__product l
        JOIN 
            raw.hub_order o 
            ON l.order_hk = o.order_hk
        JOIN 
            raw.hub_product p 
            ON l.product_hk = p.product_hk
        JOIN 
            raw.current_lsat_order__product s 
            ON l.order__product_hk = s.order__product_hk
        JOIN 
            raw.current_hsat_product_details pd 
            ON p.product_hk = pd.product_hk
        ORDER BY 
            o.id_bk, p.id_bk
    """)
    display(result.fetchdf())
    
    # Query 6: Customer order summary
    print("\nQuery 6: Customer order summary")
    result = mdv.sql("""
        WITH order_totals AS (
            SELECT 
                o.order_hk,
                SUM(s.quantity * s.unit_price) AS order_total
            FROM 
                raw.link_order__product l
            JOIN 
                raw.hub_order o 
                ON l.order_hk = o.order_hk
            JOIN 
                raw.current_lsat_order__product s 
                ON l.order__product_hk = s.order__product_hk
            GROUP BY 
                o.order_hk
        )
        SELECT 
            c.id_bk AS customer_id,
            cd.first_name || ' ' || cd.last_name AS customer_name,
            COUNT(DISTINCT l.order_hk) AS order_count,
            SUM(ot.order_total) AS total_spent
        FROM 
            raw.link_customer__order l
        JOIN 
            raw.hub_customer c 
            ON l.customer_hk = c.customer_hk
        JOIN 
            raw.current_hsat_customer_details cd 
            ON c.customer_hk = cd.customer_hk
        JOIN 
            order_totals ot 
            ON l.order_hk = ot.order_hk
        GROUP BY 
            c.id_bk, cd.first_name, cd.last_name
        ORDER BY 
            total_spent DESC
    """)
    display(result.fetchdf())

## 5. Modifying Data and Observing Historical Tracking

One of the key features of Data Vault is its ability to track historical changes. Let's update some data and observe how the history is maintained.
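
The transitions metadata above uses a `sat_delta` transfer type, which suggests that a satellite only receives a new row when something actually changed. How MallardDV implements this internally is not shown in this notebook. The following is a generic plain-Python sketch of the idea, with a hypothetical `changed_rows` helper and two made-up rows per load:

```python
def changed_rows(current, incoming, key, attributes):
    """Return incoming rows whose tracked attributes differ from the current version."""
    current_by_key = {row[key]: row for row in current}
    deltas = []
    for row in incoming:
        previous = current_by_key.get(row[key])
        # New keys and rows with any changed attribute produce a new version
        if previous is None or any(row[a] != previous[a] for a in attributes):
            deltas.append(row)
    return deltas


current = [
    {"id": 1, "last_name": "Doe", "email": "john.doe@example.com"},
    {"id": 2, "last_name": "Smith", "email": "jane.smith@example.com"},
]
incoming = [
    {"id": 1, "last_name": "Doe", "email": "john.doe@example.com"},
    {"id": 2, "last_name": "Smith-Jones", "email": "jane.smith@newdomain.com"},
]

# Only the row for customer 2 differs, so only that row is returned
print(changed_rows(current, incoming, "id", ["last_name", "email"]))
```

Real implementations often compare a hash of the attribute values rather than the values themselves; the comparison above is kept explicit for readability.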

In [None]:
# Create an updated customer dataset with some changes
customers_updated = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'first_name': ['John', 'Jane', 'Robert', 'Sarah', 'Michael'],
    'last_name': ['Doe', 'Smith-Jones', 'Johnson', 'Williams', 'Brown'],  # Changed Jane's last name
    'email': ['john.doe@example.com', 'jane.smith@newdomain.com', 'robert.j@example.com',  # Changed Jane's email
              'sarah.w@example.com', 'michael.b@example.com'],
    'created_date': ['2025-01-15 10:30:00', '2025-01-16 14:45:00', '2025-01-17 09:15:00',
                     '2025-01-18 16:20:00', '2025-01-19 11:35:00'],
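    # Nullable integer arrays keep these IDs as integers; a plain list
    # containing None would be stored as floats and written as 1.0, 101.0, ...
    'referenced_by': pd.array([None, 1, 1, 2, 3], dtype='Int64'),
    'reference_code': pd.array([None, 101, 102, 201, 301], dtype='Int64')
})

# Missing values are written as empty fields by to_csv, so no fillna('') is needed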

# Save updated data
customers_updated.to_csv('../demo/data_notebook/customers_updated.csv', index=False)

print("Updated customer data created. Highlighting changes:")
# Show each changed row once, even when several columns changed
changed = (
    (customers_updated['last_name'] != customers['last_name'])
    | (customers_updated['email'] != customers['email'])
)
changes = customers_updated[changed]
display(changes)

In [None]:
# Load the updated data
import datetime

with MallardDataVault(db_path) as mdv:
    # Load the updated data with a new timestamp
    update_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Loading updated customer data with timestamp: {update_date}")
    
    errors = mdv.execute_flow(
        "customer", 
        "notebook-update", 
        "../demo/data_notebook/customers_updated.csv",
        load_date_overwrite=update_date,
        force_load=True, 
        verbose=True
    )
    
    if errors:
        print(f"Encountered {len(errors)} errors while loading updated customers:")
        for sql, error in errors:
            print(f"Error: {error}")
            print(f"SQL: {sql}")
    else:
        print("Updated customer data loaded successfully!")

In [None]:
# Now query the historical data for customer ID 2 (Jane Smith)
with MallardDataVault(db_path) as mdv:
    # Query historical data for Jane Smith
    print("Historical data for customer ID 2 (Jane Smith):")
    result = mdv.sql("""
        SELECT 
            c.id_bk AS customer_id,
            s.first_name,
            s.last_name,
            s.email,
            s.effective_from,
            s.load_date,
            s.record_source,
            s.deleted_flag
        FROM 
            raw.hub_customer c
        JOIN 
            raw.hsat_customer_details s 
            ON c.customer_hk = s.customer_hk
        WHERE 
            c.id_bk = 2
        ORDER BY 
            s.effective_from
    """)
    display(result.fetchdf())
    
    # Check the current view
    print("\nCurrent view for customer ID 2 (Jane Smith):")
    result = mdv.sql("""
        SELECT 
            c.id_bk AS customer_id,
            s.first_name,
            s.last_name,
            s.email
        FROM 
            raw.hub_customer c
        JOIN 
            raw.current_hsat_customer_details s 
            ON c.customer_hk = s.customer_hk
        WHERE 
            c.id_bk = 2
    """)
    display(result.fetchdf())

## 6. Point-in-Time Queries

Because satellites keep every version of a record together with its `effective_from` timestamp, a Data Vault can be queried as it stood at any recorded point in time.
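
A point-in-time lookup picks, for each key, the latest satellite version whose `effective_from` is not later than the requested timestamp. The sketch below shows that rule in plain Python, independent of DuckDB; `as_of` is a hypothetical helper and the history rows are made up.

```python
def as_of(versions, key, timestamp):
    """Return {key value: latest version with effective_from <= timestamp}."""
    latest = {}
    for row in versions:
        if row["effective_from"] > timestamp:
            continue  # this version did not exist yet at the requested time
        best = latest.get(row[key])
        if best is None or row["effective_from"] > best["effective_from"]:
            latest[row[key]] = row
    return latest


# Toy satellite history; ISO-formatted timestamp strings compare correctly as text
history = [
    {"customer_hk": "a", "last_name": "Smith", "effective_from": "2025-02-01 09:00:00"},
    {"customer_hk": "a", "last_name": "Smith-Jones", "effective_from": "2025-02-02 09:00:00"},
]

print(as_of(history, "customer_hk", "2025-02-01 12:00:00")["a"]["last_name"])
print(as_of(history, "customer_hk", "2025-02-03 00:00:00")["a"]["last_name"])
```

The SQL in the next cell expresses the same rule with `ROW_NUMBER()` over a window partitioned by the hash key.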

In [None]:
# Query data at a specific point in time
with MallardDataVault(db_path) as mdv:
    # Get the original load timestamp and update timestamp
    result = mdv.sql("""
        SELECT DISTINCT 
            effective_from 
        FROM 
            raw.hsat_customer_details 
        ORDER BY 
            effective_from
    """)
    timestamps = result.fetchall()
    original_time = timestamps[0][0]
    update_time = timestamps[1][0] if len(timestamps) > 1 else datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Query as of the original load time
    print(f"Customer ID 2 as of original load time ({original_time}):")
    result = mdv.sql(f"""
        SELECT 
            c.id_bk AS customer_id,
            cd.first_name,
            cd.last_name,
            cd.email
        FROM 
            raw.hub_customer c
        JOIN (
            SELECT 
                s.customer_hk,
                s.first_name,
                s.last_name,
                s.email,
                ROW_NUMBER() OVER (
                    PARTITION BY s.customer_hk 
                    ORDER BY s.effective_from DESC
                ) as rn
            FROM 
                raw.hsat_customer_details s
            WHERE 
                s.effective_from <= '{original_time}'
        ) cd ON c.customer_hk = cd.customer_hk AND cd.rn = 1
        WHERE 
            c.id_bk = 2
    """)
    display(result.fetchdf())
    
    # Query as of the update time
    print(f"\nCustomer ID 2 as of update time ({update_time}):")
    result = mdv.sql(f"""
        SELECT 
            c.id_bk AS customer_id,
            cd.first_name,
            cd.last_name,
            cd.email
        FROM 
            raw.hub_customer c
        JOIN (
            SELECT 
                s.customer_hk,
                s.first_name,
                s.last_name,
                s.email,
                ROW_NUMBER() OVER (
                    PARTITION BY s.customer_hk 
                    ORDER BY s.effective_from DESC
                ) as rn
            FROM 
                raw.hsat_customer_details s
            WHERE 
                s.effective_from <= '{update_time}'
        ) cd ON c.customer_hk = cd.customer_hk AND cd.rn = 1
        WHERE 
            c.id_bk = 2
    """)
    display(result.fetchdf())
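
The two queries above differ only in the cut-off timestamp, so the pattern can be wrapped in a small helper that binds the timestamp as a query parameter instead of formatting it into the SQL string. The following is a minimal sketch, not MallardDV's API: it uses Python's built-in `sqlite3` as a stand-in for DuckDB so that it runs on its own, and the table contents, the hash key values (`hk2`, `hk3`), the changed email address, and the `customers_as_of` helper are all hypothetical.

```python
import sqlite3

# In-memory stand-in for the satellite table (hypothetical sample rows).
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE hsat_customer_details (
        customer_hk TEXT, email TEXT, effective_from TEXT
    );
    INSERT INTO hsat_customer_details VALUES
        ('hk2', 'jane.smith@example.com', '2025-01-16 14:45:00'),
        ('hk2', 'jane.new@example.com',   '2025-02-01 09:00:00'),
        ('hk3', 'robert.j@example.com',   '2025-01-17 09:15:00');
""")

# Keep rows effective on or before the cut-off, then take the latest per key.
AS_OF_SQL = """
    SELECT customer_hk, email
    FROM (
        SELECT customer_hk, email,
               ROW_NUMBER() OVER (
                   PARTITION BY customer_hk
                   ORDER BY effective_from DESC
               ) AS rn
        FROM hsat_customer_details
        WHERE effective_from <= ?
    ) AS ranked
    WHERE rn = 1
    ORDER BY customer_hk
"""

def customers_as_of(connection, as_of):
    """Return (customer_hk, email) pairs as they stood at `as_of`."""
    return connection.execute(AS_OF_SQL, (as_of,)).fetchall()

before_update = customers_as_of(con, "2025-01-20 00:00:00")
after_update = customers_as_of(con, "2025-03-01 00:00:00")
print(before_update)
print(after_update)
```

Binding the timestamp as a parameter avoids quoting problems and keeps the SQL text identical across calls. Whether `mdv.sql` accepts bound parameters is not shown in this notebook, so check that before adapting the helper.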

## 7. Advanced Analytics Queries

Let's run some more complex analytical queries on our Data Vault.

In [None]:
# Run advanced analytics queries
with MallardDataVault(db_path) as mdv:
    # Query 1: Sales by product category
    print("Query 1: Sales by product category")
    result = mdv.sql("""
        SELECT 
            pd.category,
            COUNT(DISTINCT l.order_hk) AS total_orders,
            SUM(s.quantity) AS units_sold,
            SUM(s.quantity * s.unit_price) AS total_revenue
        FROM 
            raw.link_order__product l
        JOIN 
            raw.hub_product p 
            ON l.product_hk = p.product_hk
        JOIN 
            raw.current_hsat_product_details pd 
            ON p.product_hk = pd.product_hk
        JOIN 
            raw.current_lsat_order__product s 
            ON l.order__product_hk = s.order__product_hk
        GROUP BY 
            pd.category
        ORDER BY 
            total_revenue DESC
    """)
    display(result.fetchdf())
    
    # Query 2: Customer referral impact
    print("\nQuery 2: Customer referral impact")
    result = mdv.sql("""
        WITH customer_revenue AS (
            SELECT 
                l1.customer_hk,
                SUM(oi.quantity * oi.unit_price) AS total_spent
            FROM 
                raw.link_customer__order l1
            JOIN 
                raw.link_order__product l2 
                ON l1.order_hk = l2.order_hk
            JOIN 
                raw.current_lsat_order__product oi 
                ON l2.order__product_hk = oi.order__product_hk
            GROUP BY 
                l1.customer_hk
        ),
        referral_counts AS (
            SELECT 
                r.referencer_hk,
                COUNT(DISTINCT r.customer_hk) AS referral_count
            FROM 
                raw.link_customer__referencer r
            GROUP BY 
                r.referencer_hk
        )
        SELECT 
            c.id_bk AS customer_id,
            cd.first_name || ' ' || cd.last_name AS customer_name,
            COALESCE(rc.referral_count, 0) AS num_referrals,
            COALESCE(cr.total_spent, 0) AS own_spending,
            SUM(COALESCE(cr_referred.total_spent, 0)) AS referred_spending
        FROM 
            raw.hub_customer c
        JOIN 
            raw.current_hsat_customer_details cd 
            ON c.customer_hk = cd.customer_hk
        LEFT JOIN 
            referral_counts rc 
            ON c.customer_hk = rc.referencer_hk
        LEFT JOIN 
            customer_revenue cr 
            ON c.customer_hk = cr.customer_hk
        LEFT JOIN 
            raw.link_customer__referencer r 
            ON c.customer_hk = r.referencer_hk
        LEFT JOIN 
            customer_revenue cr_referred 
            ON r.customer_hk = cr_referred.customer_hk
        GROUP BY 
            c.id_bk, cd.first_name, cd.last_name, rc.referral_count, cr.total_spent
        ORDER BY 
            num_referrals DESC, referred_spending DESC
    """)
    display(result.fetchdf())
    
    # Query 3: Product purchase frequency
    print("\nQuery 3: Product purchase frequency")
    result = mdv.sql("""
        SELECT 
            p.id_bk AS product_id,
            pd.name AS product_name,
            pd.category,
            COUNT(DISTINCT l.order_hk) AS order_count,
            COUNT(DISTINCT lo.customer_hk) AS customer_count,
            SUM(oi.quantity) AS total_quantity,
            AVG(oi.unit_price) AS avg_price
        FROM 
            raw.link_order__product l
        JOIN 
            raw.hub_product p 
            ON l.product_hk = p.product_hk
        JOIN 
            raw.current_hsat_product_details pd 
            ON p.product_hk = pd.product_hk
        JOIN 
            raw.current_lsat_order__product oi 
            ON l.order__product_hk = oi.order__product_hk
        JOIN 
            raw.link_customer__order lo 
            ON l.order_hk = lo.order_hk
        GROUP BY 
            p.id_bk, pd.name, pd.category
        ORDER BY 
            order_count DESC
    """)
    display(result.fetchdf())
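
Query 2 sums revenue per customer in a CTE before joining to the referral link. The sketch below shows why that ordering matters: joining raw order lines directly to a one-to-many link repeats each line once per referral and inflates the sum. It uses Python's built-in `sqlite3` as a stand-in for DuckDB, and the `order_lines` and `referrals` tables with their rows are hypothetical simplifications of the link and satellite tables used above.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE order_lines (customer TEXT, amount REAL);
    CREATE TABLE referrals (referencer TEXT, customer TEXT);
    INSERT INTO order_lines VALUES
        ('alice', 100.0), ('alice', 50.0), ('bob', 20.0), ('carol', 30.0);
    INSERT INTO referrals VALUES ('alice', 'bob'), ('alice', 'carol');
""")

# Naive: join order lines to referrals, then sum. Each of alice's two
# order lines appears once per referral, so her spending is counted twice.
naive_own = con.execute("""
    SELECT SUM(o.amount)
    FROM order_lines o
    LEFT JOIN referrals r ON o.customer = r.referencer
    WHERE o.customer = 'alice'
""").fetchone()[0]

# Pre-aggregated: one revenue row per customer before any join,
# mirroring the customer_revenue CTE in Query 2.
own, referred = con.execute("""
    WITH customer_revenue AS (
        SELECT customer, SUM(amount) AS total_spent
        FROM order_lines
        GROUP BY customer
    )
    SELECT cr.total_spent,
           SUM(COALESCE(cr_ref.total_spent, 0))
    FROM customer_revenue cr
    LEFT JOIN referrals r ON cr.customer = r.referencer
    LEFT JOIN customer_revenue cr_ref ON r.customer = cr_ref.customer
    WHERE cr.customer = 'alice'
    GROUP BY cr.customer, cr.total_spent
""").fetchone()

print(naive_own, own, referred)
```

Because `customer_revenue` has exactly one row per customer, joining it to the referral link cannot duplicate the customer's own spending, and the referred spending is summed once per referred customer.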

## 8. Examining the Data Vault Structure

Let's examine the structure of the Data Vault we've created.

In [None]:
# Query the database structure
with MallardDataVault(db_path) as mdv:
    # Get all tables
    print("Database tables:")
    result = mdv.sql("""
        SELECT table_schema, table_name, table_type
        FROM information_schema.tables
        ORDER BY table_schema, table_type, table_name
    """)
    display(result.fetchdf())
    
    # Get metadata tables
    print("\nMetadata tables:")
    result = mdv.sql("SELECT COUNT(*) as record_count FROM metadata.tables")
    tables_count = result.fetchone()[0]
    print(f"metadata.tables: {tables_count} records")
    
    result = mdv.sql("SELECT COUNT(*) as record_count FROM metadata.transitions")
    transitions_count = result.fetchone()[0]
    print(f"metadata.transitions: {transitions_count} records")
    
    # Get counts of records in key tables
    print("\nRecord counts in Data Vault structures:")
    result = mdv.sql("""
        SELECT table_name, record_count
        FROM (
            SELECT 'hub_customer' as table_name, COUNT(*) as record_count FROM raw.hub_customer
            UNION ALL
            SELECT 'hub_product' as table_name, COUNT(*) as record_count FROM raw.hub_product
            UNION ALL
            SELECT 'hub_order' as table_name, COUNT(*) as record_count FROM raw.hub_order
            UNION ALL
            SELECT 'link_customer__order' as table_name, COUNT(*) as record_count FROM raw.link_customer__order
            UNION ALL
            SELECT 'link_order__product' as table_name, COUNT(*) as record_count FROM raw.link_order__product
            UNION ALL
            SELECT 'hsat_customer_details' as table_name, COUNT(*) as record_count FROM raw.hsat_customer_details
            UNION ALL
            SELECT 'hsat_product_details' as table_name, COUNT(*) as record_count FROM raw.hsat_product_details
            UNION ALL
            SELECT 'hsat_order_details' as table_name, COUNT(*) as record_count FROM raw.hsat_order_details
            UNION ALL
            SELECT 'lsat_order__product' as table_name, COUNT(*) as record_count FROM raw.lsat_order__product
        ) counts
        ORDER BY table_name
    """)
    display(result.fetchdf())
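
The table names queried in this notebook follow a prefix convention (`hub_`, `link_`, `hsat_`, `lsat_`, and `current_` for what appear to be latest-row views). As a rough sketch, a table listing can be grouped by Data Vault object type from those prefixes. The mapping is inferred from the names used in this notebook rather than taken from MallardDV itself, and `classify_tables` is a hypothetical helper.

```python
from collections import defaultdict

# Prefix-to-type mapping inferred from the table names in this notebook
# (an assumption, not part of MallardDV's documented API).
PREFIXES = {
    "hub_": "hub",
    "link_": "link",
    "hsat_": "hub satellite",
    "lsat_": "link satellite",
    "current_": "current-state view",
}

def classify_tables(table_names):
    """Group table names by the Data Vault object type their prefix suggests."""
    groups = defaultdict(list)
    for name in table_names:
        kind = next(
            (label for prefix, label in PREFIXES.items() if name.startswith(prefix)),
            "other",
        )
        groups[kind].append(name)
    return dict(groups)

groups = classify_tables([
    "hub_customer",
    "hub_product",
    "link_customer__order",
    "hsat_customer_details",
    "lsat_order__product",
    "current_hsat_customer_details",
])
for kind, names in groups.items():
    print(f"{kind}: {', '.join(names)}")
```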

## 9. Conclusion

In this notebook, we've demonstrated:

1. **Data Vault Structure Creation**: Using metadata to define hubs, links, and satellites
2. **Data Loading**: Loading data from CSV files into the Data Vault structure
3. **Data Querying**: Retrieving information from the Data Vault model
4. **Historical Tracking**: Observing how Data Vault tracks changes over time
5. **Point-in-Time Queries**: Retrieving data as it existed at a specific point in time
6. **Advanced Analytics**: Running complex analytical queries across the Data Vault model

MallardDV provides a lightweight, portable implementation of Data Vault 2.0 methodology using DuckDB. It's ideal for rapid prototyping, learning Data Vault concepts, and creating small to medium-sized data warehouses without the need for a full database server.

Key advantages of MallardDV include:
- Metadata-driven approach that separates structure definition from implementation
- Automated hash key generation and Data Vault loading patterns
- Built-in support for both delta and full loading patterns
- Comprehensive historical tracking of all data changes
- Portable, file-based database using DuckDB
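
To make the hash key point concrete, here is a minimal sketch of how a Data Vault hash key is commonly derived from business keys: normalize each key, join the parts with a delimiter, and hash the result. The MD5 algorithm, the `||` delimiter, the trim-and-uppercase normalization, and the `hash_key` helper are assumptions chosen for illustration; MallardDV's actual implementation may differ.

```python
import hashlib

def hash_key(*business_keys, delimiter="||"):
    """Build a deterministic hash key from one or more business keys.

    Illustrative only: the normalization, delimiter, and hash algorithm
    are assumptions, not MallardDV's documented behavior.
    """
    normalized = [str(key).strip().upper() for key in business_keys]
    return hashlib.md5(delimiter.join(normalized).encode("utf-8")).hexdigest()

# Hub key from a single business key; whitespace does not change the result.
customer_hk = hash_key(2)
same_customer_hk = hash_key(" 2 ")

# Link key from the business keys of both participating hubs.
order_product_hk = hash_key(10001, "P001")

print(customer_hk == same_customer_hk)
print(len(order_product_hk))
```

Hashing normalized business keys lets hubs, links, and satellites be loaded independently, since each loader can compute the same key from the source data without looking it up.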

This notebook can serve as a starting point for your own Data Vault implementations using MallardDV.