In [85]:
# standard imports and config
import os
import time
from datetime import datetime, timedelta
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from io import StringIO
from kiteconnect import KiteConnect
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# load .env if present

load_dotenv()  

# DB connection string: prefer env var for security
DB_URI = os.getenv("RAILWAY_DB_URI") or "postgresql://postgres:ZpfLDFFOLJemAIEkOTBpEjCuBWYyIwSm@switchback.proxy.rlwy.net:19114/railway"

# Kite credentials - you must set these as env vars or replace here (not recommended to hardcode)
KITE_API_KEY = os.getenv("KITE_API_KEY")  # e.g. "your_api_key"
KITE_API_SECRET = os.getenv("KITE_API_SECRET")
KITE_ACCESS_TOKEN = os.getenv("KITE_ACCESS_TOKEN")  # if you have an access token already

# create sqlalchemy engine (connection pooling + convenience)

engine = create_engine(DB_URI)


In [86]:
kite = KiteConnect(api_key=KITE_API_KEY)
kite.set_access_token(access_token=KITE_ACCESS_TOKEN)


In [87]:
# create tables if not exists using raw SQL
create_sql = """
CREATE TABLE IF NOT EXISTS instruments (
  instrument_token BIGINT PRIMARY KEY,
  tradingsymbol TEXT NOT NULL,
  name TEXT,
  exchange TEXT,
  segment TEXT,
  instrument_type TEXT,
  lot_size INTEGER,
  tick_size NUMERIC(18,8),
  is_active BOOLEAN DEFAULT TRUE,
  metadata JSONB,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

CREATE TABLE IF NOT EXISTS ohlcv (
  id BIGSERIAL PRIMARY KEY,
  instrument_token BIGINT NOT NULL REFERENCES instruments(instrument_token),
  tradingsymbol TEXT NOT NULL,
  exchange TEXT NOT NULL,
  interval TEXT NOT NULL,
  ts TIMESTAMP WITH TIME ZONE NOT NULL,
  open NUMERIC(18,6) NOT NULL,
  high NUMERIC(18,6) NOT NULL,
  low NUMERIC(18,6) NOT NULL,
  close NUMERIC(18,6) NOT NULL,
  volume BIGINT NOT NULL,
  oi BIGINT NULL,
  raw JSONB NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

-- unique constraint to avoid duplicate candles
DO $$
BEGIN
  IF NOT EXISTS (
    SELECT 1 FROM pg_indexes WHERE schemaname = 'public' AND indexname = 'ux_ohlcv_inst_int_ts'
  ) THEN
    CREATE UNIQUE INDEX ux_ohlcv_inst_int_ts ON ohlcv(instrument_token, interval, ts);
  END IF;
END$$;

-- index for fast range queries
CREATE INDEX IF NOT EXISTS idx_ohlcv_inst_ts ON ohlcv(instrument_token, ts DESC);
CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_ts ON ohlcv(tradingsymbol, ts DESC);
"""

with engine.connect() as conn:
    conn.execute(text(create_sql))
    conn.commit()

print("Tables and indexes created/ensured.")


Tables and indexes created/ensured.


In [88]:

def fetch_historical(instrument_token, from_date, to_date, interval='5minute', max_chunk_days=None):
    """
    Fetch historical data from Kite Connect API.
    Returns a DataFrame with columns: ts, open, high, low, close, volume, oi
    """
    # Map our interval names to Kite's expected format
    interval_map = {
        '1minute': 'minute',
        '3minute': '3minute',
        '5minute': '5minute',
        '15minute': '15minute',
        '30minute': '30minute',
        '60minute': '60minute',
        '180minute': '60minute',  # Kite doesn't support 180min, use 60min
        '1day': 'day',
        '1week': 'week',
        '1month': 'month',
        '1year': 'day'  # Kite doesn't support year, use day and aggregate later
    }
    
    kite_interval = interval_map.get(interval, interval)
    
    try:
        # Kite API expects dates in specific format
        from_dt = from_date if isinstance(from_date, datetime) else pd.to_datetime(from_date)
        to_dt = to_date if isinstance(to_date, datetime) else pd.to_datetime(to_date)
        
        # Add small delay to avoid rate limiting
        time.sleep(0.25)
        
        # Fetch data from Kite
        records = kite.historical_data(
            instrument_token=instrument_token,
            from_date=from_dt,
            to_date=to_dt,
            interval=kite_interval,
            continuous=False,
            oi=True
        )
        
        if not records:
            return pd.DataFrame()
        
        # Convert to DataFrame
        df = pd.DataFrame(records)
        
        # Rename 'date' column to 'ts' if it exists
        if 'date' in df.columns:
            df = df.rename(columns={'date': 'ts'})
        
        return df
        
    except Exception as e:
        err_str = str(e)
        # If 503 or rate limit, raise to trigger retry with backoff
        if '503' in err_str or 'rate' in err_str.lower() or 'too many' in err_str.lower():
            print(f"Rate limit/503 error: {e}")
            raise  # Let retry logic handle it
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame()

In [89]:
import re

def normalize_sym(s):
    """normalize ticker strings for matching (uppercase, remove punctuation/spaces)"""
    s = str(s).upper()
    s = re.sub(r'[\s\.\&\/\(\)\,\'\"]', '', s)
    s = s.replace('-', '')
    return s

def df_to_postgres_copy(df, table_name, conn, columns=None, commit=True):
    """
    fast bulk insert using COPY FROM STDIN
    df: pandas DataFrame must have columns matching the target table order
    conn: psycopg2 connection
    columns: list of column names to specify in COPY command (optional)
    commit: whether to commit after COPY (default True)
    """
    buf = StringIO()
    df.to_csv(buf, index=False, header=False, na_rep='\\N')  # nulls as \N for COPY
    buf.seek(0)
    cur = conn.cursor()
    try:
        # If columns specified, use them in COPY command
        if columns:
            cols_str = ', '.join(columns)
            copy_sql = f"COPY {table_name} ({cols_str}) FROM STDIN WITH (FORMAT CSV)"
        else:
            copy_sql = f"COPY {table_name} FROM STDIN WITH (FORMAT CSV)"
        cur.copy_expert(copy_sql, buf)
        if commit:
            conn.commit()
    except Exception as e:
        conn.rollback()
        raise
    finally:
        cur.close()


In [90]:
# path to your matched CSV (from earlier step)
matched_csv = "./data/nifty500_instruments_matched.csv"

# read CSV
df_inst = pd.read_csv(matched_csv, dtype=str)  # read as strings to be safe
df_inst = df_inst.fillna('')  # replace NaNs with empty string for fields

# convert numeric-ish columns if they exist
if 'instrument_token' in df_inst.columns:
    df_inst['instrument_token'] = df_inst['instrument_token'].astype(int)
if 'lot_size' in df_inst.columns:
    df_inst['lot_size'] = pd.to_numeric(df_inst['lot_size'], errors='coerce').fillna(1).astype(int)
if 'tick_size' in df_inst.columns:
    df_inst['tick_size'] = pd.to_numeric(df_inst['tick_size'], errors='coerce').fillna(0.0)

# prepare rows and upsert into instruments table
rows = []
for _, r in df_inst.iterrows():
    rows.append((
        int(r.get('instrument_token', 0)),
        r.get('tradingsymbol', ''),
        r.get('name', ''),
        r.get('exchange', ''),
        r.get('segment', ''),
        r.get('instrument_type', ''),
        int(r.get('lot_size', 1)) if r.get('lot_size','')!='' else 1,
        float(r.get('tick_size', 0.0)) if r.get('tick_size','')!='' else 0.0,
        True,
        None
    ))

upsert_sql = """
INSERT INTO instruments (
  instrument_token, tradingsymbol, name, exchange, segment, instrument_type, lot_size, tick_size, is_active, metadata
) VALUES %s
ON CONFLICT (instrument_token) DO UPDATE
SET
  tradingsymbol = EXCLUDED.tradingsymbol,
  name = EXCLUDED.name,
  exchange = EXCLUDED.exchange,
  segment = EXCLUDED.segment,
  instrument_type = EXCLUDED.instrument_type,
  lot_size = EXCLUDED.lot_size,
  tick_size = EXCLUDED.tick_size,
  is_active = EXCLUDED.is_active,
  metadata = EXCLUDED.metadata,
  updated_at = now()
"""

conn = psycopg2.connect(DB_URI)
try:
    cur = conn.cursor()
    execute_values(cur, upsert_sql, rows, template=None, page_size=500)
    conn.commit()
    print("Instruments upserted:", len(rows))
finally:
    cur.close()
    conn.close()


Instruments upserted: 498


In [91]:
# Re-define ingest_candles_df (uses df_to_postgres_copy from earlier cells)
import psycopg2
import pandas as pd

def ingest_candles_df(df_candles, instrument_token, tradingsymbol, exchange='NSE', interval='5minute'):
    """
    Normalize df_candles and bulk insert into ohlcv table using COPY to temp table + INSERT ON CONFLICT.
    Returns number of inserted rows.
    """
    df = df_candles.copy()
    if df.empty:
        print("no rows to ingest")
        return 0

    # attach metadata columns expected by DB
    df['tradingsymbol'] = tradingsymbol
    df['instrument_token'] = int(instrument_token)
    df['exchange'] = exchange
    df['interval'] = interval

    # ensure ts is timezone-aware UTC
    df['ts'] = pd.to_datetime(df['ts'])
    if df['ts'].dt.tz is None:
        df['ts'] = df['ts'].dt.tz_localize('UTC')

    # ensure numeric types
    for col in ['open','high','low','close']:
        df[col] = df[col].astype(float)

    df['volume'] = df.get('volume', pd.Series([0]*len(df))).fillna(0).astype(int)
    if 'oi' not in df.columns:
        df['oi'] = None

    # reorder to match COPY target (excluding auto-generated id and raw columns)
    df_to_copy = df[['instrument_token','tradingsymbol','exchange','interval','ts','open','high','low','close','volume','oi']]

    conn = psycopg2.connect(DB_URI)
    cur = conn.cursor()
    try:
        # Create temp table with same structure
        cur.execute("""
            CREATE TEMP TABLE temp_ohlcv (
                instrument_token BIGINT,
                tradingsymbol TEXT,
                exchange TEXT,
                interval TEXT,
                ts TIMESTAMP WITH TIME ZONE,
                open NUMERIC(18,6),
                high NUMERIC(18,6),
                low NUMERIC(18,6),
                close NUMERIC(18,6),
                volume BIGINT,
                oi BIGINT
            ) ON COMMIT DROP
        """)
        
        # COPY to temp table (no conflicts possible) - don't commit yet to keep temp table
        columns = ['instrument_token','tradingsymbol','exchange','interval','ts','open','high','low','close','volume','oi']
        df_to_postgres_copy(df_to_copy, 'temp_ohlcv', conn, columns=columns, commit=False)
        
        # Insert from temp to main table with ON CONFLICT DO NOTHING
        cur.execute("""
            INSERT INTO ohlcv (instrument_token, tradingsymbol, exchange, interval, ts, open, high, low, close, volume, oi)
            SELECT instrument_token, tradingsymbol, exchange, interval, ts, open, high, low, close, volume, oi
            FROM temp_ohlcv
            ON CONFLICT (instrument_token, interval, ts) DO NOTHING
        """)
        
        inserted = cur.rowcount
        conn.commit()
        
        if inserted > 0:
            print(f"Inserted {inserted} new rows for {tradingsymbol} ({instrument_token}) interval={interval}")
        else:
            print(f"No new rows for {tradingsymbol} ({instrument_token}) interval={interval} (all duplicates)")
        return inserted
    except Exception as e:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()


In [92]:
# intervals you asked for
INTERVALS = [
    '1minute', '3minute', '5minute', '15minute', '30minute',
    '60minute', '180minute', '1day', '1week', '1month', '1year'
]

# map some alternate names Kite may accept: '1hour' -> '60minute', '3hour' -> '180minute'
# if Kite doesn't accept e.g. '1year' you can skip; the fetch function will just return empty for unsupported intervals.
# chunk size (days) to request per API call per interval â€” conservative defaults
CHUNK_DAYS_BY_INTERVAL = {
    '1minute': 7,     # small chunks for 1m
    '3minute': 10,
    '5minute': 14,
    '15minute': 30,
    '30minute': 45,
    '60minute': 90,
    '180minute': 180,
    '1day': 365,      # 1 year per chunk is okay for daily
    '1week': 365*3,   # weeks: bigger chunk
    '1month': 365*5,
    '1year': 365*20,  # asking for multi-year might be accepted as single rows; keep large chunk
}

# canonicalize interval names used by our fetch/ingest functions (if you used '1hour' earlier replace with 60minute)
# ensure CHUNK_DAYS_BY_INTERVAL contains each INTERVAL
for itv in INTERVALS:
    if itv not in CHUNK_DAYS_BY_INTERVAL:
        CHUNK_DAYS_BY_INTERVAL[itv] = 90

print("Intervals to fetch:", INTERVALS)


Intervals to fetch: ['1minute', '3minute', '5minute', '15minute', '30minute', '60minute', '180minute', '1day', '1week', '1month', '1year']


In [93]:
# create an ingest_jobs table to track progress per instrument_token + interval
create_jobs_sql = """
CREATE TABLE IF NOT EXISTS ingest_jobs (
  job_id BIGSERIAL PRIMARY KEY,
  instrument_token BIGINT NOT NULL,
  tradingsymbol TEXT NOT NULL,
  interval TEXT NOT NULL,
  start_ts TIMESTAMP WITH TIME ZONE NOT NULL,
  end_ts TIMESTAMP WITH TIME ZONE NOT NULL,
  last_ingested_ts TIMESTAMP WITH TIME ZONE,
  status TEXT NOT NULL DEFAULT 'pending',   -- pending, running, done, error
  last_error TEXT,
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

CREATE UNIQUE INDEX IF NOT EXISTS ux_ingest_job_inst_int ON ingest_jobs(instrument_token, interval, start_ts, end_ts);
"""

with engine.connect() as conn:
    conn.execute(text(create_jobs_sql))
    conn.commit()

print("ingest_jobs table ensured.")


ingest_jobs table ensured.


In [94]:
from datetime import timedelta

def make_date_chunks(start_dt, end_dt, chunk_days):
    """
    Yield (chunk_start, chunk_end) datetimes from start_dt to end_dt inclusive,
    each chunk with length chunk_days.
    """
    cur = start_dt
    while cur < end_dt:
        nxt = min(cur + timedelta(days=chunk_days), end_dt)
        yield (cur, nxt)
        cur = nxt + timedelta(seconds=1)  # avoid overlapping by 1s


In [95]:
import traceback
from sqlalchemy import text as sql_text

def process_instrument_interval(instrument_token, tradingsymbol, interval, start_ts, end_ts,
                                chunk_days=None, max_retries=3, sleep_between_chunks=0.5):
    """
    Processes one instrument + one interval across time range [start_ts, end_ts].
    - Uses ingest_jobs to record progress and allow resume.
    - chunk_days: override; otherwise uses CHUNK_DAYS_BY_INTERVAL.
    Returns: dict with summary: {'inserted': int, 'chunks': int, 'status': 'done'/'error'}
    """
    chunk_days = chunk_days or CHUNK_DAYS_BY_INTERVAL.get(interval, 90)
    inserted_total = 0
    chunks = 0

    # upsert a job row (or fetch existing)
    with engine.begin() as conn:
        existing = conn.execute(sql_text("""
            SELECT job_id, last_ingested_ts, status FROM ingest_jobs
            WHERE instrument_token = :it AND interval = :itv AND start_ts = :s AND end_ts = :e
            FOR UPDATE
        """), {"it": instrument_token, "itv": interval, "s": start_ts, "e": end_ts}).fetchone()

        if existing is None:
            r = conn.execute(sql_text("""
                INSERT INTO ingest_jobs (instrument_token, tradingsymbol, interval, start_ts, end_ts, status)
                VALUES (:it, :sym, :itv, :s, :e, :status)
                RETURNING job_id
            """), {"it": instrument_token, "sym": tradingsymbol, "itv": interval, "s": start_ts, "e": end_ts, "status": "pending"})
            job_id = r.scalar()
            last_ingested_ts = None
            status = 'pending'
        else:
            job_id, last_ingested_ts, status = existing
            job_id = int(job_id)

    # if job is marked done skip
    if status == 'done':
        print(f"[skip] job {instrument_token} {tradingsymbol} {interval} already done.")
        return {"inserted": 0, "chunks": 0, "status": "done"}

    # determine resume start: if last_ingested_ts exists, resume after that
    resume_after = last_ingested_ts if last_ingested_ts is not None else start_ts

    try:
        # mark running
        with engine.begin() as conn:
            conn.execute(sql_text("UPDATE ingest_jobs SET status='running', updated_at=now(), last_error=NULL WHERE job_id=:jid"),
                         {"jid": job_id})

        for chunk_start, chunk_end in make_date_chunks(resume_after, end_ts, chunk_days):
            chunks += 1
            success = False
            attempts = 0

            # retry loop per chunk
            while not success and attempts < max_retries:
                attempts += 1
                try:
                    print(f"Fetching {tradingsymbol} ({instrument_token}) interval={interval} chunk {chunk_start} -> {chunk_end} (attempt {attempts})")
                    df_chunk = fetch_historical(instrument_token, chunk_start, chunk_end, interval=interval, max_chunk_days=chunk_days)
                    if df_chunk.empty:
                        print("  chunk returned empty.")
                    else:
                        # ingest
                        inserted = ingest_candles_df(df_chunk, instrument_token, tradingsymbol, exchange='NSE', interval=interval)
                        inserted_total += inserted
                    # update last_ingested_ts to chunk_end
                    with engine.begin() as conn:
                        conn.execute(sql_text("""
                            UPDATE ingest_jobs SET last_ingested_ts = :ts, updated_at = now()
                            WHERE job_id = :jid
                        """), {"ts": chunk_end, "jid": job_id})
                    success = True
                except Exception as e:
                    print(f"  error on attempt {attempts} for chunk: {e}")
                    traceback.print_exc()
                    time.sleep(1.0 * attempts)  # exponential-ish backoff
            # short polite pause between chunks
            time.sleep(sleep_between_chunks)

        # finished all chunks: mark done
        with engine.begin() as conn:
            conn.execute(sql_text("UPDATE ingest_jobs SET status='done', updated_at=now() WHERE job_id=:jid"), {"jid": job_id})
        print(f"[done] {tradingsymbol} {interval}: inserted_total={inserted_total}, chunks={chunks}")
        return {"inserted": inserted_total, "chunks": chunks, "status": "done"}
    except Exception as e:
        err = str(e)[:2000]
        with engine.begin() as conn:
            conn.execute(sql_text("UPDATE ingest_jobs SET status='error', last_error=:err, updated_at=now() WHERE job_id=:jid"),
                         {"err": err, "jid": job_id})
        print(f"[error] job failed: {e}")
        traceback.print_exc()
        return {"inserted": inserted_total, "chunks": chunks, "status": "error"}


In [96]:
# CELL X5: orchestrator - safe test run for N instruments and all desired intervals
from datetime import datetime, timezone

# configure what to process
LIMIT_INSTRUMENTS = 10   # change after testing
END_DATE = datetime.now(timezone.utc)

# Define different lookback periods for each interval
INTERVAL_LOOKBACK_YEARS = {
    '1minute': 1,      # 1 year of 1-minute data
    '3minute': 2,      # 2 years of 3-minute data
    '5minute': 2,      # 2 years of 5-minute data
    '15minute': 3,     # 3 years of 15-minute data
    '30minute': 3,     # 3 years of 30-minute data
    '60minute': 7,     # 7 years of 60-minute data
    '180minute': 7,    # 7 years of 3-hour data
    '1day': 20,        # max available daily data
    '1week': 20,       # max available weekly data
    '1month': 20,      # max available monthly data
    '1year': 20,       # max available yearly data
}

# read candidate instruments (NSE equities)
with engine.connect() as conn:
    df_candidates = pd.read_sql("""
      SELECT instrument_token, tradingsymbol, exchange
      FROM instruments
      WHERE exchange ILIKE 'NSE' AND (instrument_type ILIKE 'EQ' OR instrument_type = '')
      ORDER BY tradingsymbol
      LIMIT %s
    """, conn, params=(LIMIT_INSTRUMENTS,))

print("Found instruments:", len(df_candidates))
summary = []
for _, r in df_candidates.iterrows():
    tkn = int(r['instrument_token'])
    sym = r['tradingsymbol']
    exch = r['exchange'] or 'NSE'
    print("=== Processing instrument:", sym, tkn)
    for itv in INTERVALS:
        # Get lookback period for this interval
        lookback_years = INTERVAL_LOOKBACK_YEARS.get(itv, 3)
        start_date = END_DATE - timedelta(days=365*lookback_years)
        
        print(f"  {itv}: fetching {lookback_years} years of data")
        
        res = process_instrument_interval(
            instrument_token=tkn,
            tradingsymbol=sym,
            interval=itv,
            start_ts=start_date,
            end_ts=END_DATE,
            chunk_days=CHUNK_DAYS_BY_INTERVAL.get(itv, 90),
            max_retries=5,
            sleep_between_chunks=1.0
        )
        summary.append((sym, itv, lookback_years, res['status'], res.get('inserted',0), res.get('chunks',0)))
        # sleep between intervals to avoid rate limits
        time.sleep(2.0)

print("\nOrchestrator finished. Summary:")
print(f"{'Symbol':<12} {'Interval':<12} {'Years':<8} {'Status':<10} {'Inserted':<10} {'Chunks':<8}")
print("-" * 70)
for row in summary:
    print(f"{row[0]:<12} {row[1]:<12} {row[2]:<8} {row[3]:<10} {row[4]:<10} {row[5]:<8}")


Found instruments: 10
=== Processing instrument: 360ONE 3343617
  1minute: fetching 1 years of data
Fetching 360ONE (3343617) interval=1minute chunk 2024-11-26 12:32:28.772354+00:00 -> 2024-12-03 12:32:28.772354+00:00 (attempt 1)
No new rows for 360ONE (3343617) interval=1minute (all duplicates)
Fetching 360ONE (3343617) interval=1minute chunk 2024-12-03 12:32:29.772354+00:00 -> 2024-12-10 12:32:29.772354+00:00 (attempt 1)
No new rows for 360ONE (3343617) interval=1minute (all duplicates)
Fetching 360ONE (3343617) interval=1minute chunk 2024-12-10 12:32:30.772354+00:00 -> 2024-12-17 12:32:30.772354+00:00 (attempt 1)
No new rows for 360ONE (3343617) interval=1minute (all duplicates)
Fetching 360ONE (3343617) interval=1minute chunk 2024-12-17 12:32:31.772354+00:00 -> 2024-12-24 12:32:31.772354+00:00 (attempt 1)
No new rows for 360ONE (3343617) interval=1minute (all duplicates)
Fetching 360ONE (3343617) interval=1minute chunk 2024-12-24 12:32:32.772354+00:00 -> 2024-12-31 12:32:32.772354+

KeyboardInterrupt: 