In [4]:
import json
import os
import re
import tempfile
import time
from datetime import datetime, timezone
from functools import lru_cache

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yfinance as yf

In [2]:
# Load the annual statements and collect the distinct tickers to download.
df_statements = pd.read_csv("data/annual_statements.csv")
# Drop missing tickers before de-duplicating: Series.unique() keeps NaN as a
# value, and a NaN entry is not a usable symbol for the download step.
all_tickers = df_statements["tic"].dropna().unique().tolist()
all_tickers[:10]

['AIR',
 '4165A',
 'ADCT.1',
 'IWKS',
 'ALO.2',
 'AEN.2',
 'AAL',
 '4267A',
 'CECE',
 'ARXX']

In [None]:
# --- Download window and output locations ---
START = "2003-01-01"
# yfinance's `end` is exclusive; use Aug 1 to include all of July 2025
END   = "2025-08-01"
OUT_PATH = "data/monthly_returns.csv"        # CSV of monthly returns, one column per ticker
LOG_PATH = "data/monthly_returns.log.jsonl"  # append-only JSON-lines event log

# Ensure the parent directory of *each* output exists. `or "."` covers a bare
# filename, for which dirname() is "" and os.makedirs("") would raise.
os.makedirs(os.path.dirname(OUT_PATH) or ".", exist_ok=True)
os.makedirs(os.path.dirname(LOG_PATH) or ".", exist_ok=True)

def normalize_for_yahoo(sym: str) -> list[str]:
    """
    Produce candidate Yahoo symbols from a Compustat-like ticker.

    Candidates are returned in the order they should be tried: the ticker as
    given (stripped and upper-cased) first, then punctuation variants.
    e.g., 'BRK.B'  -> ['BRK.B', 'BRK-B']
          'GKIN.1' -> ['GKIN.1', 'GKIN-1', 'GKIN']
          'BF/B'   -> ['BF/B', 'BF-B']

    Parameters
    ----------
    sym : the raw ticker. Non-string values are converted with str().

    Returns
    -------
    A de-duplicated list of non-empty candidate symbols. Missing values
    (None or a float NaN) and blank strings yield an empty list.
    """
    # A missing value would otherwise be stringified into the bogus symbols
    # "NONE" / "NAN" and looked up as if it were a real ticker.
    if sym is None or (isinstance(sym, float) and sym != sym):
        return []

    s = str(sym).strip().upper()
    cands = [s]
    if "." in s:
        cands.append(s.replace(".", "-"))
        # drop trailing .<digits> like GKIN.1
        without_suffix = re.sub(r"\.\d+$", "", s)
        if without_suffix != s:
            cands.append(without_suffix)
    if "/" in s:
        cands.append(s.replace("/", "-"))

    # dict.fromkeys de-duplicates while preserving first-seen order;
    # empty strings are filtered out.
    return list(dict.fromkeys(c for c in cands if c))

@lru_cache(maxsize=None)
def _download_monthly_adjclose(sym: str, start: str = START, end: str = END) -> pd.Series | None:
    """
    Download monthly adjusted closes for `sym` and return them as a Series.

    Each candidate from normalize_for_yahoo(sym) is tried in order; the first
    one that yields at least one non-NaN price wins.

    Parameters
    ----------
    sym : ticker as it appears in the source data.
    start, end : date strings forwarded unchanged to yf.download.

    Returns
    -------
    pd.Series of prices indexed by monthly Period with unique labels, or None
    when no candidate produced data.

    Notes
    -----
    * Results - including None - are memoised by lru_cache for the life of the
      kernel; call _download_monthly_adjclose.cache_clear() to force a retry.
    * An exception raised while handling one candidate is recorded through
      _append_log and the next candidate is tried.
    """
    for cand in normalize_for_yahoo(sym):
        try:
            df = yf.download(cand, start=start, end=end, interval="1mo",
                             auto_adjust=True, progress=False, keepna=False)
            if df.empty:
                continue

            # Prefer an explicit "Adj Close" column when present, else "Close".
            col = "Adj Close" if "Adj Close" in df.columns else "Close"
            px = df[col]

            # When yfinance returns (field, ticker) MultiIndex columns, df[col]
            # is a one-column DataFrame; reduce it to the Series we promise.
            if isinstance(px, pd.DataFrame):
                if px.shape[1] == 0:
                    continue
                px = px.iloc[:, 0]

            px = px.dropna()
            if px.empty:
                continue

            px.index = px.index.to_period('M')
            # Two rows falling in the same month would make a later reindex
            # fail on duplicate labels; keep the latest row per month.
            px = px[~px.index.duplicated(keep="last")]
            return px
        except Exception as exc:
            # Best effort: record the failure instead of hiding it, then fall
            # through to the next candidate.
            _append_log({"event": "download_error", "ticker": sym,
                         "candidate": cand, "error": repr(exc)})
    return None

def _atomic_save_csv(df: pd.DataFrame, path: str):
    """Write `df` to `path` as CSV atomically to avoid partial files on interruption.

    The frame is written to a temporary file created in the destination
    directory and then moved over `path` with os.replace. If writing or
    replacing fails, the temporary file is removed and the exception is
    re-raised; an existing file at `path` is left as it was.

    Parameters
    ----------
    df : frame to save (the index is written as the first column).
    path : destination CSV path.
    """
    dir_ = os.path.dirname(path) or "."
    # Only reserve a unique name here; the handle is closed as soon as the
    # `with` block exits so that pandas can open the path itself below.
    with tempfile.NamedTemporaryFile("w", dir=dir_, delete=False, suffix=".tmp") as tmp:
        tmp_path = tmp.name
    try:
        df.to_csv(tmp_path, index=True)
        os.replace(tmp_path, path)
    except BaseException:
        # Includes KeyboardInterrupt: don't leave orphaned *.tmp files behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # already gone (e.g. the replace itself succeeded)
        raise

def _append_log(event: dict, path: str = LOG_PATH):
    """Append one event to the JSON-lines log at `path`.

    A shallow copy of `event` is stamped with a "ts" key holding the current
    UTC time (second precision, "Z" suffix) and written as a single JSON line.
    The caller's dict is not modified.

    Parameters
    ----------
    event : mapping describing what happened.
    path : log file to append to; created by open() if it does not exist.
    """
    record = dict(event)
    # datetime.utcnow() is deprecated; an aware UTC "now" formatted this way
    # produces the same "YYYY-MM-DDTHH:MM:SSZ" text.
    record["ts"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # default=str: a value json can't encode natively is written via str()
    # rather than raising and aborting the surrounding work over a log line.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, default=str) + "\n")

def _month_grid(start: str, end: str) -> pd.PeriodIndex:
    """Monthly PeriodIndex spanning `start` up to, but excluding, `end`."""
    first = pd.Timestamp(start).to_period("M")
    # `end` is exclusive, so the last covered month is the one containing end - 1 day.
    last = (pd.Timestamp(end) - pd.Timedelta(days=1)).to_period("M")
    return pd.period_range(first, last, freq="M")


def _returns_on_grid(px, grid: pd.PeriodIndex) -> pd.Series:
    """Month-over-month returns of `px` aligned to `grid` (NaN where undefined)."""
    if isinstance(px, pd.DataFrame):
        # Take the single price column explicitly; squeeze() could collapse a
        # 1x1 frame to a scalar.
        px = px.iloc[:, 0]
    # reindex() refuses duplicate labels; keep the latest row per month.
    px = px[~px.index.duplicated(keep="last")]
    # Align to the grid *before* differencing so a missing month yields NaN
    # instead of a multi-month return labelled as monthly. fill_method=None
    # keeps pct_change from forward-filling across those gaps.
    return px.reindex(grid).pct_change(fill_method=None)


def _with_index_type(df: pd.DataFrame, as_period_index: bool) -> pd.DataFrame:
    """Return `df` as is, or a copy whose PeriodIndex is converted with to_timestamp('M')."""
    if as_period_index:
        return df
    out = df.copy()
    out.index = out.index.to_timestamp('M')  # month-end timestamps
    return out


def monthly_returns_df(tickers: list[str],
                       start: str = START,
                       end: str = END,
                       min_non_na: int = 1,
                       as_period_index: bool = True,
                       out_path: str = OUT_PATH) -> pd.DataFrame:
    """
    Build a table of monthly returns for `tickers`.

    Checkpoints to CSV after each ticker that is added, and resumes from that
    CSV if present. Columns are the ORIGINAL tickers you passed in.

    Parameters
    ----------
    tickers : symbols to fetch; any already present as a column in the
        existing CSV are not downloaded again.
    start, end : download window passed to _download_monthly_adjclose. The
        monthly grid of the result covers `start` through the month before
        `end` (`end` is exclusive); with the defaults that is Jan 2003 to
        Jul 2025.
    min_non_na : when > 1, columns with fewer non-NaN returns are dropped.
    as_period_index : True returns a monthly PeriodIndex; False converts it
        with to_timestamp('M').
    out_path : CSV used both for resuming and for checkpoints.

    Returns
    -------
    DataFrame of returns, one row per month of the grid, one column per ticker.

    Side effects
    ------------
    Writes `out_path` (via _atomic_save_csv), appends events through
    _append_log, prints progress, and sleeps between retries.
    """
    # Master monthly grid, derived from the requested window
    full_pidx = _month_grid(start, end)

    # Resume if an output file already exists
    if os.path.exists(out_path):
        prev = pd.read_csv(out_path, index_col=0)
        # Convert index back to PeriodIndex if it isn't already
        try:
            prev.index = pd.PeriodIndex(prev.index, freq="M")
        except Exception:
            # If index looks like timestamps, convert then to Period
            prev.index = pd.to_datetime(prev.index).to_period("M")
        # Align to our master grid
        df = prev.reindex(full_pidx)
        done = set(df.columns)
        _append_log({"event": "resume", "columns": sorted(done, key=str), "n_cols": len(done)})
        print(f"Resuming from '{out_path}' with {len(done)} columns.")
    else:
        df = pd.DataFrame(index=full_pidx)
        done = set()
        print("Starting fresh.")

    skipped = []
    attempted = 0
    added = 0

    for t in tickers:
        if t in done:
            continue

        attempted += 1
        # Simple backoff loop (1s, 2s, 4s, 8s) for errors that propagate out
        # of the downloader.
        for attempt in range(4):
            try:
                px = _download_monthly_adjclose(t, start=start, end=end)
                break
            except Exception as e:
                wait = 2 ** attempt
                _append_log({"event": "exception", "ticker": t, "attempt": attempt+1, "error": repr(e)})
                time.sleep(wait)
        else:
            px = None  # all attempts failed

        if px is None or len(px) <= 2:
            skipped.append(t)
            _append_log({"event": "skip", "ticker": t, "reason": "no_data_or_short"})
            continue

        rets = _returns_on_grid(px, full_pidx)

        # Decide on sparsity *before* inserting, so `added` only counts
        # columns that are actually kept.
        non_na = int(rets.count())
        if min_non_na > 1 and non_na < min_non_na:
            skipped.append(t)
            _append_log({"event": "drop_sparse", "ticker": t, "non_na": non_na})
            continue

        # Column named by the original ticker, already aligned to the grid
        df[t] = rets
        added += 1
        done.add(t)

        # Checkpoint after each successful add
        _atomic_save_csv(df, out_path)
        _append_log({"event": "checkpoint", "ticker": t, "n_cols": int(df.shape[1])})
        print(f"Saved checkpoint with '{t}' (now {df.shape[1]} cols).")

    if df.shape[1] == 0:
        print("No tickers returned data.")
        # keep index as requested
        return _with_index_type(df, as_period_index)

    # Final tidy: drop ultra-sparse columns globally if needed (this also
    # covers columns that came from a resumed CSV)
    if min_non_na > 1:
        keep = df.columns[df.count() >= min_non_na]
        if len(keep) != df.shape[1]:
            dropped = sorted(set(df.columns) - set(keep), key=str)
            df = df[keep]
            _append_log({"event": "final_drop_sparse", "dropped": dropped})

    if skipped:
        # key=str keeps sorting safe if a non-string ticker slipped in
        print(f"Skipped (no/short data): {len(skipped)} → {sorted(skipped, key=str)[:20]}{' ...' if len(skipped)>20 else ''}")
    print(f"Built monthly returns for {df.shape[1]} tickers, {df.shape[0]} months. "
          f"(attempted: {attempted}, added: {added})")

    # Save final snapshot (idempotent)
    _atomic_save_csv(df, out_path)
    _append_log({"event": "final_save", "n_cols": int(df.shape[1])})

    # Return with desired index type
    return _with_index_type(df, as_period_index)

# -------- Example usage --------
# For a quick smoke test, uncomment to swap in a short list:
# all_tickers = ["AAPL", "MSFT", "BRK.B", "GOOG", "ACEL.", "BFTC", "XOM", "BRK.A"]
df_mret = monthly_returns_df(all_tickers, out_path=OUT_PATH)
# Leave the frame as the cell's last expression so the notebook renders it
# as a table rather than print()'s plain-text dump.
df_mret.tail()

Resuming from 'data/monthly_returns.csv' with 460 columns.



1 Failed download:
['4165A']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['IWKS']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['ALO.2']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['ALO-2']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['ALO']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['AEN.2']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['AEN-2']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['AEN']: YFPricesMissingError('possibly delisted; no price data found  (1mo 2003-01-01 -> 2025-08-01)')

1 Failed download:
['4267A']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['CECE']: YFTzMissingError('possibly delisted; no timezone found')

1 Failed download:
['ARXX']: YFPricesMissingError('possibly delisted; no price data fou