It's a common practice to use programming languages like Python in Data Engineering for constructing data pipelines. If you're migrating an existing data pipeline based on Python, Java, or Scala to Snowflake, Snowpark can be a valuable tool. Snowpark supports the creation of Python-based transformations through user-defined functions.

In the example below, we'll build a cumulative total of customer account balances each month and use it to identify customers who exceed the limits set for them in the CUST_INFO table. Let's create a new SQL worksheet and rename it to "03_Dynamic_Table_With_UDTF".

In [None]:
use database SNOW_DYNAMIC_TABLES_DE;
use schema data;

The function below computes a cumulative total and can be called from any SQL statement, or applied to any table, as a table function. It's flexible: we can feed it any data partition, so it adapts to any "cumulative total" use case. Let's partition this total by Customer and Month using a dynamic table. This keeps the logic modular and independent of the surrounding SQL.

In [None]:
CREATE OR REPLACE FUNCTION sum_table (INPUT_NUMBER number)
  returns TABLE (running_total number)
  language python
  runtime_version = '3.8'
  handler = 'gen_sum_table'
as
$$

# Define handler class
class gen_sum_table :

  ## Define __init__ method to initialize the variable
  def __init__(self) :    
    self._running_sum = 0
  
  ## Define process method
  def process(self, input_number: float) :
    # Increment running sum with data from the input row
    new_total = self._running_sum + input_number
    self._running_sum = new_total

    yield(new_total,)
  
$$
;
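
To see what the handler does before wiring it into a dynamic table, here is a minimal local sketch in plain Python. It copies the logic of the handler class above and simulates partitioning by creating a fresh handler instance for each partition key, which is how Snowflake treats a partitioned Python UDTF. The class name `GenSumTable`, the helper `running_totals`, and the sample rows are assumptions made for illustration; none of this runs inside Snowflake.

```python
from itertools import groupby
from operator import itemgetter


class GenSumTable:
    """Same logic as the gen_sum_table handler, runnable outside Snowflake."""

    def __init__(self):
        # State lives on the instance, so it resets for every new partition
        self._running_sum = 0

    def process(self, input_number):
        # Add the current row's value and emit the new running total
        self._running_sum += input_number
        yield (self._running_sum,)


def running_totals(rows, key_index=0, value_index=1):
    """Simulate PARTITION BY: one fresh handler instance per partition key."""
    results = []
    ordered = sorted(rows, key=itemgetter(key_index))  # stable sort keeps row order within a key
    for key, group in groupby(ordered, key=itemgetter(key_index)):
        handler = GenSumTable()
        for row in group:
            for (total,) in handler.process(row[value_index]):
                results.append((key, row[value_index], total))
    return results


# Hypothetical sample rows: (customer_id, saleprice)
sample = [(1, 10), (2, 5), (1, 20), (2, 7), (1, 30)]
for line in running_totals(sample):
    print(line)
```

Each customer's total starts again from zero because the state is held on the handler instance rather than in a global variable.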

In [None]:
CREATE OR REPLACE DYNAMIC TABLE cumulative_purchase
    LAG = '1 MINUTE'
    WAREHOUSE=DYNAMIC_TABLE_WH
AS
    select 
        month(creationtime) monthNum,
        year(creationtime) yearNum,
        customer_id, 
        saleprice,
        running_total 
    from 
        salesreport,
        table(sum_table(saleprice) over (partition by creationtime,customer_id order by creationtime, customer_id))
       
;

In [None]:
select * from  cumulative_purchase limit 10;

Similar results can be achieved using complex SQL queries, but the logic becomes more versatile and modular when implemented as a Python user-defined table function (UDTF).

The DAG that we created above builds our data pipeline, but dynamic tables (DTs) have many other use cases, such as data validation and data quality checks. In our data set, we want to know if a product is running low on inventory, let's say less than 10% of its starting stock remaining.

In [None]:
CREATE OR REPLACE DYNAMIC TABLE PROD_INV_ALERT
    LAG = '1 MINUTE'
    WAREHOUSE=DYNAMIC_TABLE_WH
AS
    SELECT 
        S.PRODUCT_ID, 
        S.PRODUCT_NAME,CREATIONTIME AS LATEST_SALES_DATE,
        STOCK AS BEGINING_STOCK,
        SUM(S.QUANTITY) OVER (PARTITION BY S.PRODUCT_ID ORDER BY CREATIONTIME) TOTALUNITSOLD, 
        (STOCK - TOTALUNITSOLD) AS UNITSLEFT,
        ROUND(((STOCK-TOTALUNITSOLD)/STOCK) *100,2) PERCENT_UNITLEFT,
        CURRENT_TIMESTAMP() AS ROWCREATIONTIME
    FROM SALESREPORT S JOIN PROD_STOCK_INV ON PRODUCT_ID = PID
    QUALIFY ROW_NUMBER() OVER (PARTITION BY PRODUCT_ID ORDER BY CREATIONTIME DESC) = 1
;

Once this table has been built, we can check whether any products have low inventory.

In [None]:
select * from prod_inv_alert where percent_unitleft < 10;
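
As a quick sanity check on the arithmetic behind PERCENT_UNITLEFT, the sketch below reproduces the calculation in plain Python. The function names and the sample stock figures are hypothetical, and Python's `round` can differ from Snowflake's `ROUND` on exact ties, so treat this as an illustration of the formula rather than a bit-for-bit reproduction.

```python
def percent_units_left(stock, total_units_sold):
    """Percentage of the starting stock that is still available."""
    units_left = stock - total_units_sold
    # Algebraically the same as ((STOCK - TOTALUNITSOLD) / STOCK) * 100 in the SQL above
    return round(100 * units_left / stock, 2)


def is_low_inventory(stock, total_units_sold, threshold=10):
    """True when less than `threshold` percent of the stock remains."""
    return percent_units_left(stock, total_units_sold) < threshold


# Hypothetical figures: 200 units in stock at the start
print(percent_units_left(200, 185), is_low_inventory(200, 185))
print(percent_units_left(200, 100), is_low_inventory(200, 100))
```

With 185 of 200 units sold, 15 units (7.5%) remain, so the product falls under the 10% threshold used in the query.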

Snowflake alerts can email your product procurement and inventory teams when a product needs restocking. Remember to update the email address and warehouse in the code below.

In [None]:
-- First we create an integration to send the emails.
-- You can only send emails to an email address that has been verified in Snowflake. 

CREATE NOTIFICATION INTEGRATION IF NOT EXISTS
    notification_emailer
    TYPE=EMAIL
    ENABLED=TRUE
    ALLOWED_RECIPIENTS=('<YOUR_EMAIL>')
    COMMENT = 'email integration to update on low product inventory levels';

If you get an error saying that your email address isn't valid or verified, click your initials in the bottom left and go to 'My profile' to resend your verification email.

In [None]:
-- Now we create the alert, using the integration we created above.

CREATE OR REPLACE ALERT alert_low_inv
  WAREHOUSE = DYNAMIC_TABLE_WH
  SCHEDULE = '30 MINUTE'
  IF (EXISTS (
      SELECT *
      FROM prod_inv_alert
      WHERE percent_unitleft < 10 and ROWCREATIONTIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  ))
  THEN CALL SYSTEM$SEND_EMAIL(
                'notification_emailer', -- notification integration to use
                '<YOUR_EMAIL>', -- Email - be sure to update to yours
                'Email Alert: Low Inventory of products', -- Subject
                'Inventory running low for certain products. Please check the inventory report in Snowflake table prod_inv_alert' -- Body of email
);

-- Alerts are paused by default, so let's resume it first
ALTER ALERT alert_low_inv RESUME;

-- Add new records
insert into salesdata select * from table(gen_cust_purchase(10000,2));

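Once the dynamic table has refreshed and the alert has run (it is scheduled every 30 minutes, and you can also trigger it manually with EXECUTE ALERT), you should receive an email letting you know that there is low inventory.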

The alert's action runs only when the dynamic table contains new low-inventory rows since the last successful scheduled run. So, it's easy to manage and maintain alerts in Snowflake on live data.
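
The alert condition combines two checks: the row must be below the threshold, and it must be newer than the last successful scheduled run. The plain-Python sketch below mirrors that logic to show why a row that was already seen does not trigger a second email. The function name `should_send_alert` and the sample rows are hypothetical; in Snowflake the comparison is done by the `IF (EXISTS (...))` clause of the alert.

```python
from datetime import datetime, timedelta


def should_send_alert(rows, last_successful_run, threshold=10):
    """Return True if any low-inventory row is newer than the last successful run."""
    return any(
        row["percent_unitleft"] < threshold
        and row["rowcreationtime"] > last_successful_run
        for row in rows
    )


# Hypothetical data: one low-inventory row written before the last run, one after
last_run = datetime(2024, 1, 1, 12, 0)
old_row = {"percent_unitleft": 4.0, "rowcreationtime": last_run - timedelta(minutes=5)}
new_row = {"percent_unitleft": 6.5, "rowcreationtime": last_run + timedelta(minutes=5)}
healthy_row = {"percent_unitleft": 80.0, "rowcreationtime": last_run + timedelta(minutes=5)}

print(should_send_alert([old_row], last_run))           # row already seen
print(should_send_alert([old_row, new_row], last_run))  # new low-inventory row
print(should_send_alert([healthy_row], last_run))       # new row, healthy stock
```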

You can monitor, resume or pause alerts.

In [None]:
SHOW ALERTS;

In [None]:
-- If you would like, you can optionally force the alert to run.
EXECUTE ALERT ALERT_LOW_INV;

In [None]:
SELECT *
FROM
  TABLE(INFORMATION_SCHEMA.ALERT_HISTORY(
    SCHEDULED_TIME_RANGE_START
      =>dateadd('hour',-1,current_timestamp())))
WHERE
    NAME = 'ALERT_LOW_INV'
ORDER BY SCHEDULED_TIME DESC;


In [None]:

-- Suspend alerts
-- Important: suspend the alert so that it stops consuming warehouse credits
ALTER ALERT alert_low_inv SUSPEND;

Finally, Snowflake makes it easier to monitor your data pipeline.

You can monitor dynamic tables using the DYNAMIC_TABLE_REFRESH_HISTORY() function in INFORMATION_SCHEMA. Here is sample SQL for the dynamic tables in our data pipeline:

In [None]:
SELECT * 
FROM 
    TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE 
    NAME IN ('SALESREPORT','CUSTOMER_SALES_DATA_HISTORY','PROD_INV_ALERT','CUMULATIVE_PURCHASE')
    -- AND REFRESH_ACTION != 'NO_DATA'
ORDER BY 
    DATA_TIMESTAMP DESC, REFRESH_END_TIME DESC LIMIT 10;

You can use the Snowsight GUI to visualize and monitor the directed acyclic graph (DAG) of your pipeline. Go to Data > Databases > SNOW_DYNAMIC_TABLES_DE > DATA > Dynamic Tables.

From Snowsight you can monitor the dynamic table DAG, refresh history, refresh mode, columns, and table DDL, and preview the data.

You can also monitor any issues with the refresh using these two table functions in INFORMATION_SCHEMA:

[DYNAMIC_TABLE_REFRESH_HISTORY](https://docs.snowflake.com/en/sql-reference/functions/dynamic_table_refresh_history)

[DYNAMIC_TABLE_GRAPH_HISTORY](https://docs.snowflake.com/en/sql-reference/functions/dynamic_table_graph_history)

A few tips for monitoring dynamic tables:

* Monitor dynamic tables for a few runs and verify that the refresh cycles are as desired (full or incremental).
* Any change to the DDL of the base tables will most likely affect the performance or refresh cycles of the dynamic tables, just as it would with any other data pipeline.
* In some cases a DT defaults to full refresh, for example if you have a masking policy on base tables, a lateral flatten of a nested structure, or certain non-deterministic functions such as UDTFs. These will be addressed in the future.

SUSPEND and RESUME Dynamic Tables

Dynamic tables can be suspended or resumed on demand. Snowflake automatically suspends a dynamic table after 5 consecutive refresh failures to prevent further credit consumption. If you suspend an upstream dynamic table, its child (downstream) dynamic tables in the DAG are suspended automatically as well.

In [None]:
-- Resume the data pipeline
alter dynamic table customer_sales_data_history RESUME;
alter dynamic table salesreport RESUME;
alter dynamic table prod_inv_alert RESUME;


In [None]:

-- Suspend the data pipeline
alter dynamic table customer_sales_data_history SUSPEND;
alter dynamic table salesreport SUSPEND;
alter dynamic table prod_inv_alert SUSPEND;

Dynamic tables incur cost in three ways: [details here](https://docs.snowflake.com/en/user-guide/dynamic-tables-cost)

* Storage: a DT materializes its results and stores them just like a normal Snowflake table.
* Cloud services compute: you only incur this if your daily cloud services cost is over 10% of your bill (very rare).
* Warehouse compute: this is associated with the warehouse you use with the dynamic table. It is only used if there is upstream data from the base tables to process.


Dynamic tables require a virtual warehouse to perform updates. Snowflake recommends testing dynamic tables using dedicated warehouses in order to understand the related costs. Dynamic table cost is driven by the frequency of data refreshes in the base tables and by the target LAG.

REFRESH_MODE can be FULL or INCREMENTAL, depending on the query. You can run the SHOW DYNAMIC TABLES command or check the dynamic table dashboard to determine your DT's refresh mode. Check [this page](https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh#label-dynamic-tables-intro-refresh-queries) for more details.

Dynamic tables support Time Travel, Replication, Data Governance, Masking, Tagging etc. just like a standard Snowflake table.