## Dynamic Merge


##### The source system supplied a meta data view indicating join conditions

In [None]:
create or replace view DEV_DB.RAW_SWP.V_SOURCE_COLUMN_META(
	SNOWFLAKE_VIEW,
	LOGICAL_KEY,
	JOIN_CONDITION,
	MINUS_EXPRESSION,
	DELETE_CONDITION
) as

    WITH JOIN_CONDITION AS (
    
        SELECT
            V.SNOWFLAKE_VIEW,
            V.LOGICAL_KEY,
            LISTAGG(
                CASE
                    WHEN C.DATA_TYPE = 'TEXT' THEN 'NVL(S.' || V.JOIN_FIELD ||', '''') = NVL(T.' || V.JOIN_FIELD ||', '''') AND '
                    WHEN C.DATA_TYPE = 'NUMBER' THEN 'NVL(S.' || V.JOIN_FIELD ||', -999) = NVL(T.' || V.JOIN_FIELD ||', -999) AND ' 
                    WHEN C.DATA_TYPE = 'TIMESTAMP_NTZ' THEN 'NVL(S.' || V.JOIN_FIELD ||', ''1900-01-01'') = NVL(T.' || V.JOIN_FIELD ||', ''1900-01-01'') AND '            
                END) AS JOIN_CONDITION,
            LISTAGG(
                CASE
                    WHEN C.DATA_TYPE = 'TEXT' THEN 'NVL(' || V.JOIN_FIELD ||', '''')||'
                    WHEN C.DATA_TYPE = 'NUMBER' THEN 'NVL(' || V.JOIN_FIELD ||', -999)||' 
                    WHEN C.DATA_TYPE = 'TIMESTAMP_NTZ' THEN 'NVL(' || V.JOIN_FIELD ||', ''1900-01-01'')||'            
                END) AS DELETE_CONDITION
        FROM (
            SELECT
                SNOWFLAKE_VIEW,
                LOGICAL_KEY,
                VALUE AS JOIN_FIELD
            FROM
                SWP_I10_UAT_DB.I10_SWPR_SECURE_VW_TB.SF_SWPR_VIEW_DETAILS_VIEW,
                LATERAL FLATTEN(INPUT => SPLIT(LOGICAL_KEY, '|'))) V
                JOIN SWP_I10_UAT_DB.INFORMATION_SCHEMA.COLUMNS C ON C.TABLE_NAME = V.SNOWFLAKE_VIEW AND V.JOIN_FIELD = C.COLUMN_NAME
        GROUP BY
            V.SNOWFLAKE_VIEW, V.LOGICAL_KEY

    )

SELECT
    V.SNOWFLAKE_VIEW,
    V.LOGICAL_KEY,
    LEFT(J.JOIN_CONDITION, LENGTH(J.JOIN_CONDITION) - 4) JOIN_CONDITION,
    CASE
        WHEN LISTAGG(C.COLUMN_NAME, '') = '' THEN NULL
        ELSE 'SELECT ' || LISTAGG(C.COLUMN_NAME, ',') WITHIN GROUP (ORDER BY ORDINAL_POSITION) || ' FROM SWP_I10_UAT_DB.I10_SWPR_SECURE_VW_TB.' || V.SNOWFLAKE_VIEW || ' MINUS SELECT ' || LISTAGG(C.COLUMN_NAME, ',') WITHIN GROUP (ORDER BY ORDINAL_POSITION) || ' FROM ' || V.SNOWFLAKE_VIEW 
    END AS MINUS_EXPRESSION,
    LEFT(J.DELETE_CONDITION, LENGTH(J.DELETE_CONDITION) - 2) DELETE_CONDITION,
FROM
    SWP_I10_UAT_DB.I10_SWPR_SECURE_VW_TB.SF_SWPR_VIEW_DETAILS_VIEW V
    LEFT JOIN INFORMATION_SCHEMA.COLUMNS C ON C.TABLE_NAME = V.SNOWFLAKE_VIEW AND TABLE_SCHEMA = 'RAW_SWP'
    LEFT JOIN JOIN_CONDITION J ON J.SNOWFLAKE_VIEW = V.SNOWFLAKE_VIEW
GROUP BY
    V.SNOWFLAKE_VIEW, V.LOGICAL_KEY, J.JOIN_CONDITION, J.DELETE_CONDITION;

##### The main procedure read the join conditions

In [None]:
CREATE OR REPLACE PROCEDURE DEV_DB.RAW_SWP.SP_PROCESS_SWP_MERGE_ALL_MINUS("V_SOURCE_PATH" VARCHAR, "V_TARGET_TABLE" VARCHAR)
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS 
$$

DECLARE
            
    V_SQL_MERGE         STRING;    
    V_SQL_DELETE        STRING;

    V_SOURCE_DATABASE   STRING;
    V_SOURCE_SCHEMA     STRING;
    V_SOURCE_TABLE      STRING;
        
    V_JOIN_CONDITION    STRING;
    V_MINUS_EXPRESSION  STRING;
    V_DELETE_CONDITION  STRING;
    
BEGIN

    -- Seperate out the source schema and table

    V_SOURCE_DATABASE   := (SELECT SPLIT(:V_SOURCE_PATH, '.')[0]);
    V_SOURCE_SCHEMA     := (SELECT SPLIT(:V_SOURCE_PATH, '.')[1]);
    V_SOURCE_TABLE      := (SELECT SPLIT(:V_SOURCE_PATH, '.')[2]);

    -- Convert the logical_key value into a join condition which can be used in the merge statement

    V_JOIN_CONDITION    := (SELECT JOIN_CONDITION FROM V_SOURCE_COLUMN_META WHERE SNOWFLAKE_VIEW = :V_SOURCE_TABLE);
    V_MINUS_EXPRESSION  := (SELECT MINUS_EXPRESSION FROM V_SOURCE_COLUMN_META WHERE SNOWFLAKE_VIEW = :V_SOURCE_TABLE);
    V_DELETE_CONDITION  := (SELECT DELETE_CONDITION FROM V_SOURCE_COLUMN_META WHERE SNOWFLAKE_VIEW = :V_SOURCE_TABLE);

    -- Merge in new data and update any changes

    V_SQL_MERGE := '
    
        MERGE INTO ' || V_TARGET_TABLE || ' AS T
        USING (' || V_MINUS_EXPRESSION || ') AS S ON ' || V_JOIN_CONDITION || '
        WHEN NOT MATCHED THEN INSERT ALL BY NAME
        WHEN MATCHED THEN UPDATE ALL BY NAME

    ';

    EXECUTE IMMEDIATE V_SQL_MERGE;   

    -- Remove any rows that no longer exists

    V_SQL_DELETE := '

       DELETE FROM ' || :V_TARGET_TABLE || ' WHERE ' || V_DELETE_CONDITION || ' NOT IN (SELECT ' || V_DELETE_CONDITION || ' FROM ' || :V_SOURCE_DATABASE || '.' || :V_SOURCE_SCHEMA || '.' || :V_SOURCE_TABLE || ')';

    EXECUTE IMMEDIATE V_SQL_DELETE;

    RETURN 'SUCCESS';

END
$$;
