# 🔄 Snowflake Dynamic Tables - Data Engineering Demo

This notebook demonstrates end-to-end data engineering with:
- **Bronze Layer**: Raw data ingestion
- **Silver Layer**: Cleaned & transformed data (Dynamic Tables)
- **Gold Layer**: Business aggregations (Dynamic Tables)

## Prerequisites
Run the SQL setup script first: `sql/01_setup_dynamic_tables.sql`

## To Import into Snowflake Notebooks:
1. Go to Snowflake → Notebooks
2. Click "Import" or drag this .ipynb file
3. Select your warehouse and run!


## 1️⃣ Setup & Configuration


In [None]:
# Imports and Snowflake session
import json
import random
import uuid
from datetime import datetime, timedelta

# get_active_session() returns the Snowpark session provided by Snowflake Notebooks
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Configuration. Later cells address tables as BRONZE.*, SILVER.* and GOLD.*
# without a database prefix, so they rely on the USE DATABASE issued here.
DATABASE = "DATA_ENGINEERING_DEMO"
WAREHOUSE = "WH_DATA_ENG"
for object_kind, object_name in (("DATABASE", DATABASE), ("WAREHOUSE", WAREHOUSE)):
    session.sql(f"USE {object_kind} {object_name}").collect()
print(f"‚úÖ Connected to {DATABASE}")


## 2️⃣ Data Generators


In [None]:
# Sensor Data Generator
class SensorDataGenerator:
    """Generate synthetic IoT sensor readings.

    Each row is a dict keyed by the column names that
    ``insert_sensor_readings`` writes to BRONZE.RAW_SENSOR_READINGS.
    """

    SENSOR_TYPES = ["TEMPERATURE", "PRESSURE", "HUMIDITY", "VIBRATION"]
    # DEVICE_001 .. DEVICE_050
    DEVICES = [f"DEVICE_{str(i).zfill(3)}" for i in range(1, 51)]
    # sensor type -> (normal minimum, normal maximum, unit)
    VALUE_RANGES = {
        "TEMPERATURE": (15, 85, "CELSIUS"),
        "PRESSURE": (100, 500, "PSI"),
        "HUMIDITY": (20, 80, "PERCENT"),
        "VIBRATION": (0, 100, "MM/S"),
    }
    # Probability that a reading is drawn from the anomaly band
    # [1.5 * max, 2 * max] instead of the normal [min, max] range.
    ANOMALY_RATE = 0.05

    @classmethod
    def generate_batch(cls, size: int) -> list:
        """Generate a batch of sensor readings.

        With probability ``ANOMALY_RATE`` a value is drawn uniformly from
        ``[1.5 * max, 2 * max]`` of its sensor's range, otherwise from
        ``[min, max]``; values are rounded to 2 decimals. Timestamps fall
        within the hour (0-3600 s) before the batch's reference time.

        Args:
            size: Number of readings to generate; a non-positive size
                yields an empty list.

        Returns:
            A list of row dicts. All rows in one batch share the same
            SOURCE_FILE, because a batch models a single ingested file.
        """
        # Read the clock once: every row shares one reference time, and a
        # batch generated across midnight cannot be split over two file names.
        now = datetime.now()
        source_file = f"iot_stream_{now.strftime('%Y%m%d')}.json"

        data = []
        for _ in range(size):
            sensor_type = random.choice(cls.SENSOR_TYPES)
            device_id = random.choice(cls.DEVICES)
            min_val, max_val, unit = cls.VALUE_RANGES[sensor_type]

            if random.random() < cls.ANOMALY_RATE:
                value = random.uniform(max_val * 1.5, max_val * 2)
            else:
                value = random.uniform(min_val, max_val)

            timestamp = now - timedelta(seconds=random.randint(0, 3600))

            data.append({
                "DEVICE_ID": device_id,
                "SENSOR_TYPE": sensor_type,
                "READING_VALUE": round(value, 2),
                "READING_UNIT": unit,
                "READING_TIMESTAMP": timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                "RAW_PAYLOAD": json.dumps({"raw": True, "version": "1.0", "device": device_id}),
                "SOURCE_FILE": source_file,
            })
        return data

print("‚úÖ SensorDataGenerator ready")


In [None]:
# Transaction Data Generator
class TransactionDataGenerator:
    """Synthetic point-of-sale transactions for BRONZE.RAW_TRANSACTIONS."""

    # SALE is listed four times so random.choice picks it four times as
    # often as RETURN or EXCHANGE.
    TRANSACTION_TYPES = ["SALE"] * 4 + ["RETURN", "EXCHANGE"]
    # SKU_0001 .. SKU_0200
    PRODUCTS = [f"SKU_{n:04d}" for n in range(1, 201)]

    @classmethod
    def generate_batch(cls, size: int) -> list:
        """Return ``size`` transaction row dicts (empty for ``size <= 0``).

        Quantities are 1-10, unit prices 10-500 (2 decimals), customers
        10001-10500, and timestamps 0-72 whole hours in the past.
        """
        return [cls._make_row() for _ in range(size)]

    @classmethod
    def _make_row(cls) -> dict:
        """Build one transaction row; random draws happen in a fixed order."""
        kind = random.choice(cls.TRANSACTION_TYPES)
        qty = random.randint(1, 10)
        price = round(random.uniform(10, 500), 2)
        when = datetime.now() - timedelta(hours=random.randint(0, 72))
        txn_id = "TXN_" + uuid.uuid4().hex[:16].upper()
        customer = random.randint(10001, 10500)
        sku = random.choice(cls.PRODUCTS)
        store = random.randint(1, 50)
        terminal = random.randint(1, 10)
        raw = json.dumps({"source": "POS", "store_id": store, "terminal": terminal})
        return {
            "TRANSACTION_ID": txn_id,
            "CUSTOMER_ID": customer,
            "PRODUCT_SKU": sku,
            "QUANTITY": qty,
            "UNIT_PRICE": price,
            "TRANSACTION_TYPE": kind,
            "TRANSACTION_TIME": when.strftime('%Y-%m-%d %H:%M:%S'),
            "RAW_DATA": raw,
        }

print("‚úÖ TransactionDataGenerator ready")


In [None]:
# Customer Event Generator
class CustomerEventGenerator:
    """Generate synthetic customer clickstream events grouped into sessions.

    Each row is a dict keyed by the column names that
    ``insert_customer_events`` writes to BRONZE.RAW_CUSTOMER_EVENTS.
    """

    EVENT_TYPES = ["PAGE_VIEW", "CLICK", "ADD_TO_CART", "REMOVE_FROM_CART", "PURCHASE", "SEARCH"]
    DEVICES = ["mobile", "desktop", "tablet"]
    # /page/1 .. /page/100
    PAGES = [f"/page/{i}" for i in range(1, 101)]

    @classmethod
    def generate_batch(cls, num_sessions: int) -> list:
        """Generate events for ``num_sessions`` sessions.

        Each session has 3-20 events for one customer and one device type and
        starts 0-168 whole hours before the call. The first event is always a
        PAGE_VIEW. Each later event comes 5-60 seconds after the previous one,
        so events are chronological within a session.

        Args:
            num_sessions: Number of sessions; a non-positive value yields ``[]``.

        Returns:
            A flat list of event dicts, in session order and, within a session,
            in event order.
        """
        data = []
        for _ in range(num_sessions):
            session_id = f"SESS_{uuid.uuid4().hex[:24]}"
            customer_id = random.randint(10001, 10500)
            device = random.choice(cls.DEVICES)
            session_size = random.randint(3, 20)
            base_time = datetime.now() - timedelta(hours=random.randint(0, 168))

            event_time = base_time
            for i in range(session_size):
                if i > 0:
                    # Accumulate gaps: multiplying a fresh random gap by i (as
                    # i * randint(...)) lets a later event land before an
                    # earlier one.
                    event_time += timedelta(seconds=random.randint(5, 60))
                event_type = "PAGE_VIEW" if i == 0 else random.choice(cls.EVENT_TYPES)

                data.append({
                    "EVENT_ID": f"EVT_{uuid.uuid4().hex[:24].upper()}",
                    "SESSION_ID": session_id,
                    "CUSTOMER_ID": customer_id,
                    "EVENT_TYPE": event_type,
                    "EVENT_PROPERTIES": json.dumps({"device": device, "event_index": i}),
                    "PAGE_URL": random.choice(cls.PAGES),
                    "USER_AGENT": f"Mozilla/5.0 ({device})",
                    "IP_ADDRESS": f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
                    "EVENT_TIMESTAMP": event_time.strftime('%Y-%m-%d %H:%M:%S')
                })
        return data

print("‚úÖ CustomerEventGenerator ready")


## 3️⃣ Data Ingestion Functions


In [None]:
# Helper function to insert sensor data using SQL
def insert_sensor_readings(data: list, batch_size: int = 500) -> int:
    """Insert sensor readings into BRONZE.RAW_SENSOR_READINGS.

    Uses ``INSERT ... SELECT ... FROM VALUES`` so RAW_PAYLOAD can be passed
    through ``PARSE_JSON``. Rows are grouped into multi-row VALUES lists of at
    most ``batch_size`` rows, so each chunk costs one query rather than one
    query per row. Keep ``batch_size`` well below Snowflake's VALUES row limit.

    Args:
        data: Row dicts with the keys produced by
            ``SensorDataGenerator.generate_batch``.
        batch_size: Maximum number of rows per INSERT statement.

    Returns:
        The number of rows submitted, i.e. ``len(data)``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    def quote(value) -> str:
        # Snowflake single-quoted literals treat backslash as an escape
        # character, so double it as well as the quote. Every string field is
        # escaped, not just the JSON payload, so no field can break the SQL.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    def to_values(row: dict) -> str:
        # One parenthesised VALUES tuple, in the INSERT column order below.
        fields = [
            quote(row["DEVICE_ID"]),
            quote(row["SENSOR_TYPE"]),
            repr(float(row["READING_VALUE"])),  # numeric: never interpolate raw text
            quote(row["READING_UNIT"]),
            quote(row["READING_TIMESTAMP"]) + "::TIMESTAMP_NTZ",
            quote(row["RAW_PAYLOAD"]),
            quote(row["SOURCE_FILE"]),
        ]
        return "(" + ", ".join(fields) + ")"

    for start in range(0, len(data), batch_size):
        values = ",\n            ".join(
            to_values(row) for row in data[start:start + batch_size]
        )
        sql = f"""
        INSERT INTO BRONZE.RAW_SENSOR_READINGS
            (DEVICE_ID, SENSOR_TYPE, READING_VALUE, READING_UNIT, READING_TIMESTAMP, RAW_PAYLOAD, SOURCE_FILE)
        SELECT $1, $2, $3, $4, $5, PARSE_JSON($6), $7
        FROM VALUES
            {values}
        """
        session.sql(sql).collect()
    return len(data)

print("‚úÖ insert_sensor_readings ready")


In [None]:
# Helper function to insert transactions using SQL
def insert_transactions(data: list, batch_size: int = 500) -> int:
    """Insert transactions into BRONZE.RAW_TRANSACTIONS.

    Uses ``INSERT ... SELECT ... FROM VALUES`` so RAW_DATA can be passed
    through ``PARSE_JSON``. Rows are grouped into multi-row VALUES lists of at
    most ``batch_size`` rows, so each chunk costs one query rather than one
    query per row. Keep ``batch_size`` well below Snowflake's VALUES row limit.

    Args:
        data: Row dicts with the keys produced by
            ``TransactionDataGenerator.generate_batch``.
        batch_size: Maximum number of rows per INSERT statement.

    Returns:
        The number of rows submitted, i.e. ``len(data)``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    def quote(value) -> str:
        # Snowflake single-quoted literals treat backslash as an escape
        # character, so double it as well as the quote. Every string field is
        # escaped, not just the JSON payload, so no field can break the SQL.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    def to_values(row: dict) -> str:
        # One parenthesised VALUES tuple, in the INSERT column order below.
        fields = [
            quote(row["TRANSACTION_ID"]),
            str(int(row["CUSTOMER_ID"])),
            quote(row["PRODUCT_SKU"]),
            str(int(row["QUANTITY"])),
            repr(float(row["UNIT_PRICE"])),
            quote(row["TRANSACTION_TYPE"]),
            quote(row["TRANSACTION_TIME"]) + "::TIMESTAMP_NTZ",
            quote(row["RAW_DATA"]),
        ]
        return "(" + ", ".join(fields) + ")"

    for start in range(0, len(data), batch_size):
        values = ",\n            ".join(
            to_values(row) for row in data[start:start + batch_size]
        )
        sql = f"""
        INSERT INTO BRONZE.RAW_TRANSACTIONS
            (TRANSACTION_ID, CUSTOMER_ID, PRODUCT_SKU, QUANTITY, UNIT_PRICE, TRANSACTION_TYPE, TRANSACTION_TIME, RAW_DATA)
        SELECT $1, $2, $3, $4, $5, $6, $7, PARSE_JSON($8)
        FROM VALUES
            {values}
        """
        session.sql(sql).collect()
    return len(data)

print("‚úÖ insert_transactions ready")


In [None]:
# Helper function to insert customer events using SQL
def insert_customer_events(data: list, batch_size: int = 500) -> int:
    """Insert customer events into BRONZE.RAW_CUSTOMER_EVENTS.

    Uses ``INSERT ... SELECT ... FROM VALUES`` so EVENT_PROPERTIES can be
    passed through ``PARSE_JSON``. Rows are grouped into multi-row VALUES
    lists of at most ``batch_size`` rows, so each chunk costs one query
    rather than one query per row. Keep ``batch_size`` well below
    Snowflake's VALUES row limit.

    Args:
        data: Row dicts with the keys produced by
            ``CustomerEventGenerator.generate_batch``.
        batch_size: Maximum number of rows per INSERT statement.

    Returns:
        The number of rows submitted, i.e. ``len(data)``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    def quote(value) -> str:
        # Snowflake single-quoted literals treat backslash as an escape
        # character, so double it as well as the quote. Every string field is
        # escaped (URLs and user agents included), not just the JSON payload.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    def to_values(row: dict) -> str:
        # One parenthesised VALUES tuple, in the INSERT column order below.
        fields = [
            quote(row["EVENT_ID"]),
            quote(row["SESSION_ID"]),
            str(int(row["CUSTOMER_ID"])),
            quote(row["EVENT_TYPE"]),
            quote(row["EVENT_PROPERTIES"]),
            quote(row["PAGE_URL"]),
            quote(row["USER_AGENT"]),
            quote(row["IP_ADDRESS"]),
            quote(row["EVENT_TIMESTAMP"]) + "::TIMESTAMP_NTZ",
        ]
        return "(" + ", ".join(fields) + ")"

    for start in range(0, len(data), batch_size):
        values = ",\n            ".join(
            to_values(row) for row in data[start:start + batch_size]
        )
        sql = f"""
        INSERT INTO BRONZE.RAW_CUSTOMER_EVENTS
            (EVENT_ID, SESSION_ID, CUSTOMER_ID, EVENT_TYPE, EVENT_PROPERTIES, PAGE_URL, USER_AGENT, IP_ADDRESS, EVENT_TIMESTAMP)
        SELECT $1, $2, $3, $4, PARSE_JSON($5), $6, $7, $8, $9
        FROM VALUES
            {values}
        """
        session.sql(sql).collect()
    return len(data)

print("‚úÖ insert_customer_events ready")


## 4️⃣ Ingest Data into Bronze Layer


In [None]:
# Ingest Sensor Data: generate one 100-reading batch and load it into Bronze
print("üîÑ Generating sensor readings...")
sensor_data = SensorDataGenerator.generate_batch(100)
count = insert_sensor_readings(sensor_data)
sensor_target = "BRONZE.RAW_SENSOR_READINGS"
print(f"‚úÖ Inserted {count} sensor readings into {sensor_target}")


In [None]:
# Ingest Transactions: generate one 50-row batch and load it into Bronze
print("üîÑ Generating transactions...")
txn_data = TransactionDataGenerator.generate_batch(50)
count = insert_transactions(txn_data)
txn_target = "BRONZE.RAW_TRANSACTIONS"
print(f"‚úÖ Inserted {count} transactions into {txn_target}")


In [None]:
# Ingest Customer Events
# One variable drives both the generator and the summary message, so the
# reported session count cannot drift from the number actually generated.
NUM_SESSIONS = 20
print("üîÑ Generating customer events...")
event_data = CustomerEventGenerator.generate_batch(NUM_SESSIONS)
count = insert_customer_events(event_data)
print(f"‚úÖ Inserted {count} customer events from {NUM_SESSIONS} sessions into BRONZE.RAW_CUSTOMER_EVENTS")


In [None]:
# Check Dynamic Tables status
print("üîÑ Dynamic Tables in DATABASE:")
print("=" * 60)

# Reuse the DATABASE constant from the setup cell instead of repeating the
# name, so changing the configuration also retargets this check.
dt_result = session.sql(f"SHOW DYNAMIC TABLES IN DATABASE {DATABASE}").collect()
if not dt_result:
    # Without this, an empty result would print only the header.
    print("   (no Dynamic Tables found - has sql/01_setup_dynamic_tables.sql been run?)")
for row in dt_result:
    print(f"   {row['schema_name']}.{row['name']}")
    print(f"      Target Lag: {row['target_lag']} | State: {row['scheduling_state']}")


In [None]:
# Silver layer spot check (the Dynamic Tables should have auto-refreshed)
print("ü•à Silver Layer Sample Data - Recent Anomalies:")
print("=" * 60)

# Five most recent rows flagged IS_ANOMALY in SILVER.SENSOR_READINGS_CLEANED;
# the DataFrame is the cell's last expression so the notebook renders it.
silver_anomalies_sql = """
    SELECT DEVICE_ID, SENSOR_TYPE, READING_VALUE, IS_ANOMALY, 
           READING_TIMESTAMP::STRING AS READING_TIMESTAMP
    FROM SILVER.SENSOR_READINGS_CLEANED 
    WHERE IS_ANOMALY = TRUE
    ORDER BY READING_TIMESTAMP DESC 
    LIMIT 5
"""
session.sql(silver_anomalies_sql).to_pandas()


In [None]:
# Gold layer check: latest five days from GOLD.DAILY_SALES_SUMMARY
print("ü•á Gold Layer - Daily Sales Summary:")
print("=" * 60)

# The DataFrame is the cell's last expression so the notebook renders it.
daily_sales_sql = """
    SELECT TRANSACTION_DATE::STRING AS TRANSACTION_DATE, DAY_NAME, 
           TOTAL_TRANSACTIONS, ROUND(NET_SALES, 2) AS NET_SALES
    FROM GOLD.DAILY_SALES_SUMMARY 
    ORDER BY TRANSACTION_DATE DESC 
    LIMIT 5
"""
session.sql(daily_sales_sql).to_pandas()


In [None]:
# Pipeline freshness: latest data timestamp and row count per table, as
# reported by the GOLD.V_PIPELINE_FRESHNESS view
print("‚è±Ô∏è Pipeline Freshness:")
print("=" * 60)
freshness_sql = """
    SELECT TABLE_NAME, LATEST_DATA::STRING AS LATEST_DATA, ROW_COUNT 
    FROM GOLD.V_PIPELINE_FRESHNESS
"""
session.sql(freshness_sql).to_pandas()


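## 6️⃣ Continuous Ingestion Demo

Run the next cell several times to simulate continuous data ingestion.
The Dynamic Tables refresh automatically: each table's TARGET_LAG sets how far behind its source data it may fall, so new Bronze rows appear in the Silver and Gold layers within that lag.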


In [None]:
# Quick ingestion batch (run multiple times to simulate streaming)
# Small batch sizes for a quick demo. The messages below report the counts
# the insert helpers return, rather than hard-coded numbers that can drift
# from these sizes.
SENSOR_BATCH_SIZE = 25
TXN_BATCH_SIZE = 10
EVENT_SESSIONS = 5

print("üöÄ Quick Ingestion Batch")
print("=" * 40)

sensor_data = SensorDataGenerator.generate_batch(SENSOR_BATCH_SIZE)
print(f"‚úÖ +{insert_sensor_readings(sensor_data)} sensor readings")

txn_data = TransactionDataGenerator.generate_batch(TXN_BATCH_SIZE)
print(f"‚úÖ +{insert_transactions(txn_data)} transactions")

event_data = CustomerEventGenerator.generate_batch(EVENT_SESSIONS)
print(f"‚úÖ +{insert_customer_events(event_data)} customer events")

print("\nüìä Dynamic Tables will auto-refresh within their TARGET_LAG!")


In [None]:
# Bronze layer row counts: one COUNT(*) query per raw table
print("üì¶ Bronze Layer Row Counts:")
print("=" * 50)

tables = [
    (table_name, "BRONZE")
    for table_name in ("RAW_SENSOR_READINGS", "RAW_TRANSACTIONS", "RAW_CUSTOMER_EVENTS")
]

for table, schema in tables:
    qualified = f"{schema}.{table}"
    count = session.sql(f"SELECT COUNT(*) as CNT FROM {qualified}").collect()[0]['CNT']
    print(f"   {qualified}: {count:,} rows")
