In [6]:
import requests
import pandas as pd
import snowflake.connector

from dotenv import load_dotenv
import os

In [None]:
API_KEY = "ALPHAVANTAGE_API_KEY"
load_dotenv()

api_key = os.getenv("API_KEY")
print(api_key)
symbol = "TSLA"

url = f"https://www.alphavantage.co/query"
params = {
    "function": "TIME_SERIES_DAILY",
    "symbol": symbol,
    "outputsize": "compact",  # 'full' for all history
    "datatype": "json",
    "apikey": API_KEY
}

response = requests.get(url, params=params)
data = response.json()

# Convert to DataFrame
df = pd.DataFrame.from_dict(data["Time Series (Daily)"], orient="index")
df = df.rename(columns={
    "1. open": "open",
    "2. high": "high",
    "3. low": "low",
    "4. close": "close",
    "5. volume": "volume"
})
df.index = pd.to_datetime(df.index)
df = df.sort_index()

print(df.head())

None
                open      high       low     close     volume
2025-05-06  273.1050  277.7300  271.3500  275.3500   76715792
2025-05-07  276.8800  277.9200  271.0000  276.2200   71882408
2025-05-08  279.6300  289.8000  279.4100  284.8200   97539448
2025-05-09  290.2100  307.0400  290.0000  298.2600  132387835
2025-05-12  321.9900  322.2100  311.5000  318.3800  112826661


In [8]:
import os
from dotenv import load_dotenv
load_dotenv()

# Now read them
user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")

In [28]:
conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DB"),
    schema=os.getenv("SNOWFLAKE_SCHEMA"),
    role=os.getenv("SNOWFLAKE_ROLE")
)

cur = conn.cursor()
cur.execute("SELECT CURRENT_USER(), CURRENT_ACCOUNT(), CURRENT_REGION();")

for row in cur:
    print(row)
cur.close()
conn.close()

('BISON', 'LVB17920', 'AWS_US_WEST_2')


In [29]:
def return_last_90d_price(symbol):
  """
   - return the last 90 days of the stock prices of symbol as a list of json strings
  """
  vantage_api_key = API_KEY
  url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={vantage_api_key}'
  r = requests.get(url)
  data = r.json()
  results = []   # empyt list for now to hold the 90 days of stock info (open, high, low, close, volume)
  for d in data["Time Series (Daily)"]:   # here d is a date: "YYYY-MM-DD"
    results.append(data["Time Series (Daily)"][d])
    # an example of data["Time Series (Daily)"][d] is
    # '1. open': '117.3500', '2. high': '119.6600', '3. low': '117.2500', '4. close': '117.8700', '5. volume': '286038878'}
  return results

In [30]:
data=return_last_90d_price("TSLA")

In [31]:
conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DB"),
    role=os.getenv("SNOWFLAKE_ROLE")
)

cur = conn.cursor()

cur.execute("CREATE SCHEMA IF NOT EXISTS RAW")

print("Schema RAW created or already exists ✅")

cur.execute("""
CREATE TABLE IF NOT EXISTS RAW.MARKET_DATA (
    SYMBOL STRING NOT NULL,
    TRADE_DATE DATE NOT NULL,
    OPEN NUMBER(18,4),
    CLOSE NUMBER(18,4),
    HIGH NUMBER(18,4),
    LOW NUMBER(18,4),
    VOLUME NUMBER(18,0),
    CONSTRAINT PK_MARKET_DATA PRIMARY KEY (SYMBOL, TRADE_DATE)
)
""")

Schema RAW created or already exists ✅


<snowflake.connector.cursor.SnowflakeCursor at 0x20e00577290>

In [41]:
# Call Alpha Vantage Daily Prices API
url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={API_KEY}"
data = requests.get(url).json()["Time Series (Daily)"]

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DB"),
    schema="RAW",
    role=os.getenv("SNOWFLAKE_ROLE")
)
cur = conn.cursor()

# Insert each record
for date, values in data.items():
    cur.execute("""
        INSERT INTO RAW.MARKET_DATA (SYMBOL, DATE, OPEN, CLOSE, HIGH, LOW, VOLUME)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """,
        (
            symbol,
            date,
            values["1. open"],
            values["4. close"],
            values["2. high"],
            values["3. low"],
            values["5. volume"]
        )
    )


conn.commit()

In [42]:
cur.execute("TRUNCATE TABLE RAW.MARKET_DATA")

<snowflake.connector.cursor.SnowflakeCursor at 0x20e0081ce50>

In [44]:
API_KEY = os.getenv("ALPHAVANTAGE_API_KEY")
symbol = "TSLA"

# Get daily prices from Alpha Vantage
url = "https://www.alphavantage.co/query"
params = {
    "function": "TIME_SERIES_DAILY",
    "symbol": symbol,
    "apikey": API_KEY
}
data = requests.get(url, params=params).json()["Time Series (Daily)"]

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DB"),
    schema="RAW",
    role=os.getenv("SNOWFLAKE_ROLE")
)
cur = conn.cursor()

# Step 1: Create schema & table if not exists
cur.execute("""
CREATE SCHEMA IF NOT EXISTS RAW;
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS RAW.MARKET_DATA (
    SYMBOL STRING NOT NULL,
    TRADE_DATE DATE NOT NULL,
    OPEN NUMBER(18,4),
    CLOSE NUMBER(18,4),
    HIGH NUMBER(18,4),
    LOW NUMBER(18,4),
    VOLUME NUMBER(18,0),
    CONSTRAINT PK_MARKET_DATA PRIMARY KEY (SYMBOL, TRADE_DATE)
);
""")

# Step 2: Insert records from API
for date, values in data.items():
    cur.execute("""
        INSERT INTO RAW.MARKET_DATA (SYMBOL, DATE, OPEN, CLOSE, HIGH, LOW, VOLUME)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """,
        (
            symbol,
            date,
            values["1. open"],
            values["4. close"],
            values["2. high"],
            values["3. low"],
            values["5. volume"]
        )
    )



conn.commit()

In [47]:
import os
import requests
import snowflake.connector
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("ALPHAVANTAGE_API_KEY")
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
SNOWFLAKE_DB = os.getenv("SNOWFLAKE_DB")
SNOWFLAKE_SCHEMA = "RAW"
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")


def extract_alpha_vantage():
    url = "https://www.alphavantage.co/query"
    params = {"function": "TIME_SERIES_DAILY", "symbol": "TSLA", "apikey": API_KEY}
    data = requests.get(url, params=params).json()["Time Series (Daily)"]
    return data


def load_to_snowflake(**context):
    data = context['ti'].xcom_pull(task_ids="extract_task")

    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DB,
        schema=SNOWFLAKE_SCHEMA,
        role=SNOWFLAKE_ROLE
    )
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS RAW.MARKET_DATA (
        SYMBOL STRING NOT NULL,
        TRADE_DATE DATE NOT NULL,
        OPEN NUMBER(18,4),
        CLOSE NUMBER(18,4),
        HIGH NUMBER(18,4),
        LOW NUMBER(18,4),
        VOLUME NUMBER(18,0),
        CONSTRAINT PK_MARKET_DATA PRIMARY KEY (SYMBOL, TRADE_DATE)
    );
    """)

    symbol = "TSLA"
    for date, values in data.items():
        cur.execute("""
        MERGE INTO RAW.MARKET_DATA t
        USING (SELECT %s AS SYMBOL,
                      %s AS TRADE_DATE,
                      %s AS OPEN,
                      %s AS CLOSE,
                      %s AS HIGH,
                      %s AS LOW,
                      %s AS VOLUME) s
        ON t.SYMBOL = s.SYMBOL AND t.TRADE_DATE = s.TRADE_DATE
        WHEN MATCHED THEN UPDATE SET
            OPEN = s.OPEN,
            CLOSE = s.CLOSE,
            HIGH = s.HIGH,
            LOW = s.LOW,
            VOLUME = s.VOLUME
        WHEN NOT MATCHED THEN
            INSERT (SYMBOL, TRADE_DATE, OPEN, CLOSE, HIGH, LOW, VOLUME)
            VALUES (s.SYMBOL, s.TRADE_DATE, s.OPEN, s.CLOSE, s.HIGH, s.LOW, s.VOLUME);
        """, (
            symbol, date,
            values["1. open"], values["4. close"],
            values["2. high"], values["3. low"], values["5. volume"]
        ))

    conn.commit()
    cur.close()
    conn.close()


with DAG(
    dag_id="stock_pipeline_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id="extract_task",
        python_callable=extract_alpha_vantage,
    )

    load_task = PythonOperator(
        task_id="load_task",
        python_callable=load_to_snowflake,
        provide_context=True,
    )

    extract_task >> load_task

TypeError: HookspecMarker.__call__() got an unexpected keyword argument 'warn_on_impl_args'

In [None]:
cur.close()
conn.close()