In [1]:
# Install Libraries
!pip install requests snowflake-connector-python pandas



In [2]:
import requests
import pandas as pd
from datetime import datetime, timedelta
import snowflake.connector
# Import the userdata module to securely read secrets from Colab
from google.colab import userdata

In [3]:
STOCK_SYMBOL = "IBM"
TABLE_NAME = "RAW.STOCK_PRICES"

In [4]:
# Securely Retrieve Credentials
try:
    API_KEY = userdata.get('ALPHA_VANTAGE_API_KEY')
    SNOWFLAKE_USER = userdata.get('SNOWFLAKE_USER')
    SNOWFLAKE_PASSWORD = userdata.get('SNOWFLAKE_PASSWORD')
    SNOWFLAKE_ACCOUNT = userdata.get('SNOWFLAKE_ACCOUNT')
    SNOWFLAKE_WAREHOUSE = userdata.get('SNOWFLAKE_WAREHOUSE')
    SNOWFLAKE_ROLE = userdata.get('SNOWFLAKE_ROLE')
    SNOWFLAKE_DATABASE = userdata.get('SNOWFLAKE_DATABASE')

    if not all([API_KEY, SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE]):
        raise ValueError("One or more required secrets are missing. Please check the Colab Secrets panel.")

except Exception as e:
    print(f"Error retrieving secrets: {e}")

In [5]:
# Function to fetch data
def fetch_stock_data(symbol, api_key):
    """Reads the last 90 days of price info via the Alpha Vantage API."""
    print(f"\nExtracting data for {symbol} (Last 90 Days)")

    ALPHA_VANTAGE_URL = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={api_key}'

    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "outputsize": "compact",
        "apikey": api_key,
        "datatype": "json"
    }

    response = requests.get(ALPHA_VANTAGE_URL, params=params)
    response.raise_for_status()
    data = response.json()

    time_series = data["Time Series (Daily)"]
    records = []

    # Calculate the cutoff date for 90 days ago
    ninety_days_ago = datetime.now() - timedelta(days=90)

    for date_str, values in time_series.items():
        #print(date_str, "==", data["Time Series (Daily)"][date_str])
        record_date = datetime.strptime(date_str, '%Y-%m-%d').date()

        # Filter to only the last 90 days (including the date)
        if record_date >= ninety_days_ago.date():
             records.append({
                "symbol": symbol,
                "date": date_str,
                "open": float(values["1. open"]),
                "high": float(values["2. high"]),
                "low": float(values["3. low"]),
                "close": float(values["4. close"]),
                "volume": int(values["5. volume"])
            })

    print(f"Successfully retrieved {len(records)} trading days for the last 90 days")
    return records

In [6]:
# ETL function with Transaction Logic
def run_etl_pipeline(symbol, stock_data):
    """
    Connects to Snowflake, ensures table existence, and executes a transactional
    DELETE and INSERT to ensure idempotency.
    """
    conn = None
    try:
        print("\nConnecting to Snowflake")

        # Connect to Snowflake
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_USER,
            password=SNOWFLAKE_PASSWORD,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            role=SNOWFLAKE_ROLE,
            database=SNOWFLAKE_DATABASE
        )
        cur = conn.cursor()

        # Create a table under "raw" schema if it doesn't exist
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            symbol VARCHAR(10) NOT NULL,
            date DATE NOT NULL,
            open FLOAT,
            close FLOAT,
            high FLOAT,
            low FLOAT,
            volume INT,
            PRIMARY KEY (symbol, date)
        );
        """
        cur.execute(f"CREATE SCHEMA IF NOT EXISTS {TABLE_NAME.split('.')[0]}")
        cur.execute(create_table_sql)
        print(f"Created table {TABLE_NAME} successfully")

        cur.execute('BEGIN')
        print("Starting SQL transaction")

        # Delete all records from the table
        delete_sql = f"DELETE FROM {TABLE_NAME} WHERE symbol = '{symbol}';"
        cur.execute(delete_sql)
        print(f"Deleted {cur.rowcount} existing records for {symbol}")

        # Populate the table with the records using INSERT SQL
        insert_sql = f"""
        INSERT INTO {TABLE_NAME} (symbol, date, open, close, high, low, volume)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """

        # Prepare the data for batch insertion
        insert_data = [
            (r['symbol'], r['date'], r['open'], r['close'], r['high'], r['low'], r['volume'])
            for r in stock_data
        ]

        # Execute the batch insert
        cur.executemany(insert_sql, insert_data)

        # Commit the transaction if successful
        cur.execute('COMMIT')
        print(f"Successfully added {len(insert_data)} records to {TABLE_NAME}")

        return True # Transaction successful

    except Exception as e:
        if conn:
            cur.execute('ROLLBACK')
            print("Transaction failed. Rolling back changes.")
        print(f"An error occurred during ETL: {e}")
        return False # Transaction failed

    finally:
        if conn:
            conn.close()


In [7]:
# Function to check record count for idempotency
def check_record_count(symbol):
    """Checks the number of records in the table for the given symbol."""
    conn = None
    try:
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_USER,
            password=SNOWFLAKE_PASSWORD,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            role=SNOWFLAKE_ROLE,
            database=SNOWFLAKE_DATABASE
        )
        cur = conn.cursor()

        count_sql = f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE symbol = '{symbol}';"
        cur.execute(count_sql)
        count = cur.fetchone()[0]
        return count

    except Exception as e:
        print(f"Error checking record count: {e}")
        return -1
    finally:
        if conn:
            conn.close()

In [8]:
# Main Execution Block
def main():

    try:
        # Fetch the stock data once
        stock_data = fetch_stock_data(STOCK_SYMBOL, API_KEY)
        print(stock_data)
        EXPECTED_RECORDS = len(stock_data)

        print(f"\nRUN 1: Running ETL pipeline for {STOCK_SYMBOL}")

        # Run 1
        run_etl_pipeline(STOCK_SYMBOL, stock_data)

        # Check records after Run 1
        count_after_run1 = check_record_count(STOCK_SYMBOL)
        print(f"\nRecord count after first run: {count_after_run1} (Expected: {EXPECTED_RECORDS} trading days)")

        print("\nRUN 2: Running pipeline again to test Idempotency")

        # Run 2
        run_etl_pipeline(STOCK_SYMBOL, stock_data)

        # Check records after Run 2
        count_after_run2 = check_record_count(STOCK_SYMBOL)
        print(f"\nRecord count after second run (Idempotency Check): {count_after_run2} (Expected: {EXPECTED_RECORDS} trading days)")

        # Demonstrate Idempotency
        if count_after_run1 == count_after_run2 and count_after_run1 == EXPECTED_RECORDS:
            print("\n\nIdempotency successfully demonstrated")
            print("The record count remained the same after two consecutive runs")
            print("It confirms the transactional DELETE/INSERT works as an idempotent operation")
        else:
            print("\n\nIdempotency check failed")

    except Exception as e:
        print(f"\nError in main execution: {e}")

if __name__ == "__main__":
    main()


Extracting data for IBM (Last 90 Days)
Successfully retrieved 62 trading days for the last 90 days
[{'symbol': 'IBM', 'date': '2025-09-29', 'open': 286.0, 'high': 286.0, 'low': 279.66, 'close': 279.8, 'volume': 6022125}, {'symbol': 'IBM', 'date': '2025-09-26', 'open': 280.51, 'high': 288.85, 'low': 280.11, 'close': 284.31, 'volume': 9063938}, {'symbol': 'IBM', 'date': '2025-09-25', 'open': 272.935, 'high': 284.23, 'low': 271.148, 'close': 281.44, 'volume': 11506192}, {'symbol': 'IBM', 'date': '2025-09-24', 'open': 272.62, 'high': 273.6499, 'low': 267.3, 'close': 267.53, 'volume': 3159924}, {'symbol': 'IBM', 'date': '2025-09-23', 'open': 272.7, 'high': 273.2962, 'low': 269.265, 'close': 272.24, 'volume': 5394121}, {'symbol': 'IBM', 'date': '2025-09-22', 'open': 266.62, 'high': 272.31, 'low': 266.0, 'close': 271.37, 'volume': 5030540}, {'symbol': 'IBM', 'date': '2025-09-19', 'open': 266.05, 'high': 267.87, 'low': 263.64, 'close': 266.4, 'volume': 9858112}, {'symbol': 'IBM', 'date': '202