# OTI MONITORING SET UP - STEP 1 - DATA COLLECTION

## PROBLEM STATEMENT
OTI needs to monitor Snowflake account security compliance across several accounts in multiple cloud regions from one centralized account.

## DEFINITIONS
- Target Account: Centralized Account that will be used by OTI to monitor security compliance  
    - YR44338
- Source Account: Account on which data is being collected


## METHODS OF CENTRALIZING DATA IN STEP 2
- Source Accounts in the same region as the Target Account: direct data sharing
- Source Accounts in a different region than the Target Account: database replication


## GRANT PRIVILEGES
**In the SOURCE ACCOUNT** 
1. grant the additional required privileges to the ACCOUNT_MONITOR_OTI role 
2. create the database (SNOWFLAKE_ARCHIVE) and schemas (ACCOUNT_USAGE, CONTROLS, TRUST_CENTER)

In [None]:
-- Session variables used by this setup script.
--   usr     : the user running the script; receives the monitoring role below.
--   WH_NAME : warehouse the monitoring role will use -- change this value to use a different warehouse.
SET usr = CURRENT_USER();
SET WH_NAME = 'TASTY_BI_WH';

--RUN IN THE SOURCE ACCOUNT
USE ROLE ACCOUNTADMIN;

-- Role that owns the data-collection objects and runs the collection tasks.
CREATE ROLE IF NOT EXISTS ACCOUNT_MONITOR_OTI;
GRANT 
    CREATE DATABASE,
    CREATE REPLICATION GROUP,
    CREATE SHARE,
    EXECUTE ALERT,
    EXECUTE TASK,
    MANAGE SHARE TARGET,
    MONITOR EXECUTION
ON ACCOUNT TO ROLE ACCOUNT_MONITOR_OTI;
GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ACCOUNT_MONITOR_OTI;
GRANT APPLICATION ROLE SNOWFLAKE.TRUST_CENTER_ADMIN TO ROLE ACCOUNT_MONITOR_OTI;
GRANT ROLE ACCOUNT_MONITOR_OTI TO USER IDENTIFIER($usr);
-- Grant on the warehouse named by $WH_NAME (not a hard-coded name) so that the later
-- USE WAREHOUSE IDENTIFIER($WH_NAME) and the tasks keep working when WH_NAME is changed.
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH_NAME) TO ROLE ACCOUNT_MONITOR_OTI;


In [None]:
-- Switch to the monitoring role so it owns every object created below.
USE ROLE ACCOUNT_MONITOR_OTI;

USE WAREHOUSE IDENTIFIER($WH_NAME);--change warehouse

--create database and schemas to be used for data collection
-- COMMENT values are written as single-quoted string constants; double-quoted tokens are identifiers in Snowflake.
CREATE DATABASE IF NOT EXISTS SNOWFLAKE_ARCHIVE
    COMMENT = 'DATABSE to archive Snowflake Shared database SNOWFLAKE metadata views';
CREATE SCHEMA IF NOT EXISTS SNOWFLAKE_ARCHIVE.ACCOUNT_USAGE
    COMMENT = 'SCHEMA to archive Snowflake Shared account_usage views';
CREATE SCHEMA IF NOT EXISTS SNOWFLAKE_ARCHIVE.TRUST_CENTER
    COMMENT = 'SCHEMA to archive Snowflake Shared TRUST_CENTER views';
CREATE SCHEMA IF NOT EXISTS SNOWFLAKE_ARCHIVE.CONTROLS
    COMMENT = 'SCHEMA to contain control tables for collecting SNOWFLAKE metadata views';

--set context 
USE DATABASE SNOWFLAKE_ARCHIVE;
USE SCHEMA CONTROLS;


#### IN THE SOURCE ACCOUNT, CREATE THE ARCHIVE CONTROL TABLE
PURPOSE:  STORE VIEWS AND LOOKBACK FOR SNOWFLAKE VIEWS TO BE MATERIALIZED INTO PERSISTENT TABLES

NOTES:  This table is the control table for the SNOWFLAKE_ARCHIVE_LOAD_DATA() stored procedure.

This procedure should only be used to collect data from historical views that receive additive updates only, not from point-in-time snapshot views (the account_usage object views, for example).
    

In [None]:
-- Control table: one row per Snowflake metadata view whose rows are to be persisted.
create table if not exists METADATA_ARCHIVE_CONTROL_INFO
(
    VIEW_ID           NUMBER       NOT NULL AUTOINCREMENT (1,1)  -- generated unique row ID
  , SRC_DATABASE_NAME VARCHAR(100) NOT NULL                      -- source database; expected to be SNOWFLAKE
  , SRC_SCHEMA_NAME   VARCHAR(100) NOT NULL                      -- source schema (ACCOUNT_USAGE / ORG_USAGE / other)
  , SRC_VIEW_NAME     VARCHAR(100) NOT NULL                      -- name of the view to archive
  , DELTA_COLUMN      VARCHAR(100) NOT NULL                      -- column used to detect incremental changes
  , LOOKBACK_IN_DAYS  INTEGER                                    -- extra days to look back before the last refresh (e.g. after a failed run)
  , LAST_REFRESH_ON   TIMESTAMP                                  -- when the last successful refresh ran
)
COMMENT = 'SNOWFLAKE METADATA ARCHIVE CONTROL TABLE
PURPOSE:  STORE VIEWS AND LOOKBACK FOR SNOWFLAKE VIEWS TO BE MATERIALIZED INTO PERSISTANT TABLES
NOTES:  This table is the control table for the SNOWFLAKE_ARCHIVE_LOAD_DATA() stored procedure.
    This procedure should only be used to collect data from historical views with addidtive updates only, not point in time snapshot views (account_usage object views for example)';


#### IN THE SOURCE ACCOUNT, CREATE THE  LOGGING TABLE
PURPOSE:  STORE INFORMATION ABOUT EACH RUN OF SNOWFLAKE METADATA COLLECTION PROCESS
NOTES:  This table is the logging table for the SNOWFLAKE_ARCHIVE_LOAD_DATA() stored procedure.


In [None]:
-- Run log: one row per view per collection run (or per failed run).
create table if not exists METADATA_ARCHIVE_LOG_INFO
(
    JOB_ID                   VARCHAR         -- unique job ID tying the run back to the archived rows
  , JOB_RUN_TIME             TIMESTAMP_LTZ   -- when the process ran
  , SRC_VIEW                 VARCHAR(100)    -- source view that was extracted
  , START_TIME               TIMESTAMP_LTZ   -- extraction start point: last refresh minus the lookback
  , NUM_OF_RECORDS_EXTRACTED NUMBER(38,0)    -- number of rows inserted
  , STATUS                   VARCHAR(10)     -- success or failure
  , SQL_RUN                  VARCHAR         -- SQL statement that was executed
  , ERROR_MSG                VARCHAR         -- error message, if any
)
COMMENT = 'SNOWFLAKE METADATA ARCHIVE LOGGING TABLE
PURPOSE:  STORE INFORMATION ABOUT EACH RUN OF SNOWFLAKE METADATA COLLECTION PROCESS
NOTES:  This table is the logging table for the SNOWFLAKE_ARCHIVE_LOAD_DATA() stored procedure';


#### IN THE SOURCE ACCOUNT, CREATE THE TABLE TO STORE ACCOUNT PARAMETERS 
PURPOSE:  STORE VALUE OF ACCOUNT PARAMETERS AT THE TIME OF JOB EXECUTION


In [None]:
-- Snapshot table for the output of SHOW PARAMETERS IN ACCOUNT.
-- Created with IF NOT EXISTS (like the other tables in this schema) so that re-running this
-- setup does not drop the table together with its collected rows and any grants on it.
CREATE TABLE IF NOT EXISTS ACCOUNT_PARAMETERS (
	KEY VARCHAR(16777216),
	VALUE VARCHAR(16777216),
	DEFAULT VARCHAR(16777216),
	LEVEL VARCHAR(16777216),
	DESCRIPTION VARCHAR(16777216),
	TYPE VARCHAR(16777216),
    COLLECTION_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP()  -- filled in automatically when a row is inserted without this column
)
COMMENT = 'ACCOUNT PARAMETER VALUES TABLE
PURPOSE:  STORE VALUE OF ACCOUNT PARAMETERS AT THE TIME OF JOB EXECUTION';

#### IN THE SOURCE ACCOUNT, CREATE THE TABLE TO STORE ROLE HIERARCHY INFORMATION FOR GRANTS TO ELEVATED SYSTEM ROLES
PURPOSE:  STORE ROLE HIERARCHY PATHS FOR ANY PATH THAT INCLUDES ELEVATED SYSTEM ROLES ACCOUNTADMIN OR SECURITYADMIN

In [None]:

-- Role-hierarchy snapshot: one row per role appearing on a grant path that leads to an elevated role or privilege.
create table if not exists ROLE_HIERARCHIES
(
    ELEVATED_ROLE      VARCHAR(16777216)
  , ROLE_PATH          VARCHAR(16777216)
  , PATH_ARRAY         ARRAY
  , ROLE_NAME          VARCHAR(16777216)
  , PRIVILEGE          VARCHAR(16777216)
  , SNAPSHOT_TIMESTAMP TIMESTAMP
)
COMMENT = 'ELEVATED ROLE HIEARCHY TABLE
PURPOSE:  STORE ROLE HIEARCHY PATHS FOR ANY PATH THAT INCLUDES ELEVATED SYSTEM ROLES ACCOUNTADMIN OR SECURITYADMIN
NOTES:  This table is used in identifying users that have been granted elevated roles through inheritance which is not recommended';

#### IN THE SOURCE ACCOUNT, CREATE THE TABLE TO STORE USERS WITH ELEVATED PRIVILEGES
PURPOSE:  STORE USERS WITH ELEVATED PRIVILEGES AT THE TIME OF DATA COLLECTION

In [None]:
-- Snapshot of users holding an elevated role or privilege, together with the grant path that gives it to them.
create table if not exists ELEVATED_ROLE_USERS
(
    USER_NAME          VARCHAR(16777216)
  , ELEVATED_ROLE      VARCHAR(16777216)
  , GRANTED_VIA_ROLE   VARCHAR(16777216)
  , GRANT_PATH         VARCHAR(16777216)
  , CREATED_ON         TIMESTAMP_LTZ(6)
  , ELEVATED_PRIVILEGE VARCHAR(16777216)
  , SNAPSHOT_TIMESTAMP TIMESTAMP
)
COMMENT = 'USERS WITH ELEVATED ROLES
PURPOSE:  STORE USERS WITH ELEVATED SYSTEM ROLES ACCOUNTADMIN OR SECURITYADMIN
NOTES:  Identifying users that have been granted elevated roles';

### IN THE SOURCE ACCOUNT, CREATE THE PROCEDURE SNOWFLAKE_ARCHIVE_LOAD_DATA() 
PROCEDURE ARGUMENTS: None. 
PROCEDURE PURPOSE: To collect Snowflake Metadata for views designated in the METADATA_ARCHIVE_CONTROL_INFO table.  

PROCEDURE DETAILS:  The procedure collects data for the views designated in the METADATA_ARCHIVE_CONTROL_INFO table since the last runtime as in the METADATA_ARCHIVE_LOG_INFO table.
A table is created in the CONTROLS schema for each view in the METADATA_ARCHIVE_CONTROL_INFO table, with the same name as the view being persisted. 
- Each table created has the following columns: 
    -   JOB_ID (VARCHAR): Identifies the specific collection execution run from the METADATA_ARCHIVE_LOG_INFO 
    -   LOG_ID(VARCHAR): SHA2_HEX value of the view row to prevent inserting duplicate records
    -   PAYLOAD (VARIANT): OBJECT_CONSTRUCT(*) of the view row.  
        - The SNOWFLAKE_ARCHIVE_CREATE_VIEWS procedure flattens this column into a view based on the current definition of the Snowflake view that the data has been collected from.  
                This protects from the impacts of schema drift in the view definition. 
    -   TIME_INSERTED (TIMESTAMP): Insert Timestamp

If this is the initial run, the lookback starts at 1900 so that all available data is collected. 

If it is an incremental run, the lookback period is determined by the LOOKBACK_IN_DAYS column of the METADATA_ARCHIVE_CONTROL_INFO table.
-    The LOOKBACK_IN_DAYS value should be at least twice as long as the time between runs to avoid data loss.
-    There is logic in place to avoid inserting the same record twice (Using the LOG_ID value).


This procedure should only be used to collect data from historical views that receive additive updates only, not from point-in-time snapshot views (the account_usage object views, for example).


In [None]:

--create data collection procedure 
-- SNOWFLAKE_ARCHIVE_LOAD_DATA()
--   Purpose : for every row of METADATA_ARCHIVE_CONTROL_INFO, copy the not-yet-archived rows of the
--             named source view into a same-named archive table in the current schema, then record
--             the run in METADATA_ARCHIVE_LOG_INFO and advance LAST_REFRESH_ON.
--   Args    : none.
--   Returns : on success, an array holding one message per view processed;
--             on failure, an object with 'Error type', 'SQLCODE', 'SQLERRM' and 'SQLSTATE'.
--   Errors  : the first failing statement rolls back the open transaction, writes an 'ERROR' row
--             to METADATA_ARCHIVE_LOG_INFO and ends the procedure (remaining views are not processed).
CREATE OR REPLACE PROCEDURE SNOWFLAKE_ARCHIVE_LOAD_DATA()
RETURNS VARIANT
EXECUTE AS CALLER AS
$$
declare
    --when we are kicking off this job
        job_start_time timestamp_ltz;
     --unique ID for each run
        job_id string default '';
    --last time the job was run
        last_end_time timestamp_ltz;
    --last_time the job was run with the lookback offset applied 
        last_end_time_with_lookback timestamp_ltz;
    --rows inserted 
        num_rec_insert number:= 0;
    --message output
        output string;
    --array to collect output for each loop
        msg_array array := array_construct();
    --sql statement to insert new records 
        stmt_to_run string ;
    --fully qualified name of the view being processed; declared here so the exception handlers can log it
        fqtn string;

    --cursor to loop through the control table and obtain the variables needed to generate the INSERT INTO SQL statement
    --note that if the last refresh date is null we look back to 1900 to get all data possible 
    --LOOKBACK_IN_DAYS is nullable; a NULL is treated as 0 days so the generated statement never becomes NULL
    res resultset default (SELECT  VIEW_ID, SRC_DATABASE_NAME, SRC_SCHEMA_NAME, SRC_VIEW_NAME, DELTA_COLUMN, NVL(LAST_REFRESH_ON, '1900-01-01')::timestamp LAST_REFRESH_ON, COALESCE(LOOKBACK_IN_DAYS, 0) LOOKBACK_IN_DAYS
          FROM METADATA_ARCHIVE_CONTROL_INFO);
    --cursor to loop through the results       
    c1 cursor for res;
    
begin
      --create unique Job_ID
      select uuid_string() into :job_id;
      
      --open the cursor with the resultset and loop though each view in the Control table
      FOR rec IN c1 DO
          --set the fully qualified view name
          fqtn := rec.SRC_DATABASE_NAME || '.' || rec.SRC_SCHEMA_NAME || '.' || rec.SRC_VIEW_NAME;
          last_end_time := rec.LAST_REFRESH_ON;
          select current_timestamp()::timestamp_ltz into :job_start_time;
          --since the dateadd function requires a negative value we multiply the lookback by -1 to make it negative
          let lookback number := rec.LOOKBACK_IN_DAYS * -1;
          --set the last_end_time_with_lookback by offsetting the last run with the lookback 
          SELECT DATEADD('DAYS', :lookback, :last_end_time) INTO :last_end_time_with_lookback ;  

          --create the archive table if it doesnt exist 
          --this DDL runs BEFORE the transaction is opened: a DDL statement implicitly commits the active
          --transaction, which would leave the insert/update/log statements below outside of any transaction
          let create_table_sql string:= 'create table  IF NOT EXISTS ' || rec.SRC_VIEW_NAME  || '(JOB_ID VARCHAR, LOG_ID VARCHAR, PAYLOAD VARIANT, TIME_INSERTED TIMESTAMP_NTZ )';
          execute immediate :create_table_sql;

           --process the inserts and updates in a transaction so they execute and rollback together in case of failure. 
           begin transaction;
                --generate the INSERT INTO SELECT statement for the current View using the lookback as a filter as well as a logID
                --the Log_id is a unique value so we dont re-insert records that have already been entered 
                stmt_to_run  := 'insert into ' || rec.SRC_view_name || ' select   ''' || :job_id || ''',   SHA2_HEX(OBJECT_CONSTRUCT(*)::VARCHAR) LOG_ID, OBJECT_CONSTRUCT(*) PAYLOAD, ''' ||  :job_start_time || '''
                    FROM '   || :fqtn  || ' WHERE LOG_ID NOT IN (SELECT LOG_ID FROM ' || rec.SRC_view_name  || ') AND ' || rec.DELTA_COLUMN || ' >=  ''' || :last_end_time_with_lookback || '''::timestamp_ltz '; 
                --execute the INSERT 
                execute immediate :stmt_to_run;

                 --get the number of records inserted for logging purposes. We use this method because there is a current bug with SQLROWCOUNT
                 -- could use this method once bug is fixed https://docs.snowflake.com/en/developer-guide/snowflake-scripting/dml-status.html#determining-the-number-of-rows-affected-by-dml-commands
                 select "number of rows inserted"::number into :num_rec_insert from table(result_scan(-1));

                --update the control table with the date this process was run so we can restart from where we left off
                let update_metadata_view_date varchar:= 'UPDATE METADATA_ARCHIVE_CONTROL_INFO SET LAST_REFRESH_ON = ''' || :job_start_time || ''' WHERE SRC_SCHEMA_NAME = ''' || rec.SRC_schema_name || ''' AND SRC_VIEW_NAME = ''' || rec.SRC_VIEW_NAME  ||'''' ;
                execute immediate :update_metadata_view_date;

                --insert results into log table
                insert into metadata_archive_log_info 
                       ( src_view  , start_time , job_run_time , num_of_records_extracted , status , sql_run, job_id ) 
                values ( :fqtn,:last_end_time_with_lookback,      :job_start_time ,     :num_rec_insert,   'SUCCESS', :stmt_to_run, :job_id);
          --commit transaction 
          commit;
       
          output := 'Extraction for table, ' || rec.src_view_name || ' for the period of ' || :last_end_time_with_lookback::varchar || ' and ' || :job_start_time::varchar || ' completed with total of ' || :num_rec_insert || ' records';
          msg_array := array_append(msg_array, :output );
        
      
     END FOR;
     CLOSE c1;   
     return  msg_array ;  
      
    -- Handle exceptions and log to table 
    -- SRC_VIEW is logged as well so a failed run can be traced back to the view that caused it
    exception
      when statement_error then
       rollback; 
      
       Insert into metadata_archive_log_info  (status , error_msg, JOB_ID, JOB_RUN_TIME, SQL_RUN, SRC_VIEW) 
        values    ( 'ERROR', :sqlerrm, :job_id, :job_start_time, :stmt_to_run, :fqtn);

       return object_construct('Error type', 'STATEMENT_ERROR',
                                'SQLCODE', sqlcode,
                                'SQLERRM', sqlerrm,
                                'SQLSTATE', sqlstate);
      
      when other then
        rollback;
        
      Insert into metadata_archive_log_info  (status , error_msg, JOB_ID, JOB_RUN_TIME, SQL_RUN, SRC_VIEW) 
      values    ( 'ERROR', :sqlerrm, :job_id, :job_start_time, :stmt_to_run, :fqtn);
        
      return object_construct('Error type', 'Other error',
                                'SQLCODE', sqlcode,
                                'SQLERRM', sqlerrm,
                                'SQLSTATE', sqlstate);
       
end;
$$;
-- Store the procedure's description as its object COMMENT.
ALTER PROCEDURE SNOWFLAKE_ARCHIVE_LOAD_DATA() SET
    COMMENT = $$PROCEDURE NAME: SNOWFLAKE_ARCHIVE_LOAD_DATA() 
PROCEDURE ARGUMENTS: None. 
PROCEDURE PURPOSE: To collect Snowflake Metadata for views designated in the METADATA_ARCHIVE_CONTROL_INFO table.  
PROCEDURE DETAILS:  The procedure collects data for the views designated in the METADATA_ARCHIVE_CONTROL_INFO table since the last runtime as in the METADATA_ARCHIVE_LOG_INFO table.
A table in is created in the CONTROLS schema for each view in the METADATA_ARCHIVE_CONTROL_INFO table with the same name as the view being persisted. 
    Each table created has the following columns: 
        JOB_ID (VARCHAR)  --Identifies the specific collection execution run from the METADATA_ARCHIVE_LOG_INFO 
        LOG_ID(VARCHAR)  --SHA2_HEX value of the view row to prevent inserting duplicate records
        PAYLOAD (VARIANT)  -- OBJECT_CONSTRUCT(*) of the view row.  
                The SNOWFLAKE_ARCHIVE_CREATE_VIEWS procedure flattens this column into a view based on the current definition of the Snowflake view that the data has been collected from.  
                This protects from the impacts of schema drift in the view definition. 
        TIME_INSERTED (TIMESTAMP)  -- Insert Timestamp

If this is the inital run, the lookback time is since 1900 to get all possible data since then. 
If it is an incremental run, the lookback period is determined by the LOOKBACK_IN_DAYS column of the METADATA_ARCHIVE_CONTROL_INFO table.
    The LOOKBACK_IN_DAYS value should be at least twice as long as the time between runs to avoid data loss.
    There is logic in place to avoid inserting the same record twice (Using the LOG_ID value).
This procedure should only be used to collect data from historical views with addidtive updates only, not point in time snapshot views (account_usage object views for example);
$$;

### IN THE SOURCE ACCOUNT, CREATE PROCEDURE SNOWFLAKE_ARCHIVE_CREATE_VIEWS() 
PROCEDURE ARGUMENTS: None.

PROCEDURE PURPOSE: To create flattened views for tables created by the SNOWFLAKE_ARCHIVE_LOAD_DATA() for the views designated in the METADATA_ARCHIVE_CONTROL_INFO table.  
PROCEDURE DETAILS: It generates a view for each of the Snowflake views listed in the METADATA_ARCHIVE_CONTROL_INFO table.  

Each view: 
- flattens the PAYLOAD column from the corresponding table created by the SNOWFLAKE_ARCHIVE_LOAD_DATA().  
    has the same columns and column types as the current Snowflake database view 
    resides in the SNOWFLAKE_ARCHIVE database under the applicable schema
    can be queried with the same syntax as the corresponding Snowflake view. 

In [None]:
--create procedure to create/update views based on the current Snowflake view definition
-- SNOWFLAKE_ARCHIVE_CREATE_VIEWS()
--   Purpose : for every row of METADATA_ARCHIVE_CONTROL_INFO, describe the source view and
--             (re)create a view <SRC_SCHEMA_NAME>.<SRC_VIEW_NAME> that casts each PAYLOAD key of
--             CONTROLS.<SRC_VIEW_NAME> back to the column name and type reported by DESCRIBE VIEW.
--   Args    : none.
--   Returns : an array of strings: one 'View created  ::  <ddl>' entry per view created, followed
--             by one 'ERROR creating view ...' entry per view whose DDL failed.
--   Side effect: (re)creates the temporary table SQLSTMT in the current schema.
CREATE OR REPLACE PROCEDURE SNOWFLAKE_ARCHIVE_CREATE_VIEWS()
RETURNS VARIANT
EXECUTE AS CALLER AS              
$$
DECLARE  
    --cursor over the views listed in the control table
    table_cursor CURSOR FOR
        select  SRC_DATABASE_NAME,SRC_SCHEMA_NAME,SRC_VIEW_NAME  from METADATA_ARCHIVE_CONTROL_INFO;
    t varchar;
    s varchar;
    fqn varchar;
    output varchar;
    msg_array array := array_construct();
    error_array array := array_construct();

BEGIN  

    --create temporary table to store ddl to be executed
    create or replace temporary table sqlstmt
                (ddl varchar);

    --loop through METADATA_ARCHIVE_CONTROL_INFO table and create create view ddl 
    for var in table_cursor DO
    begin
        t   := var.SRC_VIEW_NAME;
        s   := var.SRC_SCHEMA_NAME;
        fqn := var.SRC_DATABASE_NAME||'.'||var.SRC_SCHEMA_NAME||'.'||var.SRC_VIEW_NAME;

        describe view IDENTIFIER(:fqn);

        --INSERT OVERWRITE keeps only the current view's DDL in sqlstmt; a plain INSERT would
        --accumulate rows and re-create every previously processed view on each iteration
        insert overwrite into sqlstmt (ddl)
        with col_list as (select listagg('PAYLOAD:'||"name" ||'::'|| "type"  ||' as '|| "name"||', ') as cols
        from table(result_scan(last_query_id())))
        select 'CREATE OR REPLACE VIEW ' ||:s ||'.'||:t ||' COPY GRANTS as select ' ||   rtrim(cols, ', ') || ' from CONTROLS.'|| :t   as ddl from col_list
        ;

        --loop through sqlstmt table and execute sql; 
        let view_ddl varchar;
        --ROWS is quoted in the generated select list before the DDL is executed
        let cur cursor for (select replace(ddl, ' ROWS,', ' "ROWS",') as ddl from sqlstmt);
        for row_variable in cur do
            begin
                --get the DDL and execute it to create the view
                view_ddl := row_variable.DDL;
                execute immediate (:view_ddl);
                output := 'View created  ::  ' || :view_ddl;
                msg_array := array_append(msg_array, :output );

            EXCEPTION  -- log errors; every exception type is recorded the same way
                WHEN other THEN
                    LET errm := SQLERRM;
                    error_array := array_append(error_array, 'ERROR creating view for ' ||:fqn  ||'.  Error Message: '||  :errm ||' -- DDL: '||:view_ddl);
            end;
        end for;
        close cur;

    end;
    end for; 

    --return the success messages followed by any collected errors, so failures are visible to the caller
    return (array_cat(msg_array, error_array));
end;
$$;
-- Store the procedure's description as its object COMMENT.
ALTER PROCEDURE SNOWFLAKE_ARCHIVE_CREATE_VIEWS() SET
    COMMENT = $$PROCEDURE NAME: SNOWFLAKE_ARCHIVE_CREATE_VIEWS() 
PROCEDURE ARGUMENTS: None. 
PROCEDURE PURPOSE: To create flattened views for tables created by the SNOWFLAKE_ARCHIVE_LOAD_DATA() for the views designated in the METADATA_ARCHIVE_CONTROL_INFO table.  
PROCEDURE DETAILS: It generates a view for each of the Snowflake views listed in the METADATA_ARCHIVE_CONTROL_INFO table.  
Each view: 
    flattens the PAYLOAD column from the corresponding table created by the SNOWFLAKE_ARCHIVE_LOAD_DATA().  
    has the same columns and column types as the current Snowflake database view 
    resides in the SNOWFLAKE_ARCHIVE database under the applicable schema
    can be queried with the same syntax as the corresponding Snowflake view. 
$$;


### IN THE SOURCE ACCOUNT, CREATE PROCEDURE SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS() 
PROCEDURE ARGUMENTS: None.

PROCEDURE PURPOSE: To identify users that have been granted elevated privileges by either direct grant or through inheritance.

In [None]:
--create data collection procedure for users with elevated roles
-- SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS()
--   Purpose : rebuild the ROLE_HIERARCHIES and ELEVATED_ROLE_USERS snapshot tables from
--             SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES and GRANTS_TO_USERS.
--   Args    : none.
--   Returns : declared as VARIANT; the body contains no RETURN statement.
--   Steps   :
--     1. Create ELEVATED_ROLE_USERS and ROLE_HIERARCHIES if they do not exist.
--     2. INSERT OVERWRITE ROLE_HIERARCHIES with the grant paths found by a self-referencing CTE that
--        starts at active USAGE grants on the roles SECURITYADMIN / ACCOUNTADMIN and follows
--        role-to-role USAGE grants upward; a grantee already on the path is skipped to avoid cycles.
--        One row is written per role on each path; PRIVILEGE is not populated by this step.
--     3. Append (plain INSERT) the paths that start at active MANAGE GRANTS grants on ACCOUNT held by
--        roles other than SECURITYADMIN / ACCOUNTADMIN; these rows carry the PRIVILEGE value.
--     4. INSERT OVERWRITE ELEVATED_ROLE_USERS by joining ROLE_HIERARCHIES.ROLE_NAME to
--        GRANTS_TO_USERS.ROLE for rows where DELETED_ON is null; ELEVATED_ROLE is written as NULL
--        when the row carries a PRIVILEGE.
--   NOTE(review): PATH_ARRAY (and therefore ROLE_NAME) holds only the grantee roles, not the starting
--     role itself, so a user granted SECURITYADMIN directly may not match the join in step 4 -- confirm
--     this is intended.
--   NOTE(review): steps 2-4 are separate statements with no explicit transaction; a failure part-way
--     presumably leaves the two tables out of step until the next run -- confirm.
CREATE OR REPLACE PROCEDURE SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS()
RETURNS VARIANT
EXECUTE AS CALLER AS
$$
begin
CREATE TABLE IF NOT EXISTS ELEVATED_ROLE_USERS (
	USER_NAME VARCHAR(16777216),
	ELEVATED_ROLE VARCHAR(16777216),
	GRANTED_VIA_ROLE VARCHAR(16777216),
	GRANT_PATH VARCHAR(16777216),
	CREATED_ON TIMESTAMP_LTZ(6),
    ELEVATED_PRIVILEGE VARCHAR(16777216),
    SNAPSHOT_TIMESTAMP TIMESTAMP 
);

CREATE  TABLE IF NOT EXISTS ROLE_HIERARCHIES (
	ELEVATED_ROLE VARCHAR(16777216),
	ROLE_PATH VARCHAR(16777216),
	PATH_ARRAY ARRAY,
	ROLE_NAME VARCHAR(16777216),
    PRIVILEGE VARCHAR(16777216),
    SNAPSHOT_TIMESTAMP TIMESTAMP 
);


--FIND GRANTS OF ELEVATED ROLES TO OTHER ROLES 
INSERT OVERWRITE INTO ROLE_HIERARCHIES 
(ELEVATED_ROLE,ROLE_PATH,PATH_ARRAY,ROLE_NAME, SNAPSHOT_TIMESTAMP)
WITH HIERARCHY AS 
        (SELECT NAME AS ROLE_NAME, 
            GRANTEE_NAME AS PARENT_NAME  ,
            1 AS LEVEL,
            ARRAY_CONSTRUCT(PARENT_NAME) PATH,
            NAME AS BOTTOM_ROLE,
          FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES
          WHERE GRANTED_ON = 'ROLE' AND PRIVILEGE = 'USAGE'
          AND DELETED_ON IS NULL
          AND NAME IN('SECURITYADMIN','ACCOUNTADMIN')  --ADD RESTRICTED ROLES AS DESIRED
          UNION ALL 
          SELECT 
                  PC.NAME ,PC.GRANTEE_NAME ,
                  H.LEVEL+1 LEVEL ,
                  ARRAY_CAT(H.PATH, ARRAY_CONSTRUCT(PC.GRANTEE_NAME)) PATH,
                  H.BOTTOM_ROLE
          FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES PC
          JOIN HIERARCHY H
          ON H.PARENT_NAME = PC.NAME
          WHERE NOT ARRAY_CONTAINS(PC.GRANTEE_NAME::VARIANT ,H.PATH)  --ELIMINATE CIRCULAR REFERENCES
          AND GRANTED_ON = 'ROLE' 
          AND GRANTED_TO = 'ROLE' 
          AND PRIVILEGE = 'USAGE'
          AND DELETED_ON IS NULL) 
SELECT BOTTOM_ROLE AS ELEVATED_ROLE, BOTTOM_ROLE||' > '||ARRAY_TO_STRING( H.PATH , ' > ' ) AS ROLE_PATH, H.PATH AS PATH_ARRAY, X.VALUE::VARCHAR AS ROLE_NAME, CURRENT_TIMESTAMP()
FROM HIERARCHY H, LATERAL FLATTEN(H.PATH) X;

--MANAGE ROLES GRANTS
INSERT INTO ROLE_HIERARCHIES 
(ELEVATED_ROLE,ROLE_PATH,PATH_ARRAY,ROLE_NAME, PRIVILEGE, SNAPSHOT_TIMESTAMP)
WITH MG_HIERARCHY AS 
        (SELECT NAME AS ROLE_NAME, 
            GRANTEE_NAME AS PARENT_NAME  ,
            1 AS LEVEL,
            ARRAY_CONSTRUCT(PARENT_NAME) PATH,
            NAME AS BOTTOM_ROLE,
            PRIVILEGE
          FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES
          WHERE GRANTED_ON = 'ACCOUNT' AND PRIVILEGE = 'MANAGE GRANTS'
          AND DELETED_ON IS NULL
          AND GRANTEE_NAME NOT IN('SECURITYADMIN','ACCOUNTADMIN')  --ADD RESTRICTED ROLES AS DESIRED
          
          UNION ALL 
          SELECT 
                  PC.NAME ,PC.GRANTEE_NAME ,
                  H.LEVEL+1 LEVEL ,
                  ARRAY_CAT(H.PATH, ARRAY_CONSTRUCT(PC.GRANTEE_NAME)) PATH,
                  H.BOTTOM_ROLE,
                  H.PRIVILEGE
          FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES PC
          JOIN MG_HIERARCHY H
          ON H.PARENT_NAME = PC.NAME
          WHERE NOT ARRAY_CONTAINS(PC.GRANTEE_NAME::VARIANT ,H.PATH)  --ELIMINATE CIRCULAR REFERENCES
          AND GRANTED_ON = 'ROLE' 
          AND GRANTED_TO = 'ROLE' 
          AND PC.PRIVILEGE = 'USAGE'
          AND DELETED_ON IS NULL) 
SELECT BOTTOM_ROLE AS ELEVATED_ROLE, BOTTOM_ROLE||' > '||ARRAY_TO_STRING( H.PATH , ' > ' ) AS ROLE_PATH, H.PATH AS PATH_ARRAY, X.VALUE::VARCHAR AS ROLE_NAME, H.PRIVILEGE, CURRENT_TIMESTAMP()
FROM MG_HIERARCHY H, LATERAL FLATTEN(H.PATH) X;

               
--ELEVATED_ROLE_USERS
INSERT OVERWRITE INTO  ELEVATED_ROLE_USERS  
(USER_NAME,ELEVATED_ROLE,GRANTED_VIA_ROLE,GRANT_PATH,CREATED_ON,ELEVATED_PRIVILEGE, SNAPSHOT_TIMESTAMP )
SELECT GRANTEE_NAME AS USER_NAME, 
    CASE WHEN PRIVILEGE IS NOT NULL THEN NULL ELSE ELEVATED_ROLE END AS ELEVATED_ROLE, 
    ROLE AS GRANTED_VIA_ROLE,  ROLE_PATH AS GRANT_PATH,  CREATED_ON,PRIVILEGE, CURRENT_TIMESTAMP()
FROM ROLE_HIERARCHIES RH
JOIN SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS GTU
ON GTU.ROLE = RH.ROLE_NAME
AND (ELEVATED_ROLE IN ('SECURITYADMIN','ACCOUNTADMIN')
OR PRIVILEGE IS NOT NULL)
WHERE DELETED_ON IS NULL;
end
$$;
-- Store the procedure's description as its object COMMENT.
ALTER PROCEDURE SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS() SET
    COMMENT = $$PROCEDURE NAME: SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS() 
PROCEDURE ARGUMENTS: None. 
PROCEDURE PURPOSE: To create a list of users with evelvated roles or peivileges.  
$$;



### IN THE SOURCE ACCOUNT, CREATE DATA COLLECTION TASK

Task to execute procedures on a regular schedule 
- Currently set to run every 24 hours at midnight CT, but this can be configured by altering the schedule parameter.

In [None]:
-- Scheduled task that runs the three collection procedures in sequence.
-- The cron expression fires at 00:00 America/Chicago every day; edit SCHEDULE to change the cadence.
CREATE OR REPLACE TASK task_load_snowflake_archive_data
    WAREHOUSE = $WH_NAME
    SCHEDULE  = 'USING CRON 0 0 * * * America/Chicago'
AS
    BEGIN
        CALL SNOWFLAKE_ARCHIVE_LOAD_DATA();
        CALL SNOWFLAKE_ARCHIVE_CREATE_VIEWS();
        CALL SNOWFLAKE_ARCHIVE_ELEVATED_ROLE_USERS();
    END;

-- The task only starts running on its schedule once it has been resumed.
ALTER TASK task_load_snowflake_archive_data RESUME;

### IN THE SOURCE ACCOUNT, CREATE ACCOUNT PARAMETER COLLECTION TASK

Task to execute account parameter value collection on a regular schedule 
- Currently set to run every 24 hours at midnight CT, but this can be configured by altering the schedule parameter.

In [None]:
--create task to collect account parameters on a regular schedule 
--set to run every 24 hours at midnight CT but this can be configured 
-- Each run replaces the contents of CONTROLS.ACCOUNT_PARAMETERS with the current output of
-- SHOW PARAMETERS IN ACCOUNT. COLLECTION_DATE is not in the insert column list, so it takes
-- its column default.
CREATE OR REPLACE TASK task_load_account_parameters
  SCHEDULE = 'USING CRON 0 0 * * * America/Chicago'
  WAREHOUSE = $WH_NAME    --CHANGE WAREHOUSE 
AS
    BEGIN
    SHOW PARAMETERS IN ACCOUNT;
    -- Target must be ACCOUNT_PARAMETERS, the table created by this setup
    -- (the previous spelling ACCOUNT_PARAMATERS named a table that is never created).
    -- RESULT_SCAN(LAST_QUERY_ID()) reads the SHOW output, so no statement may be placed between the two.
    INSERT OVERWRITE INTO CONTROLS.ACCOUNT_PARAMETERS 
    (KEY, VALUE,DEFAULT,LEVEL,DESCRIPTION, TYPE)
    SELECT
    	"key" AS KEY,
    	"value" AS VALUE,
    	"default" AS DEFAULT,
    	"level" AS LEVEL,
    	"description" AS DESCRIPTION,
    	"type" AS TYPE
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
    END;

ALTER TASK task_load_account_parameters RESUME;

### IN THE SOURCE ACCOUNT, CREATE PROCEDURE TO CREATE SHARE AND ADD OBJECTS AND GRANTS TO THE SHARE

FOR SOURCE ACCOUNTS IN THE SAME REGION AS THE TARGET ACCOUNT

SHARE_ALL_SCHEMA_OBJECTS() 
PROCEDURE ARGUMENTS: 
- DATABASE_NAME VARCHAR, --Database that contains objects to be shared
- SCHEMA_NAME VARCHAR, --Schema name that contains objects to be shared
- SHARE_NAME VARCHAR  --Name of outbound share to be created or updated

PROCEDURE PURPOSE: To create a share (one that allows non-secure objects to be shared) for a specified database and schema, and to add all shareable objects in the specified schema to that share. 

USAGE EXAMPLE:  CALL SHARE_ALL_SCHEMA_OBJECTS ('DATABASE_NAME','SCHEMA_NAME','OUTBOUND_SHARE_NAME')


In [None]:

/**--***for accounts in the same region as the org account you want to monitor security from **/
-- SHARE_ALL_SCHEMA_OBJECTS(DATABASE_NAME, SCHEMA_NAME, SHARE_NAME)
--
-- Purpose:
--   Creates the outbound share SHARE_NAME if it does not exist, allows non-secure
--   objects in it, and grants the share access to the database, the schema and
--   the objects in that schema (tables, external tables, iceberg tables,
--   materialized views, views and secure user-defined functions).
--
-- Arguments:
--   DATABASE_NAME  database that contains the objects to share
--   SCHEMA_NAME    schema that contains the objects to share; it is compared
--                  literally against INFORMATION_SCHEMA / SHOW output, so pass it
--                  in the case in which it is stored (upper case when unquoted)
--   SHARE_NAME     outbound share to create or update
--
-- Returns:
--   An array with one 'ERROR EXECUTING: ...' entry per failed grant followed by
--   one 'EXECUTED: ...' entry per successful grant. A failing grant never aborts
--   the remaining grants.
--
-- Side effects:
--   Runs with caller's rights, changes the session's current database and schema,
--   and (re)creates the temporary table SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS.
--
-- NOTE(review): the three arguments are concatenated into dynamic SQL as
-- identifiers. Only call this with trusted, administrator-supplied names.
CREATE OR REPLACE PROCEDURE SHARE_ALL_SCHEMA_OBJECTS(DATABASE_NAME VARCHAR, SCHEMA_NAME VARCHAR, SHARE_NAME VARCHAR)
RETURNS VARIANT
EXECUTE AS CALLER AS
$$
DECLARE
DB_NAME VARCHAR := DATABASE_NAME;   --database that contains objects to add to share
S_NAME VARCHAR := SCHEMA_NAME;      --schema that contains objects to add to share

BEGIN

-- create outbound share
let createShare varchar := 'CREATE SHARE IF NOT EXISTS ' || SHARE_NAME;
execute immediate (createShare);

-- allow sharing of non-secure objects
let alterShare varchar := 'ALTER SHARE ' || SHARE_NAME || ' SET SECURE_OBJECTS_ONLY = FALSE';
execute immediate (alterShare);

-- The unqualified schema name in the generated grants, SHOW FUNCTIONS and
-- INFORMATION_SCHEMA.VIEWS below all resolve against this database.
USE DATABASE IDENTIFIER(:DB_NAME);
USE SCHEMA IDENTIFIER(:S_NAME);

--CREATE TEMP TABLE TO CONTAIN GRANT STATEMENTS
CREATE OR REPLACE TEMPORARY TABLE SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS
(OBJ_TYPE VARCHAR, STATEMENT VARCHAR);

--DATABASE (OBJ_TYPE drives the execution order of the cursor further down)
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'DATABASE' AS OBJ_TYPE, 'GRANT USAGE ON DATABASE ' || :DB_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;
--SCHEMAS
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'SCHEMA' AS OBJ_TYPE, 'GRANT USAGE ON SCHEMA ' || :S_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;
--TABLES
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'TABLE' AS OBJ_TYPE, 'GRANT SELECT ON ALL TABLES IN SCHEMA ' || :S_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;
--EXTERNAL TABLES
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'EXTERNAL TABLE' AS OBJ_TYPE, 'GRANT SELECT ON ALL EXTERNAL TABLES IN SCHEMA ' || :S_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;
--ICEBERG TABLES
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'ICEBERG TABLE' AS OBJ_TYPE, 'GRANT SELECT ON ALL ICEBERG TABLES IN SCHEMA ' || :S_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;
--MATERIALIZED VIEWS
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'MATERIALIZED VIEW' AS OBJ_TYPE, 'GRANT SELECT ON ALL MATERIALIZED VIEWS IN SCHEMA ' || :S_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT;

----- FUNCTION AND VIEW GRANTS CAN'T BE MADE IN BULK

--GET FUNCTIONS IN DB -- MUST USE SHOW COMMAND TO GET PARAMETER TYPES WHICH IS REQUIRED WHEN GRANTING PRIVILEGES VIA CODE
SHOW FUNCTIONS IN DATABASE;

--LOG GRANT STATEMENTS TO BE EXECUTED FOR SECURE USER-DEFINED FUNCTIONS IN THE TARGET SCHEMA
-- "arguments" has the form 'NAME(ARG_TYPES) RETURN RETURN_TYPE'. Splitting on
-- ' RETURN ' (with surrounding spaces) keeps the signature intact even when
-- the function name itself contains the word RETURN.
-- Only functions in the requested schema are granted: the share has USAGE on
-- that schema only, so grants for other schemas would just be logged as errors.
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'FUNCTION', 'GRANT USAGE ON FUNCTION ' || :DB_NAME || '.' || "schema_name" || '.' || TRIM(SPLIT_PART(UPPER("arguments"), ' RETURN ', 1)) || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "is_builtin" = 'N'
  AND "is_secure" = 'Y'
  AND "schema_name" = :S_NAME;

-- LOG GRANT STATEMENTS TO BE EXECUTED FOR VIEWS IN THE TARGET SCHEMA
INSERT INTO SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS (OBJ_TYPE, STATEMENT)
SELECT 'VIEW', 'GRANT SELECT ON VIEW ' || TABLE_CATALOG || '.' || TABLE_SCHEMA || '.' || TABLE_NAME || ' TO SHARE ' || :SHARE_NAME || ';' AS STATEMENT
FROM INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA = :S_NAME;

BEGIN
  LET MSG_ARRAY ARRAY := ARRAY_CONSTRUCT();
  LET ERROR_ARRAY ARRAY := ARRAY_CONSTRUCT();
  LET OUTPUT VARCHAR;
  LET GRANT_STATEMENT VARCHAR;
  -- A share needs USAGE on the database, then on the schema, before any object
  -- in it can be granted. Table row order is not guaranteed, so order explicitly.
  LET CUR CURSOR FOR (
      SELECT STATEMENT
      FROM SNOWFLAKE_ARCHIVE.CONTROLS.GRANT_STATEMENTS
      ORDER BY CASE OBJ_TYPE WHEN 'DATABASE' THEN 1 WHEN 'SCHEMA' THEN 2 ELSE 3 END,
               OBJ_TYPE,
               STATEMENT);
  FOR ROW_VARIABLE IN CUR DO
     BEGIN
      -- run the grant and record it as executed
      GRANT_STATEMENT := ROW_VARIABLE.STATEMENT;
      EXECUTE IMMEDIATE (:GRANT_STATEMENT);
      OUTPUT := 'EXECUTED: ' || :GRANT_STATEMENT;
      MSG_ARRAY := ARRAY_APPEND(MSG_ARRAY, :OUTPUT);

     EXCEPTION  -- LOG ERRORS AND CONTINUE WITH THE NEXT GRANT
        -- OTHER catches every exception type (including STATEMENT_ERROR and
        -- EXPRESSION_ERROR), which were all handled identically before.
        WHEN OTHER THEN
          LET ERRM := SQLERRM;
          ERROR_ARRAY := ARRAY_APPEND(ERROR_ARRAY, 'ERROR EXECUTING: ' || :GRANT_STATEMENT || '.  ERROR MESSAGE: ' || :ERRM);
     END;

  END FOR;
  -- errors first so failures are visible at the top of the result
  RETURN ARRAY_CAT(ERROR_ARRAY, MSG_ARRAY);
END;
END;
$$
;
-- Store the usage notes on the procedure itself so they show up in SHOW/DESCRIBE output.
ALTER PROCEDURE SHARE_ALL_SCHEMA_OBJECTS(varchar, varchar, varchar)
SET COMMENT = 
$$PROCEDURE NAME: SHARE_ALL_SCHEMA_OBJECTS() 
PROCEDURE ARGUMENTS: 
    DATABASE_NAME VARCHAR, --Database that contains objects to be shared
    SCHEMA_NAME VARCHAR, --Schema name that contains objects to be shared
    SHARE_NAME VARCHAR  --Name of outbound share to be created or updated

PROCEDURE PURPOSE: To create a share (that allows non-secure objects to be shared) from a specified database and schema. And share all sharable objects in the specified schema with the created share. 

USAGE EXAMPLE:  CALL SHARE_ALL_SCHEMA_OBJECTS ('DATABASE_NAME','SCHEMA_NAME','OUTBOUND_SHARE_NAME');. 
$$;

### IN THE SOURCE ACCOUNT, POPULATE THE CONTROL TABLE WITH DATA TO BE COLLECTED 

- ACCOUNT_USAGE.METERING_HISTORY to monitor Trust Center costs
- TRUST_CENTER.FINDINGS to monitor Trust Center findings

In [None]:
-- Rebuild the control table that lists which SNOWFLAKE views are collected.
-- The table is emptied first, so the rows below become the complete list.
TRUNCATE TABLE METADATA_ARCHIVE_CONTROL_INFO;

-- Columns: source database, source schema, source view, the column recorded as
-- DELTA_COLUMN, and LOOKBACK_IN_DAYS.
-- NOTE(review): the active rows use a lookback of -2 while the optional rows
-- below were written with 0 or 1 -- confirm the sign convention expected by the
-- load procedure before enabling any of them.
INSERT INTO METADATA_ARCHIVE_CONTROL_INFO
    (SRC_DATABASE_NAME, SRC_SCHEMA_NAME, SRC_VIEW_NAME, DELTA_COLUMN, LOOKBACK_IN_DAYS)
VALUES
       ('SNOWFLAKE', 'ACCOUNT_USAGE', 'METERING_HISTORY', 'END_TIME', -2),
       ('SNOWFLAKE', 'TRUST_CENTER', 'FINDINGS', 'CREATED_ON', -2)
       -- Optional views: uncomment a line (leading comma included) to collect it.
       -- The list sits inside the statement, before the single terminating
       -- semicolon, so no comment-only statement is left behind.
       -- ACCOUNT_USAGE.METERING_HISTORY is already active above and is therefore
       -- not repeated here.
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'LOGIN_HISTORY', 'EVENT_TIMESTAMP', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'QUERY_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'ACCESS_HISTORY', 'QUERY_START_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'ALERT_HISTORY', 'COMPLETED_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'AUTOMATIC_CLUSTERING_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'COPY_HISTORY', 'LAST_LOAD_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'DATABASE_STORAGE_USAGE_HISTORY', 'USAGE_DATE', 1)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'DATABASE_REPLICATION_USAGE_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'DATA_TRANSFER_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'EVENT_USAGE_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'LOAD_HISTORY', 'LAST_LOAD_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'LOCK_WAIT_HISTORY', 'REQUESTED_AT', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'MATERIALIZED_VIEW_REFRESH_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'METERING_DAILY_HISTORY', 'USAGE_DATE', 1)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'PIPE_USAGE_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'QUERY_ACCELERATION_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'REPLICATION_GROUP_REFRESH_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'REPLICATION_GROUP_USAGE_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'REPLICATION_USAGE_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'SEARCH_OPTIMIZATION_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'SERVERLESS_TASK_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'SNOWPIPE_STREAMING_CLIENT_HISTORY', 'EVENT_TIMESTAMP', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY', 'START_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'STAGE_STORAGE_USAGE_HISTORY', 'USAGE_DATE', 1)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'TASK_HISTORY', 'COMPLETED_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'WAREHOUSE_EVENTS_HISTORY', 'TIMESTAMP', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'WAREHOUSE_LOAD_HISTORY', 'END_TIME', 0)
       -- , ('SNOWFLAKE', 'ACCOUNT_USAGE', 'WAREHOUSE_METERING_HISTORY', 'END_TIME', 0)
;

--validate insert statement
SELECT * FROM METADATA_ARCHIVE_CONTROL_INFO;

### IN THE SOURCE ACCOUNT, EXECUTE INITIAL DATA COLLECTION

1. Call the SNOWFLAKE_ARCHIVE_LOAD_DATA stored procedure, then view the log table to validate execution. This step may take a few minutes. The procedure will:
    - create a table in the CONTROLS schema (if one doesn't already exist) for each view listed in the control table
    - load each table with data from the corresponding view, with each row loaded into a variant column
    - log the results into the logging table
2. Validate that data was loaded by querying the SNOWFLAKE_ARCHIVE.CONTROLS.FINDINGS table

In [None]:
--run the initial data collection by calling the load procedure
CALL SNOWFLAKE_ARCHIVE_LOAD_DATA();
--validate the procedure execution from the log table:
--there should be one row per view listed in the control table, each with a status of SUCCESS
SELECT * FROM METADATA_ARCHIVE_LOG_INFO;

In [None]:
--view the table that was created --note the table structure: each row is converted into a JSON object in the PAYLOAD column (variant)
select * from SNOWFLAKE_ARCHIVE.CONTROLS.FINDINGS limit 10;

### IN THE SOURCE ACCOUNT, CREATE FLATTENED VIEWS FROM TABLES CREATED

Execute the SNOWFLAKE_ARCHIVE_CREATE_VIEWS procedure to create flattened views of the collected data.

- views will be created in the SNOWFLAKE_ARCHIVE database under the schema that corresponds to the schema of the source view 
- views will have the same columns and column data types as the current Snowflake Share database view
- views can be queried with the same syntax as the corresponding Snowflake views

In [None]:
-- execute the procedure that creates the flattened views
CALL SNOWFLAKE_ARCHIVE_CREATE_VIEWS();

--validate the procedure by selecting from one of the views created --note that the new view has the same structure as the original Snowflake shared view
SELECT * FROM  SNOWFLAKE_ARCHIVE.TRUST_CENTER.FINDINGS limit 10;


### IN THE SOURCE ACCOUNT, EXECUTE TASK TASK_LOAD_SNOWFLAKE_ARCHIVE_DATA 

Resume, execute and validate the schedule and state of the TASK_LOAD_SNOWFLAKE_ARCHIVE_DATA task.

In [None]:
--start the task TASK_LOAD_SNOWFLAKE_ARCHIVE_DATA
ALTER TASK task_load_snowflake_archive_data RESUME;

--trigger one run immediately instead of waiting for the schedule
--(EXECUTE TASK only queues the run; it returns before the run has finished)
EXECUTE TASK TASK_LOAD_SNOWFLAKE_ARCHIVE_DATA;

--validate task status
--TASK_NAME is passed to the table function so its row limit is applied to this
--task only; a WHERE filter on NAME is applied after the limit and can come back
--empty when other tasks have more recent history.
--Re-run this query until the latest row leaves the SCHEDULED/EXECUTING state.
SELECT * 
FROM TABLE(information_schema.task_history(
        TASK_NAME => 'TASK_LOAD_SNOWFLAKE_ARCHIVE_DATA'))
WHERE database_name = 'SNOWFLAKE_ARCHIVE'
ORDER BY scheduled_time DESC;

### IN THE SOURCE ACCOUNT, EXECUTE TASK TASK_LOAD_ACCOUNT_PARAMETERS

Resume, execute and validate the schedule and state of the TASK_LOAD_ACCOUNT_PARAMETERS task.

In [None]:
--start the task TASK_LOAD_ACCOUNT_PARAMETERS
ALTER TASK TASK_LOAD_ACCOUNT_PARAMETERS RESUME;

--trigger one run immediately instead of waiting for the schedule
--(EXECUTE TASK only queues the run; it returns before the run has finished)
EXECUTE TASK TASK_LOAD_ACCOUNT_PARAMETERS;

--validate task status
--TASK_NAME is passed to the table function so its row limit is applied to this
--task only; a WHERE filter on NAME is applied after the limit and can come back
--empty when other tasks have more recent history.
--Re-run this query until the latest row leaves the SCHEDULED/EXECUTING state.
SELECT * 
FROM TABLE(information_schema.task_history(
        TASK_NAME => 'TASK_LOAD_ACCOUNT_PARAMETERS'))
WHERE database_name = 'SNOWFLAKE_ARCHIVE'
ORDER BY scheduled_time DESC;

### IN THE SOURCE ACCOUNT, CREATE TRUST CENTER VIEWS


1. Create the Trust Center Views that will be used for monitoring security compliance.
    - TRUST_CENTER_COST_DETAILS
    - TRUST_CENTER_SEVERITY_FINDINGS_SUMMARY
    - TRUST_CENTER_SCANNER_AT_RISK_COUNT
    - TRUST_CENTER_SEVERITY_FINDINGS_DETAILS
2. Validate the views were created by querying the row counts from each view.


In [None]:

---------------------------------------------------
-- COST DETAILS
-- Archived metering rows whose service type is TRUST_CENTER, newest first.
CREATE OR REPLACE VIEW TRUST_CENTER_COST_DETAILS
COPY GRANTS
AS
SELECT *
FROM SNOWFLAKE_ARCHIVE.ACCOUNT_USAGE.METERING_HISTORY
WHERE SERVICE_TYPE = 'TRUST_CENTER'
ORDER BY START_TIME DESC;
    
 
-- FINDINGS SEVERITY SUMMARY
-- Number of distinct scanners with at-risk entities, per scanner package,
-- finding date and severity, based on each scanner's most recent findings.
CREATE OR REPLACE VIEW TRUST_CENTER_SEVERITY_FINDINGS_SUMMARY
COPY GRANTS
AS
WITH LATEST_FINDINGS AS (
    -- Rows from each scanner's latest END_TIMESTAMP that report at least one
    -- at-risk entity. Scanners whose description starts with
    -- 'Ensure monitoring' are reported under the synthetic severity 'MONITOR'.
    SELECT
        CURRENT_ACCOUNT()      AS ACCOUNT,
        CURRENT_ACCOUNT_NAME() AS ACCOUNT_NAME,
        SCANNER_PACKAGE_NAME,
        CREATED_ON::DATE       AS FINDING_DATE,
        CASE
            WHEN SCANNER_SHORT_DESCRIPTION ILIKE 'Ensure monitoring%' THEN 'MONITOR'
            ELSE SEVERITY
        END                    AS SEVERITY,
        SCANNER_ID
    FROM SNOWFLAKE_ARCHIVE.TRUST_CENTER.FINDINGS
    QUALIFY RANK() OVER (PARTITION BY SCANNER_ID ORDER BY END_TIMESTAMP DESC) = 1
        AND TOTAL_AT_RISK_COUNT > 0
)
SELECT
    ACCOUNT,
    ACCOUNT_NAME,
    SCANNER_PACKAGE_NAME,
    FINDING_DATE,
    SEVERITY,
    -- numeric sort key: 1 = most severe; NULL for any other severity value
    CASE SEVERITY
        WHEN 'CRITICAL' THEN 1
        WHEN 'HIGH'     THEN 2
        WHEN 'MEDIUM'   THEN 3
        WHEN 'LOW'      THEN 4
        WHEN 'MONITOR'  THEN 5
    END AS SEVERITY_LEVEL_RANK,
    COUNT(DISTINCT SCANNER_ID) AS SEVERITY_COUNT
FROM LATEST_FINDINGS
GROUP BY ALL;

-- RISK COUNT BY FINDING
-- Total at-risk count per scanner description and severity, based on each
-- scanner's most recent findings.
CREATE OR REPLACE VIEW TRUST_CENTER_SCANNER_AT_RISK_COUNT
COPY GRANTS
AS
WITH LATEST_FINDINGS AS (
    -- Rows from each scanner's latest END_TIMESTAMP that report at least one
    -- at-risk entity. Scanners whose description starts with
    -- 'Ensure monitoring' are reported under the synthetic severity 'MONITOR'.
    SELECT
        CURRENT_ACCOUNT()      AS ACCOUNT,
        CURRENT_ACCOUNT_NAME() AS ACCOUNT_NAME,
        SCANNER_PACKAGE_NAME,
        CREATED_ON::DATE       AS FINDING_DATE,
        SCANNER_SHORT_DESCRIPTION,
        CASE
            WHEN SCANNER_SHORT_DESCRIPTION ILIKE 'Ensure monitoring%' THEN 'MONITOR'
            ELSE SEVERITY
        END                    AS SEVERITY,
        TOTAL_AT_RISK_COUNT
    FROM SNOWFLAKE_ARCHIVE.TRUST_CENTER.FINDINGS
    QUALIFY RANK() OVER (PARTITION BY SCANNER_ID ORDER BY END_TIMESTAMP DESC) = 1
        AND TOTAL_AT_RISK_COUNT > 0
)
SELECT
    ACCOUNT,
    ACCOUNT_NAME,
    SCANNER_PACKAGE_NAME,
    FINDING_DATE,
    SCANNER_SHORT_DESCRIPTION,
    SEVERITY,
    -- numeric sort key: 1 = most severe; NULL for any other severity value
    CASE SEVERITY
        WHEN 'CRITICAL' THEN 1
        WHEN 'HIGH'     THEN 2
        WHEN 'MEDIUM'   THEN 3
        WHEN 'LOW'      THEN 4
        WHEN 'MONITOR'  THEN 5
    END AS SEVERITY_LEVEL_RANK,
    SUM(TOTAL_AT_RISK_COUNT) AS TOTAL_AT_RISK_COUNT
FROM LATEST_FINDINGS
GROUP BY ALL;
                
                

--LATEST  FINDINGS DETAILS
-- One row per at-risk entity (AT_RISK_ENTITIES flattened) from each scanner's
-- most recent findings that report at least one at-risk entity.
-- Scanners whose description starts with 'Ensure monitoring' are reported under
-- the synthetic severity 'MONITOR' with rank 5.
CREATE OR REPLACE VIEW TRUST_CENTER_SEVERITY_FINDINGS_DETAILS  
COPY GRANTS
AS 
    (SELECT CURRENT_ACCOUNT() AS ACCOUNT,
        CURRENT_ACCOUNT_NAME() AS ACCOUNT_NAME,
        CASE WHEN FND.SCANNER_SHORT_DESCRIPTION ILIKE 'Ensure monitoring%' THEN 'MONITOR' ELSE FND.SEVERITY END AS SEVERITY, 
        -- The rank must repeat the monitoring test: inside this SELECT a bare
        -- SEVERITY refers to the source column, not to the remapped alias above,
        -- so monitoring findings would otherwise keep the rank of their original
        -- severity and never receive 5.
        CASE WHEN FND.SCANNER_SHORT_DESCRIPTION ILIKE 'Ensure monitoring%' THEN 5
            WHEN FND.SEVERITY = 'CRITICAL' THEN 1
            WHEN FND.SEVERITY = 'HIGH' THEN 2
            WHEN FND.SEVERITY = 'MEDIUM' THEN 3
            WHEN FND.SEVERITY = 'LOW' THEN 4 
            WHEN FND.SEVERITY = 'MONITOR' THEN 5 END AS SEVERITY_LEVEL_RANK,
        FND.SCANNER_ID,
        FND.SCANNER_SHORT_DESCRIPTION, 
        FND.CREATED_ON::DATE AS FINDING_DATE, 
        F.VALUE:entity_object_type::VARCHAR AS ENTITY_OBJECT_TYPE,
        F.VALUE:entity_id::VARCHAR AS ENTITY_ID,
        F.VALUE:entity_name::VARCHAR AS ENTITY_NAME,
        FND.IMPACT, 
        FND.SUGGESTED_ACTION,
        FND.AT_RISK_ENTITIES,
        RANK() OVER (PARTITION BY FND.SCANNER_ID ORDER BY FND.END_TIMESTAMP DESC) AS DATE_RANK
    FROM
      SNOWFLAKE_ARCHIVE.TRUST_CENTER.FINDINGS AS FND,
      LATERAL FLATTEN(INPUT => FND.AT_RISK_ENTITIES) AS F
    QUALIFY DATE_RANK = 1 AND FND.TOTAL_AT_RISK_COUNT > 0
    );


In [None]:
-------------TEST VIEWS
-- Row count of each Trust Center view; a row is returned for every view that
-- exists and can be queried.
SELECT
    'TRUST_CENTER_COST_DETAILS' AS V_NAME,
    COUNT(*) AS CNT
FROM TRUST_CENTER_COST_DETAILS
UNION ALL
SELECT
    'TRUST_CENTER_SEVERITY_FINDINGS_SUMMARY' AS V_NAME,
    COUNT(*) AS CNT
FROM TRUST_CENTER_SEVERITY_FINDINGS_SUMMARY
UNION ALL
SELECT
    'TRUST_CENTER_SCANNER_AT_RISK_COUNT' AS V_NAME,
    COUNT(*) AS CNT
FROM TRUST_CENTER_SCANNER_AT_RISK_COUNT
UNION ALL
SELECT
    'TRUST_CENTER_SEVERITY_FINDINGS_DETAILS' AS V_NAME,
    COUNT(*) AS CNT
FROM TRUST_CENTER_SEVERITY_FINDINGS_DETAILS;