In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
02_collect_prospectus_links.py
功能：按《正股映射结果.xlsx》的公司集合，从巨潮“历史公告查询”接口检索
     “招股说明书”相关公告，筛选并导出为与“年报链接_2024_选取公司【年报】.xlsx”
     相同列顺序/格式的 Excel（只是把其中的标题/年份/链接替换为招股说明书对应内容）。
保持：第1段代码不改；本脚本不覆盖你原来的第2段（年报版），作为并行的“招股书版”使用。
"""

import os
import re
import time
import requests
import openpyxl
import pandas as pd
from typing import List, Dict, Tuple
from datetime import date

# ========= 可调参数 =========
# 关键词：默认“招股说明书”；可追加“申报稿”“更新稿”“招股意向书”等
KEYWORDS = ["招股说明书"]
# 可选：额外关键词（例如某行业词、再融资关键词等），会与 KEYWORDS 合并后做 OR 匹配
EXTRA_KWS: List[str] = []  # 例：["申报稿", "更新稿", "招股意向书"]

# 时间范围（起止含边界），建议覆盖更长历史，避免漏报（UI 上你截图用了 2000-02-01 到今天）
DATE_BEGIN = "2000-02-01"
DATE_END   = date.today().strftime("%Y-%m-%d")  # 自动到今天

# 输出目录 / 输入映射表
output_dir   = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(output_dir, exist_ok=True)
mapping_file = os.path.expanduser("~/Desktop/正股映射结果.xlsx")

# 输出文件名（保持与你的“年报版”列顺序一致：最后一列列名沿用“年报链接”，便于后续统一处理）
OUT_XLSX = os.path.join(output_dir, "招股说明书链接_选取公司【招股】.xlsx")
SHEET    = "招股说明书"

# 市场/板块保持你的原设定
MARKETS = ["szse", "shse"]
PLATES  = ["", "sz", "szmb", "szcy", "sh", "shmb", "shkcp", "bj"]

# 排除干扰词（英文/摘要/取消/提示等）
EXCLUDE_KWS = [
    "英文", "英文版", "Annual", "annual", "Summary", "summary",
    "摘要", "摘要版", "取消", "已取消", "提示性公告", "更名提示", "B股", "H股"
]

# ========= 工具函数 =========
_SPACE_RE = re.compile(r"[\s\u00A0\u3000]+")
def norm(s: str) -> str:
    return _SPACE_RE.sub("", str(s or "").strip())

def any_kw_in(text: str, kws: List[str]) -> bool:
    t = norm(text)
    return any(norm(k) in t for k in kws if k)

# 宽松年份提取：优先从标题中抽取，兜底用公告时间
_TITLE_YEAR_PATS = [
    re.compile(r"(20\d{2})\s*年?\s*度?\s*招\s*股\s*说\s*明\s*书"),
    re.compile(r"(20\d{2})\s*年?\s*招\s*股\s*说\s*明\s*书"),
    re.compile(r"(19\d{2}|20\d{2})")  # 兜底：任何四位年份
]

def extract_year_from_title(title: str) -> str:
    t = norm(title)
    for pat in _TITLE_YEAR_PATS:
        m = pat.search(t)
        if m:
            return m.group(1)
    return ""

def year_from_timestamp_ms(ts: int) -> str:
    try:
        return time.strftime("%Y", time.localtime(int(ts)/1000))
    except Exception:
        return ""

def is_target_prospectus(title: str) -> bool:
    """满足：包含 KEYWORDS/EXTRA_KWS 中任一；且不含排除词"""
    kws = list(KEYWORDS) + list(EXTRA_KWS)
    t = norm(title)
    if not any_kw_in(t, kws):
        return False
    if any_kw_in(t, EXCLUDE_KWS):
        return False
    return True

def ann_weight(it: Dict) -> Tuple[int, int]:
    """
    权重：优先“更新/修订/更正/申报稿”类版本（你可以按需调整），同级按公告时间近
    """
    title = norm(it.get("announcementTitle", ""))
    w_fix = 0
    if any_kw_in(title, ["更新", "修订", "更正", "申报稿"]):
        w_fix = 10
    try:
        ts = int(it.get("announcementTime", 0))
    except Exception:
        ts = 0
    return (w_fix, ts)

# ========= 调接口 =========
def get_report(page_num: int, date_range: str, column: str, plate: str, searchkey: str="") -> requests.Response:
    """
    复用你的 hisAnnouncement/query 接口。
    注意：这里不再指定“category_ndbg_szsh”（年报类），而是改为 keyword 检索。
    """
    url = "http://www.cninfo.com.cn/new/hisAnnouncement/query"
    payload = {
        "pageNum":   page_num,
        "pageSize":  30,
        "column":    column,       # szse/shse
        "tabName":   "fulltext",
        "category":  "",           # 置空：转用 searchkey 匹配“招股说明书”
        "plate":     plate,
        "searchkey": searchkey,    # 例如：招股说明书
        "secid":     "",
        "trade":     "",
        "seDate":    date_range,   # 形如 2000-02-01~2025-08-18
        "sortName":  "code",
        "sortType":  "asc",
        "isHLtitle": "false"
    }
    headers = {
        "Accept":           "*/*",
        "Accept-Language":  "zh-CN,zh;q=0.9",
        "Content-Type":     "application/x-www-form-urlencoded; charset=UTF-8",
        "Origin":           "http://www.cninfo.com.cn",
        "Referer":          "http://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "User-Agent":       "Mozilla/5.0",
        "X-Requested-With": "XMLHttpRequest"
    }
    return requests.post(url, data=payload, headers=headers, timeout=15)

def download_for_segments(segments: List[str], column: str, plate: str, searchkey: str) -> List[Dict]:
    """
    分段分页拉取；当场做“招股说明书”粗过滤 + 去重（secCode, adjunctUrl）
    """
    all_ann: List[Dict] = []
    seen = set()
    for date_range in segments:
        try:
            total = get_report(1, date_range, column, plate, searchkey).json().get("totalpages", 0)
        except Exception:
            continue
        if total <= 0:
            continue

        page = 1
        while page <= total:
            for _ in range(3):  # 重试三次
                try:
                    resp = get_report(page, date_range, column, plate, searchkey)
                    resp.raise_for_status()
                    anns = resp.json().get("announcements", []) or []
                    for x in anns:
                        title = x.get("announcementTitle", "")
                        if not is_target_prospectus(title):
                            continue
                        key = (str(x.get("secCode", "")), str(x.get("adjunctUrl", "")))
                        if key in seen:
                            continue
                        seen.add(key)
                        all_ann.append(x)
                    break
                except Exception:
                    time.sleep(1.2)
            page += 1
    return all_ann

def collect_all_prospectus() -> List[Dict]:
    """
    参考你的“年报版”写法，但时间用一个较长范围分片。
    为稳妥，按“年度分片”查询，避免一次范围过大导致结果截断。
    """
    # 把 DATE_BEGIN ~ DATE_END 按年切片
    y_begin = int(DATE_BEGIN[:4])
    y_end   = int(DATE_END[:4])
    segments: List[str] = []
    for y in range(y_begin, y_end + 1):
        seg_start = f"{y}-01-01" if y > y_begin else DATE_BEGIN
        seg_end   = f"{y}-12-31" if y < y_end   else DATE_END
        segments.append(f"{seg_start}~{seg_end}")

    # 关键词（合并主关键词与额外关键词，以“或”关系发请求；也可只用“招股说明书”一个词）
    search_terms = list({norm(k) for k in (KEYWORDS + EXTRA_KWS) if k})
    raw: List[Dict] = []
    for col in MARKETS:
        for pl in PLATES:
            for kw in (search_terms or ["招股说明书"]):
                raw.extend(download_for_segments(segments, col, pl, kw))
    return raw

# ========= 选公司 & 去重取优 =========
def filter_latest_per_company(anns: List[Dict], valid_secs: set) -> List[Dict]:
    """
    只保留我们映射表里的公司（按 secCode 匹配），同公司同“年份”取权重最高（更新/修订优先，其次时间近）
    年份优先从标题抽取；抽不到再用 announcementTime 的年份。
    """
    latest: Dict[Tuple[str, str], Dict] = {}
    for it in anns:
        sec = str(it.get("secCode", ""))
        if sec not in valid_secs:
            continue
        yr = extract_year_from_title(it.get("announcementTitle", "")) or year_from_timestamp_ms(it.get("announcementTime", 0))
        if not yr:
            continue
        key = (sec, yr)
        if key not in latest or ann_weight(it) > ann_weight(latest[key]):
            latest[key] = it
    return list(latest.values())

# ========= 写 Excel =========
def write_selected_excel(anns: List[Dict]) -> None:
    """
    输出列顺序与“年报版”一致：可转债代码、可转债名称、公司代码、公司简称、标题、年份、年报链接
    仅含映射表中的公司；未命中则留空一行（只带前四列）
    """
    df_map = pd.read_excel(mapping_file, dtype=str).fillna("")
    df_map["sec_no_suf"] = df_map["正股代码"].str.split(".").str[0]

    # 准备一个 (secCode, year) -> 记录 的查找表
    ann_dict: Dict[Tuple[str, str], Dict] = {}
    for it in anns:
        sec = str(it.get("secCode", ""))
        yr  = extract_year_from_title(it.get("announcementTitle", "")) or year_from_timestamp_ms(it.get("announcementTime", 0))
        if sec and yr:
            ann_dict[(sec, yr)] = it

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])

    for _, row in df_map.iterrows():
        bond_code = row.get("代码", "")
        bond_name = row.get("名称", "")
        sec       = row.get("sec_no_suf", "")
        comp_name = row.get("正股名称", "")

        # “招股说明书”有可能并非每个年度都有；这里按可匹配到的全部年份写多行
        matched = [(k, v) for k, v in ann_dict.items() if k[0] == sec]
        if matched:
            # 按年份升序输出（可按需改为降序）
            matched.sort(key=lambda kv: kv[0][1])
            for (_, yr), it in matched:
                raw_title = re.sub(r"<.*?>","", it.get("announcementTitle","")).replace("：","")
                title = f"《{raw_title}》"
                url   = f"http://static.cninfo.com.cn/{it.get('adjunctUrl','')}"
                ws.append([bond_code, bond_name, sec, comp_name, title, yr, url])
        else:
            # 未命中则留空
            ws.append([bond_code, bond_name, sec, comp_name, "", "", ""])

    wb.save(OUT_XLSX)
    print(f"✅ 已输出：{OUT_XLSX}（工作表：{SHEET}）")

# ========= 主流程 =========
def main():
    # 1) 公司集合
    df_map = pd.read_excel(mapping_file, dtype=str).fillna("")
    valid_secs = set(df_map["正股代码"].str.split(".").str[0].tolist())

    # 2) 拉取 + 粗过滤（按关键词）
    raw = collect_all_prospectus()

    # 3) 仅保留映射公司 + 去重取优（同公司同年择优）
    picked = filter_latest_per_company(raw, valid_secs)

    # 4) 导出（列顺序与年报版一致）
    write_selected_excel(picked)

    print("---- 招股说明书检索完成 ----")

if __name__ == "__main__":
    main()


✅ 已输出：/Users/sam/Desktop/cninfo_output/招股说明书链接_选取公司【招股】.xlsx（工作表：招股说明书）
---- 招股说明书检索完成 ----


In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
02_collect_prospectus_links_SSE_only.py
仅用上交所 API，按《正股映射结果.xlsx》逐公司检索“招股说明书”，导出与年报版一致的 7 列：
[可转债代码, 可转债名称, 公司代码, 公司简称, 标题, 年份, 年报链接]

接口： http://query.sse.com.cn/infodisplay/queryLatestBulletinNew.do （返回 JSONP）
主要参数：
  productId        公司 6 位代码（如 600000）
  keyWord          关键词（本脚本默认“招股说明书”，可改）
  isPagination     true
  pageHelp.pageNo  页码（从 1 开始）
  pageHelp.pageSize 每页条数（建议 25/50）
  beginDate / endDate 时间范围（YYYY-MM-DD）

注意：
- 需要 Referer 头： https://www.sse.com.cn/
- 结果字段常见：title / URL / stockcode / extGSJC / publishTime 等
- URL 多为以 / 开头的相对路径，需补前缀 https://static.sse.com.cn
"""

import os
import re
import time
import json
import html
import requests
import openpyxl
import pandas as pd
from typing import List, Dict, Tuple, Optional
from urllib.parse import urljoin
from datetime import date

# ====== 基本路径配置 ======
output_dir   = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(output_dir, exist_ok=True)
mapping_file = os.path.expanduser("~/Desktop/正股映射结果.xlsx")

OUT_XLSX   = os.path.join(output_dir, "招股说明书链接_选取公司【招股_SSE】.xlsx")
SHEET_NAME = "招股说明书"

# ====== 可调参数 ======
KEYWORD            = "招股说明书"       # 可改为 “募集说明书/招股意向书” 等
DATE_BEGIN         = "1990-01-01"      # 覆盖尽量久远
DATE_END           = date.today().strftime("%Y-%m-%d")
PAGE_SIZE          = 50                # 每页条数
MAX_PAGES_PER_CO   = 60                # 每家公司最多翻多少页，防止无限
REQUEST_GAP_SEC    = 0.35              # 请求间隔，避免过快

# 排除干扰词（英文/摘要/提示等）
EXCLUDE_KWS = ["英文", "英文版", "摘要", "提示性公告", "取消", "已取消", "更名提示", "H股", "B股"]

# ====== 小工具 ======
_SPACE_RE = re.compile(r"[\s\u00A0\u3000]+")
def norm(s: str) -> str:
    return _SPACE_RE.sub("", str(s or "").strip())

def any_kw_in(text: str, kws: List[str]) -> bool:
    t = norm(text)
    return any(norm(k) in t for k in kws if k)

def clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t)
    t = t.replace("：", "").strip()
    return t

def extract_year_from_text(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x))
    return m.group(1) if m else ""

def to_abs_url(u: str) -> str:
    # SSE 返回多为相对路径：/disclosure/.../pdf/xxxx.pdf
    if not u:
        return ""
    if u.startswith("http"):
        return u
    # 官方静态资源域
    return urljoin("https://static.sse.com.cn/", u.lstrip("/"))

# ====== 上交所公告检索（JSONP） ======
SSE_URL = "http://query.sse.com.cn/infodisplay/queryLatestBulletinNew.do"
SSE_HEADERS = {
    "Referer": "https://www.sse.com.cn/",
    "User-Agent": "Mozilla/5.0",
    "Accept": "*/*",
}

def sse_query_company_bulletins(code6: str, keyword: str, page_no: int) -> Dict:
    """
    调用上交所公告检索接口（按公司+关键词+分页）。
    返回解析后的 dict（已去掉 JSONP 外壳）；无结果返回 {}。
    """
    params = {
        "jsonCallBack": f"jsonpCallback{int(time.time()*1000)%100000}",  # 任意占位
        "isPagination": "true",
        "productId": code6,
        "keyWord": keyword,
        # reportType 与 reportType2 留空，用关键字检索；若要限定类别可自行填写
        "reportType": "",
        "reportType2": "",
        "beginDate": DATE_BEGIN,
        "endDate": DATE_END,
        "pageHelp.pageSize": PAGE_SIZE,
        "pageHelp.pageCount": 50,
        "pageHelp.pageNo": page_no,     # 页码从 1 开始
        "pageHelp.beginPage": page_no,
        "pageHelp.cacheSize": 1,
        "pageHelp.endPage": page_no + 4,
        "_": str(int(time.time()*1000)),
    }
    r = requests.get(SSE_URL, params=params, headers=SSE_HEADERS, timeout=20)
    r.raise_for_status()
    # 去掉 JSONP 包装：jsonpCallback123({...})
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    if not m:
        return {}
    try:
        return json.loads(m.group(1))
    except Exception:
        return {}

def parse_bulletins(js: Dict) -> Tuple[int, List[Dict]]:
    """
    兼容字段：data.result / result
    返回：(totalSize, items)；item: {title,url,publishTime,stockcode,extGSJC}
    """
    if not js:
        return 0, []
    data = js.get("data") or js  # 有的返回直接就是 {"result":[...]}
    total = int(data.get("totalSize") or data.get("total") or 0)
    arr = data.get("result") or []
    items = []
    for it in arr:
        title = clean_title(it.get("title") or it.get("docTitle") or "")
        url   = to_abs_url(it.get("URL") or it.get("url") or it.get("docURL") or "")
        ptime = it.get("publishTime") or it.get("createTime") or it.get("create_time") or ""
        stock = str(it.get("stockcode") or "")
        name  = it.get("extGSJC") or it.get("gsjc") or ""
        items.append({
            "title": title, "url": url, "publishTime": ptime,
            "stockcode": stock, "shortname": name
        })
    return total, items

def fetch_company_prospectus(code6: str) -> List[Dict]:
    """
    对单家公司：按 KEYWORD 翻页抓取，筛选标题含“招股说明书”，去除英文/摘要等。
    返回：[{secCode, announcementTitle, year, adjunctUrl}]
    """
    picked: List[Dict] = []
    seen = set()

    page_no = 1
    total_size = None
    fetched = 0

    while True:
        js = sse_query_company_bulletins(code6, KEYWORD, page_no)
        total, items = parse_bulletins(js)
        if total_size is None:
            total_size = total
        if not items:
            break

        for x in items:
            title = x["title"]
            url   = x["url"]
            if not title:
                continue
            # 过滤：必须含“招股说明书”，且不含排除词
            if "招股说明书" not in title and "招股 说明书" not in title:
                continue
            if any_kw_in(title, EXCLUDE_KWS):
                continue
            # 允许是 PDF 或 公告详情页（后续也可再解析）
            k = (title, url)
            if k in seen:
                continue
            seen.add(k)

            year = extract_year_from_text(title) or \
                   extract_year_from_text(x.get("publishTime","")) or \
                   extract_year_from_text(url)

            picked.append({
                "secCode": code6,
                "announcementTitle": title,
                "year": year,
                "adjunctUrl": url
            })

        fetched += len(items)
        page_no += 1
        if page_no > MAX_PAGES_PER_CO:
            break
        # 如果已抓到全部，则结束（total 可能为 0 表示未知）
        if total_size and fetched >= total_size:
            break

        time.sleep(REQUEST_GAP_SEC)

    return picked

# ====== 导出：与年报版一致的 7 列 ======
def write_selected_excel(picked_map: Dict[Tuple[str, str], Dict], df_map: pd.DataFrame) -> None:
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET_NAME
    # 注意列名“年报链接”保持不变，便于你后续直接复用下载器；里面存放招股说明书链接
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])

    for _, row in df_map.iterrows():
        bond_code = row.get("代码","")
        bond_name = row.get("名称","")
        sec       = str(row.get("sec_no_suf",""))
        comp_name = row.get("正股名称","")

        matches = sorted([(k, v) for k, v in picked_map.items() if k[0]==sec], key=lambda kv: kv[0][1])
        if matches:
            for (_, yr), it in matches:
                title = f"《{it.get('announcementTitle','').strip()}》"
                url   = it.get("adjunctUrl","")
                ws.append([bond_code, bond_name, sec, comp_name, title, yr, url])
        else:
            ws.append([bond_code, bond_name, sec, comp_name, "", "", ""])

    wb.save(OUT_XLSX)
    print(f"✅ 已输出：{OUT_XLSX}（工作表：{SHEET_NAME}）")

# ====== 主流程 ======
def main():
    # 读取《正股映射结果.xlsx》；仅使用“正股代码”6位与“正股名称”
    df_map = pd.read_excel(mapping_file, dtype=str).fillna("")
    df_map["sec_no_suf"] = df_map["正股代码"].str.split(".").str[0]

    # （可选）只跑上交所：以 60/68 开头
    # df_map = df_map[df_map["sec_no_suf"].str.startswith(("60","68"))].copy()

    picked_map: Dict[Tuple[str, str], Dict] = {}  # {(sec, year): item}

    for _, row in df_map.iterrows():
        sec = str(row.get("sec_no_suf",""))
        if not sec:
            continue

        try:
            items = fetch_company_prospectus(sec)
            # 同公司同年保留“更相关/更完整”的一条（用标题长度近似；也可改为“注册/更新/修订”优先）
            bucket: Dict[str, Dict] = {}
            for it in items:
                yr = it.get("year") or ""
                if not yr:
                    continue
                if (yr not in bucket) or (len(it["announcementTitle"]) > len(bucket[yr]["announcementTitle"])):
                    bucket[yr] = it
            for yr, it in bucket.items():
                picked_map[(sec, yr)] = it

        except Exception as e:
            print(f"[WARN] {sec} 抓取异常：{e}")

        time.sleep(REQUEST_GAP_SEC)

    write_selected_excel(picked_map, df_map)
    print("---- 上交所 招股说明书 检索完成 ----")

if __name__ == "__main__":
    main()


✅ 已输出：/Users/sam/Desktop/cninfo_output/招股说明书链接_选取公司【招股_SSE】.xlsx（工作表：招股说明书）
---- 上交所 招股说明书 检索完成 ----


In [7]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
02_collect_prospectus_links_SSE_searchapi.py
仅用上交所全文检索接口 /search/getSearchResult.do 抓取“招股说明书”，
按《正股映射结果.xlsx》逐公司检索，导出与年报版相同的 7 列：
[可转债代码, 可转债名称, 公司代码, 公司简称, 标题, 年份, 年报链接]

关键点：
- 请求：GET https://query.sse.com.cn/search/getSearchResult.do
  主要参数：
    searchword      关键词（如 "三房巷 招股说明书" 或 "600370 招股说明书"）
    keywordPosition "title" 或 "title,paper_content"
    spaceId         3   （官网搜索空间）
    page            从 0 开始
    limit           每页条数（10/20/50）
    orderByKey      "create_time" 或 "score"
    orderByDirection "DESC"
- 返回：JSONP（jsonpCallback...({...})），去壳后在 data.knowledgeList 数组里；
  每条的 extend[] 中有 name=CURL 的相对 PDF 路径，需要拼 https://static.sse.com.cn
  还可能有 ZQDM（证券代码）、GSJC（公司简称）、FILETYPE（pdf）

用法：
  pip install requests pandas openpyxl
  直接运行，输出到 ~/Desktop/cninfo_output/招股说明书链接_选取公司【招股_SSE_search】.xlsx
"""

import os
import re
import time
import json
import html
import requests
import openpyxl
import pandas as pd
from typing import List, Dict, Tuple, Optional
from urllib.parse import urljoin

# ===== 基本路径 =====
output_dir   = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(output_dir, exist_ok=True)
mapping_file = os.path.expanduser("~/Desktop/正股映射结果.xlsx")

OUT_XLSX   = os.path.join(output_dir, "招股说明书链接_选取公司【招股_SSE_search】.xlsx")
SHEET_NAME = "招股说明书"

# ===== 搜索参数 =====
SEARCH_URL = "https://query.sse.com.cn/search/getSearchResult.do"
HEADERS = {
    "Referer": "https://www.sse.com.cn/",
    "User-Agent": "Mozilla/5.0",
    "Accept": "*/*",
}

# 每公司最多翻多少页、每页条数；建议先小、稳定后再放大
LIMIT_PER_PAGE = 20
MAX_PAGES_PER_KEY = 20           # 每个关键词最多翻 20 页（page 从 0 开始）
REQUEST_GAP_SEC = 0.35           # 请求间隔，防止过快

# 关键词构造：可按需增删
def build_keywords(short_name: str, code6: str) -> List[str]:
    # 注意“招股说明书”可连写或加空格；这里给 3 种常见写法
    return [
        f"{short_name} 招股说明书",
        f"{code6} 招股说明书",
        f"{short_name}招股说明书",
    ]

# 排除干扰词（英文/摘要等）
EXCLUDE_KWS = ["英文", "英文版", "摘要", "提示性公告", "取消", "已取消", "更名提示", "H股", "B股"]

# ===== 小工具 =====
_SPACE_RE = re.compile(r"[\s\u00A0\u3000]+")
def norm(s: str) -> str:
    return _SPACE_RE.sub("", str(s or "").strip())

def any_kw_in(text: str, kws: List[str]) -> bool:
    t = norm(text)
    return any(norm(k) in t for k in kws if k)

def clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t).replace("：", "").strip()
    return t

def extract_year_from_text(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x))
    return m.group(1) if m else ""

def to_pdf_abs(url_or_curl: str) -> str:
    """把相对 CURL 转为绝对 pdf 链接"""
    u = str(url_or_curl or "").strip()
    if not u:
        return ""
    if u.startswith("http"):
        return u
    # 大多数文件在 static.sse.com.cn
    return urljoin("https://static.sse.com.cn/", u.lstrip("/"))

def is_sse_code(code6: str) -> bool:
    return str(code6).startswith(("60", "68"))

# ===== 调用与解析 =====
def sse_search_page(keyword: str, page: int = 0, limit: int = LIMIT_PER_PAGE, position: str = "title,paper_content") -> Dict:
    """
    调用 /search/getSearchResult.do；返回解出的 dict（已去 JSONP 外壳）。
    关键参数：
      searchword, page(0基), limit, keywordPosition, spaceId=3,
      orderByKey=create_time / score, orderByDirection=DESC
    """
    params = {
        "searchword": keyword,
        "page": page,
        "limit": limit,
        "keywordPosition": position,
        "spaceId": 3,
        "orderByKey": "create_time",
        "orderByDirection": "DESC",
    }
    r = requests.get(SEARCH_URL, params=params, headers=HEADERS, timeout=20)
    r.raise_for_status()
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    if not m:
        return {}
    try:
        return json.loads(m.group(1))
    except Exception:
        return {}

def parse_knowledge_list(js: Dict) -> Tuple[int, List[Dict]]:
    """
    从 data.knowledgeList 里抽我们关心的字段：
    返回 (totalPage, items)，其中 items: {title, url_pdf, createTime, zqdm, gsjc}
    """
    if not js:
        return 0, []
    data = js.get("data") or {}
    total_page = int(data.get("totalPage") or 0)
    arr = data.get("knowledgeList") or []

    items = []
    for it in arr:
        title = clean_title(it.get("title") or "")
        create_time = it.get("createTime") or it.get("updateTime") or ""
        # extend 里存放 CURL/FILETYPE/ZQDM/GSJC
        curl, zqdm, gsjc, ftype = "", "", "", ""
        for ext in (it.get("extend") or []):
            name = (ext.get("name") or "").upper()
            val  = ext.get("value") or ""
            if name == "CURL":
                curl = val
            elif name == "ZQDM":
                zqdm = str(val or "")
            elif name == "GSJC":
                gsjc = val
            elif name == "FILETYPE":
                ftype = (val or "").lower()
        # 仅保留 PDF 或明显指向 pdf 的
        if not curl and not (ftype == "pdf"):
            continue
        url_pdf = to_pdf_abs(curl)
        items.append({
            "title": title,
            "url_pdf": url_pdf,
            "createTime": create_time,
            "zqdm": zqdm,
            "gsjc": gsjc,
        })
    return total_page, items

# ===== 公司级抓取（多关键词 + 分页 + 过滤） =====
def fetch_company_prospectus_by_search(short_name: str, code6: str) -> List[Dict]:
    """
    对单家公司：用多关键词逐页抓，过滤出标题含“招股说明书”且（ZQDM==code6 或 URL 带 code6）的记录。
    返回 [{secCode, announcementTitle, year, adjunctUrl}]
    """
    results: List[Dict] = []
    seen = set()
    if not is_sse_code(code6):
        return results  # 仅上交所公司

    for kw in build_keywords(short_name, code6):
        # 第 0 页
        js = sse_search_page(kw, page=0)
        total_page, items = parse_knowledge_list(js)

        def pick(batch: List[Dict]):
            nonlocal results, seen
            for x in batch:
                title = x["title"]
                url   = x["url_pdf"]
                zqdm  = x.get("zqdm","")
                if not title or not url:
                    continue
                # 必须含“招股说明书”，且排除英文/摘要等
                if "招股说明书" not in title and "招股 说明书" not in title:
                    continue
                if any_kw_in(title, EXCLUDE_KWS):
                    continue
                # 只保留匹配该公司的：ZQDM==code6 或 URL 中包含 /{code6}_ 或 -{code6}_ 等常见模式
                if zqdm and zqdm != code6:
                    continue
                if not zqdm and (f"/{code6}_" not in url and f"-{code6}_" not in url and f"/{code6}%5F" not in url):
                    continue

                key = (title, url)
                if key in seen:
                    continue
                seen.add(key)

                year = extract_year_from_text(title) or extract_year_from_text(x.get("createTime","")) or extract_year_from_text(url)
                results.append({
                    "secCode": code6,
                    "announcementTitle": title,
                    "year": year,
                    "adjunctUrl": url
                })

        pick(items)

        # 继续翻页
        max_pages = min(MAX_PAGES_PER_KEY, int(total_page or 0))
        for p in range(1, max_pages):
            time.sleep(REQUEST_GAP_SEC)
            js = sse_search_page(kw, page=p)
            _, items_p = parse_knowledge_list(js)
            if not items_p:
                break
            pick(items_p)

        time.sleep(REQUEST_GAP_SEC)

    return results

# ===== 导出，与年报版一致 =====
def write_selected_excel(picked_map: Dict[Tuple[str, str], Dict], df_map: pd.DataFrame) -> None:
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET_NAME
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])  # 列名保持不变

    for _, row in df_map.iterrows():
        bond_code = row.get("代码","")
        bond_name = row.get("名称","")
        sec       = str(row.get("sec_no_suf",""))
        comp_name = row.get("正股名称","")

        matches = sorted([(k, v) for k, v in picked_map.items() if k[0]==sec], key=lambda kv: kv[0][1])
        if matches:
            for (_, yr), it in matches:
                title = f"《{it.get('announcementTitle','').strip()}》"
                url   = it.get("adjunctUrl","")
                ws.append([bond_code, bond_name, sec, comp_name, title, yr, url])
        else:
            ws.append([bond_code, bond_name, sec, comp_name, "", "", ""])

    wb.save(OUT_XLSX)
    print(f"✅ 已输出：{OUT_XLSX}（工作表：{SHEET_NAME}）")

# ===== 主流程 =====
def main():
    df_map = pd.read_excel(mapping_file, dtype=str).fillna("")
    df_map["sec_no_suf"] = df_map["正股代码"].str.split(".").str[0]

    # 只跑上交所（60/68 开头）
    df_map = df_map[df_map["sec_no_suf"].str.startswith(("60","68"))].copy()

    picked_map: Dict[Tuple[str, str], Dict] = {}

    for _, row in df_map.iterrows():
        sec  = str(row.get("sec_no_suf",""))
        name = str(row.get("正股名称","")).strip()
        if not sec or not name:
            continue

        try:
            items = fetch_company_prospectus_by_search(name, sec)
            # 同公司同年保留“更完整”的一条（用标题长度或“附录”劣后）
            bucket: Dict[str, Dict] = {}
            for it in items:
                yr = it.get("year") or ""
                if not yr:
                    continue
                # “附录”放低优先级
                score = (len(it["announcementTitle"]), -1 if "附录" in it["announcementTitle"] else 0)
                if (yr not in bucket) or (score > (len(bucket[yr]["announcementTitle"]), -1 if "附录" in bucket[yr]["announcementTitle"] else 0)):
                    bucket[yr] = it
            for yr, it in bucket.items():
                picked_map[(sec, yr)] = it
        except Exception as e:
            print(f"[WARN] {sec} {name} 抓取异常：{e}")

        time.sleep(REQUEST_GAP_SEC)

    write_selected_excel(picked_map, df_map)
    print("---- 上交所 招股说明书（search api）完成 ----")

if __name__ == "__main__":
    main()


✅ 已输出：/Users/sam/Desktop/cninfo_output/招股说明书链接_选取公司【招股_SSE_search】.xlsx（工作表：招股说明书）
---- 上交所 招股说明书（search api）完成 ----


In [8]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
三房巷（600370）专测版：仅用上交所 /search/getSearchResult.do 搜“招股说明书”，
导出与你年报版一致的 7 列（最后一列依旧叫“年报链接”，里存 PDF 链接）。
"""

import os, re, time, json, html, requests, openpyxl, pandas as pd
from urllib.parse import urljoin

# ===== 输出文件 =====
output_dir = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(output_dir, exist_ok=True)
OUT_XLSX   = os.path.join(output_dir, "招股说明书链接_三房巷【SSE_search】.xlsx")
SHEET_NAME = "招股说明书"

# ===== 上交所搜索 API =====
SEARCH_URL = "https://query.sse.com.cn/search/getSearchResult.do"
HEADERS = {
    "Referer": "https://www.sse.com.cn/",
    "User-Agent": "Mozilla/5.0",
    "Accept": "*/*",
}
LIMIT_PER_PAGE = 20
MAX_PAGES_PER_KEY = 20
REQUEST_GAP_SEC = 0.3

# —— 仅测试这家 —— 
TEST_SEC = "600370"
TEST_NAME = "三房巷"

# 关键词（你要改为“募集说明书/招股意向书”也行）
KEYWORDS = [
    f"{TEST_NAME} 招股说明书",
    f"{TEST_SEC} 招股说明书",
    f"{TEST_NAME}招股说明书",
]

EXCLUDE_KWS = ["英文", "英文版", "摘要", "提示性公告", "取消", "已取消", "更名提示", "H股", "B股"]

_SPACE_RE = re.compile(r"[\s\u00A0\u3000]+")
def norm(s: str) -> str:
    return _SPACE_RE.sub("", str(s or "").strip())

def any_kw_in(text: str, kws):
    t = norm(text)
    return any(norm(k) in t for k in kws if k)

def clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t).replace("：", "").strip()
    return t

def extract_year(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x))
    return m.group(1) if m else ""

def to_pdf_abs(curl: str) -> str:
    if not curl: return ""
    if curl.startswith("http"): return curl
    return urljoin("https://static.sse.com.cn/", curl.lstrip("/"))

def sse_search_page(keyword: str, page: int = 0, limit: int = LIMIT_PER_PAGE, position: str = "title,paper_content"):
    params = {
        "searchword": keyword,
        "page": page,
        "limit": limit,
        "keywordPosition": position,
        "spaceId": 3,
        "orderByKey": "create_time",
        "orderByDirection": "DESC",
    }
    r = requests.get(SEARCH_URL, params=params, headers=HEADERS, timeout=20)
    r.raise_for_status()
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    return {} if not m else json.loads(m.group(1))

def parse_items(js: dict):
    data = js.get("data") or {}
    total_page = int(data.get("totalPage") or 0)
    arr = data.get("knowledgeList") or []
    items = []
    for it in arr:
        title = clean_title(it.get("title") or "")
        create_time = it.get("createTime") or it.get("updateTime") or ""
        curl, zqdm, ftype = "", "", ""
        for ext in (it.get("extend") or []):
            name = (ext.get("name") or "").upper()
            val  = ext.get("value") or ""
            if name == "CURL":     curl = val
            if name == "ZQDM":     zqdm = str(val or "")
            if name == "FILETYPE": ftype = (val or "").lower()
        if not curl and ftype != "pdf":
            continue
        url_pdf = to_pdf_abs(curl)
        items.append({"title": title, "url_pdf": url_pdf, "createTime": create_time, "zqdm": zqdm})
    return total_page, items

def fetch_sfxt_prospectus():
    results = []
    seen = set()
    for kw in KEYWORDS:
        js = sse_search_page(kw, page=0)
        total_page, items = parse_items(js)

        def pick(batch):
            for x in batch:
                title, url, zqdm = x["title"], x["url_pdf"], x.get("zqdm","")
                if not title or not url: continue
                if "招股说明书" not in title and "招股 说明书" not in title: continue
                if any_kw_in(title, EXCLUDE_KWS): continue
                # 限定为 600370 的结果（ZQDM优先，其次URL里带 600370_）
                if zqdm and zqdm != TEST_SEC: continue
                if not zqdm and (f"/{TEST_SEC}_" not in url and f"-{TEST_SEC}_" not in url): continue
                key = (title, url)
                if key in seen: continue
                seen.add(key)
                year = extract_year(title) or extract_year(x.get("createTime","")) or extract_year(url)
                results.append({"secCode": TEST_SEC, "announcementTitle": title, "year": year, "adjunctUrl": url})

        pick(items)
        max_pages = min(MAX_PAGES_PER_KEY, int(total_page or 0))
        for p in range(1, max_pages):
            time.sleep(REQUEST_GAP_SEC)
            js = sse_search_page(kw, page=p)
            _, items_p = parse_items(js)
            if not items_p: break
            pick(items_p)
        time.sleep(REQUEST_GAP_SEC)
    return results

def export_excel(items):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET_NAME
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])
    # 这里没有用你的映射表，就直接输出三房巷一家公司（需要也可改为读取映射表）
    bond_code, bond_name = "", ""
    for it in sorted(items, key=lambda x: x.get("year","")):
        ws.append([bond_code, bond_name, TEST_SEC, TEST_NAME, f"《{it['announcementTitle']}》", it.get("year",""), it["adjunctUrl"]])
    if not items:
        ws.append([bond_code, bond_name, TEST_SEC, TEST_NAME, "", "", ""])
    wb.save(OUT_XLSX)
    print("✅ 导出：", OUT_XLSX)

def main():
    items = fetch_sfxt_prospectus()
    # 打印一下命中的链接，便于你在控制台快速确认
    for it in items:
        print(it["year"], it["announcementTitle"], "->", it["adjunctUrl"])
    export_excel(items)
    print("---- 三房巷 招股说明书（SSE search）完成 ----")

if __name__ == "__main__":
    main()


✅ 导出： /Users/sam/Desktop/cninfo_output/招股说明书链接_三房巷【SSE_search】.xlsx
---- 三房巷 招股说明书（SSE search）完成 ----


In [9]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 三房巷（600370）— 上交所搜索接口专测 v2
# 目标：抓到“招股说明书”与“招股说明书附录”的 PDF 链接，并导出成你熟悉的 7 列格式

import os, re, time, json, html, requests, openpyxl
from urllib.parse import quote, urljoin
from random import randint

OUTPUT_DIR = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(OUTPUT_DIR, exist_ok=True)
OUT_XLSX   = os.path.join(OUTPUT_DIR, "招股说明书链接_三房巷【SSE_search_v2】.xlsx")
SHEET_NAME = "招股说明书"

SEARCH_URL = "https://query.sse.com.cn/search/getSearchResult.do"

TEST_SEC  = "600370"
TEST_NAME = "三房巷"

KEYWORDS = [
    f"{TEST_NAME} 招股说明书",
    f"{TEST_SEC} 招股说明书",
    f"{TEST_NAME}招股说明书",
]

EXCLUDE_KWS = ["英文","英文版","摘要","提示性公告","取消","已取消","更名提示","H股","B股"]

LIMIT = 20
MAX_PAGES = 20
GAP = 0.35

def _headers(keyword: str):
    return {
        "Referer": f"https://www.sse.com.cn/website/search/?webs=1&q={quote(keyword)}",
        "User-Agent": "Mozilla/5.0",
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
    }

def _norm(s: str) -> str:
    return re.sub(r"[\s\u00A0\u3000]+", "", str(s or "").strip())

def _any(text: str, kws):
    t = _norm(text)
    return any(_norm(k) in t for k in kws if k)

def _clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t).replace("：","").strip()
    return t

def _year(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x))
    return m.group(1) if m else ""

def _to_pdf_abs(p: str) -> str:
    if not p: return ""
    if p.startswith("http"): return p
    return urljoin("https://static.sse.com.cn/", p.lstrip("/"))

def sse_search_page(keyword: str, page: int, position: str, order_key: str):
    # 同时传 keyword 与 searchword，且带 jsonCallBack
    params = {
        "keyword": keyword,
        "searchword": keyword,
        "page": page,
        "limit": LIMIT,
        "keywordPosition": position,     # "title" or "title,paper_content"
        "spaceId": 3,
        "orderByKey": order_key,         # "create_time" or "score"
        "orderByDirection": "DESC",
        "jsonCallBack": f"jsonpCallback{randint(10000,99999999)}",
    }
    r = requests.get(SEARCH_URL, params=params, headers=_headers(keyword), timeout=20)
    if r.status_code != 200:
        print(f"[HTTP {r.status_code}] 请求失败: {r.url}")
        return {}
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    if not m:
        print("[WARN] 非 JSONP 返回，前200字符：", r.text[:200])
        return {}
    try:
        return json.loads(m.group(1))
    except Exception as e:
        print("[WARN] JSON 解析失败：", e, "前200字符：", r.text[:200])
        return {}

def parse_items(js: dict):
    data = js.get("data") or {}
    total_page = int(data.get("totalPage") or 0)
    arr = data.get("knowledgeList") or []
    items = []
    for it in arr:
        title = _clean_title(it.get("title") or "")
        create_time = it.get("createTime") or it.get("updateTime") or ""
        curl, zqdm, ftype = "", "", ""
        for ext in (it.get("extend") or []):
            name = (ext.get("name") or "").upper()
            val  = ext.get("value") or ""
            if name == "CURL":     curl = val
            elif name == "ZQDM":   zqdm = str(val or "")
            elif name == "FILETYPE": ftype = (val or "").lower()
        # 只要拿到 pdf 路径即可
        if not curl and ftype != "pdf":
            continue
        items.append({
            "title": title,
            "url_pdf": _to_pdf_abs(curl),
            "createTime": create_time,
            "zqdm": zqdm,
        })
    return total_page, items

def fetch_sfxt():
    results = []
    seen = set()

    # 两种模式组合试一次：位置 + 排序
    positions = ["title", "title,paper_content"]
    orders    = ["create_time", "score"]

    for kw in KEYWORDS:
        for pos in positions:
            for ok in orders:
                js = sse_search_page(kw, page=0, position=pos, order_key=ok)
                total_page, items = parse_items(js)
                print(f"[DEBUG] kw='{kw}', pos='{pos}', order='{ok}' -> total_page={total_page}, page0_items={len(items)}")
                if total_page == 0 and not items:
                    # 打印一段原始返回帮助定位
                    pass

                def pick(batch):
                    for x in batch:
                        title, url, zqdm = x["title"], x["url_pdf"], x.get("zqdm","")
                        if not title or not url: 
                            continue
                        if "招股说明书" not in title and "招股 说明书" not in title:
                            continue
                        if _any(title, EXCLUDE_KWS):
                            continue
                        # 只要 600370 的：ZQDM 优先，其次 URL 中包含 /600370_
                        if zqdm and zqdm != TEST_SEC:
                            continue
                        if not zqdm and (f"/{TEST_SEC}_" not in url and f"-{TEST_SEC}_" not in url):
                            continue

                        k = (title, url)
                        if k in seen:
                            continue
                        seen.add(k)
                        yr = _year(title) or _year(x.get("createTime","")) or _year(url)
                        results.append({
                            "secCode": TEST_SEC,
                            "announcementTitle": title,
                            "year": yr,
                            "adjunctUrl": url
                        })

                pick(items)

                maxp = min(MAX_PAGES, int(total_page or 0))
                for p in range(1, maxp):
                    time.sleep(GAP)
                    js = sse_search_page(kw, page=p, position=pos, order_key=ok)
                    _, items_p = parse_items(js)
                    if not items_p:
                        break
                    pick(items_p)
                time.sleep(GAP)

    return results

def export_excel(items):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET_NAME
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])
    if items:
        for it in sorted(items, key=lambda x: x.get("year","")):
            ws.append(["","", TEST_SEC, TEST_NAME, f"《{it['announcementTitle']}》", it.get("year",""), it["adjunctUrl"]])
    else:
        ws.append(["","", TEST_SEC, TEST_NAME, "", "", ""])
    wb.save(OUT_XLSX)
    print("✅ 导出：", OUT_XLSX)

def main():
    items = fetch_sfxt()
    for it in items:
        print(it["year"], it["announcementTitle"], "->", it["adjunctUrl"])
    if not items:
        print("⚠️ 没命中结果。请把上面 [DEBUG] 行与任何 [WARN]/前200字符输出发我，我再继续对症修。")
    export_excel(items)

if __name__ == "__main__":
    main()


[DEBUG] kw='三房巷 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] kw='600370 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] kw='600370 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] kw='600370 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] kw='600370 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] kw='三房巷招股说明书', pos='title,paper_content', order='create_time' -> total_page=

In [10]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os, re, time, json, html, requests, openpyxl
from urllib.parse import quote, urljoin

OUTPUT_DIR = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(OUTPUT_DIR, exist_ok=True)
OUT_XLSX   = os.path.join(OUTPUT_DIR, "招股说明书链接_三房巷【SSE_search_v3】.xlsx")
SHEET_NAME = "招股说明书"

TEST_SEC  = "600370"
TEST_NAME = "三房巷"

KEYWORDS = [
    f"{TEST_NAME} 招股说明书",
    f"{TEST_SEC} 招股说明书",
    f"{TEST_NAME}招股说明书",
]

EXCLUDE_KWS = ["英文","英文版","摘要","提示性公告","取消","已取消","更名提示","H股","B股"]

LIMIT = 20
MAX_PAGES = 20
GAP = 0.35

ENDPOINTS = [
    "https://query.sse.com.cn/search/getSearchResult.do",
    "http://query.sse.com.cn/search/getSearchResult.do",   # 作为兜底
]

def _headers(keyword: str):
    return {
        "Referer": f"https://www.sse.com.cn/website/search/?webs=1&q={quote(keyword)}",
        "User-Agent": "Mozilla/5.0",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "X-Requested-With": "XMLHttpRequest",
    }

def _norm(s: str) -> str:
    return re.sub(r"[\s\u00A0\u3000]+", "", str(s or "").strip())

def _any(text: str, kws):
    t = _norm(text)
    return any(_norm(k) in t for k in kws if k)

def _clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t).replace("：","").strip()
    return t

def _year(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x))
    return m.group(1) if m else ""

def _to_pdf_abs(p: str) -> str:
    if not p: return ""
    if p.startswith("http"): return p
    return urljoin("https://static.sse.com.cn/", p.lstrip("/"))

def make_session(seed_keyword: str) -> requests.Session:
    s = requests.Session()
    # 先访问官网搜索页取 Cookie
    seed_url = f"https://www.sse.com.cn/website/search/?webs=1&q={quote(seed_keyword)}"
    s.get(seed_url, headers={
        "User-Agent":"Mozilla/5.0",
        "Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language":"zh-CN,zh;q=0.9",
    }, timeout=20)
    return s

def sse_search_page(session: requests.Session, endpoint: str, keyword: str, page: int, position: str, order_key: str):
    # 同时传 keyword 与 searchword；加上 spaceId 与 jsonCallBack
    params = {
        "keyword": keyword,
        "searchword": keyword,
        "page": page,
        "limit": LIMIT,
        "keywordPosition": position,     # "title" or "title,paper_content"
        "spaceId": 3,
        "orderByKey": order_key,         # "create_time" or "score"
        "orderByDirection": "DESC",
        "jsonCallBack": f"jsonpCallback{int(time.time()*1000)%100000000}",
        "_": str(int(time.time()*1000)),
    }
    r = session.get(endpoint, params=params, headers=_headers(keyword), timeout=20)
    if r.status_code != 200:
        print(f"[HTTP {r.status_code}] {endpoint}")
        return {}
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    if not m:
        print("[WARN] 非 JSONP 返回，前200字符：", r.text[:200])
        return {}
    try:
        return json.loads(m.group(1))
    except Exception as e:
        print("[WARN] JSON 解析失败：", e, "前200字符：", r.text[:200])
        return {}

def parse_items(js: dict):
    data = js.get("data") or {}
    total_page = int(data.get("totalPage") or 0)
    arr = data.get("knowledgeList") or []
    items = []
    for it in arr:
        title = _clean_title(it.get("title") or "")
        create_time = it.get("createTime") or it.get("updateTime") or ""
        curl, zqdm, ftype = "", "", ""
        for ext in (it.get("extend") or []):
            name = (ext.get("name") or "").upper()
            val  = ext.get("value") or ""
            if name == "CURL":     curl = val
            elif name == "ZQDM":   zqdm = str(val or "")
            elif name == "FILETYPE": ftype = (val or "").lower()
        if not curl and ftype != "pdf":
            continue
        items.append({
            "title": title,
            "url_pdf": _to_pdf_abs(curl),
            "createTime": create_time,
            "zqdm": zqdm,
        })
    return total_page, items

def fetch_sfxt():
    results = []
    seen = set()

    positions = ["title", "title,paper_content"]
    orders    = ["create_time", "score"]

    # 先建 session 取 Cookie
    sess = make_session(KEYWORDS[0])

    for kw in KEYWORDS:
        for pos in positions:
            for ok in orders:
                page0_total, page0_items = 0, []
                # 试两个端点（https / http）
                for ep in ENDPOINTS:
                    js0 = sse_search_page(sess, ep, kw, page=0, position=pos, order_key=ok)
                    tp, items = parse_items(js0)
                    print(f"[DEBUG] ep='{ep.split(':')[0]}', kw='{kw}', pos='{pos}', order='{ok}' -> total_page={tp}, page0_items={len(items)}")
                    if items:
                        page0_total, page0_items = tp, items
                        break  # 某个端点命中就用它

                def pick(batch):
                    for x in batch:
                        title, url, zqdm = x["title"], x["url_pdf"], x.get("zqdm","")
                        if not title or not url:
                            continue
                        if "招股说明书" not in title and "招股 说明书" not in title:
                            continue
                        if _any(title, EXCLUDE_KWS):
                            continue
                        # 只要 600370 的
                        if zqdm and zqdm != TEST_SEC:
                            continue
                        if not zqdm and (f"/{TEST_SEC}_" not in url and f"-{TEST_SEC}_" not in url):
                            continue
                        k = (title, url)
                        if k in seen:
                            continue
                        seen.add(k)
                        yr = _year(title) or _year(x.get("createTime","")) or _year(url)
                        results.append({
                            "secCode": TEST_SEC,
                            "announcementTitle": title,
                            "year": yr,
                            "adjunctUrl": url
                        })

                pick(page0_items)

                maxp = min(MAX_PAGES, int(page0_total or 0))
                # 翻页时沿用命中的端点（若 page0 没命中则跳过）
                if page0_items:
                    # 找到是哪个端点命中的
                    hit_ep = ENDPOINTS[0]
                    for ep in ENDPOINTS:
                        js_try = sse_search_page(sess, ep, kw, page=0, position=pos, order_key=ok)
                        _, items_try = parse_items(js_try)
                        if len(items_try) == len(page0_items):
                            hit_ep = ep; break

                    for p in range(1, maxp):
                        time.sleep(GAP)
                        js = sse_search_page(sess, hit_ep, kw, page=p, position=pos, order_key=ok)
                        _, items_p = parse_items(js)
                        if not items_p:
                            break
                        pick(items_p)
                time.sleep(GAP)

    return results

def export_excel(items):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET_NAME
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])
    if items:
        for it in sorted(items, key=lambda x: x.get("year","")):
            ws.append(["","", TEST_SEC, TEST_NAME, f"《{it['announcementTitle']}》", it.get("year",""), it["adjunctUrl"]])
    else:
        ws.append(["","", TEST_SEC, TEST_NAME, "", "", ""])
    wb.save(OUT_XLSX)
    print("✅ 导出：", OUT_XLSX)

def main():
    items = fetch_sfxt()
    for it in items:
        print(it["year"], it["announcementTitle"], "->", it["adjunctUrl"])
    if not items:
        print("⚠️ 仍为 0 条：请把所有 [DEBUG]/[WARN] 输出贴给我，我继续调。")
    export_excel(items)

if __name__ == "__main__":
    main()


[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='600370 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='600370 招股说明书', pos='title', order='create_time

In [11]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 三房巷（600370）— 上交所搜索接口 专测最终版
# 目标：抓到“招股说明书 / 招股说明书附录”的 PDF 链接，并导出为你熟悉的 7 列格式

import os, re, time, json, html, requests, openpyxl
from urllib.parse import quote, urljoin

OUT_DIR = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(OUT_DIR, exist_ok=True)
OUT_XLSX = os.path.join(OUT_DIR, "招股说明书链接_三房巷【SSE_final】.xlsx")
SHEET = "招股说明书"

TEST_SEC  = "600370"
TEST_NAME = "三房巷"

KEYWORDS = [
    f"{TEST_NAME} 招股说明书",
    f"{TEST_SEC} 招股说明书",
    f"{TEST_NAME}招股说明书",
]

EXCLUDE = ["英文","英文版","摘要","提示性公告","取消","已取消","更名提示","H股","B股"]

LIMIT = 20
MAX_PAGES = 20
SLEEP = 0.35

ENDPOINTS = [
    "https://query.sse.com.cn/search/getSearchResult.do",
    "http://query.sse.com.cn/search/getSearchResult.do",
]

def norm(s: str) -> str:
    return re.sub(r"[\s\u00A0\u3000]+", "", str(s or "").strip())

def any_kw(text: str, kws) -> bool:
    t = norm(text);  return any(norm(k) in t for k in kws if k)

def clean_title(t: str) -> str:
    t = html.unescape(str(t or ""));  t = re.sub(r"<.*?>", "", t)
    return t.replace("：","").strip()

def get_year(x: str) -> str:
    m = re.search(r"(19\d{2}|20\d{2})", str(x));  return m.group(1) if m else ""

def to_abs_pdf(curl: str) -> str:
    if not curl: return ""
    if curl.startswith("http"): return curl
    return urljoin("https://static.sse.com.cn/", curl.lstrip("/"))

def make_session(seed_kw: str) -> requests.Session:
    s = requests.Session()
    # 1) 搜索页（拿站内 cookie）
    s.get(f"https://www.sse.com.cn/website/search/?webs=1&q={quote(seed_kw)}",
          headers={
              "User-Agent": "Mozilla/5.0",
              "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
              "Accept-Language": "zh-CN,zh;q=0.9",
          }, timeout=20)
    # 2) 再拉一个静态资源（进一步完善 cookie）
    s.get("https://www.sse.com.cn/assets/js/common/common.js",
          headers={"User-Agent":"Mozilla/5.0","Accept":"*/*","Referer":"https://www.sse.com.cn/"},
          timeout=20)
    return s

def headers_for(keyword: str):
    return {
        "Host": "query.sse.com.cn",
        "Origin": "https://www.sse.com.cn",
        "Referer": f"https://www.sse.com.cn/website/search/?webs=1&q={quote(keyword)}",
        "User-Agent": "Mozilla/5.0",
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "X-Requested-With": "XMLHttpRequest",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "Sec-Fetch-Dest": "empty",
    }

def call_search(sess: requests.Session, endpoint: str, keyword: str, page: int,
                position: str, order_key: str):
    params = {
        # 关键：两套参数名都带上
        "keyword": keyword,
        "searchword": keyword,
        "page": page,
        "limit": LIMIT,
        "keywordPosition": position,            # "title" or "title,paper_content"
        "spaceId": 3,
        "orderByKey": order_key,                # "create_time" or "score"
        "orderByDirection": "DESC",
        # 关键：限定频道=上市公司公告；站点=官网
        "channelCode": "8349",
        "siteCode": "28",
        # JSONP
        "jsonCallBack": f"jsonpCallback{int(time.time()*1000)%100000000}",
        "_": str(int(time.time()*1000)),
    }
    r = sess.get(endpoint, params=params, headers=headers_for(keyword), timeout=20)
    if r.status_code != 200:
        print(f"[HTTP {r.status_code}] {endpoint}")
        return {}
    m = re.search(r"\((\{.*\})\)\s*$", r.text, re.S)
    if not m:
        print("[WARN] 非 JSONP 返回，前200字符：", r.text[:200])
        return {}
    try:
        return json.loads(m.group(1))
    except Exception as e:
        print("[WARN] JSON 解析失败：", e, "前200字符：", r.text[:200])
        return {}

def parse_items(js: dict):
    data = js.get("data") or {}
    total_page = int(data.get("totalPage") or 0)
    arr = data.get("knowledgeList") or []
    items = []
    for it in arr:
        title = clean_title(it.get("title") or "")
        create_time = it.get("createTime") or it.get("updateTime") or ""
        curl, zqdm, ftype = "", "", ""
        for ext in (it.get("extend") or []):
            name = (ext.get("name") or "").upper()
            val  = ext.get("value") or ""
            if name == "CURL":       curl = val
            elif name == "ZQDM":     zqdm = str(val or "")
            elif name == "FILETYPE": ftype = (val or "").lower()
        if not curl and ftype != "pdf":
            continue
        items.append({
            "title": title,
            "url_pdf": to_abs_pdf(curl),
            "createTime": create_time,
            "zqdm": zqdm,
        })
    return total_page, items

def fetch_sfxt():
    res, seen = [], set()
    sess = make_session(KEYWORDS[0])
    positions = ["title", "title,paper_content"]
    orders    = ["create_time", "score"]

    for kw in KEYWORDS:
        for pos in positions:
            for ok in orders:
                page0_total, page0_items, hit_ep = 0, [], ""
                for ep in ENDPOINTS:
                    js0 = call_search(sess, ep, kw, 0, pos, ok)
                    tp, items = parse_items(js0)
                    print(f"[DEBUG] ep='{ep.split(':')[0]}', kw='{kw}', pos='{pos}', order='{ok}' -> total_page={tp}, page0_items={len(items)}")
                    if items:
                        page0_total, page0_items, hit_ep = tp, items, ep
                        break
                def pick(batch):
                    for x in batch:
                        title, url, zqdm = x["title"], x["url_pdf"], x.get("zqdm","")
                        if not title or not url: continue
                        if "招股说明书" not in title and "招股 说明书" not in title: continue
                        if any_kw(title, EXCLUDE): continue
                        if zqdm and zqdm != TEST_SEC: continue
                        if not zqdm and (f"/{TEST_SEC}_" not in url and f"-{TEST_SEC}_" not in url): continue
                        k = (title, url)
                        if k in seen: continue
                        seen.add(k)
                        yr = get_year(title) or get_year(x.get("createTime","")) or get_year(url)
                        res.append({"secCode": TEST_SEC, "announcementTitle": title, "year": yr, "adjunctUrl": url})
                pick(page0_items)
                if hit_ep:
                    for p in range(1, min(MAX_PAGES, int(page0_total or 0))):
                        time.sleep(SLEEP)
                        js = call_search(sess, hit_ep, kw, p, pos, ok)
                        _, items_p = parse_items(js)
                        if not items_p: break
                        pick(items_p)
                time.sleep(SLEEP)
    return res

def export_excel(items):
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = SHEET
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","年报链接"])
    if items:
        for it in sorted(items, key=lambda x: x.get("year","")):
            ws.append(["","", TEST_SEC, TEST_NAME, f"《{it['announcementTitle']}》", it.get("year",""), it["adjunctUrl"]])
    else:
        ws.append(["","", TEST_SEC, TEST_NAME, "", "", ""])
    wb.save(OUT_XLSX)
    print("✅ 导出：", OUT_XLSX)

def main():
    items = fetch_sfxt()
    for it in items:
        print(it["year"], it["announcementTitle"], "->", it["adjunctUrl"])
    if not items:
        print("⚠️ 仍为 0 条：请把控制台所有 [DEBUG]/[WARN] 行贴我。我再根据返回的前 200 字符继续兼容。")
    export_excel(items)

if __name__ == "__main__":
    main()


[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title,paper_content', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='三房巷 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='三房巷 招股说明书', pos='title,paper_content', order='score' -> total_page=0, page0_items=0
[DEBUG] ep='https', kw='600370 招股说明书', pos='title', order='create_time' -> total_page=0, page0_items=0
[DEBUG] ep='http', kw='600370 招股说明书', pos='title', order='create_time

In [14]:
import re, json, time, random, requests

BASE = "https://query.sse.com.cn/search/getESSearchDoc.do"

HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "*/*",
    "Referer": "https://www.sse.com.cn/",
    "Host": "query.sse.com.cn",
    "Accept-Language": "zh-CN,zh;q=0.9",
    # 若偶发 403，再把你浏览器里看到的 Cookie 原样加进来：
    # "Cookie": "JSESSIONID=...; acw_sc__v2=...; ..."
}

session = requests.Session()
session.headers.update(HEADERS)

def parse_jsonp(text: str):
    m = re.search(r"\(\s*({.*})\s*\)\s*$", text, re.S)
    if not m:
        raise ValueError("JSONP 解析失败")
    return json.loads(m.group(1))

def extract_curl(item: dict):
    """从 knowledgeList[i].extend[] 里取出相对 PDF 路径并补全为完整 URL"""
    for kv in item.get("extend") or []:
        if kv.get("name") == "CURL" and kv.get("value"):
            v = kv["value"]
            return "https://www.sse.com.cn" + v if v.startswith("/") else v
    return None

def search_page(keyword: str, page: int, limit: int = 10):
    params = {
        "jsonCallBack": f"jsonpCallback{random.randint(10_0000, 99_9999)}",
        "page": page,               # 0 开始
        "limit": limit,
        "publishTimeEnd": "",
        "publishTimeStart": "",
        "orderByDirection": "DESC",
        "orderByKey": "create_time",
        "searchMode": "preciseMulti",
        "spaceId": 3,
        "keyword": keyword,
        "siteName": "sse",
        "keywordPosition": "title",
        "channelId": "10001",
        "channelCode": "8349,12971,13118,12002,8361,13219,8362,12905,9857,9858,9859,9860,9862,9863,9871,9868,9865,9867",
        "trackId": str(random.getrandbits(128)),
        "_": str(int(time.time() * 1000)),
    }
    r = session.get(BASE, params=params, timeout=20)
    r.raise_for_status()
    data = parse_jsonp(r.text)
    d = data.get("data") or {}
    items = d.get("knowledgeList") or []
    total_page = d.get("totalPage")
    return items, total_page

def crawl_keyword(keyword: str, max_pages: int = 3, delay: float = 0.35):
    rows = []
    for p in range(max_pages):
        try:
            items, total_page = search_page(keyword, page=p, limit=10)
        except requests.HTTPError as e:
            print(f"[SKIP {keyword}] HTTP {e.response.status_code}")
            break
        except Exception as e:
            print(f"[SKIP {keyword}] {e}")
            break

        if not items:
            break

        for it in items:
            title = it.get("title") or it.get("docTitle") or ""
            # 去掉返回中 <em> 标签
            title = re.sub(r"</?em>", "", title)
            link  = extract_curl(it)
            year  = (it.get("createTime") or "")[:4]
            if link:
                rows.append({
                    "title": title,
                    "year": year,
                    "download_url": link
                })

        # 接口告诉我们总页数时，用它来收敛；没有就按 max_pages 收敛
        if isinstance(total_page, int) and (p + 1) >= total_page:
            break

        time.sleep(delay)
    return rows

# ===== 用法示例 =====
if __name__ == "__main__":
    kws = ["三房巷 招股说明书", "浦发银行 招股说明书", "600000 招股说明书"]
    all_rows = []
    for kw in kws:
        all_rows.extend(crawl_keyword(kw, max_pages=2))
    with open("sse_es_search_results.json", "w", encoding="utf-8") as f:
        json.dump(all_rows, f, ensure_ascii=False, indent=2)
    print("saved:", len(all_rows))


OSError: [Errno 30] Read-only file system: 'sse_es_search_results.json'

In [17]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import re
import time
import json
import random
import string
import pathlib
import requests
import openpyxl
import pandas as pd
from typing import List, Dict, Tuple, Optional

# ========= 参数配置 =========
# 输出目录（与原脚本保持一致）
output_dir = os.path.expanduser("~/Desktop/cninfo_output")
os.makedirs(output_dir, exist_ok=True)

# 正股映射表路径（与原脚本一致）
mapping_file = os.path.expanduser("~/Desktop/正股映射结果.xlsx")

# 每家公司最多保留的“招股说明书”条目数量（主文件 + 附录等）
MAX_ITEMS_PER_COMPANY = 3

# 是否下载 PDF 文件到本地
DOWNLOAD_FILES = True

# PDF 保存目录
pdf_dir = os.path.join(output_dir, "招股说明书_PDF")
if DOWNLOAD_FILES:
    os.makedirs(pdf_dir, exist_ok=True)

# 可选：按年份过滤（None 表示不过滤）
YEAR_MIN: Optional[int] = None
YEAR_MAX: Optional[int] = None

# 是否包含“附录”（True=包含；False=仅主文件）
INCLUDE_APPENDIX = True

# ========= 过滤与正则 =========
EXCLUDE_KWS = [
    "英文", "英文版", "Annual", "annual", "Summary", "summary",
    "摘要", "摘要版", "招股意向书", "提示性公告", "取消", "终止", "更正", "修订"
]

# 去标签/空格
_EM_RE = re.compile(r"</?em>")
_SPACE_RE = re.compile(r"[\s\u00A0\u3000]+")

def norm(s: str) -> str:
    return _SPACE_RE.sub("", str(s or "").strip())

# “是否为招股说明书主文件”
def is_main_prospectus(title: str) -> bool:
    t = norm(_EM_RE.sub("", title))
    if "招股说明书" not in t:
        return False
    if any(kw in t for kw in EXCLUDE_KWS):
        return False
    # 排除“附录”给主文件判定
    if "附录" in t:
        return False
    return True

# “是否为招股说明书附录”
def is_appendix(title: str) -> bool:
    t = norm(_EM_RE.sub("", title))
    return ("招股说明书" in t) and ("附录" in t) and (not any(kw in t for kw in EXCLUDE_KWS))

def extract_year_from_time(create_time: str) -> str:
    # "2003-02-14 00:00:00" -> "2003"
    try:
        return str(create_time)[:4]
    except Exception:
        return ""

# ========= 上交所 ES 搜索接口 =========
BASE = "https://query.sse.com.cn/search/getESSearchDoc.do"
HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "*/*",
    "Referer": "https://www.sse.com.cn/",
    "Host": "query.sse.com.cn",
    "Accept-Language": "zh-CN,zh;q=0.9",
    # 如遇偶发 403，把你浏览器里 DevTools 看到的 Cookie 填进来即可：
    # "Cookie": "JSESSIONID=...; acw_sc__v2=...; ..."
}

session = requests.Session()
session.headers.update(HEADERS)

def parse_jsonp(text: str) -> dict:
    m = re.search(r"\(\s*({.*})\s*\)\s*$", text, re.S)
    if not m:
        raise ValueError("JSONP 解析失败")
    return json.loads(m.group(1))

def extract_curl(item: dict) -> Optional[str]:
    """knowledgeList[i].extend[] 里 name=='CURL' 的相对路径，补成完整 URL"""
    for kv in item.get("extend") or []:
        if kv.get("name") == "CURL" and kv.get("value"):
            v = kv["value"]
            return "https://www.sse.com.cn" + v if isinstance(v, str) and v.startswith("/") else v
    return None

def search_page(keyword: str, page: int, limit: int = 10) -> Tuple[List[dict], Optional[int]]:
    params = {
        "jsonCallBack": f"jsonpCallback{random.randint(10_0000, 99_9999)}",
        "page": page,               # 0 开始
        "limit": limit,
        "publishTimeEnd": "",
        "publishTimeStart": "",
        "orderByDirection": "DESC",
        "orderByKey": "create_time",
        "searchMode": "preciseMulti",
        "spaceId": 3,
        "keyword": keyword,
        "siteName": "sse",
        "keywordPosition": "title",
        "channelId": "10001",
        # 与你 DevTools 保持一致的一组频道
        "channelCode": "8349,12971,13118,12002,8361,13219,8362,12905,9857,9858,9859,9860,9862,9863,9871,9868,9865,9867",
        "trackId": str(random.getrandbits(128)),
        "_": str(int(time.time() * 1000)),
    }
    r = session.get(BASE, params=params, timeout=20)
    r.raise_for_status()
    data = parse_jsonp(r.text)
    d = data.get("data") or {}
    items = d.get("knowledgeList") or []
    total_page = d.get("totalPage")
    return items, total_page

def crawl_keyword_for_prospectus(keyword: str,
                                 max_pages: int = 3,
                                 delay: float = 0.35) -> List[Dict]:
    """搜索关键词，抓取符合条件的招股说明书（主文件 + 可选附录）"""
    rows: List[Dict] = []
    for p in range(max_pages):
        try:
            items, total_page = search_page(keyword, page=p, limit=10)
        except requests.HTTPError as e:
            print(f"[SKIP {keyword}] HTTP {e.response.status_code}")
            break
        except Exception as e:
            print(f"[SKIP {keyword}] {e}")
            break

        if not items:
            break

        for it in items:
            title = it.get("title") or ""
            link  = extract_curl(it)
            ctime = it.get("createTime") or ""
            year  = extract_year_from_time(ctime)

            if not link or "招股说明书" not in title:
                continue

            # 年份过滤（可选）
            if YEAR_MIN and (not year or int(year) < YEAR_MIN):
                continue
            if YEAR_MAX and (not year or int(year) > YEAR_MAX):
                continue

            # 分类：主文件 or 附录
            is_main = is_main_prospectus(title)
            is_apdx = is_appendix(title)

            if (not is_main) and (not (INCLUDE_APPENDIX and is_apdx)):
                continue

            rows.append({
                "title": _EM_RE.sub("", title),
                "year": year,
                "download_url": link
            })

        if isinstance(total_page, int) and (p + 1) >= total_page:
            break
        time.sleep(delay)
    return rows

# ========= 下载工具 =========
def sanitize_filename(s: str) -> str:
    s = _EM_RE.sub("", s)
    invalid = r'<>:"/\|?*'
    table = str.maketrans({ch: "_" for ch in invalid})
    s = s.translate(table)
    # 控制文件名长度
    return s[:150].strip() or "file"

def download_pdf(url: str, dst_dir: str, fname_hint: str = "") -> Optional[str]:
    try:
        r = session.get(url, timeout=60)
        r.raise_for_status()
        # 猜测文件名
        if fname_hint:
            base = sanitize_filename(fname_hint)
        else:
            base = "prospectus"
        # 处理没有 .pdf 的情况
        if not base.lower().endswith(".pdf"):
            base += ".pdf"
        # 确保不重名
        full = os.path.join(dst_dir, base)
        stem = pathlib.Path(full).stem
        suf  = pathlib.Path(full).suffix
        idx = 1
        while os.path.exists(full):
            full = os.path.join(dst_dir, f"{stem}_{idx}{suf}")
            idx += 1
        with open(full, "wb") as f:
            f.write(r.content)
        return full
    except Exception as e:
        print(f"[DOWNLOAD FAIL] {url} -> {e}")
        return None

# ========= 主流程：读取映射 → 搜索 → 选择 → 导出 =========
def search_company_prospectus(company_name: str,
                              sec_code: Optional[str]) -> List[Dict]:
    """对单家公司发起两种关键词搜索，合并去重并按主文件优先排序"""
    keys = []
    nm = str(company_name or "").strip()
    if nm:
        keys.append(f"{nm} 招股说明书")
    cd = str(sec_code or "").strip()
    if cd:
        keys.append(f"{cd} 招股说明书")

    bucket: Dict[Tuple[str, str], Dict] = {}
    for kw in keys:
        rows = crawl_keyword_for_prospectus(kw, max_pages=3)
        for r in rows:
            k = (r["title"], r["download_url"])
            if k not in bucket:
                bucket[k] = r

    # 排序：主文件优先，其次按年份降序（新在前）
    def score(item: Dict) -> Tuple[int, int]:
        t = item["title"]
        y = int(item["year"]) if str(item.get("year","")).isdigit() else 0
        main = 1 if is_main_prospectus(t) else 0
        return (main, y)

    merged = list(bucket.values())
    merged.sort(key=score, reverse=True)
    # 截断
    return merged[:MAX_ITEMS_PER_COMPANY]

def collect_all_companies() -> List[Dict]:
    """遍历映射表：每行公司抽取若干条招股说明书结果"""
    if not os.path.exists(mapping_file):
        raise FileNotFoundError(f"找不到映射表：{mapping_file}")

    df_map = pd.read_excel(mapping_file, dtype=str).fillna("")
    # 兼容字段名：你之前脚本里用到这些列
    # - '正股代码' 里通常是 "600000.SH" 这样的，去掉点后缀
    df_map['sec_no_suf'] = df_map['正股代码'].str.split('.').str[0] if '正股代码' in df_map.columns else ""
    results: List[Dict] = []

    for _, row in df_map.iterrows():
        comp_name = row.get('正股名称', "") or row.get('公司简称', "")
        sec       = row.get('sec_no_suf', "")
        bond_code = row.get('代码', "")         # 可转债代码
        bond_name = row.get('名称', "")

        items = search_company_prospectus(comp_name, sec)

        # 记录最多 MAX_ITEMS_PER_COMPANY 条
        for rank, it in enumerate(items, 1):
            results.append({
                "bond_code": bond_code,
                "bond_name": bond_name,
                "sec_code":  sec,
                "company":   comp_name,
                "rank":      rank,
                "title":     it["title"],
                "year":      it["year"],
                "download_url": it["download_url"]
            })

        # 可选：立即下载 PDF
        if DOWNLOAD_FILES:
            for it in items:
                hint = f"{sec}_{comp_name}_{it['year']}_{it['title']}"
                download_pdf(it["download_url"], pdf_dir, hint)

    return results

def write_excel(results: List[Dict]) -> str:
    """把每家公司最多 N 条结果平铺到一行，导出到 Excel"""
    # 先按公司聚合
    from collections import defaultdict
    agg = defaultdict(list)
    for r in results:
        key = (r["bond_code"], r["bond_name"], r["sec_code"], r["company"])
        agg[key].append(r)

    # 表头
    headers = ["可转债代码","可转债名称","公司代码","公司简称"]
    for i in range(1, MAX_ITEMS_PER_COMPANY + 1):
        headers += [f"标题{i}", f"年份{i}", f"下载链接{i}"]

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "招股说明书"
    ws.append(headers)

    for (bond_code, bond_name, sec, comp), items in agg.items():
        row = [bond_code, bond_name, sec, comp]
        # 按 rank 排序
        items = sorted(items, key=lambda x: x["rank"])
        for i in range(MAX_ITEMS_PER_COMPANY):
            if i < len(items):
                it = items[i]
                row += [f"《{it['title']}》", it["year"], it["download_url"]]
            else:
                row += ["", "", ""]
        ws.append(row)

    dst = os.path.join(output_dir, "招股说明书_选取公司.xlsx")
    wb.save(dst)
    return dst

def save_json(results: List[Dict]) -> str:
    dst = os.path.join(output_dir, "招股说明书_选取公司.json")
    with open(dst, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    return dst

if __name__ == "__main__":
    print(">>> 开始根据上交所接口抓取“招股说明书”…")
    all_rows = collect_all_companies()
    xlsx_path = write_excel(all_rows)
    json_path = save_json(all_rows)
    print(f"✅ 已输出 Excel：{xlsx_path}")
    print(f"✅ 已输出 JSON ：{json_path}")
    if DOWNLOAD_FILES:
        print(f"✅ PDF 已保存到：{pdf_dir}")
    print("---- 完成 ----")


>>> 开始根据上交所接口抓取“招股说明书”…
[DOWNLOAD FAIL] https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-12/688234_20250812_G132.pdf -> 404 Client Error:  for url: https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-12/688234_20250812_G132.pdf
[DOWNLOAD FAIL] https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-01/603406_20250801_XYFE.pdf -> 404 Client Error:  for url: https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-01/603406_20250801_XYFE.pdf
[DOWNLOAD FAIL] https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-07-18/603262_20250718_UUGY.pdf -> 404 Client Error:  for url: https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-07-18/603262_20250718_UUGY.pdf
[DOWNLOAD FAIL] https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-12/688234_20250812_G132.pdf -> 404 Client Error:  for url: https://www.sse.com.cn/disclosure/listedinfo/announcement/c/new/2025-08-12/688234_202508

In [18]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
上交所 | 招股说明书【只导出链接，不下载】— 与“年报链接”同风格
输入：~/Desktop/正股映射结果.xlsx（至少含：代码、名称、正股代码、正股名称）
输出：~/Desktop/cninfo_output/招股说明书链接_选取公司【上交所】.xlsx（Sheet 名：招股说明书）

要点：
- 只写入链接，不发起任何 PDF 下载（避免你看到的 404 滚屏）
- JSONP/JSON 双模式解析；多端点（http/https + 2 个 API）轮询；重试+指数退避
- 预热 Cookie、允许系统代理（如 127.0.0.1:1082）
"""

import os, re, time, json, html, random
import pandas as pd
import openpyxl
import requests
from urllib.parse import quote, urljoin
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# ====== 路径/表名 ======
IN_XLSX   = os.path.expanduser("~/Desktop/正股映射结果.xlsx")
OUT_DIR   = os.path.expanduser("~/Desktop/cninfo_output")
SHEETNAME = "招股说明书"
OUT_XLSX  = os.path.join(OUT_DIR, f"招股说明书链接_选取公司【上交所】.xlsx")
os.makedirs(OUT_DIR, exist_ok=True)

# ====== 上交所参数（与抓包一致）======
CHANNEL_CODE = "8349,12971,13118,12002,8361,13219,8362,12905,9857,9858,9859,9860,9862,9863,9871,9868,9865,9867"
SPACE_ID     = 3
ENDPOINTS = [
    "https://query.sse.com.cn/search/getESSearchDoc.do",
    "http://query.sse.com.cn/search/getESSearchDoc.do",
    "https://query.sse.com.cn/search/getSearchDoc.do",
    "http://query.sse.com.cn/search/getSearchDoc.do",
]

# ====== 小工具 ======
def norm(s): return re.sub(r"[\s\u00A0\u3000]+","", str(s or "").strip())

def parse_json_or_jsonp(text: str):
    """兼容：纯 JSON 或 JSONP(jsonpCallback(...))；尾部可能带分号/换行"""
    text = text.strip()
    if text.startswith("{"):
        return json.loads(text)
    m = re.search(r"\(\s*({.*})\s*\)\s*;?\s*$", text, re.S)
    return json.loads(m.group(1)) if m else {}

def clean_title(t: str) -> str:
    t = html.unescape(str(t or ""))
    t = re.sub(r"<.*?>", "", t)  # 去 <em> 等
    return t.replace("：","").strip()

def extract_year(s: str) -> str:
    m = re.search(r"(19|20)\d{2}", str(s))
    return m.group(0) if m else ""

def to_abs_pdf(curl: str) -> str:
    if not curl: return ""
    return curl if curl.startswith("http") else urljoin("https://www.sse.com.cn/", curl.lstrip("/"))

def make_session(seed_kw="招股说明书") -> requests.Session:
    s = requests.Session()
    s.trust_env = True  # 用系统代理（如 127.0.0.1:1082）
    retry = Retry(total=4, connect=2, read=2, backoff_factor=0.7,
                  status_forcelist=[429,500,502,503,504], allowed_methods=["GET"], raise_on_status=False)
    adapter = HTTPAdapter(max_retries=retry, pool_connections=20, pool_maxsize=50)
    s.mount("https://", adapter); s.mount("http://", adapter)
    # 预热 cookie
    try:
        s.get(f"https://www.sse.com.cn/website/search/?webs=1&q={quote(seed_kw)}",
              headers={"User-Agent":"Mozilla/5.0","Accept":"text/html,*/*"}, timeout=(8,20))
    except Exception:
        pass
    return s

def sse_headers(keyword: str):
    return {
        "User-Agent": "Mozilla/5.0",
        "Accept": "*/*",
        "Origin": "https://www.sse.com.cn",
        "Referer": f"https://www.sse.com.cn/website/search/?webs=1&q={quote(keyword)}",
        "Connection": "keep-alive",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "Sec-Fetch-Dest": "script",
    }

def call_search(session: requests.Session, endpoint: str, keyword: str, page=0, limit=20):
    params = {
        "jsonCallBack": f"jsonpCallback{int(time.time()*1000)%100000000}",
        "page": page, "limit": limit,
        "publishTimeEnd": "", "publishTimeStart": "",
        "orderByDirection": "DESC", "orderByKey": "create_time",
        "searchMode": "preciseMulti",
        "spaceId": SPACE_ID,
        "keyword": keyword, "siteName": "sse",
        "keywordPosition": "title",
        "channelId": "10001", "channelCode": CHANNEL_CODE,
        "trackId": str(int(time.time()*1000)), "_": str(int(time.time()*1000)),
    }
    r = session.get(endpoint, params=params, headers=sse_headers(keyword), timeout=(8, 35))
    r.raise_for_status()
    return parse_json_or_jsonp(r.text)

def search_once(session: requests.Session, keyword: str):
    last = None
    for ep in ENDPOINTS:
        for i in range(3):
            try:
                js = call_search(session, ep, keyword)
                data = (js or {}).get("data") or {}
                klist = data.get("knowledgeList") or []
                if klist: return klist
            except Exception as e:
                last = e
                time.sleep(0.5 * (2 ** i) + random.random()*0.3)
    if last: print(f"[SKIP {keyword}] {type(last).__name__}: {last}")
    return []

EXCLUDE = ["英文","英文版","英文译本","摘要","提示性公告","取消","更正","修订说明","H股","B股"]

def pick_best(klist, sec_code: str):
    cand = []
    for it in klist:
        title = clean_title(it.get("title",""))
        if "招股说明书" not in title: 
            continue
        if any(x in title for x in EXCLUDE): 
            continue
        curl = ""
        zqdm = ""
        for ext in (it.get("extend") or []):
            n = (ext.get("name") or "").upper()
            v = ext.get("value") or ""
            if n == "CURL":  curl = v
            if n == "ZQDM":  zqdm = str(v or "")
        if not curl: 
            continue
        url = to_abs_pdf(curl)
        # 代码匹配优先；没有 ZQDM 就用 URL/文件名中带代码的启发式
        if zqdm and zqdm != sec_code:
            continue
        if not zqdm and f"/{sec_code}_" not in url and f"/{sec_code}-" not in url and f"-{sec_code}_" not in url:
            # 不强制，但降权
            pass
        score = 100
        if "附录" in title: score -= 40
        if "摘要" in title: score -= 40
        year = extract_year(title) or extract_year(url)
        cand.append({"title": title, "url": url, "year": year, "score": score})
    cand.sort(key=lambda x: (x["score"], x["year"], x["title"]), reverse=True)
    return cand[0] if cand else {}

def main():
    if not os.path.exists(IN_XLSX):
        print("❌ 找不到输入：", IN_XLSX); return
    df = pd.read_excel(IN_XLSX, dtype=str).fillna("")
    # 与“年报链接”脚本一致：输出列顺序
    rows = []
    sess = make_session("招股说明书")

    # 取“正股代码”的纯数字（去 .SH/.SZ）
    df["sec_code"] = df["正股代码"].astype(str).str.split(".").str[0]

    for _, r in df.iterrows():
        bond_code = r.get("代码","")
        bond_name = r.get("名称","")
        sec_code  = r.get("sec_code","").strip()
        sec_name  = r.get("正股名称","").strip()

        if not sec_code:
            rows.append([bond_code, bond_name, "", sec_name, "", "", ""]); 
            continue

        # 多关键词尝试（和你年报那套一致的思路）
        kws = [f"{sec_name} 招股说明书", f"{sec_code} 招股说明书", f"{sec_name}招股说明书"]
        best = {}
        for kw in kws:
            klist = search_once(sess, kw)
            best = pick_best(klist, sec_code)
            if best: break

        title = f"《{best['title']}》" if best else ""
        year  = best.get("year","") if best else ""
        url   = best.get("url","")  if best else ""
        rows.append([bond_code, bond_name, sec_code, sec_name, title, year, url])

        # 轻节流，防风控
        time.sleep(0.2)

    # 写 Excel（表头保持一致，只是最后一列是“招股说明书链接”）
    wb = openpyxl.Workbook(); ws = wb.active; ws.title = SHEETNAME
    ws.append(["可转债代码","可转债名称","公司代码","公司简称","标题","年份","招股说明书链接"])
    for row in rows: ws.append(row)
    wb.save(OUT_XLSX)
    print("✅ 已输出：", OUT_XLSX)

if __name__ == "__main__":
    main()


✅ 已输出： /Users/sam/Desktop/cninfo_output/招股说明书链接_选取公司【上交所】.xlsx
