In [None]:


-- Storage integration that lets Snowflake assume the AWS IAM role below in
-- order to read from S3.
-- The allowed location is the tom_db/ prefix rather than a single object, so
-- it covers the URL used by the traffic_stage stage. Creating it with the
-- prefix directly replaces the earlier "create with one file, then ALTER to
-- the prefix" sequence; the resulting integration is the same.
-- NOTE(review): per the Snowflake docs, re-running CREATE OR REPLACE on a
-- storage integration issues a new external ID, so the AWS trust policy has
-- to be updated afterwards -- confirm before re-running in a live account.
CREATE OR REPLACE STORAGE INTEGRATION s3_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::676206927597:role/Snowflake-role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://s3-airflow-bucket-1/tom_db/');

-- Display the integration's properties for inspection.
DESC STORAGE INTEGRATION s3_integration;

-- External stage over the tom_db/ prefix, accessed through the
-- s3_integration storage integration.
-- CREATE OR REPLACE already replaces an existing stage of the same name, so a
-- separate DROP STAGE IF EXISTS beforehand is unnecessary.
-- The stage-level file format describes a CSV with one header row and
-- optionally double-quoted fields.
CREATE OR REPLACE STAGE traffic_stage
    STORAGE_INTEGRATION = s3_integration
    URL = 's3://s3-airflow-bucket-1/tom_db/'
    FILE_FORMAT = (
        TYPE = CSV
        FIELD_OPTIONALLY_ENCLOSED_BY = '"'
        SKIP_HEADER = 1
    );

-- Load the TomTom traffic CSV from @traffic_stage into traffic_data_stream.
-- PATTERN is a regular expression, so the dot before "csv" is written as [.]
-- to match a literal '.' only; an unescaped '.' would match any character
-- (e.g. a file ending in "tomtom_traffic_dataXcsv").
-- The inline FILE_FORMAT is what this load uses. ERROR_ON_COLUMN_COUNT_MISMATCH
-- = FALSE means rows whose field count differs from the table's column count
-- are not rejected for that reason.
-- NOTE(review): the target is named traffic_data_stream, while the stream
-- defined later in this file is on RAW_TOM_STAGING -- confirm this is the
-- intended target table.
COPY INTO traffic_data_stream
FROM @traffic_stage
PATTERN = '.*tomtom_traffic_data[.]csv'
FILE_FORMAT = (
    TYPE = CSV
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
);

-- Change-tracking stream over the raw staging table.
-- NOTE(review): CREATE OR REPLACE recreates the stream, so changes recorded by
-- a previous stream of the same name are presumably no longer available --
-- confirm this is acceptable when re-running the script.
CREATE OR REPLACE STREAM raw_tom_stream
    ON TABLE RAW_TOM_STAGING;

-- View over RAW_TOM_STAGING that keeps rows with a positive congestion level
-- (rows where congestion_level is NULL or <= 0 are excluded) and adds the
-- query-time timestamp as processed_timestamp.
CREATE OR REPLACE VIEW VW_TOM_PROCESSED AS
SELECT
    rank,
    city,
    country,
    avg_time_per_6_miles,
    change_in_congestion,
    congestion_level,
    yearly_delay_hours,
    CURRENT_TIMESTAMP AS processed_timestamp
FROM RAW_TOM_STAGING
WHERE congestion_level > 0;  -- Example filter: only valid records

-- Materialize the view's current contents. Columns are listed explicitly
-- (instead of SELECT *) so the table shape does not silently change if the
-- view definition is edited later.
CREATE OR REPLACE TABLE TOM_PROCESSED_TABLE AS
SELECT
    rank,
    city,
    country,
    avg_time_per_6_miles,
    change_in_congestion,
    congestion_level,
    yearly_delay_hours,
    processed_timestamp
FROM VW_TOM_PROCESSED;

-- Change-tracking stream over the processed table; created after the table
-- so it is bound to the freshly (re)created object.
CREATE OR REPLACE STREAM STREAM_TOM_PROCESSED
    ON TABLE TOM_PROCESSED_TABLE;



In [None]:
-- process_raw_tom_stream()
-- Upserts the INSERT-action rows captured by raw_tom_stream into
-- RAW_TOM_STAGING, matching on (rank, city):
--   * matched rows have their remaining columns overwritten from the stream;
--   * unmatched rows are inserted.
-- Returns a fixed status string once the MERGE has run.
-- NOTE(review): raw_tom_stream is created ON TABLE RAW_TOM_STAGING earlier in
-- this file, so the MERGE source and target are the same table -- confirm
-- that this is the intended target.
CREATE OR REPLACE PROCEDURE process_raw_tom_stream()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    MERGE INTO RAW_TOM_STAGING AS tgt
    USING (
        -- Only the columns the MERGE actually uses, limited to INSERT records.
        SELECT
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours
        FROM raw_tom_stream
        WHERE metadata$action = 'INSERT'
    ) AS src
        ON tgt.rank = src.rank
       AND tgt.city = src.city
    WHEN MATCHED THEN
        UPDATE SET
            tgt.country              = src.country,
            tgt.avg_time_per_6_miles = src.avg_time_per_6_miles,
            tgt.change_in_congestion = src.change_in_congestion,
            tgt.congestion_level     = src.congestion_level,
            tgt.yearly_delay_hours   = src.yearly_delay_hours
    WHEN NOT MATCHED THEN
        INSERT (
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours
        )
        VALUES (
            src.rank,
            src.city,
            src.country,
            src.avg_time_per_6_miles,
            src.change_in_congestion,
            src.congestion_level,
            src.yearly_delay_hours
        );

    RETURN 'Stream processing completed successfully';
END;
$$;

-- Scheduled task that calls process_raw_tom_stream() on COMPUTE_WH.
-- The WHEN guard skips a scheduled run when raw_tom_stream holds no change
-- records, so the warehouse is not started just to merge an empty stream.
CREATE OR REPLACE TASK process_raw_tom_stream_task
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 0 * * * UTC'  -- Runs at midnight UTC every day
WHEN
    SYSTEM$STREAM_HAS_DATA('RAW_TOM_STREAM')
AS
    CALL process_raw_tom_stream();

-- A newly created task is suspended; resume it so the schedule takes effect.
ALTER TASK process_raw_tom_stream_task RESUME;

-- process_tom_processed_stream()
-- Upserts the INSERT-action rows captured by STREAM_TOM_PROCESSED into
-- TOM_PROCESSED_TABLE, matching on (rank, city):
--   * matched rows have their data columns overwritten (processed_timestamp
--     is left as-is);
--   * unmatched rows are inserted with processed_timestamp set to the
--     current timestamp.
-- Returns a fixed status string once the MERGE has run.
-- NOTE(review): STREAM_TOM_PROCESSED is created ON TABLE TOM_PROCESSED_TABLE
-- earlier in this file, so the MERGE source and target are the same table --
-- confirm that this is the intended target.
CREATE OR REPLACE PROCEDURE process_tom_processed_stream()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    MERGE INTO TOM_PROCESSED_TABLE AS tgt
    USING (
        -- Only the columns the MERGE actually uses, limited to INSERT records.
        SELECT
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours
        FROM STREAM_TOM_PROCESSED
        WHERE metadata$action = 'INSERT'
    ) AS src
        ON tgt.rank = src.rank
       AND tgt.city = src.city
    WHEN MATCHED THEN
        UPDATE SET
            tgt.country              = src.country,
            tgt.avg_time_per_6_miles = src.avg_time_per_6_miles,
            tgt.change_in_congestion = src.change_in_congestion,
            tgt.congestion_level     = src.congestion_level,
            tgt.yearly_delay_hours   = src.yearly_delay_hours
    WHEN NOT MATCHED THEN
        INSERT (
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours,
            processed_timestamp
        )
        VALUES (
            src.rank,
            src.city,
            src.country,
            src.avg_time_per_6_miles,
            src.change_in_congestion,
            src.congestion_level,
            src.yearly_delay_hours,
            CURRENT_TIMESTAMP
        );

    RETURN 'Stream processing completed successfully';
END;
$$;

-- Scheduled task that calls process_tom_processed_stream() on COMPUTE_WH.
-- The WHEN guard skips a scheduled run when STREAM_TOM_PROCESSED holds no
-- change records, so the warehouse is not started just to merge an empty
-- stream.
CREATE OR REPLACE TASK process_tom_processed_stream_task
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 0 * * * UTC'  -- Runs daily at midnight UTC
WHEN
    SYSTEM$STREAM_HAS_DATA('STREAM_TOM_PROCESSED')
AS
    CALL process_tom_processed_stream();

-- A newly created task is suspended; resume it so the schedule takes effect.
ALTER TASK process_tom_processed_stream_task RESUME;


-- process_harmonized_tom_stream()
-- Upserts the INSERT-action rows captured by STREAM_TOM_PROCESSED into
-- HARMONIZED_TOM, matching on (rank, city, country):
--   * matched rows have their measures overwritten and processed_timestamp
--     refreshed to the current timestamp;
--   * unmatched rows are inserted with processed_timestamp set to the
--     current timestamp.
-- Returns a fixed status string once the MERGE has run.
-- NOTE(review): process_tom_processed_stream() in this file also reads
-- STREAM_TOM_PROCESSED inside a MERGE. Consuming a stream in DML advances its
-- offset, so the two procedures may not both see the same change rows --
-- confirm whether each consumer should have its own stream.
CREATE OR REPLACE PROCEDURE process_harmonized_tom_stream()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    MERGE INTO HARMONIZED_TOM AS tgt
    USING (
        -- Only the columns the MERGE actually uses, limited to INSERT records.
        SELECT
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours
        FROM STREAM_TOM_PROCESSED
        WHERE metadata$action = 'INSERT'
    ) AS src
        ON tgt.rank = src.rank
       AND tgt.city = src.city
       AND tgt.country = src.country
    WHEN MATCHED THEN
        UPDATE SET
            tgt.avg_time_per_6_miles = src.avg_time_per_6_miles,
            tgt.change_in_congestion = src.change_in_congestion,
            tgt.congestion_level     = src.congestion_level,
            tgt.yearly_delay_hours   = src.yearly_delay_hours,
            tgt.processed_timestamp  = CURRENT_TIMESTAMP
    WHEN NOT MATCHED THEN
        INSERT (
            rank,
            city,
            country,
            avg_time_per_6_miles,
            change_in_congestion,
            congestion_level,
            yearly_delay_hours,
            processed_timestamp
        )
        VALUES (
            src.rank,
            src.city,
            src.country,
            src.avg_time_per_6_miles,
            src.change_in_congestion,
            src.congestion_level,
            src.yearly_delay_hours,
            CURRENT_TIMESTAMP
        );

    RETURN 'Stream data merged into HARMONIZED_TOM successfully';
END;
$$;

-- Scheduled task that calls process_harmonized_tom_stream() on COMPUTE_WH.
-- The WHEN guard skips a scheduled run when STREAM_TOM_PROCESSED holds no
-- change records, so the warehouse is not started just to merge an empty
-- stream.
-- NOTE(review): process_tom_processed_stream_task in this file uses the same
-- cron expression and its procedure reads the same stream; no ordering
-- between the two tasks is declared -- confirm the intended run order.
CREATE OR REPLACE TASK process_harmonized_tom_task
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 0 * * * UTC'  -- Runs daily at midnight UTC
WHEN
    SYSTEM$STREAM_HAS_DATA('STREAM_TOM_PROCESSED')
AS
    CALL process_harmonized_tom_stream();

-- A newly created task is suspended; resume it so the schedule takes effect.
ALTER TASK process_harmonized_tom_task RESUME;






