# DYNAMIC TABLES

Dynamic Tables are a declarative way of defining your data pipeline in Snowflake. You describe the result you want as a SQL query, and Snowflake continuously and automatically materializes the result of that query as a table. Dynamic Tables can join and aggregate across multiple source objects and *incrementally* update results as sources change.

Dynamic Tables can also be chained together to create a DAG for more complex data pipelines.

![image](https://quickstarts.snowflake.com/guide/getting_started_with_dynamic_tables/img/b268e50e6bedab07.jpg)



## What You'll Build Today
- A continuous data pipeline using Dynamic tables
- Manage and monitor Dynamic tables
- Add some Data Quality Metrics

In [None]:
-- First, create a personal schema to work in (replace MY_NAME with your own name).
CREATE SCHEMA IF NOT EXISTS MY_NAME;
USE SCHEMA MY_NAME;

-- Confirm the session context; the schema shown should be the one you just created.
SELECT
    CURRENT_DATABASE(),
    CURRENT_SCHEMA();

In [None]:
-- Step one: create sample data with a Python table function (UDTF).
-- gen_cust_info(num_records) emits num_records customer rows:
--   custid      sequential id, starting at 1001
--   cname       fake full name produced by Faker
--   spendlimit  random amount between 1000 and 10000, rounded to 2 decimals
create or replace function gen_cust_info(num_records number)
returns table (custid number(10), cname varchar(100), spendlimit number(10,2))
language python
runtime_version=3.8
handler='CustTab'
packages = ('Faker')
as $$
import random
from faker import Faker

fake = Faker()

FIRST_CUSTOMER_ID = 1001  # ids are handed out sequentially from here


class CustTab:
    """Handler that yields one synthetic customer row per requested record."""

    def process(self, num_records):
        for offset in range(num_records):
            name = fake.name()
            limit = round(random.uniform(1000, 10000), 2)
            yield (FIRST_CUSTOMER_ID + offset, name, limit)

$$;

-- Materialize 1,000 generated customers, sorted by customer id.
create or replace table cust_info as
select *
from table(gen_cust_info(1000))
order by custid;

In [None]:
-- Spot-check: preview a few of the generated customer rows.
select *
from cust_info
limit 5

In [None]:
-- Repeat for the other tables in our pipeline.
-- gen_prod_inv(num_records) emits num_records product rows:
--   pid        sequential id, starting at 101
--   pname      fake product name (a Faker catch phrase)
--   stock      random whole number between 500 and 1000
--   stockdate  random date within the last 90 days
create or replace function gen_prod_inv(num_records number)
returns table (pid number(10), pname varchar(100), stock number(10,2), stockdate date)
language python
runtime_version=3.8
handler='ProdTab'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta
fake = Faker()

class ProdTab:
    # Generate multiple product records
    def process(self, num_records):
        product_id = 100 # Last product ID handed out; the first row gets 101

        # The date window is identical for every row, so compute it once
        # instead of on every loop iteration.
        current_date = datetime.now()
        # Earliest allowed stock date (90 days before now)
        min_date = current_date - timedelta(days=90)

        for _ in range(num_records):
            product_id += 1
            pname = fake.catch_phrase()
            stock = round(random.uniform(500, 1000),0)
            # Pick a random date between min_date and current_date
            stockdate = fake.date_between_dates(min_date,current_date)
            yield (product_id,pname,stock,stockdate)

$$;

-- Materialize 100 generated products, sorted by product id.
create or replace table prod_stock_inv as
select *
from table(gen_prod_inv(100))
order by pid;

-- gen_cust_purchase(num_records, ndays) emits num_records purchase rows:
--   custid    random customer id between 1001 and 1999
--   purchase  VARIANT holding prodid, quantity, purchase_amount and purchase_date,
--             where purchase_date is a random date within the last ndays days
create or replace function gen_cust_purchase(num_records number,ndays number)
returns table (custid number(10), purchase variant)
language python
runtime_version=3.8
handler='genCustPurchase'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()

class genCustPurchase:
    # Generate multiple customer purchase records
    def process(self, num_records,ndays):
        # The date window is identical for every row, so compute it once
        # instead of on every loop iteration.
        current_date = datetime.now()
        # Earliest allowed purchase date (ndays before now)
        min_date = current_date - timedelta(days=ndays)

        for _ in range(num_records):
            # Id ranges fall inside the ids produced by gen_cust_info(1000)
            # and gen_prod_inv(100), so the pipeline joins find a match.
            c_id = fake.random_int(min=1001, max=1999)

            # Pick a random date between min_date and current_date
            pdate = fake.date_between_dates(min_date,current_date)

            purchase = {
                'prodid': fake.random_int(min=101, max=199),
                'quantity': fake.random_int(min=1, max=5),
                'purchase_amount': round(random.uniform(10, 1000),2),
                'purchase_date': pdate
            }

            yield (c_id,purchase)

$$;

-- Build the raw sales table from 10,000 generated purchases dated within the last 10 days.
create or replace table salesdata as
select *
from table(gen_cust_purchase(10000, 10));





This completes our sample data, stored in raw base tables. In the real world, you would load this data into Snowflake using the COPY command, connectors, Snowpipe, or Snowpipe Streaming.

In [None]:
-- Verify that all three raw tables contain data.

-- Customer information: each customer has a spending limit.
SELECT *
FROM cust_info
LIMIT 5;

-- Product stock: each product has a stock level and a stock date.
SELECT *
FROM prod_stock_inv
LIMIT 5;

-- Sales data: products purchased online by various customers.
SELECT *
FROM salesdata
LIMIT 5;

## Problem Statement
Let's assume that you are a data engineer at an online retail company where a wide array of products is sold. In this role, you collect customer purchase and product sales data, initially storing it in a raw data table. Your primary tasks are creating a continuous data pipeline for generating sales reports and validating the data.

## Data Pipeline Architecture
![](https://i.postimg.cc/3NCyXC6C/Screenshot-2024-06-07-at-4-45-04-PM.png)

Let's create our first Dynamic Table. For the first step in the pipeline, we will extract the sales information from the salesdata table and join it with customer information to build customer_sales_data_history. Note that *we are extracting raw JSON data (schema on read)* and transforming it into meaningful columns and data types.



[Incremental Refresh is Important!](https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh#supported-queries-in-incremental-refresh)

In [None]:

-- First stage of the pipeline: flatten the raw purchase JSON in salesdata into
-- typed columns and attach the customer name from cust_info.
-- TARGET_LAG = DOWNSTREAM: this table has no refresh schedule of its own; it is
-- refreshed when a dynamic table that reads from it is refreshed.
CREATE OR REPLACE DYNAMIC TABLE customer_sales_data_history
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = COMPUTE_WH_FOR_LAB
AS
select
    s.custid as customer_id,
    c.cname as customer_name,
    s.purchase:"prodid"::number(5) as product_id,
    -- purchase_amount is generated with two decimals; keep the cents
    s.purchase:"purchase_amount"::number(10,2) as saleprice,
    s.purchase:"quantity"::number(5) as quantity,
    s.purchase:"purchase_date"::date as salesdate
from
    cust_info c inner join salesdata s on c.custid = s.custid
;

## Target lag is specified in one of the following ways:

- Measure of freshness: Defines the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables.

    The following example sets the dynamic table to refresh and maintain freshness every hour:

    ALTER DYNAMIC TABLE product SET TARGET_LAG = '1 hour';

- Downstream: Specifies that the dynamic table should refresh on demand when other dependent dynamic tables refresh. This refresh can be triggered by a manual or scheduled refresh of a downstream dynamic table.

    In the following example, refresh is based on other downstream dynamic tables:

    ALTER DYNAMIC TABLE product SET TARGET_LAG = DOWNSTREAM;


In [None]:
-- Quick sanity check: preview the first dynamic table.
SELECT *
FROM customer_sales_data_history
LIMIT 10;

In [None]:

-- Row count of the first dynamic table.
SELECT COUNT(*)
FROM customer_sales_data_history;

Now, let's combine these results with the product table. We can even incorporate window functions.

In [None]:
-- Second stage of the pipeline: enrich each sale with its product name and derive
-- the per-unit price, a surrogate key, and the date of the customer's next purchase.
-- TARGET_LAG = '1 MINUTE': this table should lag its sources by at most one minute.
CREATE OR REPLACE DYNAMIC TABLE salesreport
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE = COMPUTE_WH_FOR_LAB
AS
    Select
        t1.customer_id,
        t1.customer_name,
        t1.product_id,
        p.pname as product_name,
        t1.saleprice,
        t1.quantity,
        -- NULLIF guards against a zero quantity, which would otherwise raise a
        -- division-by-zero error and fail the refresh
        (t1.saleprice / NULLIF(t1.quantity, 0)) as unitsalesprice,
        t1.salesdate as CreationTime,
        t1.customer_id || '-' || t1.product_id || '-' || t1.salesdate AS CUSTOMER_SK,
        -- Date of the same customer's next purchase (NULL for their latest one)
        LEAD(t1.salesdate) OVER (PARTITION BY t1.customer_id ORDER BY t1.salesdate ASC) AS END_TIME
    from
        customer_sales_data_history t1 inner join prod_stock_inv p
        on t1.product_id = p.pid

;

In [None]:
-- Preview the final report table.
SELECT *
FROM salesreport
LIMIT 5;


In [None]:
-- Row count of the final report table.
SELECT COUNT(*)
FROM salesreport;

Check the UI!  Love a nice looking DAG

__[Monitor Dynamic Tables](https://app.snowflake.com/sfsenorthamerica/demo_jhill/#/compute/history/dynamic-tables)__
![](https://i.postimg.cc/Gt0D2T1G/DT-UI.png)

In [None]:
-- Refresh history can also be inspected with SQL: the ten most recent refreshes
-- of the two dynamic tables in this pipeline.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE NAME IN ('SALESREPORT', 'CUSTOMER_SALES_DATA_HISTORY')
    -- Optional filter, left disabled as in the original lab:
    -- AND REFRESH_ACTION != 'NO_DATA'
ORDER BY
    DATA_TIMESTAMP DESC,
    REFRESH_END_TIME DESC
LIMIT 10;

In [None]:
-- Feed the pipeline: append 10,000 new purchases dated within the last 2 days.
INSERT INTO salesdata
SELECT *
FROM TABLE(gen_cust_purchase(10000, 2));

-- Confirm the raw base table grew.
SELECT COUNT(*)
FROM salesdata;


In [None]:
-- Re-check the dynamic table after a minute; the count should grow by 10K.
SELECT COUNT(*)
FROM salesreport;

In [None]:
-- Manual refresh of the pipeline; this can be issued from any SQL client or tool.
INSERT INTO salesdata
SELECT *
FROM TABLE(gen_cust_purchase(10000, 2));

ALTER DYNAMIC TABLE salesreport REFRESH;

-- The target should be up to date without waiting for the target lag.
SELECT COUNT(*)
FROM salesreport;

In [None]:
-- Other useful SQL for controlling the pipeline.
-- NOTE: running both statements in order leaves salesreport suspended;
-- run only the one you need.

-- Resume the data pipeline
ALTER DYNAMIC TABLE salesreport RESUME;


-- Suspend the data pipeline
ALTER DYNAMIC TABLE salesreport SUSPEND;


# How About Monitoring My Data???
## [How about Data Quality Metrics](https://docs.snowflake.com/en/user-guide/data-quality-system-dmfs#system-dmfs) :)

![image](https://i.postimg.cc/QxtbtpwD/mypic.jpg)



In [None]:
-- Run the data metric functions attached to salesdata whenever the table changes.
-- The SET keyword is required; it was previously inside the commented-out line.
-- To run on a fixed interval instead, use for example:
--   ALTER TABLE salesdata SET DATA_METRIC_SCHEDULE = '5 MINUTE';
ALTER TABLE salesdata
  SET DATA_METRIC_SCHEDULE = 'TRIGGER_ON_CHANGES';

In [None]:
-- Attach the system NULL_COUNT data metric function to salesdata.custid.
ALTER TABLE salesdata
  ADD DATA METRIC FUNCTION SNOWFLAKE.CORE.NULL_COUNT
  ON (custid);

In [None]:
-- Insert a deliberately bad row (NULL customer id) for the metric to detect.
INSERT INTO salesdata (custid, purchase)
VALUES (NULL, NULL)

In [None]:
-- Call the metric function directly to count NULL customer ids in salesdata.
SELECT SNOWFLAKE.CORE.NULL_COUNT(
    SELECT custid FROM salesdata
);

In [None]:
-- Review the most recent data quality measurements recorded for this lab.
-- Adjust the schema and database literals to match your own environment.
-- The key columns are listed first for readability; * then returns every column.
SELECT
    metric_database,
    metric_name,
    scheduled_time,
    measurement_time,
    table_database,
    table_schema,
    table_name,
    value,
    *
FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
WHERE table_schema = 'MY_NAME'
  AND table_database = 'HOL_TAKETWO'
-- Without an ORDER BY, LIMIT would return an arbitrary 100 rows
ORDER BY measurement_time DESC
LIMIT 100;

# POP QUIZ
- Remove the null record from the base table salesdata
- Force a manual refresh of the target DYNAMIC TABLE salesreport
- Check that the record no longer exists using your DATA QUALITY METRIC