In [None]:
# Getting Started with Snowflake Dynamic Tables

This notebook demonstrates how to build continuous data pipelines using Snowflake Dynamic Tables, a declarative way of defining data pipelines that automatically and continuously materialize query results. We'll build a change data capture (CDC) pipeline, compute a cumulative sum with a Python UDTF, and perform data validation, all using Dynamic Tables.

**Original Quickstart:** https://quickstarts.snowflake.com/guide/getting_started_with_dynamic_tables/index.html

**Note:** This notebook uses the Faker Python package for generating sample data. Please ensure this package is available in your environment before running the Python UDTFs.


In [None]:
## Session Context Setup


In [None]:
-- Show the user, role, warehouse, database and schema this session is using
SELECT
    CURRENT_USER(),
    CURRENT_ROLE(),
    CURRENT_WAREHOUSE(),
    CURRENT_DATABASE(),
    CURRENT_SCHEMA();


In [None]:
## Initial Database and Warehouse Setup


In [None]:
-- Create the database, schema and warehouse used by the rest of the notebook.
-- Every statement is idempotent so this cell can be re-run safely.
CREATE DATABASE IF NOT EXISTS DEMO;
CREATE SCHEMA IF NOT EXISTS DEMO.DT_DEMO;
USE SCHEMA DEMO.DT_DEMO;

-- IF NOT EXISTS: a plain CREATE WAREHOUSE fails when XSMALL_WH already exists
-- (for example when the notebook is run a second time).
CREATE WAREHOUSE IF NOT EXISTS XSMALL_WH
  WAREHOUSE_TYPE = STANDARD
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;


In [None]:
## Sample Data Generation - Customer Information


In [None]:
-- Create Python UDTF to generate customer information data.
-- Returns num_records rows of (custid, cname, spendlimit).
create or replace function gen_cust_info(num_records number)
returns table (custid number(10), cname varchar(100), spendlimit number(10,2))
language python
-- Quoted so the version is passed as the string '3.10' rather than a numeric literal
runtime_version = '3.10'
handler='CustTab'
packages = ('Faker')
as $$
from faker import Faker
import random

fake = Faker()

# Customer IDs are assigned sequentially starting at this value.
FIRST_CUSTOMER_ID = 1001


class CustTab:
    """UDTF handler that emits synthetic customer rows."""

    def process(self, num_records):
        """Yield num_records tuples of (custid, cname, spendlimit).

        custid counts up from 1001, cname is a Faker-generated name and
        spendlimit is a random value between 1000 and 10000 rounded to
        two decimals.
        """
        for custid in range(FIRST_CUSTOMER_ID, FIRST_CUSTOMER_ID + num_records):
            cname = fake.name()
            spendlimit = round(random.uniform(1000, 10000), 2)
            yield (custid, cname, spendlimit)

$$;


In [None]:
-- Materialize 1000 generated customers into cust_info, ordered by customer ID
CREATE OR REPLACE TABLE cust_info AS
SELECT *
FROM TABLE(gen_cust_info(1000))
ORDER BY 1;


In [None]:
## Sample Data Generation - Product Inventory


In [None]:
-- Create Python UDTF to generate product inventory data.
-- Returns num_records rows of (pid, pname, stock, stockdate).
create or replace function gen_prod_inv(num_records number)
returns table (pid number(10), pname varchar(100), stock number(10,2), stockdate date)
language python
-- Quoted so the version is passed as the string '3.10' rather than a numeric literal
runtime_version = '3.10'
handler='ProdTab'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()

# Product IDs are assigned sequentially starting at this value.
FIRST_PRODUCT_ID = 101
# Stock dates are drawn from the last STOCK_WINDOW_DAYS days.
STOCK_WINDOW_DAYS = 90


class ProdTab:
    """UDTF handler that emits synthetic product inventory rows."""

    def process(self, num_records):
        """Yield num_records tuples of (pid, pname, stock, stockdate).

        pid counts up from 101, pname is a Faker catch phrase, stock is a
        random whole-number quantity between 500 and 1000, and stockdate is
        a random date between 90 days ago and now.
        """
        # The date window does not depend on the row, so compute it once.
        window_end = datetime.now()
        window_start = window_end - timedelta(days=STOCK_WINDOW_DAYS)

        for pid in range(FIRST_PRODUCT_ID, FIRST_PRODUCT_ID + num_records):
            pname = fake.catch_phrase()
            stock = round(random.uniform(500, 1000), 0)
            stockdate = fake.date_between_dates(window_start, window_end)
            yield (pid, pname, stock, stockdate)

$$;


In [None]:
-- Materialize 100 generated products into prod_stock_inv, ordered by product ID
CREATE OR REPLACE TABLE prod_stock_inv AS
SELECT *
FROM TABLE(gen_prod_inv(100))
ORDER BY 1;


In [None]:
## Sample Data Generation - Sales Data


In [None]:
-- Create Python UDTF to generate customer purchase/sales data.
-- Returns num_records rows of (custid, purchase), where purchase is a VARIANT
-- with keys prodid, quantity, purchase_amount and purchase_date.
create or replace function gen_cust_purchase(num_records number,ndays number)
returns table (custid number(10), purchase variant)
language python
-- Quoted so the version is passed as the string '3.10' rather than a numeric literal
runtime_version = '3.10'
handler='genCustPurchase'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()


class genCustPurchase:
    """UDTF handler that emits synthetic purchase events."""

    def process(self, num_records, ndays):
        """Yield num_records tuples of (custid, purchase).

        custid is a random integer in 1001..1999. purchase is a dict holding
        a random prodid (101..199), quantity (1..5), purchase_amount
        (10..1000, two decimals) and a purchase_date drawn from the last
        ndays days.
        """
        # The date window does not depend on the row, so compute it once.
        window_end = datetime.now()
        window_start = window_end - timedelta(days=ndays)

        for _ in range(num_records):
            c_id = fake.random_int(min=1001, max=1999)
            pdate = fake.date_between_dates(window_start, window_end)

            purchase = {
                'prodid': fake.random_int(min=101, max=199),
                'quantity': fake.random_int(min=1, max=5),
                'purchase_amount': round(random.uniform(10, 1000),2),
                'purchase_date': pdate
            }
            yield (c_id, purchase)

$$;


In [None]:
-- Seed salesdata with 10000 generated purchases dated within the last 10 days
CREATE OR REPLACE TABLE salesdata AS
SELECT *
FROM TABLE(gen_cust_purchase(10000, 10));


In [None]:
## Data Verification


In [None]:
-- Peek at the first rows of the customer table
SELECT *
FROM cust_info
LIMIT 10;


In [None]:
-- Peek at the first rows of the product inventory table
SELECT *
FROM prod_stock_inv
LIMIT 10;


In [None]:
-- Peek at the raw sales rows (custid plus the purchase VARIANT)
SELECT *
FROM salesdata
LIMIT 10;


In [None]:
## Dynamic Tables Pipeline - Customer Sales Data History


In [None]:
-- Create first Dynamic Table: flatten the purchase VARIANT in salesdata and
-- join it to cust_info to attach the customer name.
-- TARGET_LAG is the documented parameter name (LAG is the legacy spelling).
-- 'DOWNSTREAM' lets the dynamic tables built on top of this one drive its refreshes.
CREATE OR REPLACE DYNAMIC TABLE customer_sales_data_history
    TARGET_LAG = 'DOWNSTREAM'
    WAREHOUSE = XSMALL_WH
AS
SELECT
    s.custid                                    AS customer_id,
    c.cname                                     AS customer_name,
    s.purchase:"prodid"::number(5)              AS product_id,
    -- number(10) has scale 0, so the amount is stored as a whole number
    s.purchase:"purchase_amount"::number(10)    AS saleprice,
    s.purchase:"quantity"::number(5)            AS quantity,
    s.purchase:"purchase_date"::date            AS salesdate
FROM
    cust_info c
    INNER JOIN salesdata s
        ON c.custid = s.custid
;


In [None]:
-- Peek at the flattened, customer-enriched sales rows
SELECT *
FROM customer_sales_data_history
LIMIT 10;


In [None]:
-- Row count of customer_sales_data_history
SELECT COUNT(*)
FROM customer_sales_data_history;


In [None]:
## Dynamic Tables Pipeline - Sales Report with SCD Type 2


In [None]:
-- Create sales report Dynamic Table with an SCD Type 2 style transformation:
-- each sale gets a surrogate key and an END_TIME equal to the date of the same
-- customer's next sale (NULL for the customer's latest sale), computed with LEAD.
-- TARGET_LAG is the documented parameter name (LAG is the legacy spelling).
CREATE OR REPLACE DYNAMIC TABLE salesreport
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE = XSMALL_WH
AS
    SELECT
        t1.customer_id,
        t1.customer_name,
        t1.product_id,
        p.pname AS product_name,
        t1.saleprice,
        t1.quantity,
        (t1.saleprice / t1.quantity) AS unitsalesprice,
        t1.salesdate AS CreationTime,
        -- Every column is qualified so the key does not rely on name resolution
        t1.customer_id || '-' || t1.product_id || '-' || t1.salesdate AS CUSTOMER_SK,
        -- Uses the source column directly instead of the CreationTime alias
        LEAD(t1.salesdate) OVER (PARTITION BY t1.customer_id ORDER BY t1.salesdate ASC) AS END_TIME
    FROM
        customer_sales_data_history t1
        INNER JOIN prod_stock_inv p
            ON t1.product_id = p.pid
;


In [None]:
-- Peek at the first rows of the sales report
SELECT *
FROM salesreport
LIMIT 10;


In [None]:
-- Row count of salesreport
SELECT COUNT(*)
FROM salesreport;


In [None]:
## Testing the Dynamic Tables Pipeline


In [None]:
-- Append 10000 new purchases dated within the last 2 days to exercise the pipeline refresh
INSERT INTO salesdata
SELECT *
FROM TABLE(gen_cust_purchase(10000, 2));


In [None]:
-- Row count of the raw base table after the insert
SELECT COUNT(*)
FROM salesdata;


In [None]:
-- Row count of customer_sales_data_history once the Dynamic Tables have refreshed
-- (may take up to 1 minute)
SELECT COUNT(*)
FROM customer_sales_data_history;


In [None]:
-- Row count of salesreport after the refresh
SELECT COUNT(*)
FROM salesreport;


In [None]:
## Python UDTF for Cumulative Sum


In [None]:
-- Python UDTF that returns a running total of the values it is fed
CREATE OR REPLACE FUNCTION sum_table (INPUT_NUMBER number)
  RETURNS TABLE (running_total number)
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.10'
  HANDLER = 'gen_sum_table'
AS
$$

class gen_sum_table:
    """Handler that accumulates its inputs and emits the total after each row."""

    def __init__(self):
        # The accumulator starts at zero for each handler instance.
        self._running_sum = 0

    def process(self, input_number: float):
        """Add input_number to the accumulator and yield the new total as a one-column row."""
        self._running_sum += input_number
        yield (self._running_sum,)

$$
;


In [None]:
## Dynamic Table with Python UDTF - Cumulative Purchase


In [None]:
-- Create Dynamic Table that applies the sum_table Python UDTF to salesreport,
-- producing a running total of saleprice within each (creationtime, customer_id) partition.
-- TARGET_LAG is the documented parameter name (LAG is the legacy spelling).
CREATE OR REPLACE DYNAMIC TABLE cumulative_purchase
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE = XSMALL_WH
AS
    SELECT
        MONTH(creationtime) monthNum,
        YEAR(creationtime) yearNum,
        customer_id,
        saleprice,
        running_total
    FROM
        salesreport,
        TABLE(
            sum_table(saleprice) OVER (
                PARTITION BY creationtime, customer_id
                ORDER BY creationtime, customer_id
            )
        )
;


In [None]:
-- Peek at the first rows of the cumulative purchase table
SELECT *
FROM cumulative_purchase
LIMIT 10;


In [None]:
## Data Validation - Product Inventory Alert


In [None]:
-- Create Dynamic Table for product inventory alerts.
-- For each product it keeps one row (the latest sale date) with the starting stock,
-- the cumulative units sold up to that date, and the units / percentage left.
-- TARGET_LAG is the documented parameter name (LAG is the legacy spelling).
CREATE OR REPLACE DYNAMIC TABLE PROD_INV_ALERT
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE = XSMALL_WH
AS
    SELECT
        S.PRODUCT_ID,
        S.PRODUCT_NAME,
        S.CREATIONTIME AS LATEST_SALES_DATE,
        P.STOCK AS BEGINING_STOCK,
        -- Running total of units sold for the product, ordered by sale date
        SUM(S.QUANTITY) OVER (PARTITION BY S.PRODUCT_ID ORDER BY S.CREATIONTIME) TOTALUNITSOLD,
        (P.STOCK - TOTALUNITSOLD) AS UNITSLEFT,
        ROUND(((P.STOCK - TOTALUNITSOLD) / P.STOCK) * 100, 2) PERCENT_UNITLEFT,
        CURRENT_TIMESTAMP() AS ROWCREATIONTIME
    FROM SALESREPORT S
    JOIN PROD_STOCK_INV P
        ON S.PRODUCT_ID = P.PID
    -- Keep only the most recent row per product
    QUALIFY ROW_NUMBER() OVER (PARTITION BY S.PRODUCT_ID ORDER BY S.CREATIONTIME DESC) = 1
;


In [None]:
-- List products with less than 10% of their stock left
SELECT *
FROM prod_inv_alert
WHERE percent_unitleft < 10;


In [None]:
## Email Alerts Setup


In [None]:
-- Email notification integration used by the low-inventory alert.
-- Replace the recipient address with your own before running.
CREATE NOTIFICATION INTEGRATION IF NOT EXISTS notification_emailer
    TYPE = EMAIL
    ENABLED = TRUE
    ALLOWED_RECIPIENTS = ('first.last@company.com')
    COMMENT = 'email integration to update on low product inventory levels'
;


In [None]:
-- Alert that runs every 30 minutes on XSMALL_WH and sends an email when prod_inv_alert
-- holds a row below 10% stock that was created after the last successful scheduled run.
-- Update the email address and warehouse as needed.
CREATE OR REPLACE ALERT alert_low_inv
  WAREHOUSE = XSMALL_WH
  SCHEDULE = '30 MINUTE'
  IF (EXISTS (
      SELECT *
      FROM prod_inv_alert
      WHERE percent_unitleft < 10
        AND ROWCREATIONTIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  ))
  THEN CALL SYSTEM$SEND_EMAIL(
      'notification_emailer',                     -- notification integration to use
      'first.last@company.com',                   -- recipient
      'Email Alert: Low Inventory of products',   -- subject
      'Inventory running low for certain products. Please check the inventory report in Snowflake table prod_inv_alert' -- body
  );


In [None]:
-- Resume alert_low_inv so it starts running on its schedule
-- (alerts are paused by default)
ALTER ALERT alert_low_inv RESUME;


In [None]:
-- Append another 10000 purchases (last 2 days) to push some products toward low stock
INSERT INTO salesdata
SELECT *
FROM TABLE(gen_cust_purchase(10000, 2));


In [None]:
## Monitoring Alerts


In [None]:
-- List alerts and their current status (e.g. whether alert_low_inv has been resumed)
SHOW ALERTS;


In [None]:
-- Executions of alert_low_inv scheduled within the last hour, newest first
SELECT *
FROM TABLE(
    INFORMATION_SCHEMA.ALERT_HISTORY(
        SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())
    )
)
WHERE NAME = 'ALERT_LOW_INV'
ORDER BY SCHEDULED_TIME DESC;
