In [1]:
import os
import re
import json
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple, Optional

import requests
import pandas as pd

# ==============
# 1) Basic configuration
# ==============
# CIK string of the company to fetch; main() passes it to
# cache_latest_12_quarters().
CIK = "0000863436"  # Benchmark Electronics, Inc.
# Label used to name the per-company cache sub-directory below.
TICKER_OR_NAME = "BHE"
# Root folder under which local_report_path() places downloaded reports.
CACHE_DIR = Path("./edgar_cache") / TICKER_OR_NAME

# The SEC requires a User-Agent header (state your identity / e-mail clearly).
# These headers are sent by sec_get() when use_data_host=True.
HEADERS = {
    "User-Agent": "Your Name (your_email@example.com)",
    "Accept-Encoding": "gzip, deflate",
    "Host": "data.sec.gov",
}

# Headers sent by sec_get() when use_data_host=False; the User-Agent value is
# shared with HEADERS so it only has to be edited in one place.
HEADERS_SEC = {
    "User-Agent": HEADERS["User-Agent"],
    "Accept-Encoding": "gzip, deflate",
    "Host": "www.sec.gov",
}

# Seconds sec_get() sleeps before every request.
RATE_LIMIT_SECONDS = 0.25  # Don't go too fast; leave the SEC some headroom


# ==========================
# 2) 工具：目录/请求/时间
# ==========================
def ensure_dir(p: Path) -> None:
    """Create directory ``p`` together with any missing parent directories.

    Calling this on a directory that already exists is a no-op.
    """
    p.mkdir(exist_ok=True, parents=True)

def sec_get(url: str, use_data_host: bool = False) -> requests.Response:
    """Perform a throttled GET request and return the successful response.

    Args:
        url: Absolute URL to fetch.
        use_data_host: When true, send ``HEADERS``; otherwise send
            ``HEADERS_SEC``.

    Returns:
        The ``requests.Response`` for the request.

    Raises:
        requests.HTTPError: If the response status signals an error
            (via ``raise_for_status``).
    """
    # Throttle: pause before every single request.
    time.sleep(RATE_LIMIT_SECONDS)

    if use_data_host:
        request_headers = HEADERS
    else:
        request_headers = HEADERS_SEC

    response = requests.get(url, timeout=60, headers=request_headers)
    response.raise_for_status()
    return response

def to_cik_int(cik_str: str) -> int:
    """Convert a (possibly zero-padded) CIK string to an integer.

    A string that is empty or consists only of zeros yields ``0``.
    """
    significant_digits = cik_str.lstrip("0")
    if not significant_digits:
        return 0
    return int(significant_digits)

def cik_nopad(cik_str: str) -> str:
    """Return the CIK as a decimal string without leading zeros."""
    numeric_cik = to_cik_int(cik_str)
    return f"{numeric_cik}"

def accession_no_nodash(accession: str) -> str:
    """Return the accession number with every ``-`` character removed."""
    segments = accession.split("-")
    return "".join(segments)

def parse_yyyymmdd(s: str) -> datetime:
    """Parse a ``YYYY-MM-DD`` date string into a ``datetime``.

    Raises:
        ValueError: If ``s`` does not match the ``%Y-%m-%d`` format.
    """
    date_format = "%Y-%m-%d"
    parsed = datetime.strptime(s, date_format)
    return parsed


# ==========================================
# 3) 从 submissions JSON 拉 filings 列表
# ==========================================
def load_company_submissions(cik_str: str) -> Dict:
    """Fetch and decode the submissions JSON document for one company.

    Args:
        cik_str: The company's CIK, with or without leading zeros. It is
            stripped of surrounding whitespace and left-padded with zeros to
            10 digits, so both "863436" and "0000863436" request the same URL.

    Returns:
        The parsed JSON body as a dictionary.

    Raises:
        requests.HTTPError: Propagated from ``sec_get`` on an error status.
    """
    # The submissions URL uses the 10-digit, zero-padded CIK form; an unpadded
    # value would otherwise produce a different URL.
    padded_cik = cik_str.strip().zfill(10)
    url = f"https://data.sec.gov/submissions/CIK{padded_cik}.json"
    return sec_get(url, use_data_host=True).json()

def extract_filings(submissions: Dict) -> List[Dict]:
    """Flatten the column-oriented ``filings.recent`` section into row dicts.

    Each returned dict has the keys ``form``, ``filingDate``, ``reportDate``,
    ``accessionNumber`` and ``primaryDocument``.

    The number of rows is the length of the shortest of the form, filingDate,
    accessionNumber and primaryDocument columns. ``reportDate`` does not limit
    the row count; rows beyond its length get an empty string instead.
    """
    recent = submissions.get("filings", {}).get("recent", {})
    report_dates = recent.get("reportDate", [])

    # zip() stops at the shortest of the four mandatory columns.
    mandatory_columns = zip(
        recent.get("form", []),
        recent.get("filingDate", []),
        recent.get("accessionNumber", []),
        recent.get("primaryDocument", []),
    )

    return [
        {
            "form": form,
            "filingDate": filed_on,
            "reportDate": report_dates[idx] if idx < len(report_dates) else "",
            "accessionNumber": accession,
            "primaryDocument": document,
        }
        for idx, (form, filed_on, accession, document) in enumerate(mandatory_columns)
    ]


# ==========================================
# 4) “季度”识别：把 reportDate 归到 Q1~Q4
# ==========================================
def quarter_label_from_report_date(report_date: str) -> Optional[str]:
    """Map a report date (``YYYY-MM-DD``) to a label such as ``"2025Q3"``.

    The quarter is derived purely from the calendar month of the date:
    months 1-3 give Q1, 4-6 give Q2, 7-9 give Q3 and 10-12 give Q4.

    Returns:
        The label, or ``None`` when ``report_date`` is empty.
    """
    if not report_date:
        return None

    period_end = parse_yyyymmdd(report_date)
    # Ceiling division of the month by 3 gives the quarter number 1..4.
    quarter_number = (period_end.month + 2) // 3
    return f"{period_end.year}Q{quarter_number}"

def pick_quarter_end_key(f: Dict) -> Optional[Tuple[datetime, str]]:
    """Build the quarter key of a filing from its ``reportDate`` field.

    Returns:
        ``(report_datetime, quarter_label)``, or ``None`` when the filing has
        no usable ``reportDate`` (missing or empty).
    """
    report_date = f.get("reportDate") or ""
    label = quarter_label_from_report_date(report_date)
    if label:
        return parse_yyyymmdd(report_date), label
    return None


# ==========================================
# 5) 下载并缓存：按 quarter_label 命名
# ==========================================
def local_report_path(quarter_label: str, form: str, filing_date: str, primary_doc: str) -> Path:
    """Compute the cache location for one filing's primary document.

    The result is
    ``CACHE_DIR/<quarter_label>/<quarter_label>_<form>_<filing_date>_<doc>``,
    where ``<doc>`` is the final path component of ``primary_doc`` with every
    run of characters outside ``A-Za-z0-9._-`` replaced by one underscore.
    """
    # primary_doc may carry a path prefix; keep only its last component.
    _, file_name = os.path.split(primary_doc)
    unsafe_run = re.compile(r"[^A-Za-z0-9._-]+")
    sanitized_name = unsafe_run.sub("_", file_name)

    quarter_dir = CACHE_DIR / quarter_label
    return quarter_dir / f"{quarter_label}_{form}_{filing_date}_{sanitized_name}"

def download_filing_primary_doc(cik_str: str, accession: str, primary_doc: str, save_to: Path) -> None:
    """Download a filing's primary document and store it at ``save_to``.

    The document is fetched from
    ``https://www.sec.gov/Archives/edgar/data/{cik}/{accession_without_dashes}/{primary_doc}``.

    The bytes are first written to a ``.part`` sibling of ``save_to`` and then
    renamed into place. Callers treat the mere existence of ``save_to`` as a
    cache hit, so a write that fails half-way must never leave a truncated
    file under the final name.

    Args:
        cik_str: Company CIK (leading zeros are removed for the URL).
        accession: Accession number, with or without dashes.
        primary_doc: Document name as listed in the submissions data.
        save_to: Destination path; missing parent directories are created.

    Raises:
        requests.HTTPError: Propagated from ``sec_get`` on an error status.
        OSError: If the file cannot be written or renamed.
    """
    url = (
        "https://www.sec.gov/Archives/edgar/data/"
        f"{cik_nopad(cik_str)}/{accession_no_nodash(accession)}/{primary_doc}"
    )
    # Fetch first so a failed download leaves nothing behind on disk.
    resp = sec_get(url, use_data_host=False)

    ensure_dir(save_to.parent)
    partial = save_to.with_name(save_to.name + ".part")
    try:
        partial.write_bytes(resp.content)
        # Rename into place only once the whole payload is on disk.
        partial.replace(save_to)
    finally:
        # After a successful rename the temp file is gone; otherwise drop it.
        if partial.exists():
            partial.unlink()

def cache_latest_12_quarters(cik_str: str) -> List[Path]:
    """Ensure the 12 most recent quarterly reports are cached locally.

    Only filings whose form is exactly ``10-Q`` or ``10-K`` are considered.
    Filings are grouped by the quarter label derived from their report date;
    when several filings share a quarter, the one with the greatest
    ``filingDate`` wins. Files already present on disk are not downloaded
    again.

    Returns:
        The local paths of the selected reports, newest quarter first.
    """
    ensure_dir(CACHE_DIR)

    # Keep 10-Q and 10-K only (narrow this tuple to restrict to 10-Q).
    wanted_forms = ("10-Q", "10-K")
    submissions = load_company_submissions(cik_str)
    candidates = [
        filing for filing in extract_filings(submissions)
        if filing["form"] in wanted_forms
    ]

    # One winner per quarter: the filing with the latest filingDate.
    # ISO-formatted date strings compare correctly as plain strings.
    newest_per_quarter: Dict[str, Dict] = {}
    for filing in candidates:
        key = pick_quarter_end_key(filing)
        if not key:
            continue  # no usable reportDate
        period_end, label = key
        incumbent = newest_per_quarter.get(label)
        if incumbent is None or filing["filingDate"] > incumbent["filingDate"]:
            newest_per_quarter[label] = {**filing, "_report_dt": period_end}

    # Newest report date first, limited to 12 quarters.
    ranked = sorted(
        newest_per_quarter.items(),
        key=lambda item: item[1]["_report_dt"],
        reverse=True,
    )[:12]

    cached_paths: List[Path] = []
    for label, filing in ranked:
        target = local_report_path(
            label, filing["form"], filing["filingDate"], filing["primaryDocument"]
        )
        if not target.exists():
            download_filing_primary_doc(
                cik_str, filing["accessionNumber"], filing["primaryDocument"], target
            )
        cached_paths.append(target)

    return cached_paths


# ==========================================
# 6) “读取”这 12 个季度：示例用 read_html 抽表
# ==========================================
def read_report_tables(report_file: Path, max_tables: int = 10) -> List[pd.DataFrame]:
    """Read up to ``max_tables`` HTML tables from a cached report file.

    The tables are not guaranteed to be financial statements; this is meant
    as an automated first-pass extraction.

    This stays best-effort: any failure while reading yields an empty list.
    Unlike a silent swallow, the cause (missing parser dependency, missing
    file, document without tables, ...) is logged as a warning so that a
    failure can be told apart from a report that simply has no tables.

    Args:
        report_file: Local path of the ``.htm``/``.html`` report.
        max_tables: Maximum number of tables to return.

    Returns:
        The first ``max_tables`` tables as DataFrames, or ``[]`` on failure.
    """
    import logging

    try:
        # pandas reads a local file path directly.
        tables = pd.read_html(report_file.as_posix())
    except Exception as exc:  # deliberate best-effort boundary; see docstring
        logging.getLogger(__name__).warning(
            "Could not read tables from %s (%s: %s)",
            report_file, type(exc).__name__, exc,
        )
        return []
    else:
        return tables[:max_tables]

def main():
    """Cache the latest 12 quarterly reports, list them, and preview tables.

    For every cached report the first few HTML tables are read and the head
    of the first table is printed.
    """
    report_paths = cache_latest_12_quarters(CIK)

    print("最近 12 个季度（从新到旧）的报表文件：")
    for report_path in report_paths:
        print(" -", report_path)

    # Example: read the first few tables of each report, keyed by the name of
    # the directory that holds the file.
    tables_by_quarter = {}
    for report_path in report_paths:
        quarter_name = report_path.parent.name
        quarter_tables = read_report_tables(report_path, max_tables=5)
        tables_by_quarter[quarter_name] = quarter_tables
        print(f"\n[{quarter_name}] 读取到 {len(quarter_tables)} 个表（展示第 1 个表前 5 行）")
        if not quarter_tables:
            continue
        first_table = quarter_tables[0]
        print(first_table.head())

if __name__ == "__main__":
    main()

最近 12 个季度（从新到旧）的报表文件：
 - edgar_cache/BHE/2025Q3/2025Q3_10-Q_2025-11-05_bhe-20250930.htm
 - edgar_cache/BHE/2025Q2/2025Q2_10-Q_2025-07-31_bhe-20250630.htm
 - edgar_cache/BHE/2025Q1/2025Q1_10-Q_2025-05-01_bhe-20250331.htm
 - edgar_cache/BHE/2024Q4/2024Q4_10-K_2025-02-24_bhe-20241231.htm
 - edgar_cache/BHE/2024Q3/2024Q3_10-Q_2024-11-01_bhe-20240930.htm
 - edgar_cache/BHE/2024Q2/2024Q2_10-Q_2024-08-01_bhe-20240630.htm
 - edgar_cache/BHE/2024Q1/2024Q1_10-Q_2024-05-03_bhe-20240331.htm
 - edgar_cache/BHE/2023Q4/2023Q4_10-K_2024-02-27_bhe-20231231.htm
 - edgar_cache/BHE/2023Q3/2023Q3_10-Q_2023-11-02_bhe-20230930.htm
 - edgar_cache/BHE/2023Q2/2023Q2_10-Q_2023-08-03_bhe-20230630.htm
 - edgar_cache/BHE/2023Q1/2023Q1_10-Q_2023-05-05_bhe-20230331.htm
 - edgar_cache/BHE/2022Q4/2022Q4_10-K_2023-02-24_bhe-20221231.htm

[2025Q3] 读取到 5 个表（展示第 1 个表前 5 行）
                                                   0
0  ☑ QUARTERLY REPORT PURSUANT TO SECTION 13 OR 1...

[2025Q2] 读取到 5 个表（展示第 1 个表前 5 行）
            