In [215]:
class SSATool():
    """Singular Spectrum Analysis (SSA) helpers for a 1-D price series.

    Every method is a ``@staticmethod``; instances carry no state, so the class
    effectively acts as a namespace.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_trajectory_matrix(price_data, window_size):
        """Embed a series into its Hankel trajectory matrix.

        Column ``i`` holds ``price_data[i:i + window_size]``. Values are taken
        positionally, so any pandas index is ignored. The result is a
        ``(window_size, n)`` float array with ``n = len(price_data) - window_size + 1``.

        Raises:
            ValueError: if ``window_size`` exceeds the series length or is < 1.
        """
        values = np.asarray(price_data, dtype=float)
        if window_size > len(values):
            raise ValueError("Window size must be less than or equal to the size of the price_data")
        if window_size < 1:
            # A zero window would otherwise yield an empty matrix and a meaningless reconstruction.
            raise ValueError("Window size must be at least 1")
        n = len(values) - window_size + 1
        # lag_index[r, c] == r + c, so column c of values[lag_index] is values[c:c + window_size].
        lag_index = np.arange(window_size)[:, None] + np.arange(n)[None, :]
        return values[lag_index]

    @staticmethod
    def perform_SVD(trajectory_matrix):
        """Thin SVD (``full_matrices=False``) of the trajectory matrix.

        Returns:
            ``(U, s, VT)`` such that ``trajectory_matrix == U @ diag(s) @ VT``,
            with ``s`` in non-increasing order (numpy's convention).
        """
        U, s, VT = np.linalg.svd(trajectory_matrix, full_matrices=False)
        return U, s, VT

    @staticmethod
    def reconstruct_time_series(U, s, VT, window_size, n, n_components=1):
        """Rebuild a series from the leading singular triples using diagonal averaging.

        Args:
            U, s, VT: output of ``perform_SVD``.
            window_size, n: shape of the original trajectory matrix.
            n_components: number of leading singular triples kept. The default
                of 1 keeps only the first (largest) one, as before.

        Returns:
            1-D float array of length ``window_size + n - 1``.
        """
        reconstructed_matrix = np.dot(U[:, :n_components] * s[:n_components], VT[:n_components, :])
        return SSATool.diagonal_averaging(reconstructed_matrix, window_size, n)

    @staticmethod
    def diagonal_averaging(reconstructed_matrix, window_size, n):
        """Turn a matrix back into a series by averaging its anti-diagonals.

        Element ``d`` of the result is the mean of all entries ``[r, c]`` of the
        top-left ``(window_size, n)`` block with ``r + c == d``. Positions that
        have no entries are 0.

        Returns:
            1-D float array of length ``window_size + n - 1``.
        """
        block = np.asarray(reconstructed_matrix, dtype=float)[:window_size, :n]
        k = window_size + n - 1
        rows, cols = np.indices(block.shape)
        anti_diagonal = (rows + cols).ravel()
        # A single vectorised pass replaces the per-diagonal Python loop. This runs once
        # per rolling window per stock, so the loop was the hot path.
        sums = np.bincount(anti_diagonal, weights=block.ravel(), minlength=k)
        counts = np.bincount(anti_diagonal, minlength=k)
        reconstructed_data = np.zeros(k)
        np.divide(sums, counts, out=reconstructed_data, where=counts > 0)
        return reconstructed_data

    @staticmethod
    def analyze_series(price_data, window_size, n_components=1):
        """Run SSA on ``price_data`` and return the last point of the reconstruction.

        Args:
            price_data: 1-D sequence of prices (ndarray, list or pandas Series).
            window_size: embedding dimension passed to ``create_trajectory_matrix``.
            n_components: leading singular triples to keep (default 1).
        """
        trajectory_matrix = SSATool.create_trajectory_matrix(price_data, window_size)
        U, s, VT = SSATool.perform_SVD(trajectory_matrix)
        n = trajectory_matrix.shape[1]
        reconstructed_data = SSATool.reconstruct_time_series(U, s, VT, window_size, n, n_components)
        return reconstructed_data[-1]
    

In [216]:
class DataProcessor():
    """Add SSA buy/sell strength columns to every stock DataFrame.

    Args:
        stock_data: dict mapping a stock key to a DataFrame with a ``close``
            column. The frames are modified in place.
        window_sizes: SSA window sizes. Each one is applied over a rolling
            history of ``2 * window_size`` closes.
        ssa_tool: object exposing ``analyze_series(values, window_size)``.
        m: multiplier on the gap's rolling std used for the buy threshold.
        n: multiplier on the gap's rolling std used for the sell threshold.
    """

    def __init__(self, stock_data, window_sizes, ssa_tool, m, n):
        self.stock_data = stock_data
        self.window_sizes = window_sizes
        self.ssa_tool = ssa_tool
        self.m = m
        self.n = n

    def prepare_data(self):
        """Process every DataFrame in place and return the same ``stock_data`` dict."""
        for df in self.stock_data.values():
            self.apply_SSA_to_stock(df, self.window_sizes)
        return self.stock_data

    def apply_SSA_to_stock(self, df, window_sizes):
        """Write ``SSA_{w}_buy_strength`` / ``SSA_{w}_sell_strength`` columns into ``df``.

        For each window ``w``:
          * ``ssa``: last point of the SSA reconstruction over the trailing ``2*w`` closes;
          * ``gap = (ssa - close) / close``, and ``gap_std`` is its rolling ``2*w`` std;
          * buy strength ``= max(gap - m*gap_std, 0)``, which is > 0 when the close
            sits below the SSA value by more than ``m`` stds;
          * sell strength ``= min(gap + n*gap_std, 0)``, which is < 0 when the close
            sits above it by more than ``n`` stds.
        Rows without enough history are NaN.
        """
        close = df['close']
        for window_size in window_sizes:
            history = 2 * window_size
            # raw=True passes analyze_series a plain ndarray (same values) instead of
            # building a pandas Series for every window. The default min_periods equals
            # the window length, so every window is complete. The old
            # len(x) == 2*window_size guard was therefore always true and is dropped.
            ssa_result = close.rolling(history).apply(
                lambda x, w=window_size: self.ssa_tool.analyze_series(x, w),
                raw=True,
            )
            gap = (ssa_result - close) / close
            gap_std = gap.rolling(history).std()

            # m and n are the threshold multipliers passed into the constructor.
            df[f'SSA_{window_size}_buy_strength'] = np.maximum(gap - self.m * gap_std, 0)
            df[f'SSA_{window_size}_sell_strength'] = np.minimum(gap + self.n * gap_std, 0)


In [217]:
class SSATradingStrategy(bt.Strategy):
    """Long-only multi-stock strategy driven by precomputed SSA strength lines.

    Each data feed is expected to expose ``SSA_{w}_buy_strength`` and
    ``SSA_{w}_sell_strength`` lines for every window size ``w``. On each bar the
    strategy:
      * averages the signalling strengths across windows;
      * sells held positions that have a sell signal;
      * opens fixed-budget positions in stocks with a buy signal, strongest first.

    Params:
        window_sizes: window sizes whose lines are read. ``None`` (default) falls
            back to the module-level ``window_sizes`` global, as before.
        cash_divisor: the budget per new position is the starting cash divided by
            this number (default 50).
        min_agree: minimum number of windows that must signal before an average
            strength is reported (default 2).
    """

    params = (
        ('window_sizes', None),
        ('cash_divisor', 50),
        ('min_agree', 2),
    )

    def __init__(self):
        self.fixed_trade_cash = self.broker.get_cash() / self.p.cash_divisor
        self.window_sizes = (
            self.p.window_sizes if self.p.window_sizes is not None else window_sizes
        )
        # Order still working for each stock, keyed by data name. It stops a new order
        # from being stacked on an unfilled one, which could otherwise sell the same
        # shares twice (leaving a short position) or double a position.
        self._open_orders = {}
        print('策略初始化')

    def prenext(self):
        # backtrader calls prenext instead of next until every feed has delivered a bar.
        # Trade anyway so a feed that starts late does not stall the whole portfolio;
        # feeds without bars yet are skipped in evaluate_stocks.
        self.next()

    def next(self):
        stocks_to_buy_list, stocks_to_sell_list = self.evaluate_stocks()
        self.execute_trades(stocks_to_buy_list, stocks_to_sell_list)

    def evaluate_stocks(self):
        """Return ``(buy_names, sell_names)``, each ordered strongest signal first."""
        buy_candidates = []
        sell_candidates = []
        for data in self.datas:
            if not len(data):
                continue  # this feed has not delivered its first bar yet
            avg_buy_strength, avg_sell_strength = self.calculate_strength(data)
            if avg_buy_strength > 0:
                buy_candidates.append((data._name, avg_buy_strength))
            if avg_sell_strength < 0:
                sell_candidates.append((data._name, avg_sell_strength))
        # Buys are sorted by descending strength, sells by ascending (most negative first).
        buy_candidates.sort(key=lambda item: item[1], reverse=True)
        sell_candidates.sort(key=lambda item: item[1])
        return [name for name, _ in buy_candidates], [name for name, _ in sell_candidates]

    def calculate_strength(self, data):
        """Average the non-zero buy/sell strengths of ``data`` across windows.

        Each average is reported only when at least ``min_agree`` windows signal;
        otherwise it is 0. Buy strengths are clipped at >= 0 and sell strengths
        at <= 0.
        """
        buy_strength_sum = sell_strength_sum = 0
        buy_count = sell_count = 0
        for window_size in self.window_sizes:
            buy_strength = max(getattr(data, f'SSA_{window_size}_buy_strength')[0], 0)
            sell_strength = min(getattr(data, f'SSA_{window_size}_sell_strength')[0], 0)
            if buy_strength > 0:
                buy_strength_sum += buy_strength
                buy_count += 1
            if sell_strength < 0:
                sell_strength_sum += sell_strength
                sell_count += 1
        min_agree = self.p.min_agree
        avg_buy_strength = buy_strength_sum / buy_count if buy_count >= min_agree else 0
        avg_sell_strength = sell_strength_sum / sell_count if sell_count >= min_agree else 0
        return avg_buy_strength, avg_sell_strength

    def execute_trades(self, stocks_to_buy_list, stocks_to_sell_list):
        """Submit limit orders at the current close: exits first, then new entries.

        A stock that still has a working order is skipped. Orders are created
        without ``valid``, so an unfilled limit order stays open.
        """
        # Exits: close whole positions that carry a sell signal.
        for stock in stocks_to_sell_list:
            if stock in self._open_orders:
                continue
            data = self.getdatabyname(stock)
            position_size = self.getposition(data).size
            if position_size > 0:
                self._open_orders[stock] = self.sell(
                    data=data, size=position_size,
                    exectype=bt.Order.Limit, price=data.close[0])

        # Entries: broker.get_cash() is not reduced until an order executes, so keep a
        # local budget of what this bar has already committed. This avoids Margin rejections.
        available_cash = self.broker.get_cash()
        for stock in stocks_to_buy_list:
            if stock in self._open_orders:
                continue
            data = self.getdatabyname(stock)
            if self.getposition(data).size != 0:
                continue
            price = data.close[0]
            trade_cash = min(available_cash, self.fixed_trade_cash)
            # Round down to a multiple of 100 shares.
            size_to_buy = int(trade_cash / price / 100) * 100
            if size_to_buy <= 0:
                continue  # budget too small for a single lot
            self._open_orders[stock] = self.buy(
                data=data, size=size_to_buy,
                exectype=bt.Order.Limit, price=price)
            comm_info = self.broker.getcommissioninfo(data)
            available_cash -= size_to_buy * price + comm_info.getcommission(size_to_buy, price)

    def stop(self):
        # Called once when the backtest ends.
        print("策略执行完毕")

    def notify_order(self, order):
        # Forget the order once it reaches a terminal state so the stock can trade again.
        if not order.alive():
            name = order.data._name
            if self._open_orders.get(name) is order:
                del self._open_orders[name]

        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(f'买入: 股票={order.data._name}, '
                         f'数量={order.executed.size:.2f}, '
                         f'价格={order.executed.price:.2f}, '
                         f'成本={order.executed.value:.2f}, '
                         f'手续费={order.executed.comm:.2f}')
            elif order.issell():
                self.log(f'卖出: 股票={order.data._name}, '
                         f'数量={order.executed.size:.2f}, '
                         f'价格={order.executed.price:.2f}, '
                         f'成本={order.executed.value:.2f}, '
                         f'手续费={order.executed.comm:.2f}')
        elif order.status == order.Canceled:
            self.log('订单取消')
        elif order.status == order.Rejected:
            self.log('订单拒绝')
        elif order.status == order.Margin:
            # Insufficient cash at execution time; this status was previously ignored silently.
            self.log('订单资金不足')

    def notify_trade(self, trade):
        if trade.isclosed:
            self.log(f'交易结果: 股票={trade.data._name}, 毛利润={trade.pnl:.2f}, 净利润={trade.pnlcomm:.2f}')

    def log(self, text, dt=None):
        ''' Print a log line, preceded by a separator line and the date (default: current bar of the first feed). '''
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{"-"*60}\n{dt.isoformat()} {text}')

In [218]:
class CustomPandasData(bt.feeds.PandasData):
    """PandasData feed that also exposes the per-window SSA strength columns.

    For every size in the module-level ``window_sizes`` it declares the lines
    ``SSA_{size}_buy_strength`` and ``SSA_{size}_sell_strength`` (all buy lines
    first, then all sell lines). Each line is read from the DataFrame column
    with the same name.
    """

    # Extra lines, built dynamically from the window sizes: buy lines, then sell lines.
    lines = (
        *(f'SSA_{size}_buy_strength' for size in window_sizes),
        *(f'SSA_{size}_sell_strength' for size in window_sizes),
    )

    # One parameter per extra line. The value -1 maps it to the DataFrame column of the same name.
    params = (
        *((f'SSA_{size}_buy_strength', -1) for size in window_sizes),
        *((f'SSA_{size}_sell_strength', -1) for size in window_sizes),
    )
    

In [219]:
class BacktestRunner():
    """Run a backtrader backtest over a dict of per-stock DataFrames and summarise it.

    Args:
        strategy: backtrader Strategy *class*; Cerebro instantiates it.
        initial_cash: starting broker cash.
        commission: commission rate passed to ``broker.setcommission``.
        from_date, to_date: datetime bounds applied to every data feed.
    """

    def __init__(self, strategy, initial_cash, commission, from_date, to_date):
        self.strategy = strategy
        self.initial_cash = initial_cash
        self.commission = commission
        self.from_date = from_date
        self.to_date = to_date
        self.cerebro = bt.Cerebro()

    def setup_data_feeds(self, stock_data):
        """Add one ``CustomPandasData`` feed per stock.

        Each DataFrame needs a ``trade_date`` column (``YYYYMMDD`` values), a
        ``ts_code`` column and the columns the feed reads. ``trade_date`` is
        converted to datetime in place. ``None`` or empty frames are skipped
        with a message instead of failing on ``iloc[0]``.
        """
        for stock, data in stock_data.items():
            if data is None or data.empty:
                print(f'跳过无数据的股票: {stock}')
                continue
            data['trade_date'] = pd.to_datetime(data['trade_date'], format='%Y%m%d')
            stock_code = data['ts_code'].iloc[0]  # the first row's ts_code is the feed name

            data_feed = CustomPandasData(dataname=data,
                                         name=stock_code,  # use the actual stock code as the feed name
                                         datetime='trade_date',
                                         fromdate=self.from_date,
                                         todate=self.to_date)

            self.cerebro.adddata(data_feed)

    def run(self):
        """Configure broker and analyzers, run Cerebro and return the summary DataFrame."""
        self.cerebro.addstrategy(self.strategy)
        self.cerebro.broker.set_cash(self.initial_cash)
        self.cerebro.broker.setcommission(commission=self.commission)
        # NOTE(review): SharpeRatio is added with backtrader's defaults, which use a yearly
        # timeframe. That gives very few samples for a ~1-year backtest; consider
        # timeframe=bt.TimeFrame.Days, annualize=True. Confirm before relying on the number.
        self.cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
        self.cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
        self.cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

        results = self.cerebro.run()
        return self.analyze_results(results)

    def analyze_results(self, results):
        """Build a one-row DataFrame of headline metrics from the first strategy's analyzers."""
        import math

        strategy = results[0]
        returns_analysis = strategy.analyzers.returns.get_analysis()
        sharpe_ratio_analysis = strategy.analyzers.sharpe.get_analysis()
        drawdown_analysis = strategy.analyzers.drawdown.get_analysis()

        # The Returns analyzer's 'rtot' is a log return. Convert it to a simple return so it
        # matches the column label and the (already simple) annualised 'rnorm'.
        rtot = returns_analysis.get('rtot', None)
        total_return = math.expm1(rtot) if rtot is not None else None

        # Top-level 'len'/'drawdown' describe the drawdown still open at the end of the run.
        # The maxima live under the 'max' sub-dict.
        max_drawdown = drawdown_analysis.get('max', None) or {}

        # Build and return the summary DataFrame.
        analyze_result = pd.DataFrame({
            '回测期收益率': [total_return],
            '年化收益率': [returns_analysis.get('rnorm', None)],
            '夏普比率': [sharpe_ratio_analysis.get('sharperatio', None)],
            '最大回撤长度': [max_drawdown.get('len', None)],
            '最大回撤百分比': [max_drawdown.get('drawdown', None)],
        })
        return analyze_result


In [220]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import backtrader as bt
import tushare as ts
import datetime
from tqdm import tqdm
# Create the tushare Pro API client.
# NOTE(review): presumably a token was configured beforehand (e.g. ts.set_token) — confirm.
pro = ts.pro_api()

# Stock universe: constituents of index 399300.SZ (CSI 300) as published on 2023-12-01.
# NOTE(review): using a late-2023 constituent list for a backtest starting 2022-12 introduces
# survivorship bias.
stock_list = (
    pro.index_weight(index_code='399300.SZ', start_date='20231201', end_date='20231201')
    ['con_code']
    .to_list()
)

# Download window. It starts earlier than the backtest start date because the rolling
# SSA and std calculations need warm-up history.
start_date = '2022-05-01'
end_date = '2024-01-19'

In [None]:
# Download daily bars (adj='hfq': 后复权, backward-adjusted) for every stock in the universe.
all_stock_data = {}
for stock in tqdm(stock_list, desc='数据获取中'):
    df = ts.pro_bar(stock, start_date=start_date, end_date=end_date, adj='hfq', freq='D')
    # NOTE(review): pro_bar appears able to return None (or an empty frame) when a request
    # fails or the stock has no bars in range. Skip such tickers instead of crashing on
    # .sort_values and losing the whole (slow) download.
    if df is None or df.empty:
        tqdm.write(f'跳过 {stock}: 未获取到数据')
        continue
    # Put rows oldest-first: every rolling computation downstream assumes chronological order.
    df = df.sort_values(by='trade_date', ascending=True, ignore_index=True)
    all_stock_data[stock] = df


数据获取中:  95%|██████████████████████████▌ | 284/300 [11:13<00:36,  2.30s/it]

In [None]:
# SSA window sizes. DataProcessor applies each one over a rolling history of 2 * window_size closes.
window_sizes = [7, 15, 30]

# Threshold multipliers on the rolling std of the SSA gap:
#   buy strength  = part of the gap above  m * gap_std
#   sell strength = part of the gap below -n * gap_std
m = 1
n = 1

ssa_tool = SSATool()
data_processor = DataProcessor(
    stock_data=all_stock_data,
    window_sizes=window_sizes,
    ssa_tool=ssa_tool,
    m=m,
    n=n,
)
processed_data = data_processor.prepare_data()


In [None]:
# Starting capital and commission rate handed to the broker.
initial_cash = 10_000_000
commission = 0.0025

# Backtest window; every data feed is clipped to this range.
from_date = datetime.datetime(2022, 12, 31)
to_date = datetime.datetime(2024, 1, 19)

# The strategy *class*; Cerebro creates the instance when the run starts.
strategy = SSATradingStrategy

backtest_runner = BacktestRunner(
    strategy=strategy,
    initial_cash=initial_cash,
    commission=commission,
    from_date=from_date,
    to_date=to_date,
)

# Register one feed per stock, then run and collect the summary metrics.
backtest_runner.setup_data_feeds(processed_data)
analyze_result = backtest_runner.run()


In [None]:
# Display the summary metrics table (a notebook renders a cell's last expression as its output).
analyze_result