### Pricing power using Bloomberg inverse of the standard deviation of gross margins

In [5]:
# Bloomberg-style Pricing Power (GPM stability) — self-contained + 403-safe S&P500 fetch

# ---------- 0) AUTO-INSTALL DEPENDENCIES ----------
import sys, subprocess, importlib

REQUIRED_PACKAGES = [
    "yfinance",
    "pandas",
    "numpy",
    "lxml",
    "beautifulsoup4",
    "html5lib",
    "requests",
    "openpyxl",  # for to_excel
]

def ensure_packages(pkgs):
    for p in pkgs:
        try:
            importlib.import_module(p if p != "beautifulsoup4" else "bs4")
        except ImportError:
            print(f"Installing missing package: {p} …")
            subprocess.check_call([sys.executable, "-m", "pip", "install", p])

ensure_packages(REQUIRED_PACKAGES)

# ---------- 1) IMPORTS ----------
import re
import time
import math
import io
import numpy as np
import pandas as pd
import requests
import yfinance as yf
from typing import Optional, List, Dict

# ---------- 2) CONFIG ----------
MAX_YEARS = 5                   # number of most recent annual GPMs to use
SLEEP_SEC = 0.5                 # throttle between ticker calls
RETRY_N = 2                     # simple retry for transient errors
EXCLUDE_FINANCIALS = True       # Bloomberg often excludes Financials/REITs
EXCLUDE_YOUNG_YEARS = 0         # set >0 to exclude firms younger than N years from Wiki "Founded"
OUTPUT_CSV = "sp500_pricing_power.csv"

UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/127.0.0.1 Safari/537.36"
)

# ---------- 3) S&P500 FETCHERS (robust to 403) ----------
def fetch_sp500_from_wikipedia() -> pd.DataFrame:
    """
    Primary: requests.get with headers -> pd.read_html on the HTML
    Secondary: pd.read_html with storage_options (may or may not work on some pandas versions)
    """
    wiki = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"

    # Primary path: requests + read_html
    try:
        with requests.Session() as s:
            s.headers.update({"User-Agent": UA})
            r = s.get(wiki, timeout=20)
            r.raise_for_status()
            tables = pd.read_html(r.text)
            if tables:
                return tables[0]
    except Exception as e:
        last_err = f"requests+read_html failed: {e}"

    # Secondary path: direct read_html with storage_options (not always honored)
    try:
        tables = pd.read_html(wiki, storage_options={"User-Agent": UA})
        if tables:
            return tables[0]
    except Exception as e:
        last_err = f"pd.read_html(storage_options) failed: {e}"

    raise RuntimeError(f"Wikipedia fetch failed. Last error: {last_err}")

def fetch_sp500_from_fallback() -> pd.DataFrame:
    """
    Fallback source (community dataset). Schema: Symbol,Security, etc.
    Note: may lag Wikipedia updates, but avoids 403s.
    """
    url_csv = "https://raw.githubusercontent.com/datasets/s-and-p-500-companies/master/data/constituents.csv"
    with requests.Session() as s:
        s.headers.update({"User-Agent": UA})
        r = s.get(url_csv, timeout=20)
        r.raise_for_status()
        df = pd.read_csv(io.StringIO(r.text))
    return df

def get_sp500_table() -> pd.DataFrame:
    """
    Returns columns: Ticker, Full Name, Sector, Sub-Industry, Date Added, Founded (Wiki) (if available)
    """
    try:
        df = fetch_sp500_from_wikipedia()
        source = "wikipedia"
    except Exception:
        df = fetch_sp500_from_fallback()
        source = "fallback"

    # Normalize column names
    df.columns = [c.strip() for c in df.columns]
    mapper = {
        "Symbol": "Ticker",
        "Security": "Full Name",
        "GICS Sector": "Sector",
        "GICS Sub-Industry": "Sub-Industry",
        "Date added": "Date Added",
        "Founded": "Founded (Wiki)",
    }
    for old, new in mapper.items():
        if old in df.columns:
            df = df.rename(columns={old: new})

    # Ensure essential columns
    if "Ticker" not in df.columns or "Full Name" not in df.columns:
        raise RuntimeError(f"S&P 500 table from {source} missing essential columns.")

    # Add missing optional columns
    for col in ["Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]:
        if col not in df.columns:
            df[col] = np.nan

    keep_cols = ["Ticker", "Full Name", "Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]
    return df[keep_cols].copy()

# ---------- 4) TICKER NORMALIZATION ----------
def yf_ticker(symbol: str) -> str:
    # yfinance expects dashes for class shares (e.g., BRK-B)
    return symbol.replace(".", "-").strip()

# ---------- 5) HELPERS ----------
def find_label(idx: pd.Index, candidates: List[str]) -> Optional[str]:
    """Return the first index entry that contains any candidate (case-insensitive)."""
    s = idx.to_series().astype(str)
    for cand in candidates:
        mask = s.str.contains(cand, case=False, na=False)
        if mask.any():
            # Return the exact index label (string) to use with .loc
            return s[mask].index[0]
    return None

def parse_year(x) -> Optional[int]:
    """Extract a 4-digit year from strings like '1888', '2009 (1887)', 'c. 1901', etc."""
    if pd.isna(x): 
        return None
    m = re.search(r"(18|19|20)\d{2}", str(x))
    return int(m.group(0)) if m else None

def get_income_stmt(tkr: yf.Ticker) -> Optional[pd.DataFrame]:
    """
    Retrieve annual income statement across yfinance versions:
    - prefer .income_stmt (new API)
    - fallback .financials (older API)
    - final attempt: .get_income_stmt()
    """
    for attr in ("income_stmt", "financials"):
        try:
            df = getattr(tkr, attr, None)
            if isinstance(df, pd.DataFrame) and not df.empty:
                return df
        except Exception:
            pass
    try:
        df = tkr.get_income_stmt()
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df
    except Exception:
        pass
    return None

def safe_get_info_item(tkr: yf.Ticker, key: str):
    try:
        info = tkr.info
        return info.get(key, np.nan) if isinstance(info, dict) else np.nan
    except Exception:
        return np.nan

def collect_pricing_power(row: pd.Series) -> Optional[Dict]:
    """
    Compute 'pricing power' = 1 / std of last up-to-5 annual gross margins.
    Returns dict with metadata/metrics or None if insufficient data.
    """
    ticker = row["YF Ticker"]
    orig_ticker = row["Ticker"]
    last_err = None

    for attempt in range(1, RETRY_N + 2):
        try:
            t = yf.Ticker(ticker)
            fin = get_income_stmt(t)
            if fin is None or fin.empty:
                last_err = "no financials"
                break

            idx = fin.index
            rev_label  = find_label(idx, ["total revenue", "revenue"])
            cogs_label = find_label(idx, ["cost of revenue", "cost of goods sold", "cost revenue"])
            if not rev_label or not cogs_label:
                last_err = f"missing rows → rev={rev_label}, cogs={cogs_label}"
                break

            revenue = fin.loc[rev_label].dropna()
            cogs    = fin.loc[cogs_label].dropna()

            # Align on common columns (dates)
            common_cols = [c for c in revenue.index if c in cogs.index]
            if not common_cols:
                last_err = "no overlapping revenue/COGS columns"
                break

            # Sort columns newest→oldest when possible
            try:
                common_cols = sorted(common_cols, reverse=True)
            except Exception:
                pass

            margins: List[float] = []
            for col in common_cols:
                rev = revenue[col]
                cost = cogs[col]
                if pd.notna(rev) and pd.notna(cost) and rev not in (0, 0.0):
                    gm = (rev - cost) / rev
                    if isinstance(gm, (int, float)) and math.isfinite(gm) and -5.0 < gm < 5.0:
                        margins.append(float(gm))
                if len(margins) >= MAX_YEARS:
                    break

            if len(margins) < 2:
                last_err = f"only {len(margins)} gross margins"
                break

            std = float(np.std(margins, ddof=0))
            pricing_power = (1.0 / std) if std != 0 else float("inf")

            founding_year_yf = safe_get_info_item(t, "founded")
            mcap = safe_get_info_item(t, "marketCap")

            # success
            return {
                "Ticker": orig_ticker,
                "YF Ticker": ticker,
                "Full Name": row["Full Name"],
                "Sector": row.get("Sector", np.nan),
                "Sub-Industry": row.get("Sub-Industry", np.nan),
                "Date Added": row.get("Date Added", np.nan),
                "Founded (Wiki)": row.get("Founded (Wiki)", np.nan),
                "Founded_Year_Wiki": row.get("Founded_Year_Wiki", np.nan),
                "Founding Year (YF)": founding_year_yf,
                "Market Cap": mcap,
                "Gross Margins (latest→older)": margins,
                "Pricing Power": pricing_power,
            }

        except Exception as e:
            last_err = str(e)

        # naive backoff on retry
        time.sleep(SLEEP_SEC * attempt)

    print(f"  ❌ {orig_ticker}: {last_err}")
    return None

# ---------- 6) PIPELINE ----------
# 6.1 Fetch S&P500 with 403-safe logic
sp500_df = get_sp500_table()

# 6.2 Normalize & optional filters
sp500_df["YF Ticker"] = sp500_df["Ticker"].map(yf_ticker)
sp500_df["Founded_Year_Wiki"] = sp500_df["Founded (Wiki)"].apply(parse_year)

if EXCLUDE_FINANCIALS and "Sector" in sp500_df.columns:
    sp500_df = sp500_df[~sp500_df["Sector"].isin({"Financials", "Real Estate"})].copy()

if EXCLUDE_YOUNG_YEARS and "Founded_Year_Wiki" in sp500_df.columns:
    current_year = pd.Timestamp.today().year
    mask_old_enough = sp500_df["Founded_Year_Wiki"].apply(
        lambda y: (isinstance(y, (int, np.integer)) and (current_year - int(y) >= EXCLUDE_YOUNG_YEARS))
    )
    sp500_df = sp500_df[mask_old_enough | sp500_df["Founded_Year_Wiki"].isna()].copy()

# 6.3 Iterate & compute
results: List[Dict] = []
n = len(sp500_df)
for i, row in sp500_df.reset_index(drop=True).iterrows():
    print(f"Processing {row['Ticker']} → {row['YF Ticker']}  ({i+1}/{n}) …")
    rec = collect_pricing_power(row)
    if rec:
        results.append(rec)
    time.sleep(SLEEP_SEC)

# 6.4 Output
if results:
    GPM_original = pd.DataFrame(results).set_index("Ticker")
    show_cols = [
        "Full Name", "Sector", "Sub-Industry",
        "Founding Year (YF)", "Founded (Wiki)", "Date Added",
        "Market Cap", "Pricing Power", "Gross Margins (latest→older)"
    ]
    GPM_original = GPM_original.reindex(columns=show_cols)
    GPM_original = GPM_original.sort_values("Pricing Power", ascending=False)

    pd.set_option("display.width", 200)
    pd.set_option("display.max_rows", 50)
    print(GPM_original.head(20))

    GPM_original.to_csv(OUTPUT_CSV, index=True)
    print(f"\nSaved to {OUTPUT_CSV}  (rows: {len(GPM_original)})")
    GPM_original.to_excel('sp500_pricing_power_GPM_original.xlsx', index=True)
else:
    print("No results — likely all tickers failed schema/financial checks.")


Installing missing package: openpyxl …
Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [openpyxl]1/2[0m [openpyxl]
[1A[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
  tables = pd.read_html(r.text)


Processing MMM → MMM  (1/397) …
Processing AOS → AOS  (2/397) …
Processing ABT → ABT  (3/397) …
Processing ABBV → ABBV  (4/397) …
Processing ACN → ACN  (5/397) …
Processing ADBE → ADBE  (6/397) …
Processing AMD → AMD  (7/397) …
Processing AES → AES  (8/397) …
Processing A → A  (9/397) …
Processing APD → APD  (10/397) …
Processing ABNB → ABNB  (11/397) …
Processing AKAM → AKAM  (12/397) …
Processing ALB → ALB  (13/397) …
Processing ALGN → ALGN  (14/397) …
Processing ALLE → ALLE  (15/397) …
Processing LNT → LNT  (16/397) …
Processing GOOGL → GOOGL  (17/397) …
Processing GOOG → GOOG  (18/397) …
Processing MO → MO  (19/397) …
Processing AMZN → AMZN  (20/397) …
Processing AMCR → AMCR  (21/397) …
Processing AEE → AEE  (22/397) …
Processing AEP → AEP  (23/397) …
Processing AWK → AWK  (24/397) …
Processing AME → AME  (25/397) …
Processing AMGN → AMGN  (26/397) …
Processing APH → APH  (27/397) …
Processing ADI → ADI  (28/397) …
Processing APA → APA  (29/397) …
Processing AAPL → AAPL  (30/397) …

In [None]:
Bloomberg_PricingPower = pd.read_csv("GPM_original.csv")

Bloomberg_PricingPower.sort_values(by="Pricing Power", ascending=False, inplace=True)
Bloomberg_PricingPower

Unnamed: 0,Ticker,Full Name,Founding Year,Founded (Wiki),Date Added,Market Cap,Pricing Power,Gross Margins
0,HD,Home Depot (The),,1978,1988-03-31,363708055552,1485.287286,"[0.33874769612698574, 0.3375537928459609, 0.33..."
1,LOW,Lowe's,,1904/1946/1959,1984-02-29,127428263936,1458.734690,"[0.3360661615316586, 0.336316380517962, 0.3345..."
2,COR,Cencora,,1985,2001-08-30,56281374720,1159.010981,"[0.033803729619761864, 0.03423854831716707, 0...."
3,MA,Mastercard,,1966,2008-07-18,508658647040,776.223535,"[0.7630915610466148, 0.7600605625946291, 0.763..."
4,FDX,FedEx,,1971,1980-12-31,52922630144,705.404925,"[0.2161175920540978, 0.2125894293161777, 0.215..."
...,...,...,...,...,...,...,...,...
444,MU,Micron Technology,,1978,1994-09-27,91126661120,4.803668,"[0.22352753773246783, -0.09111969111969112, 0...."
445,EXE,Expand Energy,,1989,2025-03-24,25015162880,4.003942,"[0.2703150912106136, 0.6481028938906752, 0.712..."
446,RCL,Royal Caribbean Group,,1997,2014-12-05,62270062592,1.935326,"[0.4751592356687898, 0.44064748201438847, 0.25..."
447,NCLH,Norwegian Cruise Line Holdings,,2011 (1966),2017-10-13,7751153152,1.348033,"[0.40868856880912596, 0.36912667293884716, 0.1..."


### Modification of the bloomberg measurement for smoothness


In [None]:
# Pricing Power on S&P 500 — robust fetch + selectable metric
# Fixes Wikipedia 403, normalizes tickers, handles yfinance fallbacks, and caches constituents.

# ---------- 0) AUTO-INSTALL DEPENDENCIES ----------
import sys, subprocess, importlib

REQUIRED_PACKAGES = [
    "yfinance",
    "pandas",
    "numpy",
    "lxml",
    "beautifulsoup4",
    "html5lib",
    "requests",
]

def ensure_packages(pkgs):
    for p in pkgs:
        try:
            importlib.import_module(p if p != "beautifulsoup4" else "bs4")
        except ImportError:
            print(f"Installing missing package: {p} …")
            subprocess.check_call([sys.executable, "-m", "pip", "install", p])

ensure_packages(REQUIRED_PACKAGES)

# ---------- 1) IMPORTS ----------
import os, io, re, time, math, requests
import numpy as np
import pandas as pd
import yfinance as yf
from typing import Optional, List, Dict, Tuple

# ---------- 2) CONFIG ----------
METRIC = "changes"     # <- choose: "level", "changes", "uptrend"
MAX_YEARS = 5          # use up to N most recent annual margins
SLEEP_SEC = 0.5        # throttle per ticker
RETRY_N = 2            # retries for transient errors
EXCLUDE_FINANCIALS = False   # set True to exclude Financials/Real Estate
OUTPUT_CSV = f"sp500_pricing_power_{METRIC}.csv"

UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
      "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.1 Safari/537.36")
CACHE_PATH = "sp500_constituents_cache.csv"

# ---------- 3) ROBUST S&P500 FETCH (handles 403 + fallback + cache) ----------
def load_cached_sp500(path=CACHE_PATH):
    if os.path.exists(path):
        try:
            df = pd.read_csv(path)
            if {"Ticker", "Full Name"}.issubset(df.columns):
                return df
        except Exception:
            pass
    return None

def save_cache(df, path=CACHE_PATH):
    try:
        df.to_csv(path, index=False)
    except Exception:
        pass

def fetch_sp500_wikipedia() -> pd.DataFrame:
    wiki = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # A) direct with User-Agent via pandas
    try:
        tables = pd.read_html(wiki, storage_options={"User-Agent": UA})
        if tables:
            return tables[0]
    except Exception:
        pass
    # B) requests + read_html
    with requests.Session() as s:
        s.headers.update({"User-Agent": UA})
        r = s.get(wiki, timeout=20)
        r.raise_for_status()
        tables = pd.read_html(r.text)
        if tables:
            return tables[0]
    raise RuntimeError("Wikipedia fetch failed.")

def fetch_sp500_fallback_csv() -> pd.DataFrame:
    url_csv = "https://raw.githubusercontent.com/datasets/s-and-p-500-companies/master/data/constituents.csv"
    with requests.Session() as s:
        s.headers.update({"User-Agent": UA})
        r = s.get(url_csv, timeout=20)
        r.raise_for_status()
        return pd.read_csv(io.StringIO(r.text))

def get_sp500_constituents() -> pd.DataFrame:
    cached = load_cached_sp500()
    if cached is not None:
        print(f"Loaded constituents from cache ({len(cached)} rows).")
        return cached

    try:
        df = fetch_sp500_wikipedia()
        source = "wikipedia"
    except Exception:
        df = fetch_sp500_fallback_csv()
        source = "fallback_csv"

    df.columns = [c.strip() for c in df.columns]
    rename_map = {
        "Symbol": "Ticker",
        "Security": "Full Name",
        "GICS Sector": "Sector",
        "GICS Sub-Industry": "Sub-Industry",
        "Date added": "Date Added",
        "Founded": "Founded (Wiki)",
    }
    for old, new in rename_map.items():
        if old in df.columns:
            df = df.rename(columns={old: new})

    for col in ["Ticker", "Full Name", "Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]:
        if col not in df.columns:
            df[col] = np.nan

    # yfinance class shares: BRK.B -> BRK-B
    df["Ticker"] = df["Ticker"].astype(str).str.replace(".", "-", regex=False).str.strip()

    df = df[["Ticker", "Full Name", "Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]].copy()

    save_cache(df)
    print(f"S&P 500 table loaded from: {source} (rows: {len(df)})")
    return df

# ---------- 4) YFINANCE HELPERS ----------
def find_label(idx: pd.Index, candidates: List[str]) -> Optional[str]:
    s = idx.to_series().astype(str)
    for cand in candidates:
        mask = s.str.contains(cand, case=False, na=False)
        if mask.any():
            return s[mask].index[0]
    return None

def get_income_stmt(tkr: yf.Ticker) -> Optional[pd.DataFrame]:
    for attr in ("income_stmt", "financials"):
        df = getattr(tkr, attr, None)
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df
    try:
        df = tkr.get_income_stmt()
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df
    except Exception:
        pass
    return None

def safe_info(tkr: yf.Ticker, key: str):
    try:
        info = tkr.info
        return info.get(key, np.nan) if isinstance(info, dict) else np.nan
    except Exception:
        return np.nan

# ---------- 5) METRIC FUNCTIONS ----------
def metric_level(margins: List[float]) -> Dict[str, float]:
    sd = float(np.std(margins, ddof=0))
    score = (1.0 / sd) if sd != 0 else float("inf")
    return {"Score": score, "Std_Lvl": sd}

def metric_changes(margins: List[float]) -> Dict[str, float]:
    if len(margins) < 3:
        # prefer ≥3 margins → ≥2 changes for a meaningful std
        pass
    changes = np.diff(margins)
    sd = float(np.std(changes, ddof=0)) if len(changes) > 0 else float("inf")
    score = (1.0 / sd) if sd != 0 else float("inf")
    return {"Score": score, "Std_Delta": sd, "Deltas": changes.tolist()}

def metric_uptrend(margins: List[float]) -> Dict[str, float]:
    # Linear trend fit: latest->older indexed as t=0,1,2,...
    y = np.array(margins, dtype=float)
    n = len(y)
    t = np.arange(n, dtype=float)
    # np.polyfit returns slope, intercept (degree=1)
    slope, intercept = np.polyfit(t, y, 1)
    y_hat = intercept + slope * t
    resid = y - y_hat
    resid_std = float(np.std(resid, ddof=0))
    ss_res = float(np.sum(resid**2))
    ss_tot = float(np.sum((y - y.mean())**2)) if n > 1 else 0.0
    r2 = 1.0 - ss_res/ss_tot if ss_tot > 0 else 1.0
    score = max(float(slope), 0.0) / (resid_std + 1e-6)
    return {"Score": score, "Slope": float(slope), "ResidStd": resid_std, "R2": r2}

def score_from_margins(margins: List[float]) -> Dict[str, float]:
    if METRIC == "level":
        return metric_level(margins)
    elif METRIC == "changes":
        return metric_changes(margins)
    elif METRIC == "uptrend":
        return metric_uptrend(margins)
    else:
        raise ValueError("METRIC must be one of: 'level', 'changes', 'uptrend'")

# ---------- 6) MAIN COLLECTOR ----------
def collect_record(row: pd.Series, n_years:int=MAX_YEARS) -> Optional[Dict]:
    ticker = row["Ticker"]
    last_err = None

    for attempt in range(1, RETRY_N + 2):
        try:
            t = yf.Ticker(ticker)
            fin = get_income_stmt(t)
            if fin is None or fin.empty:
                last_err = "no financials"
                break

            # Find revenue & COGS
            rev_label  = find_label(fin.index, ["total revenue", "revenue"])
            cogs_label = find_label(fin.index, ["cost of revenue", "cost of goods sold", "cost revenue"])
            if not rev_label or not cogs_label:
                last_err = f"missing rows → rev={rev_label}, cogs={cogs_label}"
                break

            revenue = fin.loc[rev_label].dropna()
            cogs    = fin.loc[cogs_label].dropna()
            cols = [c for c in revenue.index if c in cogs.index]
            if not cols:
                last_err = "no overlapping revenue/COGS columns"
                break
            # newest -> older if possible
            try:
                cols = sorted(cols, reverse=True)
            except Exception:
                pass

            margins, used_cols = [], []
            for c in cols:
                r, cg = revenue[c], cogs[c]
                if pd.notna(r) and pd.notna(cg) and r not in (0, 0.0):
                    gm = (r - cg) / r
                    if isinstance(gm, (int, float)) and math.isfinite(gm) and -5.0 < gm < 5.0:
                        margins.append(float(gm))
                        used_cols.append(str(c))
                if len(margins) >= n_years:
                    break

            # minimal data requirements
            min_needed = 2 if METRIC == "level" else 3  # level needs ≥2; others ≥3 recommended
            if len(margins) < min_needed:
                last_err = f"only {len(margins)} margins"
                break

            score_parts = score_from_margins(margins)

            return {
                "Ticker": ticker,
                "Full Name": row.get("Full Name", np.nan),
                "Sector": row.get("Sector", np.nan),
                "Sub-Industry": row.get("Sub-Industry", np.nan),
                "Date Added": row.get("Date Added", np.nan),
                "Founded (Wiki)": row.get("Founded (Wiki)", np.nan),
                "Market Cap": safe_info(t, "marketCap"),
                "Founding Year (YF)": safe_info(t, "founded"),
                "Gross Margins (newest→older)": margins,
                "Years Used (labels)": used_cols,
                **score_parts,
            }
        except Exception as e:
            last_err = str(e)

        time.sleep(SLEEP_SEC * attempt)

    print(f"  ❌ {ticker}: {last_err}")
    return None

# ---------- 7) RUN PIPELINE ----------
sp500 = get_sp500_constituents()

if EXCLUDE_FINANCIALS and "Sector" in sp500.columns:
    sp500 = sp500[~sp500["Sector"].isin({"Financials", "Real Estate"})].copy()

results: List[Dict] = []
n = len(sp500)
for i, row in sp500.reset_index(drop=True).iterrows():
    print(f"Processing {row['Ticker']}  ({i+1}/{n}) …")
    rec = collect_record(row)
    if rec:
        results.append(rec)
    time.sleep(SLEEP_SEC)

# ---------- 8) OUTPUT ----------
if results:
    GPM_smooth = pd.DataFrame(results).set_index("Ticker")
    base_cols = [
        "Full Name", "Sector", "Sub-Industry",
        "Founding Year (YF)", "Founded (Wiki)", "Date Added",
        "Market Cap", "Gross Margins (newest→older)", "Years Used (labels)"
    ]

    if METRIC == "level":
        order = base_cols + ["Score", "Std_Lvl"]
    elif METRIC == "changes":
        order = base_cols + ["Score", "Std_Delta", "Deltas"]
    else:  # uptrend
        order = base_cols + ["Score", "Slope", "ResidStd", "R2"]

   


Loaded constituents from cache (503 rows).
Processing MMM  (1/503) …
Processing AOS  (2/503) …
Processing ABT  (3/503) …
Processing ABBV  (4/503) …
Processing ACN  (5/503) …
Processing ADBE  (6/503) …
Processing AMD  (7/503) …
Processing AES  (8/503) …
Processing AFL  (9/503) …
  ❌ AFL: missing rows → rev=Total Revenue, cogs=None
Processing A  (10/503) …
Processing APD  (11/503) …
Processing ABNB  (12/503) …
Processing AKAM  (13/503) …
Processing ALB  (14/503) …
Processing ARE  (15/503) …
Processing ALGN  (16/503) …
Processing ALLE  (17/503) …
Processing LNT  (18/503) …
Processing ALL  (19/503) …
  ❌ ALL: missing rows → rev=Total Revenue, cogs=None
Processing GOOGL  (20/503) …
Processing GOOG  (21/503) …
Processing MO  (22/503) …
Processing AMZN  (23/503) …
Processing AMCR  (24/503) …
Processing AEE  (25/503) …
Processing AEP  (26/503) …
Processing AXP  (27/503) …
  ❌ AXP: missing rows → rev=Total Revenue, cogs=None
Processing AIG  (28/503) …
  ❌ AIG: missing rows → rev=Total Revenue, 

NameError: name 'GPM_smoothf' is not defined

In [None]:

GPM_smooth = GPM_smooth.reindex(columns=order)
GPM_smooth = GPM_smooth.sort_values("Score", ascending=False)

pd.set_option("display.width", 200)
pd.set_option("display.max_rows", 60)
print(GPM_smooth.head(20))


                        Full Name                  Sector                         Sub-Industry  Founding Year (YF)  Founded (Wiki)  Date Added    Market Cap  \
Ticker                                                                                                                                                         
MCK          McKesson Corporation             Health Care             Health Care Distributors                 NaN            1833  1999-01-13   94973698048   
HD               Home Depot (The)  Consumer Discretionary              Home Improvement Retail                 NaN            1978  1988-03-31  374016507904   
LOW                        Lowe's  Consumer Discretionary              Home Improvement Retail                 NaN  1904/1946/1959  1984-02-29  130234761216   
COR                       Cencora             Health Care             Health Care Distributors                 NaN            1985  2001-08-30   61653168128   
JCI              Johnson Controls       

In [None]:
GPM_smooth.to_excel("sp500_pricing_power_GPM_smooth.xlsx", index=True)


In [2]:
# ==========================================
# Upward Trending Gross Margins — strict positive trend version
# ==========================================
# Rewards firms whose gross profit margins show a positive linear trend.
# Negative or flat trends are fully excluded from output.
# Standard deviation still reported but does not affect inclusion.

# ---------- 0) AUTO-INSTALL DEPENDENCIES ----------
import sys, subprocess, importlib

REQUIRED_PACKAGES = [
    "yfinance",
    "pandas",
    "numpy",
    "lxml",
    "beautifulsoup4",
    "html5lib",
    "requests",
]

def ensure_packages(pkgs):
    for p in pkgs:
        try:
            importlib.import_module(p if p != "beautifulsoup4" else "bs4")
        except ImportError:
            print(f"Installing missing package: {p} …")
            subprocess.check_call([sys.executable, "-m", "pip", "install", p])

ensure_packages(REQUIRED_PACKAGES)

# ---------- 1) IMPORTS ----------
import re, time, math, numpy as np, pandas as pd, requests, yfinance as yf
from typing import Optional, List, Dict, Tuple

# ---------- 2) CONFIG ----------
MAX_YEARS = 5
SLEEP_SEC = 0.5
RETRY_N = 2
EXCLUDE_FINANCIALS = True
EXCLUDE_YOUNG_YEARS = 0
OUTPUT_CSV = "sp500_pricing_power_uptrend_positive_only.csv"

UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/127.0.0.1 Safari/537.36"
)

# ---------- 3) S&P500 FETCHERS ----------
def read_html_with_headers(url: str, **kwargs) -> List[pd.DataFrame]:
    storage_options = kwargs.pop("storage_options", {}) or {}
    storage_options.setdefault("User-Agent", UA)
    return pd.read_html(url, storage_options=storage_options, **kwargs)

def fetch_sp500_from_wikipedia() -> pd.DataFrame:
    wiki = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    try:
        tables = read_html_with_headers(wiki)
        if tables:
            return tables[0]
    except Exception:
        pass
    with requests.Session() as s:
        s.headers.update({"User-Agent": UA})
        r = s.get(wiki, timeout=20)
        r.raise_for_status()
        tables = pd.read_html(r.text)
        if tables:
            return tables[0]
    raise RuntimeError("Wikipedia fetch failed.")

def fetch_sp500_from_fallback() -> pd.DataFrame:
    url_csv = "https://raw.githubusercontent.com/datasets/s-and-p-500-companies/master/data/constituents.csv"
    import io
    with requests.Session() as s:
        s.headers.update({"User-Agent": UA})
        r = s.get(url_csv, timeout=20)
        r.raise_for_status()
        return pd.read_csv(io.StringIO(r.text))

def get_sp500_table() -> pd.DataFrame:
    try:
        df = fetch_sp500_from_wikipedia()
    except Exception:
        df = fetch_sp500_from_fallback()
    df.columns = [c.strip() for c in df.columns]
    mapper = {
        "Symbol": "Ticker",
        "Security": "Full Name",
        "GICS Sector": "Sector",
        "GICS Sub-Industry": "Sub-Industry",
        "Date added": "Date Added",
        "Founded": "Founded (Wiki)"
    }
    for old, new in mapper.items():
        if old in df.columns:
            df = df.rename(columns={old: new})
    for col in ["Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]:
        if col not in df.columns:
            df[col] = np.nan
    return df[["Ticker", "Full Name", "Sector", "Sub-Industry", "Date Added", "Founded (Wiki)"]].copy()

# ---------- 4) HELPERS ----------
def yf_ticker(symbol: str) -> str:
    return str(symbol).replace(".", "-").strip()

def find_label(idx: pd.Index, candidates: List[str]) -> Optional[str]:
    s = idx.to_series().astype(str)
    for cand in candidates:
        mask = s.str.contains(cand, case=False, na=False)
        if mask.any():
            return s[mask].index[0]
    return None

def parse_year(x) -> Optional[int]:
    if pd.isna(x): return None
    m = re.search(r"(18|19|20)\d{2}", str(x))
    return int(m.group(0)) if m else None

def get_income_stmt(tkr: yf.Ticker) -> Optional[pd.DataFrame]:
    for attr in ("income_stmt", "financials"):
        df = getattr(tkr, attr, None)
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df
    try:
        df = tkr.get_income_stmt()
        if isinstance(df, pd.DataFrame) and not df.empty:
            return df
    except Exception:
        pass
    return None

def safe_get_info_item(tkr: yf.Ticker, key: str):
    try:
        info = tkr.info
        return info.get(key, np.nan) if isinstance(info, dict) else np.nan
    except Exception:
        return np.nan

# ---------- 5) METRIC: strict positive trend ----------
def uptrend_score_positive_only(margins: List[float]) -> Tuple[float, float, float, float]:
    """
    Only accepts firms with a positive slope in gross margins.
    Returns (score, slope, resid_std, r2). Score = slope / (resid_std + 1e-6)
    Negative slopes → excluded later (None).
    """
    y = np.array(margins, dtype=float)
    n = len(y)
    if n < 3:
        return np.nan, np.nan, np.nan, np.nan
    t = np.arange(n, dtype=float)
    b, a = np.polyfit(t, y, 1)
    y_hat = a + b * t
    resid = y - y_hat
    resid_std = float(np.std(resid, ddof=0))
    ss_res = float(np.sum(resid**2))
    ss_tot = float(np.sum((y - y.mean())**2)) if n > 1 else 0.0
    r2 = 1.0 - ss_res/ss_tot if ss_tot > 0 else 1.0
    slope = float(b)
    if slope <= 0:  # strictly enforce positive trend
        return np.nan, slope, resid_std, r2
    score = slope / (resid_std + 1e-6)
    return score, slope, resid_std, r2

# ---------- 6) COLLECTOR ----------
def collect_uptrend_pricing_power(row: pd.Series, n_years:int=MAX_YEARS) -> Optional[Dict]:
    ticker = row["YF Ticker"]
    orig_ticker = row["Ticker"]
    last_err = None

    for attempt in range(1, RETRY_N + 2):
        try:
            t = yf.Ticker(ticker)
            fin = get_income_stmt(t)
            if fin is None or fin.empty:
                last_err = "no financials"
                break

            rev_label  = find_label(fin.index, ["total revenue", "revenue"])
            cogs_label = find_label(fin.index, ["cost of revenue", "cost of goods sold", "cost revenue"])
            if not rev_label or not cogs_label:
                last_err = f"missing rows → rev={rev_label}, cogs={cogs_label}"
                break

            revenue = fin.loc[rev_label].dropna()
            cogs    = fin.loc[cogs_label].dropna()
            common_cols = [c for c in revenue.index if c in cogs.index]
            if not common_cols:
                last_err = "no overlapping revenue/COGS columns"
                break
            try:
                common_cols = sorted(common_cols, reverse=True)
            except Exception:
                pass

            margins, used_cols = [], []
            for c in common_cols:
                r, cg = revenue[c], cogs[c]
                if pd.notna(r) and pd.notna(cg) and r not in (0, 0.0):
                    gm = (r - cg) / r
                    if isinstance(gm, (int, float)) and math.isfinite(gm) and -5.0 < gm < 5.0:
                        margins.append(float(gm))
                        used_cols.append(str(c))
                if len(margins) >= n_years:
                    break
            if len(margins) < 3:
                last_err = f"only {len(margins)} margins"
                break

            score, slope, resid_std, r2 = uptrend_score_positive_only(margins)
            if np.isnan(score):  # exclude non-positive slopes
                return None

            founding_year_yf = safe_get_info_item(t, "founded")
            mcap = safe_get_info_item(t, "marketCap")

            return {
                "Ticker": orig_ticker,
                "YF Ticker": ticker,
                "Full Name": row["Full Name"],
                "Sector": row.get("Sector", np.nan),
                "Sub-Industry": row.get("Sub-Industry", np.nan),
                "Date Added": row.get("Date Added", np.nan),
                "Founded (Wiki)": row.get("Founded (Wiki)", np.nan),
                "Founding Year (YF)": founding_year_yf,
                "Market Cap": mcap,
                "Gross Margins (newest→older)": margins,
                "Years Used (labels)": used_cols,
                "Uptrend Slope": slope,
                "Trend Residual Std": resid_std,
                "Trend R^2": r2,
                "Pricing Power (Uptrend)": score,
            }
        except Exception as e:
            last_err = str(e)
        time.sleep(SLEEP_SEC * attempt)

    print(f"  ❌ {orig_ticker}: {last_err}")
    return None

# ---------- 7) PIPELINE ----------
sp500_df = get_sp500_table()
sp500_df["YF Ticker"] = sp500_df["Ticker"].map(yf_ticker)
sp500_df["Founded_Year_Wiki"] = sp500_df["Founded (Wiki)"].apply(parse_year)

if EXCLUDE_FINANCIALS:
    sp500_df = sp500_df[~sp500_df["Sector"].isin({"Financials", "Real Estate"})].copy()

if EXCLUDE_YOUNG_YEARS and "Founded_Year_Wiki" in sp500_df.columns:
    current_year = pd.Timestamp.today().year
    mask_old = sp500_df["Founded_Year_Wiki"].apply(
        lambda y: (isinstance(y, (int, np.integer)) and (current_year - int(y) >= EXCLUDE_YOUNG_YEARS))
    )
    sp500_df = sp500_df[mask_old | sp500_df["Founded_Year_Wiki"].isna()].copy()

results: List[Dict] = []
n = len(sp500_df)
for i, row in sp500_df.reset_index(drop=True).iterrows():
    print(f"Processing {row['Ticker']} ({i+1}/{n}) …")
    rec = collect_uptrend_pricing_power(row)
    if rec:
        results.append(rec)
    time.sleep(SLEEP_SEC)

# ---------- 8) OUTPUT ----------
if results:
    df = pd.DataFrame(results).set_index("Ticker")
    show_cols = [
        "Full Name", "Sector", "Sub-Industry",
        "Founding Year (YF)", "Founded (Wiki)", "Date Added",
        "Market Cap", "Pricing Power (Uptrend)", "Uptrend Slope",
        "Trend Residual Std", "Trend R^2",
        "Gross Margins (newest→older)", "Years Used (labels)"
    ]
    df = df.reindex(columns=show_cols).sort_values("Pricing Power (Uptrend)", ascending=False)
    df.to_csv(OUTPUT_CSV)
    print(f"\n✅ Done! Positive-trend firms saved to {OUTPUT_CSV}\n")
else:
    print("⚠️ No firms with positive uptrends found.")


Processing MMM (1/397) …
Processing AOS (2/397) …
Processing ABT (3/397) …
Processing ABBV (4/397) …
Processing ACN (5/397) …
Processing ADBE (6/397) …
Processing AMD (7/397) …
Processing AES (8/397) …
Processing A (9/397) …
Processing APD (10/397) …
Processing ABNB (11/397) …
Processing AKAM (12/397) …
Processing ALB (13/397) …
Processing ALGN (14/397) …
Processing ALLE (15/397) …
Processing LNT (16/397) …
Processing GOOGL (17/397) …
Processing GOOG (18/397) …
Processing MO (19/397) …
Processing AMZN (20/397) …
Processing AMCR (21/397) …
Processing AEE (22/397) …
Processing AEP (23/397) …
Processing AWK (24/397) …
Processing AME (25/397) …
Processing AMGN (26/397) …
Processing APH (27/397) …
Processing ADI (28/397) …
Processing APA (29/397) …
Processing AAPL (30/397) …
Processing AMAT (31/397) …
Processing APP (32/397) …
Processing APTV (33/397) …
Processing ADM (34/397) …
Processing ANET (35/397) …
Processing T (36/397) …
Processing ATO (37/397) …
Processing ADSK (38/397) …
Processin

In [None]:

df = GPM_uptrend
GPM_uptrend.to_excel("sp500_pricing_power_GPM_uptrend.xlsx", index=True)

In [None]:
GPM_uptrend.head(500
                 )

Unnamed: 0_level_0,Full Name,Sector,Sub-Industry,Founding Year (YF),Founded (Wiki),Date Added,Market Cap,Pricing Power (Uptrend),Uptrend Slope (pp/year),Trend Residual Std,Trend R^2,Gross Margins (newest→older),Years Used (labels)
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
MCK,McKesson Corporation,Health Care,Health Care Distributors,,1833,1999-01-13,94973698048,10.328353,0.004104,0.000396,0.992594,"[0.0371061492657032, 0.041521147366410854, 0.0...","[2025-03-31 00:00:00, 2024-03-31 00:00:00, 202..."
CVS,CVS Health,Health Care,Health Care Services,,1996,1957-03-04,98802630656,8.358944,0.013853,0.001656,0.988694,"[0.13787489035940656, 0.15213709136442913, 0.1...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
F,Ford Motor Company,Consumer Discretionary,Automobile Manufacturers,,1903,1957-03-04,45410205696,6.844903,0.012711,0.001856,0.983230,"[0.08381984085798305, 0.09171864624186253, 0.1...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
DG,Dollar General,Consumer Staples,Consumer Staples Merchandise Retail,,1939,2012-12-03,21838921728,5.599267,0.006967,0.001243,0.975157,"[0.29590775781568485, 0.3028828291943093, 0.31...","[2025-01-31 00:00:00, 2024-01-31 00:00:00, 202..."
INCY,Incyte,Health Care,Biotechnology,,1991,2017-02-28,16364141568,5.269908,0.007710,0.001462,0.972038,"[0.9264126310915004, 0.9309999407411256, 0.939...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...
AMAT,Applied Materials,Information Technology,Semiconductor Materials & Equipment,,1967,1995-03-16,167255080960,0.000000,-0.000595,0.003946,0.027625,"[0.47457315278186635, 0.4670211562393936, 0.46...","[2024-10-31 00:00:00, 2023-10-31 00:00:00, 202..."
APP,AppLovin,Information Technology,Application Software,,2012,2025-09-22,192765247488,0.000000,-0.044124,0.051109,0.482312,"[0.7522309294392651, 0.6773795516232132, 0.554...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
AOS,A. O. Smith,Industrials,Building Products,,1916,2017-07-26,9493779456,0.000000,-0.006402,0.009730,0.351092,"[0.38136769597443754, 0.3853820598006645, 0.35...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
ZTS,Zoetis,Health Care,Pharmaceuticals,,1952,2013-06-21,62537621504,0.000000,-0.001691,0.002765,0.318572,"[0.7447061365600691, 0.7402855805243446, 0.735...","[2024-12-31 00:00:00, 2023-12-31 00:00:00, 202..."
