In [20]:
from sqlalchemy import create_engine
import pandas as pd
from dotenv import load_dotenv
import os

load_dotenv()

DB_URL_psycopg2 = os.getenv("DATABASE_URL_PSYCOPG2","postgresql+psycopg2://postgres:arise#007@localhost:5432/marketsentinel")

engine=create_engine(DB_URL_psycopg2)

conn=engine.connect()

query="""
SELECT symbol, timestamp, open,high,low,close,volume,interval,source FROM marketsentinel.market_data
ORDER BY symbol, timestamp ASC
"""

df=pd.read_sql(query, conn)

df.head()

Unnamed: 0,symbol,timestamp,open,high,low,close,volume,interval,source
0,AAPL,2015-01-02 00:00:00+00:00,27.8475,27.86,26.8375,27.3325,212818400.0,,
1,AAPL,2015-01-05 00:00:00+00:00,27.0725,27.1625,26.3525,26.5625,257142000.0,,
2,AAPL,2015-01-06 00:00:00+00:00,26.635,26.8575,26.1575,26.565,263188400.0,,
3,AAPL,2015-01-07 00:00:00+00:00,26.8,27.049999,26.675,26.9375,160423600.0,,
4,AAPL,2015-01-08 00:00:00+00:00,27.3075,28.0375,27.175,27.9725,237458000.0,,


In [2]:
# --- Cell 0: Daily Aggregation ---
df = df.sort_index()

# Normalize timestamp
if df.index.name == "timestamp":
    df = df.reset_index()
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
df = df.set_index("timestamp").sort_index()

# Make tz-naive
df.index = df.index.tz_convert(None)

# Aggregate to 1-day OHLCV
df_daily = (
    df.groupby("symbol")
      .resample("1D")
      .agg({
          "open": "first",
          "high": "max",
          "low": "min",
          "close": "last",
          "volume": "sum"
      })
      .reset_index()
)

df = df_daily.set_index("timestamp").sort_index()

# Deduplicate safety
df = df[~df.index.duplicated()]
print("Daily DF:", df.shape)


Daily DF: (3960, 6)


In [3]:
def find_gap_blocks(series):
    isna = series.isna().astype(int)
    blocks = {}
    if isna.sum() == 0:
        return blocks

    block_id = (isna.diff() == 1).cumsum() * isna
    for b in block_id.unique():
        if b == 0: continue
        idx = block_id[block_id == b].index
        blocks[int(b)] = {
            "days": len(idx),
            "start": idx.min(),
            "end": idx.max()
        }
    return blocks


In [5]:
import numpy as np
def simple_imputer_v2(g):
    """
    Simplified hybrid:
    - Daily OHLCV already prepared
    - Tiny gaps (<=2 BD): ffill
    - Medium/large gaps (>2 BD): keep as NaN (mask)
    - Weekends preserved
    """
    g = g.sort_index()
    g = g[~g.index.duplicated()]

    # Business-day baseline
    bd = g.resample("B").asfreq()

    # Tiny-gap ffill limit=2
    bd = bd.ffill(limit=2)

    # Detect medium/large gaps & mask explicitly
    gaps = find_gap_blocks(bd["close"])

    for blk in gaps.values():
        if blk["days"] > 2:
            bd.loc[blk["start"]:blk["end"],
                   ["open","high","low","close","volume"]] = np.nan

    # Attach weekends without duplicates
    weekends = g[g.index.weekday >= 5]
    weekends = weekends[~weekends.index.duplicated()]
    bd = bd[bd.index.weekday < 5]  # keep Monâ€“Fri
    final = pd.concat([bd, weekends]).sort_index()

    # Final dedupe
    final = final[~final.index.duplicated()]

    return final


In [6]:
cleaned_list = []

for sym, g in df.groupby("symbol"):
    out = simple_imputer_v2(g)
    out["symbol"] = sym
    cleaned_list.append(out)

final_clean_df = pd.concat(cleaned_list)
final_clean_df = final_clean_df.reset_index().set_index(["symbol","timestamp"]).sort_index()

print("Final cleaned shape:", final_clean_df.shape)


Final cleaned shape: (180106, 5)


In [7]:
final_clean_df["gap_flag"] = final_clean_df["close"].isna().astype(int)
final_clean_df["tiny_gap_filled"] = final_clean_df["close"].ffill(limit=2).isna().astype(int)


In [9]:
final_clean_df.to_parquet("imputed_data/final_clean_simple.parquet")
print("Saved: final_clean_simple.parquet")


Saved: final_clean_simple.parquet
