In [1]:
import akshare as ak
print("AkShare version:", ak.__version__)

AkShare version: 1.17.61


In [3]:

import pandas as pd

df = ak.stock_zh_a_spot_em()   # A股全市场快照
print(df.shape)
df.head(10)


  0%|          | 0/57 [00:00<?, ?it/s]

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

In [4]:
# 目标：统计快照中「科创板 / 创业板 / 主板」数量，并打印
# 分类规则（按代码前缀，贴近日常口径）：
# - 科创板：688***、689***
# - 创业板：300***、301***
# - 主板：  600***、601***、603***、605***、000***、001***、002***、003***
# - 其他：  不在以上前缀（比如北交所/存托凭证等），单独计入“其他”以免遗漏

def _get_snapshot_codes():
    # 方案 1：AkShare
    try:
        import akshare as ak
        df = ak.stock_zh_a_spot_em()
        # AkShare 通常列名为“代码”，但兜底一下
        if "代码" not in df.columns and "code" in df.columns:
            df = df.rename(columns={"code": "代码"})
        codes = df["代码"].astype(str).str.strip().tolist()
        return codes, "AkShare/Eastmoney"
    except Exception as e:
        print("AkShare 获取快照失败，尝试直接请求 Eastmoney push2：", repr(e))

    # 方案 2：Eastmoney push2（一次性拉全量，尽量减少翻页）
    import requests
    hosts = [
        "https://push2.eastmoney.com/api/qt/clist/get",
        "https://82.push2.eastmoney.com/api/qt/clist/get",
        "https://32.push2.eastmoney.com/api/qt/clist/get",
    ]
    params = {
        "pn": "1", "pz": "5000", "po": "1", "np": "1",
        "ut": "bd1d9ddb04089700cf9c27f6f7426281",
        "fltt": "2", "invt": "2", "fid": "f12",
        # 与 AkShare 口径一致的沪深（含主板/创业/科创），必要时你也可自行调整
        "fs": "m:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23,m:0 t:81 s:2048",
        "fields": "f12"  # 只要代码字段
    }
    last_err = None
    for url in hosts:
        try:
            r = requests.get(url, params=params, timeout=30)
            r.raise_for_status()
            j = r.json()
            diff = (j or {}).get("data", {}).get("diff", []) or []
            codes = [str(it.get("f12", "")).strip() for it in diff if it.get("f12")]
            if codes:
                return codes, "Eastmoney push2"
        except Exception as e2:
            last_err = e2
            continue
    raise RuntimeError(f"无法从 Eastmoney 获取快照：{repr(last_err)}")

def _classify_board(code: str) -> str:
    c = str(code)
    if c.startswith(("688", "689")):
        return "科创板"
    if c.startswith(("300", "301")):
        return "创业板"
    if c.startswith(("600", "601", "603", "605", "000", "001", "002", "003")):
        return "主板"
    return "其他"

# ---- 执行 ----
codes, source = _get_snapshot_codes()

# 去重后分类计数
import pandas as pd
s = pd.Series(sorted(set(codes)), name="代码").map(_classify_board).value_counts()

kechuang = int(s.get("科创板", 0))
chuangye = int(s.get("创业板", 0))
zhuban  = int(s.get("主板", 0))
qita    = int(s.get("其他", 0))
heji    = kechuang + chuangye + zhuban + qita

print(f"数据来源：{source}")
print(f"科创板：{kechuang}")
print(f"创业板：{chuangye}")
print(f"主  板：{zhuban}")
print(f"其  他：{qita}")
print(f"合  计：{heji}")


AkShare 获取快照失败，尝试直接请求 Eastmoney push2： ConnectionError(ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')))


RuntimeError: 无法从 Eastmoney 获取快照：ConnectionError(ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')))