In [None]:
%pip install -r requirements.txt
%restart_python

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

In [None]:
%load_ext autoreload
%autoreload 2

from lakebase_utils import LakebaseConnection

In [None]:
def _text_widget(name, default):
    """Declare a text widget called `name` with `default`, then return its current value."""
    dbutils.widgets.text(name, default)
    return dbutils.widgets.get(name)


# Notebook parameters. Each line declares the widget and immediately reads it
# back into the global that the rest of the notebook uses.
# NOTE(review): the "user" default is an individual's e-mail address; confirm
# that it is intended to remain the fallback value.
username = _text_widget("user", "lars.liahagen@databricks.com")
lakebase_instance_name = _text_widget("lakebase_instance_name", "smart-stock-db")
catalog = _text_widget("catalog", "smart_stock")
schema_silver = _text_widget("schema_silver", "smart_stock_silver")
schema_gold = _text_widget("schema_gold", "smart_stock_gold")


In [0]:
# =============================================
# CONFIGURATION
# =============================================

# PostgreSQL connection, built from the "user" and "lakebase_instance_name"
# widget values. `conn` is a notebook-level global; process_new_transactions()
# calls conn.execute_query(...) on it to read the source tables.
# NOTE(review): whether the constructor opens the connection eagerly is not
# visible from this notebook; confirm in lakebase_utils.
conn = LakebaseConnection(username, lakebase_instance_name)

print("✅ Setup complete")

✅ Setup complete


In [0]:
# DDL for the gold fact table that the incremental load appends to.
# IF NOT EXISTS makes the cell safe to re-run; the table name is assembled
# from the `catalog` and `schema_gold` widget values.
fact_table_ddl = f"""
CREATE TABLE IF NOT EXISTS `{catalog}`.`{schema_gold}`.fact_inventory_transactions (
    transaction_id BIGINT,
    transaction_number STRING,
    transaction_type STRING,
    status STRING,
    quantity_change INT,
    notes STRING,
    transaction_timestamp TIMESTAMP,
    product_id INT,
    product_name STRING,
    category STRING,
    unit_price DECIMAL(10,2),
    warehouse_id INT,
    warehouse_name STRING,
    warehouse_location STRING,
    transaction_date DATE,
    year INT,
    month INT,
    week INT,
    day_of_week INT,
    hour INT,
    inbound_quantity INT,
    outbound_quantity INT,
    adjustment_quantity INT,
    transaction_value DECIMAL(12,2),
    is_pending INT,
    is_delivered INT,
    is_urgent INT,
    processed_at TIMESTAMP
)
CLUSTER BY (transaction_timestamp)
"""

# Kept as the last expression so the cell still displays the returned DataFrame.
spark.sql(fact_table_ddl)

DataFrame[]

In [0]:
def get_last_transaction_timestamp():
    """Return the high-water mark used to select new source transactions.

    Reads ``MAX(transaction_timestamp)`` from the gold fact table named by the
    notebook globals ``catalog`` and ``schema_gold``.

    Returns:
        The maximum timestamp as returned by Spark when the table has rows.
        The string ``"2024-01-01 00:00:00"`` when the table exists but the
        maximum is NULL (no rows). The string ``"1970-01-01 00:00:00"`` when
        the lookup fails, so that a first run loads everything.
    """
    try:
        max_ts = spark.sql(f"""
            SELECT MAX(transaction_timestamp) as max_ts 
            FROM `{catalog}`.`{schema_gold}`.fact_inventory_transactions
        """).collect()[0]['max_ts']
    except Exception as exc:
        # `except Exception` (not a bare `except:`) lets KeyboardInterrupt and
        # SystemExit propagate; the reason is printed instead of being swallowed.
        print(f"  Could not read last processed timestamp ({exc!r}); treating as first run")
        return "1970-01-01 00:00:00"  # First run

    if max_ts is None:
        # Table exists but is empty.
        return "2024-01-01 00:00:00"
    return max_ts

In [None]:
def check_critical_stock(new_transactions_df):
    """Print an alert for product/warehouse pairs at or below their reorder level.

    The stock query runs only when the incoming batch holds at least one
    "sale" row whose status is confirmed, processing or pending. Stock is the
    sum of ``quantity_change`` over the gold fact table (statuses delivered,
    confirmed, processing) per product and warehouse, compared with
    ``reorder_level`` from the silver ``dim_products`` table.

    Args:
        new_transactions_df: Spark DataFrame for the batch just loaded; it is
            filtered on its ``transaction_type`` and ``status`` columns.

    Returns:
        None. The count is printed and the first 5 matching rows are shown.
    """
    # Predicates that mark a row as an outbound movement worth checking.
    is_sale = col("transaction_type") == "sale"
    has_open_status = col("status").isin("confirmed", "processing", "pending")
    sales_in_batch = new_transactions_df.filter(is_sale & has_open_status)

    if sales_in_batch.count() > 0:
        # The query aggregates the whole fact table, not only the products in this batch.
        low_stock = spark.sql(f"""
            WITH current_stock AS (
                SELECT 
                    product_id,
                    warehouse_id,
                    SUM(quantity_change) as stock_level
                FROM `{catalog}`.`{schema_gold}`.fact_inventory_transactions
                WHERE status IN ('delivered', 'confirmed', 'processing')
                GROUP BY product_id, warehouse_id
            )
            SELECT 
                cs.product_id,
                p.product_name,
                cs.warehouse_id,
                w.warehouse_name,
                cs.stock_level,
                p.reorder_level,
                CASE 
                    WHEN cs.stock_level <= 0 THEN 'OUT_OF_STOCK'
                    WHEN cs.stock_level <= p.reorder_level THEN 'CRITICAL'
                    WHEN cs.stock_level <= p.reorder_level * 1.5 THEN 'LOW'
                    ELSE 'OK'
                END as stock_status
            FROM current_stock cs
            JOIN `{catalog}`.`{schema_silver}`.dim_products p ON cs.product_id = p.product_id
            JOIN (SELECT DISTINCT warehouse_id, warehouse_name FROM `{catalog}`.`{schema_gold}`.fact_inventory_transactions) w 
                ON cs.warehouse_id = w.warehouse_id
            WHERE cs.stock_level <= p.reorder_level
        """)

        flagged = low_stock.count()
        if flagged > 0:
            print(f"\n  ⚠️ ALERT: {flagged} products at critical levels:")
            low_stock.show(5, truncate=False)

In [0]:
def _build_fact_rows(transactions_df):
    """Project raw source rows onto the fact-table columns (typed fields, time parts, measures, flags)."""
    # `abs`, `when`, `col`, ... are expected to be the pyspark.sql.functions
    # versions brought in by the notebook's star import, not Python builtins.
    return transactions_df.select(
        # Core transaction data with explicit casting
        col("transaction_id").cast("bigint"),
        col("transaction_number").cast("string"),
        col("transaction_type").cast("string"),
        col("status").cast("string"),
        col("quantity_change").cast("int"),
        col("notes").cast("string"),
        col("transaction_timestamp").cast("timestamp"),

        # Product dimensions; the source column `price` becomes `unit_price`
        col("product_id").cast("int"),
        col("product_name").cast("string"),
        col("category").cast("string"),
        col("price").cast("decimal(10,2)").alias("unit_price"),

        # Warehouse dimensions
        col("warehouse_id").cast("int"),
        col("warehouse_name").cast("string"),
        col("warehouse_location").cast("string"),

        # Time dimensions for easy aggregation
        date_format(col("transaction_timestamp"), "yyyy-MM-dd").cast("date").alias("transaction_date"),
        year(col("transaction_timestamp")).cast("int").alias("year"),
        month(col("transaction_timestamp")).cast("int").alias("month"),
        weekofyear(col("transaction_timestamp")).cast("int").alias("week"),
        dayofweek(col("transaction_timestamp")).cast("int").alias("day_of_week"),
        hour(col("transaction_timestamp")).cast("int").alias("hour"),

        # Per-type quantities: each row contributes to at most one of these
        when(col("transaction_type") == "inbound", col("quantity_change"))
            .otherwise(0).cast("int").alias("inbound_quantity"),
        when(col("transaction_type") == "sale", abs(col("quantity_change")))
            .otherwise(0).cast("int").alias("outbound_quantity"),
        when(col("transaction_type") == "adjustment", col("quantity_change"))
            .otherwise(0).cast("int").alias("adjustment_quantity"),
        (abs(col("quantity_change")).cast("decimal(10,2)") * col("price").cast("decimal(10,2)"))
            .cast("decimal(12,2)").alias("transaction_value"),

        # Flags for filtering (cast as int for consistency)
        when(col("status") == "pending", 1).otherwise(0).cast("int").alias("is_pending"),
        when(col("status") == "delivered", 1).otherwise(0).cast("int").alias("is_delivered"),
        when(col("notes").contains("URGENT"), 1).otherwise(0).cast("int").alias("is_urgent"),

        # ETL metadata
        current_timestamp().alias("processed_at")
    )


def process_new_transactions():
    """Load source transactions newer than the stored high-water mark into the fact table.

    Steps: read the last processed timestamp, query the source through
    ``conn.execute_query`` for rows with a strictly greater
    ``transaction_timestamp``, derive the fact columns, align them to the
    existing table schema, append, then run the critical-stock check.

    Returns:
        int: number of transactions appended (0 when there is nothing new).

    Notes:
        * The filter uses ``>``, so a row that arrives later with a timestamp
          equal to the current maximum would not be picked up.
          NOTE(review): confirm this is acceptable for the source system.
        * ``last_timestamp`` is interpolated into the SQL text; it comes from
          get_last_transaction_timestamp(), not from user input.
    """

    print(f"\n📊 Checking for new transactions at {datetime.now()}")

    # Get last processed timestamp
    last_timestamp = get_last_transaction_timestamp()
    print(f"  Last processed: {last_timestamp}")

    # Query only new transactions
    new_transactions_query = f"""
        SELECT 
            CAST(t.transaction_id AS BIGINT) as transaction_id,
            CAST(t.transaction_number AS VARCHAR(50)) as transaction_number,
            CAST(t.transaction_type AS VARCHAR(50)) as transaction_type,
            CAST(t.status AS VARCHAR(20)) as status,
            CAST(t.quantity_change AS INTEGER) as quantity_change,
            CAST(COALESCE(t.notes, '') AS TEXT) as notes,
            t.transaction_timestamp,
            CAST(t.product_id AS INTEGER) as product_id,
            CAST(p.name AS VARCHAR(200)) as product_name,
            CAST(p.category AS VARCHAR(50)) as category,
            CAST(p.price AS DECIMAL(10,2)) as price,
            CAST(t.warehouse_id AS INTEGER) as warehouse_id,
            CAST(w.name AS VARCHAR(100)) as warehouse_name,
            CAST(w.location AS VARCHAR(200)) as warehouse_location
        FROM inventory_transactions t
        JOIN products p ON t.product_id = p.product_id
        JOIN warehouses w ON t.warehouse_id = w.warehouse_id
        WHERE t.transaction_timestamp > '{last_timestamp}'
        ORDER BY t.transaction_timestamp
    """

    # The query result exposes `.empty`; it is kept under its own name rather
    # than being overwritten by the Spark DataFrame built from it.
    source_rows = conn.execute_query(new_transactions_query)
    if source_rows.empty:
        print("  ℹ️ No new transactions")
        return 0

    new_transactions = spark.createDataFrame(source_rows)
    transaction_count = new_transactions.count()
    if transaction_count <= 0:
        print("  ℹ️ No new transactions")
        return transaction_count

    print(f"  🆕 Found {transaction_count} new transactions")

    fact_table = f"`{catalog}`.`{schema_gold}`.fact_inventory_transactions"

    # Enrich and transform
    fact_transactions = _build_fact_rows(new_transactions)

    # Cast and reorder every column to the existing table schema before writing.
    expected_schema = spark.table(fact_table).schema
    fact_transactions_final = fact_transactions.select(
        [col(field.name).cast(field.dataType) for field in expected_schema.fields]
    )

    # Append the schema-aligned frame. Previously the un-aligned
    # `fact_transactions` was written and the aligned frame was never used.
    fact_transactions_final.write \
        .mode("append") \
        .saveAsTable(fact_table)

    print(f"✅ Added {transaction_count} new transactions to fact table")

    # Check for critical alerts
    check_critical_stock(new_transactions)

    return transaction_count


In [0]:
# Run one incremental load now; the returned row count is the cell's result.
process_new_transactions()


📊 Checking for new transactions at 2025-09-16 15:58:19.958686
  Last processed: 2024-01-01 00:00:00
  🆕 Found 98 new transactions
✅ Added 98 new transactions to fact table

  ⚠️ ALERT: 4 products at critical levels:
+----------+--------------------+------------+---------------------------+-----------+-------------+------------+
|product_id|product_name        |warehouse_id|warehouse_name             |stock_level|reorder_level|stock_status|
+----------+--------------------+------------+---------------------------+-----------+-------------+------------+
|38        |Kickstand Heavy Duty|2           |Hamburg Distribution Center|-2         |60           |OUT_OF_STOCK|
|29        |Chain 11-Speed      |1           |Lyon Main Warehouse        |5          |50           |CRITICAL    |
|17        |Tire 29x2.4 MTB     |1           |Lyon Main Warehouse        |8          |60           |CRITICAL    |
|21        |Brake Rotor 180mm   |1           |Lyon Main Warehouse        |-3         |100          

98