初始化

In [None]:
# 导入函数库
from jqdata import *
import numpy as np
ADJUST_INTERVAL = 20  # 调仓周期
CHK_PF_INTERVAL = 60  # 回看窗口
PERCENT_TO_KEEP = 10  # 组合容量

ATR_WIN_SIZE = 20  # ATR移动窗口
RISK_RATIO = 0.00008 # 风险系数
INC_POS_PF_RATE = 0.05 # 当日开盘价相对于前一次买入价的盈利比阈值
MAX_DROP_RATE = 0.03 # 前高回落比率

INIT_POS_ATR_MODE = True # 是否用ATR分配初始建仓的头寸
ADD_POS_ATR_MODE = False # 是否用ATR分配加仓的头寸

# 初始化函数，设定基准等等
def initialize(context):
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)

    init_cash = context.portfolio.starting_cash

    # 设置账户类型: 融资融券账户
    set_subportfolios([SubPortfolioConfig(cash=init_cash, type='stock_margin')])
    
    ## 设定融资/融券的利率和保证金/交易成本
    set_option('margincash_interest_rate', 0.00) # 设定融资利率: 年化8%, 默认8%
    set_option('margincash_margin_rate', 1.0) # 设置融资保证金比率: 150%, 默认100%
    set_option('marginsec_interest_rate', 0.10) # 设定融券利率: 年化10%, 默认10%
    set_option('marginsec_margin_rate', 1.5) # 设定融券保证金比率: 150%, 默认100%
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG') 
    run_daily(market_open, time='open', reference_security='000300.XSHG')
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
    
    # 记录交易日期及其索引
    g.all_tdays = get_all_trade_days() # 交易日
    g.date_xlat = dict([(doc[1], doc[0]) for doc in enumerate(g.all_tdays)]) # 交易日,索引
    
    # 设定若干全局变量
    g.elapsed_days = 0 # 记录策略运行的天数
    g.rebalance_today = False # 是否为rebalance day
    g.new_long_pos = {}
    g.new_short_pos = {}
    
    stock_list = list(get_all_securities(['stock']))
    
    g.last_long_entry_prices = {} # 加仓 -- 记录入场价格
    g.last_long_entry_dates = {} # 浮动止损 -- 记录入场时间
    g.last_short_entry_prices = {} # 加仓 -- 记录入场价格
    g.last_short_entry_dates = {} # 浮动止损 -- 记录入场时间 

开盘前的函数

In [None]:
## 开盘前运行函数     
def before_market_open(context):
    cur_dt = context.current_dt.date() # 当前日期
    if g.elapsed_days % ADJUST_INTERVAL == 0: # 是否为调仓日
        log.info("Rebalance at %s", cur_dt)
        g.rebalance_today = True
        idx = g.date_xlat[cur_dt]
        
        # 观察窗口首末日期, lookback window 不包含今天，所以是前-61天到-1天
        tail_date = g.all_tdays[idx-1]
        head_date = g.all_tdays[idx-1-CHK_PF_INTERVAL]
        
        # 调仓日前一天的股票列表
        candidates = list(get_all_securities(["stock"], tail_date).index)
        
        # 观察窗口末尾日期的股票价格
        tail_prices = get_price(candidates, tail_date, tail_date, frequency='1d',
                                fields=['close'])
        g.tail_values = dict(tail_prices["close"].iloc[0]) # key和value分别是code 和 close
        
        # 观察窗口起始日期的股票价格
        head_prices = get_price(candidates, head_date, head_date, frequency='1d',
                                fields=['close'])
        g.head_values = dict(head_prices["close"].iloc[0])
        
        # 在观察窗口内按收益率对股票排序(从小到大)
        merged_list = []
        for code, tail_value in g.tail_values.items():
            if math.isnan(tail_value):
                continue
            
            # 确保股票始终在观察窗口期内始终有数据
            if code not in g.head_values:
                continue
            head_value = g.head_values[code]
            if math.isnan(head_value):
                continue
            
            # 计算相对收益
            profit_r = tail_value/head_value - 1.0 
            
            # 保存中间结果
            merged_list.append({
                "code": code,
                "profit_r": profit_r,
                "price": tail_value
            })
        
        # 从首尾分别取出一定比例的股票，构造新的多控投资组合
        merged_list.sort(key=lambda x : x["profit_r"], reverse=False)
        
        num_to_keep = len(merged_list) * PERCENT_TO_KEEP // 100  
        g.new_long_pos = {doc["code"]:doc for doc in merged_list[0:num_to_keep]}
        g.new_short_pos = {doc["code"]:doc for doc in merged_list[-num_to_keep:]}
        
    else:
        g.rebalance_today = False
        
    g.elapsed_days += 1

盘中的函数

In [None]:
## 开盘时运行函数
def market_open(context):
    cur_dt = context.current_dt.date() # 当前日期
    p = context.portfolio.subportfolios[0] # 融资/融券保证金账户
    current_data = get_current_data()
    if g.rebalance_today:
        # 再平衡步骤1: 平掉原有多空仓位
        prev_long_pos = p.long_positions
        prev_short_pos = p.short_positions
        
        for code, pos in prev_long_pos.items():
            if current_data[code].paused:
                continue
            
            # 持仓检测
            if code not in g.new_long_pos:
                
                margincash_close(code, pos.closeable_amount)
                
        # 空头仓位是否能平掉?        
        for code, pos in prev_short_pos.items():
            if current_data[code].paused:
                continue
            
            if code not in g.new_short_pos:
                
                marginsec_close(code, pos.closeable_amount)

                
        # 再平衡步骤2: 开立新的多空仓位
        nums = len(g.new_long_pos) + len(g.new_short_pos)
        each_cash = np.round(p.available_margin / nums, 2)
        
        for code, doc in g.new_long_pos.items():
            if current_data[code].paused:
                continue
            
            if code not in prev_long_pos:
                open_price = current_data[code].day_open
                
                if INIT_POS_ATR_MODE:
                    atr = calcATR(code, ATR_WIN_SIZE)
                    if atr is None:
                        continue
                    num_to_buy = p.available_margin * RISK_RATIO / atr // 100 * 100
                    
                else:
                    num_to_buy = each_cash / open_price // 100 * 100
                    
                margincash_open(code, num_to_buy)

        
        for code, doc in g.new_short_pos.items():
            if current_data[code].paused:
                continue
            
            if code not in prev_short_pos:
                open_price = current_data[code].day_open
                
                if INIT_POS_ATR_MODE:
                    atr = calcATR(code, ATR_WIN_SIZE)
                    if atr is None:
                        continue
                    num_to_sell = p.available_margin * RISK_RATIO / atr // 100 * 100
                else:
                    num_to_sell = each_cash / open_price // 100 * 100
                    
                marginsec_open(code, num_to_sell)
        
    else:
        # 非调仓日，可以增加止盈/止损等额外操作... ...
        # 仓位管理，附上仓位图；止盈/止损
        
#         pass
        
        nums = len(p.long_positions) + len(p.short_positions)
        each_cash = np.round(p.available_margin / nums, 2)
        
        # 判断多头仓位
        for code, pos in p.long_positions.items():
            if current_data[code].paused:
                continue
            if pos.today_amount == 0 and pos.closeable_amount > 0:
                # 用于加仓 -- start
                open_price = current_data[code].day_open
                last_entry = g.last_long_entry_prices[code]
                # log.info('code: ', code, ', last long entry price: ', last_entry)
                # 用于加仓 -- end
                
                # 用于回撤止盈
                last_entry_date = g.last_long_entry_dates[code]
                hhv, drop_rate = calcDropRate(code, current_data, last_entry_date, cur_dt, 'long')
                # 浮动止损价格
                key_price = max(last_entry * (1-MAX_DROP_RATE), hhv * (1-MAX_DROP_RATE))
                
                # if drop_rate > MAX_DROP_RATE:
                if open_price <= key_price:
                    margincash_close(code, pos.closeable_amount)
                    
                elif (open_price - last_entry) / last_entry >= INC_POS_PF_RATE:
                    if ADD_POS_ATR_MODE:
                        atr = calcATR(code, ATR_WIN_SIZE)
                        num_to_buy = p.available_margin * RISK_RATIO / atr // 100 * 100
                    else:
                        num_to_buy = each_cash / open_price // 100 * 100
                    margincash_open(code, num_to_buy)
        
        # 判断空头仓位
        for code, pos in p.short_positions.items():
            if current_data[code].paused:
                continue
            if pos.today_amount == 0 and pos.closeable_amount > 0:
                # 用于加仓 -- start
                open_price = current_data[code].day_open
                last_entry = g.last_short_entry_prices[code]
                # 用于加仓 -- end
                
                # 用于回撤止盈
                last_entry_date = g.last_short_entry_dates[code]
                llv, drop_rate = calcDropRate(code, current_data, last_entry_date, cur_dt, 'short')
                
                key_price = min(last_entry * (1+MAX_DROP_RATE), llv*(1+MAX_DROP_RATE))
                
                
                # if drop_rate > MAX_DROP_RATE:
                if open_price >= key_price:
                    marginsec_close(code, pos.closeable_amount)

                
                elif (last_entry - open_price) / last_entry >= INC_POS_PF_RATE:
                    if ADD_POS_ATR_MODE:
                        atr = calcATR(code, ATR_WIN_SIZE)
                        num_to_sell = p.available_margin * RISK_RATIO / atr // 100 * 100
                    else:
                        num_to_sell = each_cash / open_price // 100 * 100   
                    marginsec_open(code, num_to_sell)

收盘后函数

In [None]:
## 收盘后运行函数  
def after_market_close(context):
    # 查看融资融券账户相关相关信息(更多请见API-对象-SubPortfolio)
    p = context.portfolio.subportfolios[0]
    cur_dt = context.current_dt.date()
    # pos_level = p.positions_value / p.total_value
    pos_level = 1 - p.available_margin / p.total_value
    record(pos_level=pos_level)
    
    trades = get_trades()
    for _trade in trades.values():
        _order = get_orders(_trade.order_id).values()[0]
        log.info(_order)
        code = _order.security
        side = _order.side
        action = _order.action
        price = _order.price
        
        if action == 'open':
            if side == 'long':
                g.last_long_entry_prices[code] = price
                g.last_long_entry_dates[code] = cur_dt
            elif side == 'short':
                g.last_short_entry_prices[code] = price
                g.last_short_entry_dates[code] = cur_dt
        elif action == 'close':
            if side == 'long':
                g.last_long_entry_prices[code] = None
                g.last_long_entry_dates[code] = None
            elif side == 'short':
                g.last_short_entry_prices[code] = None
                g.last_short_entry_dates[code] = None 
                
    # log.info('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -')
    # log.info('查看融资融券账户相关相关信息(更多请见API-对象-SubPortfolio)：')
    # log.info('总资产：',p.total_value)
    # log.info('净资产：',p.net_value)
    # log.info('总负债：',p.total_liability)
    # log.info('融资负债：',p.cash_liability)
    # log.info('融券负债：',p.sec_liability)
    # log.info('利息总负债：',p.interest)
    # log.info('可用保证金：',p.available_margin)
    # log.info('维持担保比例：',p.maintenance_margin_rate)
    # log.info('账户所属类型：',p.type)
    # log.info('##############################################################')

helper function

In [None]:
# %% helper function
def calcATR(code, window):
    """
    calculate the atr value
    """
    df = attribute_history(code, window + 1, "1d", 
    ['high', 'low', 'close'], skip_paused=True)
    
    if len(df) != window + 1:
        return None
    
    df['pdc'] = df['close'].shift(1)
    tr = df.apply(lambda x : max( x['high'] - x['low'], abs(x["high"] - x["pdc"]), abs(x['low'] - x['pdc'])), axis=1)
    atr = tr[-window:].mean()
    return atr
    
def calcDropRate(code, current_data, last_entry_date, current_dt, mode):
    """
    calculate the drop rate:
        long : from high down
        short : from low raise
    """
    prev_date = current_dt - timedelta(days=1)
    
    if mode == 'long':
        # 计算截止到前一天的HHV, 避免未来函数, 考虑到前复权, 每个交易日都重新计算
        df = get_price(code, start_date=last_entry_date, end_date=prev_date,
        frequency="1d", fields=['high'], skip_paused=True)
        hhv = df['high'].max()
        # 以当日开盘价计算，或者，也可以用前一交易日的收盘价计算
        drop_rate = (hhv - current_data[code].day_open) / hhv
        return (hhv, drop_rate)

    elif mode == 'short':
        df = get_price(code, start_date=last_entry_date, end_date=prev_date,
        frequency="1d", fields=['low'], skip_paused=True)
        llv = df['low'].min()
        drop_rate = (current_data[code].day_open - llv) / llv
        
    return (llv, drop_rate)