# Cryptocurrency Data Pipeline - Task Orchestration & Automation

This notebook implements the task orchestration layer for the cryptocurrency data pipeline, automating the flow from data ingestion through harmonization to analytics.

## Setup Environment

In [None]:
from snowflake.snowpark import Session
session = Session.builder.getOrCreate()

In [None]:
-- %%sql
USE ROLE CRYPTO_ROLE;
USE WAREHOUSE CRYPTO_WH;
USE SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

## Create Tasks for Pipeline Automation

### 1. Data Ingestion Task - Runs every 4 hours to fetch new data

In [None]:
-- The handler calls RapidAPI over HTTPS and reads two secrets, which requires an
-- external access integration. CRYPTO_API_ACCESS_INTEGRATION is an assumed name for an
-- integration whose network rule allows apidojo-yahoo-finance-v1.p.rapidapi.com and whose
-- ALLOWED_AUTHENTICATION_SECRETS include CRYPTO_API_SECRET and CRYPTO_AWS_CREDENTIALS.
CREATE OR REPLACE PROCEDURE CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_DATA_SP()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'requests')
HANDLER = 'run'
EXTERNAL_ACCESS_INTEGRATIONS = (CRYPTO_API_ACCESS_INTEGRATION)
SECRETS = ('rapidapi' = CRYPTO_API_SECRET, 'aws' = CRYPTO_AWS_CREDENTIALS)
AS
$$
import json
from datetime import datetime, timezone

import _snowflake
import requests

RAPIDAPI_HOST = "apidojo-yahoo-finance-v1.p.rapidapi.com"
S3_BUCKET = "damg7245-crypto"
S3_PREFIX = "raw_data/"
# (Yahoo Finance symbol, raw table name)
CRYPTO_SYMBOLS = [("BTC-USD", "BTC"), ("ETH-USD", "ETH"), ("DOGE-USD", "DOGE")]
COLUMNS = ["date", "open", "high", "low", "close", "volume", "adjclose"]
COLUMN_LIST = ", ".join(COLUMNS)


def fetch_crypto_data(symbol, api_key):
    """Fetch the current quote for one symbol and map it to an OHLCV row."""
    resp = requests.get(
        f"https://{RAPIDAPI_HOST}/market/get-quotes",
        headers={"x-rapidapi-key": api_key, "x-rapidapi-host": RAPIDAPI_HOST},
        params={"region": "US", "symbols": symbol},
        timeout=30,
    )
    resp.raise_for_status()
    quote = resp.json()["quoteResponse"]["result"][0]
    return {
        # One row per UTC day, stamped 19:00:00 to match the existing files
        "date": datetime.now(timezone.utc).strftime("%Y-%m-%d") + " 19:00:00",
        "open": quote["regularMarketOpen"],
        "high": quote["regularMarketDayHigh"],
        "low": quote["regularMarketDayLow"],
        "close": quote["regularMarketPrice"],
        "volume": quote["regularMarketVolume"],
        "adjclose": quote["regularMarketPrice"],  # regularMarketPrice is reused as adjclose
    }


def load_into_raw_table(session, data, table):
    """Insert today's row, or update it if it already exists, using bound values."""
    session.sql(
        f"""
        MERGE INTO CRYPTO_DB.RAW_CRYPTO.{table} t
        USING (SELECT ? AS date, ? AS open, ? AS high, ? AS low,
                      ? AS close, ? AS volume, ? AS adjclose) s
        ON t.date = s.date
        WHEN MATCHED THEN UPDATE SET
            open = s.open, high = s.high, low = s.low, close = s.close,
            volume = s.volume, adjclose = s.adjclose,
            ingestion_timestamp = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN INSERT ({COLUMN_LIST})
            VALUES (s.date, s.open, s.high, s.low, s.close, s.volume, s.adjclose)
        """,
        params=[data[c] for c in COLUMNS],
    ).collect()


def update_s3_file(session, table):
    """Rewrite raw_data/<table>_raw_daily.csv on S3 by unloading the whole raw table."""
    session.sql(
        f"""
        COPY INTO @TEMP_CRYPTO_STAGE/{S3_PREFIX}{table}_raw_daily.csv
        FROM (SELECT {COLUMN_LIST} FROM CRYPTO_DB.RAW_CRYPTO.{table} ORDER BY date)
        FILE_FORMAT = (TYPE = CSV COMPRESSION = NONE)
        HEADER = TRUE SINGLE = TRUE OVERWRITE = TRUE
        """
    ).collect()


def run(session):
    api_key = _snowflake.get_username_password("rapidapi").password
    aws = json.loads(_snowflake.get_generic_secret_string("aws"))
    key_id, secret_key = aws["ACCESS_KEY_ID"], aws["SECRET_ACCESS_KEY"]
    session.sql(
        f"""
        CREATE OR REPLACE TEMPORARY STAGE TEMP_CRYPTO_STAGE
        URL = 's3://{S3_BUCKET}/'
        CREDENTIALS = (AWS_KEY_ID = '{key_id}' AWS_SECRET_KEY = '{secret_key}')
        """
    ).collect()

    results = {"status": "success", "details": {}}
    for symbol, table in CRYPTO_SYMBOLS:
        try:
            data = fetch_crypto_data(symbol, api_key)
            load_into_raw_table(session, data, table)
            update_s3_file(session, table)
            results["details"][symbol] = {"status": "success", "date": data["date"]}
        except Exception as exc:  # record the failure and continue with the next symbol
            results["status"] = "error"
            results["details"][symbol] = {"status": "error", "message": str(exc)}

    if results["status"] == "error":
        # Raising marks the task run as FAILED, which the health view and alert count
        raise RuntimeError(json.dumps(results))
    return results
$$;

-- Grant necessary permissions
GRANT USAGE ON PROCEDURE CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_DATA_SP() TO ROLE CRYPTO_ROLE;
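
Per symbol, the loader does three things: map the quote fields to an OHLCV row, stamp it with that UTC day's 19:00:00 timestamp, and upsert it by date. A minimal plain-Python sketch of that logic, assuming a quote dict shaped like `quoteResponse.result[0]`; the helper names and the in-memory dict standing in for the raw table are hypothetical.

```python
from datetime import datetime, timezone

# Quote field behind each CSV column; adjclose reuses regularMarketPrice, as in the procedure
FIELD_MAP = {
    "open": "regularMarketOpen",
    "high": "regularMarketDayHigh",
    "low": "regularMarketDayLow",
    "close": "regularMarketPrice",
    "volume": "regularMarketVolume",
    "adjclose": "regularMarketPrice",
}
CSV_COLUMNS = ["date", "open", "high", "low", "close", "volume", "adjclose"]


def daily_timestamp(now: datetime) -> str:
    """Every run on the same UTC day gets the same 'YYYY-MM-DD 19:00:00' key.

    `now` should be timezone-aware.
    """
    return now.astimezone(timezone.utc).strftime("%Y-%m-%d") + " 19:00:00"


def quote_to_row(quote: dict, now: datetime) -> dict:
    """Map one entry of quoteResponse.result to an OHLCV row."""
    row = {"date": daily_timestamp(now)}
    for column, field in FIELD_MAP.items():
        row[column] = quote[field]
    return row


def to_csv_line(row: dict) -> str:
    """Serialize a row in the column order of the CSV header."""
    return ",".join(str(row[column]) for column in CSV_COLUMNS)


def upsert_daily_row(table: dict, row: dict) -> str:
    """Insert the row, or overwrite the existing row with the same date key."""
    status = "updated" if row["date"] in table else "inserted"
    table[row["date"]] = row
    return status
```

Calling `upsert_daily_row` twice on the same UTC day returns `"inserted"` and then `"updated"` and leaves a single row, which is how repeated runs within a day refresh one record instead of duplicating it.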

In [None]:
-- %%sql
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK
    WAREHOUSE = CRYPTO_WH
    SCHEDULE = 'USING CRON 0 */4 * * * UTC'  -- Run every 4 hours
AS
CALL CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_DATA_SP();
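
The schedule `USING CRON 0 */4 * * * UTC` fires at minute 0 of hours 0, 4, 8, 12, 16 and 20 UTC, six runs per day. Because every run on a given UTC day writes the same 19:00:00 row, later runs refresh that row rather than adding new ones. A small sketch that lists upcoming fire times for this cron pattern; `next_runs` is a hypothetical helper that models only the "minute 0, every N hours" form, not general cron syntax.

```python
from datetime import datetime, timedelta, timezone


def next_runs(after: datetime, count: int, step_hours: int = 4) -> list:
    """Next `count` fire times of a '0 */<step_hours> * * *' cron schedule in UTC.

    `after` must be timezone-aware; only the minute-0, every-N-hours pattern is modeled.
    """
    after = after.astimezone(timezone.utc)
    t = after.replace(minute=0, second=0, microsecond=0)  # top of the current hour
    runs = []
    while len(runs) < count:
        # */N in the hour field matches hours 0, N, 2N, ... up to 23
        if t > after and t.hour % step_hours == 0:
            runs.append(t)
        t += timedelta(hours=1)
    return runs
```

For example, starting from 05:30 UTC the next three runs fall at 08:00, 12:00 and 16:00 UTC.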

In [None]:
EXECUTE TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK;

In [None]:
SHOW TASKS LIKE 'LOAD_CRYPTO_TASK' IN SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

In [None]:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('MINUTE',-10,CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10))
WHERE NAME = 'LOAD_CRYPTO_TASK'
ORDER BY SCHEDULED_TIME DESC;

### 2. Create Task for Data Harmonization - Triggered when new data arrives

In [None]:
-- %%sql
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK
    WAREHOUSE = CRYPTO_WH
    AFTER CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK
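    WHEN SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_BTC')
      OR SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_ETH')
      OR SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_DOGE')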
AS
CALL CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_DATA_SP();

### 3. Create Task to Update Analytics Tables - Runs after harmonization completes

In [None]:
USE ROLE CRYPTO_ROLE;
CREATE OR REPLACE TASK CRYPTO_DB.HARMONIZED_CRYPTO.UPDATE_CRYPTO_METRICS_TASK
    WAREHOUSE = CRYPTO_WH
    AFTER CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK
    WHEN SYSTEM$STREAM_HAS_DATA('CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED_STREAM')
AS
CALL CRYPTO_DB.ANALYTICS_CRYPTO.UPDATE_CRYPTO_ANALYTICS();

## Set Up Change Tracking with Streams

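A stream records the row-level changes (inserts, updates and deletes) made to its source table since the stream was last consumed. The downstream tasks check these streams with `SYSTEM$STREAM_HAS_DATA` in their `WHEN` clauses, so they run only when new data exists.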
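
A stream behaves like an offset into its table's change log: `SYSTEM$STREAM_HAS_DATA` is true while changes exist past the offset, and the offset advances only when a DML statement that reads the stream commits. A toy Python model of that behaviour; `ToyStream` is hypothetical and ignores the distinction between inserts, updates and deletes.

```python
class ToyStream:
    """Toy model of a standard stream: an offset into a table's change log."""

    def __init__(self, changes: list):
        self.changes = changes        # shared change log of the source table
        self.offset = len(changes)    # a new stream starts at the table's current state

    def has_data(self) -> bool:
        """Analogue of SYSTEM$STREAM_HAS_DATA."""
        return len(self.changes) > self.offset

    def consume(self, commit: bool = True) -> list:
        """Read pending changes; the offset advances only if the transaction commits."""
        pending = self.changes[self.offset:]
        if commit:
            self.offset = len(self.changes)
        return pending
```

The model also shows why re-running a `CREATE OR REPLACE STREAM` cell discards changes that have not yet been consumed: the replacement stream starts with a fresh offset.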

In [None]:
-- Create a stream on the harmonized data for change tracking
USE ROLE CRYPTO_ROLE;
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED_STREAM
ON TABLE CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED;

In [None]:
SHOW TABLES IN SCHEMA CRYPTO_DB.HARMONIZED_CRYPTO;

In [None]:
-- Create stream for BTC
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_BTC
ON TABLE CRYPTO_DB.RAW_CRYPTO.BTC;

-- Create stream for ETH
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_ETH
ON TABLE CRYPTO_DB.RAW_CRYPTO.ETH;

-- Create stream for DOGE
CREATE OR REPLACE STREAM CRYPTO_DB.HARMONIZED_CRYPTO.RAW_CRYPTO_STREAM_DOGE
ON TABLE CRYPTO_DB.RAW_CRYPTO.DOGE;

## Activate the Automation Pipeline

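Resume all tasks to start the automation workflow. The child tasks are resumed first, in reverse order of the dependency chain, and the root task (`LOAD_CRYPTO_TASK`) last, so every task in the graph is active before scheduled runs begin.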
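
Resuming dependents before the root is a reverse topological order of the graph defined by the `AFTER` clauses. A sketch that derives that order, and the matching `ALTER TASK` statements, with Python's standard `graphlib` module; `TASK_GRAPH` and the function names are illustrative.

```python
from graphlib import TopologicalSorter

# Each task maps to its predecessors, i.e. the task named in its AFTER clause
TASK_GRAPH = {
    "LOAD_CRYPTO_TASK": set(),
    "HARMONIZE_CRYPTO_TASK": {"LOAD_CRYPTO_TASK"},
    "UPDATE_CRYPTO_METRICS_TASK": {"HARMONIZE_CRYPTO_TASK"},
}


def resume_order(graph: dict) -> list:
    """Dependents first, root last: the reverse of a topological order."""
    return list(reversed(list(TopologicalSorter(graph).static_order())))


def resume_statements(graph: dict, schema: str = "CRYPTO_DB.HARMONIZED_CRYPTO") -> list:
    """Build one ALTER TASK ... RESUME statement per task, in resume order."""
    return [f"ALTER TASK {schema}.{task} RESUME;" for task in resume_order(graph)]
```

For this three-task chain the result matches the cell: `UPDATE_CRYPTO_METRICS_TASK`, then `HARMONIZE_CRYPTO_TASK`, then `LOAD_CRYPTO_TASK`.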

In [None]:
-- %%sql
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.UPDATE_CRYPTO_METRICS_TASK RESUME;
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.HARMONIZE_CRYPTO_TASK RESUME;
ALTER TASK CRYPTO_DB.HARMONIZED_CRYPTO.LOAD_CRYPTO_TASK RESUME;

## Task Monitoring and Observability

### Check Recent Task Execution History

In [None]:
task_history = session.sql("""
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
""")

task_history.show()

### View Current Task Graph Runs

In [None]:

SELECT *
FROM TABLE(INFORMATION_SCHEMA.CURRENT_TASK_GRAPHS())
ORDER BY SCHEDULED_TIME;

## Create Pipeline Health Dashboard

This view combines task execution statistics for the last 7 days with data-freshness metrics for each cryptocurrency.
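
The `task_stats` CTE is a per-task aggregation over `TASK_HISTORY` rows. The same aggregation over a list of dicts, as a plain-Python sketch: keys mirror `TASK_HISTORY` column names, except `DURATION_MS`, a hypothetical precomputed stand-in for `TIMESTAMPDIFF(MILLISECOND, QUERY_START_TIME, COMPLETED_TIME)`.

```python
from collections import defaultdict
from statistics import mean


def task_stats(history: list) -> dict:
    """Per-task run counts, success rate and mean duration of successful runs."""
    by_task = defaultdict(list)
    for row in history:
        by_task[row["NAME"]].append(row)

    stats = {}
    for name, rows in by_task.items():
        succeeded = [r for r in rows if r["STATE"] == "SUCCEEDED"]
        failed = [r for r in rows if r["STATE"] == "FAILED"]
        durations = [r["DURATION_MS"] for r in succeeded]
        stats[name] = {
            "total_runs": len(rows),
            "successful_runs": len(succeeded),
            "failed_runs": len(failed),
            # Percentage rounded to 2 decimals, like the view's success_rate
            "success_rate": round(len(succeeded) / len(rows) * 100, 2),
            "avg_duration_ms": mean(durations) if durations else None,
        }
    return stats
```

Every row counts toward `total_runs` here, so pending or skipped runs lower the success rate unless they are filtered out before aggregating.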

In [None]:

CREATE OR REPLACE VIEW CRYPTO_DB.ANALYTICS_CRYPTO.PIPELINE_HEALTH_DASHBOARD AS
WITH task_stats AS (
    SELECT
        NAME as task_name,
        COUNT(*) as total_runs,
        SUM(CASE WHEN STATE = 'SUCCEEDED' THEN 1 ELSE 0 END) as successful_runs,
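        SUM(CASE WHEN STATE IN ('FAILED', 'FAILED_AND_AUTO_SUSPENDED') THEN 1 ELSE 0 END) as failed_runs,
        MAX(CASE WHEN STATE = 'SUCCEEDED' THEN COMPLETED_TIME ELSE NULL END) as last_successful_run,
        MAX(CASE WHEN STATE IN ('FAILED', 'FAILED_AND_AUTO_SUSPENDED') THEN COMPLETED_TIME ELSE NULL END) as last_failed_run,
        AVG(CASE WHEN STATE = 'SUCCEEDED' THEN TIMESTAMPDIFF(MILLISECOND, QUERY_START_TIME, COMPLETED_TIME) ELSE NULL END) as avg_duration_ms
    -- RESULT_LIMIT defaults to 100 rows, which a week of history for three tasks can exceed
    FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
        SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-7,CURRENT_TIMESTAMP()),
        RESULT_LIMIT => 10000))
    -- Count finished runs only: pending runs and runs skipped by a WHEN condition are excluded
    WHERE STATE NOT IN ('SCHEDULED', 'EXECUTING', 'SKIPPED')
    GROUP BY NAME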
),
data_stats AS (
    SELECT
        'BTC' as crypto_symbol,
        COUNT(*) as record_count,
        MIN(timestamp) as earliest_record,
        MAX(timestamp) as latest_record,
        DATEDIFF('hour', MAX(timestamp), CURRENT_TIMESTAMP()) as hours_since_last_update
    FROM CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED
    WHERE crypto_symbol = 'BTC'
    UNION ALL
    SELECT
        'ETH' as crypto_symbol,
        COUNT(*) as record_count,
        MIN(timestamp) as earliest_record,
        MAX(timestamp) as latest_record,
        DATEDIFF('hour', MAX(timestamp), CURRENT_TIMESTAMP()) as hours_since_last_update
    FROM CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED
    WHERE crypto_symbol = 'ETH'
    UNION ALL
    SELECT
        'DOGE' as crypto_symbol,
        COUNT(*) as record_count,
        MIN(timestamp) as earliest_record,
        MAX(timestamp) as latest_record,
        DATEDIFF('hour', MAX(timestamp), CURRENT_TIMESTAMP()) as hours_since_last_update
    FROM CRYPTO_DB.HARMONIZED_CRYPTO.CRYPTO_HARMONIZED
    WHERE crypto_symbol = 'DOGE'
)
SELECT
    'Task Health' as metric_type,
    task_name as metric_name,
    total_runs,
    successful_runs,
    failed_runs,
    ROUND(successful_runs/NULLIF(total_runs,0)*100, 2) as success_rate,
    last_successful_run,
    last_failed_run,
    avg_duration_ms,
    NULL as record_count,
    NULL as earliest_record,
    NULL as latest_record,
    NULL as hours_since_last_update
FROM task_stats
UNION ALL
SELECT
    'Data Health' as metric_type,
    crypto_symbol as metric_name,
    NULL as total_runs,
    NULL as successful_runs,
    NULL as failed_runs,
    NULL as success_rate,
    NULL as last_successful_run,
    NULL as last_failed_run,
    NULL as avg_duration_ms,
    record_count,
    earliest_record,
    latest_record,
    hours_since_last_update
FROM data_stats
ORDER BY metric_type, metric_name;

### Check the Pipeline Health Dashboard

In [None]:
pipeline_health = session.sql("SELECT * FROM CRYPTO_DB.ANALYTICS_CRYPTO.PIPELINE_HEALTH_DASHBOARD")
pipeline_health.show()
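
The `hours_since_last_update` column comes from `DATEDIFF('hour', MAX(timestamp), CURRENT_TIMESTAMP())`. Snowflake's `DATEDIFF` counts unit boundaries crossed rather than elapsed time, so 10:59 to 11:01 already counts as one hour. A plain-Python sketch of that rule plus a staleness check over `latest_record` values; the function names and the 24-hour threshold are assumptions, not part of the pipeline.

```python
from datetime import datetime


def _hour_floor(t: datetime) -> datetime:
    """Truncate a timestamp to the start of its hour."""
    return t.replace(minute=0, second=0, microsecond=0)


def datediff_hours(start: datetime, end: datetime) -> int:
    """Count hour boundaries crossed, like Snowflake DATEDIFF('hour', start, end)."""
    return int((_hour_floor(end) - _hour_floor(start)).total_seconds() // 3600)


def stale_symbols(latest: dict, now: datetime, max_hours: int = 24) -> list:
    """Symbols whose latest record is more than max_hours old by that measure.

    max_hours=24 is an assumed threshold, chosen because the source rows are daily.
    """
    return sorted(s for s, ts in latest.items() if datediff_hours(ts, now) > max_hours)
```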

## Set Up Alert Notifications

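Create an email alert that notifies administrators when a task fails. Every hour, the alert checks the task history and sends an email if any task run scheduled in the previous hour has failed. The cell below is commented out; uncomment it to create and activate the alert.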
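
The alert condition reduces to one question: did any task run scheduled in the last hour end in the `FAILED` state? A plain-Python sketch of that check over `TASK_HISTORY`-like rows; the dict keys mirror the `NAME`, `STATE` and `SCHEDULED_TIME` columns, and the functions are hypothetical, building only the message body that `SYSTEM$SEND_EMAIL` would send.

```python
from datetime import datetime, timedelta
from typing import Optional


def failed_tasks(history: list, now: datetime, window: timedelta = timedelta(hours=1)) -> list:
    """Names of tasks with a FAILED run scheduled within [now - window, now]."""
    start = now - window
    return sorted({
        row["NAME"]
        for row in history
        if row["STATE"] == "FAILED" and start <= row["SCHEDULED_TIME"] <= now
    })


def alert_body(failed: list) -> Optional[str]:
    """Email body when the condition holds, None when the alert should stay quiet."""
    if not failed:
        return None
    return "Failed tasks in the last hour: " + ", ".join(failed)
```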

In [None]:

-- CREATE OR REPLACE NOTIFICATION INTEGRATION crypto_email_integration
--   TYPE = EMAIL
--   ENABLED = TRUE;

-- CREATE OR REPLACE ALERT CRYPTO_DB.ANALYTICS_CRYPTO.TASK_FAILURE_ALERT
--   WAREHOUSE = CRYPTO_WH
--   SCHEDULE = 'USING CRON 0 */1 * * * UTC'  -- Check every hour
--   IF (EXISTS (
--     SELECT 1 
--     FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
--       SCHEDULED_TIME_RANGE_START=>DATEADD('HOUR',-1,CURRENT_TIMESTAMP())))
--     WHERE STATE = 'FAILED'
--   ))
--   THEN CALL SYSTEM$SEND_EMAIL(
--     'crypto_email_integration',
--     'admin@example.com',
--     'Crypto Pipeline Task Failure Alert',
--     'A task in the Crypto data pipeline has failed in the last hour. Please check the task history.'
--   );

-- -- Resume the alert to activate it
-- ALTER ALERT CRYPTO_DB.ANALYTICS_CRYPTO.TASK_FAILURE_ALERT RESUME;

## Pipeline Visualization

The cryptocurrency data pipeline has the following task dependencies:

```
LOAD_CRYPTO_TASK (every 4 hours)
       |
       V
HARMONIZE_CRYPTO_TASK (when any RAW_CRYPTO_STREAM_* stream has data)
       |
       V
UPDATE_CRYPTO_METRICS_TASK (when CRYPTO_HARMONIZED_STREAM has data)
```

This creates a fully automated workflow that processes data in stages:
1. Ingest raw cryptocurrency data
2. Transform and harmonize the data
3. Calculate analytics and metrics

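`LOAD_CRYPTO_TASK` runs on its fixed schedule; the two downstream tasks run only when their streams contain new rows. When a `WHEN` condition evaluates to false the run is skipped, and evaluating the condition does not use the warehouse, so skipped runs consume no warehouse credits.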
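
The staged, condition-driven flow can be modeled in a few lines of plain Python. In this sketch, the `Task` class and `run_graph` are hypothetical: the root always runs, and each child runs only if its predecessor succeeded and its `WHEN` stand-in returns true. Treating a skipped predecessor as blocking its dependents is an assumption of the model.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Task:
    name: str
    after: Optional[str] = None                # predecessor named in the AFTER clause
    when: Callable[[], bool] = lambda: True    # stands in for the WHEN condition
    action: Callable[[], None] = lambda: None  # stands in for the CALL body


def run_graph(tasks: list) -> dict:
    """Simulate one graph run; tasks must be listed in dependency order."""
    states = {}
    for task in tasks:
        if task.after is not None and states.get(task.after) != "SUCCEEDED":
            states[task.name] = "SKIPPED"  # model assumption: blocked by its predecessor
        elif task.when():
            task.action()
            states[task.name] = "SUCCEEDED"
        else:
            states[task.name] = "SKIPPED"  # WHEN condition evaluated to false
    return states
```

If the loader's action adds nothing to the raw streams, `HARMONIZE_CRYPTO_TASK` and `UPDATE_CRYPTO_METRICS_TASK` both come back `SKIPPED` while `LOAD_CRYPTO_TASK` still reports `SUCCEEDED`.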