### Installing Relevant Packages

In [None]:
#pip install yfinance
#!pip install sqlalchemy psycopg2-binary yfinance pandas

###
### Importing Libraries


In [2]:
from datetime import datetime, timedelta

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text

###
### Defining Configurations

In [3]:
# ===================== CONFIG =====================

DATABASE_URL = 'postgresql://neondb_owner:npg_pw2QhB4XGqbx@ep-muddy-union-ad4fjgcw-pooler.c-2.us-east-1.aws.neon.tech/neondb?sslmode=require&channel_binding=require'

# 2) Ticker + interval settings
TICKER = "TCS.NS"            # change this to your chosen company
HOURLY_INTERVAL = "60m"      # 60-minute bars

# Create SQLAlchemy engine
#engine = create_engine(DATABASE_URL, echo=False, future=True)
engine = create_engine(
    DATABASE_URL,
    echo=False,
    future=True,
    pool_pre_ping=True,    # ‚úÖ checks connection before using it
    pool_recycle=300,      # ‚úÖ recycle conns every 5 minutes
    pool_size=5,
    max_overflow=5,
)

###
### Database Set-up Function Definition
- Function call will be commented after one time execution

In [4]:
# ===================== DB SETUP =====================

def init_tables():
    """
    Create raw daily & hourly tables in the cloud Postgres DB if they don't exist.
    """
    create_daily_sql = """
    CREATE TABLE IF NOT EXISTS raw_daily_prices (
        ticker      VARCHAR(20) NOT NULL,
        date        DATE        NOT NULL,
        open        DOUBLE PRECISION,
        high        DOUBLE PRECISION,
        low         DOUBLE PRECISION,
        close       DOUBLE PRECISION,
        adj_close   DOUBLE PRECISION,
        volume      BIGINT,
        PRIMARY KEY (ticker, date)
    );
    """

    create_hourly_sql = """
    CREATE TABLE IF NOT EXISTS raw_hourly_prices (
        ticker      VARCHAR(20) NOT NULL,
        ts          TIMESTAMP   NOT NULL,
        interval    VARCHAR(10) NOT NULL,
        open        DOUBLE PRECISION,
        high        DOUBLE PRECISION,
        low         DOUBLE PRECISION,
        close       DOUBLE PRECISION,
        adj_close   DOUBLE PRECISION,
        volume      BIGINT,
        PRIMARY KEY (ticker, ts, interval)
    );
    """

    with engine.begin() as conn:
        conn.execute(text(create_daily_sql))
        conn.execute(text(create_hourly_sql))

    print("‚úÖ Tables ensured in cloud Postgres (raw_daily_prices, raw_hourly_prices).")



###
### One Time Daily Backfill for last ten years Function Definition
- Function call will be commented after one time execution

In [5]:
# ===================== ONE-TIME DAILY BACKFILL =====================

def backfill_daily(years_back: int = 10):
    """
    Download ~N years of DAILY OHLCV data from Yahoo Finance
    and store it in raw_daily_prices (ON CONFLICT DO NOTHING).

    Run this ONCE at the beginning, or when you want to refresh history.
    """

    # Fix for DeprecationWarning: datetime.utcnow() and AttributeError: type object 'datetime.datetime' has no attribute 'UTC'
    end_date = datetime.now(timezone.utc).date() # Use timezone.utc
    start_date = end_date - timedelta(days=365 * years_back)

    print(f"üì• Downloading daily data for {TICKER} from {start_date} to {end_date}...")
    # Fix for KeyError: "['Adj Close'] not in index"
    # Set auto_adjust=False and prepost=False to ensure 'Adj Close' column is present
    df_daily = yf.download(TICKER, start=start_date, end=end_date, interval="1d", auto_adjust=False, prepost=False)

    if df_daily.empty:
        print("‚ö†Ô∏è No daily data returned from Yahoo.")
        return

    # --- Robust Column Handling --- Fix for KeyError: 'date' and general column consistency ---

    # 1. Standardize MultiIndex columns if they exist: convert ('Open', '') to 'Open'
    if isinstance(df_daily.columns, pd.MultiIndex):
        # Use droplevel(1) to get single-level column names like 'Open', 'High', 'Adj Close', 'Volume'
        df_daily.columns = df_daily.columns.droplevel(1)

    # 2. Reset index to bring the Date (and potentially Ticker if MultiIndex index) from index to a column
    # After droplevel(1), the index name should be 'Date' if it was part of original yf.download structure.
    df_daily = df_daily.reset_index() # This will create a 'Date' column

    # 3. Rename all columns to desired database-friendly lowercase names in one go
    df_daily.rename(
        columns={
            "Date": "date", # Always ensure 'Date' becomes 'date'
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Adj Close": "adj_close",
            "Volume": "volume",
            "Ticker": "ticker" # In case 'Ticker' also comes from MultiIndex index after reset_index
        },
        inplace=True,
    )

    # 4. Add 'ticker' column - always add it after all other renames to ensure it's set correctly
    # This ensures the global TICKER value is used, even if 'Ticker' column existed and was renamed.
    df_daily["ticker"] = TICKER

    # 5. Select only the final columns we want for the database insertion
    final_cols = ["ticker", "date", "open", "high", "low", "close", "adj_close", "volume"]
    # Ensure all final_cols exist in df_daily before selecting, or handle missing ones if necessary
    # For robustness, filter final_cols against actual df_daily columns
    existing_final_cols = [col for col in final_cols if col in df_daily.columns]

    if len(existing_final_cols) < len(final_cols):
        print(f"‚ö†Ô∏è Warning: Some expected final columns are missing. Missing: {list(set(final_cols) - set(existing_final_cols))}")

    df_daily_out = df_daily[existing_final_cols].copy()

    # --- End of Robust Column Handling ---

    insert_sql = text("""
        INSERT INTO raw_daily_prices
            (ticker, date, open, high, low, close, adj_close, volume)
        VALUES
            (:ticker, :date, :open, :high, :low, :close, :adj_close, :volume)
        ON CONFLICT (ticker, date) DO NOTHING;
    """)


    rows = df_daily_out.to_dict(orient="records")
    inserted = 0
    with engine.begin() as conn:
        for row in rows:
            # Convert types for safety
            params = {
                "ticker": row["ticker"], # 'ticker' should now always be present
                "date": pd.to_datetime(row["date"]).date(), # 'date' should now always be present
                "open": float(row["open"]) if pd.notna(row["open"]) else None,
                "high": float(row["high"]) if pd.notna(row["high"]) else None,
                "low": float(row["low"]) if pd.notna(row["low"]) else None,
                "close": float(row["close"]) if pd.notna(row["close"]) else None,
                "adj_close": float(row["adj_close"]) if pd.notna(row["adj_close"]) else None,
                "volume": int(row["volume"]) if pd.notna(row["volume"]) else None,
            }
            result = conn.execute(insert_sql, params)
            # result.rowcount isn't reliable with ON CONFLICT, so we don't count precisely
            inserted += 1

    print(f"‚úÖ Backfilled daily data (~{len(rows)} rows processed).")


###
### Hourly Data Extract Function Definition - one time for historic data
- This Function call will be commented once the past data is loaded.
- New hourly data will

In [6]:
from datetime import datetime, timedelta, timezone
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine

# make sure these exist globally:
# DATABASE_URL, engine, TICKER, HOURLY_INTERVAL

def fetch_latest_hourly(days_back: int = 700):
    """
    Fetch recent HOURLY data from Yahoo Finance and append to raw_hourly_prices.
    Handles MultiIndex columns from yfinance and avoids manual connection issues
    by using pandas.to_sql().
    """

    end_dt = datetime.now(timezone.utc)
    start_dt = end_dt - timedelta(days=days_back)

    print(
        f"üì• Downloading hourly ({HOURLY_INTERVAL}) data for {TICKER} "
        f"from {start_dt} to {end_dt}..."
    )

    df_hourly = yf.download(
        TICKER,
        start=start_dt,
        end=end_dt,
        interval=HOURLY_INTERVAL,
        auto_adjust=False,
        prepost=False,
    )

    if df_hourly.empty:
        print("‚ö†Ô∏è No hourly data returned from Yahoo.")
        return

    # ---------- Use index as timestamp ----------
    df_hourly["ts"] = df_hourly.index

    # Reset index so ts is a normal column
    df_hourly = df_hourly.reset_index(drop=True)

    print("Raw columns from yfinance:", df_hourly.columns.tolist())

    # ---------- Flatten MultiIndex columns, if any ----------
    if isinstance(df_hourly.columns, pd.MultiIndex):
        flat_cols = []
        for col in df_hourly.columns:
            # col is like ('Open', 'TCS.NS') or ('ts', '')
            if isinstance(col, tuple):
                # take the first non-empty part
                name = col[0] if col[0] not in (None, "", " ") else col[1]
            else:
                name = col
            flat_cols.append(str(name))
        df_hourly.columns = flat_cols
        print("Flattened columns:", df_hourly.columns.tolist())

    # At this point we expect something like:
    # ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'ts']

    # ---------- Ensure Adj Close exists ----------
    if "Adj Close" not in df_hourly.columns and "Adj close" not in df_hourly.columns.lower():
        if "Close" in df_hourly.columns:
            print("‚ÑπÔ∏è 'Adj Close' missing, using 'Close' as proxy.")
            df_hourly["Adj Close"] = df_hourly["Close"]
        else:
            raise ValueError(f"No 'Adj Close' or 'Close' in columns: {df_hourly.columns}")

    # Some versions may have 'Adj Close' vs 'Adj close'; normalize name
    if "Adj close" in df_hourly.columns:
        df_hourly.rename(columns={"Adj close": "Adj Close"}, inplace=True)

    # ---------- Build clean output DataFrame ----------
    required_cols = ["ts", "Open", "High", "Low", "Close", "Adj Close", "Volume"]
    missing = [c for c in required_cols if c not in df_hourly.columns]
    if missing:
        raise ValueError(f"Missing required columns {missing} in df_hourly: {df_hourly.columns}")

    df_hourly_out = df_hourly[required_cols].copy()
    df_hourly_out.rename(
        columns={
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Adj Close": "adj_close",
            "Volume": "volume",
        },
        inplace=True,
    )

    df_hourly_out["ticker"] = TICKER
    df_hourly_out["interval"] = HOURLY_INTERVAL

    # ---------- Normalize ts to proper datetime ----------
    df_hourly_out["ts"] = pd.to_datetime(df_hourly_out["ts"], errors="coerce")
    df_hourly_out = df_hourly_out[df_hourly_out["ts"].notna()]

    print("Sample of cleaned hourly data:")
    print(df_hourly_out.head())

    # ---------- Append to DB using to_sql (no manual connection handling) ----------
    df_hourly_out.to_sql(
        "raw_hourly_prices",
        con=engine,
        if_exists="append",   # append new rows
        index=False,
        method="multi",
        chunksize=1000,
    )

    print(f"‚úÖ Hourly ingestion done. Rows appended: {len(df_hourly_out)}")


In [None]:
# ===================== QUICK MANUAL TEST =====================

if __name__ == "__main__":

    # 1) Ensure tables exist in your cloud Postgres
    # Commented this part as tables are created one time. Uncomment if executing this for the first time
    #init_tables()


    # 2) One-time: backfill ~10 years of daily data
    # Commenting this after one time data load for ten years
    # backfill_daily(years_back=10)

    # 3) Reusable: fetch recent hourly data (last 600 days)
    # Commenting this after one time hourly data load for past dates
    # fetch_latest_hourly(days_back=600)

üì• Downloading hourly (60m) data for TCS.NS from 2024-04-05 17:34:22.362038+00:00 to 2025-11-26 17:34:22.362038+00:00...


[*********************100%***********************]  1 of 1 completed


Raw columns from yfinance: [('Adj Close', 'TCS.NS'), ('Close', 'TCS.NS'), ('High', 'TCS.NS'), ('Low', 'TCS.NS'), ('Open', 'TCS.NS'), ('Volume', 'TCS.NS'), ('ts', '')]
Flattened columns: ['Adj Close', 'Close', 'High', 'Low', 'Open', 'Volume', 'ts']
Sample of cleaned hourly data:
                         ts         open         high          low  \
0 2024-04-08 03:45:00+00:00  3989.000000  3995.000000  3978.050049   
1 2024-04-08 04:45:00+00:00  3985.449951  3998.949951  3966.500000   
2 2024-04-08 05:45:00+00:00  3984.050049  4017.949951  3983.449951   
3 2024-04-08 06:45:00+00:00  4016.000000  4025.949951  4007.149902   
4 2024-04-08 07:45:00+00:00  4014.899902  4031.949951  4005.600098   

         close    adj_close  volume  ticker interval  
0  3985.000000  3985.000000       0  TCS.NS      60m  
1  3984.050049  3984.050049  256214  TCS.NS      60m  
2  4016.000000  4016.000000  365476  TCS.NS      60m  
3  4014.899902  4014.899902  468619  TCS.NS      60m  
4  4010.600098  4010.6000

###
### Hourly Data Extract - for daily excution via scheduler
- This will be scheduled on daily basis to execute once a day to predict the clsing price of the day
- Also, This part will be used to update last closing amount to daily table too

In [7]:
# =====================  HOURLY DATA FETCH for the EXECUTION DATE =====================

def fetch_hourly_today():
    """
    Fetch ONLY today's hourly data for TICKER and append to raw_hourly_prices.
    Safe to run multiple times a day (will re-append today's bars).
    """

    print(f"üì• Downloading today's hourly data for {TICKER} ({HOURLY_INTERVAL})...")

    # period='1d' is the simplest way to ask "today's intraday"
    df_hourly = yf.download(
        TICKER,
        period="1d",
        interval=HOURLY_INTERVAL,
        auto_adjust=False,
        prepost=False,
    )

    if df_hourly.empty:
        print("‚ö†Ô∏è No hourly data returned for today.")
        return

    # Use index as timestamp
    df_hourly["ts"] = df_hourly.index
    df_hourly = df_hourly.reset_index(drop=True)

    # Flatten possible MultiIndex columns
    if isinstance(df_hourly.columns, pd.MultiIndex):
        flat_cols = []
        for col in df_hourly.columns:
            if isinstance(col, tuple):
                name = col[0] if col[0] else col[1]
            else:
                name = col
            flat_cols.append(str(name))
        df_hourly.columns = flat_cols

    # Ensure Adj Close column exists
    if "Adj Close" not in df_hourly.columns:
        print("‚ÑπÔ∏è 'Adj Close' missing, using 'Close' as proxy.")
        df_hourly["Adj Close"] = df_hourly["Close"]

    df_out = df_hourly[["ts", "Open", "High", "Low", "Close", "Adj Close", "Volume"]].copy()
    df_out.rename(
        columns={
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Adj Close": "adj_close",
            "Volume": "volume",
        },
        inplace=True,
    )

    df_out["ticker"] = TICKER
    df_out["interval"] = HOURLY_INTERVAL

    # Normalize ts
    df_out["ts"] = pd.to_datetime(df_out["ts"], errors="coerce")
    df_out = df_out[df_out["ts"].notna()]

    print("Sample hourly rows to insert:")
    print(df_out.head())

    # Append to DB (no manual connection headaches)
    df_out.to_sql(
        "raw_hourly_prices",
        con=engine,
        if_exists="append",
        index=False,
        method="multi",
        chunksize=1000,
    )

    print(f"‚úÖ Hourly ingestion (today) done. Rows appended: {len(df_out)}")


In [8]:
# =====================  APPENDING DAILY from HOURLY for LATEST DATE =====================

from sqlalchemy import text

def append_daily_from_hourly():
    """
    For any calendar dates that exist in raw_hourly_prices but not yet in
    raw_daily_prices, aggregate hourly -> daily OHLCV and insert as new rows.

    - open  = first hourly open of the day
    - high  = max hourly high of the day
    - low   = min hourly low of the day
    - close = last hourly close of the day
    - adj_close = last hourly adj_close of the day
    - volume = sum of hourly volumes
    """

    # 1) Get the latest daily date we already have
    with engine.begin() as conn:
        last_daily_date = conn.execute(
            text("""
                SELECT MAX(date)
                FROM raw_daily_prices
                WHERE ticker = :ticker
            """),
            {"ticker": TICKER},
        ).scalar()

    print("Last daily date in table:", last_daily_date)

    # 2) Load hourly data newer than that date (or all if None)
    with engine.begin() as conn:
        if last_daily_date is None:
            hourly_df = pd.read_sql(
                text("""
                    SELECT ts, open, high, low, close, adj_close, volume
                    FROM raw_hourly_prices
                    WHERE ticker = :ticker
                """),
                conn,
                params={"ticker": TICKER},
            )
        else:
            hourly_df = pd.read_sql(
                text("""
                    SELECT ts, open, high, low, close, adj_close, volume
                    FROM raw_hourly_prices
                    WHERE ticker = :ticker
                      AND ts::date > :last_date
                """),
                conn,
                params={"ticker": TICKER, "last_date": last_daily_date},
            )

    if hourly_df.empty:
        print("‚ÑπÔ∏è No new hourly data beyond last daily date. Nothing to aggregate.")
        return

    # 3) Convert ts to datetime and derive calendar date
    hourly_df["ts"] = pd.to_datetime(hourly_df["ts"], errors="coerce")
    hourly_df = hourly_df[hourly_df["ts"].notna()]

    # drop tz info for date extraction, if present
    if hourly_df["ts"].dt.tz is not None:
        hourly_df["ts"] = hourly_df["ts"].dt.tz_convert(None)

    hourly_df["date"] = hourly_df["ts"].dt.date

    # 4) Aggregate hourly -> daily OHLCV
    agg = hourly_df.sort_values("ts").groupby("date").agg(
        open=("open", "first"),
        high=("high", "max"),
        low=("low", "min"),
        close=("close", "last"),
        adj_close=("adj_close", "last"),
        volume=("volume", "sum"),
    ).reset_index()

    print("Daily aggregates to insert:")
    print(agg.head())

    # 5) Insert aggregated rows into raw_daily_prices
    insert_sql = text("""
        INSERT INTO raw_daily_prices
            (ticker, date, open, high, low, close, adj_close, volume)
        VALUES
            (:ticker, :date, :open, :high, :low, :close, :adj_close, :volume)
        ON CONFLICT (ticker, date) DO NOTHING;
    """)

    with engine.begin() as conn:
        inserted = 0
        for _, row in agg.iterrows():
            params = {
                "ticker": TICKER,
                "date": row["date"],
                "open": float(row["open"]) if pd.notna(row["open"]) else None,
                "high": float(row["high"]) if pd.notna(row["high"]) else None,
                "low": float(row["low"]) if pd.notna(row["low"]) else None,
                "close": float(row["close"]) if pd.notna(row["close"]) else None,
                "adj_close": float(row["adj_close"]) if pd.notna(row["adj_close"]) else None,
                "volume": int(row["volume"]) if pd.notna(row["volume"]) else None,
            }
            conn.execute(insert_sql, params)
            inserted += 1

    print(f"‚úÖ Aggregated and inserted {inserted} new daily rows from hourly data.")


In [9]:
# =====================  CAlling the hourly fetch function  =====================

fetch_hourly_today()

üì• Downloading today's hourly data for TCS.NS (60m)...


[*********************100%***********************]  1 of 1 completed


Sample hourly rows to insert:
                         ts         open         high          low  \
0 2025-11-27 03:45:00+00:00  3178.000000  3178.899902  3148.500000   
1 2025-11-27 04:45:00+00:00  3164.399902  3164.600098  3155.100098   
2 2025-11-27 05:45:00+00:00  3158.699951  3159.199951  3145.000000   
3 2025-11-27 06:45:00+00:00  3145.699951  3146.000000  3129.000000   
4 2025-11-27 07:45:00+00:00  3130.000000  3133.000000  3125.000000   

         close    adj_close  volume  ticker interval  
0  3164.600098  3164.600098  468909  TCS.NS      60m  
1  3157.500000  3157.500000  302753  TCS.NS      60m  
2  3145.699951  3145.699951  251005  TCS.NS      60m  
3  3130.000000  3130.000000  388684  TCS.NS      60m  
4  3127.699951  3127.699951  296511  TCS.NS      60m  
‚úÖ Hourly ingestion (today) done. Rows appended: 7


In [11]:
# =====================  CAlling the daily append function  =====================

append_daily_from_hourly()

Last daily date in table: 2025-11-27
‚ÑπÔ∏è No new hourly data beyond last daily date. Nothing to aggregate.


###
### Quality Checks on both Hourly and Daily Tables

####
#### Generic QC Runner

In [1]:
# ======================= GENERIC QC RUNNER ===========================

from sqlalchemy import text
import pandas as pd
from datetime import date, timedelta

def run_qc_checks(engine, checks, scope_params=None):
    """
    Generic QC runner:
    - checks: list of dicts {name, sql, level}
    - scope_params: extra params injected into SQL (e.g., ticker, date_from)
    """
    results = []

    with engine.begin() as conn:
        for chk in checks:
            params = scope_params or {}
            row = conn.execute(text(chk["sql"]), params).fetchone()
            value = row[0] if row is not None else None

            results.append(
                {
                    "name": chk["name"],
                    "level": chk.get("level", "info"),
                    "value": value,
                }
            )

    return pd.DataFrame(results)


####
#### Hourly Quality Checks

In [12]:
# ========================== HOURLY QC =============================

HOURLY_QC_CHECKS = [
    {
        "name": "hourly_duplicate_keys",
        "level": "error",
        "sql": """
            SELECT COUNT(*) FROM (
                SELECT ticker, ts, interval, COUNT(*) AS c
                FROM raw_hourly_prices
                WHERE ticker = :ticker
                {date_filter}
                GROUP BY ticker, ts, interval
                HAVING COUNT(*) > 1
            ) t;
        """,
    },
    {
        "name": "hourly_null_prices",
        "level": "error",
        "sql": """
            SELECT COUNT(*)
            FROM raw_hourly_prices
            WHERE ticker = :ticker
              {date_filter}
              AND (
                open IS NULL OR close IS NULL OR ts IS NULL
              );
        """,
    },
    {
        "name": "hourly_future_timestamps",
        "level": "warn",
        "sql": """
            SELECT COUNT(*)
            FROM raw_hourly_prices
            WHERE ticker = :ticker
              {date_filter}
              AND ts > NOW() + INTERVAL '5 minutes';
        """,
    },
]


####
#### Building Date Filter Helper - to allow full vs recent QC


In [13]:
def build_date_filter(column_name: str, date_from=None):
    if date_from is None:
        return "", {}
    else:
        return f"AND {column_name}::date >= :date_from", {"date_from": date_from}


####
#### Historical Hourly QC

In [14]:
def qc_hourly_full(engine, ticker):
    checks_sql = []
    for chk in HOURLY_QC_CHECKS:
        date_filter, extra_params = build_date_filter("ts", date_from=None)
        sql = chk["sql"].format(date_filter=date_filter)
        checks_sql.append({**chk, "sql": sql})

    df = run_qc_checks(
        engine,
        checks_sql,
        scope_params={"ticker": ticker},
    )
    print("Full hourly QC:")
    print(df)
    return df


####
#### Daily/Incremental Hourly QC

In [15]:
def qc_hourly_incremental(engine, ticker, days_back=1):
    date_from = date.today() - timedelta(days=days_back)

    checks_sql = []
    for chk in HOURLY_QC_CHECKS:
        date_filter, extra_params = build_date_filter("ts", date_from=date_from)
        sql = chk["sql"].format(date_filter=date_filter)
        checks_sql.append({**chk, "sql": sql})

    params = {"ticker": ticker, "date_from": date_from}
    df = run_qc_checks(engine, checks_sql, scope_params=params)
    print(f"Incremental hourly QC from {date_from}:")
    print(df)
    return df


###
### Daily Quality Checks

In [22]:
DAILY_QC_CHECKS = [
    {
        "name": "daily_duplicate_dates",
        "level": "error",
        "sql": """
            SELECT COUNT(*) FROM (
                SELECT ticker, date, COUNT(*) AS c
                FROM raw_daily_prices
                WHERE ticker = :ticker
                {date_filter}
                GROUP BY ticker, date
                HAVING COUNT(*) > 1
            ) t;
        """,
    },
    {
      "name": "daily_missing_prices",
      "level": "error",
      "sql": """
          SELECT COUNT(*)
          FROM raw_daily_prices
          WHERE ticker = :ticker
            {date_filter}
            AND (
              open IS NULL OR close IS NULL
            );
      """
    }
]


####
#### Daily QC - Full and Recent

In [17]:
# ====================== Daily QC - Full Data =======================

def qc_daily_full(engine, ticker):
    checks_sql = []
    for chk in DAILY_QC_CHECKS:
        date_filter, _ = build_date_filter("date", date_from=None)
        sql = chk["sql"].format(date_filter=date_filter)
        checks_sql.append({**chk, "sql": sql})

    df = run_qc_checks(engine, checks_sql, scope_params={"ticker": ticker})
    print("Full daily QC:")
    print(df)
    return df


# ====================== Daily QC - Incremental Data ====================

def qc_daily_incremental(engine, ticker, days_back=1):
    date_from = date.today() - timedelta(days=days_back)

    checks_sql = []
    for chk in DAILY_QC_CHECKS:
        date_filter, _ = build_date_filter("date", date_from=date_from)
        sql = chk["sql"].format(date_filter=date_filter)
        checks_sql.append({**chk, "sql": sql})

    params = {"ticker": ticker, "date_from": date_from}
    df = run_qc_checks(engine, checks_sql, scope_params=params)
    print(f"Incremental daily QC from {date_from}:")
    print(df)
    return df


###
### Executing Quality Check steps for Historic Load

In [27]:
# Commenting this part as historic QC was needed only once, but can be reused later if required

# qc_hourly = qc_hourly_full(engine, 'TCS.NS')

Full hourly QC:
                       name  level  value
0     hourly_duplicate_keys  error      0
1        hourly_null_prices  error      0
2  hourly_future_timestamps   warn      0


In [28]:
# Commenting this part as historic QC was needed only once, but can be reused later if required

# qc_daily = qc_daily_full(engine, 'TCS.NS')

Full daily QC:
                    name  level  value
0  daily_duplicate_dates  error      0
1   daily_missing_prices  error      0


###
### Executing Quality Check steps for Incremental Load

In [None]:
def run_ingestion_pipeline():
    qc_hourly = qc_hourly_incremental(engine, TICKER, days_back=1)

    qc_daily = qc_daily_incremental(engine, TICKER, days_back=1)


###
### Quality Checks Actions

In [31]:
# ========================== Actions on Hourly QC results ==============================

from sqlalchemy import text

def apply_hourly_qc_actions(engine, qc_df):
    """
    Deletes invalid HOURLY records based on QC findings:
    - Rows with NULL price/volume
    - Duplicate (ticker, ts, interval) rows (keeps one)
    """

    with engine.begin() as conn:

        # ---------- 1. Delete NULL / missing rows ----------
        try:
            null_issues = int(
                qc_df.loc[
                    qc_df["name"] == "hourly_null_or_missing_values", "value"
                ].iloc[0]
            )
        except IndexError:
            null_issues = 0

        if null_issues > 0:
            print(f"üßπ Deleting {null_issues} hourly rows with NULL values")

            conn.execute(text("""
                DELETE FROM raw_hourly_prices
                WHERE ticker = :ticker
                  AND (
                        open IS NULL
                     OR high IS NULL
                     OR low IS NULL
                     OR close IS NULL
                     OR adj_close IS NULL
                     OR volume IS NULL
                  );
            """), {"ticker": TICKER})

        else:
          print("‚úÖ No missing hourly prices found")

        # ---------- 2. Delete duplicate key rows ----------
        try:
            dup_issues = int(
                qc_df.loc[
                    qc_df["name"] == "hourly_duplicate_keys", "value"
                ].iloc[0]
            )
        except IndexError:
            dup_issues = 0

        if dup_issues > 0:
            print(f"üßπ Removing duplicate hourly rows (keeping one per key)")

            # Use ctid trick to delete "older" duplicates
            conn.execute(text("""
                DELETE FROM raw_hourly_prices a
                USING raw_hourly_prices b
                WHERE a.ctid < b.ctid
                  AND a.ticker  = b.ticker
                  AND a.ts      = b.ts
                  AND a.interval = b.interval;
            """))
        else:
          print("‚úÖ No duplicate hourly records found")


In [30]:
# ================================ Actions on Daily QC results =========================

def apply_daily_qc_actions(engine, qc_df):
    """
    Deletes invalid DAILY records based on QC findings:
    - Rows with NULL price/volume
    - Duplicate (ticker, date) rows (keeps one)
    """

    with engine.begin() as conn:

        # ---------- 1. Delete NULL / missing rows ----------
        try:
            null_issues = int(
                qc_df.loc[
                    qc_df["name"] == "daily_null_or_missing_values", "value"
                ].iloc[0]
            )
        except IndexError:
            null_issues = 0

        if null_issues > 0:
            print(f"üßπ Deleting {null_issues} daily rows with NULL values")

            conn.execute(text("""
                DELETE FROM raw_daily_prices
                WHERE ticker = :ticker
                  AND (
                        open IS NULL
                     OR high IS NULL
                     OR low IS NULL
                     OR close IS NULL
                     OR adj_close IS NULL
                     OR volume IS NULL
                  );
            """), {"ticker": TICKER})

        else:
          print("‚úÖ No missing daily prices found")

        # ---------- 2. Delete duplicate (ticker, date) rows ----------
        try:
            dup_issues = int(
                qc_df.loc[
                    qc_df["name"] == "daily_duplicate_dates", "value"
                ].iloc[0]
            )
        except IndexError:
            dup_issues = 0

        if dup_issues > 0:
            print(f"üßπ Removing duplicate daily rows (keeping one per key)")

            conn.execute(text("""
                DELETE FROM raw_daily_prices a
                USING raw_daily_prices b
                WHERE a.ctid < b.ctid
                  AND a.ticker = b.ticker
                  AND a.date   = b.date;
            """))

        else:
          print("‚úÖ No duplicate daily records found")


In [33]:
# Calling action function

apply_hourly_qc_actions(engine, qc_hourly)
print("\n\n")
apply_daily_qc_actions(engine, qc_daily)

‚úÖ No missing hourly prices found
‚úÖ No duplicate hourly records found



‚úÖ No missing daily prices found
‚úÖ No duplicate daily records found
