## Task Graph Run: a task graph scheduled during business hours, demonstrating:

Transformation Concepts
* Streams
* Tasks
* Dynamic Tables

Orchestration & Workflow
* DAG Structure
* Graph Config Parameter
* Task Return Value
* Condition on Stream
* Condition on Predecessor

Observability
* Retry Attempts
* Event Logging
* Query Tagging
* SNS Task Notifications (optional)

In [None]:
-- run the demo with the demo role, inside the demo schema
use role TASK_GRAPH_ROLE;
use schema TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA;

-- the query tag will show up in query history and can be used for warehouse utilization filtering
ALTER SESSION SET query_tag = '{"origin":"sf_hol","name":"pde_demo","version":{"major":1, "minor":0},"attributes":{"is_quickstart":1, "source":"notebook", "vignette":"tasks"}}';
-- session time zone: US Pacific (the same zone the root task's CRON schedule uses)
ALTER SESSION SET TIMEZONE = 'America/Los_Angeles';
-- session log level: INFO
ALTER SESSION SET LOG_LEVEL = INFO;

-- OPTIONAL: send a notice to a Slack channel
-- call send_slack_message('The Task Graph Demo is starting at ' || CURRENT_TIME);

In [None]:
--- Randomizes a runtime so that tasks show variable execution times:
--- the result is REGULAR_RUNTIME plus or minus up to 10%, and in 1 out of 10 calls
--- the base is doubled to produce an outlier.
create or replace function RUNTIME_WITH_OUTLIERS(REGULAR_RUNTIME NUMBER(6,0))
returns NUMBER(6,0)
language SQL
comment = 'for input and output as milliseconds'
as
$$
    select
        cast(
            -- base runtime: doubled when the 1..10 draw comes up 10
            REGULAR_RUNTIME * iff(uniform(1, 10, random()) = 10, 2, 1)
            -- jitter: a random -10% .. +10% of the regular runtime
            + (uniform(-10, 10, random()))/100 * REGULAR_RUNTIME
            as NUMBER(6,0)
        )
$$
;

In [None]:
--- test: returns a randomized value around 5000 milliseconds
select RUNTIME_WITH_OUTLIERS(5000);


In [None]:
-- Table function that generates random customer purchase records.
--   num_records: number of rows to generate
--   ndays:       purchase dates are drawn from the last ndays days, up to the current date
-- Returns one row per purchase: custid (1001-1999) and a VARIANT with the keys
-- prodid, quantity, purchase_amount and purchase_date.
create or replace function gen_cust_purchase(num_records number,ndays number)
returns table (custid number(10), purchase variant)
language python
runtime_version=3.9
handler='genCustPurchase'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()

class genCustPurchase:
    # Generate multiple customer purchase records
    def process(self, num_records, ndays):
        # The date window is the same for every row, so compute it once.
        latest_date = datetime.now()
        earliest_date = latest_date - timedelta(days=ndays)

        for _ in range(num_records):
            c_id = fake.random_int(min=1001, max=1999)

            # Random purchase date between the earliest date and the current date
            pdate = fake.date_between_dates(earliest_date, latest_date)

            purchase = {
                'prodid': fake.random_int(min=101, max=199),
                'quantity': fake.random_int(min=1, max=5),
                'purchase_amount': round(random.uniform(10, 1000), 2),
                'purchase_date': pdate
            }

            yield (c_id, purchase)

$$;


In [None]:
-- loading 1000 purchase records (purchase dates within the last 10 days) into table salesdata on first run only
create table if not exists salesdata as
    select * from
          table(gen_cust_purchase(1000, 10));

-- create a stream on salesdata; DEMO_TASK_5 uses it in its WHEN condition (SYSTEM$STREAM_HAS_DATA)
create stream if not exists DEMO_STREAM on table salesdata comment = 'stream on table as condition for product stock inventory DT';
-- check how many rows salesdata holds
select count(*) from salesdata;

In [None]:
-- Table function that generates product inventory data.
--   num_records: number of product rows to generate
-- Returns one row per product: sequential pid starting at 101, a fake product name,
-- a random stock level between 500 and 1000 and a stock date within the last 90 days.
create or replace function gen_prod_inv(num_records number)
returns table (pid number(10), pname varchar(100), stock number(10,2), stockdate date)
language python
runtime_version=3.9
handler='ProdTab'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta
fake = Faker()

FIRST_PRODUCT_ID = 101

class ProdTab:
    # Generate multiple product records
    def process(self, num_records):
        # The date window is the same for every row, so compute it once.
        latest_date = datetime.now()
        # Earliest allowed stock date (90 days back)
        earliest_date = latest_date - timedelta(days=90)

        for pid in range(FIRST_PRODUCT_ID, FIRST_PRODUCT_ID + num_records):
            pname = fake.catch_phrase()
            stock = round(random.uniform(500, 1000), 0)
            # Random stock date between the earliest date and the current date
            stockdate = fake.date_between_dates(earliest_date, latest_date)
            yield (pid, pname, stock, stockdate)

$$;



In [None]:
-- Table function that generates customer test data: one row per customer with a
-- sequential custid starting at 1001, a fake name and a random spend limit
-- between 1000 and 10000.
create or replace function gen_cust_info(num_records number)
returns table (custid number(10), cname varchar(100), spendlimit number(10,2))
language python
runtime_version=3.9
handler='CustTab'
packages = ('Faker')
as $$
import random
from faker import Faker

fake = Faker()

FIRST_CUSTOMER_ID = 1001

class CustTab:
    # Emits num_records customer rows with consecutive ids
    def process(self, num_records):
        for custid in range(FIRST_CUSTOMER_ID, FIRST_CUSTOMER_ID + num_records):
            cname = fake.name()
            spendlimit = round(random.uniform(1000, 10000), 2)
            yield (custid, cname, spendlimit)

$$;



In [None]:
--- suspend the root task (if it already exists) before replacing it
alter task if exists DEMO_TASK_1 suspend;

---- root task: scheduled at minute 15 of every hour from 8 to 18, Monday to Friday, US Pacific time
create or replace task DEMO_TASK_1
    warehouse = 'TASK_GRAPH_WH'
    comment = ' root task running every hour during US Pacific business hours'
    schedule = 'USING CRON 15 8-18 * * MON-FRI America/Los_Angeles'
    SUSPEND_TASK_AFTER_NUM_FAILURES = 0
    TASK_AUTO_RETRY_ATTEMPTS = 0
    --- optional: AWS SNS notification integration
    --ERROR_INTEGRATION = my_sns_notify_int
    --SUCCESS_INTEGRATION = my_sns_notify_int
    --- default graph config parameter: multiplier for the runtime duration
    config = $${"RUNTIME_MULTIPLIER": 5}$$
as
    declare
        --- runtime duration factor, read from the graph config as integer
        DURATION_FACTOR integer := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        --- randomized wait time in milliseconds, centered on DURATION_FACTOR * 1000
        WAIT_MILLIS varchar := RUNTIME_WITH_OUTLIERS(:DURATION_FACTOR * 1000);
    begin
        --- wait for the randomized duration (1 in 10 runs takes about twice as long)
        select SYSTEM$WAIT(:WAIT_MILLIS,'MILLISECONDS');
        --- return value of this task
        call SYSTEM$SET_RETURN_VALUE('✅ All systems go in DEMO_TASK_1');
        --- event log entry
        SYSTEM$LOG('INFO', 'DEMO_TASK_1: Event Log entry: root task sucessful!');
    end
;

In [None]:
--- Finalizer task of the graph rooted at DEMO_TASK_1
create or replace task DEMO_FINALIZER
    warehouse = 'TASK_GRAPH_WH'
    finalize = DEMO_TASK_1
as
    declare
        --- runtime duration factor, read from the graph config as integer
        DURATION_FACTOR integer := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        --- randomized wait time in milliseconds, centered on DURATION_FACTOR * 1000
        WAIT_MILLIS varchar := RUNTIME_WITH_OUTLIERS(:DURATION_FACTOR * 1000);
    begin
        --- wait for the randomized duration (1 in 10 runs takes about twice as long)
        select SYSTEM$WAIT(:WAIT_MILLIS,'MILLISECONDS');
        --- demo return value to show in the UI
        call SYSTEM$SET_RETURN_VALUE('✅ All checks completed via DEMO_FINALIZER');
        --- event log entry
        SYSTEM$LOG('INFO', 'DEMO_FINALIZER: completed!');
    end
;

In [None]:
-- successful task: (re)creates prod_stock_inv from the gen_prod_inv table function
create or replace task DEMO_TASK_2 
warehouse = 'TASK_GRAPH_WH' 
comment = 'successful task loading data into prod_stock_inv'
after
    DEMO_TASK_1 
as
    begin
        --- load 1000 generated product rows into prod_stock_inv, ordered by product id
        --- (columns are listed explicitly so a change in gen_prod_inv's output is noticed here)
        create or replace table prod_stock_inv as
            select
                pid,
                pname,
                stock,
                stockdate
            from table(gen_prod_inv(1000))
            order by pid;
        call SYSTEM$SET_RETURN_VALUE('DEMO_TASK_2: table prod_stock_inv loaded with data');
        SYSTEM$LOG('INFO', 'DEMO_TASK_2: completed!');
    end
;

In [None]:
--- successful task: (re)creates the cust_info table from the gen_cust_info table function
create or replace task DEMO_TASK_3 
warehouse = 'TASK_GRAPH_WH' 
comment = 'successful task loading data into cust_info after random wait'
after
    DEMO_TASK_1
as
    begin
        --- load 1000 generated customer rows into cust_info, ordered by customer id
        --- (columns are listed explicitly so a change in gen_cust_info's output is noticed here)
        create or replace table cust_info as
            select
                custid,
                cname,
                spendlimit
            from table(gen_cust_info(1000))
            order by custid;
        call SYSTEM$SET_RETURN_VALUE('DEMO_TASK_3: data loaded into cust_info table');
        SYSTEM$LOG('INFO', 'DEMO_TASK_3: completed!');
    end
;

In [None]:
-- successful task: (re)creates the sum_table Python table function
create or replace task DEMO_TASK_4 
warehouse = 'TASK_GRAPH_WH' 
comment = 'task to define a function sum_table function to be used in DT'
after
  DEMO_TASK_2 
as 
begin 
--- function to keep a running total, used in the cumulative_purchase dynamic table
CREATE OR REPLACE FUNCTION sum_table (INPUT_NUMBER number)
  returns TABLE (running_total number)
  language python
  runtime_version = '3.9'
  handler = 'gen_sum_table'
as
$$

# Define handler class
class gen_sum_table :

  ## Define __init__ method to initialize the running sum
  def __init__(self) :    
    self._running_sum = 0
  
  ## Define process method: called per input row, emits the running total so far
  def process(self, input_number: float) :
    # Increment running sum with data from the input row
    new_total = self._running_sum + input_number
    self._running_sum = new_total

    yield(new_total,)
  
$$
;
SYSTEM$LOG('INFO', 'DEMO_TASK_4: completed!');
end;

In [None]:
--- task creating the customer_sales_data_history dynamic table; no warehouse is set on the
--- task itself (its comment calls it serverless) and it only runs when DEMO_STREAM has data
create or replace task DEMO_TASK_5 
comment = 'serverless task creating the customer_sales_data_history dynamic table'
after
    --- DEMO_TASK_3 (re)creates cust_info, which the dynamic table below joins against
    DEMO_TASK_1, DEMO_TASK_3, DEMO_TASK_4 
when 
    SYSTEM$STREAM_HAS_DATA('DEMO_STREAM') 
as
    begin
        -- create a dynamic table for the sales history: one row per purchase, with the customer name
        CREATE OR REPLACE DYNAMIC TABLE customer_sales_data_history
            LAG='DOWNSTREAM'
            WAREHOUSE=TASK_GRAPH_WH
                AS
                select 
                    s.custid as customer_id,
                    c.cname as customer_name,
                    s.purchase:"prodid"::number(5) as product_id,
                    -- purchase_amount is generated with 2 decimals; keep the cents
                    s.purchase:"purchase_amount"::number(10,2) as saleprice,
                    s.purchase:"quantity"::number(5) as quantity,
                    s.purchase:"purchase_date"::date as salesdate
                from
                    cust_info c inner join salesdata s on c.custid = s.custid;
                    
        call SYSTEM$SET_RETURN_VALUE('DEMO_TASK_5: created customer_sales_data_history DT');
        SYSTEM$LOG('INFO', 'DEMO_TASK_5: completed!');
    end
;

In [None]:
--- successful task: draws a random value 1, 2 or 3 and returns "passed" when it is 1, "failed" otherwise

create or replace task DEMO_TASK_6 
warehouse = 'TASK_GRAPH_WH' 
comment = 'successful task calling a system function to send a random return value 1, 2 or 3'
after
    DEMO_TASK_3 
as
    declare
        --- integer, so the comparison below needs no implicit string conversion
        RANDOM_VALUE integer;
    begin
        RANDOM_VALUE := (select UNIFORM(1, 3, RANDOM()));
        --- the return value is read by DEMO_TASK_9 via SYSTEM$GET_PREDECESSOR_RETURN_VALUE
        case when :RANDOM_VALUE = 1
        then
            call SYSTEM$SET_RETURN_VALUE('✅ Quality Check Passed');
        else
            call SYSTEM$SET_RETURN_VALUE('⚠️ Quality Check Failed from random gen function in Task 6');
        end case;
        SYSTEM$LOG('INFO', 'DEMO_TASK_6: completed!');
    end
;

In [None]:
--- task that may fail: it reads customer_sales_data_history, which may not exist yet
--- (here the task only depends on DEMO_TASK_4)

create or replace task DEMO_TASK_7
warehouse = 'TASK_GRAPH_WH' 
comment = 'failing task with first procedure because customer_sales_data_history may not exist yet'
after
    DEMO_TASK_4 
as
    begin
        -- (re)create the salesreport dynamic table with a target lag of 1 minute
        CREATE OR REPLACE DYNAMIC TABLE salesreport
            LAG = '1 MINUTE'
            WAREHOUSE=TASK_GRAPH_WH
            AS
            Select
                t1.customer_id,
                t1.customer_name, 
                t1.product_id,
                p.pname as product_name,
                t1.saleprice,
                t1.quantity,
                (t1.saleprice/t1.quantity) as unitsalesprice,
                t1.salesdate as CreationTime,
                -- key built from customer id, product id and sales date
                customer_id || '-' || t1.product_id  || '-' || t1.salesdate AS CUSTOMER_SK,
                -- CreationTime of the same customer's next row in CreationTime order
                LEAD(CreationTime) OVER (PARTITION BY t1.customer_id ORDER BY CreationTime ASC) AS END_TIME
            from 
                customer_sales_data_history t1 inner join prod_stock_inv p 
                on t1.product_id = p.pid;
        SYSTEM$LOG('INFO', 'DEMO_TASK_7: CTAS salesreport dynamic table complete');
        call SYSTEM$SET_RETURN_VALUE('DEMO_TASK_7: created salesreport DT');
    end
;

In [None]:
--- task that builds the PROD_INV_ALERT dynamic table (stock status per product)
--- NOTE(review): the task comment mentions a stream condition, but no WHEN clause is defined here - confirm intent

create or replace task DEMO_TASK_8
warehouse = 'TASK_GRAPH_WH' 
comment ='skipped task when stream condition is not met'
after
    DEMO_TASK_6,
    DEMO_TASK_7
as
   begin
        -- (re)create the dynamic table with a target lag of 1 minute
        CREATE OR REPLACE DYNAMIC TABLE PROD_INV_ALERT
        LAG = '1 MINUTE'
        WAREHOUSE=TASK_GRAPH_WH
        AS
        SELECT 
            S.PRODUCT_ID, 
            S.PRODUCT_NAME,CREATIONTIME AS LATEST_SALES_DATE,
            STOCK AS BEGINING_STOCK,
            -- running sum of sold units per product, in CREATIONTIME order
            SUM(S.QUANTITY) OVER (PARTITION BY S.PRODUCT_ID ORDER BY CREATIONTIME) TOTALUNITSOLD, 
            (STOCK - TOTALUNITSOLD) AS UNITSLEFT,
            ROUND(((STOCK-TOTALUNITSOLD)/STOCK) *100,2) PERCENT_UNITLEFT,
            CURRENT_TIMESTAMP() AS ROWCREATIONTIME
        FROM SALESREPORT S JOIN PROD_STOCK_INV ON PRODUCT_ID = PID
        -- keep only the row with the latest CREATIONTIME per product
        QUALIFY ROW_NUMBER() OVER (PARTITION BY PRODUCT_ID ORDER BY CREATIONTIME DESC) = 1;
        
        SYSTEM$LOG('INFO', 'DEMO_TASK_8: CTAS PROD_INV_ALERT dynamic table complete');
    end
;

In [None]:
--- task conditioned on its predecessor: it only performs its randomized wait when DEMO_TASK_6
--- returned the "passed" value; otherwise it logs the predecessor value as an error and
--- returns a "skipped" message

create or replace task DEMO_TASK_9
    warehouse = 'TASK_GRAPH_WH'
    comment = 'task skipped 1/3 times, if TASK_6 returns passed'
after
    DEMO_TASK_6
as
    declare
        UPSTREAM_RESULT varchar;
        DURATION_FACTOR integer;
        WAIT_MILLIS varchar;
    begin
        --- return value set by DEMO_TASK_6
        UPSTREAM_RESULT := SYSTEM$GET_PREDECESSOR_RETURN_VALUE('DEMO_TASK_6');

        if (:UPSTREAM_RESULT = '✅ Quality Check Passed') then
            --- quality check passed: wait for a randomized duration around DURATION_FACTOR * 3000 ms
            DURATION_FACTOR := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
            WAIT_MILLIS := RUNTIME_WITH_OUTLIERS(:DURATION_FACTOR * 3000);
            select SYSTEM$WAIT(:WAIT_MILLIS, 'MILLISECONDS');
            call SYSTEM$SET_RETURN_VALUE('Delay: ' || :WAIT_MILLIS || ' milliseconds');
        else
            --- any other predecessor value: log it and report the skip
            SYSTEM$LOG('ERROR', 'DEMO_TASK_9: ' || :UPSTREAM_RESULT);
            call SYSTEM$SET_RETURN_VALUE('Task skipped due to failed quality check in DEMO_TASK_6');
        end if;

        SYSTEM$LOG('INFO', 'DEMO_TASK_9: completed!');
    end;

In [None]:
--- task self-cancelling 1/10 times after long run
create or replace task DEMO_TASK_10
warehouse = 'TASK_GRAPH_WH'
comment = 'task self-cancelling 1/10 times after long run'
after
    DEMO_TASK_3 
as
    declare
        RANDOM_VALUE number(2,0);
    begin
        --- draw 1..10; the value 10 triggers the long run followed by the self-cancel
        RANDOM_VALUE := (select UNIFORM(1, 10, RANDOM()));
        if (:RANDOM_VALUE = 10) then
            --- long run: 12 (SYSTEM$WAIT is called without a time unit argument)
            select SYSTEM$WAIT(12);
            SYSTEM$LOG('ERROR', 'DEMO_TASK_10: Canceling Execution');
            --- cancel this task's own ongoing execution (the target must be DEMO_TASK_10 itself)
            select SYSTEM$USER_TASK_CANCEL_ONGOING_EXECUTIONS('DEMO_TASK_10');
            CALL SYSTEM$SET_RETURN_VALUE('DEMO_TASK_10: Canceled by task');
        end if;
        
        --- regular short run
        select SYSTEM$WAIT(2);
        SYSTEM$LOG('INFO', 'DEMO_TASK_10: completed!');
    end
;

In [None]:
--- task with 2 predecessors that builds the cumulative_purchase dynamic table
--- (it reads salesreport, which DEMO_TASK_7 creates, but does not depend on DEMO_TASK_7 here)
create or replace task DEMO_TASK_11
warehouse = 'TASK_GRAPH_WH'
comment = 'successful task CTAS cumulative_purchase dynamic table'
after
    DEMO_TASK_10,
    DEMO_TASK_2
as
    begin 
        -- (re)create the dynamic table with a target lag of 1 minute
        CREATE OR REPLACE DYNAMIC TABLE cumulative_purchase
        LAG = '1 MINUTE'
        WAREHOUSE=TASK_GRAPH_WH
        AS
            select 
                month(creationtime) monthNum,
                year(creationtime) yearNum,
                customer_id, 
                saleprice,
                running_total 
            from 
                salesreport,
                -- sum_table (created by DEMO_TASK_4) emits a running total of saleprice within each partition
                table(sum_table(saleprice) over (partition by creationtime,customer_id order by creationtime, customer_id));
            
        CALL SYSTEM$SET_RETURN_VALUE('DEMO_TASK_11: cumulative_purchase dynamic table created');
        SYSTEM$LOG('INFO', 'DEMO_TASK_11: completed!');
    end
;

In [None]:
--- task that creates the low-inventory alert (this task is suspended for the first run)
create or replace task DEMO_TASK_12
warehouse = 'TASK_GRAPH_WH'
comment = 'enable Low Inventory Alert'
after
    DEMO_TASK_9
as
begin
    --- the alert is evaluated every 5 minutes; it fires when prod_inv_alert holds a row with
    --- less than 10 percent of units left that was created after the last successful scheduled run
    CREATE OR REPLACE ALERT alert_low_inv
    WAREHOUSE = TASK_GRAPH_WH
    SCHEDULE = '5 MINUTE'
    IF (EXISTS (
        SELECT *
        FROM prod_inv_alert
        WHERE percent_unitleft < 10
          AND ROWCREATIONTIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
            )
        )
    THEN
        begin
            -- Optional Slack Alert
            --CALL send_slack_message('Alert: Low Inventory of products. Check the inventory report in Snowflake table prod_inv_alert.');
            SYSTEM$LOG('ERROR', 'DEMO_TASK_12: Alert: Low Inventory of products. Check the inventory report in prod_inv_alert!');
        end
    ;
end
;

Now we have built out the DAG with several paths and interdependencies.

We are ready to test. For the first run, let's suspend DEMO_TASK_12 to see how a suspended task shows up in the results of the initial run.

In [None]:
-- suspend the root task DEMO_TASK_1
alter task DEMO_TASK_1 suspend;
-- enable all tasks in the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('DEMO_TASK_1');
-- suspend the root task DEMO_TASK_1 again
alter task DEMO_TASK_1 suspend;

-- suspend only DEMO_TASK_12, to see how a suspended task shows up in the first run
alter task DEMO_TASK_12 suspend;

-- resume the root task and trigger a run of the graph
alter task DEMO_TASK_1 resume;
execute task DEMO_TASK_1;

## Check Snowsight -> Monitoring -> Task History

Now let's look at the DAG execution in Snowsight -> Monitoring -> Task History to watch the status of each task and see the results.

## Check Snowsight -> Monitoring -> Traces & Logs

Come back here to fix any errors and try again. 


You should see 
    
    DEMO_TASK_5 may be skipped because DEMO_STREAM is empty
    DEMO_TASK_7 may fail on a missing dependency and require a retry
    DEMO_TASK_8 did not run because its predecessor DEMO_TASK_7 failed
    DEMO_TASK_11 may fail on a missing dependency and require a retry
    DEMO_TASK_12 is suspended 

## PART 2
Task Graph corrections and run two

Transformation Concepts
* Streams
* Tasks

Orchestration & Workflow
* DAG Structure
* Task Return Value
* Condition on Stream
* Condition on Predecessor

Observability
* Event Logging
* SNS Task Notifications (optional)

In [None]:
-- suspend the root task before making changes to the task graph
alter task DEMO_TASK_1 suspend;

--- re-create DEMO_TASK_7 with DEMO_TASK_5 added as a predecessor
create or replace task DEMO_TASK_7
warehouse = 'TASK_GRAPH_WH' 
comment = 'failing task with first procedure because customer_sales_data_history may not exist yet'
after
    DEMO_TASK_4,
    DEMO_TASK_5  -- added dependency: DEMO_TASK_5 creates customer_sales_data_history, which is read below
as
    begin
        -- (re)create the salesreport dynamic table with a target lag of 1 minute
        CREATE OR REPLACE DYNAMIC TABLE salesreport
            LAG = '1 MINUTE'
            WAREHOUSE=TASK_GRAPH_WH
            AS
            Select
                t1.customer_id,
                t1.customer_name, 
                t1.product_id,
                p.pname as product_name,
                t1.saleprice,
                t1.quantity,
                (t1.saleprice/t1.quantity) as unitsalesprice,
                t1.salesdate as CreationTime,
                -- key built from customer id, product id and sales date
                customer_id || '-' || t1.product_id  || '-' || t1.salesdate AS CUSTOMER_SK,
                -- CreationTime of the same customer's next row in CreationTime order
                LEAD(CreationTime) OVER (PARTITION BY t1.customer_id ORDER BY CreationTime ASC) AS END_TIME
            from 
                customer_sales_data_history t1 inner join prod_stock_inv p 
                on t1.product_id = p.pid;
        SYSTEM$LOG('INFO', 'DEMO_TASK_7: CTAS salesreport dynamic table complete');
        call SYSTEM$SET_RETURN_VALUE('DEMO_TASK_7: created salesreport DT');
    end
;

--- re-create DEMO_TASK_11 with DEMO_TASK_7 added as a predecessor
create or replace task DEMO_TASK_11
warehouse = 'TASK_GRAPH_WH'
comment = 'successful task CTAS cumulative_purchase dynamic table'
after
    DEMO_TASK_10,
    DEMO_TASK_7, -- added dependency: DEMO_TASK_7 creates salesreport, which is read below
    DEMO_TASK_2
as
    begin 
        -- (re)create the dynamic table with a target lag of 1 minute
        CREATE OR REPLACE DYNAMIC TABLE cumulative_purchase
        LAG = '1 MINUTE'
        WAREHOUSE=TASK_GRAPH_WH
        AS
            select 
                month(creationtime) monthNum,
                year(creationtime) yearNum,
                customer_id, 
                saleprice,
                running_total 
            from 
                salesreport,
                -- sum_table (created by DEMO_TASK_4) emits a running total of saleprice within each partition
                table(sum_table(saleprice) over (partition by creationtime,customer_id order by creationtime, customer_id));
            
        CALL SYSTEM$SET_RETURN_VALUE('DEMO_TASK_11: cumulative_purchase dynamic table created');
        SYSTEM$LOG('INFO', 'DEMO_TASK_11: completed!');
    end
;

In [None]:
-- Add 10k new sales records (purchase dates within the last 2 days) so that DEMO_STREAM,
-- the WHEN condition of DEMO_TASK_5, has data
insert into salesdata select * from table(gen_cust_purchase(10000,2));

-- enable all tasks in the graph
select SYSTEM$TASK_DEPENDENTS_ENABLE('DEMO_TASK_1');

-- resume the root task and trigger a run of the graph
alter task DEMO_TASK_1 resume;
execute task DEMO_TASK_1;

## PART 3
Dynamic Tables Monitoring:

Orchestration & Workflows
* CUSTOMER_SALES_DATA_HISTORY
* SALESREPORT
* CUMULATIVE_PURCHASE
* PROD_INV_ALERT


Observability
* Create Alert
* Insert Data and monitor
* View in Streamlit
* Update Table Definition
* OPTIONAL: Generate Slack alert

In [None]:
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session

# Get active Snowflake session
session = get_active_session()


def _zero_if_null(value):
    """Return 0 for a SQL NULL (None), otherwise the value unchanged.

    SUM() and AVG() return NULL when CUMULATIVE_PURCHASE has no rows, and
    formatting None with a numeric format spec raises a TypeError.
    """
    return 0 if value is None else value


# App title
st.title("Sales and Inventory Dashboard")

# Create two columns for the layout
col1, col2 = st.columns(2)

with col1:
    st.header("Cumulative Purchases")

    # Monthly totals from the CUMULATIVE_PURCHASE dynamic table
    cumulative_df = session.sql("""
        SELECT 
            MONTHNUM,
            YEARNUM,
            SUM(SALEPRICE) as TOTAL_SALES,
            COUNT(DISTINCT CUSTOMER_ID) as CUSTOMER_COUNT
        FROM CUMULATIVE_PURCHASE
        GROUP BY MONTHNUM, YEARNUM
        ORDER BY YEARNUM, MONTHNUM
    """).to_pandas()

    # Monthly sales bar chart, colored by year
    sales_chart = alt.Chart(cumulative_df).mark_bar().encode(
        x=alt.X('MONTHNUM:O', title='Month'),
        y=alt.Y('TOTAL_SALES:Q', title='Total Sales'),
        color=alt.Color('YEARNUM:N', title='Year')
    ).properties(height=300)

    st.altair_chart(sales_chart, use_container_width=True)


with col2:
    st.header("Inventory Alerts")

    # Products with less than 50% of their stock left, lowest first
    inventory_df = session.sql("""
        SELECT 
            PRODUCT_ID,
            PRODUCT_NAME,
            UNITSLEFT,
            PERCENT_UNITLEFT
        FROM PROD_INV_ALERT
        WHERE PERCENT_UNITLEFT < 50
        ORDER BY PERCENT_UNITLEFT ASC
    """).to_pandas()

    # Inventory status chart: red below 10% remaining, orange otherwise
    inventory_chart = alt.Chart(inventory_df).mark_bar().encode(
        x=alt.X('PRODUCT_NAME:N', title='Product', sort='-y'),
        y=alt.Y('PERCENT_UNITLEFT:Q', title='Inventory Remaining (%)'),
        color=alt.condition(
            alt.datum.PERCENT_UNITLEFT < 10,
            alt.value('red'),
            alt.value('orange')
        )
    ).properties(height=300)

    st.altair_chart(inventory_chart, use_container_width=True)

# Display summary metrics
st.header("Summary Statistics")
col3, col4, col5, col6 = st.columns(4)

# Single-row summary over CUMULATIVE_PURCHASE
metrics = session.sql("""
    SELECT 
        COUNT(DISTINCT CUSTOMER_ID) as TOTAL_CUSTOMERS,
        SUM(SALEPRICE) as TOTAL_REVENUE,
        COUNT(*) as TOTAL_TRANSACTIONS,
        AVG(SALEPRICE) as AVG_TRANSACTION
    FROM CUMULATIVE_PURCHASE
""").collect()[0]

# Guard against NULL aggregates so the dashboard still renders on an empty table
total_customers = _zero_if_null(metrics['TOTAL_CUSTOMERS'])
total_revenue = _zero_if_null(metrics['TOTAL_REVENUE'])
total_transactions = _zero_if_null(metrics['TOTAL_TRANSACTIONS'])
avg_transaction = _zero_if_null(metrics['AVG_TRANSACTION'])

col3.metric("Total Customers", f"{total_customers:,.0f}")
col4.metric("Total Revenue", f"${total_revenue:,.2f}")
col5.metric("Total Transactions", f"{total_transactions:,.0f}")
col6.metric("Avg Transaction", f"${avg_transaction:,.2f}")


Now we can make an Alert trigger and log an error (and optionally send a Slack message) when inventory drops below 10%.

In [None]:
-- resume the alert; its schedule is 5 minutes, so check back in about 5 minutes
ALTER ALERT alert_low_inv RESUME;

-- add 4000 new sales records (purchase dates within the last 2 days), then check the alert history
insert into salesdata select * from table(gen_cust_purchase(4000,2));

In [None]:
-- the 4 most recent scheduled or triggered runs of the low-inventory alert
SELECT
    NAME,
    ACTION,
    STATE,
    SCHEDULED_TIME,
    COMPLETED_TIME
FROM TABLE(INFORMATION_SCHEMA.ALERT_HISTORY())
WHERE NAME = 'ALERT_LOW_INV'
  AND STATE IN ('SCHEDULED', 'TRIGGERED')
ORDER BY SCHEDULED_TIME DESC
LIMIT 4;

In [None]:
-- list the products with less than 10 percent of units left (the alert condition threshold)
select * from prod_inv_alert where PERCENT_UNITLEFT < 10;

In [None]:
-- done with the demo: suspend the alert and the root task
ALTER ALERT alert_low_inv SUSPEND;
alter task DEMO_TASK_1 SUSPEND;

--clean up
-- NOTE(review): CUMULATIVE_PURCHASE, CUSTOMER_SALES_DATA_HISTORY and SALESREPORT are created as
-- dynamic tables above and PROD_INV_ALERT is not listed - confirm the DROP statements before use
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.CUMULATIVE_PURCHASE;
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.CUST_INFO;
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.CUSTOMER_SALES_DATA_HISTORY;
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.PROD_STOCK_INV;
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.SALESREPORT;
--DROP TABLE TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.SALESDATA;
--DROP STREAM TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA.DEMO_STREAM;