In [61]:
import time
import requests
import pandas as pd
from typing import Optional, Tuple, Dict, List

# ----------------------------
# Config
# Access to SEC, URLs, and Revenue & Shares tags
# ----------------------------

#This is to access the API, otherwise the request will be denied
SEC_HEADERS = {
    "User-Agent": "Maria Esparza (maria.esparza2949@gmail.com)",  
    "Accept-Encoding": "gzip, deflate",
}

TICKER_CIK_URL = "https://www.sec.gov/files/company_tickers.json"
COMPANY_FACTS_URL = "https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json"

REVENUE_TAGS = [
    "Revenues",
    "SalesRevenueNet",
    "RevenueFromContractWithCustomerExcludingAssessedTax",
]

SHARES_TAG = "CommonStockSharesOutstanding"


In [62]:
# ----------------------------
# Helper Functions
# ----------------------------

'''
Get a JSON payload from SEC-friendly endpoints
'''
def fetch_json(url: str, headers: dict, sleep_s: float = 0.2) -> dict:
    r = requests.get(url, headers=headers, timeout=30)
    if r.status_code != 200:
        raise RuntimeError(f"HTTP {r.status_code} for {url}: {r.text[:200]}")
    data = r.json()
    time.sleep(sleep_s)  # SEC courtesy delay
    return data

'''
SEC uses CIK values in place of tickers. 
We pull a ticker -> CIK mapping from the SEC.
Return DataFrame: ticker (uppercase), cik (10-digit zero-padded string).
'''
def load_ticker_cik_map(headers: dict = SEC_HEADERS) -> pd.DataFrame:
    raw = fetch_json(TICKER_CIK_URL, headers={"User-Agent": headers["User-Agent"]})
    rows = []
    for _, v in raw.items():
        ticker = v["ticker"].upper()
        cik = str(v["cik_str"]).zfill(10)
        rows.append((ticker, cik))
    return pd.DataFrame(rows, columns=["ticker", "cik"])

'''
Fetch companyfacts JSON for a 10-digit CIK string.
'''
def get_companyfacts(cik: str, headers: dict = SEC_HEADERS) -> dict:
    url = COMPANY_FACTS_URL.format(cik=cik)
    return fetch_json(url, headers=headers)


In [63]:
# ----------------------------
# Tag selection + extraction
# ----------------------------

'''
Return (unique_fy_count, df_of_units) for a tag+unit, or (0, None) if missing.
'''
def fy_coverage_for_tag(usgaap: dict, tag: str, unit: str = "USD") -> Tuple[int, Optional[pd.DataFrame]]:
    if tag not in usgaap:
        return 0, None
    units = usgaap[tag].get("units", {})
    if unit not in units:
        return 0, None
    df = pd.DataFrame(units[unit])
    if "fy" not in df.columns:
        return 0, df
    return df["fy"].nunique(dropna=True), df

'''
Pick the revenue tag with the best FY coverage.
'''
def choose_best_revenue_tag(usgaap: dict, tags: List[str] = REVENUE_TAGS, unit: str = "USD") -> Tuple[Optional[str], Optional[pd.DataFrame]]:
    best_tag, best_df, best_score = None, None, -1
    for tag in tags:
        score, df_tmp = fy_coverage_for_tag(usgaap, tag, unit=unit)
        if score > best_score and df_tmp is not None:
            best_score = score
            best_tag = tag
            best_df = df_tmp
    return best_tag, best_df

'''
Clean an SEC 'units' dataframe into one row per fiscal year:
    - filter to annual forms
    - (optionally) filter to fp == 'FY' if available
    - dedupe by keeping latest end date per FY
Returns columns: fy, end, val
'''
def clean_annual_series(raw_df: pd.DataFrame, unit_name: str, prefer_forms=("10-K", "10-K/A")) -> pd.DataFrame:
    df = raw_df.copy()
    
    # keep annual filings
    if "form" in df.columns:
        df = df[df["form"].isin(prefer_forms)]

    # keep full-year if fp column exists and FY exists
    if "fp" in df.columns and (df["fp"] == "FY").any():
        df = df[df["fp"] == "FY"]

    # ensure the core columns exist
    for col in ["fy", "end", "val"]:
        if col not in df.columns:
            raise ValueError(f"Missing column {col} in SEC facts for {unit_name}")

    df = df.dropna(subset=["fy", "end", "val"])

    df["end"] = pd.to_datetime(df["end"], errors="coerce")
    df = df.dropna(subset=["end"])
    
    # filed might be missing; handle gracefully
    if "filed" in df.columns:
        df["filed"] = pd.to_datetime(df["filed"], errors="coerce")
    else:
        df["filed"] = pd.NaT

    # Keep the latest observation per FY (using end date)
    df = (
        df.sort_values(["fy", "end"])
          .drop_duplicates(subset=["fy"], keep="last")
          .reset_index(drop=True)
    )

    return df[["fy", "end", "filed", "val"]]

In [64]:
# ----------------------------
# Company-level computation
# ----------------------------

'''
From a companyfacts JSON, build a fiscal-year table with:
    revenue, shares, sales_per_share, sps_5y_cagr
'''
def compute_sales_per_share_from_companyfacts(cf: dict) -> pd.DataFrame:
    usgaap = cf.get("facts", {}).get("us-gaap", {})
    if not usgaap:
        return pd.DataFrame()

    # revenue
    rev_tag, rev_raw = choose_best_revenue_tag(usgaap, REVENUE_TAGS, unit="USD")
    if rev_tag is None or rev_raw is None:
        return pd.DataFrame()

    rev_df = clean_annual_series(rev_raw, unit_name=f"revenue:{rev_tag}")
    rev_df = rev_df.rename(columns={"val": "revenue_usd"})

    # shares
    if SHARES_TAG not in usgaap or "units" not in usgaap[SHARES_TAG] or "shares" not in usgaap[SHARES_TAG]["units"]:
        return pd.DataFrame()

    sh_raw = pd.DataFrame(usgaap[SHARES_TAG]["units"]["shares"])
    sh_df  = clean_annual_series(sh_raw, unit_name="shares_outstanding")
    sh_df  = sh_df.rename(columns={"val": "shares_outstanding"})

    # merge on fiscal year
    merged = pd.merge(
        rev_df[["fy", "filed", "revenue_usd"]],
        sh_df[["fy", "filed", "shares_outstanding"]],
        on="fy",
        how="inner",
        suffixes=("_rev", "_sh")
    )

    # pick an effective date: max of the two filed dates (be conservative)
    merged["effective_date"] = merged[["filed_rev", "filed_sh"]].max(axis=1)
    merged = merged.drop(columns=["filed_rev", "filed_sh"])

    merged = merged.sort_values("fy").reset_index(drop=True)

    # compute sales per share
    merged["sales_per_share"] = merged["revenue_usd"] / merged["shares_outstanding"]
    # 5-year CAGR of sales per share
    merged["sales_per_share_lag5"] = merged["sales_per_share"].shift(5)
    merged["sps_5y_cagr"] = (merged["sales_per_share"] / merged["sales_per_share_lag5"]) ** (1/5) - 1

    merged["revenue_tag_used"] = rev_tag
    merged = merged.drop(columns=["sales_per_share_lag5"])

    return merged


In [65]:
# ----------------------------
# Batch runner
# ----------------------------

'''
Loop through tickers → companyfacts → sales-per-share table.
Returns:
    - results_df (stacked fiscal-year rows)
    - errors_df (ticker + reason)
'''
def build_sec_sales_growth_for_tickers(
    tickers: List[str],
    cik_map: pd.DataFrame,
    headers: dict = SEC_HEADERS,
    sleep_s: float = 0.2,
    max_tickers: Optional[int] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    tickers = [t.upper() for t in tickers]
    if max_tickers is not None:
        tickers = tickers[:max_tickers]

    # join to map
    m = pd.merge(pd.DataFrame({"ticker": tickers}), cik_map, on="ticker", how="left")

    results = []
    errors = []

    for i, row in m.iterrows():
        ticker = row["ticker"]
        cik = row["cik"]

        if pd.isna(cik):
            errors.append((ticker, "no_cik"))
            continue

        try:
            cf = get_companyfacts(cik, headers=headers)
            tbl = compute_sales_per_share_from_companyfacts(cf)

            if tbl.empty:
                errors.append((ticker, "missing_revenue_or_shares"))
                continue

            tbl["ticker"] = ticker
            tbl["cik"] = cik
            results.append(tbl)

            print(f"[{i+1}/{len(m)}] OK {ticker} years={tbl['fy'].nunique()} tag={tbl['revenue_tag_used'].iloc[0]}")
        except Exception as e:
            errors.append((ticker, f"error:{str(e)[:150]}"))
            print(f"[{i+1}/{len(m)}] FAIL {ticker} {e}")

        time.sleep(sleep_s)

    results_df = pd.concat(results, ignore_index=True) if results else pd.DataFrame()
    errors_df = pd.DataFrame(errors, columns=["ticker", "reason"])

    return results_df, errors_df

In [66]:
'''
daily_panel: MultiIndex (date, ticker)
sec_fy_table: columns include ['ticker', 'effective_date', 'sps_5y_cagr'] (and maybe others)
Returns daily_panel with a new column 'sec_sps_5y_cagr' forward-filled by ticker.
'''
def attach_sec_factor_daily(daily_panel: pd.DataFrame, sec_fy_table: pd.DataFrame) -> pd.DataFrame:
    out = daily_panel.copy()
    out["date"] = pd.to_datetime(out["date"], errors="coerce")
    out["ticker"] = out["ticker"].astype(str).str.upper()
    out = out.dropna(subset=["date", "ticker"]).sort_values(["ticker", "date"]).reset_index(drop=True)

    sec = sec_fy_table.copy()
    sec["ticker"] = sec["ticker"].astype(str).str.upper()
    sec["effective_date"] = pd.to_datetime(sec["effective_date"], errors="coerce")
    sec = sec.dropna(subset=["ticker", "effective_date", "sps_5y_cagr"])
    sec = sec.sort_values(["ticker", "effective_date"]).reset_index(drop=True)

    merged_list = []

    for tkr, g in out.groupby("ticker", sort=False):
        s = sec.loc[sec["ticker"] == tkr, ["effective_date", "sps_5y_cagr"]].copy()
        if s.empty:
            g = g.copy()
            g["sec_sps_5y_cagr"] = pd.NA
            merged_list.append(g)
            continue

        # merge_asof needs same key name
        s = s.rename(columns={"effective_date": "date"}).sort_values("date")

        g2 = pd.merge_asof(
            g.sort_values("date"),
            s,
            on="date",
            direction="backward",
            allow_exact_matches=True
        )

        # Create final output column cleanly (no duplicates)
        g2 = g2.rename(columns={"sps_5y_cagr": "sec_sps_5y_cagr"})
        merged_list.append(g2)

    return pd.concat(merged_list, ignore_index=True)


In [None]:
tickers = pd.read_csv("ticker_list.csv")["Ticker"].dropna().tolist()

cik_map = load_ticker_cik_map()

results_df, errors_df = build_sec_sales_growth_for_tickers(
    tickers=tickers, ## edit to include only midcap tickers!
    cik_map=cik_map,
    max_tickers=20,   # this is to limit the number of tickers, used for testing
    sleep_s=0.25
)

results_df.head()
errors_df.head()

[2/20] OK AA years=7 tag=RevenueFromContractWithCustomerExcludingAssessedTax
[3/20] OK AAL years=7 tag=RevenueFromContractWithCustomerExcludingAssessedTax
[5/20] OK AAPL years=9 tag=SalesRevenueNet
[9/20] OK ACGL years=13 tag=Revenues
[10/20] OK ACHC years=7 tag=RevenueFromContractWithCustomerExcludingAssessedTax
[11/20] OK ACI years=2 tag=RevenueFromContractWithCustomerExcludingAssessedTax
[12/20] OK ACM years=7 tag=RevenueFromContractWithCustomerExcludingAssessedTax
[14/20] OK ADBE years=16 tag=Revenues
[15/20] OK ADC years=14 tag=Revenues
[16/20] OK ADI years=9 tag=SalesRevenueNet
[18/20] OK ADP years=16 tag=Revenues
[19/20] OK ADSK years=9 tag=SalesRevenueNet


Unnamed: 0,ticker,reason
0,A,missing_revenue_or_shares
1,AAON,missing_revenue_or_shares
2,ABBV,missing_revenue_or_shares
3,ABNB,missing_revenue_or_shares
4,ABT,missing_revenue_or_shares


In [75]:
df = pd.read_csv("data_with_growth_v1.0.csv")
daily_df = df.copy()
daily_df["date"] = pd.to_datetime(daily_df["date"])
daily_df["ticker"] = daily_df["ticker"].astype(str).str.upper()

# Attach SEC factor (forward-fill based on effective_date)
daily_with_sec = attach_sec_factor_daily(daily_df, results_df)

  return pd.concat(merged_list, ignore_index=True)


In [None]:
out_path = "data_with_growth_and_sec.csv"
daily_with_sec.to_csv(out_path, index=False)
print("Saved:", out_path)


Saved: data_with_growth_and_sec.csv


In [78]:
results_df.to_csv("sec_sales_growth_fy_table.csv", index=False)
errors_df.to_csv("sec_sales_growth_errors.csv", index=False)

In [None]:
# checks
coverage = daily_df.groupby("ticker")["sec_sps_5y_cagr"].apply(lambda s: s.notna().any())
print(coverage.value_counts())

sec_sps_5y_cagr
False    999
True      11
Name: count, dtype: int64


In [None]:
# checks
tickers_with_vals = coverage[coverage].index.tolist()
print("Tickers with SEC factor:", tickers_with_vals[:10])

t = tickers_with_vals[0]
daily_df.loc[daily_df["ticker"] == t, ["date", "sec_sps_5y_cagr"]].dropna().tail(30)


Unnamed: 0,ticker,fy,sales_per_share,sps_5y_cagr
96,ADP,2015.0,5.77723,0.053504
97,ADP,2016.0,6.359886,0.044808
98,ADP,2017.0,6.887191,0.048104
99,ADP,2018.0,7.562899,0.053655
100,ADP,2019.0,8.057807,0.047118
101,ADP,2020.0,7.85485,0.063368
102,ADP,2021.0,35.41515,0.409769
103,ADP,2022.0,39.64984,0.419188
104,ADP,2023.0,43.70832,0.420284
105,ADP,2024.0,47.05366,0.423231
