In [30]:
"""
compute_technical.py

Computes technical indicators from price_data and writes the results to a new PostgreSQL table.
"""

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")

# Setup DB connection
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()

# Load price data
query = """
SELECT ticker, date, close, volume
FROM price_data
ORDER BY ticker, date ASC;
"""
df = pd.read_sql(query, engine, parse_dates=["date"])

# Compute technical indicators per ticker
def compute_indicators(group):
    group = group.sort_values("date").copy()
    group["return_1d"] = group["close"].pct_change()
    group["sma_5"] = group["close"].rolling(window=5).mean()
    group["sma_20"] = group["close"].rolling(window=20).mean()
    group["ema_10"] = group["close"].ewm(span=10, adjust=False).mean()
    group["rsi_14"] = compute_rsi(group["close"], 14)
    group["macd"] = group["close"].ewm(span=12, adjust=False).mean() - group["close"].ewm(span=26, adjust=False).mean()
    return group

# RSI function
def compute_rsi(series, period=14):
    delta = series.diff()
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

# Clean and apply without warning or dropping 'ticker'
features_df = (
    df.groupby("ticker", group_keys=False)
    .apply(compute_indicators)
    .reset_index(drop=True)
)

# Drop duplicates before inserting
unique_keys = features_df[['ticker', 'date']].drop_duplicates()

for i in range(len(unique_keys)):
    ticker = unique_keys.iloc[i]['ticker']
    date_val = pd.to_datetime(unique_keys.iloc[i]['date']).date()

    session.execute(
        text("DELETE FROM technical_features WHERE ticker = :ticker AND date = :date"),
        {"ticker": ticker, "date": date_val}
    )
session.commit()

# Append new data
features_df.to_sql('technical_features', engine, if_exists='append', index=False)
print("✅ Technical features deduplicated and saved to 'technical_features' table in PostgreSQL")


  .apply(compute_indicators)


✅ Technical features deduplicated and saved to 'technical_features' table in PostgreSQL


In [31]:
"""
merge_features.py

Uses merge_asof for fundamental and macroeconomic time-aware joins.
Merges technical, sentiment, macro, and fundamental features into a single table `merged_features`.
"""

import os
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
from datetime import datetime, timezone

# Load env vars and setup engine
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)

def safe_load_sql(table, date_col="date"):
    try:
        return pd.read_sql(f"SELECT * FROM {table}", engine, parse_dates=[date_col])
    except Exception as e:
        print(f"⚠️ Skipping {table}: {e}")
        return pd.DataFrame()

# Load tables
technical_df = safe_load_sql("technical_features")
sentiment_df = safe_load_sql("sentiment_data")
macro_df = safe_load_sql("macro_data")
fundamentals_df = safe_load_sql("fundamental_data")

# Aggregate sentiment
if not sentiment_df.empty:
    sentiment_agg = sentiment_df.groupby(["ticker", "date"]).agg(
        sentiment_avg=("sentiment_score", "mean"),
        sentiment_std=("sentiment_score", "std"),
        sentiment_count=("sentiment_score", "count")
    ).reset_index()
else:
    sentiment_agg = pd.DataFrame()

# Merge base technical + sentiment
merged = pd.merge(technical_df, sentiment_agg, on=["ticker", "date"], how="left")
merged = merged.sort_values(["ticker", "date"])

# Merge fundamentals using merge_asof
if not fundamentals_df.empty and "metric" in fundamentals_df.columns:
    fundamentals_df = fundamentals_df.pivot(index=["symbol", "date"], columns="metric", values="value").reset_index()
    fundamentals_df.rename(columns={"symbol": "ticker"}, inplace=True)
    fundamentals_df = fundamentals_df.sort_values(["ticker", "date"])
    merged = pd.merge_asof(
        merged, fundamentals_df,
        by="ticker", on="date",
        direction="backward"
    )

# Merge macro data using merge_asof
if not macro_df.empty and "indicator_name" in macro_df.columns:
    macro_df = macro_df.pivot(index="date", columns="indicator_name", values="value").reset_index()
    macro_df = macro_df.sort_values("date")
    merged = pd.merge_asof(
        merged, macro_df,
        on="date",
        direction="backward"
    )

# Add merge timestamp
merged["merged_at"] = datetime.now(timezone.utc)

# Save to DB
merged.to_sql("merged_features", engine, if_exists="replace", index=False)
print(f"✅ Saved {len(merged)} rows to 'merged_features' table.")


✅ Saved 6641 rows to 'merged_features' table.
