In [1]:
from datetime import datetime, timedelta
import yfinance as yf
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
from dotenv import load_dotenv
import os

## Snowflake Connector

In [2]:
def return_snowflake_conn():
    # Manually specify the .env file path
    load_dotenv(os.path.join(os.getcwd(), "Snowflake_Credentials.env"))
    
    # Connect to Snowflake
    conn = snowflake.connector.connect(
        user = os.environ.get("SNOWFLAKE_USERNAME"),
        password = os.environ.get("SNOWFLAKE_PASSWORD"),
        account= os.environ.get("SNOWFLAKE_ACCOUNT"),
        warehouse='COMPUTE_WH',
        database='DEV',
        schema='RAW'
    )

    return conn.cursor()

## Gather Data

In [3]:
def get_data(symbol):
    #Download stock data based on the last 180 days
    df = yf.download(symbol, period='180d')

    # Remove multi-index
    df = df.droplevel(0, axis=1)
    # Rename the columns
    df.columns = ['Open', 'Close', 'High', 'Low', 'Volume']
    
    # Reset the index to make 'Date' a column
    df = df.reset_index()
    
    # Select only the desired columns
    df = df[['Date', 'Open', 'Close', 'High', 'Low', 'Volume']]
    
    df.insert(0, 'Symbol', symbol)

    return df

## Save Data to Snowflake

In [4]:
# Implement incremental update using MERGE SQL in Snowflake
def create_tables(con):
    staging_table = "DEV.RAW.lab1_stock_price_staging"
    target_table = "DEV.RAW.lab1_stock_price_table"

    con.execute(f"""
      CREATE TABLE IF NOT EXISTS {target_table} (
        symbol varchar NOT NULL,
        date timestamp_ntz NOT NULL,
        open float,
        high float,
        low float,
        close float,
        volume number,
        PRIMARY KEY (symbol, date)
      );""")

    con.execute(f"""
      CREATE OR REPLACE TABLE {staging_table} (
        symbol varchar NOT NULL,
        date timestamp_ntz NOT NULL,
        open float,
        high float,
        low float,
        close float,
        volume number,
        PRIMARY KEY (symbol, date)
      );""")

In [5]:
# Implement incremental update using MERGE SQL in Snowflake
def clear_stage(con):
    staging_table = "DEV.RAW.lab1_stock_price_staging"
    # target_table = "DEV.RAW.lab1_stock_price_table"

    # con.execute(f"""DELETE FROM {target_table}""")
    con.execute(f"""DELETE FROM {staging_table}""")

In [6]:
# Implement incremental update using MERGE SQL in Snowflake
def load_data(con, data, symbol):
    staging_table = "DEV.RAW.lab1_stock_price_staging"
    target_table = "DEV.RAW.lab1_stock_price_table"

    for index, row in data.iterrows():
        sql = f"INSERT INTO {staging_table} (symbol, date, open, high, low, close, volume) VALUES ('{row['Symbol']}', '{row['Date']}', '{row['Open']}', '{row['Close']}', '{row['High']}', '{row['Low']}', '{row['Volume']}')"
        print(sql)
        con.execute(sql)

    # Perform UPSERT (MERGE)
    upsert_sql = f"""
        MERGE INTO {target_table} AS target
        USING {staging_table} AS stage
        ON target.symbol = stage.symbol AND target.date = stage.date
        WHEN MATCHED THEN
            UPDATE SET
                target.open = stage.open,
                target.high = stage.high,
                target.low = stage.low,
                target.close = stage.close,
                target.volume = stage.volume
        WHEN NOT MATCHED THEN
            INSERT (symbol, date, open, high, low, close, volume)
            VALUES (stage.symbol, stage.date, stage.open, stage.high, stage.low, stage.close, stage.volume);
    """
    con.execute(upsert_sql)

In [7]:
def stock_data_pipeline(data, symbol):
  con = return_snowflake_conn()

  # Save functions and matching arguments in list
  functions = [
      (create_tables, [con]),
      (clear_stage, [con]),
      (load_data, [con, data, symbol])
  ]

  for func, args in functions:
      try:
          # Unpack arguments into appropriate functions
          func(*args)
      except Exception as e:
          # If code fails, pring the function that failed then exit excecution
          print(f"Execution stopped due to error in {func.__name__}: {e}")
          break

  con.close()

## Main

In [8]:
stock_symbols = ["FIVE", "AAPL"]

for symbol in stock_symbols:
    stock_data_pipeline(get_data(symbol), symbol)

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed


INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('FIVE', '2024-06-07 00:00:00', '118.48999786376953', '124.97000122070312', '117.0199966430664', '119.3499984741211', '3333700')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('FIVE', '2024-06-10 00:00:00', '115.4800033569336', '117.93000030517578', '114.22000122070312', '117.5199966430664', '2447600')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('FIVE', '2024-06-11 00:00:00', '120.31999969482422', '121.13999938964844', '113.77999877929688', '115.37000274658203', '1761400')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('FIVE', '2024-06-12 00:00:00', '116.56999969482422', '125.08000183105469', '116.3499984741211', '122.4000015258789', '1629300')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, vo

[*********************100%***********************]  1 of 1 completed


INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('AAPL', '2024-06-07 00:00:00', '196.23048400878906', '196.28031956711487', '193.48969558531903', '193.99798178181695', '53103900')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('AAPL', '2024-06-10 00:00:00', '192.4730987548828', '196.63910486677477', '191.50634675889475', '196.2404356269631', '97262100')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('AAPL', '2024-06-11 00:00:00', '206.45611572265625', '206.46609195915696', '192.9814138224934', '193.00133588013964', '172373300')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, close, volume) VALUES ('AAPL', '2024-06-12 00:00:00', '212.35630798339844', '219.46241499269527', '206.20696163392407', '206.67538853844061', '198134300')
INSERT INTO DEV.RAW.lab1_stock_price_staging (symbol, date, open, high, low, 

In [None]:
# con = return_snowflake_conn()

# staging_table = "DEV.RAW.lab1_stock_price_staging"
# target_table = "DEV.RAW.lab1_stock_price_table"

# # First run of pipeline
# stock_data_pipeline(data, "AAPL")
# con.execute(f"""SELECT COUNT(*) FROM DEV.RAW.temp_stock_prices""")
# print(f"First iteration stage count: {con.fetchone()}")
# con.execute(f"""SELECT COUNT(*) FROM DEV.RAW.stock_prices""")
# print(f"First iteration table count: {con.fetchone()}")

# # Second run of pipeline
# stock_data_pipeline(data, "AAPL")
# con.execute(f"""SELECT COUNT(*) FROM DEV.RAW.temp_stock_prices""")
# print(f"Second iteration stage count: {con.fetchone()}")
# con.execute(f"""SELECT COUNT(*) FROM DEV.RAW.stock_prices""")
# print(f"Second iteration table count: {con.fetchone()}")

# con.close()