In [None]:
import akshare as ak
import pandas as pd
from datetime import datetime, timedelta
import json
import os
import time

In [None]:
# 读取 JSON 文件中的股票列表
file_path = "data/a_name.json"

if os.path.exists(file_path):
    stock_list = pd.read_json(file_path)
else:
    # 如果文件不存在，则从 akshare 获取数据
    stock_list = ak.stock_info_a_code_name()
    stock_list["Roe"] = 0.0
    stock_list["Pe_ttm"] = 0.0
    stock_list.to_json(file_path, orient="records", force_ascii=False, indent=4)

code_file_path = "data/a_name_list.json"
if os.path.exists(code_file_path):
    original_stock_list = pd.read_json(code_file_path)

In [None]:
for index, row in original_stock_list.iterrows():
    stock_code = str(row["code"]).zfill(6)  # 确保股票代码格式正确
    stock_name = row["name"]
    print(f"{index},{stock_code},{stock_name}")
    stock_list.at[index, "code"] = stock_code
stock_list.to_json(file_path, orient="records", force_ascii=False, indent=4)
print(stock_list)

In [None]:
def get_roe(stock_code):
    try:
        print(f"开始获取{stock_code}数据")
        financial_data = ak.stock_financial_analysis_indicator(symbol=stock_code,start_year="2022")
        # **判断返回的数据是否为空**
        if financial_data is None:
            print(f"⚠️ 获取 {stock_code} 的 ROE 数据为空，可能是接口问题或无数据")
            return None
        
        if financial_data.empty:
            print(f"⚠️ {stock_code} 没有财务数据，跳过")
            return None
        
        last_row = financial_data.iloc[-1]  # 获取最近一期数据

        if "净资产收益率(%)" in last_row:
            return float(last_row["净资产收益率(%)"])
        else:
            print(f"⚠️ {stock_code} 的数据中没有找到 '净资产收益率' 字段")
            return None

    except Exception as e:
        print(f"❌ 获取 {stock_code} ROE 时发生错误: {e}")
        return None



In [56]:
#获取pe市盈率
def get_pe(stock_code):
    try:
        print(f"PE_开始获取{stock_code}数据")
        stock_a_indicator_lg_df = ak.stock_a_indicator_lg(symbol=stock_code)
        if stock_a_indicator_lg_df is None:
            return 0
        # 获取最近一日数据
        last_row = stock_a_indicator_lg_df.iloc[-1]
        if "pe_ttm" in last_row.index:
                return last_row['pe_ttm']
        else:
            return 0
    except AttributeError as e:
        return 0
    except Exception as e:
        return 0

In [62]:
print(type(stock_list))
# **遍历 DataFrame 并更新 ROE**
error_times = 0
for index, row in stock_list.iterrows():
    if index > 0 :
        stock_code = str(row["code"]).zfill(6)  # 确保股票代码格式正确
        if row["Roe"] == 0.0 :
            new_roe = get_roe(stock_code)
            if new_roe is not None and new_roe != row["Roe"]:
                print(f"更新 {stock_code} ROE: {row['Roe']} -> {new_roe}")
                stock_list.at[index, "Roe"] = new_roe  # 更新 ROE
            if new_roe is None:
                error_times += 1
            else:
                time.sleep(8)  # 等待 3 秒后再试
            if error_times >= 3:
                print(f"❌---------------------网络获取数据失败,中断")
                break
        if row['Pe_ttm'] == 0.0 :
            new_pe = get_pe(stock_code)
            if new_pe is not None and new_pe > 0 and new_pe != row["Pe_ttm"]:
                print(f"更新 {stock_code} Pe_ttm: {row['Pe_ttm']} -> {new_pe}")
                stock_list.at[index, "Pe_ttm"] = new_pe  # 更新 Pe_ttm
            if new_pe is None:
                print(f"❌---------------------网络获取数据失败,中断")
                break
            else:
                print(new_pe)
                time.sleep(6)  # 等待 3 秒后再试

<class 'pandas.core.frame.DataFrame'>
PE_开始获取000006数据
nan
PE_开始获取000008数据
nan
PE_开始获取000010数据
nan


KeyboardInterrupt: 

In [None]:
# 将更新后的数据写回 JSON 文件
stock_list.to_json(file_path, orient="records", force_ascii=False, indent=4)
print("------------------------->game over")

In [None]:
# 读取 JSON 文件
try:
    with open(file_path, "r", encoding="utf-8") as file:
        stock_list_new = json.load(file)
    # 遍历并输出内容
    for stock in stock_list_new:
        print(stock)
except FileNotFoundError:
    print(f"文件 {file_path} 未找到！")
except json.JSONDecodeError:
    print(f"读取 JSON 数据时发生错误！")