# Cryptocurrency Data Pipeline - Task Orchestration & Automation

This notebook implements the task orchestration layer for the cryptocurrency data pipeline, automating the flow from data ingestion through harmonization to analytics.

## Setup Environment

In [None]:
from snowflake.snowpark import Session
session = Session.builder.getOrCreate()

In [None]:
-- %%sql
USE ROLE CRYPTO_ROLE;
USE WAREHOUSE CRYPTO_WH;
USE SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

## Create Tasks for Pipeline Automation

### 1. Data Ingestion Task - Runs every 4 hours to fetch new data

In [None]:
CREATE OR REPLACE PROCEDURE CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_DATA_SP()
RETURNS VARIANT
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS '
try {
  // Crypto files to load
  const CRYPTO_FILES = [
    {file: ''BTC_raw_daily.csv'', table: ''BTC''},
    {file: ''ETH_raw_daily.csv'', table: ''ETH''},
    {file: ''DOGE_raw_daily.csv'', table: ''DOGE''}
  ];
  
  var results = {
    status: ''success'',
    details: {},
    error: null
  };
  
  // Process each cryptocurrency file
  for (const crypto of CRYPTO_FILES) {
    // Load data from S3 to Snowflake
    const tableName = crypto.table;
    const fileName = crypto.file;
    
    // Create temp table with the correct structure (6 columns)
    var createTempTableStmt = snowflake.createStatement({
      sqlText: `
        CREATE OR REPLACE TEMPORARY TABLE crypto_temp_${tableName} (
          "date" TIMESTAMP_NTZ(9),
          "open" NUMBER(17,11),
          "high" NUMBER(17,11),
          "low" NUMBER(17,11),
          "close" NUMBER(17,11),
          "volume" NUMBER(38,0)
        )
      `
    });
    createTempTableStmt.execute();
    
    // Copy data from S3 to temp table
    var copyStmt = snowflake.createStatement({
      sqlText: `
        COPY INTO crypto_temp_${tableName}
        FROM @CRYPTO_DB.INTEGRATIONS.CRYPTO_RAW_STAGE/${fileName}
        FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
      `
    });
    var copyResult = copyStmt.execute();
    
    // Merge data into target table
    var mergeStmt = snowflake.createStatement({
      sqlText: `
        MERGE INTO CRYPTO_DB.RAW_CRYPTO.${tableName} target
        USING crypto_temp_${tableName} source
        ON target."date" = source."date"
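        -- Update only rows whose values changed, so re-loading the same file does not rewrite unchanged rows
        WHEN MATCHED AND (
            target."open" IS DISTINCT FROM source."open"
            OR target."high" IS DISTINCT FROM source."high"
            OR target."low" IS DISTINCT FROM source."low"
            OR target."close" IS DISTINCT FROM source."close"
            OR target."volume" IS DISTINCT FROM source."volume"
          ) THEN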
          UPDATE SET 
            target."open" = source."open",
            target."high" = source."high",
            target."low" = source."low",
            target."close" = source."close",
            target."volume" = source."volume"
        WHEN NOT MATCHED THEN
          INSERT ("date", "open", "high", "low", "close", "volume")
          VALUES (source."date", source."open", source."high", source."low", source."close", source."volume")
      `
    });
    var mergeResult = mergeStmt.execute();
    
    // Get counts for reporting
    var countStmt = snowflake.createStatement({
      sqlText: `SELECT COUNT(*) FROM crypto_temp_${tableName}`
    });
    var countResult = countStmt.execute();
    countResult.next();
    var rowCount = countResult.getColumnValue(1);
    
    // Record results
    results.details[tableName] = {
      file: fileName,
      rows_processed: rowCount,
      status: ''success''
    };
  }
  
  return results;
  
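} catch (error) {
  // Re-throw so the task run is recorded as FAILED (and is visible to alerts);
  // returning an error object would make the CALL, and therefore the task, report success.
  throw ''LOAD_CRYPTO_DATA_SP failed: '' + error.message;
}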
';
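The MERGE in the procedure is an upsert keyed on `"date"`: dates already in the raw table are updated in place and new dates are inserted. The plain-Python model below is a sketch for reasoning about that logic, not Snowflake code; `merge_by_date`, the `skip_unchanged` flag and the sample rows are hypothetical. Setting `skip_unchanged=True` models a `WHEN MATCHED` clause that additionally requires at least one column to differ.

```python
from typing import Dict, Iterable, Tuple

# One OHLCV row; the date key is kept separately (hypothetical sample format)
Row = Dict[str, float]


def merge_by_date(
    target: Dict[str, Row],
    source: Iterable[Tuple[str, Row]],
    skip_unchanged: bool = False,
) -> Tuple[int, int]:
    """Upsert source rows into target keyed on date; return (inserted, updated)."""
    inserted = updated = 0
    for date, values in source:
        if date not in target:
            # WHEN NOT MATCHED THEN INSERT
            target[date] = dict(values)
            inserted += 1
        elif not skip_unchanged or target[date] != values:
            # WHEN MATCHED [AND <some column differs>] THEN UPDATE
            target[date] = dict(values)
            updated += 1
    return inserted, updated


raw_btc: Dict[str, Row] = {}
batch = [
    ("2024-01-01", {"open": 1.0, "high": 2.0, "low": 0.5, "close": 1.5, "volume": 10}),
    ("2024-01-02", {"open": 1.5, "high": 2.5, "low": 1.0, "close": 2.0, "volume": 12}),
]
print(merge_by_date(raw_btc, batch))                       # (2, 0): first load
print(merge_by_date(raw_btc, batch))                       # (0, 2): same file again
print(merge_by_date(raw_btc, batch, skip_unchanged=True))  # (0, 0): with change detection
```

Re-loading an identical file inserts nothing; without change detection, however, every matched row is still rewritten.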

In [None]:
-- %%sql
USE ROLE CRYPTO_ROLE;
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK
    WAREHOUSE = CRYPTO_WH
    SCHEDULE = 'USING CRON 0 */4 * * * UTC'  -- Run every 4 hours
AS
CALL CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_DATA_SP();
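`USING CRON 0 */4 * * * UTC` fires at minute 0 of every hour divisible by 4, that is 00:00, 04:00, 08:00, 12:00, 16:00 and 20:00 UTC. The standard-library sketch below lists upcoming fire times for that one expression; `next_runs` is a hypothetical helper that only understands the `0 */N * * *` shape and is not a general cron parser.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def next_runs(after: datetime, count: int = 3, step_hours: int = 4) -> List[datetime]:
    """Return the next `count` UTC times matching cron '0 */step_hours * * *' strictly after `after`."""
    after = after.astimezone(timezone.utc)
    # Start at the top of the current hour and walk forward hour by hour
    candidate = after.replace(minute=0, second=0, microsecond=0)
    runs: List[datetime] = []
    while len(runs) < count:
        if candidate > after and candidate.hour % step_hours == 0:
            runs.append(candidate)
        candidate += timedelta(hours=1)
    return runs


now = datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc)
for run in next_runs(now):
    print(run.isoformat())
# 2024-05-01T12:00:00+00:00
# 2024-05-01T16:00:00+00:00
# 2024-05-01T20:00:00+00:00
```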

In [None]:
-- Create stream on BTC table to capture changes
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_BTC
  ON TABLE CRYPTO_DB.RAW_CRYPTO.BTC;

-- Create stream on ETH table to capture changes
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_ETH
  ON TABLE CRYPTO_DB.RAW_CRYPTO.ETH;

-- Create stream on DOGE table to capture changes
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_DOGE
  ON TABLE CRYPTO_DB.RAW_CRYPTO.DOGE;

In [None]:
-- Grant SELECT on the streams to CRYPTO_ROLE (redundant if CRYPTO_ROLE created, and therefore owns, the streams)
GRANT SELECT ON STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_BTC TO ROLE CRYPTO_ROLE;
GRANT SELECT ON STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_ETH TO ROLE CRYPTO_ROLE;
GRANT SELECT ON STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_DOGE TO ROLE CRYPTO_ROLE;

In [None]:
EXECUTE TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK;

In [None]:
SHOW TASKS LIKE 'LOAD_CRYPTO_TASK' IN SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

In [None]:
-- TASK_NAME filters inside the table function, so RESULT_LIMIT counts only this task's runs
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('MINUTE', -10, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10,
    TASK_NAME => 'LOAD_CRYPTO_TASK'))
ORDER BY SCHEDULED_TIME DESC;

### 2. Create Task for Data Harmonization - Runs after ingestion when new raw data exists

In [None]:
-- %%sql
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK
    WAREHOUSE = CRYPTO_WH
    AFTER CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK
    WHEN SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_BTC')
    OR SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_ETH')
    OR SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_DOGE')
AS
CALL CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_DATA_SP();

### 3. Create Task to Update Analytics Tables - Runs after harmonization completes

In [None]:
USE ROLE CRYPTO_ROLE;
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.UPDATE_CRYPTO_METRICS_TASK
    WAREHOUSE = CRYPTO_WH
    AFTER CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK
    WHEN SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED_STREAM')
AS
CALL CRYPTO_DB.ANALYTICS_CRYPTO.UPDATE_CRYPTO_ANALYTICS();

## Set Up Change Tracking with Streams

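Streams track changes (inserts, updates, deletes) in tables so that downstream tasks run only when new data exists. `UPDATE_CRYPTO_METRICS_TASK` checks `CRYPTO_HARMONIZED_STREAM` in its `WHEN` clause, so create this stream before resuming the tasks.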
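Conceptually, a standard stream is an offset into the table's change history: it exposes changes committed after the offset, a plain `SELECT` on it leaves the offset in place, and the offset advances only when the stream is read by a DML statement that commits. The toy class below models just that behavior; `ToyStream` is hypothetical, and its `has_data()` plays the role of `SYSTEM$STREAM_HAS_DATA`.

```python
from typing import Any, List


class ToyStream:
    """Toy model of a table stream: an offset into an append-only change log."""

    def __init__(self, change_log: List[Any]) -> None:
        self.change_log = change_log      # shared with the "table"
        self.offset = len(change_log)     # a new stream starts at the current end

    def has_data(self) -> bool:
        # Analogue of SYSTEM$STREAM_HAS_DATA: any changes past the offset?
        return len(self.change_log) > self.offset

    def peek(self) -> List[Any]:
        # Like a plain SELECT on the stream: returns changes, offset unchanged
        return self.change_log[self.offset:]

    def consume(self) -> List[Any]:
        # Like reading the stream in a committed DML statement: offset advances
        changes = self.peek()
        self.offset = len(self.change_log)
        return changes


log: List[Any] = []
stream = ToyStream(log)
print(stream.has_data())                    # False
log.append(("INSERT", "2024-01-01"))
print(stream.peek(), stream.has_data())     # [('INSERT', '2024-01-01')] True
print(stream.consume(), stream.has_data())  # [('INSERT', '2024-01-01')] False
```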

In [None]:
-- Create a stream on the harmonized data for change tracking
USE ROLE CRYPTO_ROLE;
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED_STREAM
ON TABLE CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED;

In [None]:
SHOW TABLES IN SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

## Activate the Automation Pipeline

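Resume all tasks to start the automation workflow. Child tasks are resumed before the root task (reverse order of the dependency chain), so that the first scheduled run of `LOAD_CRYPTO_TASK` already triggers its dependents. Alternatively, `SYSTEM$TASK_DEPENDENTS_ENABLE` recursively resumes the dependent tasks of a given root task.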
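The reverse-dependency order can be derived from the `AFTER` clauses. The sketch below uses Python's standard `graphlib` module (Python 3.9+); the `AFTER` mapping and the `resume_order` helper are hypothetical, built from the three task names in this notebook, and the code only prints the statements rather than running them.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# task -> set of predecessor tasks, mirroring each task's AFTER clause
AFTER: Dict[str, Set[str]] = {
    "LOAD_CRYPTO_TASK": set(),
    "HARMONIZE_CRYPTO_TASK": {"LOAD_CRYPTO_TASK"},
    "UPDATE_CRYPTO_METRICS_TASK": {"HARMONIZE_CRYPTO_TASK"},
}


def resume_order(after: Dict[str, Set[str]]) -> List[str]:
    """Return tasks leaves-first, so every child is resumed before its predecessors."""
    run_order = list(TopologicalSorter(after).static_order())  # root first
    return list(reversed(run_order))


for task in resume_order(AFTER):
    print(f"ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.{task} RESUME;")
# ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.UPDATE_CRYPTO_METRICS_TASK RESUME;
# ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK RESUME;
# ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK RESUME;
```

For this linear chain the result is simply the chain reversed; deriving it matters once a graph branches.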

In [None]:
-- %%sql
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.UPDATE_CRYPTO_METRICS_TASK RESUME;
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK RESUME;
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK RESUME;

### View Task Dependency Graph

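In [None]:
-- Tasks in the graph rooted at LOAD_CRYPTO_TASK (the dependency structure itself)
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK', RECURSIVE => TRUE));

-- Graph runs that are currently executing or scheduled
SELECT * FROM TABLE(INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS()) ORDER BY SCHEDULED_TIME;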

## Set Up Alert Notifications

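Create an email alert that notifies administrators when any pipeline task fails. The cell below is commented out; uncomment it to enable the alert. Creating the notification integration requires the CREATE INTEGRATION privilege, which is held by ACCOUNTADMIN by default.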

In [None]:

-- CREATE OR REPLACE NOTIFICATION INTEGRATION crypto_email_integration
--   TYPE = EMAIL
--   ENABLED = TRUE;

-- CREATE OR REPLACE ALERT CRYPTO_DB.ANALYTICS_CRYPTO.TASK_FAILURE_ALERT
--   WAREHOUSE = CRYPTO_WH
--   SCHEDULE = 'USING CRON 0 */1 * * * UTC'  -- Check every hour
--   IF (EXISTS (
--     SELECT 1 
--     FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
--       SCHEDULED_TIME_RANGE_START=>DATEADD('HOUR',-1,CURRENT_TIMESTAMP())))
--     WHERE STATE = 'FAILED'
--   ))
--   THEN CALL SYSTEM$SEND_EMAIL(
--     'crypto_email_integration',
--     'admin@example.com',
--     'Crypto Pipeline Task Failure Alert',
--     'A task in the Crypto data pipeline has failed in the last hour. Please check the task history.'
--   );

-- -- Resume the alert to activate it
-- ALTER ALERT CRYPTO_DB.ANALYTICS_CRYPTO.TASK_FAILURE_ALERT RESUME;
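The alert's condition boils down to a predicate over recent task history: did any run scheduled in the last hour end in `FAILED`? The sketch below expresses that predicate in plain Python over a hypothetical `TaskRun` record that mirrors three `TASK_HISTORY` columns (`NAME`, `STATE`, `SCHEDULED_TIME`); it does not query Snowflake.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


@dataclass
class TaskRun:
    # Simplified, hypothetical stand-in for a TASK_HISTORY row
    name: str
    state: str
    scheduled_time: datetime


def failed_in_window(runs: Iterable[TaskRun], now: datetime,
                     window: timedelta = timedelta(hours=1)) -> List[str]:
    """Sorted names of tasks with a FAILED run scheduled within `window` before `now`."""
    start = now - window
    return sorted({r.name for r in runs if r.state == "FAILED" and r.scheduled_time >= start})


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
history = [
    TaskRun("LOAD_CRYPTO_TASK", "SUCCEEDED", now - timedelta(minutes=50)),
    TaskRun("HARMONIZE_CRYPTO_TASK", "FAILED", now - timedelta(minutes=45)),
    TaskRun("UPDATE_CRYPTO_METRICS_TASK", "FAILED", now - timedelta(hours=3)),
]
failed = failed_in_window(history, now)
if failed:  # corresponds to the alert's IF (EXISTS (...)) branch
    print("Would send email: failed tasks ->", ", ".join(failed))
```

Runs skipped because a `WHEN` condition was false are recorded with state `SKIPPED`, so they do not trip this check.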

## Pipeline Visualization

The cryptocurrency data pipeline has the following task dependencies:

```
LOAD_CRYPTO_TASK (every 4 hours)
       |
       V
HARMONIZE_CRYPTO_TASK (when any RAW_CRYPTO_STREAM_<BTC|ETH|DOGE> has data)
       |
       V
UPDATE_CRYPTO_METRICS_TASK (when CRYPTO_HARMONIZED_STREAM has data)
```

This creates a fully automated workflow that processes data in stages:
1. Ingest raw cryptocurrency data
2. Transform and harmonize the data
3. Calculate analytics and metrics

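`LOAD_CRYPTO_TASK` runs on its 4-hour schedule regardless; the harmonization and analytics steps run only when their upstream streams contain new changes. When a task's `WHEN` condition is false the run is skipped, and evaluating `SYSTEM$STREAM_HAS_DATA` does not require a running warehouse, so empty runs cost no warehouse time.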
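The staged, skip-when-empty control flow can be sketched as one simulated graph run. Everything here is hypothetical: streams are plain Python lists, task bodies are stubs, and `run_graph` only mirrors the `WHEN` guards of the three tasks, not Snowflake's scheduler.

```python
from typing import Dict, List


def run_graph(new_rows: Dict[str, int]) -> Dict[str, str]:
    """Simulate one LOAD -> HARMONIZE -> UPDATE_METRICS run with WHEN-style guards.

    new_rows maps a coin symbol (BTC, ETH, DOGE) to the number of changed rows loaded.
    """
    raw_streams: Dict[str, List[int]] = {sym: [] for sym in ("BTC", "ETH", "DOGE")}
    harmonized_stream: List[int] = []
    states: Dict[str, str] = {}

    # Root task: always runs on its schedule; changed rows land in the raw streams
    for sym, n in new_rows.items():
        raw_streams[sym].extend(range(n))
    states["LOAD_CRYPTO_TASK"] = "SUCCEEDED"

    # WHEN STREAM_HAS_DATA(BTC) OR STREAM_HAS_DATA(ETH) OR STREAM_HAS_DATA(DOGE)
    if any(raw_streams.values()):
        for changes in raw_streams.values():
            harmonized_stream.extend(changes)  # consume raw changes, emit harmonized rows
            changes.clear()
        states["HARMONIZE_CRYPTO_TASK"] = "SUCCEEDED"
    else:
        states["HARMONIZE_CRYPTO_TASK"] = "SKIPPED"

    # WHEN STREAM_HAS_DATA(CRYPTO_HARMONIZED_STREAM)
    if harmonized_stream:
        harmonized_stream.clear()  # analytics step consumes the harmonized changes
        states["UPDATE_CRYPTO_METRICS_TASK"] = "SUCCEEDED"
    else:
        states["UPDATE_CRYPTO_METRICS_TASK"] = "SKIPPED"
    return states


print(run_graph({"BTC": 3}))
print(run_graph({}))
```

Calling it with `{"BTC": 3}` marks all three tasks `SUCCEEDED`, while `{}` leaves only the load step running and the other two `SKIPPED`.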