In [None]:
-- Demo workspace: (re)create the database and schema that hold all objects of this notebook.
-- "or replace" means an existing database/schema with the same name is replaced.
create or replace database TASK_GRAPH_DATABASE;
create or replace schema TASK_GRAPH_SCHEMA;

-- API integration that allows HTTPS access to repositories below https://github.com/.
create api integration if not exists GITHUB_PUBLIC
    api_provider = GIT_HTTPS_API
    api_allowed_prefixes = ('https://github.com/')
    enabled = true;

-- Git repository object pointing at the quickstart sources, accessed through the integration above.
create or replace git repository SNOWFLAKE_LABS
    origin = 'https://github.com/Snowflake-Labs/getting-started-with-task-graphs'
    api_integration = 'GITHUB_PUBLIC';


## Task Graph Run Demo
A scheduled task graph run that demonstrates:
* DAG structure
* different run statuses
* graph config parameter
* task return value
* condition on stream
* condition on predecessor
* retry attempts

In [None]:
--- make the demo schema the current schema for all following cells
use schema TASK_GRAPH_DATABASE.TASK_GRAPH_SCHEMA;
--- tag every query of this session with a JSON query tag (origin, name, version, attributes)
ALTER SESSION SET query_tag = '{"origin":"sf_sit-is","name":"dex_demos","version":{"major":1, "minor":0},"attributes":{"is_quickstart":1, "source":"notebook", "vignette":"tasks"}}';

In [None]:
--- Randomized runtime helper (input and output in milliseconds).
--- Draws an integer between 1 and 10; a draw of 10 (1 call in 10) produces an outlier
--- based on twice REGULAR_RUNTIME, every other draw is based on REGULAR_RUNTIME itself.
--- Both branches add a random offset between -10% and +10% of REGULAR_RUNTIME.
CREATE OR REPLACE FUNCTION RUNTIME_WITH_OUTLIERS(REGULAR_RUNTIME NUMBER(6,0))
    RETURNS NUMBER(6,0)
    LANGUAGE SQL
    COMMENT = 'for input and output as milliseconds'
AS
$$
    SELECT
        CASE uniform(1, 10, random())
            WHEN 10 THEN
                CAST((REGULAR_RUNTIME * 2 + (uniform(-10, 10, random()))/100 * REGULAR_RUNTIME) AS NUMBER(6,0))
            ELSE
                CAST((REGULAR_RUNTIME     + (uniform(-10, 10, random()))/100 * REGULAR_RUNTIME) AS NUMBER(6,0))
        END
$$
;

In [None]:
--- smoke test: returns a randomized value around 5000 milliseconds
--- (about 1 call in 10 returns an outlier around 10000 instead)
select RUNTIME_WITH_OUTLIERS(5000);

In [None]:
-- successful procedure 1
-- Takes no arguments and only calls SYSTEM$WAIT(3); it is called by DEMO_TASK_3 and DEMO_TASK_9.
-- NOTE(review): the body is a single statement without a BEGIN/END block and without an explicit
-- RETURN although the procedure declares a VARCHAR result - callers in this notebook ignore the
-- result, but confirm the returned value if anything ever depends on it.
create or replace procedure DEMO_PROCEDURE_1()        
returns VARCHAR(16777216)
language SQL
execute as OWNER
as 
$$
    select system$wait(3);
$$;

In [None]:
-- Procedure that fails on about every second call.
-- It draws 1 or 2 at random; on a 2 it queries OLD_TABLE before waiting.
-- OLD_TABLE is not created anywhere in the visible notebook cells, so that query is
-- presumably the deliberate failure - confirm the table really does not exist.
CREATE OR REPLACE PROCEDURE DEMO_PROCEDURE_2()
    RETURNS VARCHAR(16777216)
    LANGUAGE SQL
    EXECUTE AS OWNER
AS
$$
DECLARE
    COIN_FLIP NUMBER(2,0);
BEGIN
    COIN_FLIP := (SELECT uniform(1, 2, random()));
    IF (:COIN_FLIP = 2) THEN
        SELECT count(*) FROM OLD_TABLE;
    END IF;
    SELECT SYSTEM$WAIT(2);
END
$$;

In [None]:
--- Base table for the stream-condition demo; DEMO_STREAM is created on top of it.
--- ID is an ordered auto-increment column starting at 1 with step 1.
CREATE OR REPLACE TABLE TASK_DEMO_TABLE (
    TIME_STAMP TIMESTAMP_NTZ(9),
    ID         NUMBER(38,0) AUTOINCREMENT START 1 INCREMENT 1 ORDER,
    MESSAGE    VARCHAR(16777216),
    COMMENT    VARCHAR(16777216)
);

In [None]:
--- Stream on TASK_DEMO_TABLE, referenced by the WHEN condition of DEMO_TASK_8.
--- Nothing in this notebook inserts into the table, so the stream stays empty.
CREATE OR REPLACE STREAM DEMO_STREAM
    ON TABLE TASK_DEMO_TABLE
    COMMENT = 'empty stream on table as condition for demo task';

In [None]:
--- Suspend a previously created root task (if any) before the graph is rebuilt
--- (presumably so the existing graph is not running while it is replaced - confirm).
ALTER TASK IF EXISTS DEMO_TASK_1 SUSPEND;

--- Root task of the demo graph.
--- Schedule: minute 15 of every hour from 08 to 18, Monday to Friday, time zone CET.
--- CONFIG provides the graph-wide default RUNTIME_MULTIPLIER (5) read by the tasks of the graph.
CREATE OR REPLACE TASK DEMO_TASK_1
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'successful root task with random duration running every hour during EU business hours'
    SCHEDULE = 'USING CRON 15 8-18 * * MON-FRI CET'
    SUSPEND_TASK_AFTER_NUM_FAILURES = 0
    TASK_AUTO_RETRY_ATTEMPTS = 2
    CONFIG = $${"RUNTIME_MULTIPLIER": 5}$$
AS
    DECLARE
        --- runtime factor taken from the graph config, as integer
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        --- randomized wait in milliseconds around MULTIPLIER * 1000
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 1000);
    BEGIN
        --- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        --- demo return value to show in the UI
        CALL SYSTEM$SET_RETURN_VALUE('✅ All Stage files scanned');
    END
;

In [None]:
--- Finalizer task of the graph rooted at DEMO_TASK_1.
--- Waits a randomized time around RUNTIME_MULTIPLIER * 1000 milliseconds and sets a demo return value.
CREATE OR REPLACE TASK DEMO_FINALIZER
    WAREHOUSE = 'VW_GENAI'
    FINALIZE = DEMO_TASK_1
AS
    DECLARE
        --- runtime factor taken from the graph config, as integer
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        --- randomized wait in milliseconds
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 1000);
    BEGIN
        --- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        --- demo return value to show in the UI
        CALL SYSTEM$SET_RETURN_VALUE('✅ All checks completed.');
    END
;

In [None]:
-- Child of DEMO_TASK_1: waits a randomized time around RUNTIME_MULTIPLIER * 3000 milliseconds
-- and reports the waited value as the number of "new entries loaded".
CREATE OR REPLACE TASK DEMO_TASK_2
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'successful task with random duration'
    AFTER DEMO_TASK_1
AS
    DECLARE
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        -- randomized wait in milliseconds
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 3000);
    BEGIN
        -- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        CALL SYSTEM$SET_RETURN_VALUE(:WAIT_MS || ' new entries loaded');
    END
;

In [None]:
--- Child of DEMO_TASK_1: calls DEMO_PROCEDURE_1, then waits a randomized time around
--- RUNTIME_MULTIPLIER * 4000 milliseconds and reports the waited value as "new Files processed".
CREATE OR REPLACE TASK DEMO_TASK_3
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'successful task with random duration calling 1 procedure'
    AFTER DEMO_TASK_1
AS
    DECLARE
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        --- randomized wait in milliseconds
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 4000);
    BEGIN
        CALL DEMO_PROCEDURE_1();
        --- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        CALL SYSTEM$SET_RETURN_VALUE(:WAIT_MS || ' new Files processed');
    END
;

In [None]:
-- Child of DEMO_TASK_2: waits a randomized time around RUNTIME_MULTIPLIER * 1000 milliseconds
-- and reports the delay as its return value.
CREATE OR REPLACE TASK DEMO_TASK_4
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'successful task with random duration'
    AFTER DEMO_TASK_2
AS
    DECLARE
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        -- randomized wait in milliseconds
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 1000);
    BEGIN
        -- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        CALL SYSTEM$SET_RETURN_VALUE('Delay: ' || :WAIT_MS || ' milliseconds');
    END
;

In [None]:
-- Task without a WAREHOUSE parameter (its comment labels it a serverless task).
-- Runs after both DEMO_TASK_1 and DEMO_TASK_4, waits a randomized time around
-- RUNTIME_MULTIPLIER * 200 milliseconds and reports the delay as its return value.
CREATE OR REPLACE TASK DEMO_TASK_5
    COMMENT = 'serverless task'
    AFTER DEMO_TASK_1, DEMO_TASK_4
AS
    DECLARE
        MULTIPLIER INTEGER := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');
        -- randomized wait in milliseconds
        WAIT_MS VARCHAR := RUNTIME_WITH_OUTLIERS(:MULTIPLIER * 200);
    BEGIN
        -- wait for the randomized duration (about 1 run in 10 takes twice as long)
        SELECT SYSTEM$WAIT(:WAIT_MS, 'MILLISECONDS');
        CALL SYSTEM$SET_RETURN_VALUE('Delay: ' || :WAIT_MS || ' milliseconds');
    END
;

In [None]:
--- successful task that publishes a random quality-check result as its return value
--- A random integer 1/2/3 is drawn: 1 sends the "passed" message, 2 and 3 send the "failed" message
--- The return strings (including the leading emoji) are read by the WHEN condition of DEMO_TASK_11

create or replace task DEMO_TASK_6 
warehouse = 'VW_GENAI' 
comment = 'successful task calling 1 system function to send a random return value 1/2/3'
after
    DEMO_TASK_3 
as
    declare
        RANDOM_VALUE number(1,0);       --- numeric, so the comparison below needs no implicit string cast
    begin
        RANDOM_VALUE := (select UNIFORM(1, 3, RANDOM()));
        if (:RANDOM_VALUE = 1) then
            call SYSTEM$SET_RETURN_VALUE('✅ Quality Check Passed');
        else
            call SYSTEM$SET_RETURN_VALUE('⚠️ Quality Check Failed');
        end if;
    end
;

In [None]:
--- successful task calling system function 
--- Waits a randomized time around RUNTIME_MULTIPLIER * 5000 milliseconds, then sets a URL as return value

create or replace task DEMO_TASK_7 
warehouse = 'VW_GENAI' 
comment = 'successful task calling 1 system function'
after
    DEMO_TASK_6 
as
    declare
        RUNTIME_MULTIPLIER integer := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');       --- get runtime duration factor from graph config as integer
        RANDOM_RUNTIME varchar := RUNTIME_WITH_OUTLIERS(:RUNTIME_MULTIPLIER * 5000);            --- median runtime in milliseconds, computed once
    begin
        select SYSTEM$WAIT(:RANDOM_RUNTIME,'MILLISECONDS');       --- task will wait for a random duration with 1/10 being twice as long
       
        call SYSTEM$SET_RETURN_VALUE('https://app.snowflake.com/pm/dex_demo/logging-and-alerting-demo-dCHJfecoR');
    end
;

In [None]:
--- Task guarded by a stream condition: it only runs when DEMO_STREAM has data.
--- Nothing in this notebook writes to the underlying table, so the task is expected to be skipped.
CREATE OR REPLACE TASK DEMO_TASK_8
    WAREHOUSE = 'VW_GENAI'
    COMMENT ='skipped task because stream condition is not met'
    AFTER DEMO_TASK_7
    WHEN SYSTEM$STREAM_HAS_DATA('DEMO_STREAM')
AS
    SELECT SYSTEM$WAIT(4)
;

In [None]:
--- Task that can fail: DEMO_PROCEDURE_1 (succeeds), a 3-unit wait, then DEMO_PROCEDURE_2 (may fail).
--- NOTE(review): the comment text says "1/4 cases" while DEMO_PROCEDURE_2 takes its failing branch
--- on 1 of 2 random draws - confirm which figure is intended (possibly 1/4 accounts for a retry).
CREATE OR REPLACE TASK DEMO_TASK_9
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'failing task with first procedure succeeding and second procedure failing 1/4 cases'
    AFTER DEMO_TASK_4
AS
    BEGIN
        CALL DEMO_PROCEDURE_1();
        SELECT SYSTEM$WAIT(3);
        CALL DEMO_PROCEDURE_2();
    END
;

In [None]:
--- task does not run after failing task 9
--- When it does run, it waits a randomized time around RUNTIME_MULTIPLIER * 2000 milliseconds
--- and publishes the delay as task return value (same mechanism as DEMO_TASK_4 and DEMO_TASK_5)

create or replace task DEMO_TASK_10 
warehouse = 'VW_GENAI' 
comment = 'task does not run after failing task 9'
after
    DEMO_TASK_9 
as
    declare
        RUNTIME_MULTIPLIER integer := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');       --- get runtime duration factor from graph config as integer
        RANDOM_RUNTIME varchar := RUNTIME_WITH_OUTLIERS(:RUNTIME_MULTIPLIER * 2000);            --- median runtime in milliseconds, computed once
    begin
        select SYSTEM$WAIT(:RANDOM_RUNTIME,'MILLISECONDS');       --- task will wait for a random duration with 1/10 being twice as long
        
        call SYSTEM$SET_RETURN_VALUE('Delay: '||:RANDOM_RUNTIME||' milliseconds');      --- a plain RETURN would not set the task return value
    end
;

In [None]:
--- task conditioned on the return value of its predecessor DEMO_TASK_6
--- The literal in the WHEN clause must match the predecessor's return value exactly, including the
--- leading emoji - without it the condition can never be true and the task is skipped on every run
--- NOTE(review): with "=", this task runs only when DEMO_TASK_6 reports "passed" (about 1 run in 3)
--- and is skipped otherwise, while the comment text speaks of skipping 1/3 of runs - confirm the intended direction

create or replace task DEMO_TASK_11 
warehouse = 'VW_GENAI'
comment = 'task skipped 1/3 times, if TASK_6 returns passed'
after
    DEMO_TASK_6
when 
    SYSTEM$GET_PREDECESSOR_RETURN_VALUE('DEMO_TASK_6') = '✅ Quality Check Passed'
as
    declare
        RUNTIME_MULTIPLIER integer := SYSTEM$GET_TASK_GRAPH_CONFIG('RUNTIME_MULTIPLIER');       --- get runtime duration factor from graph config as integer
        RANDOM_RUNTIME varchar := RUNTIME_WITH_OUTLIERS(:RUNTIME_MULTIPLIER * 3000);            --- median runtime in milliseconds, computed once
    begin
        select SYSTEM$WAIT(:RANDOM_RUNTIME,'MILLISECONDS');       --- task will wait for a random duration with 1/10 being twice as long
        
        call SYSTEM$SET_RETURN_VALUE('Delay: '||:RANDOM_RUNTIME||' milliseconds');      --- a plain RETURN would not set the task return value
    end
;

In [None]:
--- Task that cancels its own execution on about 1 run in 10.
--- It draws an integer between 1 and 10; on a 10 it first waits 12 (long run) and then calls
--- SYSTEM$USER_TASK_CANCEL_ONGOING_EXECUTIONS on itself. Otherwise it only performs the short final wait.
CREATE OR REPLACE TASK DEMO_TASK_12
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'task self-cancelling 1/10 times after long run'
    AFTER DEMO_TASK_3
AS
    DECLARE
        DICE NUMBER(2,0);
    BEGIN
        DICE := (SELECT UNIFORM(1, 10, RANDOM()));
        IF (:DICE = 10) THEN
            SELECT SYSTEM$WAIT(12);
            SELECT SYSTEM$USER_TASK_CANCEL_ONGOING_EXECUTIONS('DEMO_TASK_12');
        END IF;

        SELECT SYSTEM$WAIT(2);
    END
;

In [None]:
--- Task with two predecessors: it is declared after both DEMO_TASK_12 and DEMO_TASK_2
--- and only performs a short wait.
CREATE OR REPLACE TASK DEMO_TASK_13
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'successful task with 2 predecessors'
    AFTER DEMO_TASK_12, DEMO_TASK_2
AS
    SELECT SYSTEM$WAIT(3)
;

In [None]:
--- Child of DEMO_TASK_9 that is kept suspended: the last cell of the notebook
--- suspends it explicitly after enabling the whole graph.
CREATE OR REPLACE TASK DEMO_TASK_14
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'always suspended task'
    AFTER DEMO_TASK_9
AS
    SELECT SYSTEM$WAIT(3)
;

In [None]:
--- Child of DEMO_TASK_14: per its comment it never runs, because its predecessor is suspended.
CREATE OR REPLACE TASK DEMO_TASK_15
    WAREHOUSE = 'VW_GENAI'
    COMMENT = 'never runs because predecessor is suspended'
    AFTER DEMO_TASK_14
AS
    SELECT 1
;

In [None]:
--- resume all, suspend 1 to suspend 14. then resume 1 and execute
--- The statement order matters here; keep it as is.
--- step 1: enable (resume) the root task together with its dependent tasks
select SYSTEM$TASK_DEPENDENTS_ENABLE('DEMO_TASK_1');
--- step 2: suspend the root again before touching a child
--- (presumably a child task can only be altered while the root is suspended - confirm)
alter task DEMO_TASK_1 suspend;
--- step 3: keep DEMO_TASK_14 suspended for the "always suspended task" demo
alter task DEMO_TASK_14 suspend;

--- known bug that finalizer does not resume with graph -> fix coming
alter task DEMO_FINALIZER resume;

--- step 4: resume the root so the schedule is active, then trigger one graph run immediately
alter task DEMO_TASK_1 resume;
execute task DEMO_TASK_1;

In [None]:

/*DROP DATABASE TASK_GRAPH_DATABASE;
DROP INTEGRATION GITHUB_PUBLIC;*/
