In [1]:
def calculate_llt(prices, alpha=0.05):
    # 转换为numpy数组以避免pandas索引问题
    if hasattr(prices, 'values'):
        prices = prices.values
    
    n = len(prices)
    llt = np.zeros(n)
    if n >= 1:
        llt[0] = prices[0]
    if n >= 2:
        llt[1] = prices[1]
    
    a1 = alpha - (alpha**2) / 4
    a2 = (alpha**2) / 2
    a3 = alpha - 3 * (alpha**2) / 4
    a4 = 2 * (1 - alpha)
    a5 = - (1 - alpha)**2
    
    for t in range(2, n):
        llt[t] = a1 * prices[t] + a2 * prices[t-1] - a3 * prices[t-2] + a4 * llt[t-1] + a5 * llt[t-2]
    
    return llt

# 信号计算函数
def llt_slope_signal(df, d: int=39, slope_window=5, thresh=(0, 0)):
    df = df.copy()
    alpha = 2 / (d + 1)
    df["llt"] = calculate_llt(df["close"], alpha)
    df['slope'] = (df["llt"].rolling(slope_window)
                    .apply(lambda x: np.polyfit(np.arange(slope_window), x, 1)[0]))
    
    signals = pd.Series(0, index=df.index)
    signals[df['slope'] > thresh[1]] = 1
    signals[df['slope'] < thresh[0]] = -1
    signals.ffill(inplace = True)
    
    return signals

<!--PAID CONTENT END-->

这是回测策略类：

In [2]:
import backtrader as bt

class LLTStrategy(bt.Strategy):
    params = (
        ('d', 39),
        ('slope_window', 5), 
        ('position_ratio', 1),
        ('thresh', (0, 0))
    )
    
    def __init__(self):
        self.order_dict = {}
        self.signals = llt_slope_signal(
            self.data._dataname,
            d=self.p.d,
            slope_window=self.p.slope_window,
            thresh=self.p.thresh
        )
        
        self._last_direction = 0
        
        print(f"策略初始化完成，信号数量: {len(self.signals)}")
        print(f"信号前20个值: {self.signals.head(20)}")
    
    def next(self):
        current_date = pd.Timestamp(self.data.datetime.date())
        
        current_signal = self.signals.loc[current_date]
        position = self.getposition(self.data).size
        
        if current_signal != 0:
            print(f"日期: {current_date}, 信号: {current_signal}, 当前持仓: {position}")
        
        if current_signal == 1 and self._last_direction <= 0:
            order = self.order_target_percent(target=0.95)
            if order:
                print(f"做多信号: {current_date.date()}")
        elif current_signal == -1 and self._last_direction >= 0:
            order = self.order_target_percent(target=-0.95)
            if order:
                print(f"做空信号: {current_date.date()}")
        elif current_signal == 0 and position != 0:
            order = self.order_target_percent(target=0.0)
            if order:
                print(f"平仓信号: {current_date.date()}")
        else:
            pass

def run_backtest(data, d=39, 
                 slope_window=5, 
                 thresh=(0,0),
                 initial_cash=1_000_0000, 
                 commission=1e-3):
    cerebro = bt.Cerebro()

    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=commission)

    cerebro.addstrategy(LLTStrategy, d=d, slope_window=slope_window, thresh=thresh)
    
    bt_data = bt.feeds.PandasData(dataname=data)
    cerebro.adddata(bt_data)
    
    # 添加绩效分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    
    # 运行回测
    print(f"初始资金: {cerebro.broker.getvalue():.2f}")
    results = cerebro.run(maxcpus = 6, optreturn = True)
    final_value = cerebro.broker.getvalue()
    print(f"最终资金: {final_value:.2f}")
    
    # 输出绩效指标
    strat = results[0]
    returns = strat.analyzers.returns.get_analysis()
    sharpe = strat.analyzers.sharpe.get_analysis()
    drawdown = strat.analyzers.drawdown.get_analysis()
    
    print(f"夏普比率: {sharpe.get('sharperatio', 0):.2f}")
    print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%")
    print(f"年化收益率: {returns.get('rnorm', 0):.2%}")


def get_price(symbol, start_date, end_date):
    pro = pro_api()

    price_df = pro.index_daily(
        ts_code=symbol,
        start_date=start_date.strftime("%Y%m%d"),
        end_date=end_date.strftime("%Y%m%d"),
    )

    price_df = (
        price_df.rename({"trade_date": "date", "ts_code": "asset"}, axis=1)
        .sort_values("date", ascending=True)
        .set_index("date")
    )

    price_df.index = pd.to_datetime(price_df.index)
    return price_df

start = datetime.date(2013, 1, 1)
end = datetime.date(2024, 12, 31)
prices= get_price("000001.SH", start, end)

run_backtest(prices, d=59, thresh=(-0.01, 0.01), commission=1e-4)

初始资金: 10000000.00
策略初始化完成，信号数量: 2914
信号前20个值: date
2013-01-04    0
2013-01-07    0
2013-01-08    0
2013-01-09    0
2013-01-10    1
2013-01-11    1
2013-01-14    1
2013-01-15    1
2013-01-16    1
2013-01-17    1
2013-01-18    1
2013-01-21    1
2013-01-22    1
2013-01-23    1
2013-01-24    1
2013-01-25    1
2013-01-28    1
2013-01-29    1
2013-01-30    1
2013-01-31    1
dtype: int64
日期: 2013-01-10 00:00:00, 信号: 1, 当前持仓: 0
做多信号: 2013-01-10
日期: 2013-01-11 00:00:00, 信号: 1, 当前持仓: 4159
做多信号: 2013-01-11
日期: 2013-01-14 00:00:00, 信号: 1, 当前持仓: 4160
做多信号: 2013-01-14
日期: 2013-01-15 00:00:00, 信号: 1, 当前持仓: 4155
做多信号: 2013-01-15
日期: 2013-01-16 00:00:00, 信号: 1, 当前持仓: 4154
日期: 2013-01-17 00:00:00, 信号: 1, 当前持仓: 4154
做多信号: 2013-01-17
日期: 2013-01-18 00:00:00, 信号: 1, 当前持仓: 4156
做多信号: 2013-01-18
日期: 2013-01-21 00:00:00, 信号: 1, 当前持仓: 4155
做多信号: 2013-01-21
日期: 2013-01-22 00:00:00, 信号: 1, 当前持仓: 4154
日期: 2013-01-23 00:00:00, 信号: 1, 当前持仓: 4154
日期: 2013-01-24 00:00:00, 信号: 1, 当前持仓: 4154
做多信号: 2013-01-24
日期: 2013-0

<!-- BEGIN IPYNB STRIPOUT -->
我们省略回测调用代码，以节省篇幅。如果你需要这些代码，可以购买匡醍会员。如果你不太理解我们在讨论些啥，你可以申请参加我们的《量化24课》和《因子挖掘与机器学习策略》。
<!-- END IPYNB STRIPOUT -->

<!--PAID CONTENT START-->

In [3]:
import datetime

class LLTSignal(bt.Indicator):
    lines = ("signal",)
    params = (("d", 59), ("slope_window", 5), ("thresh", (-0.01, 0.01)))
    
    def __init__(self):
        self.signals = llt_slope_signal(self.data._dataname, self.p.d, self.p.slope_window, self.p.thresh)
        self.signal_index = 0
    
    def next(self):
        if self.signal_index < len(self.signals):
            self.lines.signal[0] = self.signals.iloc[self.signal_index]
            self.signal_index += 1
        else:
            self.lines.signal[0] = 0

from backtrader import feeds as btfeeds

start = datetime.date(2013, 1, 1)
end = datetime.date(2024, 12, 31)


prices = get_price("000001.SH", start, end)
prices.index = pd.to_datetime(prices.index)

cerebro = bt.Cerebro()

cerebro.broker.setcash(1_000_000)
cerebro.broker.setcommission(commission=1e-4)
cerebro.addsizer(bt.sizers.PercentSizer, percents=95)

data = btfeeds.PandasData(dataname=prices)
cerebro.adddata(data)
cerebro.add_signal(bt.SIGNAL_LONGSHORT, LLTSignal)

cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe")
cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")
cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")

results = cerebro.run()
strat = results[0]

returns = strat.analyzers.returns.get_analysis()
sharpe = strat.analyzers.sharpe.get_analysis()
drawdown = strat.analyzers.drawdown.get_analysis()
print(f"总收益率：{returns['rtot']:.2%}")
print(f"年化收益率：{returns['rnorm']:.2%}")
print(f"夏普比率：{sharpe['sharperatio']:.2f}")
print(f"最大回撤：{drawdown['max']['drawdown']:.2f}%")

总收益率：62.22%
年化收益率：5.53%
夏普比率：0.29
最大回撤：38.31%
