In [1]:
# 本文说明设计回测系统时，如何实现行情数据处理部分，包括接口基类、历史和实时tick 示例。

from __future__ import print_function
from abc import ABCMeta, abstractmethod 
import datetime
import threading  
import os, os.path
import numpy as np 
import pandas as pd
import akshare as ak
#from event import MarketEvent

##### 策略

策略是一个抽象基类，为所有后续（继承的）策略处理对象提供一个接口。
策略对象的目标是，基于输入数据，生成信号对象并发送到消息队列。策略既可以处理历史数据，也可以处理实时数据。

In [2]:
# 策略基类
class Strategy(metaclass=ABCMeta):
    """
    Abstract base class that defines the interface every concrete strategy
    must provide.

    ABCMeta (from the ``abc`` module) is the metaclass used to build abstract
    base classes. Methods decorated with ``@abstractmethod`` declare the
    interface without implementing it; a subclass has to override all of them.
    Trying to instantiate this class, or a subclass that leaves an abstract
    method unimplemented, raises ``TypeError``.

    The metaclass is passed with the ``metaclass=`` keyword because a
    ``__metaclass__`` class attribute is ignored by Python 3.
    """

    @abstractmethod
    def calculate_signals(self):
        """
        Signal-generation logic; concrete strategies must override this.

        Raises:
        NotImplementedError - if a subclass delegates to this base
            implementation instead of providing its own logic.
        """
        raise NotImplementedError("Should implement calculate_signals()")


In [3]:

def create_sharpe_ratio(returns, periods=252):
    """
    Compute the Sharpe ratio of a series of period returns.

    The mean return is divided by the standard deviation of the returns and
    scaled by the square root of the number of periods. No risk-free rate or
    benchmark is subtracted.

    Parameters:
    returns - Period percentage returns (e.g. a pandas Series).
    periods - Scaling periods: Daily (252), Hourly (252*4),
        Minutely (252*4*60) etc.

    Returns:
    The scaled ratio sqrt(periods) * mean(returns) / std(returns).
    """
    scale = np.sqrt(periods)
    average_return = np.mean(returns)
    volatility = np.std(returns)
    return scale * average_return / volatility

def create_drawdowns(pnl):
    """
    Calculate the peak-to-trough drawdown of a PnL curve and how long each
    drawdown lasts.

    The high-water mark is the running maximum of the curve, floored at 0.
    Every observation, including the first one, takes part in the running
    maximum, so a fall from an initial peak is measured as well.

    Parameters:
    pnl - A pandas Series whose values form the curve to analyse. The values
        are read by position, so any kind of index (dates, strings,
        integers) is accepted.

    Returns:
    drawdown - Series (same index as pnl) of high-water mark minus value.
    max_drawdown - Largest value in the drawdown series (NaN if pnl is empty).
    max_duration - Largest number of consecutive periods spent below the
        high-water mark (NaN if pnl is empty).
    """
    idx = pnl.index
    # Work on a plain float array: positional access never depends on the
    # index type and the outputs start out as numeric zeros rather than NaN.
    values = np.asarray(pnl, dtype=float)
    n_periods = len(values)
    drawdown_values = np.zeros(n_periods)
    duration_values = np.zeros(n_periods)

    high_water_mark = 0.0
    periods_under_water = 0.0
    for t in range(n_periods):
        # Highest value seen up to and including the current period
        high_water_mark = max(high_water_mark, values[t])
        # Distance between that peak and the current value
        drawdown_values[t] = high_water_mark - values[t]
        # The duration counter restarts whenever the curve is back at its peak
        if drawdown_values[t] == 0:
            periods_under_water = 0.0
        else:
            periods_under_water += 1.0
        duration_values[t] = periods_under_water

    drawdown = pd.Series(drawdown_values, index=idx)
    duration = pd.Series(duration_values, index=idx)
    return drawdown, drawdown.max(), duration.max()

#####  从akshare 获取历史数据

In [6]:
# 定义数据处理的抽象基类
class DataHandler(metaclass=ABCMeta):
    """
    Abstract interface for market-data handlers.

    ABCMeta is the metaclass used to build abstract base classes: the class
    cannot be instantiated directly and only describes the methods that every
    concrete handler has to implement. The metaclass is passed with the
    ``metaclass=`` keyword because a ``__metaclass__`` class attribute is
    ignored by Python 3.
    """

    @abstractmethod
    def get_latest_bar(self, symbol):
        """
        Returns the last bar updated.
        """
        raise NotImplementedError("Should implement get_latest_bar()")

    @abstractmethod
    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N bars updated.
        """
        raise NotImplementedError("Should implement get_latest_bars()")

    @abstractmethod
    def get_latest_bar_datetime(self, symbol):
        """
        Returns a Python datetime object for the last bar.
        """
        raise NotImplementedError("Should implement get_latest_bar_datetime()")

    @abstractmethod
    def get_latest_bar_value(self, symbol, val_type):
        """
        Returns one of the Open, High, Low, Close, Volume or OI from the
        last bar.
        """
        raise NotImplementedError("Should implement get_latest_bar_value()")

    @abstractmethod
    def get_latest_bars_values(self, symbol, val_type, N=1):
        """
        Returns the last N bar values from the latest_symbol list,
        or N-k if less available.
        """
        raise NotImplementedError("Should implement get_latest_bars_values()")

    @abstractmethod
    def update_bars(self):
        """
        Pushes the latest bars to the bars_queue for each symbol
        in a tuple OHLCV format: (datetime, open, high, low,
        close, volume).
        """
        raise NotImplementedError("Should implement update_bars()")



# 继承抽象基类
# 
class HistoricFromAkshareDataHandler(DataHandler):
    """
    Historical data handler backed by akshare.

    The full history of every symbol is downloaded once with
    ``ak.futures_zh_daily_sina`` and then released one bar per call to
    ``update_bars``. The ``get_latest_*`` accessors only see the bars that
    have been released so far, which keeps future data out of reach of the
    code consuming the feed.
    """

    def __init__(self, events, market, symbol_list):
        """
        Parameters:
        events - Event queue; stored, but unused while the MarketEvent
            notification at the end of update_bars stays commented out.
        market - Market identifier; stored but not used by this class.
        symbol_list - Symbols, each passed to ak.futures_zh_daily_sina.
        """
        self.events = events
        self.market = market
        self.symbol_list = symbol_list
        # symbol -> complete downloaded history, indexed by the 'date' column
        self.symbol_data = {}
        # symbol -> bars released so far (a leading slice of symbol_data)
        self.latest_symbol_data = {}
        # symbol -> number of bars released so far
        self._bar_cursor = {}
        # Switched to False once any symbol has no bars left to release
        self.continue_backtest = True
        self._get_akshare_data()

    def _get_akshare_data(self):
        """
        Download every symbol from akshare into a pandas DataFrame indexed by
        its 'date' column and reset the release state of each symbol.
        """
        for s in self.symbol_list:
            history = ak.futures_zh_daily_sina(s)
            # The 'date' column is used as index and also kept as a column
            self.symbol_data[s] = history.set_index(history['date'])
            # Nothing has been released yet: empty frame with the same columns
            self.latest_symbol_data[s] = self.symbol_data[s].iloc[:0]
            self._bar_cursor[s] = 0

    def _get_new_bar(self, symbol):
        """
        Returns the next unreleased bar of `symbol` as a one-row DataFrame and
        advances that symbol's cursor.

        Raises:
        StopIteration - when every bar of the symbol has been released.
        """
        history = self.symbol_data[symbol]
        position = self._bar_cursor[symbol]
        if position >= len(history):
            raise StopIteration
        self._bar_cursor[symbol] = position + 1
        return history.iloc[position:position + 1]

    def _latest_bars(self, symbol):
        """
        Returns the DataFrame of released bars for `symbol`.

        Raises:
        KeyError - for an unknown symbol, after printing a notice.
        """
        try:
            return self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise

    def get_latest_bar(self, symbol):
        """
        Returns the last released bar as a one-row DataFrame (empty if no bar
        has been released yet).
        """
        return self._latest_bars(symbol).iloc[-1:]

    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N released bars as a DataFrame,
        or N-k if less available.
        """
        return self._latest_bars(symbol).iloc[-N:]

    def get_latest_bar_datetime(self, symbol):
        """
        Returns the index label (taken from the 'date' column) of the last
        released bar.

        Raises:
        IndexError - if no bar has been released yet.
        """
        return self._latest_bars(symbol).index[-1]

    def get_latest_bar_value(self, symbol, val_type):
        """
        Returns the value of column `val_type` from the last released bar.

        Raises:
        IndexError - if no bar has been released yet.
        """
        return self._latest_bars(symbol).iloc[-1][val_type]

    def get_latest_bars_values(self, symbol, val_type, N=1):
        """
        Returns the values of column `val_type` for the last N released bars
        as a numpy array, or N-k if less available.
        """
        return self._latest_bars(symbol).iloc[-N:][val_type].to_numpy()

    def update_bars(self):
        """
        Releases the next bar of every symbol in the symbol list.

        When a symbol has run out of bars, continue_backtest is set to False
        and that symbol's released bars are left unchanged.
        """
        for s in self.symbol_list:
            try:
                bar = self._get_new_bar(s)
            except StopIteration:
                self.continue_backtest = False
            else:
                if bar is not None:
                    # Expose the history up to the cursor as a slice instead
                    # of concatenating row by row, which would copy the whole
                    # frame on every update.
                    released = self._bar_cursor[s]
                    self.latest_symbol_data[s] = self.symbol_data[s].iloc[:released]
        # Market-event notification is disabled: MarketEvent is not imported
        # in this notebook.
        #self.events.put(MarketEvent())
        

##### 模拟从akshare 实时获取TICK

In [7]:
# 继承抽象基类
# 
class TickAKShareDataHandler(DataHandler):
    """
    Simulated real-time tick handler backed by akshare.

    A snapshot of every symbol is fetched with ``ak.futures_zh_spot`` when the
    handler is created. After ``start()`` a background thread fetches a new
    snapshot every ``_timerSleep`` seconds and appends it to the accumulated
    data served by the ``get_latest_*`` accessors, until ``stop()`` is called.
    """

    def __init__(self, events, market, symbol_list):
        """
        Parameters:
        events - Event queue; stored, but unused while the MarketEvent
            notification at the end of update_bars stays commented out.
        market - Market identifier; stored but not passed on to akshare.
            NOTE(review): confirm whether ak.futures_zh_spot should receive it.
        symbol_list - Symbols, each passed to ak.futures_zh_spot.
        """
        self.events = events
        self.market = market
        self.symbol_list = symbol_list
        # symbol -> most recently fetched snapshot, indexed by 'time'
        self.symbol_data = {}
        # symbol -> all snapshots appended so far
        self.latest_symbol_data = {}
        self.continue_backtest = True
        self._get_akshare_data()
        # Polling thread; a Thread object can only be started once
        self._task_thread = threading.Thread(target=self._run)
        # Simulated tick: request a new snapshot every 3 seconds
        self._timerSleep = 3
        # Set while polling is stopped; cleared by start(), set by stop()
        self._stop_event = threading.Event()
        self._stop_event.set()

    def _run(self):
        """
        Polling loop executed on the background thread: wait, fetch a new
        snapshot, append it, and repeat until the stop event is set.
        """
        # Event.wait doubles as the sleep between polls and returns True as
        # soon as stop() is called, so the loop ends without waiting out the
        # remaining interval.
        while not self._stop_event.wait(self._timerSleep):
            self._get_akshare_data()
            self.update_bars()

    def _get_akshare_data(self):
        """
        Fetch the current snapshot of every symbol from akshare into a pandas
        DataFrame indexed by its 'time' column.
        """
        for s in self.symbol_list:
            snapshot = ak.futures_zh_spot(s)
            # The 'time' column is used as index and also kept as a column
            self.symbol_data[s] = snapshot.set_index(snapshot['time'])
            # Create the accumulator only once; re-creating it on every poll
            # would throw away the ticks collected so far.
            self.latest_symbol_data.setdefault(s, pd.DataFrame())

    def _get_new_bar(self, symbol):
        """
        Returns the most recently fetched snapshot for `symbol`, or None when
        that snapshot has no rows.
        """
        snapshot = self.symbol_data[symbol]
        if len(snapshot) > 0:
            return snapshot
        return None

    def start(self):
        """
        Start the background polling thread.
        """
        self._stop_event.clear()
        self._task_thread.start()

    def stop(self):
        """
        Ask the background polling thread to finish; it exits at its next
        wake-up without fetching again.
        """
        self._stop_event.set()

    def _latest_bars(self, symbol):
        """
        Returns the DataFrame of accumulated snapshots for `symbol`.

        Raises:
        KeyError - for an unknown symbol, after printing a notice.
        """
        try:
            return self.latest_symbol_data[symbol]
        except KeyError:
            print("That symbol is not available in the historical data set.")
            raise

    def get_latest_bar(self, symbol):
        """
        Returns the last accumulated row as a one-row DataFrame (empty if
        nothing has been appended yet).
        """
        return self._latest_bars(symbol).iloc[-1:]

    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N accumulated rows as a DataFrame,
        or N-k if less available.
        """
        return self._latest_bars(symbol).iloc[-N:]

    def get_latest_bar_datetime(self, symbol):
        """
        Returns the index label (taken from the 'time' column) of the last
        accumulated row.

        Raises:
        IndexError - if nothing has been appended yet.
        """
        return self._latest_bars(symbol).index[-1]

    def get_latest_bar_value(self, symbol, val_type):
        """
        Returns the value of column `val_type` from the last accumulated row.

        Raises:
        IndexError - if nothing has been appended yet.
        """
        return self._latest_bars(symbol).iloc[-1][val_type]

    def get_latest_bars_values(self, symbol, val_type, N=1):
        """
        Returns the values of column `val_type` for the last N accumulated
        rows as a numpy array, or N-k if less available.
        """
        return self._latest_bars(symbol).iloc[-N:][val_type].to_numpy()

    def update_bars(self):
        """
        Appends the most recently fetched snapshot of every symbol to the
        accumulated data of that symbol.
        """
        for s in self.symbol_list:
            try:
                bar = self._get_new_bar(s)
            except StopIteration:
                self.continue_backtest = False
            else:
                if bar is not None:
                    accumulated = self.latest_symbol_data[s]
                    if accumulated.empty:
                        # First snapshot: copy it rather than concatenating
                        # with an empty frame.
                        self.latest_symbol_data[s] = bar.copy()
                    else:
                        # Append the new snapshot after the existing rows
                        self.latest_symbol_data[s] = pd.concat([accumulated, bar], axis=0)
        # Market-event notification is disabled: MarketEvent is not imported
        # in this notebook.
        #self.events.put(MarketEvent())
        

##### 投资组合

In [12]:
class Portfolio(object):
    """
    Keeps the positions, cash and holdings of a portfolio together with the
    bar data handler and the event queue.

    Position records hold the quantity held per symbol at a point in time.
    Holding records hold the market value per symbol plus cash, accumulated
    commission and total equity.
    """

    def __init__(self, bars, events, start_date, initial_capital=100000.0,
                 price_field="adj_close"):
        """
        Parameters:
        bars - Data handler providing symbol_list, get_latest_bar_datetime
            and get_latest_bar_value.
        events - Event queue; stored but not used by the methods below.
        start_date - Timestamp stored in the first position/holding records.
        initial_capital - Starting cash.
        price_field - Column requested from the data handler to value open
            positions. Defaults to "adj_close".
            NOTE(review): the akshare frames used elsewhere in this notebook
            may not expose an "adj_close" column - confirm, and pass e.g.
            price_field="close" if so.
        """
        self.bars = bars
        self.events = events
        self.symbol_list = self.bars.symbol_list
        self.start_date = start_date
        self.initial_capital = initial_capital
        self.price_field = price_field
        self.all_positions = self.construct_all_positions()
        self.current_positions = {s: 0 for s in self.symbol_list}
        self.all_holdings = self.construct_all_holdings()
        self.current_holdings = self.construct_current_holdings()

    def construct_all_positions(self):
        """
        Builds the positions history: a list with one dictionary that maps
        every symbol to 0 and 'datetime' to the start date.
        """
        record = {s: 0 for s in self.symbol_list}
        record['datetime'] = self.start_date
        return [record]

    def construct_all_holdings(self):
        """
        Builds the holdings history: a list with one dictionary that maps
        every symbol to 0.0 and adds 'datetime' (start date), 'cash' (initial
        capital), 'commission' (0.0) and 'total' (initial capital).
        """
        record = {s: 0.0 for s in self.symbol_list}
        record['datetime'] = self.start_date
        record['cash'] = self.initial_capital
        record['commission'] = 0.0
        record['total'] = self.initial_capital
        return [record]

    def construct_current_holdings(self):
        """
        Builds the dictionary of current holdings: every symbol at 0.0 plus
        'cash', 'commission' and 'total'. Unlike the history records it has
        no 'datetime' key.
        """
        record = {s: 0.0 for s in self.symbol_list}
        record['cash'] = self.initial_capital
        record['commission'] = 0.0
        record['total'] = self.initial_capital
        return record

    def update_timeindex(self, event):
        """
        Appends a new record to the positions and holdings histories for the
        current market data bar.

        In a backtest there is no broker to report the market value of the
        open positions, so it is approximated as quantity held times the
        latest value of `price_field` supplied by the data handler. Current
        positions are copied as they are; this method does not modify them.

        Parameters:
        event - Unused by this method; accepted so it can be called as a
            handler for a market event.
        """
        # The timestamp of the first symbol's latest bar is used for all
        latest_datetime = self.bars.get_latest_bar_datetime(
            self.symbol_list[0]
        )

        # Update positions
        # ================
        position_record = {s: self.current_positions[s] for s in self.symbol_list}
        position_record['datetime'] = latest_datetime
        # Append the current positions
        self.all_positions.append(position_record)

        # Update holdings
        # ===============
        holding_record = {s: 0 for s in self.symbol_list}
        holding_record['datetime'] = latest_datetime
        holding_record['cash'] = self.current_holdings['cash']
        holding_record['commission'] = self.current_holdings['commission']
        # Total equity starts from cash; market values are added below
        holding_record['total'] = self.current_holdings['cash']
        for s in self.symbol_list:
            # Approximation to the real value
            latest_price = self.bars.get_latest_bar_value(s, self.price_field)
            market_value = self.current_positions[s] * latest_price
            holding_record[s] = market_value
            holding_record['total'] += market_value
        # Append the current holdings
        self.all_holdings.append(holding_record)

In [8]:
# Demo: map every key of a list to an initial value of 0
d = {key: 0 for key in [1, 2, 3, 4]}
print(d)

{1: 0, 2: 0, 3: 0, 4: 0}
