# Notebook #2
## Sets up a framework for outbound event notifications sharing table stream events in Snowflake to AWS EventBridge
Prerequisite:  Create Database called 'EVENT_MONITORING' to store this Notebook and all objects created here

Notes:  
* Make sure you have appropriate data governance in place and not exfiltrating sensitive data to a less secured location as it leaves Snowflake.
* This sample was written with Notebook #1 to send events to AWS EventBridge, but other UDFs can be added to send to other downstream destinations including Snowflake.


Written by Steven.Maser@snowflake.com

In [None]:
#Variables for object creation
DB= 'EVENT_MONITORING'
SCHEMA = 'PUBLIC'
SUB_SCHEMA = 'PROCESSING'
EXT_UDF = 'notifyEventBridge'

In [None]:
--1.  Create schema for individual table's streams & tasks
create schema if not EXISTS {{DB}}.{{SUB_SCHEMA}};

In [None]:
--2. Create table that will list what tables to be monitoring events on
CREATE or REPLACE TABLE {{DB}}.{{SCHEMA}}.WATCHED_TABLES (
    DB_NAME STRING COMMENT 'Database where table to monitor resides',
    SCHEMA_NAME STRING COMMENT 'Schema where table to monitor resides',
    TABLE_NAME STRING COMMENT 'Table to monitor',
    EXT_UDF STRING COMMENT 'UDF/Function to call to transmit new/updated records',
    LOAD_INITIAL BOOLEAN COMMENT 'Option to send all current records in table, or skip'
) 
COMMENT='This table holds the list of tables to continuously monitor changes and transmit downstream';

In [None]:
--3.  Create Stream to identify added tables/records to WATCHED_TABLES table
CREATE or REPLACE STREAM {{DB}}.{{SCHEMA}}.WATCHED_TABLES_STRM ON TABLE {{DB}}.{{SCHEMA}}.WATCHED_TABLES;

In [None]:
--4. Create triggered to task to run when table(s) are added to be monitored.  
      ---This task will create a stream and a task for each added table that will send events for each new record in that table.
CREATE or REPLACE TASK {{DB}}.{{SCHEMA}}.TABLE_ADDED_TSK
  COMMENT='This Task sets up sending changes downstream for tables registered in table WATCHED_TABLES'
  TARGET_COMPLETION_INTERVAL='1 MINUTE'
  SUSPEND_TASK_AFTER_NUM_FAILURES=1
  WHEN SYSTEM$STREAM_HAS_DATA('{{DB}}.{{SCHEMA}}.WATCHED_TABLES_STRM')
  AS
  DECLARE  
    c1 CURSOR FOR SELECT DB_NAME,SCHEMA_NAME,TABLE_NAME,LOAD_INITIAL,EXT_UDF,METADATA$ACTION as ACTION from {{DB}}.{{SCHEMA}}.WATCHED_TABLES_STRM;
    stream_name VARCHAR;
    task_name VARCHAR;
    table_counter INTEGER default 0;
    BEGIN   
      open c1;
      FOR rec IN c1 DO
        LET action string:=rec.ACTION::STRING;
        IF ( action = 'INSERT' ) THEN
          table_counter:= table_counter+1;
          LET db_name string:=rec.DB_NAME::STRING;
          LET schema_name string:=rec.SCHEMA_NAME::STRING;
          LET table_prefix string:=db_name || '_' || schema_name || '_' || rec.TABLE_NAME::STRING;
          LET table_name string:=db_name || '.' || schema_name || '.' || rec.TABLE_NAME::STRING;
          LET load_initial string:=rec.load_initial::BOOLEAN;
          LET ext_edf string:=rec.EXT_UDF::STRING;
          LET stream_name string:=table_prefix || '_STRM';
          LET task_name string:=table_prefix||'_TASK';
          LET temp_table string:=table_prefix||'_tmp';
          LET task_sql1 string:='BEGIN';
          LET task_sql2 string:='SELECT ' || ext_edf || '(OBJECT_CONSTRUCT(\'Account\',current_account(), \'Region\',current_region(), \'Database\',\'' || db_name || '\', \'Schema\',\'' || schema_name ||'\', \'Table\',\'' || table_name || '\', \'Record\',OBJECT_CONSTRUCT(*))::STRING) as EventId FROM {{DB}}.{{SUB_SCHEMA}}.' || stream_name || ';';
          LET task_sql3 string:='LET num_rows INTEGER := (select count(*) from TABLE(RESULT_SCAN(LAST_QUERY_ID())));';
          LET task_sql4 string:='LET return_value STRING := :num_rows::STRING || \' row(s) sent\';';
          LET task_sql5 string:='CALL SYSTEM$SET_RETURN_VALUE(:return_value);';
          --LET task_sql6 string:='CREATE or REPLACE TEMP TABLE tmp_empty_stream AS (SELECT * FROM {{DB}}.{{SUB_SCHEMA}}.' || stream_name ||' WHERE FALSE);';
          LET task_sql6 string:='CREATE or REPLACE TEMP TABLE '|| temp_table ||' AS (SELECT * FROM {{DB}}.{{SUB_SCHEMA}}.' || stream_name ||' WHERE FALSE);';
          LET task_sql7 string:='END;';
          let ret string:='
          ';
          LET task_sql string := task_sql1||ret||task_sql2||ret||task_sql3||ret||task_sql4||ret||task_sql5||ret||task_sql6||ret||task_sql7;
          LET stmt1 string:='CREATE OR REPLACE STREAM {{DB}}.{{SUB_SCHEMA}}.'||stream_name|| ' ON TABLE ' || table_name || ' SHOW_INITIAL_ROWS=' || load_initial;
          LET stmt2 string:='CREATE OR REPLACE TASK {{DB}}.{{SUB_SCHEMA}}.' || task_name || ' TARGET_COMPLETION_INTERVAL=\'1 MINUTE\' SUSPEND_TASK_AFTER_NUM_FAILURES=1 WHEN SYSTEM$STREAM_HAS_DATA(\'{{DB}}.{{SUB_SCHEMA}}.'|| stream_name ||'\') AS ' || task_sql;
          LET stmt3 string:='ALTER TASK {{DB}}.{{SUB_SCHEMA}}.' || task_name || ' RESUME;';  
          LET stmt4 string:='CREATE or REPLACE TEMP TABLE WATCHED_TABLES_TMP AS (SELECT * FROM {{DB}}.{{SCHEMA}}.WATCHED_TABLES_STRM WHERE FALSE);';
          EXECUTE IMMEDIATE :stmt1;
          EXECUTE IMMEDIATE :stmt2;
          EXECUTE IMMEDIATE :stmt3;
          EXECUTE IMMEDIATE :stmt4;
        END IF;  -- currently not processing update/deletes
      END FOR;  
      CLOSE c1;
    LET return_val STRING:=table_counter::STRING || ' table(s) added for event monitoring';
    CALL SYSTEM$SET_RETURN_VALUE(:return_val);
    END;
;

In [None]:
--5.  Activate this task to begin
ALTER TASK {{DB}}.{{SCHEMA}}.TABLE_ADDED_TSK RESUME;

In [None]:
--6. create a new table and register it for event sending
create or replace table {{DB}}.{{SCHEMA}}.TEST_TABLE(NUM INTEGER,NAME STRING);

---- Insert sample/your table to monitor and send change events
INSERT INTO {{DB}}.{{SCHEMA}}.WATCHED_TABLES values ('{{DB}}','{{SCHEMA}}','TEST_TABLE','{{EXT_UDF}}', true);

SELECT * from {{DB}}.{{SCHEMA}}.WATCHED_TABLES_STRM;

In [None]:
--7. add records to TEST_TABLE
CALL SYSTEM$WAIT(20);
INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE values (1,'apple');
INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE values (2,'pear');


In [None]:
--8. add more records to TEST_TABLE
CALL SYSTEM$WAIT(20);
INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE values (3,'banana');
INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE values (4,'orange');
INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE values (4,'grape');

In [None]:
--9. create a another larger table, seed with some records, and register it for event sending
create or replace table {{DB}}.{{SCHEMA}}.TEST_TABLE2 as select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER limit 1000;

INSERT INTO {{DB}}.{{SCHEMA}}.WATCHED_TABLES values ('{{DB}}','{{SCHEMA}}','TEST_TABLE2','{{EXT_UDF}}', true);


In [None]:
--10. Micro-batch a stream of records into table for testing
set endtime=DATEADD ('minute', 1, CURRENT_TIMESTAMP());
set batchsize=100;

EXECUTE IMMEDIATE 
$$ BEGIN
  WHILE ($endtime >CURRENT_TIMESTAMP()) do
        INSERT INTO {{DB}}.{{SCHEMA}}.TEST_TABLE2  select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER limit $batchsize;
        CALL SYSTEM$WAIT(1); -- throttle testing throughput, can comment out or use '0'
  END WHILE;
END; $$;


In [None]:
--11.  When testing complete, can turn off your triggered tasks using:
--ALTER TASK {{DB}}.{{SCHEMA}}.TABLE_ADDED_TSK SUSPEND;
--ALTER TASK {{DB}}.{{SUB_SCHEMA}}.EVENT_MONITORING_PUBLIC_TEST_TABLE_TASK SUSPEND;
--ALTER TASK {{DB}}.{{SUB_SCHEMA}}.EVENT_MONITORING_PUBLIC_TEST_TABLE2_TASK SUSPEND;