In [None]:
import numpy as np
import pandas as pd
from IPython.core.interactiveshell import InteractiveShell
import openpyxl
import akshare as ak
InteractiveShell.ast_node_interactivity = "all"
pd.options.display.max_columns = 1000
pd.options.display.max_rows = 5000
pd.options.display.float_format = lambda x: '%.5f' % x
import matplotlib.pyplot as plt
%matplotlib inline
plt.rcParams['font.sans-serif'] = 'SimHei'
plt.rcParams['axes.unicode_minus'] = False
%config InlineBackend.figure_format='svg'
import matplotlib.dates as mdates
import matplotlib.ticker as ticker

In [None]:
def excess_profit(code):
    # 去除单引号
    code = code.strip("'")
    
    try:
        # 获取股票数据
        price = ak.stock_zh_a_hist(symbol=code, period='daily', adjust='hfq', 
                                  start_date='20221218', end_date='20250101')
        
        # 检查是否获取到数据
        if price.empty:
            print(f"警告: 股票 {code} 没有历史数据")
            return 0.0, 0.0, 0.0

        
        # 检查关键列是否存在
        required_columns = ['日期', '开盘', '收盘', '最高', '最低']
        missing_columns = [col for col in required_columns if col not in price.columns]
        
        if missing_columns:
            print(f"错误: 股票 {code} 缺少必要的列: {missing_columns}")
            return 0.0, 0.0, 0.0
        
        # 数据预处理
        price = price.dropna()
        
        # 确保列名正确
        if '收盘' in price.columns:
            price['MA10'] = price['收盘'].rolling(window=10).mean()
        else:
            print(f"错误: 股票 {code} 没有找到 '收盘' 列")
            return 0.0, 0.0, 0.0
        
        # 删除前10行(因为MA10前10行是NaN)
        price = price.drop(index=range(10)).reset_index(drop=True)
        
        # 检查处理后的数据是否足够
        if len(price) < 100:
            print(f"错误: 股票 {code} 数据不足，无法进行回测")
            return 0.0, 0.0, 0.0
        
        # 初始化策略参数
        initial_cash = 10000000.0
        tax = 0.00025
        
        # 创建结果DataFrame
        df = pd.DataFrame(columns=['日期', '开盘', '收盘', 'MA10', '涨跌幅', '可买', '持仓', '持有天数', '交易信号', '总市值', '现金', '策略收益率', '股票收益率'],
                          index=range(len(price)))
        
        # 复制数据
        df[['日期', '开盘', '收盘', 'MA10']] = price[['日期', '开盘', '收盘', 'MA10']]
        
        # 初始化列
        df['持仓'] = 0
        df['总市值'] = 0.0
        df['现金'] = initial_cash
        df['策略收益率'] = 0.0
        df['股票收益率'] = 0.0
        df['交易信号'] = 0
        df['持有天数'] = 0
        df['总资产'] = initial_cash
        df['可买'] = 0
        
        # 执行策略回测
        for i in range(1,len(df)):
            # 计算可买股数
            df.loc[i, '可买'] = (((df.loc[i-1, '现金'] / df.loc[i, '开盘'])*(1-tax)) // 100) * 100

                    # 金叉死叉生成交易信号
            if df.loc[i-1,'MA10']>=df.loc[i-1,'收盘'] and df.loc[i,'MA10']<df.loc[i,'收盘']:
                df.loc[i,'交易信号']=1
            if df.loc[i-1,'MA10']<=df.loc[i-1,'收盘'] and df.loc[i,'MA10']>df.loc[i,'收盘']:
                df.loc[i,'交易信号']=-1


                    # 计算交易成本
            buy = 0
            sell = 0

            if i < len(df):
                buy = max(df.loc[i, '开盘'] * df.loc[i, '可买'] * tax, 5) + df.loc[i, '开盘'] * df.loc[i, '可买']

            if i in range(len(df)) and df.loc[i-1, '持仓'] > 0:
                sell = df.loc[i, '开盘'] * df.loc[i-1, '持仓'] - max(df.loc[i, '开盘'] * df.loc[i-1, '持仓'] * tax, 5)

                    # 第二天开盘执行交易
            if df.loc[i-1, '交易信号'] == 1 and df.loc[i-1, '现金'] >= buy and df.loc[i-1, '持仓'] == 0 and i!=len(df)-1:
                df.loc[i, '现金'] = df.loc[i-1, '现金'] - buy
                df.loc[i, '持仓'] = df.loc[i-1, '持仓'] + df.loc[i, '可买']
            elif df.loc[i-1, '交易信号'] == -1 and df.loc[i-1, '持仓'] > 0:
                df.loc[i, '现金'] = df.loc[i-1, '现金'] + sell
                df.loc[i, '持仓'] = 0
            else:
                df.loc[i, '现金'] = df.loc[i-1, '现金']
                df.loc[i, '持仓'] = df.loc[i-1, '持仓']

            # 计算总资产和收益率
            df.loc[i, '总市值'] = df.loc[i, '持仓'] * df.loc[i, '收盘']
            df.loc[i, '总资产'] = df.loc[i, '总市值'] + df.loc[i, '现金']
            df.loc[i, '策略收益率'] = (df.loc[i, '总资产'] / initial_cash) - 1
            df.loc[i, '股票收益率'] = (df.loc[i, '收盘'] / df.loc[0, '收盘']) - 1
        
            # 计算最终结果
        if len(df) > 0:
            excess = df.loc[len(df)-1, '策略收益率'] - df.loc[len(df)-1, '股票收益率']
            strategy = df.loc[len(df)-1, '策略收益率']
            stock = df.loc[len(df)-1, '股票收益率']

            # 结果检测
            if not all([isinstance(x, (int, float)) for x in [excess, strategy, stock]]):
                print(f"错误: 股票 {code} 的收益率计算结果包含非数值类型")
                return 0.0, 0.0, 0.0

            if any(pd.isna([excess, strategy, stock])):
                print(f"错误: 股票 {code} 的收益率计算结果包含NaN值")
                return 0.0, 0.0, 0.0

            if any([abs(x) > 10 for x in [excess, strategy, stock]]):
                print(f"警告: 股票 {code} 的收益率计算结果异常大: "
                          f"超额收益率={excess:.4f}, 策略收益率={strategy:.4f}, 股票收益率={stock:.4f}")

                # 打印最终结果
            print(f"股票 {code} 策略回测完成: "
                      f"超额收益率={excess:.4f}, 策略收益率={strategy:.4f}, 股票收益率={stock:.4f}")

            return excess, strategy, stock
        else:
            print(f"错误: 股票 {code} 的回测结果DataFrame为空")
            return 0.0, 0.0, 0.0
            
    except Exception as e:
        print(f"处理股票 {code} 时出错: {e}")
        return 0.0, 0.0, 0.0

In [None]:
# 获取A股实时数据
df_excess = ak.stock_zh_a_spot()

# 移除代码中的前缀
df_excess['代码'] = df_excess['代码'].str[2:]
# 筛选沪深市股票
df_excess = df_excess[~df_excess['代码'].str.startswith(('4','8','9'))]
df_excess = df_excess[~df_excess['名称'].str.startswith(('ST','*ST'))]

# 重置索引
df_excess = df_excess.reset_index(drop=True)

# 删除不需要的列
columns_to_drop = ['最新价', '涨跌额', '涨跌幅', '买入', '卖出', '昨收', '今开', '最高', '最低', '成交量', '成交额', '时间戳']
df_excess = df_excess.drop(columns=[col for col in columns_to_drop if col in df_excess.columns])

# 应用策略并填充结果
def calculate_returns(row):
    code = row['代码']
    return pd.Series(excess_profit(code), index=['超额收益率', '策略收益率', '股票收益率'])

# 先创建新的列，再应用函数
df_excess[['超额收益率', '策略收益率', '股票收益率']] = df_excess.apply(calculate_returns, axis=1)

# 过滤掉三个收益率都为0的行
df_excess = df_excess[~((df_excess['超额收益率'] == 0) & 
                        (df_excess['策略收益率'] == 0) & 
                        (df_excess['股票收益率'] == 0))]

# 删除包含缺失值的行
df_excess = df_excess.dropna()

# 按超额收益率降序排列
#df_excess = df_excess.sort_values(by='超额收益率', ascending=False)
#df_excess=df_excess.reset_index(drop=True)

# 将收益率转换为百分比格式
#df_excess[['超额收益率', '策略收益率', '股票收益率']] = df_excess[['超额收益率', '策略收益率', '股票收益率']].apply(lambda x: x * 100).applymap("{:.2f}%".format)

# 保存结果到Excel文件
df_excess.to_excel(r'E:/jupyter/excel/MA10策略回测.xlsx')

# 显示结果
df_excess

#