# Streaming Data Pipeline with Snowpark Python and Dynamic Tables

## Objective
This notebook builds a near-real-time analytics pipeline using Snowflake Dynamic Tables, a Snowpark Python stored procedure, and a Triggered Task. It transforms raw streaming ski resort data into hourly, daily, and weekly summaries, with deduplicated daily visit tracking and a structured aggregation hierarchy.

## 1. Setup and Initialization

Import the Python packages and initialize the Snowpark session.

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()

## 2. Initial Data Exploration

Before building transformations, let's examine the structure of our raw streaming data. This helps in understanding the source tables we'll be working with.

In [None]:
-- Lift usage events (core activity data)
SELECT *
FROM LIFT_RIDE 
LIMIT 20;

In [None]:
-- Day ticket purchases
SELECT * 
FROM RESORT_TICKET 
LIMIT 20;

In [None]:
-- Season pass purchases
SELECT *
FROM SEASON_PASS 
LIMIT 20;

## 3. Initial Data Pipeline Setup

This section covers the additional setup required for this use case, including creating streams and reference tables.

### 3.1. Create Stream on Raw Lift Ride Data

A stream is created on the `LIFT_RIDE` table to capture new lift ride events. This stream will be the source for the Snowpark procedure that populates daily visit information.
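To make the role of the stream concrete, here is a toy, in-memory model of an append-only stream. It is only a sketch of the general idea (an offset into an append-only sequence of rows that advances when the stream is consumed), not Snowflake's implementation. The class `ToyAppendOnlyStream` and the sample rows are hypothetical, and `SHOW_INITIAL_ROWS` is deliberately not modeled.

```python
class ToyAppendOnlyStream:
    """Toy model of an append-only stream: an offset into a growing list of rows."""

    def __init__(self, table_rows):
        self.table_rows = table_rows
        # Start at the current end of the table (SHOW_INITIAL_ROWS is not modeled).
        self.offset = len(table_rows)

    def has_data(self):
        """Rough analogue of SYSTEM$STREAM_HAS_DATA."""
        return self.offset < len(self.table_rows)

    def peek(self):
        """Like a plain SELECT on the stream: pending rows, offset unchanged."""
        return list(self.table_rows[self.offset:])

    def consume(self):
        """Like reading the stream in a committed DML statement: offset advances."""
        pending = self.peek()
        self.offset = len(self.table_rows)
        return pending


lift_ride = [("Vail", "A1"), ("Vail", "B2")]  # hypothetical rows already in the table
stream = ToyAppendOnlyStream(lift_ride)
lift_ride.append(("Keystone", "C3"))          # a new lift ride event arrives
```

In this model, `stream.peek()` can be called repeatedly without losing rows, while `stream.consume()` hands the pending rows over once and leaves the stream empty until more rows arrive.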

In [None]:
CREATE STREAM IF NOT EXISTS LIFT_RIDE_STREAM ON TABLE LIFT_RIDE APPEND_ONLY = TRUE SHOW_INITIAL_ROWS = TRUE;

### 3.2. Resort Capacity Reference Table

Create and populate a reference table for resort capacities, which will be used in downstream calculations.

In [None]:
-- Reference table for resort capacity
CREATE OR REPLACE TABLE RESORT_CAPACITY (
    RESORT VARCHAR(100) PRIMARY KEY,
    MAX_CAPACITY INTEGER,
    HOURLY_CAPACITY INTEGER,
    BASE_LIFT_COUNT INTEGER
);

INSERT INTO RESORT_CAPACITY VALUES
('Vail', 7000, 1100, 34),
('Beaver Creek', 5500, 900, 25),
('Breckenridge', 6500, 1000, 35),
('Keystone', 4500, 700, 21),
('Heavenly', 5000, 800, 27);

## 4. Automated Daily Visit Processing with Snowpark

This section details the setup for accurately tracking daily visits using a Snowpark Stored Procedure and a Task to automate its execution.

### 4.1. `DAILY_VISITS` Table

This table will store unique daily visits per RFID at each resort, along with their first ride details and season pass status. It is populated by a Snowpark procedure.

In [None]:
CREATE OR REPLACE TABLE DAILY_VISITS (
    VISIT_DATE DATE,
    RESORT STRING,
    RFID STRING,
    NAME STRING,
    FIRST_RIDE_TIME DATETIME,
    FIRST_LIFT STRING,
    HAS_SEASON_PASS BOOLEAN
);

### 4.2. Stage for Deployed Snowpark Code

Create a stage to store Snowpark Python code for stored procedures.

In [None]:
create stage if not exists snowpark_apps;

### 4.3. Snowpark Python Function: `populate_daily_visits`

This Python function will ultimately be deployed as a Python Stored Procedure. It processes new records from `LIFT_RIDE_STREAM`, identifies the first ride for each visitor per day at each resort, enriches the data with customer details and pass status, and inserts new, unique daily visits into the `DAILY_VISITS` table.
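The core of that logic (keep the earliest ride per resort, RFID, and calendar day, then skip visits that are already recorded) can be sketched in plain Python without a Snowflake session. This is an illustration under assumptions: the sample rows, lift names, and the helpers `first_rides` and `new_visits` are hypothetical and stand in for the window function and anti-join used in the Snowpark version.

```python
from datetime import date, datetime

# Hypothetical sample rows standing in for LIFT_RIDE_STREAM records.
rides = [
    {"RESORT": "Vail", "RFID": "A1", "LIFT": "Lift A", "RIDE_TIME": datetime(2025, 1, 4, 9, 15)},
    {"RESORT": "Vail", "RFID": "A1", "LIFT": "Lift B", "RIDE_TIME": datetime(2025, 1, 4, 8, 50)},
    {"RESORT": "Vail", "RFID": "A1", "LIFT": "Lift B", "RIDE_TIME": datetime(2025, 1, 5, 10, 0)},
    {"RESORT": "Keystone", "RFID": "B2", "LIFT": "Lift C", "RIDE_TIME": datetime(2025, 1, 4, 11, 30)},
]


def first_rides(rows):
    """Keep the earliest ride per (resort, RFID, calendar day)."""
    earliest = {}
    for row in rows:
        key = (row["RESORT"], row["RFID"], row["RIDE_TIME"].date())
        if key not in earliest or row["RIDE_TIME"] < earliest[key]["RIDE_TIME"]:
            earliest[key] = row
    return earliest


def new_visits(first, existing_keys):
    """Anti-join: drop visits whose (resort, RFID, day) key is already recorded."""
    return {key: row for key, row in first.items() if key not in existing_keys}


first = first_rides(rides)
# Pretend the visit by A1 at Vail on 2025-01-04 is already in DAILY_VISITS.
fresh = new_visits(first, existing_keys={("Vail", "A1", date(2025, 1, 4))})
```

Here `first` holds three visits (two days for `A1` at Vail, one for `B2` at Keystone), and `fresh` keeps only the two that are not yet recorded.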

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, row_number, coalesce, when, to_date
from snowflake.snowpark.window import Window

def populate_daily_visits(session: Session) -> str:
    """
    Populate DAILY_VISITS table using Snowpark Python
    Handles data from any date in the stream, deduplicates by RFID per resort per day
    This process is designed to be run frequently from a triggered task
    """
    
    # Step 1: Get new rides from stream
    lift_ride_stream = session.table("LIFT_RIDE_STREAM")
    
    # Deduplicate by RFID per resort per day - get earliest ride time
    window_spec = Window.partition_by(
        col("RESORT"), 
        col("RFID"), 
        to_date(col("RIDE_TIME")) # Ensure partitioning by date part only
    ).order_by(col("RIDE_TIME").asc())
    
    first_rides_df = lift_ride_stream.select(
        col("RESORT"),
        col("RFID"),
        col("LIFT").alias("FIRST_LIFT"),
        col("RIDE_TIME").alias("FIRST_RIDE_TIME"),        
        to_date(col("RIDE_TIME")).alias("VISIT_DATE"), # Use to_date for VISIT_DATE       
        row_number().over(window_spec).alias("rn")
    )
    # Filter to only first ride of each day for each RFID at each resort
    first_rides_df = first_rides_df.filter(col("rn") == 1).drop("rn") # Drop rn after filtering
    
    # Step 2: Join with customer data to get customer details and determine visit type
    # Alias the RFID and NAME columns of SEASON_PASS and RESORT_TICKET so the joins below are unambiguous
    season_pass_df = session.table("SEASON_PASS").select(col("RFID").alias("RFID_PASS"), col("NAME").alias("NAME_PASS"))
    resort_ticket_df = session.table("RESORT_TICKET").select(col("RFID").alias("RFID_TICKET"), col("NAME").alias("NAME_TICKET"))
    
    # Left join with season pass
    first_rides_df = first_rides_df.join(season_pass_df, first_rides_df.col("RFID") == season_pass_df.col("RFID_PASS"), "left")
    
    # Left join with resort ticket
    first_rides_df = first_rides_df.join(resort_ticket_df, first_rides_df.col("RFID") == resort_ticket_df.col("RFID_TICKET"), "left")
     
    first_rides_df = first_rides_df.select(
        first_rides_df.col("RESORT"),
        first_rides_df.col("RFID"),
        first_rides_df.col("FIRST_LIFT"),
        first_rides_df.col("FIRST_RIDE_TIME"),
        first_rides_df.col("VISIT_DATE"),
        coalesce(col("NAME_PASS"), col("NAME_TICKET")).alias("NAME"),
        when(col("RFID_PASS").is_not_null(), True).otherwise(False).alias("HAS_SEASON_PASS")
    )
    
    # Step 3: Anti-join with existing DAILY_VISITS to find new visits
    daily_visits_existing_df = session.table("DAILY_VISITS").select(
            col("VISIT_DATE").alias("VISIT_DATE_DV"),
            col("RESORT").alias("RESORT_DV"),
            col("RFID").alias("RFID_DV")
    )

    # Join condition for anti-join
    join_condition = (
        (first_rides_df.VISIT_DATE == daily_visits_existing_df.VISIT_DATE_DV) &
        (first_rides_df.RESORT == daily_visits_existing_df.RESORT_DV) &
        (first_rides_df.RFID == daily_visits_existing_df.RFID_DV)
    )
    
    new_visits_df = first_rides_df.join(
        daily_visits_existing_df, 
        join_condition, 
        "left_anti"
    )
    
    # Select columns for insertion, ensuring correct order and names as per DAILY_VISITS table
    new_visits_to_insert_df = new_visits_df.select(
        col("VISIT_DATE"),
        col("RESORT"),
        col("RFID"),
        col("NAME"),
        col("FIRST_RIDE_TIME"),
        col("FIRST_LIFT"),
        col("HAS_SEASON_PASS")
    )
    
    # Step 4: Append new visits into DAILY_VISITS table
    try:
        # Write the data to the table
        if new_visits_to_insert_df.count() > 0: # Only write if there's new data
            new_visits_to_insert_df.write.mode("append").save_as_table("DAILY_VISITS") # Use default column name mapping
            return f"OK: Appended {new_visits_to_insert_df.count()} new visits."
        else:
            return "OK: No new visits to append."
    except Exception as e:
        return f"ERROR: {str(e)}"

### 4.4. Manually invoke Python function (for testing/setup)

Prior to deploying as a Snowflake task, let's run the Python function to make sure it's working properly. This step will backfill initial data if `SHOW_INITIAL_ROWS=TRUE` was used for the stream and it's the first run.

In [None]:
populate_daily_visits(session)

### 4.5. Create Triggered Task to Automate `populate_daily_visits`

Define and create a Snowflake Triggered Task to automatically run `populate_daily_visits` as a Python stored procedure when new data arrives in the `LIFT_RIDE_STREAM`.

In [None]:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
root = Root(session)
populate_dv_task = Task(
    "populate_daily_visits",
    StoredProcedureCall(populate_daily_visits, stage_location="@snowpark_apps"),
    warehouse="STREAMING_INGEST", 
    condition="SYSTEM$STREAM_HAS_DATA('lift_ride_stream')",
    allow_overlapping_execution=False
)
populate_dv_task_res = root.databases['streaming_ingest'].schemas['streaming_ingest'].tasks["populate_daily_visits"]
populate_dv_task_res.create_or_alter(populate_dv_task)

## 5. Task Management

Commands to manage the `populate_daily_visits` task: suspending it, checking its parameters, altering it, and resuming it.

In [None]:
populate_dv_task_res.suspend()

In [None]:
SHOW PARAMETERS LIKE 'USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS' IN TASK populate_daily_visits;

In [None]:
-- Note: USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS sets the minimum time between runs of a triggered task.
-- Setting it to 10 seconds allows the task to run at its maximum frequency: at most once every 10 seconds.
ALTER TASK populate_daily_visits SET USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10;

In [None]:
# Resume the task so that it runs whenever the stream condition is met
# populate_dv_task_res must already be defined by the task creation cell in section 4.5
populate_dv_task_res.resume()

## 6. Dynamic Table Aggregation Pipeline

Define a series of Dynamic Tables to perform hierarchical aggregations (hourly, daily, weekly) on the ski resort data. These tables will automatically refresh as new data arrives.

### 6.1. Hourly Aggregations

These Dynamic Tables provide the first level of aggregation, summarizing data on an hourly basis.
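As a rough illustration of what the hourly lift activity aggregation computes, the following plain-Python sketch groups rides by date, hour, and resort. It is a simplified stand-in, not the Dynamic Table itself: the function `hourly_activity`, the sample rides, and the `pass_holders` set (used here in place of the join to `DAILY_VISITS`) are assumptions made for the example.

```python
from collections import defaultdict
from datetime import date, datetime


def hourly_activity(rides, pass_holders):
    """Group rides by (date, hour, resort) and compute the hourly metrics."""
    total_rides = defaultdict(int)
    visitors = defaultdict(set)
    active_passes = defaultdict(set)
    pass_rides = defaultdict(int)
    for resort, rfid, ride_time in rides:
        key = (ride_time.date(), ride_time.hour, resort)
        total_rides[key] += 1
        visitors[key].add(rfid)
        if rfid in pass_holders:
            active_passes[key].add(rfid)
            pass_rides[key] += 1
    return {
        key: {
            "TOTAL_RIDES": total_rides[key],
            "VISITOR_COUNT": len(visitors[key]),
            "ACTIVE_PASSES": len(active_passes[key]),
            "PASS_RIDES": pass_rides[key],
        }
        for key in total_rides
    }


# Hypothetical sample rides: (resort, RFID, ride time).
rides = [
    ("Vail", "A1", datetime(2025, 1, 4, 9, 5)),
    ("Vail", "A1", datetime(2025, 1, 4, 9, 40)),
    ("Vail", "B2", datetime(2025, 1, 4, 9, 50)),
    ("Vail", "A1", datetime(2025, 1, 4, 10, 10)),
]
summary = hourly_activity(rides, pass_holders={"A1"})
```

For the 9 o'clock hour this yields three rides by two distinct visitors, of which one is a pass holder who accounts for two of the rides.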

In [None]:
CREATE OR REPLACE DYNAMIC TABLE HOURLY_LIFT_ACTIVITY 
TARGET_LAG='1 minute' 
WAREHOUSE = STREAMING_INGEST AS
SELECT 
    DATE(lr.RIDE_TIME) as RIDE_DATE,
    HOUR(lr.RIDE_TIME) as RIDE_HOUR,
    lr.RESORT,
    COUNT(*) as TOTAL_RIDES,
    COUNT(DISTINCT lr.RFID) as VISITOR_COUNT,
    -- Use DAILY_VISITS to determine pass usage for visitors active in this hour
    COUNT(DISTINCT CASE WHEN dv.HAS_SEASON_PASS = TRUE THEN lr.RFID END) as ACTIVE_PASSES,
    COUNT(CASE WHEN dv.HAS_SEASON_PASS = TRUE THEN 1 END) as PASS_RIDES
FROM LIFT_RIDE lr
LEFT JOIN DAILY_VISITS dv ON lr.RFID = dv.RFID 
    AND DATE(lr.RIDE_TIME) = dv.VISIT_DATE
    AND lr.RESORT = dv.RESORT
GROUP BY DATE(lr.RIDE_TIME), HOUR(lr.RIDE_TIME), lr.RESORT;

In [None]:
CREATE OR REPLACE DYNAMIC TABLE HOURLY_LIFT_TICKET_SALES
TARGET_LAG='1 minute' 
WAREHOUSE = STREAMING_INGEST AS
SELECT 
    DATE(PURCHASE_TIME) as PURCHASE_DATE,
    HOUR(PURCHASE_TIME) as PURCHASE_HOUR,
    RESORT,
    SUM(PRICE_USD) as TICKET_REVENUE,
    COUNT(*) as TICKETS_SOLD
FROM RESORT_TICKET 
GROUP BY DATE(PURCHASE_TIME), HOUR(PURCHASE_TIME), RESORT;

In [None]:
CREATE OR REPLACE DYNAMIC TABLE HOURLY_RESORT_SUMMARY
TARGET_LAG='1 minute' -- Could be DOWNSTREAM instead, as long as a dynamic table that reads from this one (DAILY_RESORT_SUMMARY) keeps a time-based lag
WAREHOUSE = STREAMING_INGEST AS
SELECT 
    h.RIDE_DATE,
    h.RIDE_HOUR,
    h.RESORT,
    h.VISITOR_COUNT,
    h.TOTAL_RIDES,
    COALESCE(t.TICKET_REVENUE, 0) as TICKET_REVENUE,
    COALESCE(t.TICKETS_SOLD, 0) as TICKETS_SOLD,
    h.ACTIVE_PASSES,
    h.PASS_RIDES,
    -- Calculate capacity percentage
    ROUND((h.VISITOR_COUNT / rc.MAX_CAPACITY * 100), 1) as CAPACITY_PCT,
    -- Calculate total revenue (tickets + allocated pass value: $5.50 per season-pass ride in the hour)
    COALESCE(t.TICKET_REVENUE, 0) + (h.PASS_RIDES * 5.50) as TOTAL_REVENUE,
    -- Capacity status
    CASE 
        WHEN (h.VISITOR_COUNT / rc.MAX_CAPACITY * 100) > 90 THEN 'HIGH'
        WHEN (h.VISITOR_COUNT / rc.MAX_CAPACITY * 100) > 70 THEN 'MODERATE'
        ELSE 'NORMAL'
    END as CAPACITY_STATUS
FROM HOURLY_LIFT_ACTIVITY h
LEFT JOIN HOURLY_LIFT_TICKET_SALES t 
    ON h.RIDE_DATE = t.PURCHASE_DATE 
    AND h.RIDE_HOUR = t.PURCHASE_HOUR 
    AND h.RESORT = t.RESORT
JOIN RESORT_CAPACITY rc ON h.RESORT = rc.RESORT;
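
The capacity and revenue rules in `HOURLY_RESORT_SUMMARY` can be checked by hand with a small Python sketch. The helper names `capacity_status` and `total_revenue` are hypothetical; the thresholds (90 and 70 percent) and the 5.50 USD value per pass ride are taken from the SQL above. Note that Python's `round` and Snowflake's `ROUND` can differ on exact ties, so this is an approximation of the rounding, not a guarantee.

```python
PASS_RIDE_VALUE_USD = 5.50  # allocated value per season-pass ride, as in the SQL


def capacity_status(visitor_count, max_capacity):
    """Return (capacity percentage rounded to 1 decimal, status label)."""
    pct = visitor_count / max_capacity * 100
    if pct > 90:
        status = "HIGH"
    elif pct > 70:
        status = "MODERATE"
    else:
        status = "NORMAL"
    return round(pct, 1), status


def total_revenue(ticket_revenue, pass_rides):
    """Ticket revenue (treated as 0 when missing) plus allocated pass value."""
    return (ticket_revenue or 0) + pass_rides * PASS_RIDE_VALUE_USD


# Example with Vail's MAX_CAPACITY of 7000 from RESORT_CAPACITY.
busy_hour = capacity_status(6500, 7000)
quiet_hour = capacity_status(3500, 7000)
```

With these inputs, an hour with 6,500 distinct visitors at Vail is flagged `HIGH`, while 3,500 visitors is `NORMAL`.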

### 6.2. Daily Aggregations

This Dynamic Table rolls up hourly data and incorporates accurate daily visitor counts from the `DAILY_VISITS` table.
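The reason the daily visitor count comes from `DAILY_VISITS` rather than from the hourly rows is that hourly distinct counts cannot be summed: a visitor who rides in three different hours is counted three times. A minimal sketch with hypothetical RFIDs:

```python
# Hypothetical distinct visitors seen in each hour of one day at one resort.
hourly_visitors = {
    9: {"A1", "B2"},
    10: {"A1", "C3"},
    11: {"A1"},
}

# Summing hourly distinct counts gives visitor-hours, not unique visitors.
total_visitor_hours = sum(len(rfids) for rfids in hourly_visitors.values())

# The busiest single hour of the day.
peak_hourly_visitors = max(len(rfids) for rfids in hourly_visitors.values())

# The true daily unique count needs the union of RFIDs across hours.
daily_unique_visitors = len(set().union(*hourly_visitors.values()))
```

Here `total_visitor_hours` is 5 and `peak_hourly_visitors` is 2, while only 3 distinct visitors were present that day.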

In [None]:
-- ========================================
-- DAILY RESORT SUMMARY DYNAMIC TABLE
-- Daily aggregation from hourly DT + accurate visitor counts from DAILY_VISITS
-- ========================================
CREATE OR REPLACE DYNAMIC TABLE DAILY_RESORT_SUMMARY
TARGET_LAG='1 minute' -- Could be DOWNSTREAM instead, as long as a dynamic table that reads from this one (WEEKLY_RESORT_SUMMARY) keeps a time-based lag
WAREHOUSE = STREAMING_INGEST AS
WITH daily_hourly_aggregation AS (
    SELECT 
        RIDE_DATE,
        RESORT,
        MAX(VISITOR_COUNT) as PEAK_HOURLY_VISITORS, -- Highest hourly visitor count of the day
        SUM(VISITOR_COUNT) as TOTAL_VISITOR_HOURS,
        SUM(TOTAL_RIDES) as TOTAL_RIDES,
        SUM(TOTAL_REVENUE) as TOTAL_REVENUE,
        SUM(TICKET_REVENUE) as TICKET_REVENUE,
        SUM(TICKETS_SOLD) as TICKETS_SOLD,
        SUM(PASS_RIDES) as TOTAL_PASS_RIDES, -- Season-pass rides summed across the day's hours
        ROUND(AVG(CAPACITY_PCT), 1) as AVG_CAPACITY_PCT,
        MAX(CAPACITY_PCT) as PEAK_CAPACITY_PCT,
        COUNT(*) as OPERATION_HOURS
    FROM HOURLY_RESORT_SUMMARY
    GROUP BY RIDE_DATE, RESORT
),
daily_visitor_count AS (
    SELECT 
        VISIT_DATE,
        RESORT,
        COUNT(DISTINCT RFID) as TOTAL_UNIQUE_VISITORS -- Unique RFIDs per resort per day
    FROM DAILY_VISITS
    GROUP BY VISIT_DATE, RESORT
)
SELECT 
    h.RIDE_DATE,
    h.RESORT,
    COALESCE(v.TOTAL_UNIQUE_VISITORS, 0) as TOTAL_VISITORS, -- This is the accurate daily unique visitor count
    h.PEAK_HOURLY_VISITORS,
    h.TOTAL_VISITOR_HOURS,
    h.TOTAL_RIDES,
    h.TOTAL_REVENUE,
    h.TICKET_REVENUE,
    h.TICKETS_SOLD,
    h.TOTAL_PASS_RIDES,
    h.AVG_CAPACITY_PCT,
    h.PEAK_CAPACITY_PCT,
    h.OPERATION_HOURS
FROM daily_hourly_aggregation h
LEFT JOIN daily_visitor_count v 
    ON h.RIDE_DATE = v.VISIT_DATE 
    AND h.RESORT = v.RESORT;

### 6.3. Weekly Aggregations

This Dynamic Table aggregates daily summaries to provide weekly insights.
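The weekly grouping key can be illustrated in plain Python. This sketch assumes weeks start on Monday, which is what `DATE_TRUNC('week', ...)` should produce under Snowflake's default `WEEK_START` setting; if that session parameter has been changed, the bucket boundaries will differ. The helpers `week_start` and `weekly_revenue` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta


def week_start(day):
    """Monday of the week containing `day` (assumes Monday-based weeks)."""
    return day - timedelta(days=day.weekday())


def weekly_revenue(daily_rows):
    """Sum daily revenue per (week start, resort)."""
    totals = defaultdict(float)
    for ride_date, resort, revenue in daily_rows:
        totals[(week_start(ride_date), resort)] += revenue
    return dict(totals)


# Hypothetical daily rows: (RIDE_DATE, RESORT, TOTAL_REVENUE).
daily_rows = [
    (date(2025, 1, 5), "Vail", 1000.0),  # a Sunday, so it belongs to the week of 2024-12-30
    (date(2025, 1, 6), "Vail", 2000.0),  # a Monday, which starts a new week
    (date(2025, 1, 8), "Vail", 500.0),   # a Wednesday in that same week
]
weekly = weekly_revenue(daily_rows)
```

The Sunday row lands in the previous week's bucket, which is worth keeping in mind when comparing weekly totals against calendar-based reports.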

In [None]:
-- ========================================
-- WEEKLY RESORT SUMMARY DYNAMIC TABLE
-- Weekly aggregation from daily DT
-- ========================================
CREATE OR REPLACE DYNAMIC TABLE WEEKLY_RESORT_SUMMARY
TARGET_LAG='1 minute' -- Last dynamic table in the chain: keep a time-based lag, since DOWNSTREAM refreshes only when a dependent dynamic table needs it
WAREHOUSE = STREAMING_INGEST AS
SELECT 
    DATE_TRUNC('week', RIDE_DATE) as WEEK_START_DATE,
    RESORT,
    MAX(PEAK_HOURLY_VISITORS) as WEEK_PEAK_HOURLY_VISITORS, -- Reflects peak hourly within the week
    ROUND(AVG(TOTAL_VISITORS), 0) as AVG_DAILY_UNIQUE_VISITORS, -- Avg of unique daily visitors
    SUM(TOTAL_RIDES) as WEEK_TOTAL_RIDES,
    SUM(TOTAL_REVENUE) as WEEK_TOTAL_REVENUE,
    ROUND(AVG(TOTAL_REVENUE), 0) as AVG_DAILY_REVENUE,
    SUM(TICKETS_SOLD) as WEEK_TICKETS_SOLD,
    SUM(TOTAL_PASS_RIDES) as WEEK_TOTAL_PASS_RIDES, -- Reflects sum of daily pass rides
    ROUND(AVG(AVG_CAPACITY_PCT), 1) as AVG_WEEK_CAPACITY_PCT,
    MAX(PEAK_CAPACITY_PCT) as WEEK_PEAK_CAPACITY_PCT,
    COUNT(DISTINCT RIDE_DATE) as OPERATION_DAYS -- Count of distinct days with operations
FROM DAILY_RESORT_SUMMARY
GROUP BY DATE_TRUNC('week', RIDE_DATE), RESORT;

## 7. Analytical Views for Reporting

Create views on top of base tables and/or dynamic tables for easier querying and dashboarding.

In [None]:
-- ========================================
-- VIEW: V_RT_LIFT_PERFORMANCE
-- Real-time lift performance based on last 30 minutes of activity from LIFT_RIDE table
-- ========================================
CREATE OR REPLACE VIEW V_RT_LIFT_PERFORMANCE AS
WITH simulation_clock AS (
    -- Determine the latest ride time to simulate a 'current time' for the batch data
    SELECT         
        MAX(RIDE_TIME) as MAX_RIDE_TIME
    FROM LIFT_RIDE     
),
recent_activity AS (
    SELECT 
        lr.RESORT,
        lr.LIFT,
        COUNT(*) as RIDES_30MIN,
        COUNT(DISTINCT lr.RFID) as VISITORS_30MIN,
        MAX(lr.RIDE_TIME) as LAST_ACTIVITY_TIME
    FROM LIFT_RIDE lr
    CROSS JOIN simulation_clock clock -- simulation_clock always returns exactly one row, so the CROSS JOIN does not multiply rows
    WHERE lr.RIDE_TIME >= DATEADD(MINUTE, -30, clock.MAX_RIDE_TIME) AND lr.RIDE_TIME <= clock.MAX_RIDE_TIME
    GROUP BY lr.RESORT, lr.LIFT
    HAVING COUNT(*) > 0 -- Ensure there was activity
)
SELECT 
    ra.RESORT,
    ra.LIFT,
    ra.RIDES_30MIN,
    ra.VISITORS_30MIN,
    ra.LAST_ACTIVITY_TIME,
    ROUND(ra.RIDES_30MIN * 2.0, 1) as ESTIMATED_RIDES_PER_HOUR, -- Simplified: rides in 30 mins * 2
    ROW_NUMBER() OVER (PARTITION BY ra.RESORT ORDER BY ra.RIDES_30MIN DESC) as USAGE_RANK_IN_RESORT
FROM recent_activity ra
ORDER BY ra.RESORT, ra.RIDES_30MIN DESC;
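
The windowing in `V_RT_LIFT_PERFORMANCE` can be sketched in plain Python: take the latest ride time as the clock, keep rides from the 30 minutes before it, and rank lifts within each resort by ride count. The function `recent_lift_ranking` and the sample rides are hypothetical, the input is assumed to be non-empty, and ties are broken by input order here, whereas the order of tied rows under `ROW_NUMBER()` is not guaranteed.

```python
from collections import Counter
from datetime import datetime, timedelta


def recent_lift_ranking(rides, window_minutes=30):
    """Rank lifts per resort by rides in the window ending at the latest ride time."""
    clock = max(ride_time for _, _, ride_time in rides)  # simulated current time
    cutoff = clock - timedelta(minutes=window_minutes)
    counts = Counter(
        (resort, lift) for resort, lift, ride_time in rides if cutoff <= ride_time <= clock
    )
    by_resort = {}
    for (resort, lift), n in counts.items():
        by_resort.setdefault(resort, []).append((lift, n))
    result = []
    for resort in sorted(by_resort):
        ranked = sorted(by_resort[resort], key=lambda item: -item[1])
        for rank, (lift, n) in enumerate(ranked, start=1):
            result.append({
                "RESORT": resort,
                "LIFT": lift,
                "RIDES_30MIN": n,
                "ESTIMATED_RIDES_PER_HOUR": n * 2.0,  # 30-minute count doubled
                "USAGE_RANK_IN_RESORT": rank,
            })
    return result


# Hypothetical sample rides: (resort, lift, ride time).
rides = [
    ("Vail", "Lift B", datetime(2025, 1, 4, 9, 0)),   # older than 30 minutes, excluded
    ("Vail", "Lift A", datetime(2025, 1, 4, 10, 0)),
    ("Vail", "Lift B", datetime(2025, 1, 4, 10, 5)),
    ("Vail", "Lift A", datetime(2025, 1, 4, 10, 10)),
    ("Vail", "Lift A", datetime(2025, 1, 4, 10, 20)),
]
ranking = recent_lift_ranking(rides)
```

In this sample the clock is 10:20, so the 9:00 ride falls outside the window and `Lift A` ranks first with three rides.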

In [None]:
-- ========================================
-- VIEW: V_DAILY_REVENUE_PERFORMANCE
-- Daily revenue vs targets, derived from DAILY_RESORT_SUMMARY and RESORT_CAPACITY
-- ========================================
CREATE OR REPLACE VIEW V_DAILY_REVENUE_PERFORMANCE AS
WITH daily_targets AS (
    SELECT 
        RESORT,
        (MAX_CAPACITY * 0.7 * 100) as REVENUE_TARGET_USD -- Example target: 70% of max capacity value, assuming $100 per visitor
    FROM RESORT_CAPACITY
)
SELECT 
    d.RIDE_DATE,
    d.RESORT,
    d.TOTAL_REVENUE,
    t.REVENUE_TARGET_USD,
    CASE 
        WHEN t.REVENUE_TARGET_USD > 0 THEN ROUND((d.TOTAL_REVENUE / t.REVENUE_TARGET_USD * 100), 1)
        ELSE NULL 
    END as REVENUE_TARGET_PCT,
    CASE 
        WHEN d.TOTAL_REVENUE >= t.REVENUE_TARGET_USD THEN 'ABOVE_TARGET'
        WHEN d.TOTAL_REVENUE >= t.REVENUE_TARGET_USD * 0.9 THEN 'NEAR_TARGET'
        ELSE 'BELOW_TARGET'
    END as PERFORMANCE_STATUS
FROM DAILY_RESORT_SUMMARY d
JOIN daily_targets t ON d.RESORT = t.RESORT;
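
The target arithmetic in `V_DAILY_REVENUE_PERFORMANCE` is easy to verify by hand. The sketch below mirrors the view's example assumptions (70 percent of maximum capacity at 100 USD per visitor); the function `revenue_performance` and its parameter names are hypothetical, and the revenue figures passed in are made-up inputs.

```python
def revenue_performance(total_revenue, max_capacity, fill_pct=70, usd_per_visitor=100):
    """Return (target in USD, percent of target reached, status label)."""
    target = max_capacity * fill_pct * usd_per_visitor / 100
    pct = round(total_revenue / target * 100, 1) if target > 0 else None
    if total_revenue >= target:
        status = "ABOVE_TARGET"
    elif total_revenue >= target * 0.9:
        status = "NEAR_TARGET"
    else:
        status = "BELOW_TARGET"
    return target, pct, status


# Vail has MAX_CAPACITY 7000, so its daily target is 7000 * 0.7 * 100 = 490,000 USD.
near = revenue_performance(450_000, 7000)
below = revenue_performance(300_000, 7000)
```

A day with 450,000 USD in revenue reaches about 91.8 percent of Vail's target and is labeled `NEAR_TARGET`, since it is within 10 percent of the goal.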

In [None]:
-- ========================================
-- VIEW: V_DAILY_NETWORK_METRICS
-- Simplified network-wide metrics for dashboard, derived from DAILY_RESORT_SUMMARY
-- ========================================
CREATE OR REPLACE VIEW V_DAILY_NETWORK_METRICS AS
SELECT 
    RIDE_DATE,
    SUM(TOTAL_VISITORS) as TOTAL_NETWORK_VISITORS,
    SUM(TOTAL_REVENUE) as TOTAL_NETWORK_REVENUE,
    ROUND(AVG(AVG_CAPACITY_PCT), 1) as AVG_NETWORK_CAPACITY_PCT, -- Average of average daily capacities
    SUM(TOTAL_RIDES) as TOTAL_NETWORK_RIDES,
    COUNT(DISTINCT RESORT) as ACTIVE_RESORTS
FROM DAILY_RESORT_SUMMARY
GROUP BY RIDE_DATE;

## 8. Schema Verification

List the dynamic tables and views to verify that the objects were created.

In [None]:
SHOW DYNAMIC TABLES;

In [None]:
SHOW VIEWS;

## 9. Dynamic Table Observability

Monitor the health, refresh history, and status of your Dynamic Tables.

In [None]:
-- Check refresh history for performance monitoring
SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(NAME_PREFIX => 'STREAMING_INGEST.STREAMING_INGEST.'))
ORDER BY refresh_start_time DESC;

## 10. Conclusion and Next Steps

This notebook has established an end-to-end streaming data pipeline incorporating Snowpark for complex transformations (`DAILY_VISITS`) and a hierarchy of Dynamic Tables for efficient, incremental aggregations.

**Key Features Implemented:**
- Automated daily unique visitor tracking using a Snowpark procedure and Task.
- Multi-level aggregation pipeline (Hourly → Daily → Weekly) using Dynamic Tables.
- Analytical views for simplified reporting and dashboarding.
- Observability queries for monitoring Dynamic Table performance and health.

**Potential Next Steps:**
- Build Streamlit applications or connect BI tools to these views and Dynamic Tables for visualization.
- Extend the pipeline with more advanced analytics, such as anomaly detection or predictive modeling.
- Implement alerting based on DT status or data quality checks.