# User Engagement Analytics Pipeline
**Dynamic Tables Demo**

---

## What are Dynamic Tables?

Dynamic Tables let you define **WHAT** you want (a SELECT statement), and Snowflake handles **HOW** (scheduling, refreshes, dependencies).

**Benefits:**
- No tasks to schedule
- No streams to manage  
- No retry or failure-recovery code to write (refresh errors surface in refresh history)
- Automatic dependency tracking
- Visual DAG in Snowsight

---

**Business Context:**  
For content-focused organizations, tracking how users engage with educational resources—pageviews, session depth, meaningful interactions—is critical for measuring impact.

**Demo Flow (~15 min):**
1. Setup database/schema
2. Create base tables with synthetic data
3. Create a pipeline of 5 Dynamic Tables
4. View DAG in Snowsight
5. Stream data and watch cascade refresh

## Setup: Database, Schema & Warehouse

In [None]:
-- Toggle this cell to SQL
CREATE DATABASE IF NOT EXISTS UNDERSTOOD_DEMO;
CREATE SCHEMA IF NOT EXISTS UNDERSTOOD_DEMO.DYNAMIC_TABLES;
CREATE WAREHOUSE IF NOT EXISTS UNDERSTOOD_DEMO_WH WAREHOUSE_SIZE='XSMALL' AUTO_SUSPEND=60 AUTO_RESUME=TRUE;

USE DATABASE UNDERSTOOD_DEMO;
USE SCHEMA DYNAMIC_TABLES;
USE WAREHOUSE UNDERSTOOD_DEMO_WH;

---
## Section 1: Create Base Tables

Generate synthetic data using Snowflake's `GENERATOR()` and `RANDOM()` functions:
- **MEMBERS** - 1,000 user profiles
- **RESOURCES** - 100 educational content items  
- **RAW_INTERACTIONS** - 10,000 engagement events

In [None]:
-- Toggle to SQL: MEMBERS table (1,000 user profiles)
CREATE OR REPLACE TABLE MEMBERS AS
SELECT 
    10000 + SEQ4() as member_id,
    'user' || SEQ4() || '@example.com' as email,
    ARRAY_CONSTRUCT('Alex','Jordan','Taylor','Morgan','Casey','Riley','Quinn','Avery','Peyton','Drew')[ABS(RANDOM()) % 10]::VARCHAR 
        || ' ' || 
        ARRAY_CONSTRUCT('Smith','Johnson','Williams','Brown','Jones','Garcia','Miller','Davis','Martinez','Wilson')[ABS(RANDOM()) % 10]::VARCHAR as name,
    ARRAY_CONSTRUCT('free','free','free','registered','registered','premium')[ABS(RANDOM()) % 6]::VARCHAR as member_type,
    DATEADD('day', -ABS(RANDOM()) % 730, CURRENT_DATE()) as signup_date,
    ARRAY_CONSTRUCT('Northeast','Southeast','Midwest','Southwest','West')[ABS(RANDOM()) % 5]::VARCHAR as region
FROM TABLE(GENERATOR(ROWCOUNT => 1000));
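
The repeated entries in these arrays act as weights: `'free'` fills 3 of the 6 slots in the `member_type` array, so about half of the members are free. A small Python check of the resulting odds, assuming `ABS(RANDOM()) % 6` is effectively uniform over the six slots (the modulo bias on a 64-bit random value is negligible); `pick_probabilities` is a hypothetical helper, not part of the notebook:

```python
from collections import Counter
from fractions import Fraction

# Same value list the MEMBERS cell indexes with ABS(RANDOM()) % 6
MEMBER_TYPES = ["free", "free", "free", "registered", "registered", "premium"]


def pick_probabilities(values):
    """Chance of each distinct value when every array slot is equally likely."""
    counts = Counter(values)
    return {value: Fraction(n, len(values)) for value, n in counts.items()}


probs = pick_probabilities(MEMBER_TYPES)
for member_type, p in probs.items():
    print(f"{member_type:<10} {p} ({float(p):.1%})")
```

The same trick weights `event_type` toward `'page_view'` and `device_type` toward `'desktop'` in RAW_INTERACTIONS.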

In [None]:
-- Toggle to SQL: RESOURCES table (100 educational content items)
CREATE OR REPLACE TABLE RESOURCES AS
SELECT 
    100 + SEQ4() as resource_id,
    'Guide to ' || ARRAY_CONSTRUCT('Understanding','Supporting','Managing','Improving','Building')[ABS(RANDOM()) % 5]::VARCHAR 
        || ' ' || 
        ARRAY_CONSTRUCT('Focus Skills','Reading Strategies','Learning Differences','Executive Function','Attention')[ABS(RANDOM()) % 5]::VARCHAR as title,
    ARRAY_CONSTRUCT('article','video','tool','webinar','guide')[ABS(RANDOM()) % 5]::VARCHAR as content_type,
    ARRAY_CONSTRUCT('focus','reading','learning','parenting','school')[ABS(RANDOM()) % 5]::VARCHAR as topic,
    ARRAY_CONSTRUCT('beginner','intermediate','advanced')[ABS(RANDOM()) % 3]::VARCHAR as difficulty_level,
    DATEADD('day', -ABS(RANDOM()) % 365, CURRENT_DATE()) as publish_date,
    ARRAY_CONSTRUCT(3,5,7,10,15,20,30,45,60)[ABS(RANDOM()) % 9]::NUMBER as estimated_minutes
FROM TABLE(GENERATOR(ROWCOUNT => 100));

In [None]:
-- Toggle to SQL: RAW_INTERACTIONS table (10,000 engagement events)
CREATE OR REPLACE TABLE RAW_INTERACTIONS AS
SELECT 
    10000 + ABS(RANDOM()) % 1000 as member_id,
    OBJECT_CONSTRUCT(
        'resource_id', 100 + ABS(RANDOM()) % 100,
        'event_type', ARRAY_CONSTRUCT('page_view','page_view','page_view','video_start','video_complete','download','share','bookmark')[ABS(RANDOM()) % 8]::VARCHAR,
        'session_id', 'sess_' || ABS(RANDOM()) % 2000,
        'timestamp', DATEADD('second', -ABS(RANDOM()) % 2592000, CURRENT_TIMESTAMP())::VARCHAR,
        'engagement_seconds', 10 + ABS(RANDOM()) % 590,
        'device_type', ARRAY_CONSTRUCT('desktop','desktop','mobile','tablet')[ABS(RANDOM()) % 4]::VARCHAR
    ) as event_data
FROM TABLE(GENERATOR(ROWCOUNT => 10000));
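
The ranges in this cell line up with the other two tables: `member_id` falls in 10000..10999 (the MEMBERS ids), `resource_id` in 100..199 (the RESOURCES ids), and `timestamp` within the last 2,592,000 seconds (30 days). The sketch below mirrors one generated row in Python, with `random.Random` standing in for `RANDOM()`; `make_event` is a hypothetical helper for illustration only.

```python
import random
from datetime import datetime, timedelta, timezone

SECONDS_IN_30_DAYS = 30 * 24 * 60 * 60  # 2,592,000, the modulus used for 'timestamp'
EVENT_TYPES = ["page_view"] * 3 + ["video_start", "video_complete", "download", "share", "bookmark"]
DEVICE_TYPES = ["desktop", "desktop", "mobile", "tablet"]


def make_event(rng: random.Random, now: datetime) -> dict:
    """Hypothetical Python mirror of one generated RAW_INTERACTIONS row."""
    return {
        "member_id": 10000 + rng.randrange(1000),  # 10000..10999, the MEMBERS id range
        "event_data": {
            "resource_id": 100 + rng.randrange(100),  # 100..199, the RESOURCES id range
            "event_type": rng.choice(EVENT_TYPES),
            # Drawn independently of member_id, so many members share each 'sess_N'
            "session_id": f"sess_{rng.randrange(2000)}",
            "timestamp": (now - timedelta(seconds=rng.randrange(SECONDS_IN_30_DAYS))).isoformat(),
            "engagement_seconds": 10 + rng.randrange(590),  # 10..599
            "device_type": rng.choice(DEVICE_TYPES),
        },
    }


rng = random.Random(42)
now = datetime.now(timezone.utc)
events = [make_event(rng, now) for _ in range(10_000)]
print(events[0])
```

Because `session_id` is drawn independently of `member_id`, the same `sess_N` value shows up for many members, which is why MEMBER_SESSIONS groups by both columns.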

In [None]:
-- Toggle to SQL: Verify row counts
SELECT 'MEMBERS' as table_name, COUNT(*) as row_count FROM MEMBERS
UNION ALL SELECT 'RESOURCES', COUNT(*) FROM RESOURCES
UNION ALL SELECT 'RAW_INTERACTIONS', COUNT(*) FROM RAW_INTERACTIONS;

In [None]:
-- Toggle to SQL: Preview MEMBERS
SELECT * FROM MEMBERS LIMIT 5;

In [None]:
-- Toggle to SQL: Preview RESOURCES
SELECT * FROM RESOURCES LIMIT 5;

In [None]:
-- Toggle to SQL: Preview RAW_INTERACTIONS (note the semi-structured event_data OBJECT column)
SELECT * FROM RAW_INTERACTIONS LIMIT 5;

---
## Section 2: Create a Pipeline of 5 Dynamic Tables

```
 RAW_INTERACTIONS ──────────► MEMBER_SESSIONS (DT1) DOWNSTREAM
      (base)                  (no downstream consumer)
        │
        │          MEMBERS         RESOURCES
        │          (base)           (base)
        │             │                │
        │             │                ▼
        │             │        RESOURCE_CATALOG
        │             │        (DT2) DOWNSTREAM
        │             │                │
        └─────────────┼────────────────┘
                      ▼
          SESSION_ENGAGEMENT_DETAIL
              (DT3) DOWNSTREAM
                      │
                      ▼
          MEMBER_ENGAGEMENT_SUMMARY
              (DT4) DOWNSTREAM
                      │
                      ▼
            ENGAGEMENT_DASHBOARD
         (DT5) TARGET_LAG = 1 MINUTE
```

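**Key `TARGET_LAG` settings:**
- `TARGET_LAG = DOWNSTREAM` → Refresh only when a dynamic table that reads from this one needs fresh data (cost saver!)
- `TARGET_LAG = '1 minute'` → Keep the data no more than about 1 minute behind its sources; Snowflake treats this as a target, not a guarantee (used only on the final report)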
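
The cascade these settings produce can be modeled in a few lines of Python. This is a toy model, not Snowflake's scheduler: it assumes that when a DT with a time-based target lag is due, every DOWNSTREAM-lagged DT it reads from refreshes first, in dependency order. The edges in `DEPENDS_ON` are copied by hand from the FROM/JOIN clauses of the five DT definitions in this section; `upstream_dts` and `cascade_for` are hypothetical helpers.

```python
from graphlib import TopologicalSorter

# table -> the tables its SELECT reads from (hand-copied from the DT definitions)
DEPENDS_ON = {
    "MEMBER_SESSIONS": {"RAW_INTERACTIONS"},
    "RESOURCE_CATALOG": {"RESOURCES"},
    "SESSION_ENGAGEMENT_DETAIL": {"RAW_INTERACTIONS", "MEMBERS", "RESOURCE_CATALOG"},
    "MEMBER_ENGAGEMENT_SUMMARY": {"SESSION_ENGAGEMENT_DETAIL"},
    "ENGAGEMENT_DASHBOARD": {"MEMBER_ENGAGEMENT_SUMMARY"},
}
DYNAMIC_TABLES = set(DEPENDS_ON)


def upstream_dts(dt: str) -> set[str]:
    """All dynamic tables that `dt` reads from, directly or transitively."""
    found: set[str] = set()
    stack = [dt]
    while stack:
        for parent in DEPENDS_ON.get(stack.pop(), set()):
            if parent in DYNAMIC_TABLES and parent not in found:
                found.add(parent)
                stack.append(parent)
    return found


def cascade_for(scheduled_dt: str) -> list[str]:
    """Refresh order when a time-lagged DT is due: its DT ancestors first."""
    needed = upstream_dts(scheduled_dt) | {scheduled_dt}
    graph = {dt: DEPENDS_ON[dt] & needed for dt in needed}
    return list(TopologicalSorter(graph).static_order())


order = cascade_for("ENGAGEMENT_DASHBOARD")
print(order)
print("Never pulled in by the dashboard:", DYNAMIC_TABLES - set(order))
```

In this model MEMBER_SESSIONS never appears in the dashboard's cascade, because no DT reads from it.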

### DT1: MEMBER_SESSIONS (Sessionize raw events)
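Groups raw interaction events into sessions per member. No other DT in this pipeline reads MEMBER_SESSIONS, so with a DOWNSTREAM target lag it refreshes when it is created and when you refresh it manually (`ALTER DYNAMIC TABLE MEMBER_SESSIONS REFRESH;`), not on a schedule.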

In [None]:
-- DT1: MEMBER_SESSIONS (incremental-compatible version)
CREATE OR REPLACE DYNAMIC TABLE MEMBER_SESSIONS
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = UNDERSTOOD_DEMO_WH
    REFRESH_MODE = INCREMENTAL
AS
SELECT 
    event_data:session_id::VARCHAR as session_id,
    member_id,
    MIN(event_data:timestamp::TIMESTAMP) as session_start,
    MAX(event_data:timestamp::TIMESTAMP) as session_end,
    COUNT(*) as event_count,
    SUM(event_data:engagement_seconds::NUMBER) as total_engagement_seconds,
    -- MAX on a string returns the alphabetically last device seen in the session
    MAX(event_data:device_type::VARCHAR) as device_type
FROM RAW_INTERACTIONS
GROUP BY event_data:session_id::VARCHAR, member_id;
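
The GROUP BY above can be sketched in plain Python to make the grouping key explicit: a session is a `(session_id, member_id)` pair, not a `session_id` alone. `sessionize` is a hypothetical helper and the three events are illustrative input, not data from the demo tables.

```python
from collections import defaultdict
from datetime import datetime


def sessionize(events):
    """Group events by (session_id, member_id), like MEMBER_SESSIONS' GROUP BY."""
    groups = defaultdict(list)
    for e in events:
        groups[(e["session_id"], e["member_id"])].append(e)

    rows = []
    for (session_id, member_id), evs in groups.items():
        times = [datetime.fromisoformat(e["timestamp"]) for e in evs]
        rows.append({
            "session_id": session_id,
            "member_id": member_id,
            "session_start": min(times),
            "session_end": max(times),
            "event_count": len(evs),
            "total_engagement_seconds": sum(e["engagement_seconds"] for e in evs),
            # MAX over strings: the alphabetically last device, as in the SQL
            "device_type": max(e["device_type"] for e in evs),
        })
    return rows


# Illustrative input: two members happen to share the id 'sess_7'
sample_events = [
    {"session_id": "sess_7", "member_id": 10001, "timestamp": "2024-05-01T10:00:00",
     "engagement_seconds": 30, "device_type": "desktop"},
    {"session_id": "sess_7", "member_id": 10001, "timestamp": "2024-05-01T10:05:00",
     "engagement_seconds": 45, "device_type": "mobile"},
    {"session_id": "sess_7", "member_id": 10002, "timestamp": "2024-05-01T11:00:00",
     "engagement_seconds": 20, "device_type": "tablet"},
]
for row in sessionize(sample_events):
    print(row)
```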

In [None]:
-- Toggle to SQL: Preview DT1
SELECT * FROM MEMBER_SESSIONS LIMIT 5;

### DT2: RESOURCE_CATALOG (Enrich resources)
Adds a computed `content_category` field that buckets each content type as multimedia, text, or interactive.

In [None]:
-- DT2: RESOURCE_CATALOG
CREATE OR REPLACE DYNAMIC TABLE RESOURCE_CATALOG
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = UNDERSTOOD_DEMO_WH
    REFRESH_MODE = INCREMENTAL
AS
SELECT 
    resource_id,
    title,
    content_type,
    topic,
    difficulty_level,
    publish_date,
    estimated_minutes,
    CASE 
        WHEN content_type IN ('video', 'webinar') THEN 'multimedia'
        WHEN content_type IN ('article', 'guide') THEN 'text'
        ELSE 'interactive'
    END as content_category
FROM RESOURCES;
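
The CASE expression is a simple lookup with a catch-all. A Python mirror, offered as a sketch (`content_category` here is a hypothetical function, not a Snowflake UDF): note that anything not listed, including NULL, lands in the ELSE branch, because `NULL IN (...)` is not true in SQL.

```python
def content_category(content_type):
    """Python mirror of the CASE expression in RESOURCE_CATALOG."""
    if content_type in ("video", "webinar"):
        return "multimedia"
    if content_type in ("article", "guide"):
        return "text"
    # ELSE branch: 'tool', plus any unexpected value or None (SQL NULL)
    return "interactive"


for t in ["article", "video", "tool", "webinar", "guide", None]:
    print(t, "->", content_category(t))
```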

In [None]:
-- Toggle to SQL: Preview DT2
SELECT * FROM RESOURCE_CATALOG LIMIT 5;

### DT3: SESSION_ENGAGEMENT_DETAIL (Join all sources)
Joins each raw interaction event with member info (MEMBERS) and resource details (RESOURCE_CATALOG), producing one row per event: the full picture.

In [None]:
-- DT3: SESSION_ENGAGEMENT_DETAIL (join directly to interactions)
CREATE OR REPLACE DYNAMIC TABLE SESSION_ENGAGEMENT_DETAIL
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = UNDERSTOOD_DEMO_WH
    REFRESH_MODE = INCREMENTAL
AS
SELECT 
    i.event_data:session_id::VARCHAR as session_id,
    i.member_id,
    m.name as member_name,
    m.member_type,
    m.region,
    i.event_data:resource_id::NUMBER as resource_id,
    rc.title as resource_title,
    rc.content_type,
    rc.topic,
    rc.content_category,
    i.event_data:engagement_seconds::NUMBER as engagement_seconds,
    i.event_data:device_type::VARCHAR as device_type,
    i.event_data:timestamp::TIMESTAMP as event_time
FROM RAW_INTERACTIONS i
JOIN MEMBERS m ON i.member_id = m.member_id
LEFT JOIN RESOURCE_CATALOG rc ON i.event_data:resource_id::NUMBER = rc.resource_id;
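
The two join types behave differently when a key has no match: the INNER JOIN to MEMBERS drops the event, while the LEFT JOIN to RESOURCE_CATALOG keeps it with NULL resource columns. A minimal Python sketch of that behavior, using dicts keyed by id as stand-ins for the tables (the `enrich` helper and the sample rows are hypothetical):

```python
def enrich(events, members, catalog):
    """One output row per event: INNER JOIN members, LEFT JOIN catalog."""
    rows = []
    for e in events:
        member = members.get(e["member_id"])
        if member is None:
            continue  # INNER JOIN: an event whose member_id has no MEMBERS row is dropped
        resource = catalog.get(e["resource_id"], {})  # LEFT JOIN: unmatched -> None (NULL)
        rows.append({
            **e,
            "member_name": member["name"],
            "member_type": member["member_type"],
            "resource_title": resource.get("title"),
            "content_category": resource.get("content_category"),
        })
    return rows


members = {10001: {"name": "Alex Smith", "member_type": "free"}}
catalog = {150: {"title": "Guide to Managing Attention", "content_category": "text"}}
events = [
    {"member_id": 10001, "resource_id": 150, "engagement_seconds": 40},
    {"member_id": 10001, "resource_id": 999, "engagement_seconds": 15},  # no catalog row
    {"member_id": 55555, "resource_id": 150, "engagement_seconds": 60},  # no member row
]
for row in enrich(events, members, catalog):
    print(row)
```

With the synthetic data in this notebook every generated id should match, so in practice DT3 keeps one row per RAW_INTERACTIONS event.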

In [None]:
-- Toggle to SQL: Preview DT3
SELECT * FROM SESSION_ENGAGEMENT_DETAIL LIMIT 5;

### DT4: MEMBER_ENGAGEMENT_SUMMARY (Aggregate per member)
Roll up to member-level engagement metrics.

In [None]:
-- DT4: MEMBER_ENGAGEMENT_SUMMARY
CREATE OR REPLACE DYNAMIC TABLE MEMBER_ENGAGEMENT_SUMMARY
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = UNDERSTOOD_DEMO_WH
    REFRESH_MODE = INCREMENTAL
AS
SELECT 
    member_id,
    member_name,
    member_type,
    region,
    COUNT(DISTINCT session_id) as total_sessions,
    COUNT(DISTINCT resource_id) as unique_resources,
    SUM(engagement_seconds) as lifetime_engagement_seconds,
    COUNT(*) as total_events,
    -- MAX() returns the alphabetically last value, not the most frequent one,
    -- so these two columns are placeholders for a true "favorite"/"preferred" value
    MAX(topic) as favorite_topic,
    MAX(content_type) as preferred_content_type,
    MAX(event_time) as last_activity  -- timestamp of the member's most recent event
FROM SESSION_ENGAGEMENT_DETAIL
GROUP BY member_id, member_name, member_type, region;
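
The difference between `MAX(topic)` and a genuinely most-frequent topic is easy to see with a handful of values. A quick Python illustration on a hypothetical member's topics:

```python
from collections import Counter

# Hypothetical topics from one member's events
topics = ["focus", "focus", "focus", "school"]

print(max(topics))                           # what MAX(topic) returns: school
print(Counter(topics).most_common(1)[0][0])  # the most frequent topic: focus
```

Because DT5 takes `MODE(favorite_topic)` over these per-member MAX values, the dashboard's `most_popular_topic` is skewed toward topics late in the alphabet, such as `'school'`.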

In [None]:
-- Toggle to SQL: Preview DT4 (top engaged members)
SELECT * FROM MEMBER_ENGAGEMENT_SUMMARY ORDER BY lifetime_engagement_seconds DESC LIMIT 10;

### DT5: ENGAGEMENT_DASHBOARD (Executive KPIs)
Final org-wide metrics. **This is the only DT with a time-based target lag (1 minute)!**

In [None]:
-- Toggle to SQL: DT5 - Final dashboard (1 minute target lag)
-- REFRESH_MODE is not set (defaults to AUTO); because CURRENT_TIMESTAMP() is
-- non-deterministic, expect Snowflake to use full rather than incremental refresh here.
CREATE OR REPLACE DYNAMIC TABLE ENGAGEMENT_DASHBOARD
    TARGET_LAG = '1 minute'
    WAREHOUSE = UNDERSTOOD_DEMO_WH
AS
SELECT 
    CURRENT_TIMESTAMP() as report_generated_at,
    COUNT(DISTINCT member_id) as active_members,
    SUM(total_sessions) as total_sessions,
    -- Sum of per-member distinct resources (member-resource pairs), not a raw view count
    SUM(unique_resources) as total_resource_views,
    ROUND(SUM(lifetime_engagement_seconds) / 3600, 1) as total_engagement_hours,
    ROUND(AVG(lifetime_engagement_seconds) / 60, 1) as avg_engagement_minutes_per_member,
    MODE(favorite_topic) as most_popular_topic,
    MODE(preferred_content_type) as most_popular_content_type,
    COUNT_IF(member_type = 'premium') as premium_members,
    COUNT_IF(member_type = 'registered') as registered_members,
    COUNT_IF(member_type = 'free') as free_members
FROM MEMBER_ENGAGEMENT_SUMMARY;
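
The KPI arithmetic is plain unit conversion over one row per member: seconds / 3600 for hours and the per-member average seconds / 60 for minutes. A Python sketch of a few of the columns, using two hypothetical summary rows; `dashboard` is an illustrative helper, and Python's `round()` uses banker's rounding, unlike Snowflake's `ROUND` (half away from zero), so the sample values avoid exact .x5 ties.

```python
def dashboard(summaries):
    """Hypothetical Python mirror of a few ENGAGEMENT_DASHBOARD columns."""
    total_seconds = sum(s["lifetime_engagement_seconds"] for s in summaries)
    return {
        "active_members": len({s["member_id"] for s in summaries}),
        "total_sessions": sum(s["total_sessions"] for s in summaries),
        # ROUND(SUM(lifetime_engagement_seconds) / 3600, 1)
        "total_engagement_hours": round(total_seconds / 3600, 1),
        # ROUND(AVG(lifetime_engagement_seconds) / 60, 1), one row per member
        "avg_engagement_minutes_per_member": round(total_seconds / len(summaries) / 60, 1),
        # COUNT_IF(member_type = 'premium')
        "premium_members": sum(1 for s in summaries if s["member_type"] == "premium"),
    }


summaries = [
    {"member_id": 10001, "total_sessions": 3, "lifetime_engagement_seconds": 5400, "member_type": "premium"},
    {"member_id": 10002, "total_sessions": 1, "lifetime_engagement_seconds": 1800, "member_type": "free"},
]
print(dashboard(summaries))
```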

In [None]:
-- Toggle to SQL: View the final dashboard
SELECT * FROM ENGAGEMENT_DASHBOARD;

---
## Section 3: View the DAG in Snowsight

**Navigate to:**
1. **Data** → **Databases** → **UNDERSTOOD_DEMO** → **DYNAMIC_TABLES**
2. Click on **ENGAGEMENT_DASHBOARD**
3. Click the **Graph** tab

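You'll see the dashboard's upstream lineage (RESOURCE_CATALOG → SESSION_ENGAGEMENT_DETAIL → MEMBER_ENGAGEMENT_SUMMARY → ENGAGEMENT_DASHBOARD, plus the base tables), with dependencies tracked automatically. MEMBER_SESSIONS does not feed the dashboard, so it may not appear in this view; select it directly to see its own graph.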

---
## Section 4: Demonstrate Auto-Refresh

Let's prove the pipeline automatically updates when source data changes!

In [None]:
-- Toggle to SQL: Check counts BEFORE streaming
SELECT 'RAW_INTERACTIONS' as table_name, COUNT(*) as row_count FROM RAW_INTERACTIONS
UNION ALL SELECT 'SESSION_ENGAGEMENT_DETAIL', COUNT(*) FROM SESSION_ENGAGEMENT_DETAIL;

### Stream New Events (Python)
This function inserts batches of synthetic events to simulate real-time data flow. With the defaults (10 batches, 10 seconds apart) it runs for roughly 90 seconds, long enough to span at least one 1-minute refresh of ENGAGEMENT_DASHBOARD.

In [None]:
import time
from snowflake.snowpark.context import get_active_session
session = get_active_session()

def stream_events(batches=10, per_batch=1000, delay=10):
    """Simulate real-time event streaming by inserting synthetic batches into RAW_INTERACTIONS."""
    print(f"Streaming {batches} batches of {per_batch} events (delay: {delay}s)\n")
    
    for i in range(batches):
        session.sql(f"""
            INSERT INTO RAW_INTERACTIONS (member_id, event_data)
            SELECT 
                10000 + ABS(RANDOM()) % 1000,
                OBJECT_CONSTRUCT(
                    'resource_id', 100 + ABS(RANDOM()) % 100,
                    'event_type', ARRAY_CONSTRUCT('page_view','video_start','download')[ABS(RANDOM()) % 3]::VARCHAR,
                    'session_id', 'sess_' || ABS(RANDOM()) % 2000,
                    'timestamp', CURRENT_TIMESTAMP()::VARCHAR,
                    'engagement_seconds', 10 + ABS(RANDOM()) % 300,
                    'device_type', ARRAY_CONSTRUCT('desktop','mobile','tablet')[ABS(RANDOM()) % 3]::VARCHAR
                )
            FROM TABLE(GENERATOR(ROWCOUNT => {per_batch}))
        """).collect()
        
        total = session.sql("SELECT COUNT(*) as cnt FROM RAW_INTERACTIONS").collect()[0]['CNT']
        print(f"Batch {i+1}/{batches}: +{per_batch} events | Total: {total:,}")
        
        if i < batches - 1:
            print(f"  Waiting {delay}s before the next batch...")
            time.sleep(delay)
    
    print("\nDone! ENGAGEMENT_DASHBOARD picks up the new rows on its next scheduled refresh (about 1 minute).")

stream_events()

In [None]:
-- Toggle to SQL: Check dashboard after streaming (allow up to ~1 minute for the next refresh)
SELECT * FROM ENGAGEMENT_DASHBOARD;

### Refresh History
See when each Dynamic Table was refreshed, whether the refresh was incremental or full (`REFRESH_ACTION`), and how long it took.

In [None]:
-- Toggle to SQL: View refresh history for all 5 DTs
SELECT 
    NAME,
    STATE,
    REFRESH_ACTION,
    REFRESH_START_TIME,
    REFRESH_END_TIME,
    DATEDIFF('second', REFRESH_START_TIME, REFRESH_END_TIME) as duration_seconds
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE NAME IN ('MEMBER_SESSIONS', 'RESOURCE_CATALOG', 'SESSION_ENGAGEMENT_DETAIL', 
               'MEMBER_ENGAGEMENT_SUMMARY', 'ENGAGEMENT_DASHBOARD')
ORDER BY REFRESH_START_TIME DESC
LIMIT 15;
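
To summarize a longer history per table (refresh count, states, slowest refresh), the rows can be post-processed in Python. This sketch assumes the rows are plain dicts keyed by the column names selected above (Snowpark actually returns `Row` objects), and that a refresh still in progress has no end time; `summarize_refreshes` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def summarize_refreshes(rows):
    """Per DT: refresh count, count per STATE, and the slowest refresh in seconds."""
    summary = defaultdict(lambda: {"refreshes": 0, "states": defaultdict(int), "max_seconds": 0.0})
    for r in rows:
        s = summary[r["NAME"]]
        s["refreshes"] += 1
        s["states"][r["STATE"]] += 1
        if r["REFRESH_END_TIME"] is not None:  # assumed None while still running
            elapsed = (r["REFRESH_END_TIME"] - r["REFRESH_START_TIME"]).total_seconds()
            s["max_seconds"] = max(s["max_seconds"], elapsed)
    return summary


# Illustrative rows shaped like the query's output
rows = [
    {"NAME": "ENGAGEMENT_DASHBOARD", "STATE": "SUCCEEDED",
     "REFRESH_START_TIME": datetime(2024, 5, 1, 10, 0, 0), "REFRESH_END_TIME": datetime(2024, 5, 1, 10, 0, 4)},
    {"NAME": "ENGAGEMENT_DASHBOARD", "STATE": "SUCCEEDED",
     "REFRESH_START_TIME": datetime(2024, 5, 1, 10, 1, 0), "REFRESH_END_TIME": datetime(2024, 5, 1, 10, 1, 2)},
    {"NAME": "SESSION_ENGAGEMENT_DETAIL", "STATE": "EXECUTING",
     "REFRESH_START_TIME": datetime(2024, 5, 1, 10, 2, 0), "REFRESH_END_TIME": None},
]
for name, s in summarize_refreshes(rows).items():
    print(name, s["refreshes"], dict(s["states"]), s["max_seconds"])
```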

---
## Summary

**What we demonstrated:**

1. **5-table declarative pipeline** - SELECT statements define the entire DAG
2. **Smart refresh strategy** - DOWNSTREAM saves compute; a 1-minute target lag only where freshness matters
3. **Automatic dependency tracking** - Snowflake infers the refresh order from each DT's query
4. **Incremental refresh** - Only processes changed data where the query allows it
5. **No orchestration code** - No tasks, no streams, no retry logic
6. **Visual lineage** - DAG view in Snowsight
7. **Built-in monitoring** - Refresh history via INFORMATION_SCHEMA

---
## Cleanup (Optional)

Uncomment and run this to remove all demo objects. **Drop DTs in reverse dependency order** (consumers before their inputs).

In [None]:
-- Toggle to SQL: Cleanup (uncomment to run)
-- DROP DYNAMIC TABLE IF EXISTS ENGAGEMENT_DASHBOARD;
-- DROP DYNAMIC TABLE IF EXISTS MEMBER_ENGAGEMENT_SUMMARY;
-- DROP DYNAMIC TABLE IF EXISTS SESSION_ENGAGEMENT_DETAIL;
-- DROP DYNAMIC TABLE IF EXISTS RESOURCE_CATALOG;
-- DROP DYNAMIC TABLE IF EXISTS MEMBER_SESSIONS;
-- DROP TABLE IF EXISTS RAW_INTERACTIONS;
-- DROP TABLE IF EXISTS RESOURCES;
-- DROP TABLE IF EXISTS MEMBERS;
-- DROP WAREHOUSE IF EXISTS UNDERSTOOD_DEMO_WH;
-- DROP DATABASE IF EXISTS UNDERSTOOD_DEMO;
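
The reverse-dependency order used above can also be derived rather than written by hand: topologically sort the tables (inputs first), then drop them in reverse. A minimal sketch, assuming the `DEPENDS_ON` map is copied by hand from the DT definitions (it is not read from Snowflake); `drop_statements` is a hypothetical helper that only prints SQL and runs nothing.

```python
from graphlib import TopologicalSorter

# Inputs of each dynamic table, hand-copied from the DT definitions
DEPENDS_ON = {
    "MEMBER_SESSIONS": {"RAW_INTERACTIONS"},
    "RESOURCE_CATALOG": {"RESOURCES"},
    "SESSION_ENGAGEMENT_DETAIL": {"RAW_INTERACTIONS", "MEMBERS", "RESOURCE_CATALOG"},
    "MEMBER_ENGAGEMENT_SUMMARY": {"SESSION_ENGAGEMENT_DETAIL"},
    "ENGAGEMENT_DASHBOARD": {"MEMBER_ENGAGEMENT_SUMMARY"},
}


def drop_statements(depends_on):
    """DROP statements that remove every consumer before any of its inputs."""
    order = list(TopologicalSorter(depends_on).static_order())  # inputs first
    statements = []
    for name in reversed(order):  # consumers first
        kind = "DYNAMIC TABLE" if name in depends_on else "TABLE"
        statements.append(f"DROP {kind} IF EXISTS {name};")
    return statements


for stmt in drop_statements(DEPENDS_ON):
    print(stmt)
```

Several orders are valid (MEMBER_SESSIONS can go at any point before RAW_INTERACTIONS), so the generated list may differ from the hand-written cell while still respecting every dependency.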