# This notebook is a proof of concept for an ETL pipeline: it loads daily prices from Yahoo Finance into a PostgreSQL staging table, transforms them, and then loads the result into a PostgreSQL production table

### Import Libraries

In [13]:
# Standard library
import json
import os
from datetime import datetime, timezone

# Data handling
import numpy as np
import pandas as pd

# Connect to AWS RDS (PostgreSQL)
import psycopg2
from dotenv import load_dotenv
from psycopg2 import Error, OperationalError, ProgrammingError
from psycopg2.extensions import connection as Connection

# Connect to yahoo finance's API
import yfinance as yf

### Configuration & Constants

In [2]:
# Load variables from a .env file into the process environment.
load_dotenv()

# RDS connection settings, read from environment variables.
rds_host = os.getenv("rds_host")
# Fall back to PostgreSQL's standard port when rds_port is unset; int(None) would otherwise fail
# with an opaque TypeError before any connection attempt is made.
rds_port = int(os.getenv("rds_port", "5432"))
rds_dbname = os.getenv("rds_dbname")
rds_username = os.getenv("rds_username")
rds_password = os.getenv("rds_password")

### Helper Functions - DB Connection

In [16]:
def connect_to_rds(rds_host: str, rds_port: int, rds_dbname: str, rds_username: str, rds_password: str) -> Connection:
    """Open a psycopg2 connection to the RDS PostgreSQL instance.

    Prints a ✅ line on success. On any failure, prints a ❌ line naming the error
    category and re-raises the original exception unchanged.

    Returns:
        The open psycopg2 connection.
    """
    # Matched in order, most specific first: OperationalError and ProgrammingError are
    # both psycopg2.Error subclasses, and anything else falls through to the last entry.
    failure_labels = (
        (OperationalError, "❌ Operational error (e.g. bad credentials, unreachable host):"),
        (ProgrammingError, "❌ Programming error (e.g. bad DB name or SQL syntax):"),
        (Error, "❌ psycopg2 general error:"),
        (Exception, "❌ Unknown error:"),
    )

    try:
        connection = psycopg2.connect(
            host=rds_host,
            port=rds_port,
            dbname=rds_dbname,
            user=rds_username,
            password=rds_password,
        )
        print("✅ Connected successfully!")
        return connection
    except Exception as exc:
        label = next(text for kind, text in failure_labels if isinstance(exc, kind))
        print(label, exc)
        raise

### Start of Notebook

In [18]:
# Open the RDS connection plus one cursor that the cells below reuse.
conn = connect_to_rds(
    rds_host=rds_host,
    rds_port=rds_port,
    rds_dbname=rds_dbname,
    rds_username=rds_username,
    rds_password=rds_password,
)
cursor = conn.cursor()

✅ Connected successfully!


In [26]:
# Pull daily OHLCV bars for SPY and QQQ over the most recent 5-day period.
# auto_adjust is pinned explicitly because its default has changed between yfinance releases, and it
# decides whether the prices are split/dividend-adjusted (True = adjusted, no separate "Adj Close").
df = yf.download(["SPY", "QQQ"], period="5d", interval="1d", auto_adjust=True)

[*********************100%***********************]  2 of 2 completed


In [27]:
# Raw yfinance download: columns are a (Price, Ticker) MultiIndex and rows are indexed by Date.
df

Price,Close,Close,High,High,Low,Low,Open,Open,Volume,Volume
Ticker,QQQ,SPY,QQQ,SPY,QQQ,SPY,QQQ,SPY,QQQ,SPY
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2
2025-04-07,423.690002,504.380005,443.140015,523.169983,402.390015,481.799988,408.660004,489.190002,161557000,256611400
2025-04-08,416.059998,496.480011,443.140015,524.97998,409.790009,489.160004,438.160004,521.859985,101248100,165816600
2025-04-09,466.0,548.619995,467.829987,548.619995,415.429993,493.049988,415.570007,493.440002,142876900,241867300
2025-04-10,446.179993,524.580017,455.589996,533.5,432.630005,509.320007,453.559998,532.169983,108384100,162331200
2025-04-11,454.399994,533.940002,455.790009,536.429993,441.329987,520.070007,444.649994,523.01001,52417600,97741700


In [24]:
# Row index; after the flattening cell's reset_index() it is a plain RangeIndex (Date becomes a column).
df.index

RangeIndex(start=0, stop=5, step=1)

In [25]:
# Column labels after flattening: a single level with Date plus the price fields.
df.columns

Index(['Date', 'Close', 'High', 'Low', 'Open', 'Volume'], dtype='object')

Here is my vision for the ETL pipeline:
- I want to run a one-time historical backfill upfront, covering roughly 3 to 5 years of daily price data.
- Going forward, I want this ETL pipeline to run at the end of each trading day (after the stock market closes) and load only that day's closing prices into my `daily_prices_staging` table. This is the ETL job that Airflow and Docker will handle.
- The ETL job should also check for conflicts, i.e. whether the (ticker, date) combination already exists in the database, and handle them. I think it should use `ON CONFLICT (ticker, date) DO NOTHING` and still send an alert via Airflow saying something like "Just so you know, there was a (ticker, date) conflict during today's ETL run. We handled it, but you may want to take a look."

Ref ChatGPT thread: https://chatgpt.com/c/67fa79e6-8f34-8001-bc98-c29adc5cac9d

In [23]:
# Reshape the yfinance download into the flat, single-ticker frame the staging insert expects.
# yfinance returns (Price, Ticker) MultiIndex columns. Keeping only level 0 (the old `col[0]` approach)
# produces duplicate labels ("Close", "Close", ...) as soon as more than one ticker is downloaded,
# which turns row['Close'] into a Series and breaks the insert.
# spy_prices_staging holds SPY only (its conflict key is date alone), so select that ticker explicitly.
# TODO: once daily_prices_staging (keyed by ticker, date) is used, reshape to long format instead.
STAGING_TICKER = "SPY"

if isinstance(df.columns, pd.MultiIndex):
    df = df.xs(STAGING_TICKER, axis=1, level="Ticker")
df = df.rename_axis(columns=None)  # drop the leftover "Price" level name

# Move the Date index into a column only once, so re-running this cell does not add an "index" column.
if "Date" not in df.columns:
    df = df.reset_index()

In [6]:
# Peek at the first five flattened rows.
df.head(5)

Unnamed: 0,Date,Close,High,Low,Open,Volume
0,2025-04-07,504.380005,523.169983,481.799988,489.190002,256611400
1,2025-04-08,496.480011,524.97998,489.160004,521.859985,165816600
2,2025-04-09,548.619995,548.619995,493.049988,493.440002,241867300
3,2025-04-10,524.580017,533.5,509.320007,532.169983,162331200
4,2025-04-11,533.940002,536.429993,520.070007,523.01001,97741700


In [7]:
# NOTE(review): repeats the earlier df.columns cell; one of the two can likely be removed.
df.columns

Index(['Date', 'Close', 'High', 'Low', 'Open', 'Volume'], dtype='object')

In [21]:
# Insert each row of the flattened SPY frame into spy_prices_staging.
# ON CONFLICT (date) DO NOTHING skips dates that are already staged. Those skips are counted and
# reported so that a conflict is visible rather than silent.

# Duplicate column labels (e.g. multi-ticker data flattened to field names only) would make
# row['Close'] a Series instead of a scalar, so fail early with a readable message.
if not df.columns.is_unique:
    raise ValueError(f"Expected a single ticker's columns, got duplicates: {list(df.columns)}")


def _json_default(value):
    """Fallback for json.dumps: unwrap NumPy scalars; stringify anything else (e.g. pd.Timestamp)."""
    if isinstance(value, np.generic):
        return value.item()
    return str(value)


source_api = "yfinance"
inserted_dates = []
skipped_dates = []

# `with conn` commits once every row is in, or rolls back if any insert fails. That way a bad row
# does not leave the connection stuck in an aborted transaction for later cells.
with conn:
    for _, row in df.iterrows():
        date = row['Date'].date()
        close = float(row['Close'])
        # Timezone-aware UTC. datetime.utcnow() is naive (and deprecated since Python 3.12), and a naive
        # value sent to a TIMESTAMPTZ column is interpreted in the session TimeZone, not necessarily UTC.
        ingestion_timestamp = datetime.now(timezone.utc)
        # NOTE(review): a NaN price would serialize as the bare token NaN, which PostgreSQL's json/jsonb
        # input rejects. Confirm whether yfinance can return NaN rows here.
        raw_payload = json.dumps(row.to_dict(), default=_json_default)

        cursor.execute("""
            INSERT INTO spy_prices_staging (date, close, source_api, ingestion_timestamp, raw_payload)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (date) DO NOTHING;
        """, (date, close, source_api, ingestion_timestamp, raw_payload))

        # rowcount is 0 when ON CONFLICT DO NOTHING skipped the row.
        if cursor.rowcount == 1:
            inserted_dates.append(date)
        else:
            skipped_dates.append(date)

print(f"✅ Inserted {len(inserted_dates)} SPY rows into staging table")
if skipped_dates:
    print(f"⚠️ Skipped {len(skipped_dates)} already-staged dates: {[d.isoformat() for d in skipped_dates]}")

✅ Inserted SPY rows into staging table


In [22]:
# Read back everything staged so far, newest date first, and print one tuple per row.
cursor.execute("SELECT * FROM spy_prices_staging ORDER BY date DESC;")
rows = cursor.fetchall()
for staged_row in rows:
    print(staged_row)

(datetime.date(2025, 4, 11), Decimal('533.9400024414062'), 'yfinance', datetime.datetime(2025, 4, 12, 3, 28, 35, 60819, tzinfo=datetime.timezone.utc), {'Low': 520.0700073242188, 'Date': '2025-04-11 00:00:00', 'High': 536.4299926757812, 'Open': 523.010009765625, 'Close': 533.9400024414062, 'Volume': 97741700})
(datetime.date(2025, 4, 10), Decimal('524.5800170898438'), 'yfinance', datetime.datetime(2025, 4, 12, 3, 28, 35, 60299, tzinfo=datetime.timezone.utc), {'Low': 509.32000732421875, 'Date': '2025-04-10 00:00:00', 'High': 533.5, 'Open': 532.1699829101562, 'Close': 524.5800170898438, 'Volume': 162331200})
(datetime.date(2025, 4, 9), Decimal('548.6199951171875'), 'yfinance', datetime.datetime(2025, 4, 12, 3, 28, 35, 59666, tzinfo=datetime.timezone.utc), {'Low': 493.04998779296875, 'Date': '2025-04-09 00:00:00', 'High': 548.6199951171875, 'Open': 493.44000244140625, 'Close': 548.6199951171875, 'Volume': 241867300})
(datetime.date(2025, 4, 8), Decimal('496.4800109863281'), 'yfinance', dat

In [None]:
# Mock ETL transformation: double the prices. Storing the result in the "production" table is a separate step.
def transform_prices_by_2(df: pd.DataFrame, price_columns=("Open", "High", "Low", "Close")) -> pd.DataFrame:
    """Return a copy of ``df`` with its price columns multiplied by 2 (placeholder transform).

    Parameters
    ----------
    df : pd.DataFrame
        Price frame, e.g. the flattened yfinance frame with Open/High/Low/Close columns.
    price_columns : iterable of str, optional
        Columns to double. Names missing from ``df`` are ignored, so frames carrying only a
        subset of prices still work. All other columns (Date, Volume, ...) pass through unchanged.

    Returns
    -------
    pd.DataFrame
        A new frame with the same columns in the same order; ``df`` itself is not modified.
    """
    to_scale = [col for col in price_columns if col in df.columns]
    return df.assign(**{col: df[col] * 2 for col in to_scale})

In [None]:
# DDL for the multi-ticker staging table. (ticker, date) is the primary key, which is also the
# natural conflict target for the daily load's ON CONFLICT handling.
create_table_query = """
CREATE TABLE IF NOT EXISTS daily_prices_staging (
    ticker TEXT NOT NULL,
    date DATE NOT NULL,
    price_close NUMERIC,
    source_api TEXT,
    ingestion_timestamp TIMESTAMPTZ DEFAULT NOW(),
    raw_payload JSONB,
    PRIMARY KEY (ticker, date)
);
"""

# `with conn` commits on success and rolls back if the DDL fails, so a failed statement does not
# leave the connection in an aborted transaction for every later cell.
with conn:
    cursor.execute(create_table_query)

# CREATE TABLE IF NOT EXISTS is a no-op when the table already exists, so don't claim it was created.
print("✅ Ensured table exists: daily_prices_staging")