# Check Bronze layer (STAGING) row counts

In [None]:
USE ROLE TPCH_DEVELOPER;
USE SCHEMA TPCH_ANALYTICS_DB.STAGING;

-- One row per Bronze table: the table name and its current row count.
-- (Column names of a UNION ALL come from the first branch; the repeated
--  aliases below only make each branch self-describing.)
SELECT 'ORDERS'   AS TABLE_NAME, COUNT(*) AS ROW_COUNT FROM ORDERS
UNION ALL
SELECT 'CUSTOMER' AS TABLE_NAME, COUNT(*) AS ROW_COUNT FROM CUSTOMER
UNION ALL
SELECT 'LINEITEM' AS TABLE_NAME, COUNT(*) AS ROW_COUNT FROM LINEITEM;

# Create Silver (Analytics)

In [None]:
USE SCHEMA TPCH_ANALYTICS_DB.ANALYTICS;
USE ROLE TPCH_DEVELOPER;

-- ---------------------------------------------------------------------------
-- Silver layer DDL.
-- Every statement is CREATE OR REPLACE: re-running this cell drops the
-- existing table and all rows already merged into it.
-- (INTEGER is Snowflake's synonym for NUMBER(38,0).)
-- ---------------------------------------------------------------------------

-- ORDERS_SILVER: cleaned orders plus derived / enriched attributes.
CREATE OR REPLACE TABLE ORDERS_SILVER (
    O_ORDERKEY          INTEGER,
    O_CUSTKEY           INTEGER,
    O_ORDERSTATUS       VARCHAR(1),
    O_ORDERSTATUS_DESC  VARCHAR(20),    -- enriched: readable label for O_ORDERSTATUS
    O_TOTALPRICE        NUMBER(12,2),
    O_ORDERDATE         DATE,
    O_ORDER_YEAR        NUMBER(4,0),    -- derived
    O_ORDER_MONTH       NUMBER(2,0),    -- derived
    O_ORDER_QUARTER     NUMBER(1,0),    -- derived
    O_ORDERPRIORITY     VARCHAR(15),
    O_PRIORITY_RANK     NUMBER(1,0),    -- derived
    O_CLERK             VARCHAR(15),
    O_CLERK_ID          NUMBER(9,0),    -- derived
    O_SHIPPRIORITY      INTEGER,
    O_COMMENT           VARCHAR(79),
    -- load metadata
    SOURCE_FILE         VARCHAR(256),
    FIRST_LOADED_AT     TIMESTAMP_LTZ,
    LAST_UPDATED_AT     TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (O_ORDERKEY)
);

-- CUSTOMER_SILVER: customers enriched with nation and region attributes.
CREATE OR REPLACE TABLE CUSTOMER_SILVER (
    C_CUSTKEY           INTEGER,
    C_NAME              VARCHAR(25),
    C_ADDRESS           VARCHAR(40),
    C_NATIONKEY         INTEGER,
    C_NATION_NAME       VARCHAR(25),    -- from NATION
    C_REGIONKEY         INTEGER,        -- from NATION -> REGION
    C_REGION_NAME       VARCHAR(25),    -- from REGION
    C_PHONE             VARCHAR(15),
    C_ACCTBAL           NUMBER(12,2),
    C_MKTSEGMENT        VARCHAR(10),
    C_COMMENT           VARCHAR(117),
    LOAD_TIMESTAMP      TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (C_CUSTKEY)
);

-- LINEITEM_SILVER: line items enriched with part/supplier names and price metrics.
CREATE OR REPLACE TABLE LINEITEM_SILVER (
    L_ORDERKEY          INTEGER,
    L_LINENUMBER        INTEGER,
    L_PARTKEY           INTEGER,
    L_PART_NAME         VARCHAR(55),    -- from PART
    L_PART_TYPE         VARCHAR(25),    -- from PART
    L_SUPPKEY           INTEGER,
    L_SUPPLIER_NAME     VARCHAR(25),    -- from SUPPLIER
    L_QUANTITY          NUMBER(12,2),
    L_EXTENDEDPRICE     NUMBER(12,2),
    L_DISCOUNT          NUMBER(12,2),
    L_TAX               NUMBER(12,2),
    L_RETURNFLAG        VARCHAR(1),
    L_LINESTATUS        VARCHAR(1),
    L_SHIPDATE          DATE,
    L_COMMITDATE        DATE,
    L_RECEIPTDATE       DATE,
    L_SHIPINSTRUCT      VARCHAR(25),
    L_SHIPMODE          VARCHAR(10),
    L_COMMENT           VARCHAR(44),
    -- calculated columns
    L_NET_PRICE         NUMBER(12,2),   -- EXTENDEDPRICE * (1 - DISCOUNT)
    L_FINAL_PRICE       NUMBER(12,2),   -- NET_PRICE * (1 + TAX)
    L_SHIP_DELAY_DAYS   INTEGER,        -- days between commit and receipt
    LOAD_TIMESTAMP      TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (L_ORDERKEY, L_LINENUMBER)
);


## (Optional) Enable Automatic Clustering

In [None]:
/*
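Pricing: Defining a clustering key enables Snowflake's Automatic Clustering service, which reclusters tables in the background and consumes credits. Monitor those credits, especially on large fact tables such as LINEITEM_SILVER.

Order: The order of columns in CLUSTER BY matters. Snowflake generally recommends ordering columns from lowest to highest cardinality. Prefer columns that queries commonly filter on (for example, dates) over unique keys.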

*/

/*

USE SCHEMA TPCH_ANALYTICS_DB.ANALYTICS;
USE ROLE TPCH_DEVELOPER;

ALTER TABLE ORDERS_SILVER CLUSTER BY (O_ORDERDATE, O_CUSTKEY, O_ORDER_YEAR);
ALTER TABLE CUSTOMER_SILVER CLUSTER BY (C_REGION_NAME, C_NATION_NAME);
ALTER TABLE LINEITEM_SILVER CLUSTER BY (L_SHIPDATE, L_ORDERKEY, L_PARTKEY);

-- monitor cost
SELECT 
    TO_DATE(START_TIME) AS DATE,
    SUM(CREDITS_USED) AS TOTAL_CLUSTERING_CREDITS
FROM 
    SNOWFLAKE.ACCOUNT_USAGE.AUTOMATIC_CLUSTERING_HISTORY
GROUP BY 1
ORDER BY 1 DESC;

-- drop cluster
ALTER TABLE ORDERS_SILVER DROP CLUSTERING KEY;
ALTER TABLE CUSTOMER_SILVER DROP CLUSTERING KEY;
ALTER TABLE LINEITEM_SILVER DROP CLUSTERING KEY;

*/

# Create Gold (Reports)

In [None]:
USE SCHEMA TPCH_ANALYTICS_DB.REPORTS;
USE ROLE TPCH_DEVELOPER;

-- ---------------------------------------------------------------------------
-- Gold layer DDL (report tables).
-- CREATE OR REPLACE: re-running this cell drops the existing report rows.
-- (INTEGER is Snowflake's synonym for NUMBER(38,0).)
-- ---------------------------------------------------------------------------

-- DAILY_SALES_SUMMARY: order and revenue aggregates keyed by SUMMARY_DATE.
CREATE OR REPLACE TABLE DAILY_SALES_SUMMARY (
    SUMMARY_DATE        DATE,
    ORDER_YEAR          NUMBER(4,0),
    ORDER_MONTH         NUMBER(2,0),
    ORDER_QUARTER       NUMBER(1,0),
    TOTAL_ORDERS        INTEGER,
    TOTAL_CUSTOMERS     INTEGER,
    TOTAL_REVENUE       NUMBER(15,2),
    AVG_ORDER_VALUE     NUMBER(15,2),
    MIN_ORDER_VALUE     NUMBER(15,2),
    MAX_ORDER_VALUE     NUMBER(15,2),
    LOAD_TIMESTAMP      TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (SUMMARY_DATE)
);

-- CUSTOMER_LTV: per-customer lifetime value metrics, keyed by C_CUSTKEY.
CREATE OR REPLACE TABLE CUSTOMER_LTV (
    C_CUSTKEY            INTEGER,
    C_NAME               VARCHAR(25),
    C_NATION_NAME        VARCHAR(25),
    C_REGION_NAME        VARCHAR(25),
    C_MKTSEGMENT         VARCHAR(10),
    TOTAL_ORDERS         INTEGER,
    TOTAL_SPENT          NUMBER(15,2),
    AVG_ORDER_VALUE      NUMBER(15,2),
    FIRST_ORDER_DATE     DATE,
    LAST_ORDER_DATE      DATE,
    CUSTOMER_TENURE_DAYS INTEGER,
    CUSTOMER_TIER        VARCHAR(20),    -- VIP, GOLD, SILVER, BRONZE, STANDARD
    IS_ACTIVE            BOOLEAN,        -- has an order in the last 90 days
    LOAD_TIMESTAMP       TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (C_CUSTKEY)
);


/*

USE SCHEMA TPCH_ANALYTICS_DB.REPORTS;
USE ROLE TPCH_DEVELOPER;

ALTER TABLE CUSTOMER_LTV CLUSTER BY (C_CUSTKEY, C_REGION_NAME, C_NATION_NAME);
ALTER TABLE DAILY_SALES_SUMMARY CLUSTER BY (SUMMARY_DATE, ORDER_MONTH);

*/


# SNOWPIPE

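Load staged CSV files into the Bronze (STAGING) tables, with one pipe per table.
The pipes below use AUTO_INGEST = FALSE, so files are picked up only when `ALTER PIPE ... REFRESH` runs, either manually or from the scheduled task. Loading files automatically as they arrive would require AUTO_INGEST = TRUE plus cloud event notifications.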

## Create Pipe

In [None]:

USE SCHEMA TPCH_ANALYTICS_DB.STAGING;

-- Show the properties of the stage the pipes read from.
DESC STAGE TPCH_DATA_STAGE;

-- List the files currently in that stage (LS is the short form of LIST).
LS @TPCH_DATA_STAGE;

In [None]:
-- Quick check of the session's current time as TIMESTAMP_LTZ (i.e. in the session time zone).
SELECT CURRENT_TIMESTAMP()::TIMESTAMP_LTZ;

In [None]:
USE ROLE TPCH_DEVELOPER;

-- Add load-lineage columns to every Bronze (staging) table:
--   FROM_SOURCE : the staged file a row came from (the pipes write METADATA$FILENAME)
--   CREATED_AT  : the pipes write METADATA$FILE_LAST_MODIFIED (file modification time)
-- IF NOT EXISTS makes the cell idempotent. Without it, a second run fails
-- because the column already exists.

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.ORDERS ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.ORDERS ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.LINEITEM ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.LINEITEM ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.CUSTOMER ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.CUSTOMER ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.NATION ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.NATION ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.PART ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.PART ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.PARTSUPP ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.PARTSUPP ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.REGION ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.REGION ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

ALTER TABLE TPCH_ANALYTICS_DB.STAGING.SUPPLIER ADD COLUMN IF NOT EXISTS FROM_SOURCE VARCHAR;
ALTER TABLE TPCH_ANALYTICS_DB.STAGING.SUPPLIER ADD COLUMN IF NOT EXISTS CREATED_AT TIMESTAMP_LTZ;

In [None]:
-- USE ROLE TPCH_DEVELOPER;

-- ALTER TABLE TPCH_ANALYTICS_DB.STAGING.ORDERS DROP COLUMN CREATED_AT;
-- ALTER TABLE TPCH_ANALYTICS_DB.STAGING.ORDERS ADD COLUMN CREATED_AT TIMESTAMP_LTZ;

In [None]:
-- Template only (commented out so the notebook can run top to bottom):
-- event-driven variant of PIPE_LOAD_ORDERS using cloud notifications.
-- Before enabling it:
--   1. Replace MY_GCP_INTEGRATION with a real notification integration.
--   2. Replace "..." with the COPY INTO statement (the same one as the
--      AUTO_INGEST = FALSE pipe in the next cell).
-- Note: the next cell runs CREATE OR REPLACE on the same pipe name, so running
-- both cells keeps only whichever one ran last.
--
-- CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_ORDERS
-- AUTO_INGEST = TRUE
-- INTEGRATION = 'MY_GCP_INTEGRATION' -- Adjust!
-- AS
-- ...

In [None]:
USE DATABASE TPCH_ANALYTICS_DB;
USE ROLE TPCH_DEVELOPER;

/*
PIPE_LOAD_ORDERS: copies ORDERS CSV files from the internal stage into STAGING.ORDERS.

- Batch mode (AUTO_INGEST = FALSE). The data is read from an internal stage during
  testing, and event-driven ingestion would require extra cloud-service setup.
  Files are picked up only when ALTER PIPE ... REFRESH is executed.
  Setup reference: https://docs.snowflake.com/en/sql-reference/sql/create-pipe
- NOTE: if switching to streaming / auto-ingest, also set up METADATA$START_SCAN_TIME.
- Lineage columns: FROM_SOURCE <- METADATA$FILENAME,
                   CREATED_AT  <- METADATA$FILE_LAST_MODIFIED (file modification time,
                                  not the time the row was loaded).
- NOTE(review): CREATE OR REPLACE recreates the pipe. Confirm how that affects the
  pipe's load history before refreshing against files that were already loaded.
*/

CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_ORDERS
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.ORDERS (
    O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
    O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- O_ORDERKEY
        src.$2::NUMBER(38,0),          -- O_CUSTKEY
        src.$3::VARCHAR(1),            -- O_ORDERSTATUS
        src.$4::NUMBER(15,2),          -- O_TOTALPRICE
        src.$5::DATE,                  -- O_ORDERDATE
        src.$6::VARCHAR(15),           -- O_ORDERPRIORITY
        src.$7::VARCHAR(15),           -- O_CLERK
        src.$8::NUMBER(38,0),          -- O_SHIPPRIORITY
        src.$9::VARCHAR(79),           -- O_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_orders_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);

In [None]:
    -- Inspect the ORDERS files in the stage: one row per distinct combination of
    -- file name, shifted last-modified time and scan start time, newest files first.
    -- NOTE(review): the +7h shift presumably converts the UTC file timestamp to
    -- Asia/Ho_Chi_Minh (UTC+7) local time — confirm.
    SELECT DISTINCT
        METADATA$FILENAME::VARCHAR                       AS FILE_NAME,
        METADATA$FILE_LAST_MODIFIED + INTERVAL '7' HOUR  AS FILE_LAST_MODIFIED_PLUS_7H,
        METADATA$START_SCAN_TIME                         AS START_SCAN_TIME
    FROM @STAGING.TPCH_DATA_STAGE (
        PATTERN => '.*tpch_sf1_orders_.*\.csv'
    )
    ORDER BY FILE_LAST_MODIFIED_PLUS_7H DESC;

In [None]:
-- PIPE_LOAD_LINEITEM: copies LINEITEM CSV files from the stage into STAGING.LINEITEM.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_LINEITEM
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.LINEITEM (
    L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER,
    L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
    L_RETURNFLAG, L_LINESTATUS,
    L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
    L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- L_ORDERKEY
        src.$2::NUMBER(38,0),          -- L_PARTKEY
        src.$3::NUMBER(38,0),          -- L_SUPPKEY
        src.$4::NUMBER(38,0),          -- L_LINENUMBER
        src.$5::NUMBER(12,2),          -- L_QUANTITY
        src.$6::NUMBER(12,2),          -- L_EXTENDEDPRICE
        src.$7::NUMBER(12,2),          -- L_DISCOUNT
        src.$8::NUMBER(12,2),          -- L_TAX
        src.$9::VARCHAR(1),            -- L_RETURNFLAG
        src.$10::VARCHAR(1),           -- L_LINESTATUS
        src.$11::DATE,                 -- L_SHIPDATE
        src.$12::DATE,                 -- L_COMMITDATE
        src.$13::DATE,                 -- L_RECEIPTDATE
        src.$14::VARCHAR(25),          -- L_SHIPINSTRUCT
        src.$15::VARCHAR(10),          -- L_SHIPMODE
        src.$16::VARCHAR(44),          -- L_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_line_item_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);


--

-- PIPE_LOAD_CUSTOMER: copies CUSTOMER CSV files from the stage into STAGING.CUSTOMER.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_CUSTOMER
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.CUSTOMER (
    C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY,
    C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- C_CUSTKEY
        src.$2::VARCHAR(25),           -- C_NAME
        src.$3::VARCHAR(40),           -- C_ADDRESS
        src.$4::NUMBER(38,0),          -- C_NATIONKEY
        src.$5::VARCHAR(15),           -- C_PHONE
        src.$6::NUMBER(12,2),          -- C_ACCTBAL
        src.$7::VARCHAR(10),           -- C_MKTSEGMENT
        src.$8::VARCHAR(117),          -- C_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_customer_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);


--

-- PIPE_LOAD_NATION: copies NATION CSV files from the stage into STAGING.NATION.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_NATION
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.NATION (
    N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- N_NATIONKEY
        src.$2::VARCHAR(25),           -- N_NAME
        src.$3::NUMBER(38,0),          -- N_REGIONKEY
        src.$4::VARCHAR(152),          -- N_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_nation_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);

--

-- PIPE_LOAD_PART: copies PART CSV files from the stage into STAGING.PART.
-- The pattern requires "part_" right after the prefix, so partsupp files do not match.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_PART
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.PART (
    P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE,
    P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- P_PARTKEY
        src.$2::VARCHAR(55),           -- P_NAME
        src.$3::VARCHAR(25),           -- P_MFGR
        src.$4::VARCHAR(10),           -- P_BRAND
        src.$5::VARCHAR(25),           -- P_TYPE
        src.$6::NUMBER(38,0),          -- P_SIZE
        src.$7::VARCHAR(10),           -- P_CONTAINER
        src.$8::NUMBER(12,2),          -- P_RETAILPRICE
        src.$9::VARCHAR(23),           -- P_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_part_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);

--

-- PIPE_LOAD_PARTSUPP: copies PARTSUPP CSV files from the stage into STAGING.PARTSUPP.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_PARTSUPP
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.PARTSUPP (
    PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- PS_PARTKEY
        src.$2::NUMBER(38,0),          -- PS_SUPPKEY
        src.$3::NUMBER(38,0),          -- PS_AVAILQTY
        src.$4::NUMBER(12,2),          -- PS_SUPPLYCOST
        src.$5::VARCHAR(199),          -- PS_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_partsupp_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);

--

-- PIPE_LOAD_REGION: copies REGION CSV files from the stage into STAGING.REGION.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_REGION
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.REGION (
    R_REGIONKEY, R_NAME, R_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- R_REGIONKEY
        src.$2::VARCHAR(25),           -- R_NAME
        src.$3::VARCHAR(152),          -- R_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_region_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);


--

-- PIPE_LOAD_SUPPLIER: copies SUPPLIER CSV files from the stage into STAGING.SUPPLIER.
-- Refresh-driven (AUTO_INGEST = FALSE). The last two columns are lineage taken from file metadata.
CREATE OR REPLACE PIPE CONTROL.PIPE_LOAD_SUPPLIER
AUTO_INGEST = FALSE
AS
COPY INTO STAGING.SUPPLIER (
    S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY,
    S_PHONE, S_ACCTBAL, S_COMMENT,
    FROM_SOURCE, CREATED_AT
)
FROM (
    SELECT
        src.$1::NUMBER(38,0),          -- S_SUPPKEY
        src.$2::VARCHAR(25),           -- S_NAME
        src.$3::VARCHAR(40),           -- S_ADDRESS
        src.$4::NUMBER(38,0),          -- S_NATIONKEY
        src.$5::VARCHAR(15),           -- S_PHONE
        src.$6::NUMBER(12,2),          -- S_ACCTBAL
        src.$7::VARCHAR(101),          -- S_COMMENT
        METADATA$FILENAME::VARCHAR,    -- FROM_SOURCE
        METADATA$FILE_LAST_MODIFIED    -- CREATED_AT
    FROM @STAGING.TPCH_DATA_STAGE src
)
PATTERN = '.*tpch_sf1_supplier_.*\.csv'
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);


In [None]:
USE ROLE TPCH_DEVELOPER;

-- The pipe has AUTO_INGEST = FALSE, so staged ORDERS files are queued for
-- loading only when REFRESH is issued.
ALTER PIPE CONTROL.PIPE_LOAD_ORDERS REFRESH;


In [None]:
-- Highest order key currently present in Bronze ORDERS.
SELECT MAX(O_ORDERKEY)
FROM STAGING.ORDERS;
-- WHERE O_ORDERKEY = 5400001

In [None]:
-- ALTER ACCOUNT SET TIMEZONE = 'Asia/Ho_Chi_Minh';

-- SHOW PARAMETERS LIKE 'TIMEZONE';

In [None]:
-- USE ROLE TPCH_DEVELOPER;

-- Cleanup: remove ORDERS rows with keys from 6000001 upwards from Bronze
-- (presumably test rows loaded on top of the original data — confirm before running).
-- NOTE: this DELETE is captured by CONTROL.STREAM_ORDERS but is not applied to
-- ANALYTICS.ORDERS_SILVER, because CONTROL.MERGE_ORDERS_CDC merges only INSERT rows.
DELETE FROM STAGING.ORDERS
WHERE O_ORDERKEY >= 6000001;

In [None]:
-- Inspect ORDERS rows with keys from 6000001 upwards, together with their lineage
-- columns, newest CREATED_AT first.
-- (The original select list had a trailing comma before FROM, which many SQL
--  engines reject; it has been removed.)
SELECT
    O_ORDERKEY,
    CREATED_AT,
    FROM_SOURCE
FROM STAGING.ORDERS
WHERE O_ORDERKEY >= 6000001
ORDER BY CREATED_AT DESC;

In [None]:
-- Status of the ORDERS pipe as reported by Snowflake.
SELECT
    SYSTEM$PIPE_STATUS('TPCH_analytics_db.control.PIPE_LOAD_ORDERS');

## Schedule pipe refresh (task)

In [None]:
-- Daily task that refreshes every Bronze pipe so newly staged files get queued for loading.
-- No WAREHOUSE is specified, so the task runs on serverless compute.
-- Schedule: 00:05 every day, Asia/Ho_Chi_Minh time.
-- The scripting block is wrapped in EXECUTE IMMEDIATE $$ ... $$. Clients that split
-- statements on ';' (SnowSQL, Classic Console, drivers) then submit the whole
-- CREATE TASK as one statement instead of breaking it at the first ALTER PIPE.
-- NOTE(review): this task only loads Bronze; the CONTROL.MERGE_*_CDC procedures
-- are not called by it.
CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE
  SCHEDULE = 'USING CRON 05 00 * * * Asia/Ho_Chi_Minh'
  AS
    EXECUTE IMMEDIATE
    $$
    BEGIN
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_LINEITEM REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_CUSTOMER REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_NATION REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_ORDERS REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_PART REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_PARTSUPP REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_REGION REFRESH;
        ALTER PIPE TPCH_ANALYTICS_DB.CONTROL.PIPE_LOAD_SUPPLIER REFRESH;
    END;
    $$;

-- Tasks are created suspended. RESUME activates the schedule above; it does not
-- run the task immediately (use EXECUTE TASK for a one-off run).
ALTER TASK TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE RESUME;

-- Stop the schedule:
-- ALTER TASK TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE SUSPEND;

# Create STREAM (CDC)

In [None]:
-- Stream on Bronze ORDERS: records row-level changes (inserts, updates, deletes)
-- made to STAGING.ORDERS. It is the input for incremental Silver processing.
-- SHOW_INITIAL_ROWS = FALSE: rows already in the table when the stream is created
-- are not returned. Set it to TRUE to process existing data as changes.
-- IF NOT EXISTS instead of OR REPLACE: recreating a stream resets its offset, so
-- re-running this cell would silently drop changes not yet merged into Silver.
-- Drop the stream explicitly if it really must be recreated.
CREATE STREAM IF NOT EXISTS CONTROL.STREAM_ORDERS
ON TABLE STAGING.ORDERS
SHOW_INITIAL_ROWS = FALSE;

In [None]:
-- Streams on Bronze CUSTOMER and LINEITEM (same settings as STREAM_ORDERS).
-- IF NOT EXISTS keeps the current offset on re-runs, so changes not yet merged
-- into Silver are not discarded.
CREATE STREAM IF NOT EXISTS CONTROL.STREAM_CUSTOMER
ON TABLE STAGING.CUSTOMER
SHOW_INITIAL_ROWS = FALSE;

CREATE STREAM IF NOT EXISTS CONTROL.STREAM_LINEITEM
ON TABLE STAGING.LINEITEM
SHOW_INITIAL_ROWS = FALSE;

In [None]:
-- Number of change rows waiting in the ORDERS stream.
-- A plain SELECT only reads the stream; it does not advance its offset.
SELECT
    COUNT(*) AS PENDING_CHANGES
FROM CONTROL.STREAM_ORDERS;

In [None]:
-- Peek at ORDERS stream rows that carry a CREATED_AT, newest first (read-only).
SELECT
    METADATA$ACTION,
    O_ORDERKEY,
    O_COMMENT,
    FROM_SOURCE,
    CREATED_AT
FROM CONTROL.STREAM_ORDERS
WHERE CREATED_AT IS NOT NULL
ORDER BY CREATED_AT DESC;

# Create SP (Stored Procedure)

## SP Silver

In [None]:
-- Scratch preview of the Silver transformation on the ORDERS stream:
-- the latest INSERT row per O_ORDERKEY, with derived columns. Read-only.
SELECT
    O_ORDERKEY,
    O_CUSTKEY,
    O_ORDERSTATUS,
    CASE O_ORDERSTATUS
        WHEN 'F' THEN 'FINISHED'
        WHEN 'O' THEN 'OPEN'
        WHEN 'P' THEN 'PENDING'
        ELSE 'UNKNOWN'
    END                                   AS O_ORDERSTATUS_DESC,
    O_TOTALPRICE,
    O_ORDERDATE,
    EXTRACT(YEAR FROM O_ORDERDATE)        AS O_ORDER_YEAR,
    EXTRACT(MONTH FROM O_ORDERDATE)       AS O_ORDER_MONTH,
    EXTRACT(QUARTER FROM O_ORDERDATE)     AS O_ORDER_QUARTER,
    O_ORDERPRIORITY,
    CASE
        WHEN O_ORDERPRIORITY LIKE '1-URGENT%'        THEN 1
        WHEN O_ORDERPRIORITY LIKE '2-HIGH%'          THEN 2
        WHEN O_ORDERPRIORITY LIKE '3-MEDIUM%'        THEN 3
        WHEN O_ORDERPRIORITY LIKE '4-NOT SPECIFIED%' THEN 4
        WHEN O_ORDERPRIORITY LIKE '5-LOW%'           THEN 5
        ELSE 9
    END                                   AS O_PRIORITY_RANK,
    O_CLERK,
    -- numeric part of the clerk code
    TRY_CAST(REGEXP_SUBSTR(O_CLERK, '[0-9]+') AS NUMBER) AS O_CLERK_ID,
    O_SHIPPRIORITY,
    O_COMMENT,
    FROM_SOURCE                           AS SOURCE_FILE,
    CREATED_AT                            AS FIRST_LOADED_AT,
    METADATA$ACTION,
    METADATA$ISUPDATE,
    ROW_NUMBER() OVER (PARTITION BY O_ORDERKEY ORDER BY CREATED_AT DESC) AS _RANK
FROM CONTROL.STREAM_ORDERS
WHERE METADATA$ACTION = 'INSERT'
-- keep only the newest row per order key (QUALIFY may reference the SELECT alias)
QUALIFY _RANK = 1;

In [None]:
USE ROLE TPCH_DEVELOPER;
-- ============================================================================
-- Create Stored Procedure for CDC MERGE Logic: STREAM_ORDERS -> ORDERS_SILVER
-- ============================================================================
-- CONTROL.MERGE_ORDERS_CDC() upserts the pending change rows of
-- CONTROL.STREAM_ORDERS into ANALYTICS.ORDERS_SILVER.
-- Returns: 'No changes detected ...' when the stream is empty, otherwise a message
-- with the MERGE row count (SQLROWCOUNT) and the number of stream rows seen.
--
-- Notes:
--   * Stream rows have METADATA$ACTION = 'INSERT' or 'DELETE'. An UPDATE on the
--     source arrives as a DELETE + INSERT pair with METADATA$ISUPDATE = TRUE, so
--     the INSERT rows carry every new or changed version. There is no 'UPDATE'
--     action to filter on.
--   * Pure deletes in STAGING.ORDERS are not applied to Silver.
--   * The MERGE reads the stream, so its offset advances when the MERGE commits
--     (DELETE rows that were filtered out are consumed as well).
--   * Several versions of one O_ORDERKEY in a batch collapse to the newest
--     CREATED_AT. NULLS LAST is required because Snowflake sorts NULLs first
--     under DESC by default, which would let a row without CREATED_AT win.
--     FROM_SOURCE breaks ties between rows with the same CREATED_AT.
--   * NOTE(review): in TPC-H, status 'P' is usually read as "partially fulfilled";
--     the 'PENDING' label is kept unchanged — confirm the intended wording.

CREATE OR REPLACE PROCEDURE CONTROL.MERGE_ORDERS_CDC()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    rows_merged NUMBER DEFAULT 0;
BEGIN
    -- Skip the MERGE entirely when the stream has nothing pending.
    LET pending_count NUMBER := (SELECT COUNT(*) FROM CONTROL.STREAM_ORDERS);

    IF (pending_count = 0) THEN
        RETURN 'No changes detected in stream. Nothing to process.';
    END IF;

    -- Update existing Silver rows or insert new ones.
    MERGE INTO ANALYTICS.ORDERS_SILVER AS target
    USING (
        -- Latest INSERT version of each order, with Silver transformations applied.
        SELECT
            O_ORDERKEY,
            O_CUSTKEY,
            O_ORDERSTATUS,
            CASE O_ORDERSTATUS
                WHEN 'F' THEN 'FINISHED'
                WHEN 'O' THEN 'OPEN'
                WHEN 'P' THEN 'PENDING'
                ELSE 'UNKNOWN'
            END AS O_ORDERSTATUS_DESC,
            O_TOTALPRICE,
            O_ORDERDATE,
            YEAR(O_ORDERDATE) AS O_ORDER_YEAR,
            MONTH(O_ORDERDATE) AS O_ORDER_MONTH,
            QUARTER(O_ORDERDATE) AS O_ORDER_QUARTER,
            O_ORDERPRIORITY,
            CASE
                WHEN O_ORDERPRIORITY LIKE '1-URGENT%' THEN 1
                WHEN O_ORDERPRIORITY LIKE '2-HIGH%' THEN 2
                WHEN O_ORDERPRIORITY LIKE '3-MEDIUM%' THEN 3
                WHEN O_ORDERPRIORITY LIKE '4-NOT SPECIFIED%' THEN 4
                WHEN O_ORDERPRIORITY LIKE '5-LOW%' THEN 5
                ELSE 9
            END AS O_PRIORITY_RANK,
            O_CLERK,
            TRY_CAST(REGEXP_SUBSTR(O_CLERK, '[0-9]+') AS NUMBER) AS O_CLERK_ID,
            O_SHIPPRIORITY,
            O_COMMENT,
            FROM_SOURCE AS SOURCE_FILE,
            CREATED_AT AS FIRST_LOADED_AT,
            METADATA$ACTION,
            METADATA$ISUPDATE
        FROM CONTROL.STREAM_ORDERS
        WHERE METADATA$ACTION = 'INSERT'
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY O_ORDERKEY
            ORDER BY CREATED_AT DESC NULLS LAST, FROM_SOURCE DESC
        ) = 1
    ) AS source
    ON target.O_ORDERKEY = source.O_ORDERKEY

    WHEN MATCHED THEN
        -- Refresh the existing row. FIRST_LOADED_AT is intentionally left unchanged.
        UPDATE SET
            target.O_CUSTKEY = source.O_CUSTKEY,
            target.O_ORDERSTATUS = source.O_ORDERSTATUS,
            target.O_ORDERSTATUS_DESC = source.O_ORDERSTATUS_DESC,
            target.O_TOTALPRICE = source.O_TOTALPRICE,
            target.O_ORDERDATE = source.O_ORDERDATE,
            target.O_ORDER_YEAR = source.O_ORDER_YEAR,
            target.O_ORDER_MONTH = source.O_ORDER_MONTH,
            target.O_ORDER_QUARTER = source.O_ORDER_QUARTER,
            target.O_ORDERPRIORITY = source.O_ORDERPRIORITY,
            target.O_PRIORITY_RANK = source.O_PRIORITY_RANK,
            target.O_CLERK = source.O_CLERK,
            target.O_CLERK_ID = source.O_CLERK_ID,
            target.O_SHIPPRIORITY = source.O_SHIPPRIORITY,
            target.O_COMMENT = source.O_COMMENT,
            target.SOURCE_FILE = source.SOURCE_FILE,
            target.LAST_UPDATED_AT = CURRENT_TIMESTAMP()

    WHEN NOT MATCHED THEN
        -- New order: insert it with its original load time.
        INSERT (
            O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_ORDERSTATUS_DESC,
            O_TOTALPRICE, O_ORDERDATE, O_ORDER_YEAR, O_ORDER_MONTH, O_ORDER_QUARTER,
            O_ORDERPRIORITY, O_PRIORITY_RANK, O_CLERK, O_CLERK_ID,
            O_SHIPPRIORITY, O_COMMENT, SOURCE_FILE, FIRST_LOADED_AT, LAST_UPDATED_AT
        )
        VALUES (
            source.O_ORDERKEY, source.O_CUSTKEY, source.O_ORDERSTATUS, source.O_ORDERSTATUS_DESC,
            source.O_TOTALPRICE, source.O_ORDERDATE, source.O_ORDER_YEAR, source.O_ORDER_MONTH, source.O_ORDER_QUARTER,
            source.O_ORDERPRIORITY, source.O_PRIORITY_RANK, source.O_CLERK, source.O_CLERK_ID,
            source.O_SHIPPRIORITY, source.O_COMMENT, source.SOURCE_FILE, source.FIRST_LOADED_AT, CURRENT_TIMESTAMP()
        );

    -- Rows inserted + updated by the MERGE.
    rows_merged := SQLROWCOUNT;

    RETURN 'CDC merge completed. Rows affected: ' || rows_merged || ' (from ' || pending_count || ' stream rows)';
END;
$$;



In [None]:
    -- Scratch version of the CUSTOMER_SILVER MERGE (same logic as CONTROL.MERGE_CUSTOMER_CDC).
    -- WARNING: this is a real MERGE. Running it consumes CONTROL.STREAM_CUSTOMER just
    -- like calling the procedure does.
    -- Columns are qualified with table aliases because STAGING.NATION and
    -- STAGING.REGION also have FROM_SOURCE and CREATED_AT columns; unqualified
    -- references to them are ambiguous and make the statement fail.
    MERGE INTO ANALYTICS.CUSTOMER_SILVER AS target
    USING (
        -- Latest INSERT version of each customer, enriched with nation/region names.
        SELECT
            s.C_CUSTKEY,
            s.C_NAME,
            s.C_ADDRESS,
            s.C_NATIONKEY,
            n.N_NAME      AS C_NATION_NAME,
            n.N_REGIONKEY AS C_REGIONKEY,
            r.R_NAME      AS C_REGION_NAME,
            s.C_PHONE,
            s.C_ACCTBAL,
            s.C_MKTSEGMENT,
            s.C_COMMENT,
            s.FROM_SOURCE,
            s.CREATED_AT,
            METADATA$ACTION,
            METADATA$ISUPDATE
        FROM CONTROL.STREAM_CUSTOMER AS s
        LEFT JOIN STAGING.NATION AS n
            ON s.C_NATIONKEY = n.N_NATIONKEY
        LEFT JOIN STAGING.REGION AS r
            ON r.R_REGIONKEY = n.N_REGIONKEY
        WHERE METADATA$ACTION = 'INSERT'
        -- NULLS LAST: under DESC, Snowflake would otherwise rank NULL CREATED_AT first.
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY s.C_CUSTKEY
            ORDER BY s.CREATED_AT DESC NULLS LAST, s.FROM_SOURCE DESC
        ) = 1
    ) AS source
    ON target.C_CUSTKEY = source.C_CUSTKEY

    WHEN MATCHED THEN
        -- Update existing records with the latest data.
        UPDATE SET
            target.C_NAME = source.C_NAME,
            target.C_ADDRESS = source.C_ADDRESS,
            target.C_NATIONKEY = source.C_NATIONKEY,
            target.C_NATION_NAME = source.C_NATION_NAME,
            target.C_REGIONKEY = source.C_REGIONKEY,
            target.C_REGION_NAME = source.C_REGION_NAME,
            target.C_PHONE = source.C_PHONE,
            target.C_ACCTBAL = source.C_ACCTBAL,
            target.C_MKTSEGMENT = source.C_MKTSEGMENT,
            target.C_COMMENT = source.C_COMMENT,
            target.LOAD_TIMESTAMP = CURRENT_TIMESTAMP()

    WHEN NOT MATCHED THEN
        -- Insert new records.
        INSERT (
            C_CUSTKEY,
            C_NAME,
            C_ADDRESS,
            C_NATIONKEY,
            C_NATION_NAME,
            C_REGIONKEY,
            C_REGION_NAME,
            C_PHONE,
            C_ACCTBAL,
            C_MKTSEGMENT,
            C_COMMENT,
            LOAD_TIMESTAMP
        )
        VALUES (
            source.C_CUSTKEY,
            source.C_NAME,
            source.C_ADDRESS,
            source.C_NATIONKEY,
            source.C_NATION_NAME,
            source.C_REGIONKEY,
            source.C_REGION_NAME,
            source.C_PHONE,
            source.C_ACCTBAL,
            source.C_MKTSEGMENT,
            source.C_COMMENT,
            CURRENT_TIMESTAMP()
        );

In [None]:
USE ROLE TPCH_DEVELOPER;
-- ============================================================================
-- Create Stored Procedure for CDC MERGE Logic: STREAM_CUSTOMER -> CUSTOMER_SILVER
-- ============================================================================
-- CONTROL.MERGE_CUSTOMER_CDC() upserts pending change rows of
-- CONTROL.STREAM_CUSTOMER into ANALYTICS.CUSTOMER_SILVER, enriching each customer
-- with nation and region names from the staging dimension tables.
-- Returns: 'No changes detected ...' when the stream is empty, otherwise a message
-- with the MERGE row count (SQLROWCOUNT) and the number of stream rows seen.
--
-- Notes:
--   * Columns are qualified with table aliases. STAGING.NATION and STAGING.REGION
--     also have FROM_SOURCE and CREATED_AT columns, so unqualified references to
--     them are ambiguous and make the procedure fail at CALL time.
--   * Stream rows have METADATA$ACTION = 'INSERT' or 'DELETE' (an UPDATE shows up
--     as DELETE + INSERT), so only INSERT rows are merged. Deletes are not applied.
--   * The MERGE consumes the stream when it commits.
--   * Newest CREATED_AT wins per customer. NULLS LAST stops rows without a
--     CREATED_AT from winning (Snowflake sorts NULLs first under DESC by default).

CREATE OR REPLACE PROCEDURE CONTROL.MERGE_CUSTOMER_CDC()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    rows_merged NUMBER DEFAULT 0;
BEGIN
    -- Skip the MERGE entirely when the stream has nothing pending.
    LET pending_count NUMBER := (SELECT COUNT(*) FROM CONTROL.STREAM_CUSTOMER);

    IF (pending_count = 0) THEN
        RETURN 'No changes detected in stream. Nothing to process.';
    END IF;

    -- Update existing Silver rows or insert new ones.
    MERGE INTO ANALYTICS.CUSTOMER_SILVER AS target
    USING (
        -- Latest INSERT version of each customer, enriched with nation/region names.
        SELECT
            s.C_CUSTKEY,
            s.C_NAME,
            s.C_ADDRESS,
            s.C_NATIONKEY,
            n.N_NAME      AS C_NATION_NAME,
            n.N_REGIONKEY AS C_REGIONKEY,
            r.R_NAME      AS C_REGION_NAME,
            s.C_PHONE,
            s.C_ACCTBAL,
            s.C_MKTSEGMENT,
            s.C_COMMENT,
            s.FROM_SOURCE,
            s.CREATED_AT,
            METADATA$ACTION,
            METADATA$ISUPDATE
        FROM CONTROL.STREAM_CUSTOMER AS s
        LEFT JOIN STAGING.NATION AS n
            ON s.C_NATIONKEY = n.N_NATIONKEY
        LEFT JOIN STAGING.REGION AS r
            ON r.R_REGIONKEY = n.N_REGIONKEY
        WHERE METADATA$ACTION = 'INSERT'
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY s.C_CUSTKEY
            ORDER BY s.CREATED_AT DESC NULLS LAST, s.FROM_SOURCE DESC
        ) = 1
    ) AS source
    ON target.C_CUSTKEY = source.C_CUSTKEY

    WHEN MATCHED THEN
        -- Update existing records with the latest data.
        UPDATE SET
            target.C_NAME = source.C_NAME,
            target.C_ADDRESS = source.C_ADDRESS,
            target.C_NATIONKEY = source.C_NATIONKEY,
            target.C_NATION_NAME = source.C_NATION_NAME,
            target.C_REGIONKEY = source.C_REGIONKEY,
            target.C_REGION_NAME = source.C_REGION_NAME,
            target.C_PHONE = source.C_PHONE,
            target.C_ACCTBAL = source.C_ACCTBAL,
            target.C_MKTSEGMENT = source.C_MKTSEGMENT,
            target.C_COMMENT = source.C_COMMENT,
            target.LOAD_TIMESTAMP = CURRENT_TIMESTAMP()

    WHEN NOT MATCHED THEN
        -- Insert new records.
        INSERT (
            C_CUSTKEY,
            C_NAME,
            C_ADDRESS,
            C_NATIONKEY,
            C_NATION_NAME,
            C_REGIONKEY,
            C_REGION_NAME,
            C_PHONE,
            C_ACCTBAL,
            C_MKTSEGMENT,
            C_COMMENT,
            LOAD_TIMESTAMP
        )
        VALUES (
            source.C_CUSTKEY,
            source.C_NAME,
            source.C_ADDRESS,
            source.C_NATIONKEY,
            source.C_NATION_NAME,
            source.C_REGIONKEY,
            source.C_REGION_NAME,
            source.C_PHONE,
            source.C_ACCTBAL,
            source.C_MKTSEGMENT,
            source.C_COMMENT,
            CURRENT_TIMESTAMP()
        );

    -- Rows inserted + updated by the MERGE.
    rows_merged := SQLROWCOUNT;

    RETURN 'CDC merge completed. Rows affected: ' || rows_merged || ' (from ' || pending_count || ' stream rows)';
END;
$$;



In [None]:
    -- Scratch version of the LINEITEM_SILVER MERGE.
    -- WARNING: this is a real MERGE. Running it consumes CONTROL.STREAM_LINEITEM.
    -- Columns are qualified with table aliases because STAGING.PART and
    -- STAGING.SUPPLIER also have FROM_SOURCE and CREATED_AT columns; unqualified
    -- references to them are ambiguous and make the statement fail.
    -- Stream rows have METADATA$ACTION = 'INSERT' or 'DELETE' (updates arrive as
    -- DELETE + INSERT), so only INSERT rows are merged.
    MERGE INTO ANALYTICS.LINEITEM_SILVER AS target
    USING (
        -- Latest INSERT version of each (order, line), enriched and with price metrics.
        SELECT
            s.L_ORDERKEY,
            s.L_LINENUMBER,
            s.L_PARTKEY,
            p.P_NAME  AS L_PART_NAME,
            p.P_TYPE  AS L_PART_TYPE,
            s.L_SUPPKEY,
            su.S_NAME AS L_SUPPLIER_NAME,
            s.L_QUANTITY,
            s.L_EXTENDEDPRICE,
            s.L_DISCOUNT,
            s.L_TAX,
            s.L_RETURNFLAG,
            s.L_LINESTATUS,
            s.L_SHIPDATE,
            s.L_COMMITDATE,
            s.L_RECEIPTDATE,
            s.L_SHIPINSTRUCT,
            s.L_SHIPMODE,
            s.L_COMMENT,

            s.L_EXTENDEDPRICE * (1 - s.L_DISCOUNT) AS L_NET_PRICE,
            (s.L_EXTENDEDPRICE * (1 - s.L_DISCOUNT)) * (1 + s.L_TAX) AS L_FINAL_PRICE,
            -- receipt later than commit => positive delay; early/on-time => 0
            CASE
                WHEN DATEDIFF(DAY, s.L_COMMITDATE, s.L_RECEIPTDATE) < 0 THEN 0
                ELSE DATEDIFF(DAY, s.L_COMMITDATE, s.L_RECEIPTDATE)
            END AS L_SHIP_DELAY_DAYS,

            s.FROM_SOURCE,
            s.CREATED_AT,
            METADATA$ACTION,
            METADATA$ISUPDATE
        FROM CONTROL.STREAM_LINEITEM AS s
        LEFT JOIN STAGING.PART AS p
            ON s.L_PARTKEY = p.P_PARTKEY
        LEFT JOIN STAGING.SUPPLIER AS su
            ON s.L_SUPPKEY = su.S_SUPPKEY
        WHERE METADATA$ACTION = 'INSERT'
        -- NULLS LAST: under DESC, Snowflake would otherwise rank NULL CREATED_AT first.
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY s.L_ORDERKEY, s.L_LINENUMBER
            ORDER BY s.CREATED_AT DESC NULLS LAST, s.FROM_SOURCE DESC
        ) = 1
    ) AS source
    ON target.L_ORDERKEY = source.L_ORDERKEY
    AND target.L_LINENUMBER = source.L_LINENUMBER

    WHEN MATCHED THEN
        -- Update existing records with the latest data.
        UPDATE SET
            target.L_PARTKEY = source.L_PARTKEY,
            target.L_PART_NAME = source.L_PART_NAME,
            target.L_PART_TYPE = source.L_PART_TYPE,
            target.L_SUPPKEY = source.L_SUPPKEY,
            target.L_SUPPLIER_NAME = source.L_SUPPLIER_NAME,
            target.L_QUANTITY = source.L_QUANTITY,
            target.L_EXTENDEDPRICE = source.L_EXTENDEDPRICE,
            target.L_DISCOUNT = source.L_DISCOUNT,
            target.L_TAX = source.L_TAX,
            target.L_RETURNFLAG = source.L_RETURNFLAG,
            target.L_LINESTATUS = source.L_LINESTATUS,
            target.L_SHIPDATE = source.L_SHIPDATE,
            target.L_COMMITDATE = source.L_COMMITDATE,
            target.L_RECEIPTDATE = source.L_RECEIPTDATE,
            target.L_SHIPINSTRUCT = source.L_SHIPINSTRUCT,
            target.L_SHIPMODE = source.L_SHIPMODE,
            target.L_COMMENT = source.L_COMMENT,
            target.L_NET_PRICE = source.L_NET_PRICE,
            target.L_FINAL_PRICE = source.L_FINAL_PRICE,
            target.L_SHIP_DELAY_DAYS = source.L_SHIP_DELAY_DAYS,
            target.LOAD_TIMESTAMP = CURRENT_TIMESTAMP()

    WHEN NOT MATCHED THEN
        -- Insert new records.
        INSERT (
            L_ORDERKEY,
            L_LINENUMBER,
            L_PARTKEY,
            L_PART_NAME,
            L_PART_TYPE,
            L_SUPPKEY,
            L_SUPPLIER_NAME,
            L_QUANTITY,
            L_EXTENDEDPRICE,
            L_DISCOUNT,
            L_TAX,
            L_RETURNFLAG,
            L_LINESTATUS,
            L_SHIPDATE,
            L_COMMITDATE,
            L_RECEIPTDATE,
            L_SHIPINSTRUCT,
            L_SHIPMODE,
            L_COMMENT,
            L_NET_PRICE,
            L_FINAL_PRICE,
            L_SHIP_DELAY_DAYS,
            LOAD_TIMESTAMP
        )
        VALUES (
            source.L_ORDERKEY,
            source.L_LINENUMBER,
            source.L_PARTKEY,
            source.L_PART_NAME,
            source.L_PART_TYPE,
            source.L_SUPPKEY,
            source.L_SUPPLIER_NAME,
            source.L_QUANTITY,
            source.L_EXTENDEDPRICE,
            source.L_DISCOUNT,
            source.L_TAX,
            source.L_RETURNFLAG,
            source.L_LINESTATUS,
            source.L_SHIPDATE,
            source.L_COMMITDATE,
            source.L_RECEIPTDATE,
            source.L_SHIPINSTRUCT,
            source.L_SHIPMODE,
            source.L_COMMENT,
            source.L_NET_PRICE,
            source.L_FINAL_PRICE,
            source.L_SHIP_DELAY_DAYS,
            CURRENT_TIMESTAMP()
        );

In [None]:
USE ROLE TPCH_DEVELOPER;
-- ============================================================================
-- Create Stored Procedure for CDC MERGE Logic LINEITEM_SILVER
-- ============================================================================
-- Consumes CONTROL.STREAM_LINEITEM and upserts the latest version of each
-- (L_ORDERKEY, L_LINENUMBER) into ANALYTICS.LINEITEM_SILVER. Each row is
-- enriched with the part name/type (STAGING.PART), the supplier name
-- (STAGING.SUPPLIER), and the derived net price, final price and
-- ship-delay columns.
--
-- Notes:
--   * A stream reports METADATA$ACTION as 'INSERT' or 'DELETE' only. An UPDATE
--     on the base table appears as a DELETE + INSERT pair with
--     METADATA$ISUPDATE = TRUE. Filtering on 'INSERT' therefore captures both
--     new rows and the new image of updated rows. DELETE rows are ignored, so
--     deletions are not propagated to Silver.
--   * The COUNT(*) pre-check only reads the stream. The MERGE is the DML that
--     advances the stream offset.
--   * CREATED_AT / FROM_SOURCE are qualified with the stream name because the
--     joined staging tables may carry identically named metadata columns.
-- Returns: a status string with the MERGE row count and the pending stream
-- row count, or a short message when the stream is empty.

CREATE OR REPLACE PROCEDURE CONTROL.MERGE_LINEITEM_CDC()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    rows_merged NUMBER DEFAULT 0;
BEGIN
    -- Check if stream has data before processing (reading does not consume it)
    LET pending_count NUMBER := (SELECT COUNT(*) FROM CONTROL.STREAM_LINEITEM);
    
    IF (pending_count = 0) THEN
        RETURN 'No changes detected in stream. Nothing to process.';
    END IF;
    
    -- MERGE statement: Update existing records or insert new ones
    MERGE INTO ANALYTICS.LINEITEM_SILVER AS target
    USING (
        -- Latest INSERT image per line item from the stream, with enrichments
        SELECT 
            L_ORDERKEY,
            L_LINENUMBER,
            L_PARTKEY,
            P_NAME AS L_PART_NAME,
            P_TYPE AS L_PART_TYPE,
            L_SUPPKEY,
            S_NAME AS L_SUPPLIER_NAME,
            L_QUANTITY,
            L_EXTENDEDPRICE,
            L_DISCOUNT,
            L_TAX,
            L_RETURNFLAG,
            L_LINESTATUS,
            L_SHIPDATE,
            L_COMMITDATE,
            L_RECEIPTDATE,
            L_SHIPINSTRUCT,
            L_SHIPMODE,
            L_COMMENT,
            
            L_EXTENDEDPRICE * (1 - L_DISCOUNT) AS L_NET_PRICE,
            (L_EXTENDEDPRICE * (1 - L_DISCOUNT)) * (1 + L_TAX) AS L_FINAL_PRICE,
            -- Early deliveries (receipt before commit) are clamped to 0 days late
            CASE 
                WHEN DATEDIFF(DAY, L_COMMITDATE, L_RECEIPTDATE) < 0 THEN 0 
                ELSE DATEDIFF(DAY, L_COMMITDATE, L_RECEIPTDATE) 
            END AS L_SHIP_DELAY_DAYS,
            
            STREAM_LINEITEM.FROM_SOURCE,
            STREAM_LINEITEM.CREATED_AT,
            STREAM_LINEITEM.METADATA$ACTION,
            STREAM_LINEITEM.METADATA$ISUPDATE
        FROM CONTROL.STREAM_LINEITEM
        LEFT JOIN STAGING.PART ON STREAM_LINEITEM.L_PARTKEY = PART.P_PARTKEY
        LEFT JOIN STAGING.SUPPLIER ON STREAM_LINEITEM.L_SUPPKEY = SUPPLIER.S_SUPPKEY

        -- Streams only emit INSERT/DELETE; updates arrive as an INSERT with ISUPDATE = TRUE
        WHERE STREAM_LINEITEM.METADATA$ACTION = 'INSERT'
        -- Keep only the most recently loaded version of each line item
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY L_ORDERKEY, L_LINENUMBER
            ORDER BY STREAM_LINEITEM.CREATED_AT DESC
        ) = 1
    ) AS source
    ON target.L_ORDERKEY = source.L_ORDERKEY
    AND target.L_LINENUMBER = source.L_LINENUMBER
    
    WHEN MATCHED THEN
        -- Update existing records with latest data
        UPDATE SET
            target.L_PARTKEY = source.L_PARTKEY,
            target.L_PART_NAME = source.L_PART_NAME,
            target.L_PART_TYPE = source.L_PART_TYPE,
            target.L_SUPPKEY = source.L_SUPPKEY,
            target.L_SUPPLIER_NAME = source.L_SUPPLIER_NAME,
            target.L_QUANTITY = source.L_QUANTITY,
            target.L_EXTENDEDPRICE = source.L_EXTENDEDPRICE,
            target.L_DISCOUNT = source.L_DISCOUNT,
            target.L_TAX = source.L_TAX,
            target.L_RETURNFLAG = source.L_RETURNFLAG,
            target.L_LINESTATUS = source.L_LINESTATUS,
            target.L_SHIPDATE = source.L_SHIPDATE,
            target.L_COMMITDATE = source.L_COMMITDATE,
            target.L_RECEIPTDATE = source.L_RECEIPTDATE,
            target.L_SHIPINSTRUCT = source.L_SHIPINSTRUCT,
            target.L_SHIPMODE = source.L_SHIPMODE,
            target.L_COMMENT = source.L_COMMENT,
            target.L_NET_PRICE = source.L_NET_PRICE,
            target.L_FINAL_PRICE = source.L_FINAL_PRICE,
            target.L_SHIP_DELAY_DAYS = source.L_SHIP_DELAY_DAYS,
            target.LOAD_TIMESTAMP = CURRENT_TIMESTAMP()
    
    WHEN NOT MATCHED THEN
        -- Insert new records
        INSERT (
            L_ORDERKEY,
            L_LINENUMBER,
            L_PARTKEY,
            L_PART_NAME,
            L_PART_TYPE,
            L_SUPPKEY,
            L_SUPPLIER_NAME,
            L_QUANTITY,
            L_EXTENDEDPRICE,
            L_DISCOUNT,
            L_TAX,
            L_RETURNFLAG,
            L_LINESTATUS,
            L_SHIPDATE,
            L_COMMITDATE,
            L_RECEIPTDATE,
            L_SHIPINSTRUCT,
            L_SHIPMODE,
            L_COMMENT,
            L_NET_PRICE,
            L_FINAL_PRICE,
            L_SHIP_DELAY_DAYS,
            LOAD_TIMESTAMP
        )
        VALUES (
            source.L_ORDERKEY,
            source.L_LINENUMBER,
            source.L_PARTKEY,
            source.L_PART_NAME,
            source.L_PART_TYPE,
            source.L_SUPPKEY,
            source.L_SUPPLIER_NAME,
            source.L_QUANTITY,
            source.L_EXTENDEDPRICE,
            source.L_DISCOUNT,
            source.L_TAX,
            source.L_RETURNFLAG,
            source.L_LINESTATUS,
            source.L_SHIPDATE,
            source.L_COMMITDATE,
            source.L_RECEIPTDATE,
            source.L_SHIPINSTRUCT,
            source.L_SHIPMODE,
            source.L_COMMENT,
            source.L_NET_PRICE,
            source.L_FINAL_PRICE,
            source.L_SHIP_DELAY_DAYS,
            CURRENT_TIMESTAMP()
        );
    
    -- Rows inserted + updated by the MERGE
    rows_merged := SQLROWCOUNT;
    
    RETURN 'CDC merge completed. Rows affected: ' || rows_merged || ' (from ' || pending_count || ' stream rows)';
END;
$$;



In [None]:
-- Suspend the root task before (re)creating the child tasks in the cells below.
ALTER TASK CONTROL.TASK_REFRESH_PIPE SUSPEND;

In [None]:
-- ============================================================================
-- STEP 18: Create TASK to Automate CDC Pipeline (ORDERS)
-- ============================================================================
-- Child task of TASK_REFRESH_PIPE. It has no schedule of its own: it runs after
-- TASK_REFRESH_PIPE completes, provided the WHEN condition below is true.

CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_ORDERS
    WAREHOUSE = COMPUTE_WH
    -- A task cannot have both SCHEDULE and AFTER. To run it standalone instead,
    -- drop AFTER and use e.g. SCHEDULE = 'USING CRON 05 04 * * * Asia/Ho_Chi_Minh'
    -- or the interval form SCHEDULE = '5 MINUTE'.
    AFTER TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE
    -- NOTE(review): unqualified stream name; confirm it resolves to the intended stream (fully qualifying it is safer)
    WHEN SYSTEM$STREAM_HAS_DATA('STREAM_RAW_ORDERS_LANDING')  -- Skip the run when the stream has no pending rows
AS
    CALL CONTROL.MERGE_ORDERS_CDC();

-- Resume the task so it runs as part of the TASK_REFRESH_PIPE DAG
ALTER TASK CONTROL.TASK_CDC_MERGE_ORDERS RESUME;

In [None]:
-- Child task of TASK_REFRESH_PIPE that runs CONTROL.MERGE_CUSTOMER_CDC.
CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_CUSTOMER
    WAREHOUSE = COMPUTE_WH
    AFTER TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE
    -- NOTE(review): the CUSTOMER merge is gated on the ORDERS landing stream.
    -- If MERGE_CUSTOMER_CDC consumes a customer-specific stream (the LINEITEM
    -- procedure reads CONTROL.STREAM_LINEITEM), customer-only changes are skipped
    -- whenever this orders stream is empty. Confirm, and gate on the stream the
    -- procedure actually reads.
    WHEN SYSTEM$STREAM_HAS_DATA('STREAM_RAW_ORDERS_LANDING')  -- Only run if stream has data
AS
    CALL CONTROL.MERGE_CUSTOMER_CDC();

-- Resume the task so it runs as part of the TASK_REFRESH_PIPE DAG
ALTER TASK CONTROL.TASK_CDC_MERGE_CUSTOMER RESUME;


In [None]:
-- Child task of TASK_REFRESH_PIPE that runs CONTROL.MERGE_LINEITEM_CDC.
-- It is gated on CONTROL.STREAM_LINEITEM, the stream that procedure consumes,
-- so the task fires exactly when there are lineitem changes to merge.
-- The name is fully qualified so resolution does not depend on the
-- session's current database/schema.
CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_LINEITEM
    WAREHOUSE = COMPUTE_WH
    AFTER TPCH_ANALYTICS_DB.CONTROL.TASK_REFRESH_PIPE
    WHEN SYSTEM$STREAM_HAS_DATA('TPCH_ANALYTICS_DB.CONTROL.STREAM_LINEITEM')  -- Only run if the lineitem stream has data
AS
    CALL CONTROL.MERGE_LINEITEM_CDC();

-- Resume the task so it runs as part of the TASK_REFRESH_PIPE DAG
ALTER TASK CONTROL.TASK_CDC_MERGE_LINEITEM RESUME;

In [None]:
-- Suspend all three CDC child tasks (ORDERS, CUSTOMER, LINEITEM).
ALTER TASK CONTROL.TASK_CDC_MERGE_ORDERS SUSPEND;
ALTER TASK CONTROL.TASK_CDC_MERGE_CUSTOMER SUSPEND;
ALTER TASK CONTROL.TASK_CDC_MERGE_LINEITEM SUSPEND;

In [None]:
-- Resume only the root task. Child tasks that were suspended stay suspended
-- until they are resumed individually.
ALTER TASK CONTROL.TASK_REFRESH_PIPE RESUME;

In [None]:
-- Run the ORDERS CDC merge by hand.
CALL CONTROL.MERGE_ORDERS_CDC();

In [None]:
-- Run the CUSTOMER CDC merge by hand.
CALL CONTROL.MERGE_CUSTOMER_CDC();

In [None]:
-- Run the LINEITEM CDC merge by hand.
CALL CONTROL.MERGE_LINEITEM_CDC();

## Gold layer: stored procedures

In [None]:
USE ROLE TPCH_DEVELOPER;

-- ----------------------------------------------------------------------------
-- CONTROL.LOAD_DAILY_SALES_SUMMARY
-- Upserts one row per order date into REPORTS.DAILY_SALES_SUMMARY from
-- ANALYTICS.ORDERS_SILVER. Only a trailing window is recomputed: every order
-- date on or after (latest order date in Silver - 7 days), i.e. 8 calendar
-- days inclusive. The window is anchored on the data, not on the clock.
-- Returns a status string with the window start and the MERGE row count, or a
-- short message when ORDERS_SILVER is empty.
-- ----------------------------------------------------------------------------
CREATE OR REPLACE PROCEDURE CONTROL.LOAD_DAILY_SALES_SUMMARY()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    affected_rows     NUMBER;
    latest_order_date DATE;
    window_start      DATE;

BEGIN
    SELECT MAX(O_ORDERDATE) INTO latest_order_date FROM ANALYTICS.ORDERS_SILVER;

    -- Guard: nothing to summarise when Silver holds no orders.
    IF (latest_order_date IS NULL) THEN
        RETURN 'ORDERS_SILVER is empty. Nothing to process.';
    END IF;

    window_start := DATEADD(DAY, -7, :latest_order_date);

    MERGE INTO REPORTS.DAILY_SALES_SUMMARY AS tgt
    USING (
        -- Daily aggregates for every order date inside the window
        SELECT
            O_ORDERDATE                 AS SUMMARY_DATE,
            YEAR(O_ORDERDATE)           AS ORDER_YEAR,
            MONTH(O_ORDERDATE)          AS ORDER_MONTH,
            QUARTER(O_ORDERDATE)        AS ORDER_QUARTER,
            COUNT(DISTINCT O_ORDERKEY)  AS TOTAL_ORDERS,
            COUNT(DISTINCT O_CUSTKEY)   AS TOTAL_CUSTOMERS,
            SUM(O_TOTALPRICE)           AS TOTAL_REVENUE,
            AVG(O_TOTALPRICE)           AS AVG_ORDER_VALUE,
            MIN(O_TOTALPRICE)           AS MIN_ORDER_VALUE,
            MAX(O_TOTALPRICE)           AS MAX_ORDER_VALUE
        FROM ANALYTICS.ORDERS_SILVER
        WHERE O_ORDERDATE >= :window_start
        GROUP BY
            O_ORDERDATE,
            YEAR(O_ORDERDATE),
            MONTH(O_ORDERDATE),
            QUARTER(O_ORDERDATE)
    ) AS src
        ON tgt.SUMMARY_DATE = src.SUMMARY_DATE

    -- Day already summarised: refresh its metrics (calendar parts never change for a date)
    WHEN MATCHED THEN UPDATE SET
        tgt.TOTAL_ORDERS    = src.TOTAL_ORDERS,
        tgt.TOTAL_CUSTOMERS = src.TOTAL_CUSTOMERS,
        tgt.TOTAL_REVENUE   = src.TOTAL_REVENUE,
        tgt.AVG_ORDER_VALUE = src.AVG_ORDER_VALUE,
        tgt.MIN_ORDER_VALUE = src.MIN_ORDER_VALUE,
        tgt.MAX_ORDER_VALUE = src.MAX_ORDER_VALUE,
        tgt.LOAD_TIMESTAMP  = CURRENT_TIMESTAMP()

    -- First time this day is seen
    WHEN NOT MATCHED THEN INSERT (
        SUMMARY_DATE,
        ORDER_YEAR,
        ORDER_MONTH,
        ORDER_QUARTER,
        TOTAL_ORDERS,
        TOTAL_CUSTOMERS,
        TOTAL_REVENUE,
        AVG_ORDER_VALUE,
        MIN_ORDER_VALUE,
        MAX_ORDER_VALUE,
        LOAD_TIMESTAMP
    )
    VALUES (
        src.SUMMARY_DATE,
        src.ORDER_YEAR,
        src.ORDER_MONTH,
        src.ORDER_QUARTER,
        src.TOTAL_ORDERS,
        src.TOTAL_CUSTOMERS,
        src.TOTAL_REVENUE,
        src.AVG_ORDER_VALUE,
        src.MIN_ORDER_VALUE,
        src.MAX_ORDER_VALUE,
        CURRENT_TIMESTAMP()
    );

    affected_rows := SQLROWCOUNT;
    RETURN 'Daily Sales Summary loaded from ' || :window_start || '. Rows affected: ' || affected_rows;
END;
$$;

In [None]:

-- Run the Gold daily sales summary load by hand.
CALL CONTROL.LOAD_DAILY_SALES_SUMMARY();

In [None]:
-- Spot-check: an arbitrary 10 rows of the Gold daily summary.
SELECT *
FROM REPORTS.DAILY_SALES_SUMMARY
LIMIT 10;

In [None]:
-- Latest order date currently present in Bronze (STAGING.ORDERS).
SELECT MAX(O_ORDERDATE)
FROM STAGING.ORDERS;

In [None]:
-- Start of the 7-day look-back window for a max order date of 1998-08-02,
-- shown as a TIMESTAMP.
SELECT DATEADD(DAY, -7, DATE('1998-08-02'))::TIMESTAMP;

In [None]:
USE ROLE TPCH_DEVELOPER;

-- Scratch prototype of the CUSTOMER_LTV aggregation, with hard-coded dates:
--   1998-08-02 = max order date (the 7-day window start is derived from it)
--   1998-09-02 = "today" for tenure and activity.
-- Relies on Snowflake lateral column aliases: FIRST_ORDER_DATE, TOTAL_SPENT and
-- LAST_ORDER_DATE are referenced in the same SELECT that defines them.
-- NOTE(review): this drives from ORDERS_SILVER and LEFT JOINs CUSTOMER_SILVER,
-- so customers missing from CUSTOMER_SILVER appear with NULL attributes.

-- Customers with at least one order in the 7 days up to 1998-08-02
WITH TEMP_LTV_CUSTOMERS AS (
SELECT DISTINCT O_CUSTKEY
FROM ANALYTICS.ORDERS_SILVER
WHERE O_ORDERDATE >= DATEADD(DAY, -7, DATE('1998-08-02'))

)
-- Lifetime order metrics for each of those customers
,FINAL_TAB AS (
        SELECT
            T1.C_CUSTKEY,
            T1.C_NAME,
            T1.C_NATION_NAME,
            T1.C_REGION_NAME,
            T1.C_MKTSEGMENT,
            
            COUNT(DISTINCT T0.O_ORDERKEY) AS TOTAL_ORDERS,
            SUM(T0.O_TOTALPRICE) AS TOTAL_SPENT,
            AVG(T0.O_TOTALPRICE) AS AVG_ORDER_VALUE,
            MIN(T0.O_ORDERDATE) AS FIRST_ORDER_DATE,
            MAX(T0.O_ORDERDATE) AS LAST_ORDER_DATE,
            DATEDIFF(DAY, FIRST_ORDER_DATE, DATE('1998-09-02')) AS CUSTOMER_TENURE_DAYS,
            
            -- CUSTOMER_TIER: earlier inline version, kept for reference.
            -- The UDF call below is what runs; its thresholds are not visible here.
            -- CASE
            --     WHEN TOTAL_SPENT >= 500000 THEN 'VIP'
            --     WHEN TOTAL_SPENT >= 100000 THEN 'GOLD'
            --     WHEN TOTAL_SPENT >= 10000 THEN 'SILVER'
            --     ELSE 'STANDARD'
            -- END AS CUSTOMER_TIER,

            UDFS.CLASSIFY_CUSTOMER_REVENUE(
                TOTAL_SPENT
            ) AS CUSTOMER_TIER,     
            
            -- IS_ACTIVE: last order placed within the 90 days before 1998-09-02
            CASE
                WHEN LAST_ORDER_DATE >= DATEADD(DAY, -90, DATE('1998-09-02')) THEN TRUE
                ELSE FALSE
            END AS IS_ACTIVE
            
        FROM ANALYTICS.ORDERS_SILVER T0
        JOIN TEMP_LTV_CUSTOMERS T5 ON T0.O_CUSTKEY = T5.O_CUSTKEY -- ONLY process customers with activity in the last 7 days
        LEFT JOIN ANALYTICS.CUSTOMER_SILVER T1 ON T1.C_CUSTKEY = T0.O_CUSTKEY

        -- Alternative join order (drive from CUSTOMER_SILVER, as CONTROL.LOAD_CUSTOMER_LTV does):
        -- FROM ANALYTICS.CUSTOMER_SILVER T1
        -- JOIN CONTROL.TEMP_LTV_CUSTOMERS T5 ON T1.C_CUSTKEY = T5.O_CUSTKEY
        -- LEFT JOIN ANALYTICS.ORDERS_SILVER T0 ON T1.C_CUSTKEY = T0.O_CUSTKEY

        GROUP BY 1, 2, 3, 4, 5

)

-- Duplicate-key check (each customer should appear once):
-- SELECT C_CUSTKEY, COUNT(*)
-- FROM FINAL_TAB GROUP BY 1 HAVING COUNT(*)>1
select * from FINAL_TAB limit 10

In [None]:
-- Candidate customers for an incremental LTV refresh: anyone with an order
-- on/after 1998-07-02, plus anyone whose CUSTOMER_SILVER row was loaded
-- on/after that date. UNION (not UNION ALL) already removes duplicates, so no
-- per-branch DISTINCT is needed.
-- NOTE(review): LOAD_TIMESTAMP is a wall-clock load time, so comparing it with a
-- 1998 date matches every loaded customer.
SELECT O_CUSTKEY
FROM ANALYTICS.ORDERS_SILVER
WHERE O_ORDERDATE >= DATE('1998-07-02')
UNION
SELECT C_CUSTKEY AS O_CUSTKEY
FROM ANALYTICS.CUSTOMER_SILVER
WHERE LOAD_TIMESTAMP >= DATE('1998-07-02')::TIMESTAMP

In [None]:
use role tpch_developer;

-- 25th / 50th / 75th percentile of lifetime spend per customer in Silver.
with spend_per_customer as (
    select
        o_custkey,
        sum(o_totalprice) as total_spend
    from analytics.orders_silver
    group by o_custkey
)
select
    percentile_cont(0.25) within group (order by total_spend) as p1,
    percentile_cont(0.5)  within group (order by total_spend) as p2,
    percentile_cont(0.75) within group (order by total_spend) as p3
from spend_per_customer;


In [None]:
use role tpch_developer;

-- ----------------------------------------------------------------------------
-- CONTROL.LOAD_CUSTOMER_LTV
-- Incrementally maintains REPORTS.CUSTOMER_LTV (Gold) from the Silver layer.
--   Step 1: build TEMP_LTV_CUSTOMERS from two sources:
--             - customers with an order on/after start_date
--               (max order date - 7 days), and
--             - customers whose CUSTOMER_SILVER.LOAD_TIMESTAMP is on/after
--               start_date.
--           Then MERGE their lifetime metrics, computed over all their
--           orders (not just the window).
--   Step 2: set IS_ACTIVE = FALSE for customers whose LAST_ORDER_DATE is more
--           than 90 days before now_date.
-- Returns a status string with both row counts and the max order date, or a
-- short message when ORDERS_SILVER is empty.
-- ----------------------------------------------------------------------------
CREATE OR REPLACE PROCEDURE CONTROL.LOAD_CUSTOMER_LTV()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    -- Counters default to 0. The MERGE is skipped when no customer qualifies,
    -- and concatenating a NULL counter would make the whole RETURN value NULL.
    rows_merged NUMBER DEFAULT 0;
    rows_deactivated NUMBER DEFAULT 0;
    max_order_date DATE;
    start_date DATE;
    now_date DATE;
    customer_list_sql VARCHAR;
    
BEGIN
    -- 1. Latest order date in Silver anchors the 7-day activity window
    SELECT MAX(O_ORDERDATE) INTO max_order_date FROM ANALYTICS.ORDERS_SILVER;
    
    IF (max_order_date IS NULL) THEN
        RETURN 'ORDERS_SILVER is empty. Nothing to process.';
    END IF;
    
    start_date := DATEADD(DAY, -7, :max_order_date);

    -- "Today" for tenure/activity. Hard-coded for the historical TPC-H data;
    -- change to CURRENT_DATE() in production.
    now_date := DATE('1998-09-02');
    -- now_date := CURRENT_DATE();

    -- =================================================================
    -- Step 1: Incremental upsert for recently active customers
    -- =================================================================

    -- Temp table of customer keys with an order, or a Silver reload, on/after
    -- start_date. start_date is rendered with an explicit 'YYYY-MM-DD' format
    -- and parsed back with TO_DATE, so the generated SQL does not depend on the
    -- session's DATE_OUTPUT_FORMAT.
    -- NOTE(review): LOAD_TIMESTAMP holds the wall-clock load time (the Silver
    -- loads set it from CURRENT_TIMESTAMP()), while start_date comes from
    -- historical order dates. With the TPC-H data this branch therefore matches
    -- every loaded customer. Consider DATEADD(DAY, -7, CURRENT_TIMESTAMP()).
    customer_list_sql := '
        CREATE OR REPLACE TEMPORARY TABLE TEMP_LTV_CUSTOMERS AS
        SELECT O_CUSTKEY
        FROM ANALYTICS.ORDERS_SILVER
        WHERE O_ORDERDATE >= TO_DATE(''' || TO_CHAR(:start_date, 'YYYY-MM-DD') || ''', ''YYYY-MM-DD'')
        UNION
        SELECT C_CUSTKEY AS O_CUSTKEY
        FROM ANALYTICS.CUSTOMER_SILVER
        WHERE LOAD_TIMESTAMP >= TO_DATE(''' || TO_CHAR(:start_date, 'YYYY-MM-DD') || ''', ''YYYY-MM-DD'')::TIMESTAMP
    ';
    EXECUTE IMMEDIATE customer_list_sql;

    IF ((SELECT COUNT(*) FROM TEMP_LTV_CUSTOMERS) > 0) THEN

        MERGE INTO REPORTS.CUSTOMER_LTV AS target
        USING (
            -- Lifetime metrics for each touched customer (lateral aliases are
            -- reused within the same SELECT)
            SELECT
                T1.C_CUSTKEY,
                T1.C_NAME,
                T1.C_NATION_NAME,
                T1.C_REGION_NAME,
                T1.C_MKTSEGMENT,
                
                COUNT(DISTINCT T0.O_ORDERKEY) AS TOTAL_ORDERS,
                SUM(T0.O_TOTALPRICE) AS TOTAL_SPENT,
                AVG(T0.O_TOTALPRICE) AS AVG_ORDER_VALUE,
                MIN(T0.O_ORDERDATE) AS FIRST_ORDER_DATE,
                MAX(T0.O_ORDERDATE) AS LAST_ORDER_DATE,
                DATEDIFF(DAY, FIRST_ORDER_DATE, :now_date) AS CUSTOMER_TENURE_DAYS,
                
                -- CUSTOMER_TIER is delegated to the revenue-classification UDF
                UDFS.CLASSIFY_CUSTOMER_REVENUE(
                    TOTAL_SPENT
                ) AS CUSTOMER_TIER, 
                
                -- IS_ACTIVE: last order within 90 days of now_date
                -- (no orders -> NULL LAST_ORDER_DATE -> FALSE)
                CASE
                    WHEN LAST_ORDER_DATE >= DATEADD(DAY, -90, :now_date) THEN TRUE
                    ELSE FALSE
                END AS IS_ACTIVE
                
            FROM ANALYTICS.CUSTOMER_SILVER T1 -- root table
            INNER JOIN TEMP_LTV_CUSTOMERS T5 ON T1.C_CUSTKEY = T5.O_CUSTKEY -- only customers that need an update
            LEFT JOIN ANALYTICS.ORDERS_SILVER T0 ON T1.C_CUSTKEY = T0.O_CUSTKEY 
    
            GROUP BY 1, 2, 3, 4, 5
        ) AS source
        ON target.C_CUSTKEY = source.C_CUSTKEY
        
        WHEN MATCHED THEN
            UPDATE SET
                target.C_NAME = source.C_NAME,
                target.C_NATION_NAME = source.C_NATION_NAME,
                target.C_REGION_NAME = source.C_REGION_NAME,
                target.C_MKTSEGMENT = source.C_MKTSEGMENT,
                target.TOTAL_ORDERS = source.TOTAL_ORDERS,
                target.TOTAL_SPENT = source.TOTAL_SPENT,
                target.AVG_ORDER_VALUE = source.AVG_ORDER_VALUE,
                target.FIRST_ORDER_DATE = source.FIRST_ORDER_DATE,
                target.LAST_ORDER_DATE = source.LAST_ORDER_DATE,
                target.CUSTOMER_TENURE_DAYS = source.CUSTOMER_TENURE_DAYS,
                target.CUSTOMER_TIER = source.CUSTOMER_TIER,
                target.IS_ACTIVE = source.IS_ACTIVE,
                target.LOAD_TIMESTAMP = CURRENT_TIMESTAMP()
                
        -- LTV row for a customer not yet in the report
        WHEN NOT MATCHED THEN
            INSERT (
                C_CUSTKEY, C_NAME, C_NATION_NAME, C_REGION_NAME, C_MKTSEGMENT,
                TOTAL_ORDERS, TOTAL_SPENT, AVG_ORDER_VALUE, FIRST_ORDER_DATE,
                LAST_ORDER_DATE, CUSTOMER_TENURE_DAYS, CUSTOMER_TIER, IS_ACTIVE, LOAD_TIMESTAMP
            )
            VALUES (
                source.C_CUSTKEY, source.C_NAME, source.C_NATION_NAME, source.C_REGION_NAME, source.C_MKTSEGMENT,
                source.TOTAL_ORDERS, source.TOTAL_SPENT, source.AVG_ORDER_VALUE, source.FIRST_ORDER_DATE,
                source.LAST_ORDER_DATE, source.CUSTOMER_TENURE_DAYS, source.CUSTOMER_TIER, source.IS_ACTIVE, CURRENT_TIMESTAMP()
            );
    
        rows_merged := SQLROWCOUNT;
    
    END IF;

    -- =================================================================
    -- Step 2: Deactivate customers with no order in the last 90 days
    -- =================================================================
    -- Covers customers not touched in Step 1, whose IS_ACTIVE would otherwise
    -- stay TRUE after they age out of the 90-day window.
    UPDATE REPORTS.CUSTOMER_LTV
    SET 
        IS_ACTIVE = FALSE,
        LOAD_TIMESTAMP = CURRENT_TIMESTAMP()
    WHERE 
        LAST_ORDER_DATE < DATEADD(DAY, -90, :now_date)
        AND IS_ACTIVE = TRUE;
        
    rows_deactivated := SQLROWCOUNT;    

    RETURN 'Customer LTV update completed. ' 
        || 'Incremental Rows affected: ' || rows_merged 
        || '. Deactivated Rows: ' || rows_deactivated
        || ' (Max Order Date: ' || :max_order_date || ')';
END;
$$;

In [None]:
use role tpch_developer;
-- Run the customer LTV load by hand.
CALL CONTROL.LOAD_CUSTOMER_LTV();


In [None]:
-- Sample of LTV rows for customers with at least one order.
SELECT *
FROM REPORTS.CUSTOMER_LTV
WHERE TOTAL_ORDERS > 0
LIMIT 100;

In [None]:
use role tpch_developer;
-- Which IS_ACTIVE values are present in the LTV table.
select distinct is_active
from reports.customer_ltv

In [None]:
-- Number of customers per tier.
select
    customer_tier,
    count(*)
from reports.customer_ltv
group by customer_tier


In [None]:
-- Gold task: rebuild CUSTOMER_LTV after both the CUSTOMER and the ORDERS CDC
-- merge tasks have run.
CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_LOAD_CUSTOMER_LTV
    WAREHOUSE = COMPUTE_WH
    AFTER
        TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_CUSTOMER,
        TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_ORDERS
AS
    CALL CONTROL.LOAD_CUSTOMER_LTV();


-- Gold task: refresh DAILY_SALES_SUMMARY after the ORDERS CDC merge task.
CREATE OR REPLACE TASK TPCH_ANALYTICS_DB.CONTROL.TASK_LOAD_DAILY_SALES_SUMMARY
    WAREHOUSE = COMPUTE_WH
    AFTER
        TPCH_ANALYTICS_DB.CONTROL.TASK_CDC_MERGE_ORDERS
AS
    CALL CONTROL.LOAD_DAILY_SALES_SUMMARY();

-- Newly created tasks start suspended; uncomment to enable them.
-- ALTER TASK CONTROL.TASK_LOAD_CUSTOMER_LTV RESUME;
-- ALTER TASK CONTROL.TASK_LOAD_DAILY_SALES_SUMMARY RESUME;

# Initial full load

## Load Silver tables

In [None]:
use role tpch_developer;
use database tpch_analytics_db;

-- Initial full load of ANALYTICS.ORDERS_SILVER from Bronze (STAGING.ORDERS).
-- Keeps the most recent staging row per O_ORDERKEY (by CREATED_AT) and derives
-- the status description, date parts, priority rank and numeric clerk id.
-- INSERT OVERWRITE truncates and inserts in a single statement, so a failed
-- load does not leave the table empty the way TRUNCATE-then-INSERT can. The
-- explicit column list keeps the load correct if the table's column order
-- ever changes.
insert overwrite into analytics.orders_silver (
    O_ORDERKEY,
    O_CUSTKEY,
    O_ORDERSTATUS,
    O_ORDERSTATUS_DESC,
    O_TOTALPRICE,
    O_ORDERDATE,
    O_ORDER_YEAR,
    O_ORDER_MONTH,
    O_ORDER_QUARTER,
    O_ORDERPRIORITY,
    O_PRIORITY_RANK,
    O_CLERK,
    O_CLERK_ID,
    O_SHIPPRIORITY,
    O_COMMENT,
    SOURCE_FILE,
    FIRST_LOADED_AT,
    LAST_UPDATED_AT
)
        SELECT 
            O_ORDERKEY,
            O_CUSTKEY,
            O_ORDERSTATUS,
            CASE O_ORDERSTATUS
                WHEN 'F' THEN 'FINISHED'
                WHEN 'O' THEN 'OPEN'
                WHEN 'P' THEN 'PENDING'
                ELSE 'UNKNOWN'
            END AS O_ORDERSTATUS_DESC,
            O_TOTALPRICE,
            O_ORDERDATE,
            YEAR(O_ORDERDATE) AS O_ORDER_YEAR,
            MONTH(O_ORDERDATE) AS O_ORDER_MONTH,
            QUARTER(O_ORDERDATE) AS O_ORDER_QUARTER,
            O_ORDERPRIORITY,
            CASE 
                WHEN O_ORDERPRIORITY LIKE '1-URGENT%' THEN 1
                WHEN O_ORDERPRIORITY LIKE '2-HIGH%' THEN 2
                WHEN O_ORDERPRIORITY LIKE '3-MEDIUM%' THEN 3
                WHEN O_ORDERPRIORITY LIKE '4-NOT SPECIFIED%' THEN 4
                WHEN O_ORDERPRIORITY LIKE '5-LOW%' THEN 5
                ELSE 9
            END AS O_PRIORITY_RANK,
            O_CLERK,
            -- First run of digits in the clerk label; NULL when there is none
            TRY_CAST(REGEXP_SUBSTR(O_CLERK, '[0-9]+') AS NUMBER) AS O_CLERK_ID,
            O_SHIPPRIORITY,
            O_COMMENT,
            FROM_SOURCE AS SOURCE_FILE,
            -- NOTE(review): this is CREATED_AT of the latest staging row kept by
            -- QUALIFY below, not the earliest; confirm that matches "first loaded"
            CREATED_AT AS FIRST_LOADED_AT,
            CURRENT_TIMESTAMP() AS LAST_UPDATED_AT
        FROM staging.ORDERS
        QUALIFY ROW_NUMBER() OVER (PARTITION BY O_ORDERKEY ORDER BY CREATED_AT DESC) = 1;



In [None]:
use role tpch_developer;
use database tpch_analytics_db;

-- Initial full load of ANALYTICS.CUSTOMER_SILVER from Bronze (STAGING.CUSTOMER),
-- enriched with nation and region names. Keeps the most recent staging row per
-- C_CUSTKEY (by the customer row's CREATED_AT).
-- INSERT OVERWRITE truncates and inserts in a single statement, so a failed
-- load does not leave the table empty. The explicit column list matches the
-- CUSTOMER_SILVER DDL.
insert overwrite into analytics.CUSTOMER_SILVER (
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_NATION_NAME,
    C_REGIONKEY,
    C_REGION_NAME,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    LOAD_TIMESTAMP
)
        SELECT 
            c.C_CUSTKEY,
            c.C_NAME,
            c.C_ADDRESS,
            c.C_NATIONKEY,
            n.N_NAME AS C_NATION_NAME,
            n.N_REGIONKEY AS C_REGIONKEY,
            r.R_NAME AS C_REGION_NAME,
            c.C_PHONE,
            c.C_ACCTBAL,
            c.C_MKTSEGMENT,
            c.C_COMMENT,
            CURRENT_TIMESTAMP() AS LOAD_TIMESTAMP
        FROM STAGING.CUSTOMER AS c
        LEFT JOIN STAGING.NATION AS n ON c.C_NATIONKEY = n.N_NATIONKEY
        LEFT JOIN STAGING.REGION AS r ON r.R_REGIONKEY = n.N_REGIONKEY
        -- CREATED_AT is qualified: the joined staging tables may carry the same column
        QUALIFY ROW_NUMBER() OVER (PARTITION BY c.C_CUSTKEY ORDER BY c.CREATED_AT DESC) = 1;

In [None]:
use role tpch_developer;
use database tpch_analytics_db;

-- Initial full load of ANALYTICS.LINEITEM_SILVER from Bronze (STAGING.LINEITEM),
-- enriched with part name/type and supplier name plus derived price and delay
-- columns (same derivations as CONTROL.MERGE_LINEITEM_CDC). Keeps the most
-- recent staging row per (L_ORDERKEY, L_LINENUMBER).
-- INSERT OVERWRITE truncates and inserts in a single statement, so a failed
-- load does not leave the table empty. The explicit column list matches the
-- one used by the CDC MERGE's INSERT.
insert overwrite into analytics.LINEITEM_SILVER (
    L_ORDERKEY,
    L_LINENUMBER,
    L_PARTKEY,
    L_PART_NAME,
    L_PART_TYPE,
    L_SUPPKEY,
    L_SUPPLIER_NAME,
    L_QUANTITY,
    L_EXTENDEDPRICE,
    L_DISCOUNT,
    L_TAX,
    L_RETURNFLAG,
    L_LINESTATUS,
    L_SHIPDATE,
    L_COMMITDATE,
    L_RECEIPTDATE,
    L_SHIPINSTRUCT,
    L_SHIPMODE,
    L_COMMENT,
    L_NET_PRICE,
    L_FINAL_PRICE,
    L_SHIP_DELAY_DAYS,
    LOAD_TIMESTAMP
)
        SELECT 
            l.L_ORDERKEY,
            l.L_LINENUMBER,
            l.L_PARTKEY,
            p.P_NAME AS L_PART_NAME,
            p.P_TYPE AS L_PART_TYPE,
            l.L_SUPPKEY,
            s.S_NAME AS L_SUPPLIER_NAME,
            l.L_QUANTITY,
            l.L_EXTENDEDPRICE,
            l.L_DISCOUNT,
            l.L_TAX,
            l.L_RETURNFLAG,
            l.L_LINESTATUS,
            l.L_SHIPDATE,
            l.L_COMMITDATE,
            l.L_RECEIPTDATE,
            l.L_SHIPINSTRUCT,
            l.L_SHIPMODE,
            l.L_COMMENT,
            
            l.L_EXTENDEDPRICE * (1 - l.L_DISCOUNT) AS L_NET_PRICE,
            (l.L_EXTENDEDPRICE * (1 - l.L_DISCOUNT)) * (1 + l.L_TAX) AS L_FINAL_PRICE,
            -- Early deliveries (receipt before commit) are clamped to 0 days late
            CASE 
                WHEN DATEDIFF(DAY, l.L_COMMITDATE, l.L_RECEIPTDATE) < 0 THEN 0 
                ELSE DATEDIFF(DAY, l.L_COMMITDATE, l.L_RECEIPTDATE) 
            END AS L_SHIP_DELAY_DAYS,

            CURRENT_TIMESTAMP() AS LOAD_TIMESTAMP
        FROM STAGING.LINEITEM AS l
        LEFT JOIN STAGING.PART AS p ON l.L_PARTKEY = p.P_PARTKEY
        LEFT JOIN STAGING.SUPPLIER AS s ON l.L_SUPPKEY = s.S_SUPPKEY
        -- CREATED_AT is qualified: the joined staging tables may carry the same column
        QUALIFY ROW_NUMBER() OVER (PARTITION BY l.L_ORDERKEY, l.L_LINENUMBER ORDER BY l.CREATED_AT DESC) = 1;


## Load Gold tables

# End